import functools
import operator

import numpy as np


def get_comma_separated_data(raw):
    # Join the lines into one long string and split off the header
    header, data = "".join(raw).strip().split(" = ")
    # Remove the trailing semicolon
    assert data[-1] == ';'
    data = data[:-1]
    # Remove newline characters and evaluate into a tuple of numbers
    data = eval(data.replace("\n", ''))
    shape = tuple(eval(header[header.index("["):header.index("]") + 1]))
    # Each record is one year value followed by prod(shape) data values
    step_size = functools.reduce(operator.mul, shape) + 1
    years = np.array(data[::step_size], dtype=int)
    data = np.stack([
        np.array(data[1 + index * step_size:(index + 1) * step_size]).reshape(shape)
        for index in range(len(years))
    ], axis=-1)
    return header, years, data
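# Hypothetical round trip, with an input format inferred from the parsing
# logic above rather than taken from the project's real data files:
_raw = ["x[2] = 1990, 1, 2,\n", "1991, 3, 4;\n"]
_header, _years, _values = get_comma_separated_data(_raw)
assert _header == "x[2]" and list(_years) == [1990, 1991] and _values.shape == (2, 2)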
def get_space_separated_data(raw):
    assert raw[0].strip().endswith("= [")
    assert raw[-1].strip().endswith("];")
    header = raw[0].replace("= [", "").strip()
    shape = tuple(eval(header[header.index("["):header.index("]") + 1]))
    data = [eval(line.strip().replace(" ", ",")) for line in raw[1:-1]]
    if len(shape) == 1:
        step_size = 1
    else:
        step_size = functools.reduce(operator.mul, shape[:-1])
    years = np.array(data[::step_size + 1], dtype=int)
    subarrays = [
        np.array(data[index * (step_size + 1) + 1:(index + 1) * (step_size + 1)]).reshape(shape)
        for index in range(len(years))
    ]
    return header, years, np.stack(subarrays, axis=-1)
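# Companion sketch for the space-separated layout, again inferred from the
# parser above: one year per line, then step_size lines of values per record.
_raw = ["x[2] = [\n", "1990\n", "1 2\n", "1991\n", "3 4\n", "];\n"]
_header, _years, _values = get_space_separated_data(_raw)
assert _header == "x[2]" and list(_years) == [1990, 1991] and _values.shape == (2, 2)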
def test_combine(self):
    cases = (
        ({'a': 2},
         {'a': 3},
         operator.mul,
         None,
         {'a': 6}),
        ({'a': 2},
         {'a': 3},
         operator.mul,
         {'a': True},
         {'a': 2}),
    )
    for a, b, op, exclude, expected in cases:
        result = dictutil.combine(a, b, op, exclude=exclude)
        self.assertIsNot(a, result)
        self.assertDictEqual(expected, result,
                             repr([a, b, op, exclude, expected, result]))
        result = dictutil.combineto(a, b, op, exclude=exclude)
        self.assertIs(a, result)
        self.assertDictEqual(expected, result,
                             repr([a, b, op, exclude, expected, result]))
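# A minimal sketch of the semantics the cases above exercise, for reference
# only; the real implementations live in the project's dictutil module.
# combine returns a new dict, combineto mutates `a` in place, and keys
# flagged truthy in `exclude` keep a's value untouched.
import operator


def _combine_sketch(a, b, op, exclude=None):
    exclude = exclude or {}
    out = dict(a)
    for key, value in b.items():
        if exclude.get(key):
            continue
        out[key] = op(out[key], value) if key in out else value
    return out

assert _combine_sketch({'a': 2}, {'a': 3}, operator.mul) == {'a': 6}
assert _combine_sketch({'a': 2}, {'a': 3}, operator.mul, {'a': True}) == {'a': 2}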
# factorial.py, from the PacktPublishing/Software-Architecture-with-Python project
import functools
import operator


def factorial(n):
    """ Factorial of a number.

    >>> factorial(0)
    1
    >>> factorial(1)
    1
    >>> factorial(5)
    120
    >>> factorial(10)
    3628800
    """
    # Handle 0 as a special case
    if n == 0:
        return 1
    return functools.reduce(operator.mul, range(1, n + 1))
def test_safe_binop():
    # Test checked arithmetic routines
    ops = [
        (operator.add, 1),
        (operator.sub, 2),
        (operator.mul, 3)
    ]
    with exc_iter(ops, INT64_VALUES, INT64_VALUES) as it:
        for xop, a, b in it:
            pyop, op = xop
            c = pyop(a, b)
            if not (INT64_MIN <= c <= INT64_MAX):
                assert_raises(OverflowError, mt.extint_safe_binop, a, b, op)
            else:
                d = mt.extint_safe_binop(a, b, op)
                if c != d:
                    # assert_equal is slow
                    assert_equal(d, c)
def test_datafriendly_mul(self):
    # Test keeping data w/ (inplace) multiplication
    # Test mul w/ scalar
    x = array([1, 2, 3], mask=[0, 0, 1])
    xx = x * 2
    assert_equal(xx.data, [2, 4, 3])
    assert_equal(xx.mask, [0, 0, 1])
    # Test imul w/ scalar
    x = array([1, 2, 3], mask=[0, 0, 1])
    x *= 2
    assert_equal(x.data, [2, 4, 3])
    assert_equal(x.mask, [0, 0, 1])
    # Test mul w/ array
    x = array([1, 2, 3], mask=[0, 0, 1])
    xx = x * array([10, 20, 30], mask=[1, 0, 0])
    assert_equal(xx.data, [1, 40, 3])
    assert_equal(xx.mask, [1, 0, 1])
    # Test imul w/ array
    x = array([1, 2, 3], mask=[0, 0, 1])
    x *= array([10, 20, 30], mask=[1, 0, 0])
    assert_equal(x.data, [1, 40, 3])
    assert_equal(x.mask, [1, 0, 1])
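# In numpy.ma, the result mask is the elementwise OR of the operand masks,
# and masked slots keep the left operand's underlying .data values instead
# of computing the product, which is exactly what the expected arrays above
# encode.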
def _read_datafile(self, path, expected_dims):
    """Helper function to read a file in IDX format."""
    base_magic_num = 2048
    with gzip.GzipFile(path) as f:
        magic_num = struct.unpack('>I', f.read(4))[0]
        expected_magic_num = base_magic_num + expected_dims
        if magic_num != expected_magic_num:
            raise ValueError('Incorrect MNIST magic number (expected '
                             '{}, got {})'
                             .format(expected_magic_num, magic_num))
        dims = struct.unpack('>' + 'I' * expected_dims,
                             f.read(4 * expected_dims))
        buf = f.read(reduce(operator.mul, dims))
        data = np.frombuffer(buf, dtype=np.uint8)
        data = data.reshape(*dims)
        return data
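# Hypothetical round-trip companion for the reader above; the IDX header
# layout it assumes (magic = 2048 + ndim as a big-endian uint32, then one
# uint32 per dimension, then raw uint8 bytes) is exactly what _read_datafile
# parses. This writer is illustrative, not part of the original project.
import gzip
import struct

import numpy as np


def _write_idx(path, data):
    with gzip.open(path, 'wb') as f:
        f.write(struct.pack('>I', 2048 + data.ndim))
        f.write(struct.pack('>' + 'I' * data.ndim, *data.shape))
        f.write(data.astype(np.uint8).tobytes())

# e.g. _write_idx('toy-idx.gz', np.arange(6, dtype=np.uint8).reshape(2, 3));
# reading it back with expected_dims=2 recovers the same 2x3 array.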
def variable_with_weight_decay(name, shape, stddev, wd):
    """Helper to create an initialized Variable with weight decay.

    Note that the Variable is initialized with a truncated normal distribution.
    A weight decay is added only if one is specified.

    Args:
        name: name of the variable
        shape: list of ints
        stddev: standard deviation of a truncated Gaussian
        wd: add L2Loss weight decay multiplied by this float. If None, weight
            decay is not added for this Variable.

    Returns:
        Variable Tensor
    """
    var = variable_on_cpu(name, shape,
                          tf.truncated_normal_initializer(stddev=stddev))
    if wd:
        # tf.mul was renamed tf.multiply in TensorFlow 1.0
        weight_decay = tf.mul(tf.nn.l2_loss(var), wd, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
    return var
def _generate_normal(self, func, size, dtype, *args):
    # curand functions below don't support odd size.
    # * curand.generateNormal
    # * curand.generateNormalDouble
    # * curand.generateLogNormal
    # * curand.generateLogNormalDouble
    size = core.get_size(size)
    element_size = six.moves.reduce(operator.mul, size, 1)
    if element_size % 2 == 0:
        out = cupy.empty(size, dtype=dtype)
        func(self._generator, out.data.ptr, out.size, *args)
        return out
    else:
        # Pad to an even element count, then trim back to the requested size.
        out = cupy.empty((element_size + 1,), dtype=dtype)
        func(self._generator, out.data.ptr, out.size, *args)
        return out[:element_size].reshape(size)
# NumPy compatible functions
def categorical_accuracy(y_true, y_pred, mask=True):
    '''
    categorical_accuracy adjusted for padding mask
    '''
    # if mask is not None:
    print(y_true)
    print(y_pred)
    eval_shape = (reduce(mul, y_true.shape[:-1]), y_true.shape[-1])
    print(eval_shape)
    y_true_ = np.reshape(y_true, eval_shape)
    y_pred_ = np.reshape(y_pred, eval_shape)
    # np.flatten and np.gather don't exist; use ravel and fancy indexing instead
    flat_mask = np.ravel(mask)
    comped = np.equal(np.argmax(y_true_, axis=-1),
                      np.argmax(y_pred_, axis=-1))
    ## not sure how to do this in tensor flow
    good_entries = flat_mask.nonzero()[0]
    return np.mean(comped[good_entries])
    # else:
    #     return K.mean(K.equal(K.argmax(y_true, axis=-1),
    #                           K.argmax(y_pred, axis=-1)))
def tdma(a, b, c, d, workgrp_size=None):
    assert a.shape == b.shape == c.shape == d.shape
    assert a.dtype == b.dtype == c.dtype == d.dtype

    # Check that PyOpenCL is installed and that the Bohrium runtime uses the OpenCL backend
    if not bh.interop_pyopencl.available():
        raise NotImplementedError("OpenCL not available")

    # Get the OpenCL context from Bohrium
    ctx = bh.interop_pyopencl.get_context()
    queue = cl.CommandQueue(ctx)

    ret = bh.empty(a.shape, dtype=a.dtype)
    a_buf, b_buf, c_buf, d_buf, ret_buf = map(bh.interop_pyopencl.get_buffer, (a, b, c, d, ret))
    prg = compile_tdma(ret.shape[-1], bh.interop_pyopencl.type_np2opencl_str(a.dtype))
    # One work item per tridiagonal system: the product of all but the last axis
    global_size = functools.reduce(operator.mul, ret.shape[:-1])
    prg.tdma(queue, [global_size], workgrp_size, a_buf, b_buf, c_buf, d_buf, ret_buf)
    return ret
def test_reduceby(self):
    data = [1, 2, 3, 4, 5]

    def iseven(x):
        return x % 2 == 0

    assert reduceby(iseven, add, data, 0) == {False: 9, True: 6}
    assert reduceby(iseven, mul, data, 1) == {False: 15, True: 8}

    projects = [{'name': 'build roads', 'state': 'CA', 'cost': 1000000},
                {'name': 'fight crime', 'state': 'IL', 'cost': 100000},
                {'name': 'help farmers', 'state': 'IL', 'cost': 2000000},
                {'name': 'help farmers', 'state': 'CA', 'cost': 200000}]
    assert reduceby(lambda x: x['state'],
                    lambda acc, x: acc + x['cost'],
                    projects, 0) == {'CA': 1200000, 'IL': 2100000}
    assert reduceby('state',
                    lambda acc, x: acc + x['cost'],
                    projects, 0) == {'CA': 1200000, 'IL': 2100000}
def operate(self, left, right, operation):
    """ Do operation on colors

    args:
        left (str): left side
        right (str): right side
        operation (str): Operation

    returns:
        str
    """
    operation = {
        '+': operator.add,
        '-': operator.sub,
        '*': operator.mul,
        '/': operator.truediv
    }.get(operation)
    return operation(left, right)
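# Standalone illustration of the dispatch table above (hypothetical numeric
# operands; in the project this is a method and left/right are color values).
# Note that an unrecognized operator string makes dict.get return None, so
# the final call raises TypeError rather than a descriptive error.
import operator

_ops = {'+': operator.add, '-': operator.sub, '*': operator.mul, '/': operator.truediv}
assert _ops['*'](6, 7) == 42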
def create_tomo_blocks(qubits, numPulses, alignment='parallel'):
    '''
    Helper function to create the tomography pulse block in either parallel or serial.
    '''
    # Tomography pulse sets
    if numPulses == 4:
        tomoSet = [Id, X90, Y90, X]
    elif numPulses == 6:
        tomoSet = [Id, X90, X90m, Y90, Y90m, X]
    else:
        raise ValueError("Only able to handle numPulses=4 or 6")
    # Create all combinations of pulses for the number of qubits
    if alignment == 'parallel':
        return [reduce(operator.mul, [p(q) for p, q in zip(pulseSet, qubits)])
                for pulseSet in product(tomoSet, repeat=len(qubits))]
    elif alignment == 'serial':
        return [[p(q) for p, q in zip(pulseSet, qubits)]
                for pulseSet in product(tomoSet, repeat=len(qubits))]
    else:
        raise ValueError("Alignment must be either serial or parallel")
def state_tomo(seq, qubits, numPulses=4, measChans=None):
    '''
    Apply state tomography readout pulses and measurement.

    Parameters
    -----------
    seq : a single entry list sequence to perform tomography on
    qubits : which qubits to act on
    numPulses : number of readout pulses
    measChans : tuple of measurement channels to readout (defaults to individual qubit channels)
    '''
    if measChans is None:
        measChans = qubits
    measBlock = reduce(operator.mul, [MEAS(q) for q in measChans])
    return [seq + [tomoBlock, measBlock]
            for tomoBlock in create_tomo_blocks(qubits, numPulses)]
def process_tomo(seq, qubits, numPulses=4, measChans=None):
    '''
    Apply process tomography state prep. and readout pulses and measurement.

    Parameters
    -----------
    seq : a single entry list sequence to perform tomography on
    qubits : which qubits to act on
    numPulses : number of prep/readout pulses
    measChans : tuple of measurement channels to readout (defaults to individual qubit channels)
    '''
    if measChans is None:
        measChans = qubits
    measBlock = reduce(operator.mul, [MEAS(q) for q in measChans])
    # Build the tomography blocks once instead of regenerating them on
    # every loop iteration; prep and readout iterate over the same set.
    blocks = create_tomo_blocks(qubits, numPulses)
    seqs = []
    for prepBlock in blocks:
        for readoutBlock in blocks:
            seqs.append([prepBlock] + seq + [readoutBlock, measBlock])
    return seqs
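# Process tomography crosses every preparation block with every readout
# block, so the sequence count is numPulses**(2*len(qubits)); with the
# default 4-pulse set on two qubits that is already 256 sequences.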
def get_samples(desired_data):
    all_samples = []
    for data in desired_data:
        temperatures = np.atleast_1d(data['conditions']['T'])
        num_configs = np.array(data['solver'].get('sublattice_configurations'), dtype=np.object).shape[0]
        site_fractions = data['solver'].get('sublattice_occupancies', [[1]] * num_configs)
        site_fraction_product = [reduce(operator.mul, list(itertools.chain(*[np.atleast_1d(f) for f in fracs])), 1)
                                 for fracs in site_fractions]
        # TODO: Subtle sorting bug here, if the interactions aren't already in sorted order...
        interaction_product = []
        for fracs in site_fractions:
            interaction_product.append(float(reduce(operator.mul,
                                                    [f[0] - f[1] for f in fracs if isinstance(f, list) and len(f) == 2],
                                                    1)))
        if len(interaction_product) == 0:
            interaction_product = [0]
        comp_features = zip(site_fraction_product, interaction_product)
        all_samples.extend(list(itertools.product(temperatures, comp_features)))
    return all_samples
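# Each returned sample is a (temperature, (site_fraction_product,
# interaction_product)) pair: the cartesian product of the measured
# temperatures with the per-configuration composition features built above.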
def prod(a, start=1):
    """Return product of elements of a. Start with int 1 so if only
    ints are included then an int result is returned.

    Examples
    ========

    >>> from sympy import prod, S
    >>> prod(range(3))
    0
    >>> type(_) is int
    True
    >>> prod([S(2), 3])
    6
    >>> _.is_Integer
    True

    You can start the product at something other than 1:

    >>> prod([1, 2], 3)
    6
    """
    return reduce(operator.mul, a, start)
def pde_separate_mul(eq, fun, sep):
    """
    Helper function for searching multiplicative separable solutions.

    Consider an equation of two independent variables x, y and a dependent
    variable w, we look for the product of two functions depending on different
    arguments:

    `w(x, y, z) = X(x)*u(y, z)`

    Examples
    ========

    >>> from sympy import Function, Eq, pde_separate_mul, Derivative as D
    >>> from sympy.abc import x, y
    >>> u, X, Y = map(Function, 'uXY')
    >>> eq = Eq(D(u(x, y), x, 2), D(u(x, y), y, 2))
    >>> pde_separate_mul(eq, u(x, y), [X(x), Y(y)])
    [Derivative(X(x), x, x)/X(x), Derivative(Y(y), y, y)/Y(y)]
    """
    return pde_separate(eq, fun, sep, strategy='mul')
def _rebuild_expr(self, expr, mapping):
    domain = self.domain

    def _rebuild(expr):
        generator = mapping.get(expr)

        if generator is not None:
            return generator
        elif expr.is_Add:
            return reduce(add, list(map(_rebuild, expr.args)))
        elif expr.is_Mul:
            return reduce(mul, list(map(_rebuild, expr.args)))
        elif expr.is_Pow and expr.exp.is_Integer:
            return _rebuild(expr.base)**int(expr.exp)
        else:
            try:
                return domain.convert(expr)
            except CoercionFailed:
                if not domain.has_Field and domain.has_assoc_Field:
                    return domain.get_field().convert(expr)
                else:
                    raise

    return _rebuild(sympify(expr))
def _rebuild_expr(self, expr, mapping):
    domain = self.domain

    def _rebuild(expr):
        generator = mapping.get(expr)

        if generator is not None:
            return generator
        elif expr.is_Add:
            return reduce(add, list(map(_rebuild, expr.args)))
        elif expr.is_Mul:
            return reduce(mul, list(map(_rebuild, expr.args)))
        elif expr.is_Pow and expr.exp.is_Integer and expr.exp >= 0:
            return _rebuild(expr.base)**int(expr.exp)
        else:
            return domain.convert(expr)

    return _rebuild(sympify(expr))
def mul(self, *objs):
    """
    Multiply a sequence of polynomials or containers of polynomials.

    Example
    -------

    >>> from sympy.polys.rings import ring
    >>> from sympy.polys.domains import ZZ
    >>> R, x = ring("x", ZZ)
    >>> R.mul([ x**2 + 2*i + 3 for i in range(4) ])
    x**8 + 24*x**6 + 206*x**4 + 744*x**2 + 945
    >>> _.factor_list()
    (1, [(x**2 + 3, 1), (x**2 + 5, 1), (x**2 + 7, 1), (x**2 + 9, 1)])
    """
    p = self.one

    for obj in objs:
        if is_sequence(obj, include=GeneratorType):
            p *= self.mul(*obj)
        else:
            p *= obj

    return p
def check_mul(Poly):
    c1 = list(random((4,)) + .5)
    c2 = list(random((3,)) + .5)
    p1 = Poly(c1)
    p2 = Poly(c2)
    p3 = p1 * p2
    assert_poly_almost_equal(p2 * p1, p3)
    assert_poly_almost_equal(p1 * c2, p3)
    assert_poly_almost_equal(c2 * p1, p3)
    assert_poly_almost_equal(p1 * tuple(c2), p3)
    assert_poly_almost_equal(tuple(c2) * p1, p3)
    assert_poly_almost_equal(p1 * np.array(c2), p3)
    assert_poly_almost_equal(np.array(c2) * p1, p3)
    assert_poly_almost_equal(p1 * 2, p1 * Poly([2]))
    assert_poly_almost_equal(2 * p1, p1 * Poly([2]))
    assert_raises(TypeError, op.mul, p1, Poly([0], domain=Poly.domain + 1))
    assert_raises(TypeError, op.mul, p1, Poly([0], window=Poly.window + 1))
    if Poly is Polynomial:
        assert_raises(TypeError, op.mul, p1, Chebyshev([0]))
    else:
        assert_raises(TypeError, op.mul, p1, Polynomial([0]))
def list(**type):
    """List all the enumerations within the database.

    Search type can be identified by providing a named argument.
    like = glob match
    regex = regular expression
    index = particular index
    identifier = particular id number
    pred = function predicate
    """
    res = __builtin__.list(iterate(**type))

    maxindex = max(__builtin__.map(idaapi.get_enum_idx, res))
    maxname = max(__builtin__.map(utils.compose(idaapi.get_enum_name, len), res))
    maxsize = max(__builtin__.map(size, res))
    cindex = math.ceil(math.log(maxindex or 1)/math.log(10))
    cmask = max(__builtin__.map(utils.compose(mask, math.log, functools.partial(operator.mul, 1.0/math.log(16)), math.ceil), res) or [database.config.bits()/4.0])

    for n in res:
        print("[{:{:d}d}] {:>{:d}s} & {:#<{:d}x} ({:d} members){:s}".format(idaapi.get_enum_idx(n), int(cindex), idaapi.get_enum_name(n), maxname, mask(n), int(cmask), len(__builtin__.list(members(n))), " // {:s}".format(comment(n)) if comment(n) else ''))
    return
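# The cmask pipeline above appears to compute ceil(log(mask(n)) / log(16)),
# i.e. the hex-digit count of the widest mask, to pad the {:#x} column; the
# fallback is the database word size in bits divided by four (one hex digit
# per four bits). This reading assumes utils.compose applies left to right.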
## members
def test_optimized_concat(self):
    x = 'x' * MAX_Py_ssize_t
    try:
        x = x + '?'     # this statement uses a fast path in ceval.c
    except OverflowError:
        pass
    else:
        self.fail("should have raised OverflowError")
    try:
        x += '?'        # this statement uses a fast path in ceval.c
    except OverflowError:
        pass
    else:
        self.fail("should have raised OverflowError")
    self.assertEqual(len(x), MAX_Py_ssize_t)

### the following test is pending a patch
# (http://mail.python.org/pipermail/python-dev/2006-July/067774.html)
#@bigaddrspacetest
#def test_repeat(self):
#    self.assertRaises(OverflowError, operator.mul, 'x', MAX_Py_ssize_t + 1)
from functools import reduce
from operator import mul


def largest_product(txt, n):
    if len(txt) < n or n < 0:
        raise ValueError("Input length is less than n")
    if any(not c.isdigit() for c in txt):
        raise ValueError("Input must be numeric")
    if n == 0:
        return 1
    products = [reduce(mul, grp) for grp in slices(txt, n)]
    largest = reduce(max, products)
    return largest
# from series.py
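# A minimal compatible sketch of `slices`, assuming it yields every
# contiguous window of n digits as a sequence of ints; the real helper
# lives in the exercise's series.py and may differ.
def slices(txt, n):
    return [[int(c) for c in txt[i:i + n]] for i in range(len(txt) - n + 1)]

# With that in place, largest_product("29384", 2) == 32 (from the pair 8*4).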