def doctable(ctx):
df = pd.read_csv('./docs/flight-options.csv')
# open an existing document
doc = docx.Document('./docs/style-reference.docx')
as_int = partial(format_decimal, format='#')
as_usd = partial(format_currency, currency='USD')
s = doc.sections[0]
width = s.page_width - s.left_margin - s.right_margin
doc.add_picture('./docs/diagrams_002.png', width=width)
formatters = {
'ticket_price': as_usd,
'total_hours': as_int,
'trip': as_int,
'airline': partial(shorten_long_name, width=20),
'selected': compose({0: 'No', 1: 'Yes'}.get, int)
}
add_table(df, doc, table_style='Plain Table 3', formatters=formatters)
# save the doc
doc.save('./docs/test.docx')
python类compose()的实例源码
def matches(self, pattern):
"""
Elementwise regex match.
Parameters
----------
pattern : str or compiled regex
Returns
-------
matches : np.ndarray[bool]
An array with the same shape as self indicating whether each
element of self was matched by ``pattern``.
"""
return self.map_predicate(compose(bool, pattern.match))
# These types all implement an O(N) __contains__, so pre-emptively
# coerce to `set`.
def create_influxdb_writer(influxdb_client, series_name="gpu_measurements", **tags):
""" Returns function which writes to influxdb
Parameters
----------
influxdb_client:
series_name: (str)
tags: Extra tags to be added to the measurements
"""
to_influxdb = _influxdb_writer_for(influxdb_client, series_name)
if tags:
logger.debug('Creating writer with tags')
write_to_db = compose(to_influxdb,
_add_tags(tags),
_to_json_dict,
parse_line)
else:
logger.debug('Creating writer')
write_to_db = compose(to_influxdb,
_to_json_dict,
parse_line)
return _call_when(write_to_db, lambda x: x is not None and '#' not in x)
def test_quantiles_uneven_buckets(self):
permute = partial(permute_rows, 5)
shape = (5, 5)
factor_data = permute(log1p(arange(25, dtype=float).reshape(shape)))
mask_data = permute(self.eye_mask(shape=shape))
f = F()
m = Mask()
permuted_array = compose(permute, partial(array, dtype=int64_dtype))
self.check_terms(
terms={
'3_masked': f.quantiles(bins=3, mask=m),
'7_masked': f.quantiles(bins=7, mask=m),
},
initial_workspace={
f: factor_data,
m: mask_data,
},
expected={
'3_masked': permuted_array([[-1, 0, 0, 1, 2],
[0, -1, 0, 1, 2],
[0, 0, -1, 1, 2],
[0, 0, 1, -1, 2],
[0, 0, 1, 2, -1]]),
'7_masked': permuted_array([[-1, 0, 2, 4, 6],
[0, -1, 2, 4, 6],
[0, 2, -1, 4, 6],
[0, 2, 4, -1, 6],
[0, 2, 4, 6, -1]]),
},
mask=self.build_mask(self.ones_mask(shape=shape)),
)
def __call__(self, *args, **kwargs):
contract = self.contract(*args, **kwargs)
if type(contract) == type:
# contract still needs to be initialized, wrap afterwards
return compose(ContractSugar, contract)
else:
return ContractSugar(contract)
def test_quantiles_uneven_buckets(self):
permute = partial(permute_rows, 5)
shape = (5, 5)
factor_data = permute(log1p(arange(25, dtype=float).reshape(shape)))
mask_data = permute(self.eye_mask(shape=shape))
f = F()
m = Mask()
permuted_array = compose(permute, partial(array, dtype=int64_dtype))
self.check_terms(
terms={
'3_masked': f.quantiles(bins=3, mask=m),
'7_masked': f.quantiles(bins=7, mask=m),
},
initial_workspace={
f: factor_data,
m: mask_data,
},
expected={
'3_masked': permuted_array([[-1, 0, 0, 1, 2],
[0, -1, 0, 1, 2],
[0, 0, -1, 1, 2],
[0, 0, 1, -1, 2],
[0, 0, 1, 2, -1]]),
'7_masked': permuted_array([[-1, 0, 2, 4, 6],
[0, -1, 2, 4, 6],
[0, 2, -1, 4, 6],
[0, 2, 4, -1, 6],
[0, 2, 4, 6, -1]]),
},
mask=self.build_mask(self.ones_mask(shape=shape)),
)
def format_results(terminal_width, key_list, separator, text_list,
left_align=True, min_factor=3, **kwargs):
"""Returns formatted results in two columns.
"""
key_width = max(map(len, key_list))
separator_length = len(separator)
desc_wrap = toolz.identity
if terminal_width:
if key_width / terminal_width > .5:
key_width = terminal_width // 2 - 3
text_width = terminal_width - key_width - separator_length
if text_width * min_factor > terminal_width:
desc_wrap = toolz.compose(
('\n' + ' ' * (key_width + separator_length)).join,
toolz.partial(textwrap.wrap, width=text_width, **kwargs),
)
if left_align:
fmt = '%-*s%s%s'
else:
fmt = '%*s%s%s'
for key, text in zip(key_list, text_list):
text = desc_wrap(text)
if len(key) > key_width:
yield fmt % (key_width, key, separator, '')
yield fmt % (key_width, '', ' ' * separator_length, text)
else:
yield fmt % (key_width, key, separator, text)
def transform_paragraphs(notebook, transformations=None):
""""""
if transformations:
f = compose(*reversed(transformations))
notebook["paragraphs"] = [f(p) for p in notebook.get("paragraphs", [])]
return notebook
def test_quantiles_unmasked(self, seed):
permute = partial(permute_rows, seed)
shape = (6, 6)
# Shuffle the input rows to verify that we don't depend on the order.
# Take the log to ensure that we don't depend on linear scaling or
# integrality of inputs
factor_data = permute(log1p(arange(36, dtype=float).reshape(shape)))
f = self.f
# Apply the same shuffle we applied to the input rows to our
# expectations. Doing it this way makes it obvious that our
# expectation corresponds to our input, while still testing against
# a range of input orderings.
permuted_array = compose(permute, partial(array, dtype=int64_dtype))
self.check_terms(
terms={
'2': f.quantiles(bins=2),
'3': f.quantiles(bins=3),
'6': f.quantiles(bins=6),
},
initial_workspace={
f: factor_data,
},
expected={
# The values in the input are all increasing, so the first half
# of each row should be in the bottom bucket, and the second
# half should be in the top bucket.
'2': permuted_array([[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1]]),
# Similar for three buckets.
'3': permuted_array([[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2]]),
# In the limiting case, we just have every column different.
'6': permuted_array([[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5]]),
},
mask=self.build_mask(self.ones_mask(shape=shape)),
)
def expect_types(__funcname=_qualified_name, **named):
"""
Preprocessing decorator that verifies inputs have expected types.
Examples
--------
>>> @expect_types(x=int, y=str)
... def foo(x, y):
... return x, y
...
>>> foo(2, '3')
(2, '3')
>>> foo(2.0, '3') # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
Traceback (most recent call last):
...
TypeError: ...foo() expected a value of type int for argument 'x',
but got float instead.
Notes
-----
A special argument, __funcname, can be provided as a string to override the
function name shown in error messages. This is most often used on __init__
or __new__ methods to make errors refer to the class name instead of the
function name.
"""
for name, type_ in iteritems(named):
if not isinstance(type_, (type, tuple)):
raise TypeError(
"expect_types() expected a type or tuple of types for "
"argument '{name}', but got {type_} instead.".format(
name=name, type_=type_,
)
)
def _expect_type(type_):
# Slightly different messages for type and tuple of types.
_template = (
"%(funcname)s() expected a value of type {type_or_types} "
"for argument '%(argname)s', but got %(actual)s instead."
)
if isinstance(type_, tuple):
template = _template.format(
type_or_types=' or '.join(map(_qualified_name, type_))
)
else:
template = _template.format(type_or_types=_qualified_name(type_))
return make_check(
exc_type=TypeError,
template=template,
pred=lambda v: not isinstance(v, type_),
actual=compose(_qualified_name, type),
funcname=__funcname,
)
return preprocess(**valmap(_expect_type, named))
def test_quantiles_unmasked(self, seed):
permute = partial(permute_rows, seed)
shape = (6, 6)
# Shuffle the input rows to verify that we don't depend on the order.
# Take the log to ensure that we don't depend on linear scaling or
# integrality of inputs
factor_data = permute(log1p(arange(36, dtype=float).reshape(shape)))
f = self.f
# Apply the same shuffle we applied to the input rows to our
# expectations. Doing it this way makes it obvious that our
# expectation corresponds to our input, while still testing against
# a range of input orderings.
permuted_array = compose(permute, partial(array, dtype=int64_dtype))
self.check_terms(
terms={
'2': f.quantiles(bins=2),
'3': f.quantiles(bins=3),
'6': f.quantiles(bins=6),
},
initial_workspace={
f: factor_data,
},
expected={
# The values in the input are all increasing, so the first half
# of each row should be in the bottom bucket, and the second
# half should be in the top bucket.
'2': permuted_array([[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1],
[0, 0, 0, 1, 1, 1]]),
# Similar for three buckets.
'3': permuted_array([[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2],
[0, 0, 1, 1, 2, 2]]),
# In the limiting case, we just have every column different.
'6': permuted_array([[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5],
[0, 1, 2, 3, 4, 5]]),
},
mask=self.build_mask(self.ones_mask(shape=shape)),
)
def test_top_and_bottom_with_groupby_and_mask(self, dtype, seed):
permute = partial(permute_rows, seed)
permuted_array = compose(permute, partial(array, dtype=int64_dtype))
shape = (8, 8)
# Shuffle the input rows to verify that we correctly pick out the top
# values independently of order.
factor_data = permute(arange(0, 64, dtype=dtype).reshape(shape))
classifier_data = permuted_array([[0, 0, 1, 1, 2, 2, 0, 0],
[0, 0, 1, 1, 2, 2, 0, 0],
[0, 1, 2, 3, 0, 1, 2, 3],
[0, 1, 2, 3, 0, 1, 2, 3],
[0, 0, 0, 0, 1, 1, 1, 1],
[0, 0, 0, 0, 1, 1, 1, 1],
[0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0]])
f = self.f
c = self.c
self.check_terms(
terms={
'top2': f.top(2, groupby=c),
'bottom2': f.bottom(2, groupby=c),
},
initial_workspace={
f: factor_data,
c: classifier_data,
},
expected={
# Should be the rightmost two entries in classifier_data,
# ignoring the off-diagonal.
'top2': permuted_array([[0, 1, 1, 1, 1, 1, 1, 0],
[0, 1, 1, 1, 1, 1, 0, 1],
[1, 1, 1, 1, 1, 0, 1, 1],
[1, 1, 1, 1, 0, 1, 1, 1],
[0, 1, 1, 0, 0, 0, 1, 1],
[0, 1, 0, 1, 0, 0, 1, 1],
[0, 0, 0, 0, 0, 0, 1, 1],
[0, 0, 0, 0, 0, 0, 1, 1]], dtype=bool),
# Should be the rightmost two entries in classifier_data,
# ignoring the off-diagonal.
'bottom2': permuted_array([[1, 1, 1, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 1, 0, 0],
[1, 1, 1, 1, 1, 0, 1, 1],
[1, 1, 1, 1, 0, 1, 1, 1],
[1, 1, 0, 0, 1, 1, 0, 0],
[1, 1, 0, 0, 1, 1, 0, 0],
[1, 0, 1, 0, 0, 0, 0, 0],
[0, 1, 1, 0, 0, 0, 0, 0]],
dtype=bool),
},
mask=self.build_mask(permute(rot90(self.eye_mask(shape=shape)))),
)