def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                 numPartitions=None):
    """
    Return a new DStream by applying combineByKey to each RDD.
    """
    if numPartitions is None:
        numPartitions = self._sc.defaultParallelism

    def func(rdd):
        return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions)

    return self.transform(func)
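A usage sketch added for illustration (not part of the original source): computing a per-key average for each batch with combineByKey, where the combiner keeps a (sum, count) pair. The queue-based input and the sample data are assumptions chosen to keep the example self-contained.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "CombineByKeyExample")
ssc = StreamingContext(sc, batchDuration=1)

batch = sc.parallelize([("a", 1), ("a", 3), ("b", 5)])  # made-up data
pairs = ssc.queueStream([batch])

# combiner keeps (sum, count); the average is derived afterwards
sums = pairs.combineByKey(lambda v: (v, 1),
                          lambda acc, v: (acc[0] + v, acc[1] + 1),
                          lambda a, b: (a[0] + b[0], a[1] + b[1]))
sums.mapValues(lambda sum_count: sum_count[0] / sum_count[1]).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop()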
def partitionBy(self, numPartitions, partitionFunc=portable_hash):
    """
    Return a copy of the DStream in which each RDD is partitioned
    using the specified partitioner.
    """
    return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))
def foreachRDD(self, func):
    """
    Apply a function to each RDD in this DStream.
    """
    if func.__code__.co_argcount == 1:
        old_func = func
        func = lambda t, rdd: old_func(rdd)
    jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
    api = self._ssc._jvm.PythonDStream
    api.callForeachRDD(self._jdstream, jfunc)
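A minimal usage sketch (illustrative, not from the original source): foreachRDD pushing each batch to the driver, shown with both the one-argument and two-argument forms. The queue-based input is an assumption made for a self-contained local run.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "ForeachRDDExample")
ssc = StreamingContext(sc, 1)
lines = ssc.queueStream([sc.parallelize(["a", "b", "c"])])

# one-argument form: receives only the RDD of the batch
lines.foreachRDD(lambda rdd: print(rdd.collect()))
# two-argument form: receives the batch time and the RDD
lines.foreachRDD(lambda t, rdd: print(t, rdd.count()))

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop()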
def checkpoint(self, interval):
    """
    Enable periodic checkpointing of the RDDs of this DStream.

    @param interval: time in seconds, after each period of which the
                     generated RDDs will be checkpointed
    """
    self.is_checkpointed = True
    self._jdstream.checkpoint(self._ssc._jduration(interval))
    return self
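A brief sketch (illustrative assumption, not from the original source): checkpointing also needs a checkpoint directory set on the StreamingContext; the path below is a placeholder. Since checkpoint() returns the DStream itself, calls can be chained.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "CheckpointExample")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/spark-streaming-checkpoint")  # placeholder path

stream = ssc.queueStream([sc.parallelize(range(10))])
# checkpoint the generated RDDs every 10 seconds and keep chaining
stream.checkpoint(10).count().pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop()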
def groupByKey(self, numPartitions=None):
    """
    Return a new DStream by applying groupByKey on each RDD.
    """
    if numPartitions is None:
        numPartitions = self._sc.defaultParallelism
    return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
def countByValue(self):
    """
    Return a new DStream in which each RDD contains the counts of each
    distinct value in the corresponding RDD of this DStream.
    """
    return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
def saveAsTextFiles(self, prefix, suffix=None):
    """
    Save each RDD in this DStream as a text file, using the string
    representation of elements.
    """
    def saveAsTextFile(t, rdd):
        path = rddToFileName(prefix, suffix, t)
        try:
            rdd.saveAsTextFile(path)
        except Py4JJavaError as e:
            # after recovering from checkpointing, foreachRDD may
            # be called twice
            if 'FileAlreadyExistsException' not in str(e):
                raise

    return self.foreachRDD(saveAsTextFile)
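An illustrative sketch (assumed paths and data, not from the original source): writing each batch out as text files under a prefix. Each batch produces a directory named <prefix>-<batch time in ms>.<suffix>.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "SaveAsTextFilesExample")
ssc = StreamingContext(sc, 1)

words = ssc.queueStream([sc.parallelize(["spark", "streaming"])])
words.saveAsTextFiles("/tmp/dstream-output", "out")  # placeholder output prefix

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop()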
# TODO: uncomment this once we have ssc.pickleFileStream()
# def saveAsPickleFiles(self, prefix, suffix=None):
#     """
#     Save each RDD in this DStream as a binary file in which the elements
#     are serialized by pickle.
#     """
#     def saveAsPickleFile(t, rdd):
#         path = rddToFileName(prefix, suffix, t)
#         try:
#             rdd.saveAsPickleFile(path)
#         except Py4JJavaError as e:
#             # after recovering from checkpointing, foreachRDD may
#             # be called twice
#             if 'FileAlreadyExistsException' not in str(e):
#                 raise
#     return self.foreachRDD(saveAsPickleFile)
def transform(self, func):
    """
    Return a new DStream in which each RDD is generated by applying a function
    on each RDD of this DStream.

    `func` can take either one argument (`rdd`) or two arguments (`time`, `rdd`).
    """
    if func.__code__.co_argcount == 1:
        oldfunc = func
        func = lambda t, rdd: oldfunc(rdd)
    assert func.__code__.co_argcount == 2, "func should take one or two arguments"
    return TransformedDStream(self, func)
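A usage sketch (illustrative, not from the original source): transform applies arbitrary RDD-to-RDD code per batch, and the two-argument form also receives the batch time. The input data is an assumption made for a self-contained run.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "TransformExample")
ssc = StreamingContext(sc, 1)

nums = ssc.queueStream([sc.parallelize([3, 1, 2])])
# one-argument form: sort every batch
nums.transform(lambda rdd: rdd.sortBy(lambda x: x)).pprint()
# two-argument form: tag each element with its batch time
nums.transform(lambda t, rdd: rdd.map(lambda x: (t, x))).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop()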
def slice(self, begin, end):
    """
    Return all the RDDs between `begin` and `end` (both inclusive).

    `begin` and `end` can be datetime.datetime() objects or unix timestamps.
    """
    jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
    return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
    """
    Return a new DStream in which each RDD has a single element generated by reducing all
    elements in a sliding window over this DStream.

    If `invReduceFunc` is not None, the reduction is done incrementally
    using the old window's reduced value:
    1. reduce the new values that entered the window (e.g., adding new counts)
    2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
    This is more efficient than when `invReduceFunc` is None.

    @param reduceFunc:     associative and commutative reduce function
    @param invReduceFunc:  inverse reduce function of `reduceFunc`; such that for all y,
                           and invertible x:
                           `invReduceFunc(reduceFunc(x, y), x) = y`
    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration:  sliding interval of the window (i.e., the interval after which
                           the new DStream will generate RDDs); must be a multiple of this
                           DStream's batching interval
    """
    keyed = self.map(lambda x: (1, x))
    reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                         windowDuration, slideDuration, 1)
    return reduced.map(lambda kv: kv[1])
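A sketch of the incremental form (illustrative, not from the original source): a running sum over a sliding window, where operator.sub removes the values leaving the window. The checkpoint directory and the queue input are assumptions; checkpointing is required when an inverse reduce function is used.

import operator
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "ReduceByWindowExample")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/window-checkpoint")  # placeholder path, needed for the inverse path

nums = ssc.queueStream([sc.parallelize([1, 2, 3]) for _ in range(5)])
# 3-second window sliding every 1 second; old batches are subtracted out
nums.reduceByWindow(operator.add, operator.sub,
                    windowDuration=3, slideDuration=1).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(8)
ssc.stop()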
def countByWindow(self, windowDuration, slideDuration):
    """
    Return a new DStream in which each RDD has a single element generated
    by counting the number of elements in a window over this DStream.
    windowDuration and slideDuration are as defined in the window() operation.

    This is equivalent to window(windowDuration, slideDuration).count(),
    but will be more efficient if the window is large.
    """
    return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub,
                                                windowDuration, slideDuration)
def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
    """
    Return a new "state" DStream where the state for each key is updated by applying
    the given function on the previous state of the key and the new values of the key.

    @param updateFunc: State update function. If this function returns None, then
                       the corresponding state key-value pair will be eliminated.
    """
    if numPartitions is None:
        numPartitions = self._sc.defaultParallelism

    if initialRDD and not isinstance(initialRDD, RDD):
        initialRDD = self._sc.parallelize(initialRDD)

    def reduceFunc(t, a, b):
        if a is None:
            g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
        else:
            g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
            g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
        state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
        return state.filter(lambda k_v: k_v[1] is not None)

    jreduceFunc = TransformFunction(self._sc, reduceFunc,
                                    self._sc.serializer, self._jrdd_deserializer)
    if initialRDD:
        initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
        dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
                                                   initialRDD._jrdd)
    else:
        dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)
    return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
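A usage sketch (illustrative, not from the original source): the canonical stateful word count, where the update function folds the new counts of a batch into the running total per key. The checkpoint path and input data are assumptions; checkpointing must be enabled for stateful operations.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "UpdateStateByKeyExample")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/state-checkpoint")  # placeholder path

def update(new_values, running_count):
    # new_values: list of values for the key in this batch;
    # running_count: previous state (None the first time the key appears)
    return sum(new_values) + (running_count or 0)

words = ssc.queueStream([sc.parallelize(["a", "b", "a"]) for _ in range(3)])
words.map(lambda w: (w, 1)).updateStateByKey(update).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(6)
ssc.stop()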
def __init__(self, ctx, func, *deserializers):
    # wraps a Python transform function so it can be called back from the JVM;
    # keeps the SparkContext, the function itself, and the input deserializers
    self.ctx = ctx
    self.func = func
    self.deserializers = deserializers
    self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
    self.failure = None
def _to_java_object_rdd(rdd):
    """
    Return a JavaRDD of Object by unpickling.

    It will convert each Python object into a Java object by Pyrolite, whether
    the RDD is serialized in batch or not.
    """
    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    return rdd.ctx._jvm.org.apache.spark.ml.python.MLSerDe.pythonToJava(rdd._jrdd, True)
def predict(self, test):
    """
    Predict values for a single data point or an RDD of points
    using the model trained.
    """
    raise NotImplementedError
def predict(self, x):
    """
    Predict values for a single data point or an RDD of points
    using the model trained.
    """
    if isinstance(x, RDD):
        return x.map(lambda v: self.predict(v))

    x = _convert_to_vector(x)
    if self.numClasses == 2:
        # binary case: logistic (sigmoid) of the margin, computed in a
        # numerically stable way for negative margins
        margin = self.weights.dot(x) + self._intercept
        if margin > 0:
            prob = 1 / (1 + exp(-margin))
        else:
            exp_margin = exp(margin)
            prob = exp_margin / (1 + exp_margin)
        if self._threshold is None:
            return prob
        else:
            return 1 if prob > self._threshold else 0
    else:
        # multinomial case: pick the class with the largest margin;
        # class 0 is the baseline with margin 0.0
        best_class = 0
        max_margin = 0.0
        if x.size + 1 == self._dataWithBiasSize:
            # the weight matrix includes a bias term for each class
            for i in range(0, self._numClasses - 1):
                margin = x.dot(self._weightsMatrix[i][0:x.size]) + \
                    self._weightsMatrix[i][x.size]
                if margin > max_margin:
                    max_margin = margin
                    best_class = i + 1
        else:
            for i in range(0, self._numClasses - 1):
                margin = x.dot(self._weightsMatrix[i])
                if margin > max_margin:
                    max_margin = margin
                    best_class = i + 1
        return best_class
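A small end-to-end sketch (illustrative, with made-up data): training a binary model with LogisticRegressionWithSGD and calling predict on both a single point and an RDD of points, which exercises the two branches above.

from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext("local[2]", "LogisticRegressionPredict")
data = sc.parallelize([
    LabeledPoint(0.0, [0.0, 1.0]),
    LabeledPoint(1.0, [1.0, 0.0]),
])
model = LogisticRegressionWithSGD.train(data, iterations=10)

print(model.predict([1.0, 0.0]))                                   # single point -> 0 or 1
print(model.predict(data.map(lambda lp: lp.features)).collect())   # RDD of points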
def predict(self, x):
    """
    Predict values for a single data point or an RDD of points
    using the model trained.
    """
    if isinstance(x, RDD):
        return x.map(lambda v: self.predict(v))

    x = _convert_to_vector(x)
    margin = self.weights.dot(x) + self.intercept
    if self._threshold is None:
        return margin
    else:
        return 1 if margin > self._threshold else 0
def trainOn(self, dstream):
    """Train the model on the incoming dstream."""
    self._validate(dstream)

    def update(rdd):
        # LogisticRegressionWithSGD.train raises an error for an empty RDD.
        if not rdd.isEmpty():
            self._model = LogisticRegressionWithSGD.train(
                rdd, self.numIterations, self.stepSize,
                self.miniBatchFraction, self._model.weights,
                regParam=self.regParam, convergenceTol=self.convergenceTol)

    dstream.foreachRDD(update)
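A streaming-training sketch (illustrative, not from the original source): feeding labeled batches to StreamingLogisticRegressionWithSGD via trainOn and scoring a parallel stream with predictOn. The initial weights and the queue-based inputs are assumptions for a self-contained local run.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext("local[2]", "StreamingLogRegExample")
ssc = StreamingContext(sc, 1)

train_batches = [sc.parallelize([LabeledPoint(1.0, [1.0, 0.0]),
                                 LabeledPoint(0.0, [0.0, 1.0])])]
test_batches = [sc.parallelize([[1.0, 0.0], [0.0, 1.0]])]

model = StreamingLogisticRegressionWithSGD(numIterations=10)
model.setInitialWeights([0.0, 0.0])          # weights must be initialized before trainOn
model.trainOn(ssc.queueStream(train_batches))
model.predictOn(ssc.queueStream(test_batches)).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop()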
def predict(self, x):
    """
    Predict values for a single data point or an RDD of points using
    the model trained.

    .. note:: In Python, predict cannot currently be used within an RDD
        transformation or action.
        Call predict directly on the RDD instead.
    """
    if isinstance(x, RDD):
        return self.call("predict", x.map(_convert_to_vector))
    else:
        return self.call("predict", _convert_to_vector(x))
def _train(cls, data, type, numClasses, features, impurity="gini", maxDepth=5, maxBins=32,
           minInstancesPerNode=1, minInfoGain=0.0):
    first = data.first()
    assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
    model = callMLlibFunc("trainDecisionTreeModel", data, type, numClasses, features,
                          impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
    return DecisionTreeModel(model)
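An illustrative sketch of the public entry point that reaches _train: DecisionTree.trainClassifier on a tiny made-up LabeledPoint RDD, followed by prediction on a single point and on an RDD of points.

from pyspark import SparkContext
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext("local[2]", "DecisionTreeExample")
data = sc.parallelize([
    LabeledPoint(0.0, [0.0]),
    LabeledPoint(1.0, [1.0]),
])
model = DecisionTree.trainClassifier(data, numClasses=2,
                                     categoricalFeaturesInfo={},
                                     impurity="gini", maxDepth=5)

print(model.predict([1.0]))                                        # single point
print(model.predict(data.map(lambda lp: lp.features)).collect())   # RDD of points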