聚類算法及聚類評估 Silhouette 簡介
聚類算法簡介
聚類(clustering)是屬于無監(jiān)督學(xué)習(xí)(Unsupervised learning)的一種,用來把一組數(shù)據(jù)劃分為幾類,每類中的數(shù)據(jù)盡可能的相似,而不同類之間盡可能的差異最大化。通過聚類,可以為樣本選取提供參考,或進行根源分析,或作為其它算法的預(yù)處理步驟。
聚類算法中,最經(jīng)典的要屬于 Kmeans 算法,它的基本思想是:假設(shè)我們要把一組數(shù)據(jù)聚成 N 類,那就:
把數(shù)據(jù)中的每個樣本作為一個向量,記作Ā
首先隨機選取 n 個樣本,把這 n 個樣本作為 N 類的中心點, 稱為 centroid
針對數(shù)據(jù)中的所有樣本,計算到 n 個 centroid 的距離,距離哪個中心點最近,就屬于哪一類
在每一類中,重新選取 centroid,假設(shè)該類有 k 個樣本,則 centroid 為
重復(fù) 2,3 直到 centroid 的變化小于預(yù)設(shè)的值。
Mahout 是一個開源的機器學(xué)習(xí)軟件,提供了應(yīng)用推薦、聚類、分類、Logistic 回歸分析等算法。特別是由于結(jié)合了 Hadoop 的大數(shù)據(jù)處理能力,每個算法都可以作為獨立的 job 方便的部署在 Hadoop 平臺上,因此得到了越來越廣的應(yīng)用。在聚類領(lǐng)域,Mahout 提供了 Kmeans,LDA, Canopy 等多種算法。
聚類評估算法 Silhouette 簡介
在 Kmeans 中,我們會注意到需要我們預(yù)先設(shè)置聚合成幾類。實際上,在聚類的過程中我們也不可能預(yù)先知道,那只能分成 2 類,3 類,……n 類這樣進行嘗試,并評估每次的聚類效果。
實際上,由于聚類的無監(jiān)督學(xué)習(xí)特性,無論什么算法都需要評估效果。在聚類的評估中,有基于外部數(shù)據(jù)的評價,也有單純的基于聚類本身的評價,其基本思想就是:在同一類中,各個數(shù)據(jù)點越近越好,并且和類外的數(shù)據(jù)點越遠(yuǎn)越好;前者稱為內(nèi)聚因子(cohension),后者稱為離散因子(separation)。
把這兩者結(jié)合起來,就形成了評價聚類效果的 Silhouette 因子:
首先看如何評價一個點的聚類效果:
a = 一個點到同一聚類內(nèi)其它點的平均距離
b=min(一個點到其他聚類內(nèi)的點的平均距離)
Silhouette 因子s = 1 – a/b (a<b) 或b/a -1 (a>=b)
衡量整體聚類的效果,則是所有點的 Silhouette 因子的平均值。范圍應(yīng)該在 (-1,1), 值越大則說明聚類效果越好。
圖 1.Silhouette 中內(nèi)聚、離散因子示意
以圖 1 為例。圖 1 顯示的是一個具有 9 個點的聚類,三個圓形表示聚成了三類,其中的黃點表示質(zhì)心(centroid)。為了評估圖 1 中深藍(lán)色點的聚類效果,其內(nèi)聚因子a就是該點到所在圓中其它三個點的平均距離。離散因子b的計算相對復(fù)雜:我們需要先求出到該點到右上角圓中的三個點的平均距離,記為 b1;然后求出該點到右下角圓中兩個點的平均距離,記為 b2;b1 和 b2 的較小值則為b。
在 IBM 的 SPSS Clementine 中,也有 Silhouett 評估算法的實現(xiàn),不過 IBM 提供的是一個簡化版本,把一個點到一個類內(nèi)的距離的平均值,簡化為到該類質(zhì)心(centroid)的距離,具體來說,就是:
圖 2.IBM 關(guān)于內(nèi)聚、離散因子的簡化實現(xiàn)
還是以上面描述的 9 個點聚成 3 類的例子來說明。IBM 的實現(xiàn)把a的實現(xiàn)簡化為到深藍(lán)色的點所在的質(zhì)心的距離。計算b時候,還是要先計算 b1 和 b2,然后求最小值。但 b1 簡化為到右上角圓質(zhì)心的距離;b2 簡化為到右下角圓質(zhì)心的距離。
在下面的內(nèi)容中,我們嘗試?yán)?IBM 簡化后的公式為 Mahout 增加聚類評估功能。
Mahout 聚類過程分析
Mahout 運行環(huán)境簡介
前面說過,Mahout 是依賴 Hadoop 環(huán)境,每一個算法或輔助功能都是作為 Hadoop 的一個單獨的 job 來運行,所以必須準(zhǔn)備好一個可運行的 Hadoop 環(huán)境,(至少本文寫作時候使用的 Mahout0.9 還在依賴 Hadoop),如何安裝配置一個可運行的 Hadoop 環(huán)境不在這篇文章的介紹范圍內(nèi)。請自行參考 Hadoop 網(wǎng)站。需要說明的是,本文采用的 Hadoop 為 2.2.0。
安裝完 Hadoop 后,下載 mahout-distribution-0.9,解壓縮后的重要內(nèi)容如下:
bin/: 目錄下有 Mahout 可執(zhí)行腳本
mahout-examples-0.9-job.jar,各種算法的實現(xiàn)類
example/ 各種實現(xiàn)算法的源碼
conf/ 存放各實現(xiàn)類的配置文件,其中重要的為 driver.classes.default.props,如果增加實現(xiàn)算法類,可以在該文件中增加配置項,從而可以被 Mahout 啟動腳本調(diào)用。
單獨執(zhí)行 Mahout,是一個實現(xiàn)的各種功能的簡介,如下例:
執(zhí)行 /data01/shanlei/src/mahout-distribution-0.9/bin/mahout
輸出:
MAHOUT_LOCAL is not set; adding
HADOOP_CONF_DIR to classpath.
Running on hadoop, using /data01/shanlei/hadoop-2.2.0/bin/hadoop and
HADOOP_CONF_DIR=/data01/shanlei/hadoop-2.2.0/conf
p1 is org.apache.mahout.driver.MahoutDriver
MAHOUT-JOB:
/data01/shanlei/src/mahout-distribution-0.9/examples/target/mahout-examples-0.9-job.jar
An example program must be given as the first argument.
Valid program names are:
arff.vector: : Generate Vectors from an ARFF file or directory
assesser: : assesse cluster result using silhoueter algorithm
baumwelch: : Baum-Welch algorithm for unsupervised HMM training
canopy: : Canopy clustering
cat: : Print a file or resource as the logistic regression models would see it
cleansvd: : Cleanup and verification of SVD output
clusterdump: : Dump cluster output to text
clusterpp: : Groups Clustering Output In Clusters
cmdump: : Dump confusion matrix in HTML or text formats
concatmatrices: : Concatenates 2 matrices of same cardinality into a single matrix
……
如果要執(zhí)行某種算法,如上面結(jié)果中顯示的 canopy,就需要執(zhí)行 mahout canopy 加上該算法需要的其它參數(shù)。
另外,Mahout 算法的輸入輸出,都是在 Hadoop HDFS 上,因此需要通過 hdfs 命令上傳到 hdfs 文件系統(tǒng);輸出大多為 Mahout 特有的二進制格式,需要通過 mahout seqdumper 等命令來導(dǎo)出并轉(zhuǎn)換為可讀文本。
準(zhǔn)備輸入
Mahout 算法使用的 input 需要特定格式的 Vector 文件,不能夠直接使用一般的文本文件,因此需要把文本轉(zhuǎn)換為 Vector 文件,好在 Mahout 自身提供了這樣的類:
org.apache.mahout.clustering.conversion.InputDriver。
在 Mahout 的 conf 目錄中的 driver.classes.default.props 增加如下行:
org.apache.mahout.clustering.conversion.InputDriver = input2Seq : create sequence file from blank separated files,然后就可以為 Mahout 增加一個功能,把空格分隔的文本文件轉(zhuǎn)換為 Mahout 聚類可以使用的向量。
如下面的數(shù)據(jù)所示,該數(shù)據(jù)每行為一個包含 6 個屬性的向量:
1 4 3 11 4 3
2 2 5 2 10 3
1 1 2 2 10 1
1 4 2 11 5 4
1 1 3 2 10 1
2 4 5 9 5 2
2 6 5 3 8 1
執(zhí)行 ./mahout input2Seq -i /shanlei/userEnum -o /shanlei/vectors 則產(chǎn)生聚類需要的向量文件。
聚類
以 Kmeans 聚類為例:
./mahout kmeans --input /shanlei/vectors --output /shanlei/kmeans -c /shanlei/k --maxIter 5 -k 8 –cl
-k 8 指明產(chǎn)生 8 類,執(zhí)行完成后,在/shanlei/kmeans/下會產(chǎn)生: clusters-0,clusters-1,… …,clusters-n-final 目錄,每個目錄都是一次迭代產(chǎn)生的 centroids, 目錄數(shù)會受 --maxIter 控制;最后的結(jié)果會加上 final。
利用 Mahout 的 clusterdump 功能我們可以查看聚類的結(jié)果:
./mahout clusterdump -i /shanlei/kmeans/clusters-2-final -o ./centroids.txt
more centroids.txt:
VL-869{n=49 c=[1.163, 5.082, 4.000,
4.000, 4.592, 2.429] r=[0.370, 0.965, 1.030, 1.245, 1.244, 1.161]}
VL-949{n=201 c=[1.229, 4.458, 4.403,
10.040, 6.134, 1.458] r=[0.420, 1.079, 0.836, 1.196, 1.392, 0.852]}
… …
VL-980{n=146 c=[1.281, 2.000, 4.178,
2.158, 9.911, 1.918] r=[0.449, 0.712, 1.203, 0.570, 0.437, 1.208]}
VL-869 中的 869 為該類的 id,c=[1.163, 5.082, 4.000, 4.000, 4.592, 2.429] 為 centroid 的坐標(biāo),n=49 表示該類中數(shù)據(jù)點的個數(shù)。
如果使用-cl 參數(shù),則在/shanlei/kmeans/下會產(chǎn)生 clusteredPoints,利用 Mahout 的 seqdumper 可以看其內(nèi)容:
Input Path:
hdfs://rac122:18020/shanlei/kmeans/clusteredPoints/part-m-00000
Key class: class
org.apache.hadoop.io.IntWritable Value Class: class
org.apache.mahout.clustering.classify.WeightedPropertyVectorWri
table
Key: 301: Value: wt: 1.0 distance:
6.6834472852629006 vec: 1 = [1.000, 4.000, 3.000, 11.000, 4.000, 3.000]
Key: 980: Value: wt: 1.0 distance:
2.3966504034528384 vec: 2 = [2.000, 2.000, 5.000, 2.000, 10.000, 3.000]
Key 對應(yīng)的則是相關(guān)聚類的 id,distance 為到 centroid 的距離。vec 則是原始的向量。
從 Mahout 的聚類輸出結(jié)果來看,能夠很容易的實現(xiàn) IBM 簡化后的 Silhouette 算法,內(nèi)聚因子 (a) 可以簡單的獲取到,而離散因子 (b) 也能夠簡單的計算實現(xiàn)。下面我們就來設(shè)計 Mahout 中的實現(xiàn)。
Mahout 中 Silhouette 實現(xiàn)
算法設(shè)計:
遵循 Hadoop 上 MR 程序的設(shè)計原則,算法設(shè)計考慮了 mapper,reducer 及 combiner 類。
Mapper 設(shè)計:
輸入目錄:聚類的最終結(jié)果目錄 clusteredPoints(通過命令行參數(shù)-i 設(shè)置),
輸入:
Key:IntWritable,Value:WeightedPropertyVectorWritable
輸出:
Key:IntWritable(無意義,常量 1),Value:Text(單個點的 Silhouette 值,格式為“cnt,Silhouette 值”)
Setup 過程:
因為需要計算 separation 時候要訪問其它的 centroids,所以在 setup 中讀?。ㄍㄟ^命令行參數(shù)-c 設(shè)置)并緩存。
Map 過程:
由于輸入的 Value 為 WeightedPropertyVectorWritable,可以通過訪問字段 distance獲得參數(shù) a,并遍歷緩存的 centroids,針對其 id 不等于 Key 的,逐一計算距離,其最小的就是參數(shù) b。
Map 的結(jié)果 Key 使用常量 1,Value 為形如“1,0.23”這樣的“cnt,Silhouette 值”格式。
Reducer 設(shè)計:
輸入:
Key:IntWritable(常量 1),Value: Text (combine 后的中間 Silhouette 值,格式為“cnt,Silhouette 值”)。
輸出:
Key:IntWritable(常量 1),Value:整個聚類的 Silhouette 值,格式為“cnt,Silhouette 值”。
輸出目錄:最終文件的產(chǎn)生目錄,通過命令行參數(shù)-o 設(shè)置。
Reduce 過程:
根據(jù)“,”把每個 Value,分解為 cnt,和 Silhouette,最后進行加權(quán)平均。
Combiner 設(shè)計:
為減少數(shù)據(jù)的 copy,采用 combiner,其實現(xiàn)即為 reducer 的實現(xiàn)。
實現(xiàn)代碼:
Mapper 類:
public class AssesserMapper extends Mapper<IntWritable,
WeightedPropertyVectorWritable, IntWritable, Text> {
private List<Cluster> clusterModels;
private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);
clusterModels = Lists.newArrayList();
if (clustersIn != null && !clustersIn.isEmpty()) {
Path clustersInPath = new Path(clustersIn);
clusterModels = populateClusterModels(clustersInPath, conf);
}
}
private static List<Cluster> populateClusterModels(Path clustersIn,
Configuration conf) throws IOException {
List<Cluster> clusterModels = Lists.newArrayList();
Path finalClustersPath = finalClustersPath(conf, clustersIn);
Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,
PathFilters.partFilter(), null, false, conf);
while (it.hasNext()) {
ClusterWritable next = (ClusterWritable) it.next();
Cluster cluster = next.getValue();
cluster.configure(conf);
clusterModels.add(cluster);
}
return clusterModels;
}
private static Path finalClustersPath(Configuration conf,
Path clusterOutputPath) throws IOException {
FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);
FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,
PathFilters.finalPartFilter());
log.info("files: {}", clusterOutputPath.toString());
return clusterFiles[0].getPath();
}
protected void map(IntWritable key, WeightedPropertyVectorWritable vw, Context context)
throws IOException, InterruptedException {
int clusterId=key.get();
double cohension,separation=-1,silhouete;
Map<Text,Text> props=vw.getProperties();
cohension=Float.valueOf(props.get(new Text("distance")).toString());
Vector vector = vw.getVector();
for ( Cluster centroid : clusterModels) {
if (centroid.getId()!=clusterId) {
DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) centroid;
DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();
double f = distanceMeasure.distance(centroid.getCenter(), vector);
if (f<separation || separation<-0.5) separation=f;
}
}
Text value=new Text(Long.toString(1)+","+Double.toString(silhouete));
IntWritable okey=new IntWritable();
okey.set(1);
context.write(okey, value);
}
}
Reducer 類:
public class AssesserReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
log.info("reducer");
}
private static final Pattern SEPARATOR = Pattern.compile("[t,]");
public void reduce(IntWritable key, Iterable<Text> values,
Context context)
throws IOException, InterruptedException {
long cnt=0;
double total=0;
for (Text value : values) {
String[] p=SEPARATOR.split(value.toString());
Long itemCnt=Long.parseLong(p[0]);
double v=Double.parseDouble(p[1]);
total=total+ itemCnt*v;
cnt=cnt+itemCnt;
}
}
}
Job 類:
public class ClusterAssesser extends AbstractJob {
private ClusterAssesser() {
}
public int run(String[] args) throws Exception {
addInputOption();
addOutputOption();
//addOption(DefaultOptionCreator.methodOption().create());
addOption(DefaultOptionCreator.clustersInOption()
.withDescription("The input centroids").create());
if (parseArguments(args) == null) {
return -1;
}
Path input = getInputPath();
Path output = getOutputPath();
Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));
if (getConf() == null) {
setConf(new Configuration());
}
run(getConf(), input, clustersIn, output);
return 0;
}
private void run(Configuration conf, Path input, Path clustersIn,
Path output)throws IOException, InterruptedException,
ClassNotFoundException {
conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());
Job job = new Job(conf, "Cluster Assesser using silhouete over input: " + input);
job.setJarByClass(ClusterAssesser.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapperClass(AssesserMapper.class);
job.setCombinerClass(AssesserReducer.class);
job.setReducerClass(AssesserReducer.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, input);
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true)) {
throw new InterruptedException("Cluster Assesser Job failed processing " + input);
}
}
private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new ClusterAssesser(), args);
}
}
編譯運行:
編譯環(huán)境準(zhǔn)備:
在從 Mahout 網(wǎng)站下載的包中,同時包含了源碼以及可以導(dǎo)入到 eclipse 的工程,導(dǎo)入后,會產(chǎn)生 mahout-core,mahout-distribution,mahout-example 等不同的 projects,我們首先編譯一遍,保證沒有錯誤,然后再考慮如何增加自己的代碼。
當(dāng)然,Mahout 在頂層目錄也提供了一個編譯腳本:compile.sh, 可以在命令行完成編譯。
代碼編譯:
把自己的代碼放到 example/src/main/java/目錄下,自動編譯就可以了。輸出產(chǎn)生的類:com.ai.cluster.assesser.ClusterAssesser,然后就被打包到了 examples/target/mahout-examples-0.9-job.jar 中。
配置:
把 examples/target/mahout-examples-0.9-job.jar 覆蓋頂層的 mahout-examples-0.9-job.jar
通過在 conf/driver.classes.default.props 文件添加如下行,把我們的實現(xiàn)類加入到 Mahout 的配置中,從而可以通過 Mahout 腳本執(zhí)行:
com.ai.cluster.assesser.ClusterAssesser = assesser : assesse cluster result using silhoueter algorithm
運行
利用前面我們做聚類過程分析產(chǎn)生的聚類結(jié)果:
bin/mahout assesser -i /shanlei/kmeans/clusteredPoints -o /shanlei/silhouete -c
/shanlei/kmeans --tempDir /shanlei/temp
其中的-c 為輸入聚類的中心點,-i 為聚類的點 –o 為最終的輸出。
查看結(jié)果:
bin/mahout seqdumper -i /shanlei/silhouete -o ./a.txt
more a.txt:
Input Path: hdfs://rac122:18020/shanlei/silhouete/part-r-00000
Key class: class org.apache.hadoop.io.IntWritable Value Class: class org.apache.hadoop.io.Text
Key: 1: Value: 1000,0.5217678842906524
Count: 1
1000 表示共 1000 個點,0.52176 為聚類的 Silhouette 值。大于 0.5,看起來效果還行。
結(jié)束語:
不同于其它的套件,Mahout 從發(fā)布起就是為處理海量數(shù)據(jù)、為生產(chǎn)而準(zhǔn)備的。直到現(xiàn)在,Mahout 的重心還是在優(yōu)化各種算法上面,對易用性考慮不多,而且學(xué)習(xí)成本也很高。但 Mahout 不僅僅提供某些特定的算法,而且還把前期準(zhǔn)備中的數(shù)據(jù)清洗,轉(zhuǎn)換,以及后續(xù)的效果評估、圖形化展現(xiàn)都集成在一塊,方便用戶。這不僅是一種發(fā)展趨勢,也是爭取用戶的一個關(guān)鍵因素。希望大家都能夠加入進來,提供各種各樣的輔助功能,讓 Mahout 變得易用起來。
更多信息請查看IT技術(shù)專欄