Gaussian Mixture Model Clustering from Scratch Using Python

Gaussian mixture model (GMM) clustering is a complex alternative to k-means clustering. Compared to k-means, which assumes spherical clusters and assigns each data item a single cluster ID, GMM allows elliptical as well as spherical clusters, and it gives you cluster membership pseudo-probabilities for each data item instead of just a cluster ID.
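To make the contrast concrete, here is a minimal sketch (using scikit-learn, with made-up data) showing that k-means returns only a hard cluster ID per item, while a GMM also returns soft membership pseudo-probabilities:

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture

# four made-up 2-D items forming two obvious groups
X = np.array([[0.1, 0.9], [0.2, 0.8],
              [0.8, 0.2], [0.9, 0.1]])

km = KMeans(n_clusters=2, n_init=10, random_state=0).fit(X)
print(km.labels_)  # hard cluster IDs only

gm = GaussianMixture(n_components=2, random_state=0).fit(X)
print(gm.predict(X))        # hard labels, like k-means
print(gm.predict_proba(X))  # soft pseudo-probabilities; each row sums to 1
```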

I decided to implement GMM clustering from mostly scratch using Python. I say “mostly” because I used lots of built-in Python, NumPy, and SciPy functions. Most notably, I used the scipy.stats multivariate_normal.pdf() function, which is very complicated.

The process of implementing GMM clustering from scratch took me three full days. I started by looking at examples on the Internet but was distressed to see that all of the examples I found had significant errors. So, instead, I started with source information from a Stanford class lecture: web.stanford.edu/~lmackey/stats306b/doc/stats306b-spring14-lecture2_scribed.pdf.

The GMM implementation has several parts, and each is complex. The implementation depends on how the various data structures are set up: the source data, the membership probabilities, the membership coefficients, the cluster means, and the cluster covariances. I discovered that a significant problem when exploring GMM clustering is that the vocabulary varies wildly. For example, the membership coefficients are also called weights, pi values, theta values, and many other terms. The membership pseudo-probabilities are also called weights (again!), responsibilities, tau values, and many other terms.

The key equations, from the class lecture notes, are deceptively simple looking:
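The equations appeared as an image in the original post. In standard notation (my symbols, not necessarily the lecture's), the E-step computes the membership pseudo-probabilities (responsibilities) and the M-step updates the mixture coefficients, means, and covariances:

```latex
% E-step: membership pseudo-probabilities (responsibilities)
\tau_{ij} = \frac{\pi_j \, \mathcal{N}(x_i \mid \mu_j, \Sigma_j)}
                 {\sum_{l=1}^{k} \pi_l \, \mathcal{N}(x_i \mid \mu_l, \Sigma_l)}

% M-step: coefficients, means, covariances
\pi_j = \frac{1}{n} \sum_{i=1}^{n} \tau_{ij}, \qquad
\mu_j = \frac{\sum_{i=1}^{n} \tau_{ij} \, x_i}{\sum_{i=1}^{n} \tau_{ij}}, \qquad
\Sigma_j = \frac{\sum_{i=1}^{n} \tau_{ij} \, (x_i - \mu_j)(x_i - \mu_j)^T}
                {\sum_{i=1}^{n} \tau_{ij}}
```

These map directly onto the demo code: the tau values are self.probs, the pi values are self.coefs, and the mu and Sigma values are self.means and self.covars.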

For my demo, I set up a small 10-item 2-D dummy dataset like so:
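The data is listed at the end of this post and loaded from a text file in the demo. Equivalently, the same 10 items can be set up directly as a NumPy array:

```python
import numpy as np

# the 10-item 2-D dummy dataset, hard-coded instead of loaded from file
X = np.array([
  [0.01, 0.10], [0.02, 0.09], [0.03, 0.10],
  [0.04, 0.06], [0.05, 0.06], [0.06, 0.05],
  [0.07, 0.05], [0.08, 0.01], [0.09, 0.02],
  [0.10, 0.01]], dtype=np.float64)

print(X.shape)  # (10, 2)
```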

To check my from-scratch implementation, I ran the data through the scikit sklearn.mixture GaussianMixture module. The screenshot above shows I got the same results, after fiddling a bit with my random seed (used for initialization) and number of iterations.

The scikit GMM clustering function automatically stops iterating when the change in average log-likelihood from one iteration to the next drops below 1.0e-3. My from-scratch version loops for a fixed number of iterations. You could examine how the internal self.probs result matrix changes (or doesn't) in each iteration, but naively stopping based on that criterion doesn't work well because in many cases the probs matrix doesn't change during the first few iterations.
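The 1.0e-3 threshold is the tol parameter of scikit's GaussianMixture, and n_iter_ reports how many EM iterations actually ran. A sketch, using made-up random data, with the default tol spelled out explicitly just to make the stopping rule visible:

```python
import numpy as np
from sklearn.mixture import GaussianMixture

rng = np.random.RandomState(0)
X = rng.randn(60, 2)  # made-up data for illustration

# tol=1.0e-3 is the default convergence threshold on the change
# in average log-likelihood between iterations
gmm = GaussianMixture(n_components=3, tol=1.0e-3,
  max_iter=100, random_state=0)
gmm.fit(X)
print(gmm.n_iter_)  # EM iterations performed before stopping
```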

I did some experiments and determined that a reasonable auto-stopping scheme looks like:

. . .
for iter in range(self.max_iter):
  oldLL = self.logLikelihood()  # based on self.probs
  self.e_step(X, verbose)  # update self.probs
  newLL = self.logLikelihood()
  if iter > (self.max_iter // 2) and \
    np.abs(newLL - oldLL) < 1.0e-6:
    break
  self.m_step(X, verbose)

In words, if the change in average log-likelihood of the probs matrix is very small, and also the EM algorithm is at least halfway through its max iterations, then stop.
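The log-likelihood helper referenced in the snippet above isn't part of the demo code below. One reasonable implementation (my interpretation, not from the lecture notes) computes the average over all items of the log of the weighted sum of component densities, using the current coefficients, means, and covariances:

```python
import numpy as np
from scipy.stats import multivariate_normal

def avg_log_likelihood(X, coefs, means, covars):
  # average over items of log( sum_j coefs[j] * N(x_i | means[j], covars[j]) )
  n = len(X)
  total = 0.0
  for i in range(n):
    item_sum = 0.0
    for j in range(len(coefs)):
      item_sum += coefs[j] * multivariate_normal.pdf(X[i],
        mean=means[j], cov=covars[j], allow_singular=True)
    total += np.log(item_sum)
  return total / n

# example: one standard-normal component, evaluated at its mean
ll = avg_log_likelihood(np.array([[0.0, 0.0]]),
  coefs=np.array([1.0]), means=np.array([[0.0, 0.0]]),
  covars=np.array([np.eye(2)]))
print(ll)  # log(1/(2*pi)) = about -1.8379
```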

But clustering is an exploratory process and I recommend that you watch how the internal data structures change by using the verbose parameter in my from-scratch version.

If you have stumbled on this blog post looking for a from-scratch implementation, here you go. But be aware that it will take you many hours of dissecting the code if you want to understand exactly how GMM clustering works. I used low-level Python looping instead of higher-level array functions because I intend to use this code to create a from-scratch C# language version at some point.



Gaussian mixture models are linear combinations of multivariate probability distributions. Mixed media art combines different art techniques. Here are three examples by artists whose work I admire. Left: By artist Daniel Arrhakis. Center: By artist Randy Monteith. Right: By artist Adam Eden.


Demo code:

# gmm_example.py
# Gaussian mixture clustering from (mostly) scratch Python
# uses low-level looping so can refactor to C#

# based on:
# https://web.stanford.edu/~lmackey/stats306b/
#   doc/stats306b-spring14-lecture2_scribed.pdf

import numpy as np
from scipy.stats import multivariate_normal

class GMM:
  def __init__(self, k, max_iter=5, seed=1):
    self.k = k  # aka n_components
    self.max_iter = max_iter
    self.rnd = np.random.RandomState(seed)

    self.N = 0  # number of data items TBD
    self.dim = 0  # data item dim TBD

    self.coefs = np.zeros(k)
    for j in range(k):
      self.coefs[j] = 1.0 / k

    self.means = None  # TBD
    self.covars = None  # TBD
    self.probs = None  # TB computed

  # ---------------------------------------------------------

  def predict_probs(self, X):
    n = len(X)  # use len(X), not self.N, so new data works too
    result = np.zeros((n, self.k))

    for j in range(self.k):  # each component
      # very complex: uses Cholesky decomp internally to compute PDF
      curr_dist = multivariate_normal(mean=self.means[j],
        cov=self.covars[j], allow_singular=True)
      for i in range(n):  # each data item
        result[i][j] = self.coefs[j] * curr_dist.pdf(X[i])

    for i in range(n):  # make each row sum to 1
      row_sum = 0.0
      for j in range(self.k):
        row_sum += result[i][j]
      for j in range(self.k):
        result[i][j] /= row_sum

    return result
    
  # ---------------------------------------------------------

  def e_step(self, X, verbose=False):
    # 1. update probabilities (result) using means and covars
    self.probs = self.predict_probs(X)  # by reference
    if verbose == True:
      print("\nnew probabilities = "); print(self.probs)

  # ---------------------------------------------------------
     
  def m_step(self, X, verbose=False):
    # 0. compute new prob column sums needed to update
    prob_sums = np.zeros(self.k)
    for i in range(self.N):
      for j in range(self.k):
        prob_sums[j] += self.probs[i][j]

    # 1. update mixture coefficients directly
    for j in range(self.k):
      self.coefs[j] = prob_sums[j] / self.N

    if verbose == True:
      print("\nnew coefficients: "); print(self.coefs)

    # 2. update means indirectly then copy
    # uj = S(i,n)[pij * xi] / S(i,n)[pij]

    new_means = np.zeros((self.k, self.dim))
    for j in range(self.k):  # each component
      for i in range(self.N):
        for d in range(self.dim): # each dimension of mean
          new_means[j][d] += X[i][d] * self.probs[i][j]

    for j in range(self.k):
      for d in range(self.dim):
        new_means[j][d] /= prob_sums[j]

    for row in range(self.k):
      for col in range(self.dim):
        self.means[row][col] = new_means[row][col]

    if verbose == True:
      print("\nnew means = "); print(self.means)

    # 3. update covariances
    # Cj = S(i,n)[pij * (xi-uj) * (xi-uj)T] / S(i,n)[pij]

    new_covars = np.zeros((self.k, self.dim, self.dim))
    for j in range(self.k):  # each component
      u = self.means[j]  # mean for this component
      for i in range(self.N):
        x = X[i]
        scale = self.probs[i][j]
        tmp = self.vec_vec_scale(x, u, scale)  # (dim x dim) matrix
        # accumulate
        for row in range(self.dim):
          for col in range(self.dim):
            new_covars[j][row][col] += tmp[row][col]
 
      # divide curr covar by prob_sums
      for row in range(self.dim):
        for col in range(self.dim):
          new_covars[j][row][col] /= prob_sums[j]

      # condition the diagonal
      for row in range(self.dim):
        new_covars[j][row][row] += 1.0e-6

    # copy result
    for j in range(self.k):
      for row in range(self.dim):
        for col in range(self.dim):
          self.covars[j][row][col] = new_covars[j][row][col]
    if verbose == True:
      print("\nnew covars = "); print(self.covars)
 
  # ---------------------------------------------------------
  
  def vec_vec_scale(self, x, u, scale):
    # helper for m_step
    # (x-u) * (x-u)T * scale
    n = len(u)  # == self.dim
    x_minus_u = np.zeros((n))
    for i in range(n):
      x_minus_u[i] = x[i] - u[i]
    
    result = np.zeros((n,n))
    for i in range(n):
      for j in range(n):
        result[i][j] = x_minus_u[i] * x_minus_u[j] * scale
    return result

  # ---------------------------------------------------------

  def select_n(self, N, n):
    # pick n distinct from 0 to N-1
    idxs = np.zeros(N, dtype=np.int64)
    for i in range(N):
      idxs[i] = i
    self.rnd.shuffle(idxs)
    result = np.zeros(n, dtype=np.int64)
    for i in range(n):
      result[i] = idxs[i]
    return result

  # ---------------------------------------------------------

  def cluster(self, X, verbose=False):
    self.N = len(X)
    self.dim = len(X[0])

    # init means to self.k random data items
    idxs = self.select_n(self.N, self.k)
    self.means = np.zeros((self.k, self.dim))
    for j in range(self.k):
      idx = idxs[j]
      for d in range(self.dim):
        self.means[j][d] = X[idx][d]

    # init covars to (dim by dim) Identity matrices
    self.covars = np.zeros((self.k, self.dim, self.dim))
    for j in range(self.k):  # each component
      for d in range(self.dim):
        self.covars[j][d][d] = 1.0

    # init self.probs
    self.probs = np.zeros((self.N, self.k))
    for i in range(self.N):
      for j in range(self.k):
        self.probs[i][j] = 1.0 / self.k

    # use EM to compute self.probs
    for iter in range(self.max_iter):
      self.e_step(X, verbose)
      self.m_step(X, verbose)

  # ---------------------------------------------------------
    
  def predict_labels(self, X):
    probs = self.predict_probs(X)
    return np.argmax(probs, axis=1)  # index of largest prob in each row

  # ---------------------------------------------------------

# -----------------------------------------------------------

def main():
  print("\nBegin GMM from scratch Python ")
  np.random.seed(0)
  np.set_printoptions(sign=" ", precision=4,
    floatmode="fixed", suppress=True)

  print("\nLoading data ")
  data_file = ".\\Data\\dummy_10.txt"
  # data_file = ".\\Data\\iris_train.txt"
  X = np.loadtxt(data_file, usecols = (0,1),
    delimiter = ",", comments="#", dtype=np.float64)
  print("Done ")

  # print("\ndata: ")
  # print(X)

  print("\n ----- scikit ----- ")

  from sklearn.mixture import GaussianMixture
  print("\nCreating scikit GMM model k=3 ")
  gmm = GaussianMixture(n_components=3, random_state=0,
    init_params='random')
  print("\nClustering ")
  gmm.fit(X)
  print("Done ")

  probs = gmm.predict_proba(X)
  labels = gmm.predict(X)

  n = gmm.n_iter_
  print("\nnumber iterations = " + str(n))

  print("\nFinal scikit clustering: ")
  for i in range(len(probs)):
    print(X[i], end="")
    print(" | ", end="")
    print(probs[i], end = "")
    print(" | ", end="")
    print(labels[i])

  print("\n ------------------ ")

  mi = 100
  print("\nCreating scratch GMM model k=3 ")
  gmm = GMM(k=3, max_iter=mi, seed=4)

  print("\nClustering ")
  gmm.cluster(X)
  print("Done ")

  probs = gmm.predict_probs(X)
  labels = gmm.predict_labels(X)

  print("\nFinal from-scratch clustering: ")
  for i in range(len(probs)):
    print(X[i], end="")
    print(" | ", end="")
    print(probs[i], end = "")
    print(" | ", end="")
    print(labels[i])
 
  print("\nEnd demo ")

if __name__ == "__main__":
  main()

The dummy data:

# dummy_10.txt
0.01, 0.10
0.02, 0.09
0.03, 0.10
0.04, 0.06
0.05, 0.06
0.06, 0.05
0.07, 0.05
0.08, 0.01
0.09, 0.02
0.10, 0.01