Python RDD class: example source code

test_tutorial_macros.py — project: Eskapade, author: KaveIO
def test_esk609(self):
        """Test Esk-609: Map data-frame groups"""

        # run Eskapade
        self.run_eskapade('esk609_map_df_groups.py')
        proc_mgr = ProcessManager()
        ds = proc_mgr.service(DataStore)

        # check input data
        for key in ('map_rdd', 'flat_map_rdd'):
            self.assertIn(key, ds, 'no data found with key "{}"'.format(key))
            self.assertIsInstance(ds[key], pyspark.RDD,
                                  'object "{0:s}" is not an RDD (type "{1:s}")'.format(key, str(type(ds[key]))))

        # sums of "bar" variable
        bar_sums = [(0, 27.5), (1, 77.5), (2, 127.5), (3, 177.5), (4, 227.5), (5, 277.5), (6, 327.5), (7, 377.5),
                    (8, 427.5), (9, 477.5)]
        flmap_rows = [(it, 'foo{:d}'.format(it), (it + 1) / 2., bar_sums[it // 10][1]) for it in range(100)]

        # check mapped data frames
        self.assertListEqual(sorted(ds['map_rdd'].collect()), bar_sums, 'unexpected values in "map_rdd"')
        self.assertListEqual(sorted(ds['flat_map_rdd'].collect()), flmap_rows, 'unexpected values in "flat_map_rdd"')
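The expected bar_sums can be reproduced with a plain PySpark group-and-sum; a minimal sketch, assuming a live SparkContext `sc` (record `it` carries bar value (it + 1) / 2., grouped into decades by it // 10):

records = sc.parallelize([(it // 10, (it + 1) / 2.) for it in range(100)])  # (group, bar)
records.reduceByKey(lambda a, b: a + b).sortByKey().collect()
# -> [(0, 27.5), (1, 77.5), ..., (9, 477.5)], i.e. bar_sums above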
__init__.py — project: geopyspark, author: locationtech-labs
def create_python_rdd(jrdd, serializer):
    """Creates a Python RDD from a RDD from Scala.

    Args:
        jrdd (org.apache.spark.api.java.JavaRDD): The RDD that came from Scala.
        serializer (:class:`~geopyspark.AvroSerializer` or pyspark.serializers.AutoBatchedSerializer(AvroSerializer)):
            An instance of ``AvroSerializer`` that is either alone, or wrapped by ``AutoBatchedSerializer``.

    Returns:
        RDD
    """

    pysc = get_spark_context()

    if isinstance(serializer, AutoBatchedSerializer):
        return RDD(jrdd, pysc, serializer)
    else:
        return RDD(jrdd, pysc, AutoBatchedSerializer(serializer))
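A hedged usage sketch: `jrdd` is assumed to be a JavaRDD handed over from the Scala side, and the AvroSerializer constructor call is illustrative rather than its exact signature:

ser = AvroSerializer(schema)                               # assumed constructor
rdd = create_python_rdd(jrdd, ser)                         # plain serializer: gets wrapped
rdd = create_python_rdd(jrdd, AutoBatchedSerializer(ser))  # already batched: used as-is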
dstream.py — project: MIT-Thesis, author: alec-heif
def pprint(self, num=10):
        """
        Print the first num elements of each RDD generated in this DStream.

        @param num: the number of elements to print from the start of each RDD.
        """
        def takeAndPrint(time, rdd):
            taken = rdd.take(num + 1)
            print("-------------------------------------------")
            print("Time: %s" % time)
            print("-------------------------------------------")
            for record in taken[:num]:
                print(record)
            if len(taken) > num:
                print("...")
            print("")

        self.foreachRDD(takeAndPrint)
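A minimal end-to-end sketch of pprint, assuming a text server is listening on localhost:9999:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "PPrintDemo")
ssc = StreamingContext(sc, 1)                      # 1-second batches
ssc.socketTextStream("localhost", 9999).pprint(5)  # at most 5 records per batch, then "..."
ssc.start()
ssc.awaitTermination()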
dstream.py — project: MIT-Thesis, author: alec-heif
def window(self, windowDuration, slideDuration=None):
        """
        Return a new DStream in which each RDD contains all the elements seen in a
        sliding window of time over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        self._validate_window_param(windowDuration, slideDuration)
        d = self._ssc._jduration(windowDuration)
        if slideDuration is None:
            return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
        s = self._ssc._jduration(slideDuration)
        return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
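Usage sketch, assuming `ssc` was built with a 2-second batch interval (both durations must be multiples of it):

stream = ssc.socketTextStream("localhost", 9999)
windowed = stream.window(10, 4)   # each RDD covers the last 10s, emitted every 4s
windowed.count().pprint()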
dstream.py — project: MIT-Thesis, author: alec-heif
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream in which each RDD contains the count of distinct elements in
        RDDs in a sliding window over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions:  number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                             windowDuration, slideDuration, numPartitions)
        return counted.filter(lambda kv: kv[1] > 0)
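Usage sketch; the incremental operator.sub path requires checkpointing, and the checkpoint directory here is only a placeholder (durations are assumed to be multiples of the batch interval):

ssc.checkpoint("/tmp/checkpoint")  # required by the inverse-reduce path
words = ssc.socketTextStream("localhost", 9999).flatMap(lambda line: line.split())
words.countByValueAndWindow(30, 10).pprint()  # (value, count) pairs over the last 30s, every 10s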
common.py — project: MIT-Thesis, author: alec-heif
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data)
    return obj
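An illustration of the dispatch above; _py2java is an internal helper, so this is only a sketch assuming a live SparkContext `sc`:

jrdd = _py2java(sc, sc.parallelize([1, 2, 3]))  # RDD -> JavaRDD of serialized objects
jlst = _py2java(sc, [1.0, "a"])                 # lists are converted element-wise
same = _py2java(sc, 42)                         # primitives pass through unchanged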
classification.py — project: MIT-Thesis, author: alec-heif
def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features)
        vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
        can handle all kinds of discrete data.  For example, by
        converting documents into TF-IDF vectors, it can be used for
        document classification. By making every vector a 0-1 vector,
        it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
        The input feature values must be nonnegative.

        :param data:
          RDD of LabeledPoint.
        :param lambda_:
          The smoothing parameter.
          (default: 1.0)
        """
        first = data.first()
        if not isinstance(first, LabeledPoint):
            raise ValueError("`data` should be an RDD of LabeledPoint")
        labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
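A small worked example with two separable classes (nonnegative features, as required); assumes a live SparkContext `sc`:

from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.regression import LabeledPoint

data = sc.parallelize([LabeledPoint(0.0, [1.0, 0.0]), LabeledPoint(0.0, [2.0, 0.0]),
                       LabeledPoint(1.0, [0.0, 1.0]), LabeledPoint(1.0, [0.0, 2.0])])
model = NaiveBayes.train(data, lambda_=1.0)
model.predict([1.0, 0.0])  # -> 0.0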
tree.py — project: MIT-Thesis, author: alec-heif
def predict(self, x):
        """
        Predict the label of one or more examples.

        .. note:: In Python, predict cannot currently be used within an RDD
            transformation or action.
            Call predict directly on the RDD instead.

        :param x:
          Data point (feature vector), or an RDD of data points (feature
          vectors).
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))
common.py — project: MIT-Thesis, author: alec-heif
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(data)
    return obj
regression.py — project: MIT-Thesis, author: alec-heif
def predict(self, x):
        """
        Predict labels for the provided features using a piecewise linear
        function.
        1) If x exactly matches a boundary, the associated prediction is
        returned. If multiple predictions share that boundary, one of
        them is returned; which one is undefined (same as
        java.util.Arrays.binarySearch).
        2) If x is lower or higher than all boundaries, the first or
        last prediction is returned, respectively. If multiple
        predictions share that boundary, the lowest or highest one is
        returned, respectively.
        3) If x falls between two values in the boundary array, the
        prediction is treated as a piecewise linear function and the
        interpolated value is returned. If multiple values share a
        boundary, the rules from 2) apply.

        :param x:
          Feature or RDD of Features to be labeled.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))
        return np.interp(x, self.boundaries, self.predictions)
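Usage sketch covering the three cases above; IsotonicRegression.train takes (label, feature, weight) triples (assumes a live `sc`):

from pyspark.mllib.regression import IsotonicRegression

data = sc.parallelize([(1.0, 1.0, 1.0), (2.0, 2.0, 1.0), (4.0, 3.0, 1.0)])
model = IsotonicRegression.train(data)
model.predict(2.0)   # exact boundary match -> 2.0
model.predict(2.5)   # between boundaries -> interpolated 3.0
model.predict(5.0)   # above all boundaries -> last prediction, 4.0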
distributed.py — project: MIT-Thesis, author: alec-heif
def rows(self):
        """
        Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.

        >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
        ...                                        IndexedRow(1, [4, 5, 6])]))
        >>> rows = mat.rows
        >>> rows.first()
        IndexedRow(0, [1.0,2.0,3.0])
        """
        # We use DataFrames for serialization of IndexedRows from
        # Java, so we first convert the RDD of rows to a DataFrame
        # on the Scala/Java side. Then we map each Row in the
        # DataFrame back to an IndexedRow on this side.
        rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
        rows = rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
        return rows
distributed.py — project: MIT-Thesis, author: alec-heif
def entries(self):
        """
        Entries of the CoordinateMatrix stored as an RDD of
        MatrixEntries.

        >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                                        MatrixEntry(6, 4, 2.1)]))
        >>> entries = mat.entries
        >>> entries.first()
        MatrixEntry(0, 0, 1.2)
        """
        # We use DataFrames for serialization of MatrixEntry entries
        # from Java, so we first convert the RDD of entries to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a MatrixEntry on this side.
        entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
        entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
        return entries
distributed.py — project: MIT-Thesis, author: alec-heif
def blocks(self):
        """
        The RDD of sub-matrix blocks
        ((blockRowIndex, blockColIndex), sub-matrix) that form this
        distributed matrix.

        >>> mat = BlockMatrix(
        ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
        ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
        >>> blocks = mat.blocks
        >>> blocks.first()
        ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))

        """
        # We use DataFrames for serialization of sub-matrix blocks
        # from Java, so we first convert the RDD of blocks to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a sub-matrix block on this side.
        blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
        blocks = blocks_df.rdd.map(lambda row: ((row[0][0], row[0][1]), row[1]))
        return blocks
dstream.py — project: pyspark, author: v-v-vishnevskiy
def pprint(self, num=10):
        """
        Print the first num elements of each RDD generated in this DStream.

        @param num: the number of elements to print from the start of each RDD.
        """
        def takeAndPrint(time, rdd):
            taken = rdd.take(num + 1)
            print("-------------------------------------------")
            print("Time: %s" % time)
            print("-------------------------------------------")
            for record in taken[:num]:
                print(record)
            if len(taken) > num:
                print("...")
            print("")

        self.foreachRDD(takeAndPrint)
dstream.py — project: pyspark, author: v-v-vishnevskiy
def window(self, windowDuration, slideDuration=None):
        """
        Return a new DStream in which each RDD contains all the elements seen in a
        sliding window of time over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        self._validate_window_param(windowDuration, slideDuration)
        d = self._ssc._jduration(windowDuration)
        if slideDuration is None:
            return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
        s = self._ssc._jduration(slideDuration)
        return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
dstream.py — project: pyspark, author: v-v-vishnevskiy
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated by reducing all
        elements in a sliding window over this DStream.

        If `invReduceFunc` is not None, the reduction is done incrementally
        using the old window's reduced value:

        1. reduce the new values that entered the window (e.g., adding new counts)

        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

        This is more efficient than recomputing each window from scratch, which is
        what happens when `invReduceFunc` is None.

        @param reduceFunc:     associative reduce function
        @param invReduceFunc:  inverse reduce function of `reduceFunc`
        @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                               the new DStream will generate RDDs); must be a multiple of this
                               DStream's batching interval
        """
        keyed = self.map(lambda x: (1, x))
        reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                             windowDuration, slideDuration, 1)
        return reduced.map(lambda kv: kv[1])
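Usage sketch: a 60-second sliding sum updated every 10 seconds; supplying the inverse function enables the incremental path but requires checkpointing (the directory is a placeholder, and both durations are assumed to be multiples of the batch interval):

ssc.checkpoint("/tmp/checkpoint")
nums = ssc.socketTextStream("localhost", 9999).map(int)
nums.reduceByWindow(lambda a, b: a + b, lambda a, b: a - b, 60, 10).pprint()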
dstream.py — project: pyspark, author: v-v-vishnevskiy
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream in which each RDD contains the count of distinct elements in
        RDDs in a sliding window over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions:  number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                             windowDuration, slideDuration, numPartitions)
        return counted.filter(lambda kv: kv[1] > 0).count()
classification.py — project: pyspark, author: v-v-vishnevskiy
def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features)
        vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
        can handle all kinds of discrete data.  For example, by
        converting documents into TF-IDF vectors, it can be used for
        document classification. By making every vector a 0-1 vector,
        it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
        The input feature values must be nonnegative.

        :param data: RDD of LabeledPoint.
        :param lambda_: The smoothing parameter (default: 1.0).
        """
        first = data.first()
        if not isinstance(first, LabeledPoint):
            raise ValueError("`data` should be an RDD of LabeledPoint")
        labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
tree.py — project: pyspark, author: v-v-vishnevskiy
def predict(self, x):
        """
        Predict the label of one or more examples.

        Note: In Python, predict cannot currently be used within an RDD
              transformation or action.
              Call predict directly on the RDD instead.

        :param x:  Data point (feature vector),
                   or an RDD of data points (feature vectors).
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))
common.py — project: pyspark, author: v-v-vishnevskiy
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.SerDe.loads(data)
    return obj
distributed.py — project: pyspark, author: v-v-vishnevskiy
def entries(self):
        """
        Entries of the CoordinateMatrix stored as an RDD of
        MatrixEntries.

        >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                                        MatrixEntry(6, 4, 2.1)]))
        >>> entries = mat.entries
        >>> entries.first()
        MatrixEntry(0, 0, 1.2)
        """
        # We use DataFrames for serialization of MatrixEntry entries
        # from Java, so we first convert the RDD of entries to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a MatrixEntry on this side.
        entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
        entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
        return entries
distributed.py — project: pyspark, author: v-v-vishnevskiy
def blocks(self):
        """
        The RDD of sub-matrix blocks
        ((blockRowIndex, blockColIndex), sub-matrix) that form this
        distributed matrix.

        >>> mat = BlockMatrix(
        ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
        ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
        >>> blocks = mat.blocks
        >>> blocks.first()
        ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))

        """
        # We use DataFrames for serialization of sub-matrix blocks
        # from Java, so we first convert the RDD of blocks to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a sub-matrix block on this side.
        blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
        blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
        return blocks
rdd_group_mapper.py — project: Eskapade, author: KaveIO
def execute(self):
        """Execute RddGroupMapper"""

        # get process manager and data store
        proc_mgr = ProcessManager()
        ds = proc_mgr.service(DataStore)

        # fetch data frame from data store
        if self.read_key not in ds:
            raise KeyError('no input data found in data store with key "{}"'.format(self.read_key))
        data = ds[self.read_key]
        if not isinstance(data, pyspark.RDD):
            raise TypeError('expected a Spark RDD for "{0:s}" (got "{1:s}")'.format(self.read_key, str(type(data))))

        # apply input map
        if self.input_map:
            data = data.map(self.input_map)

        # group data by keys in the data
        data = data.groupByKey(numPartitions=self.num_group_partitions)

        # apply map on group values
        if self.flatten_output_groups:
            data = data.flatMapValues(self.group_map)
        else:
            data = data.mapValues(self.group_map)

        # apply map on result
        if self.result_map:
            data = data.map(self.result_map)

        # store data in data store
        ds[self.store_key] = data

        return StatusCode.Success
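The same pipeline sketched in plain PySpark, with hypothetical stand-ins for the link's configurable maps (assumes a live `sc`):

rdd = sc.parallelize(range(100)).map(lambda it: (it // 10, (it + 1) / 2.))  # input_map
grouped = rdd.groupByKey(numPartitions=4)
summed = grouped.mapValues(sum)            # group_map (use flatMapValues to flatten groups)
result = summed.map(lambda kv: kv)         # result_map (identity here)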
test_tutorial_macros.py — project: Eskapade, author: KaveIO
def test_esk606(self):
        """Test Esk-606: Convert Spark data frame"""

        # run Eskapade
        self.run_eskapade('esk606_convert_spark_df.py')
        proc_mgr = ProcessManager()
        ds = proc_mgr.service(DataStore)

        # define types of stored data sets
        data_types = {'df': pyspark.sql.DataFrame, 'rdd': pyspark.RDD, 'list': list, 'pd': pd.DataFrame}

        # define functions to obtain data-frame content
        content_funcs = {'df': lambda d: sorted(d.rdd.map(tuple).collect()),
                         'rdd': lambda d: sorted(d.collect()),
                         'list': lambda d: sorted(d),
                         'pd': lambda d: sorted(map(tuple, d.values))}

        # check input data
        self.assertIn('df', ds, 'no data found with key "df"')
        self.assertIsInstance(ds['df'], pyspark.sql.DataFrame, 'unexpected type for input data frame')

        # check created data sets
        rows = [(it, 'foo{:d}'.format(it), (it + 1) / 2.) for it in range(20, 100)]
        for key, dtype in data_types.items():
            # check content
            dkey = '{}_output'.format(key)
            self.assertIn(dkey, ds, 'no data found with key "{}"'.format(dkey))
            self.assertIsInstance(ds[dkey], dtype, 'unexpected type for "{}" data'.format(key))
            self.assertListEqual(content_funcs[key](ds[dkey]), rows, 'unexpected content for "{}" data'.format(key))

            # check schema
            skey = '{}_schema'.format(key)
            self.assertIn(skey, ds, 'no schema found with key {}'.format(skey))
            self.assertListEqual(list(ds[skey]), list(ds['df'].schema), 'unexpected schema for "{}" data'.format(key))
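The four conversions this test exercises can be sketched directly from a Spark DataFrame `df`:

rdd = df.rdd            # pyspark.RDD of Rows
lst = df.collect()      # list of Rows
pdf = df.toPandas()     # pandas.DataFrame
schema = df.schema      # shared by all outputs, as asserted above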
context.py — project: MIT-Thesis, author: alec-heif
def queueStream(self, rdds, oneAtATime=True, default=None):
        """
        Create an input stream from a queue of RDDs or lists. In each batch,
        it will process either one or all of the RDDs returned by the queue.

        .. note:: Changes to the queue after the stream is created will not be recognized.

        @param rdds:       queue of RDDs
        @param oneAtATime: whether to pick one RDD per batch or all of them at once.
        @param default:    the default RDD to process once the queue is exhausted.
        """
        if default and not isinstance(default, RDD):
            default = self._sc.parallelize(default)

        if not rdds and default:
            rdds = [rdds]

        if rdds and not isinstance(rdds[0], RDD):
            rdds = [self._sc.parallelize(input) for input in rdds]
        self._check_serializers(rdds)

        queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
        if default:
            default = default._reserialize(rdds[0]._jrdd_deserializer)
            jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
        else:
            jdstream = self._jssc.queueStream(queue, oneAtATime)
        return DStream(jdstream, self, rdds[0]._jrdd_deserializer)
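Usage sketch: feed three precomputed RDDs through a stream, one per batch, with a fallback RDD once the queue is exhausted:

rdds = [ssc.sparkContext.parallelize(range(i * 10, (i + 1) * 10)) for i in range(3)]
stream = ssc.queueStream(rdds, oneAtATime=True,
                         default=ssc.sparkContext.parallelize([-1]))
stream.pprint()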
context.py — project: MIT-Thesis, author: alec-heif
def transform(self, dstreams, transformFunc):
        """
        Create a new DStream in which each RDD is generated by applying
        a function to the RDDs of the DStreams. The order of the RDDs in
        the transform function's parameter will be the same as the order
        of the corresponding DStreams in the list.
        """
        jdstreams = [d._jdstream for d in dstreams]
        # change the final serializer to sc.serializer
        func = TransformFunction(self._sc,
                                 lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
                                 *[d._jrdd_deserializer for d in dstreams])
        jfunc = self._jvm.TransformFunction(func)
        jdstream = self._jssc.transform(jdstreams, jfunc)
        return DStream(jdstream, self, self._sc.serializer)
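Usage sketch: a batch-wise union of two streams; the rdds tuple arrives in the same order as the DStream list (`stream1` and `stream2` are assumed to exist):

joined = ssc.transform([stream1, stream2], lambda rdds: rdds[0].union(rdds[1]))
joined.pprint()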
dstream.py — project: MIT-Thesis, author: alec-heif
def count(self):
        """
        Return a new DStream in which each RDD has a single element
        generated by counting each RDD of this DStream.
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add)
dstream.py — project: MIT-Thesis, author: alec-heif
def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitions() to each RDD of this DStream.
        """
        def func(s, iterator):
            return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)
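Usage sketch: one partial sum per partition instead of one function call per record (`nums` is assumed to be a DStream of numbers):

partial_sums = nums.mapPartitions(lambda it: [sum(it)])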
dstream.py — project: MIT-Thesis, author: alec-heif
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitionsWithIndex() to each RDD of this DStream.
        """
        return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning))
dstream.py — project: MIT-Thesis, author: alec-heif
def reduceByKey(self, func, numPartitions=None):
        """
        Return a new DStream by applying reduceByKey to each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.combineByKey(lambda x: x, func, func, numPartitions)
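Usage sketch: the classic streaming word count built on reduceByKey:

words = ssc.socketTextStream("localhost", 9999).flatMap(lambda line: line.split())
words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b).pprint()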

