def imap(self, func, iterable, chunksize=1):
    '''
    Equivalent of `itertools.imap()` -- can be MUCH slower than `Pool.map()`
    '''
    assert self._state == RUN
    if chunksize == 1:
        result = IMapIterator(self._cache)
        self._taskqueue.put((((result._job, i, func, (x,), {})
                              for i, x in enumerate(iterable)), result._set_length))
        return result
    else:
        assert chunksize > 1
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = IMapIterator(self._cache)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), result._set_length))
        return (item for chunk in result for item in chunk)
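# A minimal usage sketch (not part of the snippet above): Pool.imap returns a lazy
# iterator, so results become available one at a time instead of all at once as
# with Pool.map. The worker function and pool size below are illustrative assumptions.
from multiprocessing import Pool

def _square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(4)
    try:
        for value in pool.imap(_square, range(10), chunksize=2):
            print(value)   # results arrive in submission order, one at a time
    finally:
        pool.close()
        pool.join()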
def imap_unordered(self, func, iterable, chunksize=1):
    '''
    Like `imap()` method but ordering of results is arbitrary
    '''
    assert self._state == RUN
    if chunksize == 1:
        result = IMapUnorderedIterator(self._cache)
        self._taskqueue.put((((result._job, i, func, (x,), {})
                              for i, x in enumerate(iterable)), result._set_length))
        return result
    else:
        assert chunksize > 1
        task_batches = Pool._get_tasks(func, iterable, chunksize)
        result = IMapUnorderedIterator(self._cache)
        self._taskqueue.put((((result._job, i, mapstar, (x,), {})
                              for i, x in enumerate(task_batches)), result._set_length))
        return (item for chunk in result for item in chunk)
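# A minimal sketch contrasting imap_unordered with imap: results are yielded in
# completion order rather than submission order. The sleep-based worker below is
# an illustrative assumption, not part of the original code.
import time
from multiprocessing import Pool

def _slow_identity(x):
    time.sleep(0.01 * (10 - x))   # later inputs finish sooner
    return x

if __name__ == '__main__':
    pool = Pool(4)
    try:
        print(list(pool.imap_unordered(_slow_identity, range(10))))  # order not guaranteed
    finally:
        pool.close()
        pool.join()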
def updateSpots(self, dataSet=None):
    if dataSet is None:
        dataSet = self.data

    invalidate = False
    if self.opts['pxMode']:
        mask = np.equal(dataSet['sourceRect'], None)
        if np.any(mask):
            invalidate = True
            opts = self.getSpotOpts(dataSet[mask])
            sourceRect = self.fragmentAtlas.getSymbolCoords(opts)
            dataSet['sourceRect'][mask] = sourceRect

        self.fragmentAtlas.getAtlas()  # generate atlas so source widths are available.

        dataSet['width'] = np.array(list(imap(QtCore.QRectF.width, dataSet['sourceRect']))) / 2
        dataSet['targetRect'] = None
        self._maxSpotPxWidth = self.fragmentAtlas.max_width
    else:
        self._maxSpotWidth = 0
        self._maxSpotPxWidth = 0
        self.measureSpotSizes(dataSet)

    if invalidate:
        self.invalidate()
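# The snippet above relies on a name ``imap`` that exists only in Python 2's
# itertools. A common compatibility shim (an assumption here, not copied from
# pyqtgraph) maps it to the builtin ``map`` on Python 3, where map is already lazy:
try:
    from itertools import imap
except ImportError:   # Python 3: map() is lazy, so it serves as imap
    imap = map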
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
    """
    Return a new DStream in which each RDD contains the count of distinct elements in
    RDDs in a sliding window over this DStream.

    @param windowDuration: width of the window; must be a multiple of this DStream's
                           batching interval
    @param slideDuration: sliding interval of the window (i.e., the interval after which
                          the new DStream will generate RDDs); must be a multiple of this
                          DStream's batching interval
    @param numPartitions: number of partitions of each RDD in the new DStream.
    """
    keyed = self.map(lambda x: (x, 1))
    counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                         windowDuration, slideDuration, numPartitions)
    return counted.filter(lambda kv: kv[1] > 0)
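# A minimal sketch of calling countByValueAndWindow on a DStream. The socket
# source, host/port, durations, and checkpoint path are illustrative assumptions.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "CountByValueAndWindowExample")
ssc = StreamingContext(sc, batchDuration=1)
ssc.checkpoint("/tmp/checkpoint")   # required by the inverse-reduce window path

words = ssc.socketTextStream("localhost", 9999).flatMap(lambda line: line.split())
counts = words.countByValueAndWindow(windowDuration=30, slideDuration=10)
counts.pprint()

ssc.start()
ssc.awaitTermination()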
def log(arg1, arg2=None):
    """Returns the first argument-based logarithm of the second argument.

    If there is only one argument, then this takes the natural logarithm of the argument.

    >>> df.select(log(10.0, df.age).alias('ten')).rdd.map(lambda l: str(l.ten)[:7]).collect()
    ['0.30102', '0.69897']

    >>> df.select(log(df.age).alias('e')).rdd.map(lambda l: str(l.e)[:7]).collect()
    ['0.69314', '1.60943']
    """
    sc = SparkContext._active_spark_context
    if arg2 is None:
        jc = sc._jvm.functions.log(_to_java_column(arg1))
    else:
        jc = sc._jvm.functions.log(arg1, _to_java_column(arg2))
    return Column(jc)
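# A minimal sketch of using pyspark.sql.functions.log outside the doctest context
# above; the DataFrame contents here are an assumption for illustration.
from pyspark.sql import SparkSession
from pyspark.sql.functions import log

spark = SparkSession.builder.master("local[1]").appName("log-example").getOrCreate()
df = spark.createDataFrame([(2,), (5,)], ["age"])
df.select(log(10.0, df.age).alias("log10_age"),   # base-10 logarithm
          log(df.age).alias("ln_age")).show()     # natural logarithm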
def create_map(*cols):
    """Creates a new map column.

    :param cols: list of column names (string) or list of :class:`Column` expressions that are
        grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).

    >>> df.select(create_map('name', 'age').alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    >>> df.select(create_map([df.name, df.age]).alias("map")).collect()
    [Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
    """
    sc = SparkContext._active_spark_context
    if len(cols) == 1 and isinstance(cols[0], (list, set)):
        cols = cols[0]
    jc = sc._jvm.functions.map(_to_seq(sc, cols, _to_java_column))
    return Column(jc)
def posexplode(col):
    """Returns a new row for each element with position in the given array or map.

    >>> from pyspark.sql import Row
    >>> eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
    >>> eDF.select(posexplode(eDF.intlist)).collect()
    [Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]

    >>> eDF.select(posexplode(eDF.mapfield)).show()
    +---+---+-----+
    |pos|key|value|
    +---+---+-----+
    |  0|  a|    b|
    +---+---+-----+
    """
    sc = SparkContext._active_spark_context
    jc = sc._jvm.functions.posexplode(_to_java_column(col))
    return Column(jc)
def _inferSchemaFromList(self, data):
    """
    Infer schema from list of Row or tuple.

    :param data: list of Row or tuple
    :return: :class:`pyspark.sql.types.StructType`
    """
    if not data:
        raise ValueError("can not infer schema from empty dataset")
    first = data[0]
    if type(first) is dict:
        warnings.warn("inferring schema from dict is deprecated, "
                      "please use pyspark.sql.Row instead")
    schema = reduce(_merge_type, map(_infer_schema, data))
    if _has_nulltype(schema):
        raise ValueError("Some of types cannot be determined after inferring")
    return schema
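# A minimal sketch of the behaviour this helper backs: createDataFrame with no
# explicit schema infers a StructType from the Rows. Names and values below are
# assumptions for illustration.
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master("local[1]").appName("infer-schema").getOrCreate()
rows = [Row(name="Alice", age=2), Row(name="Bob", age=5)]
df = spark.createDataFrame(rows)   # schema inferred via _inferSchemaFromList
df.printSchema()                   # name: string, age: long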
def _createFromLocal(self, data, schema):
    """
    Create an RDD for DataFrame from a list or pandas.DataFrame, returns
    the RDD and schema.
    """
    # make sure data can be consumed multiple times
    if not isinstance(data, list):
        data = list(data)

    if schema is None or isinstance(schema, (list, tuple)):
        struct = self._inferSchemaFromList(data)
        converter = _create_converter(struct)
        data = map(converter, data)
        if isinstance(schema, (list, tuple)):
            for i, name in enumerate(schema):
                struct.fields[i].name = name
                struct.names[i] = name
        schema = struct

    elif not isinstance(schema, StructType):
        raise TypeError("schema should be StructType or list or None, but got: %s" % schema)

    # convert python objects to sql data
    data = [schema.toInternal(row) for row in data]
    return self._sc.parallelize(data), schema
def fullOuterJoin(self, other, numPartitions=None):
    """
    Perform a full outer join of C{self} and C{other}.

    For each element (k, v) in C{self}, the resulting RDD will either
    contain all pairs (k, (v, w)) for w in C{other}, or the pair
    (k, (v, None)) if no elements in C{other} have key k.

    Similarly, for each element (k, w) in C{other}, the resulting RDD will
    either contain all pairs (k, (v, w)) for v in C{self}, or the pair
    (k, (None, w)) if no elements in C{self} have key k.

    Hash-partitions the resulting RDD into the given number of partitions.

    >>> x = sc.parallelize([("a", 1), ("b", 4)])
    >>> y = sc.parallelize([("a", 2), ("c", 8)])
    >>> sorted(x.fullOuterJoin(y).collect())
    [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
    """
    return python_full_outer_join(self, other, numPartitions)

# TODO: add option to control map-side combining
# portable_hash is used as default, because the builtin hash of None differs
# across machines.
def sampleByKey(self, withReplacement, fractions, seed=None):
    """
    Return a subset of this RDD sampled by key (via stratified sampling).

    Create a sample of this RDD using variable sampling rates for
    different keys as specified by fractions, a key to sampling rate map.

    >>> fractions = {"a": 0.2, "b": 0.1}
    >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
    >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
    >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
    True
    >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
    True
    >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
    True
    """
    for fraction in fractions.values():
        assert fraction >= 0.0, "Negative fraction value: %s" % fraction
    return self.mapPartitionsWithIndex(
        RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
def meanApprox(self, timeout, confidence=0.95):
    """
    .. note:: Experimental

    Approximate operation to return the mean within a timeout
    or meet the confidence.

    >>> rdd = sc.parallelize(range(1000), 10)
    >>> r = sum(range(1000)) / 1000.0
    >>> abs(rdd.meanApprox(1000) - r) / r < 0.05
    True
    """
    jrdd = self.map(float)._to_java_object_rdd()
    jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
    r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
    return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
def _ip_int_from_string(self, ip_str):
    """Turn the given IP string into an integer for comparison.

    Args:
        ip_str: A string, the IP ip_str.

    Returns:
        The IP ip_str as an integer.

    Raises:
        AddressValueError: if ip_str isn't a valid IPv4 Address.
    """
    if not ip_str:
        raise AddressValueError('Address cannot be empty')

    octets = ip_str.split('.')
    if len(octets) != 4:
        raise AddressValueError("Expected 4 octets in %r" % ip_str)

    try:
        return _int_from_bytes(map(self._parse_octet, octets), 'big')
    except ValueError as exc:
        raise AddressValueError("%s in %r" % (exc, ip_str))
def _is_hostmask(self, ip_str):
    """Test if the IP string is a hostmask (rather than a netmask).

    Args:
        ip_str: A string, the potential hostmask.

    Returns:
        A boolean, True if the IP string is a hostmask.
    """
    bits = ip_str.split('.')
    try:
        parts = [x for x in map(int, bits) if x in self._valid_mask_octets]
    except ValueError:
        return False
    if len(parts) != len(bits):
        return False
    if parts[0] < parts[-1]:
        return True
    return False
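# A minimal sketch of the distinction this helper draws, using the stdlib
# ipaddress module: a netmask has its high bits set, a hostmask its low bits.
import ipaddress

net = ipaddress.ip_network("192.168.1.0/255.255.255.0")   # netmask form
same = ipaddress.ip_network("192.168.1.0/0.0.0.255")      # hostmask form
print(net == same)                                         # True
print(net.netmask, net.hostmask)                           # 255.255.255.0 0.0.0.255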
def update(self, iterable):
    """Update the list by adding all elements from *iterable*."""
    _maxes, _lists, _keys = self._maxes, self._lists, self._keys
    values = sorted(iterable, key=self._key)

    if _maxes:
        if len(values) * 4 >= self._len:
            values.extend(chain.from_iterable(_lists))
            values.sort(key=self._key)
            self.clear()
        else:
            _add = self.add
            for val in values:
                _add(val)
            return

    _load, _index = self._load, self._index
    _lists.extend(values[pos:(pos + _load)]
                  for pos in range(0, len(values), _load))
    _keys.extend(list(map(self._key, _list)) for _list in _lists)
    _maxes.extend(sublist[-1] for sublist in _keys)
    self._len = len(values)
    del _index[:]
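# A minimal usage sketch for the method above, via the public sortedcontainers
# API (assumes the sortedcontainers package is installed).
from sortedcontainers import SortedKeyList   # named SortedListWithKey in older releases

items = SortedKeyList(key=lambda s: len(s))
items.update(["pear", "fig", "banana"])      # bulk insert, kept sorted by key
print(list(items))                           # ['fig', 'pear', 'banana']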
def csv_iterator(f_csv, clean=True, _PARALLEL=False, merge_cols=False):
    '''
    Creates an iterator over a CSV file, optionally cleans it.
    '''
    with open(f_csv) as FIN:
        CSV = csv.DictReader(FIN)

        if clean and _PARALLEL:
            CSV = jobmap(clean_row, CSV, FLAG_PARALLEL=_PARALLEL)
        elif clean and not _PARALLEL:
            CSV = itertools.imap(clean_row, CSV)

        try:
            for row in CSV:
                yield row
        except:
            # swallow malformed trailing rows; iteration simply stops
            pass
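# A minimal sketch of the same lazy-cleaning pattern using only the standard
# library; clean_row and the CSV path are illustrative assumptions.
import csv

def clean_row(row):
    return {k: v.strip() for k, v in row.items()}

def iter_clean_csv(path):
    with open(path, newline='') as fin:
        for row in map(clean_row, csv.DictReader(fin)):   # map is lazy in Python 3
            yield row

# for row in iter_clean_csv("data.csv"):
#     print(row)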
def compute_single(self, INPUT_ITR):
    assert(self.method is not None)
    print("Scoring {}".format(self.method))

    self._ref = []
    self.V = []
    self.current_filename = None

    ITR = itertools.imap(self.score_document, tqdm(INPUT_ITR))

    for row in ITR:
        # Require that filenames don't change in compute_single
        assert (self.current_filename in [None, row["_filename"]])
        self.current_filename = row["_filename"]

        self.V.append(row["doc_vec"])
        self._ref.append(int(row["_ref"]))

    self.V = np.array(self.V)
    self._ref = np.array(self._ref)
def create_py(self, nb, force=False):
    """Create the python script from the notebook node"""
    # Although we would love to simply use ``nbconvert.export_python(nb)``
    # this causes troubles in other cells processed by the ipython
    # directive. Instead of getting something like ``Out [5]:``, we get
    # something weird like '[0;31mOut[[1;31m5[0;31m]: [0m' which looks like
    # color information if we allow the call of nbconvert.export_python
    if list(map(int, re.findall(r'\d+', nbconvert.__version__))) >= [4, 2]:
        py_file = os.path.basename(self.py_file)
    else:
        py_file = self.py_file
    spr.call(['jupyter', 'nbconvert', '--to=python',
              '--output=' + py_file, '--log-level=%s' % logger.level,
              self.outfile])
    with open(self.py_file) as f:
        py_content = f.read()
    # comment out ipython magics
    py_content = re.sub(r'^\s*get_ipython\(\)\.magic.*', r'# \g<0>',
                        py_content, flags=re.MULTILINE)
    with open(self.py_file, 'w') as f:
        f.write(py_content)
def get_all_hits(self):
    """
    Return all of a Requester's HITs

    Despite what search_hits says, it does not return all hits, but
    instead returns a page of hits. This method will pull the hits
    from the server 100 at a time, but will yield the results
    iteratively, so subsequent requests are made on demand.
    """
    page_size = 100
    search_rs = self.search_hits(page_size=page_size)
    total_records = int(search_rs.TotalNumResults)
    get_page_hits = lambda page: self.search_hits(page_size=page_size, page_number=page)
    page_nums = self._get_pages(page_size, total_records)
    hit_sets = itertools.imap(get_page_hits, page_nums)
    return itertools.chain.from_iterable(hit_sets)
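# A minimal sketch of the lazy-pagination pattern used above, with a stubbed
# fetch_page function standing in for the MTurk API call (an assumption).
import itertools

def fetch_page(page_number, page_size=100):
    # pretend each page holds page_size sequential record ids
    start = (page_number - 1) * page_size
    return range(start, start + page_size)

def iter_all_records(total_records, page_size=100):
    page_nums = range(1, -(-total_records // page_size) + 1)   # ceiling division
    pages = map(fetch_page, page_nums)                         # lazy: one request per page
    return itertools.chain.from_iterable(pages)

print(list(itertools.islice(iter_all_records(250), 5)))        # [0, 1, 2, 3, 4]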
def record_to_html(self, cr, uid, field_name, record, options=None, context=None):
    if options is None: options = {}
    aclasses = ['img', 'img-responsive'] + options.get('class', '').split()
    classes = ' '.join(itertools.imap(escape, aclasses))

    max_size = None
    max_width, max_height = options.get('max_width', 0), options.get('max_height', 0)
    if max_width or max_height:
        max_size = '%sx%s' % (max_width, max_height)

    src = self.pool['website'].image_url(cr, uid, record, field_name, max_size)
    alt = None
    if options.get('alt-field') and getattr(record, options['alt-field'], None):
        alt = record[options['alt-field']]
    elif options.get('alt'):
        alt = options['alt']

    img = '<img class="%s" src="%s" style="%s"%s/>' % (classes, src, options.get('style', ''), ' alt="%s"' % alt if alt else '')
    return ir_qweb.HTMLSafe(img)
def everything(use_cache=False):
    '''Return all the tags within the database as (globals, contents, frames).'''
    if use_cache:
        g, f = cached()

    else:
        print >>output, '--> Grabbing globals...'
        g = {ea : d for ea, d in globals()}

        print >>output, '--> Grabbing contents from all functions...'
        res = (function(ea) for ea in db.functions())
        f = {}
        map(f.update, itertools.imap(dict, itertools.ifilter(None, res)))

    print >>output, '--> Grabbing frames from all functions...'
    h = {ea : d for ea, d in frames()}
    return (g, f, h)
def use(cls, regs):
    _instruction = sys.modules.get('instruction', __import__('instruction'))

    # convert any regs that are strings into their correct object type
    regs = { _instruction.reg.by_name(r) if isinstance(r, basestring) else r for r in regs }

    # returns an iterable of bools that returns whether r is a subset of any of the registers in ``regs``.
    match = lambda r, regs=regs: any(itertools.imap(r.relatedQ, regs))

    # returns true if the operand at the specified address is related to one of the registers in ``regs``.
    def uses_register(ea, opnum):
        val = _instruction.op_value(ea, opnum)
        if isinstance(val, symbol_t):
            return any(map(match, val.__symbols__))
        return False

    return uses_register