def train(self, data, sample_weight=None):
    """
    Train the model on a (key, vector) RDD.

    :type data: pyspark.RDD
    :param data: RDD of (key, k-dimensional vector-like) pairs
    :param sample_weight: optional sample weights forwarded to the
        per-partition sklearn DBSCAN runs
    """
    # partition the data into at most max_partitions boxes with a k-d tree
    parts = KDPartitioner(data, self.max_partitions)
    self.data = data
    self.bounding_boxes = parts.bounding_boxes
    self.expanded_boxes = {}
    # expand each partition so points near a boundary are also shared
    # with neighboring partitions
    self._create_neighborhoods()
    # repartition the data set on the partition label so each Spark
    # partition holds one spatial partition (tuple unpacking in lambdas
    # is Python 2 only, so index into the pairs instead)
    self.data = self.data \
        .map(lambda kv: (kv[0][1], (kv[0][0], kv[1]))) \
        .partitionBy(len(parts.partitions)) \
        .map(lambda pv: ((pv[1][0], pv[0]), pv[1][1]))
    # create keyword parameters for sklearn DBSCAN
    params = self.dbscan_params or {
        'eps': self.eps,
        'min_samples': self.min_samples,
        'metric': self.metric}
    # run DBSCAN locally on each partition
    self.data = self.data.mapPartitions(
        lambda iterable: dbscan_partition(iterable, params, sample_weight))
    # cache the labeled points; they are reused when merging clusters
    self.data.cache()
    # merge per-partition cluster ids into globally consistent labels
    self._remap_cluster_ids()
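
The dbscan_partition helper called above is not shown here. As a rough sketch of what it might look like, assuming it receives the ((key, partition), vector) records produced by the repartitioning step and should emit ((key, partition), cluster_label) pairs; the names and the weight lookup are illustrative, not the actual implementation:

import numpy as np
from sklearn.cluster import DBSCAN


def dbscan_partition(iterable, params, sample_weight=None):
    """Run sklearn DBSCAN over the records of one Spark partition."""
    records = list(iterable)
    if not records:
        return
    keys = [k for k, _ in records]  # each k is a (key, partition) pair
    vectors = np.array([v for _, v in records])
    weights = None
    if sample_weight is not None:
        # hypothetical: look up a weight by the original record key
        weights = np.array([sample_weight[k[0]] for k in keys])
    labels = DBSCAN(**params).fit_predict(vectors, sample_weight=weights)
    for key, label in zip(keys, labels):
        yield key, label

Each partition is clustered independently with plain sklearn, which is why the later _remap_cluster_ids step is needed to reconcile the local labels.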
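A hypothetical end-to-end call, assuming the enclosing class is constructed with the eps, min_samples, metric, and max_partitions attributes used above (the constructor signature and class name are illustrative):

import numpy as np
from pyspark import SparkContext

sc = SparkContext(appName='dbscan-example')
points = sc.parallelize([(i, np.random.rand(2)) for i in range(1000)])
model = DBSCAN(eps=0.1, min_samples=5, max_partitions=8)
model.train(points)
# under the sketch above, self.data now holds
# ((key, partition), cluster_label) pairs
labels = model.data.collect()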