Today I used Spark to cluster my prepared article-embedding dataset. After solving a few problems along the way, I wrote down the process (this article is still incomplete). As a next step I will add some visualizations to this article (using matplotlib).
My data stored in a text file is formatted as follows:
{# of record, [v_d1, v_d2, ... , v_dn]}
where d_i stands for the i-th dimension.
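To make the format concrete, here is a small pure-Python sketch of parsing one such record. The sample line and the helper name parse_record are made up for illustration, and I assume the first field is an integer record number; the real file may differ in detail.

```python
import re

def parse_record(line):
    """Split a line like '7,[0.1,0.2,0.3]' into (record_number, values)."""
    # Drop square brackets (and braces, if present), then split on commas.
    fields = re.sub(r'[\[\]{}]', '', line).split(',')
    record_number = int(fields[0])
    values = [float(v) for v in fields[1:]]
    return record_number, values

# Hypothetical sample line holding a 3-dimensional vector.
record_number, values = parse_record("7,[0.1,0.2,0.3]")
print(record_number, values)
```

The Spark version later in this article follows the same idea, except that it discards the record number and wraps the values in a SparseVector.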
KMeans algorithm
First we import the modules we need:
import re
from math import sqrt
import numpy
from numpy import array
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.linalg import SparseVector
Then we preprocess the source data:
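data = sc.textFile("/Users/pluto/data/notes_vector.csv")

def parseline(line):
    # Strip the square brackets, then split the record into its fields.
    temp = re.sub(r'\[|\]', '', line).split(',')
    # Skip the record number (temp[0]); the remaining fields are the vector values.
    return SparseVector(len(temp) - 1, [(idx, float(value)) for idx, value in enumerate(temp[1:])])

parsedData = data.map(parseline)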
The preprocessing is done. Now we use the train method to fit the model to the data:
clusters = KMeans.train(parsedData, 100, maxIterations=10, runs=10, initializationMode="random")
Explanation of the parameters:
- data: the training points, stored as RDD[Array[Double]]
- k: number of clusters
- maxIterations: maximum number of iterations
- runs: number of parallel runs, defaults to 1. The best model is returned.
- initializationMode: initialization mode, either "random" or "k-means||" (default).
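To show what k, maxIterations and the "random" initialization mode control, here is a minimal pure-Python sketch of the basic k-means loop. It is not MLlib's implementation: the function names, the seed argument and the toy points are my own assumptions for illustration.

```python
import random

def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def closest(point, centers):
    """Index of the center nearest to point."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))

def kmeans(points, k, max_iterations=10, seed=0):
    rng = random.Random(seed)
    # "random" initialization: pick k distinct data points as starting centers.
    centers = [list(p) for p in rng.sample(points, k)]
    for _ in range(max_iterations):
        # Assignment step: group the points by their nearest center.
        groups = [[] for _ in range(k)]
        for p in points:
            groups[closest(p, centers)].append(p)
        # Update step: move each center to the mean of its group.
        new_centers = []
        for center, group in zip(centers, groups):
            if group:
                new_centers.append([sum(col) / len(group) for col in zip(*group)])
            else:
                new_centers.append(center)  # an empty cluster keeps its center
        if new_centers == centers:
            break  # converged before reaching max_iterations
        centers = new_centers
    return centers

# Toy data: two well-separated groups of two points each.
points = [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)]
print(sorted(kmeans(points, k=2)))
```

In this picture, runs would mean repeating the whole procedure from several different starting points and keeping the best result.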
To evaluate the performance of the model, we need an error (loss) function. The idea is to minimise the total Euclidean distance between each data point and the centre of the cluster it is assigned to.
We define the error function as follows:
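from math import sqrt

def error(point):
    # Centre of the cluster this point is assigned to (a NumPy array).
    center = clusters.centers[clusters.predict(point)]
    # toArray() turns the SparseVector into a NumPy array so the two can be subtracted.
    diff = point.toArray() - center
    return sqrt(sum([x**2 for x in diff]))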
Then we calculate the total error:
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
Here toArray is a method of SparseVector that returns the values as a dense array. We use it so that both operands have a consistent type for the vector subtraction.
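As a sanity check on what this error measures, the same quantity can be worked out by hand on a toy example. The sketch below is plain Python without Spark; the points, the centers and the helper names are invented for illustration.

```python
from math import sqrt

def distance(p, q):
    return sqrt(sum((a - b) ** 2 for a, b in zip(p, q)))

def total_error(points, centers):
    # Each point contributes the distance to its nearest center.
    return sum(min(distance(p, c) for c in centers) for p in points)

points = [(0.0, 0.0), (0.0, 2.0), (3.0, 4.0)]
centers = [(0.0, 1.0), (3.0, 4.0)]
print(total_error(points, centers))  # 1.0 + 1.0 + 0.0 = 2.0
```

The first two points each lie at distance 1 from the center (0, 1), and the third point coincides with its center, so the total is 2.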
Gaussian Mixture
A Gaussian Mixture Model represents a composite distribution whereby points are drawn from one of k Gaussian sub-distributions, each with its own probability. The MLlib implementation uses the expectation-maximization algorithm to induce the maximum-likelihood model given a set of samples. The implementation has the following parameters:
- k is the number of desired clusters.
- convergenceTol is the maximum change in log-likelihood at which we consider convergence achieved.
- maxIterations is the maximum number of iterations to perform without reaching convergence.
- initialModel is an optional starting point from which to start the EM algorithm. If this parameter is omitted, a random starting point will be constructed from the data.
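To illustrate how convergenceTol and maxIterations interact, here is a minimal pure-Python sketch of EM for a one-dimensional mixture. It is only a sketch under simplifying assumptions and not MLlib's implementation: the data is 1-D, the initial means are passed in explicitly (playing the role of initialModel), every component starts with variance 1, and the variance floor of 1e-6 is my own safeguard.

```python
import math

def normal_pdf(x, mu, var):
    return math.exp(-(x - mu) ** 2 / (2 * var)) / math.sqrt(2 * math.pi * var)

def fit_gmm_1d(xs, mus, convergence_tol=1e-3, max_iterations=100):
    """EM for a 1-D Gaussian mixture; mus holds one initial mean per component."""
    k = len(mus)
    mus = list(mus)
    weights = [1.0 / k] * k
    variances = [1.0] * k
    prev_ll = None
    for _ in range(max_iterations):
        # E-step: responsibility of each component for each point.
        resp = []
        ll = 0.0
        for x in xs:
            dens = [w * normal_pdf(x, m, v) for w, m, v in zip(weights, mus, variances)]
            total = sum(dens)
            ll += math.log(total)
            resp.append([d / total for d in dens])
        # Stop once the log-likelihood changes by less than convergence_tol.
        if prev_ll is not None and abs(ll - prev_ll) < convergence_tol:
            break
        prev_ll = ll
        # M-step: re-estimate weight, mean and variance of each component.
        for j in range(k):
            nj = sum(r[j] for r in resp)
            weights[j] = nj / len(xs)
            mus[j] = sum(r[j] * x for r, x in zip(resp, xs)) / nj
            var = sum(r[j] * (x - mus[j]) ** 2 for r, x in zip(resp, xs)) / nj
            variances[j] = max(var, 1e-6)  # assumed floor to avoid a zero variance
    return weights, mus, variances

# Toy data: three points near 0 and three points near 10.
xs = [-0.2, 0.0, 0.2, 9.8, 10.0, 10.2]
weights, mus, variances = fit_gmm_1d(xs, mus=[1.0, 9.0])
print(weights, mus)
```

On this toy data the loop stops after a few iterations because the log-likelihood no longer changes, well before max_iterations is reached.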
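from pyspark.mllib.clustering import GaussianMixture

gmm = GaussianMixture.train(parsedData, 100)
# Print the weight, mean and covariance of each of the 100 components.
for i in range(100):
    print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
          "sigma = ", gmm.gaussians[i].sigma.toArray())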
Some traps
I figured it out. My indices parameters for the sparse vector were messed up. It was a good learning experience for me: when using Vectors.sparse(int size, int[] indices, double[] values) to generate a vector, size is the size of the whole vector, not just the number of elements with a value. The indices array needs to be in ascending order. In many cases, it is probably easier to use the other two forms of the Vectors.sparse function if the indices and value positions are not naturally sorted.
-Yao
Subject: KMeans - java.lang.IllegalArgumentException: requirement failed
I am trying to train a KMeans model with sparse vector with Spark 1.0.1.
When I run the training I got the following exception:
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
at org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)
What does this mean? How do I troubleshoot this problem?
Thanks.
-Yao
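The two rules from the reply quoted above (size is the full vector length, and indices must be ascending) can be enforced before a sparse vector is built. The helper below is a hypothetical sketch in plain Python; sparse_args is my own name, not part of Spark.

```python
def sparse_args(size, pairs):
    """Return (size, indices, values) with the indices in ascending order."""
    ordered = sorted(pairs)  # sort the (index, value) pairs by index
    indices = [i for i, _ in ordered]
    values = [float(v) for _, v in ordered]
    # size is the length of the whole vector, so every index must be below it.
    if indices and (indices[0] < 0 or indices[-1] >= size):
        raise ValueError("index out of range for a vector of size %d" % size)
    if len(set(indices)) != len(indices):
        raise ValueError("duplicate indices")
    return size, indices, values

# Hypothetical example: a 10-dimensional vector with three non-zero
# entries given in arbitrary order.
size, indices, values = sparse_args(10, [(7, 0.5), (2, 1.5), (4, -1.0)])
print(size, indices, values)
```

With PySpark available, the three results could then be passed on as Vectors.sparse(size, indices, values).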