def test_esk609(self):
    """Test Esk-609: Map data-frame groups"""
    # run the Eskapade macro, then fetch the data store it filled
    self.run_eskapade('esk609_map_df_groups.py')
    store = ProcessManager().service(DataStore)

    # both outputs must be present and must be Spark RDDs
    for rdd_key in ('map_rdd', 'flat_map_rdd'):
        self.assertIn(rdd_key, store, 'no data found with key "{}"'.format(rdd_key))
        self.assertIsInstance(store[rdd_key], pyspark.RDD,
                              'object "{0:s}" is not an RDD (type "{1:s}")'.format(rdd_key,
                                                                                 str(type(store[rdd_key]))))

    # expected (group, sum of "bar") pairs for the ten groups
    bar_sums = list(enumerate([27.5, 77.5, 127.5, 177.5, 227.5, 277.5, 327.5, 377.5, 427.5, 477.5]))
    # expected flattened rows: (index, name, value, "bar" sum of group index // 10)
    expected_flat = []
    for idx in range(100):
        expected_flat.append((idx, 'foo{:d}'.format(idx), (idx + 1) / 2., bar_sums[idx // 10][1]))

    # compare sorted collected contents with the expectations
    self.assertListEqual(sorted(store['map_rdd'].collect()), bar_sums, 'unexpected values in "map_rdd"')
    self.assertListEqual(sorted(store['flat_map_rdd'].collect()), expected_flat,
                         'unexpected values in "flat_map_rdd"')
# Example source code for the Python RDD class
def create_python_rdd(jrdd, serializer):
    """Create a Python RDD that wraps an RDD coming from Scala.

    Args:
        jrdd (org.apache.spark.api.java.JavaRDD): The RDD that came from Scala.
        serializer (:class:`~geopyspark.AvroSerializer` or pyspark.serializers.AutoBatchedSerializer(AvroSerializer)):
            An instance of ``AvroSerializer`` that is either alone, or wrapped by ``AutoBatchedSerializer``.

    Returns:
        RDD
    """
    pysc = get_spark_context()
    # Wrap a bare serializer in AutoBatchedSerializer; pass an already-batched
    # one through unchanged so it is never wrapped twice.
    batched = serializer if isinstance(serializer, AutoBatchedSerializer) else AutoBatchedSerializer(serializer)
    return RDD(jrdd, pysc, batched)
def pprint(self, num=10):
    """
    Print the first ``num`` elements of every RDD generated by this DStream.

    A trailing "..." line is printed when an RDD holds more than ``num`` elements.

    @param num: how many leading elements of each RDD to print (default 10).
    """
    separator = "-------------------------------------------"

    def show_batch(time, rdd):
        # Take one element more than needed so truncation can be detected.
        head = rdd.take(num + 1)
        print(separator)
        print("Time: %s" % time)
        print(separator)
        for element in head[:num]:
            print(element)
        if len(head) > num:
            print("...")
        print("")

    self.foreachRDD(show_batch)
def window(self, windowDuration, slideDuration=None):
    """
    Return a new DStream in which each RDD contains all the elements seen in a
    sliding window of time over this DStream.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration: sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval. When None, only the window width
                          is passed to the JVM-side ``window`` call.
    """
    self._validate_window_param(windowDuration, slideDuration)
    # Convert durations in order: window width first, slide interval (if any) second.
    window_args = [self._ssc._jduration(windowDuration)]
    if slideDuration is not None:
        window_args.append(self._ssc._jduration(slideDuration))
    return DStream(self._jdstream.window(*window_args), self._ssc, self._jrdd_deserializer)
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
    """
    Return a new DStream in which each RDD contains the count of distinct elements in
    RDDs in a sliding window over this DStream, as (value, count) pairs.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration: sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval
    @param numPartitions: number of partitions of each RDD in the new DStream.
    """
    # Pair every element with 1 and sum per value over the window: operator.add
    # folds in values entering the window, operator.sub removes values leaving it.
    windowed_counts = self.map(lambda value: (value, 1)).reduceByKeyAndWindow(
        operator.add, operator.sub, windowDuration, slideDuration, numPartitions)
    # A count of zero means the value is no longer inside the window; drop it.
    return windowed_counts.filter(lambda pair: pair[1] > 0)
def _py2java(sc, obj):
    """Convert a Python object into its Java counterpart.

    RDDs, DataFrames and SparkContexts are unwrapped to their JVM handles, lists are
    converted element by element (recursively), Java objects and primitive scalars are
    returned unchanged, and anything else is pickled and loaded on the JVM side via
    ``org.apache.spark.ml.python.MLSerDe``.
    """
    if isinstance(obj, RDD):
        return _to_java_object_rdd(obj)
    if isinstance(obj, DataFrame):
        return obj._jdf
    if isinstance(obj, SparkContext):
        return obj._jsc
    if isinstance(obj, list):
        return [_py2java(sc, item) for item in obj]
    if isinstance(obj, JavaObject):
        return obj
    if isinstance(obj, (int, long, float, bool, bytes, unicode)):
        return obj
    # Fallback: ship the object to the JVM as pickled bytes.
    pickled = bytearray(PickleSerializer().dumps(obj))
    return sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(pickled)
def train(cls, data, lambda_=1.0):
    """
    Train a Naive Bayes model given an RDD of (label, features) vectors.

    This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
    can handle all kinds of discrete data. For example, by
    converting documents into TF-IDF vectors, it can be used for
    document classification. By making every vector a 0-1 vector,
    it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
    The input feature values must be nonnegative.

    :param data:
      RDD of LabeledPoint.
    :param lambda_:
      The smoothing parameter.
      (default: 1.0)
    :raises ValueError: if the first element of ``data`` is not a LabeledPoint.
    """
    # Only the first record is inspected, as a cheap sanity check of the input type.
    if not isinstance(data.first(), LabeledPoint):
        raise ValueError("`data` should be an RDD of LabeledPoint")
    labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
    return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
def predict(self, x):
    """
    Predict the label of one or more examples.

    .. note:: In Python, predict cannot currently be used within an RDD
        transformation or action.
        Call predict directly on the RDD instead.

    :param x:
      Data point (feature vector), or an RDD of data points (feature
      vectors).
    """
    # An RDD is converted element-wise; a single point is converted once.
    converted = x.map(_convert_to_vector) if isinstance(x, RDD) else _convert_to_vector(x)
    return self.call("predict", converted)
def _py2java(sc, obj):
    """Convert a Python object into its Java counterpart.

    JVM-backed wrappers (RDD, DataFrame, SparkContext) are unwrapped, lists are
    converted recursively, Java objects and primitive scalars pass through, and
    everything else is pickled and loaded via
    ``org.apache.spark.mllib.api.python.SerDe``.
    """
    if isinstance(obj, RDD):
        return _to_java_object_rdd(obj)
    if isinstance(obj, DataFrame):
        return obj._jdf
    if isinstance(obj, SparkContext):
        return obj._jsc
    if isinstance(obj, list):
        return [_py2java(sc, element) for element in obj]
    if isinstance(obj, JavaObject):
        return obj
    if isinstance(obj, (int, long, float, bool, bytes, unicode)):
        return obj
    # Anything else travels to the JVM as pickled bytes.
    payload = bytearray(PickleSerializer().dumps(obj))
    return sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(payload)
def predict(self, x):
    """
    Predict labels for provided features.
    Using a piecewise linear function.
    1) If x exactly matches a boundary then associated prediction
    is returned. In case there are multiple predictions with the
    same boundary then one of them is returned. Which one is
    undefined (same as java.util.Arrays.binarySearch).
    2) If x is lower or higher than all boundaries then first or
    last prediction is returned respectively. In case there are
    multiple predictions with the same boundary then the lowest
    or highest is returned respectively.
    3) If x falls between two values in boundary array then
    prediction is treated as piecewise linear function and
    interpolated value is returned. In case there are multiple
    values with the same boundary then the same rules as in 2)
    are used.

    :param x:
      Feature or RDD of Features to be labeled.
    """
    if not isinstance(x, RDD):
        # Local input: linear interpolation over (boundaries, predictions).
        return np.interp(x, self.boundaries, self.predictions)
    # RDD input: apply this same method to each element.
    return x.map(lambda v: self.predict(v))
def rows(self):
    """
    Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.

    >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
    ...                                        IndexedRow(1, [4, 5, 6])]))
    >>> rows = mat.rows
    >>> rows.first()
    IndexedRow(0, [1.0,2.0,3.0])
    """
    # The JVM side returns the rows as a DataFrame, which serves only as a
    # serialization vehicle. Each Row is turned back into an IndexedRow here.
    java_model = self._java_matrix_wrapper._java_model
    rows_df = callMLlibFunc("getIndexedRows", java_model)
    return rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
def entries(self):
    """
    Entries of the CoordinateMatrix stored as an RDD of
    MatrixEntries.

    >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
    ...                                        MatrixEntry(6, 4, 2.1)]))
    >>> entries = mat.entries
    >>> entries.first()
    MatrixEntry(0, 0, 1.2)
    """
    # Entries come back from the JVM as a DataFrame (used only for
    # serialization); rebuild a MatrixEntry from the first three fields of each Row.
    java_model = self._java_matrix_wrapper._java_model
    entries_df = callMLlibFunc("getMatrixEntries", java_model)
    return entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
def blocks(self):
    """
    The RDD of sub-matrix blocks
    ((blockRowIndex, blockColIndex), sub-matrix) that form this
    distributed matrix.

    >>> mat = BlockMatrix(
    ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
    ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
    >>> blocks = mat.blocks
    >>> blocks.first()
    ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
    """
    # Blocks are shipped from the JVM as a DataFrame purely for serialization.
    # Each Row is turned back into a ((row index, col index), sub-matrix) pair.
    blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)

    def to_block(row):
        block_index = row[0]
        return (block_index[0], block_index[1]), row[1]

    return blocks_df.rdd.map(to_block)
def pprint(self, num=10):
    """
    Print the first num elements of each RDD generated in this DStream.

    Each batch is printed under a "Time: ..." header. A "..." line marks RDDs
    that contain more than ``num`` elements.

    @param num: the number of elements from the first will be printed.
    """
    rule = "-------------------------------------------"

    def print_batch(batch_time, rdd):
        sample = rdd.take(num + 1)
        print(rule)
        print("Time: %s" % batch_time)
        print(rule)
        for item in sample[:num]:
            print(item)
        # Getting back more than `num` items means the RDD was truncated.
        if len(sample) > num:
            print("...")
        print("")

    self.foreachRDD(print_batch)
def window(self, windowDuration, slideDuration=None):
    """
    Return a new DStream in which each RDD contains all the elements seen in a
    sliding window of time over this DStream.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration: sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval; may be None
    """
    self._validate_window_param(windowDuration, slideDuration)
    jwindow = self._ssc._jduration(windowDuration)
    if slideDuration is None:
        jdstream = self._jdstream.window(jwindow)
    else:
        jslide = self._ssc._jduration(slideDuration)
        jdstream = self._jdstream.window(jwindow, jslide)
    return DStream(jdstream, self._ssc, self._jrdd_deserializer)
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
    """
    Return a new DStream in which each RDD has a single element generated by reducing all
    elements in a sliding window over this DStream.

    If `invReduceFunc` is not None, the reduction is done incrementally
    using the old window's reduced value:

    1. reduce the new values that entered the window (e.g., adding new counts)
    2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

    This is more efficient than passing None as `invReduceFunc`.

    @param reduceFunc: associative reduce function
    @param invReduceFunc: inverse reduce function of `reduceFunc`
    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration: sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval
    """
    # Put every element under the same dummy key (1) so the keyed windowed
    # reduction collapses the whole window to one value, using one partition.
    single_key = self.map(lambda x: (1, x))
    reduced = single_key.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                              windowDuration, slideDuration, 1)
    # Strip the dummy key again.
    return reduced.map(lambda kv: kv[1])
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
    """
    Return a new DStream in which each RDD contains the count of distinct elements in
    RDDs in a sliding window over this DStream.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration: sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval
    @param numPartitions: number of partitions of each RDD in the new DStream.
    """
    pairs = self.map(lambda x: (x, 1))
    per_value = pairs.reduceByKeyAndWindow(operator.add, operator.sub,
                                           windowDuration, slideDuration, numPartitions)
    # Values whose windowed count fell to zero have left the window.
    live_values = per_value.filter(lambda kv: kv[1] > 0)
    # NOTE(review): this returns the *number* of distinct values (``.count()``),
    # while the otherwise identical countByValueAndWindow earlier in this file
    # returns the (value, count) pairs themselves -- confirm which is intended.
    return live_values.count()
def train(cls, data, lambda_=1.0):
    """
    Train a Naive Bayes model given an RDD of (label, features) vectors.

    This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
    can handle all kinds of discrete data. For example, by
    converting documents into TF-IDF vectors, it can be used for
    document classification. By making every vector a 0-1 vector,
    it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
    The input feature values must be nonnegative.

    :param data: RDD of LabeledPoint.
    :param lambda_: The smoothing parameter (default: 1.0).
    :raises ValueError: if the first element of ``data`` is not a LabeledPoint.
    """
    sample = data.first()
    if not isinstance(sample, LabeledPoint):
        raise ValueError("`data` should be an RDD of LabeledPoint")
    # The JVM returns labels, class priors (pi) and the conditional matrix (theta).
    labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
    return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
def predict(self, x):
    """
    Predict the label of one or more examples.

    Note: In Python, predict cannot currently be used within an RDD
          transformation or action.
          Call predict directly on the RDD instead.

    :param x: Data point (feature vector),
              or an RDD of data points (feature vectors).
    """
    if not isinstance(x, RDD):
        return self.call("predict", _convert_to_vector(x))
    # For an RDD, convert every element before handing it over.
    return self.call("predict", x.map(_convert_to_vector))
def _py2java(sc, obj):
    """Convert a Python object into its Java counterpart.

    Lists are converted recursively and handed to Py4J's ``ListConverter`` so the JVM
    receives a Java list. JVM-backed wrappers are unwrapped, Java objects and primitive
    scalars pass through, and everything else is pickled and loaded via ``SerDe``.
    """
    if isinstance(obj, RDD):
        return _to_java_object_rdd(obj)
    if isinstance(obj, DataFrame):
        return obj._jdf
    if isinstance(obj, SparkContext):
        return obj._jsc
    if isinstance(obj, list):
        converted = [_py2java(sc, element) for element in obj]
        return ListConverter().convert(converted, sc._gateway._gateway_client)
    if isinstance(obj, JavaObject):
        return obj
    if isinstance(obj, (int, long, float, bool, bytes, unicode)):
        return obj
    payload = bytearray(PickleSerializer().dumps(obj))
    return sc._jvm.SerDe.loads(payload)
def entries(self):
    """
    Entries of the CoordinateMatrix stored as an RDD of
    MatrixEntries.

    >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
    ...                                        MatrixEntry(6, 4, 2.1)]))
    >>> entries = mat.entries
    >>> entries.first()
    MatrixEntry(0, 0, 1.2)
    """
    # We use DataFrames for serialization of MatrixEntry entries
    # from Java, so we first convert the RDD of entries to a
    # DataFrame on the Scala/Java side. Then we map each Row in
    # the DataFrame back to a MatrixEntry on this side.
    entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
    # Map over ``.rdd`` explicitly. ``DataFrame.map`` was only a shorthand for
    # ``DataFrame.rdd.map`` and no longer exists on PySpark DataFrames (2.0+).
    entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
    return entries
def blocks(self):
    """
    The RDD of sub-matrix blocks
    ((blockRowIndex, blockColIndex), sub-matrix) that form this
    distributed matrix.

    >>> mat = BlockMatrix(
    ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
    ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
    >>> blocks = mat.blocks
    >>> blocks.first()
    ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))
    """
    # We use DataFrames for serialization of sub-matrix blocks
    # from Java, so we first convert the RDD of blocks to a
    # DataFrame on the Scala/Java side. Then we map each Row in
    # the DataFrame back to a sub-matrix block on this side.
    blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
    # Map over ``.rdd`` explicitly. ``DataFrame.map`` was only a shorthand for
    # ``DataFrame.rdd.map`` and no longer exists on PySpark DataFrames (2.0+).
    blocks = blocks_df.rdd.map(lambda row: ((row[0][0], row[0][1]), row[1]))
    return blocks
def execute(self):
    """Execute RddGroupMapper

    Reads an RDD from the data store under ``self.read_key``, optionally maps every
    record with ``self.input_map``, groups by key, applies ``self.group_map`` to each
    group's values (via flatMapValues if ``self.flatten_output_groups``, otherwise
    mapValues), optionally maps the result with ``self.result_map``, and stores the
    resulting RDD under ``self.store_key``.

    :raises KeyError: if ``self.read_key`` is not present in the data store
    :raises TypeError: if the stored object is not a Spark RDD
    :returns: StatusCode.Success
    """
    ds = ProcessManager().service(DataStore)

    # input validation
    if self.read_key not in ds:
        raise KeyError('no input data found in data store with key "{}"'.format(self.read_key))
    rdd = ds[self.read_key]
    if not isinstance(rdd, pyspark.RDD):
        raise TypeError('expected a Spark RDD for "{0:s}" (got "{1:s}")'.format(self.read_key, str(type(rdd))))

    # optional pre-map, then group by key
    if self.input_map:
        rdd = rdd.map(self.input_map)
    grouped = rdd.groupByKey(numPartitions=self.num_group_partitions)

    # per-group map, flattened or not
    apply_to_groups = grouped.flatMapValues if self.flatten_output_groups else grouped.mapValues
    rdd = apply_to_groups(self.group_map)

    # optional post-map
    if self.result_map:
        rdd = rdd.map(self.result_map)

    ds[self.store_key] = rdd
    return StatusCode.Success
def test_esk606(self):
    """Test Esk-606: Convert Spark data frame"""
    # run the macro, then fetch the data store it filled
    self.run_eskapade('esk606_convert_spark_df.py')
    ds = ProcessManager().service(DataStore)

    # per output: expected container type, and a function producing its sorted row tuples
    checks = {
        'df': (pyspark.sql.DataFrame, lambda d: sorted(d.rdd.map(tuple).collect())),
        'rdd': (pyspark.RDD, lambda d: sorted(d.collect())),
        'list': (list, lambda d: sorted(d)),
        'pd': (pd.DataFrame, lambda d: sorted(map(tuple, d.values))),
    }

    # the input data frame must be present
    self.assertIn('df', ds, 'no data found with key "df"')
    self.assertIsInstance(ds['df'], pyspark.sql.DataFrame, 'unexpected type for input data frame')

    expected_rows = [(it, 'foo{:d}'.format(it), (it + 1) / 2.) for it in range(20, 100)]
    for key, (dtype, get_content) in checks.items():
        # converted data: presence, type and content
        data_key = '{}_output'.format(key)
        self.assertIn(data_key, ds, 'no data found with key "{}"'.format(data_key))
        self.assertIsInstance(ds[data_key], dtype, 'unexpected type for "{}" data'.format(key))
        self.assertListEqual(get_content(ds[data_key]), expected_rows,
                             'unexpected content for "{}" data'.format(key))

        # stored schema must equal the input data frame's schema
        schema_key = '{}_schema'.format(key)
        self.assertIn(schema_key, ds, 'no schema found with key {}'.format(schema_key))
        self.assertListEqual(list(ds[schema_key]), list(ds['df'].schema),
                             'unexpected schema for "{}" data'.format(key))
def queueStream(self, rdds, oneAtATime=True, default=None):
    """
    Create an input stream from a queue of RDDs or lists. In each batch,
    it will process either one or all of the RDDs returned by the queue.

    .. note:: Changes to the queue after the stream is created will not be recognized.

    @param rdds: Queue of RDDs (plain Python collections are parallelized into RDDs)
    @param oneAtATime: pick one rdd each time or pick all of them once.
    @param default: The default rdd if no more in rdds (a non-RDD value is parallelized)
    """
    # A non-RDD default (e.g. a plain list) is parallelized into an RDD first.
    if default and not isinstance(default, RDD):
        default = self._sc.parallelize(default)

    # With an empty `rdds` but a default, wrap `rdds` itself in a list. For an
    # empty list this gives [[]], which the next step parallelizes into a single
    # empty RDD, so rdds[0] exists for the deserializer lookups below.
    # NOTE(review): if `rdds` is None this yields [None], and parallelize(None)
    # is presumably not supported -- confirm callers never pass None.
    if not rdds and default:
        rdds = [rdds]

    # Only the first element is checked: if it is not an RDD, every element is
    # treated as a local collection and parallelized.
    if rdds and not isinstance(rdds[0], RDD):
        rdds = [self._sc.parallelize(input) for input in rdds]
    self._check_serializers(rdds)

    queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
    if default:
        # Re-serialize the default with the first queued RDD's deserializer.
        default = default._reserialize(rdds[0]._jrdd_deserializer)
        jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
    else:
        jdstream = self._jssc.queueStream(queue, oneAtATime)
    # NOTE(review): with an empty `rdds` and no default, `rdds` is still empty
    # here, so rdds[0] presumably raises IndexError (unless an earlier call fails first).
    return DStream(jdstream, self, rdds[0]._jrdd_deserializer)
def transform(self, dstreams, transformFunc):
    """
    Create a new DStream in which each RDD is generated by applying
    a function on RDDs of the DStreams. The order of the JavaRDDs in
    the transform function parameter will be the same as the order
    of corresponding DStreams in the list.
    """
    java_streams = [stream._jdstream for stream in dstreams]
    deserializers = [stream._jrdd_deserializer for stream in dstreams]

    def apply_transform(t, *rdds):
        # The identity map re-wraps the result so that its final serializer is
        # sc.serializer, matching the serializer given to the DStream below.
        return transformFunc(rdds).map(lambda x: x)

    py_func = TransformFunction(self._sc, apply_transform, *deserializers)
    java_func = self._jvm.TransformFunction(py_func)
    jdstream = self._jssc.transform(java_streams, java_func)
    return DStream(jdstream, self, self._sc.serializer)
def count(self):
    """
    Return a new DStream in which each RDD has a single element
    generated by counting each RDD of this DStream.
    """
    def count_partition(records):
        # One partial count per partition; the partials are summed below.
        total = 0
        for _ in records:
            total += 1
        return [total]

    return self.mapPartitions(count_partition).reduce(operator.add)
def mapPartitions(self, f, preservesPartitioning=False):
    """
    Return a new DStream in which each RDD is generated by applying
    mapPartitions() to each RDDs of this DStream.
    """
    # Adapt `f` to the (partition index, iterator) signature, ignoring the index.
    return self.mapPartitionsWithIndex(lambda _split, iterator: f(iterator),
                                       preservesPartitioning)
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
    """
    Return a new DStream in which each RDD is generated by applying
    mapPartitionsWithIndex() to each RDDs of this DStream.
    """
    def per_rdd(rdd):
        return rdd.mapPartitionsWithIndex(f, preservesPartitioning)

    return self.transform(per_rdd)
def reduceByKey(self, func, numPartitions=None):
    """
    Return a new DStream by applying reduceByKey to each RDD.

    When ``numPartitions`` is None, the SparkContext's ``defaultParallelism`` is used.
    """
    partitions = self._sc.defaultParallelism if numPartitions is None else numPartitions
    # reduceByKey == combineByKey with identity as the combiner factory and
    # `func` used both to merge values and to merge combiners.
    return self.combineByKey(lambda x: x, func, func, partitions)