from functools import wraps
from threading import get_ident

def recursive_repr(func):
    """Decorator to prevent infinite repr recursion."""
    repr_running = set()

    @wraps(func)
    def wrapper(self):
        # Track (object id, thread id) pairs whose repr is in progress.
        key = id(self), get_ident()
        if key in repr_running:
            return '...'
        repr_running.add(key)
        try:
            return func(self)
        finally:
            repr_running.discard(key)

    return wrapper
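A quick usage sketch (the Box class is made up for illustration): applied to the __repr__ of a self-referential container, the decorator returns '...' instead of recursing forever.

class Box:
    def __init__(self):
        self.items = []

    @recursive_repr
    def __repr__(self):
        return 'Box(%r)' % (self.items,)

b = Box()
b.items.append(b)   # the box now contains itself
print(repr(b))      # Box([...]) instead of a RecursionError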
Python add() example source code
from bisect import bisect_right, insort

def add(self, val):
    """Add the element *val* to the list."""
    _maxes, _lists = self._maxes, self._lists
    if _maxes:
        pos = bisect_right(_maxes, val)
        if pos == len(_maxes):
            # val is a new maximum: append to the last sublist.
            pos -= 1
            _maxes[pos] = val
            _lists[pos].append(val)
        else:
            insort(_lists[pos], val)
        self._expand(pos)
    else:
        _maxes.append(val)
        _lists.append([val])
    self._len += 1
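These methods come from the sortedcontainers package; assuming it is installed, the public behaviour of add() can be checked directly:

from sortedcontainers import SortedList

sl = SortedList([3, 1, 2])
sl.add(0)         # inserted into the correct sublist via insort
sl.add(10)        # new maximum: appended to the last sublist
print(list(sl))   # [0, 1, 2, 3, 10]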
from itertools import chain

def update(self, iterable):
    """Update the list by adding all elements from *iterable*."""
    _maxes, _lists = self._maxes, self._lists
    values = sorted(iterable)
    if _maxes:
        if len(values) * 4 >= self._len:
            # Large batch: merge everything and rebuild from scratch.
            values.extend(chain.from_iterable(_lists))
            values.sort()
            self.clear()
        else:
            _add = self.add
            for val in values:
                _add(val)
            return
    _load, _index = self._load, self._index
    _lists.extend(values[pos:(pos + _load)]
                  for pos in range(0, len(values), _load))
    _maxes.extend(sublist[-1] for sublist in _lists)
    self._len = len(values)
    del _index[:]
def update(self, iterable):
    """Update the list by adding all elements from *iterable*."""
    _maxes, _lists, _keys = self._maxes, self._lists, self._keys
    values = sorted(iterable, key=self._key)
    if _maxes:
        if len(values) * 4 >= self._len:
            values.extend(chain.from_iterable(_lists))
            values.sort(key=self._key)
            self.clear()
        else:
            _add = self.add
            for val in values:
                _add(val)
            return
    _load, _index = self._load, self._index
    _lists.extend(values[pos:(pos + _load)]
                  for pos in range(0, len(values), _load))
    _keys.extend(list(map(self._key, _list)) for _list in _lists)
    _maxes.extend(sublist[-1] for sublist in _keys)
    self._len = len(values)
    del _index[:]
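The keyed variant above is from SortedKeyList (SortedListWithKey in older sortedcontainers releases), where _maxes stores keys rather than raw values. A short usage sketch, assuming the package is installed:

from sortedcontainers import SortedKeyList

skl = SortedKeyList(key=len)
skl.update(['ccc', 'a', 'bb'])
print(list(skl))   # ['a', 'bb', 'ccc'], ordered by the key (string length)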
@classmethod
def mark_todelete(cls, drive_id):
    """Mark a persistent disk (PD) for deletion.

    Also creates a new PD with the same 'name' and owner but a
    different physical 'drive_name'. This makes it possible to
    relaunch the pod immediately after the PD is deleted: if the same
    physical drive name were reused, we would have to wait until the
    old drive had actually been deleted.
    """
    pd = cls.query.filter(cls.id == drive_id, cls.pod_id.is_(None)).first()
    if not pd or pd.state == PersistentDiskStatuses.TODELETE:
        return
    new_drive_name = cls._increment_drive_name(pd)
    old_name = pd.name
    # Rename the PD being deleted to avoid a unique-name conflict and
    # to hide it from lookups by name.
    pd.name = uuid.uuid4().hex
    pd.state = PersistentDiskStatuses.TODELETE
    db.session.flush()
    new_pd = cls(
        drive_name=new_drive_name, name=old_name, owner_id=pd.owner_id,
        size=pd.size, state=PersistentDiskStatuses.DELETED
    )
    db.session.add(new_pd)
    db.session.commit()
    return new_pd
import itertools as itools
from itertools import chain
from operator import add

def scanr(col, func=add, acc=None):
    '''
    Use a given accumulator value to build a list of values obtained
    by repeatedly applying acc = func(next(list), acc) from the right.
    WARNING: Right folds and scans will blow up for infinite generators!
    '''
    try:
        col = reversed(col)
    except TypeError:
        # Not a reversible sequence: materialise the iterable first.
        col = reversed(list(col))
    if acc is not None:
        col = chain([acc], col)
    return list(itools.accumulate(col, func))
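With the default operator.add, scanr walks the input right-to-left and accumulates partial sums:

print(scanr([1, 2, 3]))           # [3, 5, 6]: 3, then 3+2, then 5+1
print(scanr([1, 2, 3], acc=10))   # [10, 13, 15, 16]: seeded with 10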
def _columns_plus_names(self):
    if self.use_labels:
        names = set()

        def name_for_col(c):
            if c._label is None:
                return (None, c)
            name = c._label
            if name in names:
                # Duplicate label: fall back to the anonymous label.
                name = c.anon_label
            else:
                names.add(name)
            return name, c

        return [
            name_for_col(c)
            for c in util.unique_list(
                _select_iterables(self._raw_columns))
        ]
    else:
        return [
            (None, c)
            for c in util.unique_list(
                _select_iterables(self._raw_columns))
        ]
find_model_collaborative.py (project: spark-recommendation-engine, author: GoogleCloudPlatform)
from math import sqrt
from operator import add

def howFarAreWe(model, against, sizeAgainst):
    # Ignore the rating column
    againstNoRatings = against.map(lambda x: (int(x[0]), int(x[1])))
    # Keep the rating to compare against
    againstWiRatings = against.map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))
    # Make a prediction and map it for later comparison
    # The map has to be ((user, product), rating) not ((product, user), rating)
    predictions = model.predictAll(againstNoRatings).map(lambda p: ((p[0], p[1]), p[2]))
    # Returns the pairs (prediction, rating)
    predictionsAndRatings = predictions.join(againstWiRatings).values()
    # Returns the root mean squared error (RMSE)
    return sqrt(predictionsAndRatings.map(lambda s: (s[0] - s[1]) ** 2).reduce(add) / float(sizeAgainst))
#[END how_far]

# Read the data from Cloud SQL
# Create dataframes
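A Spark-free illustration of the arithmetic howFarAreWe performs, with made-up (prediction, rating) pairs:

from math import sqrt

pairs = [(3.5, 4.0), (2.0, 2.0), (4.8, 5.0)]   # (prediction, rating)
rmse = sqrt(sum((p - r) ** 2 for p, r in pairs) / len(pairs))
print(round(rmse, 4))                          # 0.3109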
def run(self, job, computation, node):
    def _run(self, task=None):
        self.task.send({'req': 'run', 'auth': computation._auth, 'job': job, 'client': task})
        rtask = yield task.receive(timeout=MsgTimeout)
        # Fault tolerance is currently not supported, so clear the
        # job's args to save space.
        job.args = job.kwargs = None
        if isinstance(rtask, Task):
            # TODO: keep func too for fault-tolerance
            job.done = pycos.Event()
            self.rtasks[rtask] = (rtask, job)
            if self.askew_results:
                msg = self.askew_results.pop(rtask, None)
                if msg:
                    Scheduler.__status_task.send(msg)
        else:
            logger.debug('failed to create rtask: %s', rtask)
            if job.cpu:
                self.avail.set()
                node.cpus_used -= 1
                node.load = float(node.cpus_used) / len(node.servers)
                self.scheduler._avail_nodes.add(node)
                self.scheduler._nodes_avail.set()
                node.avail.set()
        raise StopIteration(rtask)

    rtask = yield SysTask(_run, self).finish()
    job.client.send(rtask)
def status(self):
    # Python 2 variant (itervalues; keys() returns lists).
    pending = sum(node.cpus_used for node in self._nodes.itervalues())
    servers = reduce(operator.add, [node.servers.keys()
                                    for node in self._nodes.itervalues()], [])
    return {'Client': self._cur_computation._pulse_task.location if self._cur_computation else '',
            'Pending': pending, 'Nodes': self._nodes.keys(), 'Servers': servers
            }
def status(self):
    # Python 3 counterpart of the method above.
    pending = sum(node.cpus_used for node in self._nodes.values())
    servers = functools.reduce(operator.add, [list(node.servers.keys())
                                              for node in self._nodes.values()], [])
    return {'Client': self._cur_computation._pulse_task.location if self._cur_computation else '',
            'Pending': pending, 'Nodes': list(self._nodes.keys()), 'Servers': servers
            }
def __add__(self, trc):
    return self.apply_op2(trc, operator.add)
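The method delegates operator overloading to a shared apply_op2 helper. A minimal self-contained sketch of that pattern (this Trace class is hypothetical, not the original library's):

import operator

class Trace:
    def __init__(self, samples):
        self.samples = list(samples)

    def apply_op2(self, other, op):
        # Combine two traces sample-by-sample with a binary operator.
        return Trace(op(a, b) for a, b in zip(self.samples, other.samples))

    def __add__(self, other):
        return self.apply_op2(other, operator.add)

print((Trace([1, 2]) + Trace([30, 40])).samples)   # [31, 42]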
def __init__(self, capacity):
    super(SumSegmentTree, self).__init__(
        capacity=capacity,
        operation=operator.add,
        neutral_element=0.0
    )
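This __init__ matches the segment tree used in prioritized replay buffers (OpenAI baselines style). Assuming that SegmentTree base class and its method names, usage looks roughly like:

tree = SumSegmentTree(capacity=4)      # capacity must be a power of two
tree[0], tree[1], tree[2] = 1.0, 2.0, 3.0
print(tree.sum())                      # 6.0: reduces leaves with operator.add
print(tree.sum(1, 3))                  # 5.0: sum over the half-open range [1, 3)
print(tree.find_prefixsum_idx(2.5))    # 1, since 1.0 <= 2.5 < 3.0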
def test_preduce_one_process(self):
    """ Test that preduce reduces to functools.reduce for a single process """
    integers = list(range(0, 10))
    preduce_results = preduce(add, integers, processes=1)
    reduce_results = reduce(add, integers)
    self.assertEqual(preduce_results, reduce_results)

def test_preduce_multiple_processes(self):
    """ Test that preduce matches functools.reduce when using multiple processes """
    integers = list(range(0, 10))
    preduce_results = preduce(add, integers, processes=2)
    reduce_results = reduce(add, integers)
    self.assertEqual(preduce_results, reduce_results)

def test_on_numpy_arrays(self):
    """ Test sum of numpy arrays as parallel reduce """
    arrays = [np.zeros((32, 32)) for _ in range(10)]
    s = preduce(add, arrays, processes=2)
    self.assertTrue(np.allclose(s, arrays[0]))
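The preduce under test is not shown on this page. A minimal sketch of one way to implement such a parallel reduce, assuming func is associative and picklable (the chunking and helper names here are my own, not the tested project's):

from functools import reduce
from multiprocessing import Pool

def _reduce_chunk(args):
    func, chunk = args
    return reduce(func, chunk)

def preduce(func, iterable, processes=2):
    """Reduce chunks in worker processes, then combine the partial results."""
    items = list(iterable)
    if processes == 1 or len(items) <= processes:
        return reduce(func, items)
    size = -(-len(items) // processes)   # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]
    with Pool(processes) as pool:        # needs an if __name__ == '__main__' guard on spawn platforms
        partials = pool.map(_reduce_chunk, [(func, c) for c in chunks])
    return reduce(func, partials)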
def equalize(image, mask=None):
    """
    Equalize the image histogram. This function applies a non-linear
    mapping to the input image, in order to create a uniform
    distribution of grayscale values in the output image.

    :param image: The image to equalize.
    :param mask: An optional mask. If given, only the pixels selected by
                 the mask are included in the analysis.
    :return: An image.
    """
    if image.mode == "P":
        image = image.convert("RGB")
    h = image.histogram(mask)
    lut = []
    for b in range(0, len(h), 256):
        histo = [_f for _f in h[b:b+256] if _f]
        if len(histo) <= 1:
            lut.extend(list(range(256)))
        else:
            step = (functools.reduce(operator.add, histo) - histo[-1]) // 255
            if not step:
                lut.extend(list(range(256)))
            else:
                n = step // 2
                for i in range(256):
                    lut.append(n // step)
                    n = n + h[i+b]
    return _lut(image, lut)
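This equalize mirrors the one shipped in Pillow's ImageOps module; assuming Pillow is installed, the stock version can be exercised directly:

from PIL import Image, ImageOps

img = Image.effect_noise((64, 64), 16)    # grayscale noise clustered around 128
eq = ImageOps.equalize(img)               # LUT remap spreads values across 0..255
print(img.getextrema(), eq.getextrema())  # e.g. (76, 181) (0, 255)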
def _getcount(self):
    "Get the total number of pixels in each layer"
    v = []
    for i in range(0, len(self.h), 256):
        v.append(functools.reduce(operator.add, self.h[i:i+256]))
    return v
def addc(a: int, b: int) -> int:
    return operator.add(a, b)
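The typed wrapper behaves exactly like the bare operator; a quick check:

import operator

assert addc(2, 3) == 2 + 3 == operator.add(2, 3)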