Python imap() example source code

pool.py (project: kinect-2-libras, author: inessadl)
def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
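
A quick usage sketch of the public API this implements (illustrative names; Python 3 shown, where the with-statement manages the pool). imap returns a lazy iterator whose results come back in input order:

from multiprocessing import Pool

def square(x):  # hypothetical worker function
    return x * x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        # Results are yielded lazily, in the order of the inputs
        for value in pool.imap(square, range(8), chunksize=2):
            print(value)  # 0, 1, 4, 9, ...
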
pool.py (project: kinect-2-libras, author: inessadl)
def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
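
For contrast, a minimal sketch of imap_unordered (again with illustrative names): the call shape is the same, but each result is yielded as soon as its task finishes, so the output order is arbitrary:

import random
import time
from multiprocessing import Pool

def jittered(x):  # hypothetical worker with variable runtime
    time.sleep(random.random() * 0.1)
    return x

if __name__ == '__main__':
    with Pool(processes=4) as pool:
        print(list(pool.imap_unordered(jittered, range(5))))
        # e.g. [2, 0, 4, 1, 3] -- order depends on completion time
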
ScatterPlotItem.py (project: NeoAnalysis, author: neoanalysis)
def updateSpots(self, dataSet=None):
        if dataSet is None:
            dataSet = self.data

        invalidate = False
        if self.opts['pxMode']:
            mask = np.equal(dataSet['sourceRect'], None)
            if np.any(mask):
                invalidate = True
                opts = self.getSpotOpts(dataSet[mask])
                sourceRect = self.fragmentAtlas.getSymbolCoords(opts)
                dataSet['sourceRect'][mask] = sourceRect

            self.fragmentAtlas.getAtlas() # generate atlas so source widths are available.

            dataSet['width'] = np.array(list(imap(QtCore.QRectF.width, dataSet['sourceRect'])))/2
            dataSet['targetRect'] = None
            self._maxSpotPxWidth = self.fragmentAtlas.max_width
        else:
            self._maxSpotWidth = 0
            self._maxSpotPxWidth = 0
            self.measureSpotSizes(dataSet)

        if invalidate:
            self.invalidate()
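
This snippet calls a bare imap, which only exists under that name on Python 2 (itertools.imap); on Python 3 the lazy equivalent is the built-in map. A common compatibility shim, assuming that is how the surrounding module resolves the name:

try:
    from itertools import imap  # Python 2: lazy map
except ImportError:
    imap = map  # Python 3: built-in map is already lazy
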
pool.py (project: hostapd-mana, author: adde88)
def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
pool.py (project: hostapd-mana, author: adde88)
def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
dstream.py (project: MIT-Thesis, author: alec-heif)
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream in which each RDD contains the count of distinct elements in
        RDDs in a sliding window over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions:  number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                             windowDuration, slideDuration, numPartitions)
        return counted.filter(lambda kv: kv[1] > 0)
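
A hedged end-to-end sketch of calling this DStream method (the socket source and durations are illustrative; checkpointing is required because the windowed count uses an inverse reduce):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "CountByValueAndWindowExample")
ssc = StreamingContext(sc, 1)  # 1-second batch interval
ssc.checkpoint("/tmp/checkpoint")  # required: the windowed count uses an inverse reduce

lines = ssc.socketTextStream("localhost", 9999)  # hypothetical text source
counts = lines.countByValueAndWindow(10, 2)  # 10s window, sliding every 2s
counts.pprint()

ssc.start()
ssc.awaitTermination()
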
functions.py (project: MIT-Thesis, author: alec-heif)
def log(arg1, arg2=None):
    """Returns the logarithm of the second argument, using the first argument as the base.

    If there is only one argument, then this takes the natural logarithm of the argument.

    >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
    ['0.30102', '0.69897']

    >>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
    ['0.69314', '1.60943']
    """
    sc = SparkContext._active_spark_context
    if arg2 is None:
        jc = sc._jvm.functions.log(_to_java_column(arg1))
    else:
        jc = sc._jvm.functions.log(arg1, _to_java_column(arg2))
    return Column(jc)
functions.py (project: MIT-Thesis, author: alec-heif)
def create_map(*cols):
    """Creates a new map column.

    :param cols: list of column names (string) or list of :class:`Column` expressions that are
        grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).

    >>> df.select(create_map('name', 'age').alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    >>> df.select(create_map([df.name, df.age]).alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    """
    sc = SparkContext._active_spark_context
    if len(cols) == 1 and isinstance(cols[0], (list, set)):
        cols = cols[0]
    jc = sc._jvm.functions.map(_to_seq(sc, cols, _to_java_column))
    return Column(jc)
functions.py (project: MIT-Thesis, author: alec-heif)
def posexplode(col):
    """Returns a new row for each element with position in the given array or map.

    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(posexplode(eDF.intlist)).collect()
    [Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]

    >>> eDF.select(posexplode(eDF.mapfield)).show()
    +---+---+-----+
    |pos|key|value|
    +---+---+-----+
    |  0|  a|    b|
    +---+---+-----+
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.posexplode(_to_java_column(col))
    return Column(jc)
session.py (project: MIT-Thesis, author: alec-heif)
def _inferSchemaFromList(self, data):
        """
        Infer schema from list of Row or tuple.

        :param data: list of Row or tuple
        :return: :class:`pyspark.sql.types.StructType`
        """
        if not data:
            raise ValueError("can not infer schema from empty dataset")
        first = data[0]
        if type(first) is dict:
            warnings.warn("inferring schema from dict is deprecated, "
                          "please use pyspark.sql.Row instead")
        schema = reduce(_merge_type, map(_infer_schema, data))
        if _has_nulltype(schema):
            raise ValueError("Some of the types cannot be determined after inferring")
        return schema
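
The public entry point that exercises this helper is createDataFrame; a small sketch, assuming an active SparkSession bound to `spark`:

from pyspark.sql import Row

rows = [Row(name='Alice', age=2), Row(name='Bob', age=5)]
df = spark.createDataFrame(rows)  # schema inferred from the Row fields
df.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)
# (legacy Row sorts fields alphabetically; field order may vary by version)
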
session.py (project: MIT-Thesis, author: alec-heif)
def _createFromLocal(self, data, schema):
        """
        Create an RDD for DataFrame from a list or pandas.DataFrame, returns
        the RDD and schema.
        """
        # make sure data can be consumed multiple times
        if not isinstance(data, list):
            data = list(data)

        if schema is None or isinstance(schema, (list, tuple)):
            struct = self._inferSchemaFromList(data)
            converter = _create_converter(struct)
            data = map(converter, data)
            if isinstance(schema, (list, tuple)):
                for i, name in enumerate(schema):
                    struct.fields[i].name = name
                    struct.names[i] = name
            schema = struct

        elif not isinstance(schema, StructType):
            raise TypeError("schema should be StructType or list or None, but got: %s" % schema)

        # convert python objects to sql data
        data = [schema.toInternal(row) for row in data]
        return self._sc.parallelize(data), schema
rdd.py (project: MIT-Thesis, author: alec-heif)
def fullOuterJoin(self, other, numPartitions=None):
        """
        Perform a full outer join of C{self} and C{other}.

        For each element (k, v) in C{self}, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in C{other}, or the pair
        (k, (v, None)) if no elements in C{other} have key k.

        Similarly, for each element (k, w) in C{other}, the resulting RDD will
        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
        (k, (None, w)) if no elements in C{self} have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("c", 8)])
        >>> sorted(x.fullOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
        """
        return python_full_outer_join(self, other, numPartitions)

    # TODO: add option to control map-side combining
    # portable_hash is used as default, because builtin hash of None is different
    # cross machines.
rdd.py (project: MIT-Thesis, author: alec-heif)
def sampleByKey(self, withReplacement, fractions, seed=None):
        """
        Return a subset of this RDD sampled by key (via stratified sampling).
        Create a sample of this RDD using variable sampling rates for
        different keys as specified by fractions, a key to sampling rate map.

        >>> fractions = {"a": 0.2, "b": 0.1}
        >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
        >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
        >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
        True
        >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
        True
        >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
        True
        """
        for fraction in fractions.values():
            assert fraction >= 0.0, "Negative fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(
            RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
rdd.py (project: MIT-Thesis, author: alec-heif)
def meanApprox(self, timeout, confidence=0.95):
        """
        .. note:: Experimental

        Approximate operation to return the mean within a timeout
        or meet the confidence.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> r = sum(range(1000)) / 1000.0
        >>> abs(rdd.meanApprox(1000) - r) / r < 0.05
        True
        """
        jrdd = self.map(float)._to_java_object_rdd()
        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
        r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
ipaddress.py (project: aws-cfn-plex, author: lordmuffin)
def _ip_int_from_string(self, ip_str):
        """Turn the given IP string into an integer for comparison.

        Args:
            ip_str: A string, the IP ip_str.

        Returns:
            The IP ip_str as an integer.

        Raises:
            AddressValueError: if ip_str isn't a valid IPv4 Address.

        """
        if not ip_str:
            raise AddressValueError('Address cannot be empty')

        octets = ip_str.split('.')
        if len(octets) != 4:
            raise AddressValueError("Expected 4 octets in %r" % ip_str)

        try:
            return _int_from_bytes(map(self._parse_octet, octets), 'big')
        except ValueError as exc:
            raise AddressValueError("%s in %r" % (exc, ip_str))
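
The conversion reduces to big-endian byte packing; an illustrative Python 3 equivalent:

octets = '192.168.0.1'.split('.')
value = int.from_bytes(bytes(int(o) for o in octets), 'big')
print(value)       # 3232235521
print(hex(value))  # 0xc0a80001
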
ipaddress.py (project: aws-cfn-plex, author: lordmuffin)
def _is_hostmask(self, ip_str):
        """Test if the IP string is a hostmask (rather than a netmask).

        Args:
            ip_str: A string, the potential hostmask.

        Returns:
            A boolean, True if the IP string is a hostmask.

        """
        bits = ip_str.split('.')
        try:
            parts = [x for x in map(int, bits) if x in self._valid_mask_octets]
        except ValueError:
            return False
        if len(parts) != len(bits):
            return False
        if parts[0] < parts[-1]:
            return True
        return False
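
This module tracks the stdlib ipaddress API, where the hostmask test is what lets the network constructors accept either mask notation; an illustrative check using the standard library:

import ipaddress

a = ipaddress.ip_network(u'192.168.1.0/255.255.255.0')  # netmask form
b = ipaddress.ip_network(u'192.168.1.0/0.0.0.255')      # equivalent hostmask form
print(a == b)  # True: both parse to 192.168.1.0/24
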
sortedlistwithkey.py (project: Intranet-Penetration, author: yuxiaokui)
def update(self, iterable):
        """Update the list by adding all elements from *iterable*."""
        _maxes, _lists, _keys = self._maxes, self._lists, self._keys
        values = sorted(iterable, key=self._key)

        if _maxes:
            if len(values) * 4 >= self._len:
                values.extend(chain.from_iterable(_lists))
                values.sort(key=self._key)
                self.clear()
            else:
                _add = self.add
                for val in values:
                    _add(val)
                return

        _load, _index = self._load, self._index
        _lists.extend(values[pos:(pos + _load)]
                      for pos in range(0, len(values), _load))
        _keys.extend(list(map(self._key, _list)) for _list in _lists)
        _maxes.extend(sublist[-1] for sublist in _keys)
        self._len = len(values)
        del _index[:]
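
A usage sketch for the containing class, assuming the sortedcontainers package where SortedListWithKey historically lived (newer releases spell it SortedKeyList):

from sortedcontainers import SortedListWithKey

sl = SortedListWithKey(key=lambda x: -x)  # keep items in descending order
sl.update([3, 1, 2])
print(list(sl))  # [3, 2, 1]
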
sortedlistwithkey.py (project: MKFQ, author: maojingios)
def update(self, iterable):
        """Update the list by adding all elements from *iterable*."""
        _maxes, _lists, _keys = self._maxes, self._lists, self._keys
        values = sorted(iterable, key=self._key)

        if _maxes:
            if len(values) * 4 >= self._len:
                values.extend(chain.from_iterable(_lists))
                values.sort(key=self._key)
                self.clear()
            else:
                _add = self.add
                for val in values:
                    _add(val)
                return

        _load, _index = self._load, self._index
        _lists.extend(values[pos:(pos + _load)]
                      for pos in range(0, len(values), _load))
        _keys.extend(list(map(self._key, _list)) for _list in _lists)
        _maxes.extend(sublist[-1] for sublist in _keys)
        self._len = len(values)
        del _index[:]
import_data.py (project: word2vec_pipeline, author: NIHOPA)
def csv_iterator(f_csv, clean=True, _PARALLEL=False, merge_cols=False):
    '''
    Creates an iterator over a CSV file, optionally cleaning each row.
    '''
    with open(f_csv) as FIN:
        CSV = csv.DictReader(FIN)

        if clean and _PARALLEL:
            CSV = jobmap(clean_row, CSV, FLAG_PARALLEL=_PARALLEL)
        elif clean and not _PARALLEL:
            CSV = itertools.imap(clean_row, CSV)

        try:
            for row in CSV:
                yield row
        except Exception:
            # Swallow malformed trailing rows rather than aborting iteration
            pass
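
Here clean_row and jobmap are project-specific helpers; the serial path is just a lazy map over csv.DictReader. A version-agnostic sketch with a hypothetical cleaning function:

import csv

try:
    from itertools import imap  # Python 2
except ImportError:
    imap = map  # Python 3

def strip_fields(row):  # hypothetical stand-in for clean_row
    return {k: v.strip() for k, v in row.items()}

with open('data.csv') as FIN:  # hypothetical CSV file
    for row in imap(strip_fields, csv.DictReader(FIN)):
        print(row)
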
document_scores.py (project: word2vec_pipeline, author: NIHOPA)
def compute_single(self, INPUT_ITR):

        assert(self.method is not None)
        print("Scoring {}".format(self.method))

        self._ref = []
        self.V = []
        self.current_filename = None
        ITR = itertools.imap(self.score_document, tqdm(INPUT_ITR))

        for row in ITR:

            # Require that filenames don't change in compute_single
            assert (self.current_filename in [None, row["_filename"]])
            self.current_filename = row["_filename"]

            self.V.append(row["doc_vec"])
            self._ref.append(int(row["_ref"]))

        self.V = np.array(self.V)
        self._ref = np.array(self._ref)
__init__.py (project: sphinx-nbexamples, author: Chilipp)
def create_py(self, nb, force=False):
        """Create the python script from the notebook node"""
        # Although we would love to simply use ``nbconvert.export_python(nb)``
        # this causes troubles in other cells processed by the ipython
        # directive. Instead of getting something like ``Out [5]:``, we get
        # something weird like '[0;31mOut[[1;31m5[0;31m]: [0m', which looks
        # like ANSI color information, if we allow the call of
        # nbconvert.export_python.
        if list(map(int, re.findall(r'\d+', nbconvert.__version__))) >= [4, 2]:
            py_file = os.path.basename(self.py_file)
        else:
            py_file = self.py_file
        spr.call(['jupyter', 'nbconvert', '--to=python',
                  '--output=' + py_file, '--log-level=%s' % logger.level,
                  self.outfile])
        with open(self.py_file) as f:
            py_content = f.read()
        # comment out ipython magics
        py_content = re.sub(r'^\s*get_ipython\(\).magic.*', r'# \g<0>',
                            py_content, flags=re.MULTILINE)
        with open(self.py_file, 'w') as f:
            f.write(py_content)
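
The version gate above parses the nbconvert version string into a list of ints so it compares lexicographically; a standalone illustration:

import re

version = '5.3.1'  # illustrative version string
parts = list(map(int, re.findall(r'\d+', version)))
print(parts)            # [5, 3, 1]
print(parts >= [4, 2])  # True: compared element by element
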
connection.py (project: cuny-bdif, author: aristotle-tek)
def get_all_hits(self):
        """
        Return all of a Requester's HITs

        Despite what search_hits says, it does not return all hits, but
        instead returns a page of hits. This method will pull the hits
        from the server 100 at a time, but will yield the results
        iteratively, so subsequent requests are made on demand.
        """
        page_size = 100
        search_rs = self.search_hits(page_size=page_size)
        total_records = int(search_rs.TotalNumResults)
        get_page_hits = lambda page: self.search_hits(page_size=page_size, page_number=page)
        page_nums = self._get_pages(page_size, total_records)
        hit_sets = itertools.imap(get_page_hits, page_nums)
        return itertools.chain.from_iterable(hit_sets)
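
The shape here is lazy pagination: imap defers each page fetch until chain.from_iterable pulls on it. The same pattern reduced to a generic sketch (hypothetical fetch function; Python 3's map is already lazy):

import itertools

def fetch_page(page):  # hypothetical paged API call
    start = (page - 1) * 3
    return [start, start + 1, start + 2]

items = itertools.chain.from_iterable(map(fetch_page, [1, 2, 3]))
print(list(items))  # [0, 1, 2, 3, 4, 5, 6, 7, 8] -- pages fetched on demand
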
ir_qweb.py (project: gooderp_org, author: osbzr)
def record_to_html(self, cr, uid, field_name, record, options=None, context=None):
        if options is None: options = {}
        aclasses = ['img', 'img-responsive'] + options.get('class', '').split()
        classes = ' '.join(itertools.imap(escape, aclasses))

        max_size = None
        max_width, max_height = options.get('max_width', 0), options.get('max_height', 0)
        if max_width or max_height:
            max_size = '%sx%s' % (max_width, max_height)

        src = self.pool['website'].image_url(cr, uid, record, field_name, max_size)
        alt = None
        if options.get('alt-field') and getattr(record, options['alt-field'], None):
            alt = record[options['alt-field']]
        elif options.get('alt'):
            alt = options['alt']
        img = '<img class="%s" src="%s" style="%s"%s/>' % (classes, src, options.get('style', ''), ' alt="%s"' % alt if alt else '')
        return ir_qweb.HTMLSafe(img)
tags.py (project: idascripts, author: ctfhacker)
def everything(use_cache=False):
    '''Return all the tags within the database as (globals, contents, frames).'''
    if use_cache:
        g, f = cached()

    else:
        print >>output, '--> Grabbing globals...'
        g = {ea : d for ea, d in globals()}

        print >>output, '--> Grabbing contents from all functions...'
        res = (function(ea) for ea in db.functions())
        f = {}
        map(f.update, itertools.imap(dict, itertools.ifilter(None, res)))

    print >>output, '--> Grabbing frames from all functions...'
    h = {ea : d for ea, d in frames()}
    return (g, f, h)
_interface.py (project: idascripts, author: ctfhacker)
def use(cls, regs):
        _instruction = sys.modules.get('instruction', __import__('instruction'))

        # convert any regs that are strings into their correct object type
        regs = { _instruction.reg.by_name(r) if isinstance(r, basestring) else r for r in regs }

        # returns whether ``r`` is related to any of the registers in ``regs``.
        match = lambda r, regs=regs: any(itertools.imap(r.relatedQ, regs))

        # returns true if the operand at the specified address is related to one of the registers in ``regs``.
        def uses_register(ea, opnum):
            val = _instruction.op_value(ea, opnum)
            if isinstance(val, symbol_t):
                return any(map(match, val.__symbols__))
            return False

        return uses_register
pool.py (project: oil, author: oilshell)
def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
pool.py (project: oil, author: oilshell)
def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
pool.py (project: python2-tracer, author: extremecoders-re)
def imap(self, func, iterable, chunksize=1):
        '''
        Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)
pool.py (project: python2-tracer, author: extremecoders-re)
def imap_unordered(self, func, iterable, chunksize=1):
        '''
        Like `imap()` method but ordering of results is arbitrary
        '''
        assert self._state == RUN
        if chunksize == 1:
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, func, (x,), {})
                         for i, x in enumerate(iterable)), result._set_length))
            return result
        else:
            assert chunksize > 1
            task_batches = Pool._get_tasks(func, iterable, chunksize)
            result = IMapUnorderedIterator(self._cache)
            self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                     for i, x in enumerate(task_batches)), result._set_length))
            return (item for chunk in result for item in chunk)

