k-means 算法基础原理

本文注重对 k-means 算法的实现作出分析,基础原理不作赘述。

k-means 算法的特点

1. k-means 算法的基本概念:

  • k 代表分为几个簇
  • means 表示寻求新质心点的时候采用求均值的方法

2. k-means 算法的特点:

  • k-means 算法与 knn 算法类似,都是需要“聚类”
  • k-means 算法是无监督学习算法的一种

k-means 算法的实现步骤:

  1. 随机选取 K 个对象,并以它们为质心;
  2. 计算数据集样本点到质心的距离;
  3. 根据样本点距离质心的距离将其分簇(类),距离哪个近,划分到哪个簇(类);
  4. 以簇内所有样本点的均值重新计算质心,,然后重复第二步,直到划分的簇(类)不在变化后停止。

k-means 算法代码实现

模块的导入与 K 均值聚类算法类的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import random

import numpy as np
from matplotlib import pyplot as plt


class KMeans:
    """
    K-means clustering(K均值聚类)
    """

    def __init__(self, n_clusters: int, iterations=100, eps=1e-3):
        """
        Args:
            n_clusters (int): 聚类类别数.
            iterations (int, optional): 迭代次数, 默认为100.
            eps (float, optional): 中心点最小更新量, 默认为1e-3.
        """
        self.n_clusters, self.iterations, self.eps, self.centers = n_clusters, iterations, eps, None

    def fit(self, X: np.ndarray):
        """
        Args:
            X (np.ndarray): 输入
        """
        # 随机选择k个点作为中心点
        self.centers = X[random.sample(range(len(X)), self.n_clusters)] # range(len(X))返回一个迭代对象 random.sample截取一段数字返回一个列表

        for _ in range(self.iterations):
            y_pred = self(X)

            # 各类别的均值作为新的中心点,
            centers = np.stack(
                [
                    # 存在元素属于类别i则计算类别i所有点的均值,否则随机选择一个点作为类别i的均值
                    np.mean(X[y_pred == i], axis=0) if np.any(y_pred == i) else random.choice(X) for i in range(self.n_clusters)  # np.any()用于判读真假,
                ]
            )

            # 中心点最大更新值小于eps则停止迭代
            if np.abs(self.centers - centers).max() < self.eps:
                break

            # 将更新后的均值作为各类别中心点
            self.centers = centers

    def __call__(self, X: np.ndarray):
        return np.array([np.argmin(np.linalg.norm(self.centers - x, axis=1)) for x in X])  # 每一点类别为最近的中心点类别,返回下标

随机生成测试集

1
2
3
4
def load_data(n_samples_per_class=200, n_classes=5):
    X = np.concatenate([np.random.randn(n_samples_per_class, 2) + 3 * np.random.randn(2) for _ in range(n_classes)])
    y = np.concatenate([np.full(n_samples_per_class, label) for label in range(n_classes)])
    return X, y

画出图像,实现聚类算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
if __name__ == "__main__":
    n_classes = 5
    X, y = load_data(n_classes=n_classes)

    plt.figure(figsize=[12, 6])
    plt.subplot(1, 2, 1)
    plt.title("Ground Truth")
    for label in range(n_classes):
        plt.scatter(X[y == label, 0], X[y == label, 1], marker=".")

    kmeans = KMeans(n_clusters=n_classes)
    kmeans.fit(X)
    y_pred = kmeans(X)

    plt.subplot(1, 2, 2)
    plt.title("Clustering")
    for label in range(n_classes):
        plt.scatter(X[y_pred == label, 0], X[y_pred == label, 1], marker=".")

    plt.scatter(kmeans.centers[:, 0], kmeans.centers[:, 1], marker="*")

    plt.show()

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
#!/user/bin/env python3
# -*- coding: utf-8 -*-

import random

import numpy as np
from matplotlib import pyplot as plt


class KMeans:
    """
    K-means clustering(K均值聚类)
    """

    def __init__(self, n_clusters: int, iterations=100, eps=1e-3):
        """
        Args:
            n_clusters (int): 聚类类别数.
            iterations (int, optional): 迭代次数, 默认为100.
            eps (float, optional): 中心点最小更新量, 默认为1e-3.
        """
        self.n_clusters, self.iterations, self.eps, self.centers = n_clusters, iterations, eps, None

    def fit(self, X: np.ndarray):
        """
        Args:
            X (np.ndarray): 输入
        """
        # 随机选择k个点作为中心点
        self.centers = X[random.sample(range(len(X)), self.n_clusters)] # range(len(X))返回一个迭代对象 random.sample截取一段数字返回一个列表

        for _ in range(self.iterations):
            y_pred = self(X)

            # 各类别的均值作为新的中心点,
            centers = np.stack(
                [
                    # 存在元素属于类别i则计算类别i所有点的均值,否则随机选择一个点作为类别i的均值
                    np.mean(X[y_pred == i], axis=0) if np.any(y_pred == i) else random.choice(X) for i in range(self.n_clusters)  # np.any()用于判读真假,
                ]
            )

            # 中心点最大更新值小于eps则停止迭代
            if np.abs(self.centers - centers).max() < self.eps:
                break

            # 将更新后的均值作为各类别中心点
            self.centers = centers

    def __call__(self, X: np.ndarray):
        return np.array([np.argmin(np.linalg.norm(self.centers - x, axis=1)) for x in X])  # 每一点类别为最近的中心点类别,返回下标


def load_data(n_samples_per_class=200, n_classes=5):
    X = np.concatenate([np.random.randn(n_samples_per_class, 2) + 3 * np.random.randn(2) for _ in range(n_classes)])
    y = np.concatenate([np.full(n_samples_per_class, label) for label in range(n_classes)])
    return X, y


if __name__ == "__main__":
    n_classes = 5
    X, y = load_data(n_classes=n_classes)

    plt.figure(figsize=[12, 6])
    plt.subplot(1, 2, 1)
    plt.title("Ground Truth")
    for label in range(n_classes):
        plt.scatter(X[y == label, 0], X[y == label, 1], marker=".")

    kmeans = KMeans(n_clusters=n_classes)
    kmeans.fit(X)
    y_pred = kmeans(X)

    plt.subplot(1, 2, 2)
    plt.title("Clustering")
    for label in range(n_classes):
        plt.scatter(X[y_pred == label, 0], X[y_pred == label, 1], marker=".")

    plt.scatter(kmeans.centers[:, 0], kmeans.centers[:, 1], marker="*")

    plt.show()

源码参考自 https://github.com/luokn/ml/blob/master/src/kmeans.py