def _clean_names(cls):
'''
Remove indices from non-repeated peeker names that don't need them.
When created, all peekers get an index appended to their name to
disambiguate any repeated names. If the name isn't actually repeated,
then the index is removed.
'''
index_re = '\[\d+\]$'
for name, peeker in cls._peekers.items():
if not peeker.name_dup:
new_name = re.sub(index_re, '', name)
if new_name != name:
peeker.trace.name = new_name
cls._peekers[new_name] = cls._peekers.pop(name)
python类sub()的实例源码
def _test_quantity_add_sub(self, unit, func):
x = self.Q_(unit, 'centimeter')
y = self.Q_(unit, 'inch')
z = self.Q_(unit, 'second')
a = self.Q_(unit, None)
func(op.add, x, x, self.Q_(unit + unit, 'centimeter'))
func(op.add, x, y, self.Q_(unit + 2.54 * unit, 'centimeter'))
func(op.add, y, x, self.Q_(unit + unit / (2.54 * unit), 'inch'))
func(op.add, a, unit, self.Q_(unit + unit, None))
self.assertRaises(DimensionalityError, op.add, 10, x)
self.assertRaises(DimensionalityError, op.add, x, 10)
self.assertRaises(DimensionalityError, op.add, x, z)
func(op.sub, x, x, self.Q_(unit - unit, 'centimeter'))
func(op.sub, x, y, self.Q_(unit - 2.54 * unit, 'centimeter'))
func(op.sub, y, x, self.Q_(unit - unit / (2.54 * unit), 'inch'))
func(op.sub, a, unit, self.Q_(unit - unit, None))
self.assertRaises(DimensionalityError, op.sub, 10, x)
self.assertRaises(DimensionalityError, op.sub, x, 10)
self.assertRaises(DimensionalityError, op.sub, x, z)
def string_preprocessor(input_string):
input_string = input_string.replace(",", "")
input_string = input_string.replace(" per ", "/")
for a, b in _subs_re:
input_string = a.sub(b, input_string)
# Replace pretty format characters
for pretty_exp in _pretty_exp_re.findall(input_string):
exp = '**' + pretty_exp.translate(_pretty_table)
input_string = input_string.replace(pretty_exp, exp)
input_string = input_string.translate(_pretty_table)
# Handle caret exponentiation
input_string = input_string.replace("^", "**")
return input_string
def test_safe_binop():
# Test checked arithmetic routines
ops = [
(operator.add, 1),
(operator.sub, 2),
(operator.mul, 3)
]
with exc_iter(ops, INT64_VALUES, INT64_VALUES) as it:
for xop, a, b in it:
pyop, op = xop
c = pyop(a, b)
if not (INT64_MIN <= c <= INT64_MAX):
assert_raises(OverflowError, mt.extint_safe_binop, a, b, op)
else:
d = mt.extint_safe_binop(a, b, op)
if c != d:
# assert_equal is slow
assert_equal(d, c)
def check_sub(Poly):
# This checks commutation, not numerical correctness
c1 = list(random((4,)) + .5)
c2 = list(random((3,)) + .5)
p1 = Poly(c1)
p2 = Poly(c2)
p3 = p1 - p2
assert_poly_almost_equal(p2 - p1, -p3)
assert_poly_almost_equal(p1 - c2, p3)
assert_poly_almost_equal(c2 - p1, -p3)
assert_poly_almost_equal(p1 - tuple(c2), p3)
assert_poly_almost_equal(tuple(c2) - p1, -p3)
assert_poly_almost_equal(p1 - np.array(c2), p3)
assert_poly_almost_equal(np.array(c2) - p1, -p3)
assert_raises(TypeError, op.sub, p1, Poly([0], domain=Poly.domain + 1))
assert_raises(TypeError, op.sub, p1, Poly([0], window=Poly.window + 1))
if Poly is Polynomial:
assert_raises(TypeError, op.sub, p1, Chebyshev([0]))
else:
assert_raises(TypeError, op.sub, p1, Polynomial([0]))
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
"""
Return a new DStream in which each RDD contains the count of distinct elements in
RDDs in a sliding window over this DStream.
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
@param numPartitions: number of partitions of each RDD in the new DStream.
"""
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
windowDuration, slideDuration, numPartitions)
return counted.filter(lambda kv: kv[1] > 0)
def operate(self, left, right, operation):
""" Do operation on colors
args:
left (str): left side
right (str): right side
operation (str): Operation
returns:
str
"""
operation = {
'+': operator.add,
'-': operator.sub,
'*': operator.mul,
'/': operator.truediv
}.get(operation)
return operation(left, right)
def fetch_span(width_span, height_span, step=100):
width_diff = sub(*reversed(width_span))
step_count = width_diff // step
if step_count <= 1:
width = min(*width_span)
else:
width = width_span[0] + randint(0, step_count) * step
height_diff = sub(*reversed(height_span))
step_count = height_diff // step
if step_count <= 1:
height = min(*height_span)
else:
height = height_span[0] + randint(0, step_count) * step
return width, height
def test_safe_binop():
# Test checked arithmetic routines
ops = [
(operator.add, 1),
(operator.sub, 2),
(operator.mul, 3)
]
with exc_iter(ops, INT64_VALUES, INT64_VALUES) as it:
for xop, a, b in it:
pyop, op = xop
c = pyop(a, b)
if not (INT64_MIN <= c <= INT64_MAX):
assert_raises(OverflowError, mt.extint_safe_binop, a, b, op)
else:
d = mt.extint_safe_binop(a, b, op)
if c != d:
# assert_equal is slow
assert_equal(d, c)
def check_sub(Poly):
# This checks commutation, not numerical correctness
c1 = list(random((4,)) + .5)
c2 = list(random((3,)) + .5)
p1 = Poly(c1)
p2 = Poly(c2)
p3 = p1 - p2
assert_poly_almost_equal(p2 - p1, -p3)
assert_poly_almost_equal(p1 - c2, p3)
assert_poly_almost_equal(c2 - p1, -p3)
assert_poly_almost_equal(p1 - tuple(c2), p3)
assert_poly_almost_equal(tuple(c2) - p1, -p3)
assert_poly_almost_equal(p1 - np.array(c2), p3)
assert_poly_almost_equal(np.array(c2) - p1, -p3)
assert_raises(TypeError, op.sub, p1, Poly([0], domain=Poly.domain + 1))
assert_raises(TypeError, op.sub, p1, Poly([0], window=Poly.window + 1))
if Poly is Polynomial:
assert_raises(TypeError, op.sub, p1, Chebyshev([0]))
else:
assert_raises(TypeError, op.sub, p1, Polynomial([0]))
def select(self, id, start, stop, rate=False, maxlen=float('inf'), fixed=0):
"""Return points for a series within inclusive interval of most granular samples.
Optionally derive the rate of change of points.
Optionally limit number of points by increasing sample resolution.
Optionally return fixed intervals with padding and arbitrary resolution.
"""
minstep = total_seconds(stop - start) / maxlen
for index, model in enumerate(self):
if start >= model.start(id) and model.step >= minstep:
break
points = model.select(id, dt__gte=start, dt__lt=stop)
points = list(points if index else model.reduce(points))
if rate:
points = map(operator.sub, points[1:], points[:-1])
if fixed:
step = (stop - start) / fixed
intervals = [Point(start + step * index, 0.0, 0) for index in range(fixed)]
for point in points:
intervals[int(total_seconds(point.dt - start) / total_seconds(step))] += point
points = intervals
return points
def install_subnetwork(self, owner_object, node_tree_name, initial_status):
#transform the tree name into a NL module name
valid_characters = "abcdefghijklmnopqrstuvwxyz1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ"
stripped_name = "".join([c for c in node_tree_name if c in valid_characters])
if stripped_name in owner_object:
print("Network {} already installed for {}".format(stripped_name, owner_object.name))
if(initial_status is True): owner_object[node_tree_name].stopped = False
else:
print("Installing sub network...")
initial_status_key = 'NL_{}_initial_status'.format(node_tree_name)
owner_object[initial_status_key] = initial_status
module_name = 'bgelogic.NL{}'.format(stripped_name)
module = load_user_module(module_name)
module._initialize(owner_object)
subnetwork = owner_object[node_tree_name]
self.sub_networks.append(subnetwork)
def __init__(self, code, objects=None):
self._OPERATORS = [
('|', operator.or_),
('^', operator.xor),
('&', operator.and_),
('>>', operator.rshift),
('<<', operator.lshift),
('-', operator.sub),
('+', operator.add),
('%', operator.mod),
('/', operator.truediv),
('*', operator.mul),
]
self._ASSIGN_OPERATORS = [(op + '=', opfunc)
for op, opfunc in self._OPERATORS]
self._ASSIGN_OPERATORS.append(('=', lambda cur, right: right))
self._VARNAME_PATTERN = r'[a-zA-Z_$][a-zA-Z_$0-9]*'
if objects is None:
objects = {}
self.code = code
self._functions = {}
self._objects = objects
def number_of_args(fn):
"""Return the number of positional arguments for a function, or None if the number is variable.
Looks inside any decorated functions."""
try:
if hasattr(fn, '__wrapped__'):
return number_of_args(fn.__wrapped__)
if any(p.kind == p.VAR_POSITIONAL for p in signature(fn).parameters.values()):
return None
else:
return sum(p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) for p in signature(fn).parameters.values())
except ValueError:
# signatures don't work for built-in operators, so check for a few explicitly
UNARY_OPS = [len, op.not_, op.truth, op.abs, op.index, op.inv, op.invert, op.neg, op.pos]
BINARY_OPS = [op.lt, op.le, op.gt, op.ge, op.eq, op.ne, op.is_, op.is_not, op.add, op.and_, op.floordiv, op.lshift, op.mod, op.mul, op.or_, op.pow, op.rshift, op.sub, op.truediv, op.xor, op.concat, op.contains, op.countOf, op.delitem, op.getitem, op.indexOf]
TERNARY_OPS = [op.setitem]
if fn in UNARY_OPS:
return 1
elif fn in BINARY_OPS:
return 2
elif fn in TERNARY_OPS:
return 3
else:
raise NotImplementedError("Bult-in operator {} not supported".format(fn))
def focus_selectable(self, forward=True):
"""Change focus to next selectable widget
forward: True to select next widget, False to select previous widget
Returns True if focus was changed, False otherwise.
"""
op = operator.add if forward else operator.sub
max_pos = len(self._main.contents)-1
new_pos = None
pos = self.focus_position
while 0 < pos < max_pos:
pos = op(pos, 1)
item = self._get_item_by_position(pos, visible=True)
if item is not None and item['widget'].selectable():
new_pos = pos
break
if new_pos is not None:
self.focus_position = new_pos
return True
return False
def filter(self, result, from_addr, to_addr, distance):
filtered_result = {}
for key in result:
filtered_list = result[key]
if from_addr:
filtered_list = [item for item in filtered_list if self.base_address + item[0] >= from_addr]
if to_addr:
filtered_list = [item for item in filtered_list if self.base_address + item[0] <= to_addr]
if distance:
if len(filtered_list) < 2:
filtered_list = []
else:
offsets_a = [item[0] for item in filtered_list]
offsets_b = offsets_a[1:] + [0]
api_distances = list(map(operator.sub, offsets_b, offsets_a))
distance_filtered = []
for index, api_distance in enumerate(api_distances[:-1]):
if api_distance <= distance:
if filtered_list[index] not in distance_filtered:
distance_filtered.append(filtered_list[index])
if filtered_list[index + 1] not in distance_filtered:
distance_filtered.append(filtered_list[index + 1])
filtered_list = distance_filtered
filtered_result[key] = filtered_list
return filtered_result
def run_single_regret(bandit_list,bandit_params,plays):
sum_probs_chosen=0
opt=np.zeros(plays)
chosen=np.zeros(plays)
bandit_probs = [x.get_prob() for x in bandit_list]
opt_solution = max(bandit_probs)
for i in range(0,plays):
index = sample_distributions_and_choose(bandit_params)
sum_probs_chosen+=bandit_probs[index]
if(bandit_list[index].pull_handle()):
bandit_params[index]=\
(bandit_params[index][0]+1,bandit_params[index][1])
else:
bandit_params[index]=\
(bandit_params[index][0],bandit_params[index][1]+1)
opt[i] = (i+1)*opt_solution
chosen[i] = sum_probs_chosen
regret_total = map(sub,opt,chosen)
return regret_total
#7.9
#Plot params beforehand
test_range.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 49
收藏 0
点赞 0
评论 0
def test_binops(self):
ops = [operator.add, operator.sub, operator.mul, operator.floordiv,
operator.truediv, pow]
scalars = [-1, 1, 2]
idxs = [RangeIndex(0, 10, 1), RangeIndex(0, 20, 2),
RangeIndex(-10, 10, 2), RangeIndex(5, -5, -1)]
for op in ops:
for a, b in combinations(idxs, 2):
result = op(a, b)
expected = op(Int64Index(a), Int64Index(b))
tm.assert_index_equal(result, expected)
for idx in idxs:
for scalar in scalars:
result = op(idx, scalar)
expected = op(Int64Index(idx), scalar)
tm.assert_index_equal(result, expected)
test_range.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_explicit_conversions(self):
# GH 8608
# add/sub are overriden explicity for Float/Int Index
idx = RangeIndex(5)
# float conversions
arr = np.arange(5, dtype='int64') * 3.2
expected = Float64Index(arr)
fidx = idx * 3.2
tm.assert_index_equal(fidx, expected)
fidx = 3.2 * idx
tm.assert_index_equal(fidx, expected)
# interops with numpy arrays
expected = Float64Index(arr)
a = np.zeros(5, dtype='float64')
result = fidx - a
tm.assert_index_equal(result, expected)
expected = Float64Index(-arr)
a = np.zeros(5, dtype='float64')
result = a - fidx
tm.assert_index_equal(result, expected)
test_panel.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_arith(self):
self._test_op(self.panel, operator.add)
self._test_op(self.panel, operator.sub)
self._test_op(self.panel, operator.mul)
self._test_op(self.panel, operator.truediv)
self._test_op(self.panel, operator.floordiv)
self._test_op(self.panel, operator.pow)
self._test_op(self.panel, lambda x, y: y + x)
self._test_op(self.panel, lambda x, y: y - x)
self._test_op(self.panel, lambda x, y: y * x)
self._test_op(self.panel, lambda x, y: y / x)
self._test_op(self.panel, lambda x, y: y ** x)
self._test_op(self.panel, lambda x, y: x + y) # panel + 1
self._test_op(self.panel, lambda x, y: x - y) # panel - 1
self._test_op(self.panel, lambda x, y: x * y) # panel * 1
self._test_op(self.panel, lambda x, y: x / y) # panel / 1
self._test_op(self.panel, lambda x, y: x ** y) # panel ** 1
self.assertRaises(Exception, self.panel.__add__, self.panel['ItemA'])
test_panel.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_arith_flex_panel(self):
ops = ['add', 'sub', 'mul', 'div', 'truediv', 'pow', 'floordiv', 'mod']
if not compat.PY3:
aliases = {}
else:
aliases = {'div': 'truediv'}
self.panel = self.panel.to_panel()
for n in [np.random.randint(-50, -1), np.random.randint(1, 50), 0]:
for op in ops:
alias = aliases.get(op, op)
f = getattr(operator, alias)
exp = f(self.panel, n)
result = getattr(self.panel, op)(n)
assert_panel_equal(result, exp, check_panel_type=True)
# rops
r_f = lambda x, y: f(y, x)
exp = r_f(self.panel, n)
result = getattr(self.panel, 'r' + op)(n)
assert_panel_equal(result, exp)
test_operators.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_operators_none_as_na(self):
df = DataFrame({"col1": [2, 5.0, 123, None],
"col2": [1, 2, 3, 4]}, dtype=object)
ops = [operator.add, operator.sub, operator.mul, operator.truediv]
# since filling converts dtypes from object, changed expected to be
# object
for op in ops:
filled = df.fillna(np.nan)
result = op(df, 3)
expected = op(filled, 3).astype(object)
expected[com.isnull(expected)] = None
assert_frame_equal(result, expected)
result = op(df, df)
expected = op(filled, filled).astype(object)
expected[com.isnull(expected)] = None
assert_frame_equal(result, expected)
result = op(df, df.fillna(7))
assert_frame_equal(result, expected)
result = op(df.fillna(7), df)
assert_frame_equal(result, expected, check_dtype=False)
test_operators.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_arith_getitem_commute(self):
df = DataFrame({'A': [1.1, 3.3], 'B': [2.5, -3.9]})
self._test_op(df, operator.add)
self._test_op(df, operator.sub)
self._test_op(df, operator.mul)
self._test_op(df, operator.truediv)
self._test_op(df, operator.floordiv)
self._test_op(df, operator.pow)
self._test_op(df, lambda x, y: y + x)
self._test_op(df, lambda x, y: y - x)
self._test_op(df, lambda x, y: y * x)
self._test_op(df, lambda x, y: y / x)
self._test_op(df, lambda x, y: y ** x)
self._test_op(df, lambda x, y: x + y)
self._test_op(df, lambda x, y: x - y)
self._test_op(df, lambda x, y: x * y)
self._test_op(df, lambda x, y: x / y)
self._test_op(df, lambda x, y: x ** y)
test_sparse.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 36
收藏 0
点赞 0
评论 0
def test_binary_operators(self):
# skipping for now #####
raise nose.SkipTest("skipping sparse binary operators test")
def _check_inplace_op(iop, op):
tmp = self.bseries.copy()
expected = op(tmp, self.bseries)
iop(tmp, self.bseries)
assert_sp_series_equal(tmp, expected)
inplace_ops = ['add', 'sub', 'mul', 'truediv', 'floordiv', 'pow']
for op in inplace_ops:
_check_inplace_op(getattr(operator, "i%s" % op),
getattr(operator, op))
test_classes.py 文件源码
项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda
作者: SignalMedia
项目源码
文件源码
阅读 33
收藏 0
点赞 0
评论 0
def check_sub(Poly):
# This checks commutation, not numerical correctness
c1 = list(random((4,)) + .5)
c2 = list(random((3,)) + .5)
p1 = Poly(c1)
p2 = Poly(c2)
p3 = p1 - p2
assert_poly_almost_equal(p2 - p1, -p3)
assert_poly_almost_equal(p1 - c2, p3)
assert_poly_almost_equal(c2 - p1, -p3)
assert_poly_almost_equal(p1 - tuple(c2), p3)
assert_poly_almost_equal(tuple(c2) - p1, -p3)
assert_poly_almost_equal(p1 - np.array(c2), p3)
assert_poly_almost_equal(np.array(c2) - p1, -p3)
assert_raises(TypeError, op.sub, p1, Poly([0], domain=Poly.domain + 1))
assert_raises(TypeError, op.sub, p1, Poly([0], window=Poly.window + 1))
if Poly is Polynomial:
assert_raises(TypeError, op.sub, p1, Chebyshev([0]))
else:
assert_raises(TypeError, op.sub, p1, Polynomial([0]))
def sample_environment():
environment = Environment.create_standard()
extras = {
'variable': 20,
'abc': None,
'def': None,
'add': operator.add,
'subtract': operator.sub,
'multiply': lambda x, y, z: x * y * z,
'return10': lambda: 10,
'x': 10,
'y': 20,
'z': 30,
'foo': 100,
}
environment.define_batch_of(extras.items())
return environment
def get_default_operators():
""" generate a mapping of default operators allowed for evaluation """
return {
'u-': Func(1, operator.neg), # unary negation
'u%': Func(1, lambda a: a / Decimal(100)), # unary percentage
'&': Func(2, operator.concat),
'^': Func(2, operator.pow),
'+': Func(2, op_add),
'-': Func(2, operator.sub),
'/': Func(2, operator.truediv),
'*': Func(2, operator.mul),
'=': Func(2, operator.eq),
'<>': Func(2, lambda a, b: not operator.eq(a, b)),
'>': Func(2, operator.gt),
'<': Func(2, operator.lt),
'>=': Func(2, operator.ge),
'<=': Func(2, operator.le),
}
def diffWaysToCompute(self, s):
"""
:type s: str
:rtype: List[int]
"""
tokens = re.split('(\D)', s) # magical regular expression
nums = map(int, tokens[::2]) # nums: an array, ops: a method (operator.xxx)
ops = map({'+': operator.add, '-': operator.sub, '*': operator.mul}.get, tokens[1::2])
def build(lo, hi): # top-down method
if lo == hi:
return [nums[lo]]
return [ops[i](a, b)
for i in xrange(lo, hi)
for a in build(lo, i)
for b in build(i + 1, hi)]
return build(0, len(nums)-1)
def check_sub(Poly):
# This checks commutation, not numerical correctness
c1 = list(random((4,)) + .5)
c2 = list(random((3,)) + .5)
p1 = Poly(c1)
p2 = Poly(c2)
p3 = p1 - p2
assert_poly_almost_equal(p2 - p1, -p3)
assert_poly_almost_equal(p1 - c2, p3)
assert_poly_almost_equal(c2 - p1, -p3)
assert_poly_almost_equal(p1 - tuple(c2), p3)
assert_poly_almost_equal(tuple(c2) - p1, -p3)
assert_poly_almost_equal(p1 - np.array(c2), p3)
assert_poly_almost_equal(np.array(c2) - p1, -p3)
assert_raises(TypeError, op.sub, p1, Poly([0], domain=Poly.domain + 1))
assert_raises(TypeError, op.sub, p1, Poly([0], window=Poly.window + 1))
if Poly is Polynomial:
assert_raises(TypeError, op.sub, p1, Chebyshev([0]))
else:
assert_raises(TypeError, op.sub, p1, Polynomial([0]))
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
"""
Return a new DStream in which each RDD contains the count of distinct elements in
RDDs in a sliding window over this DStream.
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration: sliding interval of the window (i.e., the interval after which
the new DStream will generate RDDs); must be a multiple of this
DStream's batching interval
@param numPartitions: number of partitions of each RDD in the new DStream.
"""
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
windowDuration, slideDuration, numPartitions)
return counted.filter(lambda kv: kv[1] > 0).count()