def _write_df_to_table(
self,
tbl,
df,
txn,
chunk_size,
idx=True,
idx_label=None,
):
df.to_sql(
tbl.name,
txn.connection,
index=idx,
index_label=(
idx_label
if idx_label is not None else
first(tbl.primary_key.columns).name
),
if_exists='append',
chunksize=chunk_size,
)
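
# A minimal, self-contained sketch (not from the original project) of the idiom
# above: toolz.first pulls the single primary-key column off a SQLAlchemy table
# so its name can serve as the DataFrame index label. The `users` table below
# is hypothetical.
import sqlalchemy as sa
from toolz import first

metadata = sa.MetaData()
users = sa.Table(
    'users', metadata,
    sa.Column('user_id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String),
)
print(first(users.primary_key.columns).name)  # -> user_id
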
def test_comprehension():
user_config.session_overrides['EXPERIMENTAL_MODE'] = False
nodes = [node for node in use_comprehension_node.children
if node.code_element.name == 'ff']
assert nodes
ff_node = tz.first(nodes)
fn_with_comprehension_node = tz.first(
node for node in use_comprehension_node.children
if node.code_element.name == 'fn_with_comprehension'
)
assert any(node.code_element.name == 'fn_with_comprehension' for node in ff_node.parents)
assert any(node.code_element.name == 'ff' for node in fn_with_comprehension_node.children)
def extract_feature_array(beatmaps_and_mods):
"""Extract all features from a beatmap.
Parameters
----------
beatmaps_and_mods : list[Beatmap, dict[str, bool]]
The beatmaps and mod information to extract features from.
Returns
-------
features : np.ndarray[float64]
The features as an array.
"""
cache = {}
return np.array(
[
[
snd for
fst, snd in sorted(
extract_features(
beatmap,
**mods,
_cache=cache,
).items(),
key=first,
)
]
for beatmap, mods in beatmaps_and_mods
]
)
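
# A small sketch (made-up feature names and values) of the sorting idiom above:
# sorting dict items with key=first orders the pairs by feature name, so the
# extracted values always come out in the same name-sorted order.
from toolz import first

features = {'bpm': 180.0, 'ar': 9.0, 'cs': 4.0}
ordered = [snd for fst, snd in sorted(features.items(), key=first)]
print(ordered)  # [9.0, 180.0, 4.0] -- ar, bpm, cs
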
def is_requirement_exists(title):
q = title_query(title)
reqs = query_requirement(q)
def fltr(r):
print "Checking", unicode(r.title)
return title in unicode(r.title)
try:
res = first(filter(fltr, reqs))
except StopIteration:
res = False
return res
def is_in_requirements(title, requirements):
titles = list(filter(lambda r: title in str(r.title), requirements))
    if len(titles) > 1:
raise Exception("Should not have multiple matches on Requirements")
elif len(titles) == 0:
return False
else:
return first(titles)
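
# Tiny sketch of the filter-then-first idiom above, with plain strings standing
# in for the Requirement objects: filter keeps the matching titles, first()
# returns the first hit, and an empty match list means False.
from toolz import first

requirements = ['Login page', 'Logout flow', 'Password reset']
matches = list(filter(lambda r: 'Logout' in r, requirements))
result = first(matches) if matches else False
print(result)  # Logout flow
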
def get_module_node(effective_sys_path: List[Path], module_name: str) -> Tuple[Optional[Node], Optional[Exception]]:
from .errors import ModuleResolutionError
import_script = create_import_script(effective_sys_path, module_name)
definitions = import_script.goto_definitions()
if definitions:
mod = tz.first(definitions)
if tuple(map(int, jedi.__version__.split('.'))) >= (0,10,1):
# duck punch to avoid mod._name.api_type error, which uses parent_context.
mod._name.parent_context = mod._name.get_root_context()
if mod.module_path:
JediCodeElementNode.usage_resolution_modules |= frozenset((mod._name.get_root_context(),))
node = JediCodeElementNode.from_definition(
role='definition',
call_pos=(mod.module_path, (1,0), (None,None)),
definition=mod)
err = None
else:
node = None
err = ModuleResolutionError(
'Could not resolve module {} (did you mean to use "-f"?)'.format(module_name))
return node, err
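
# Hedged illustration of the guard above: toolz.first raises StopIteration on
# an empty sequence, which is why the code checks `if definitions:` before
# calling tz.first. A plain list stands in for the jedi definitions here.
import toolz as tz

definitions = []
mod = tz.first(definitions) if definitions else None
print(mod)  # None -- no definition resolved
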
def toggle_auto_highlight(self):
self.auto_highlight = not self.auto_highlight
self.status_bar.showMessage(
'Auto highlight toggled {}'.format(
'on' if self.auto_highlight else 'off'), msecs=3000)
self.status_bar.update()
if self.auto_highlight:
current_callList = tz.first(cl for cl in self.callLists if cl.hasFocus())
current = current_callList.currentItem()
if current:
current_callList.focus(current)
def test_get_called_functions():
test_script = """
import call_map.jedi_ast_tools as jat
def thunk():
print('hi')
def ff(node):
aa = jat.get_called_functions(node)
thunk()
"""
text_script = textwrap.dedent(test_script)
    definitions = jedi.api.names(source=text_script)
def_ff = tz.first(filter(lambda x: x.name == 'ff', definitions))
called_by_ff = list(jat.get_called_functions(def_ff._name.tree_name.get_definition().children[-1]))
assert len(called_by_ff) == 2
assert {name.value for role, name, ast_node, start_pos, end_pos in called_by_ff} == {'thunk', 'get_called_functions'}
def summary(feature_names, features, **labels):
"""Summarize the data we are about to train with.
Parameters
----------
feature_names : iterable[str]
The names of the features in the ``features`` array.
features : np.ndarray
The 3d feature array.
**labels
The named label arrays.
Returns
-------
summary : str
A summary of the features and labels.
"""
single_attribute_template = dedent(
"""\
{name}:
mean: {mean}
std: {std}
min: {min}
max: {max}""",
)
def format_attribute(name, values):
return ' ' + '\n '.join(
single_attribute_template.format(
name=name,
mean=values.mean(),
std=values.std(),
min=values.min(),
max=values.max(),
).splitlines(),
)
return '\n'.join(concatv(
(
'summary:',
' labels:',
),
(
format_attribute(name, value)
for name, value in sorted(labels.items(), key=first)
),
(
'features:',
),
(
format_attribute(name, features[..., ix])
for ix, name in enumerate(feature_names)
)
))
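
# Sketch of how the report above is stitched together: toolz.concatv lazily
# chains several iterables of lines into one stream for '\n'.join to consume.
# The label and feature lines below are placeholders.
from toolz import concatv

report = '\n'.join(concatv(
    ('summary:', '  labels:'),
    ('    win: ...',),
    ('  features:',),
    ('    accuracy: ...',),
))
print(report)
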
def rolling_window(array, length):
"""Restride an array of shape (X_0, ... X_N) into an array of shape
(length, X_0 - length + 1, ... X_N) where each slice at index i along the
first axis is equivalent to result[i] = array[length * i:length * (i + 1)]
Parameters
----------
array : np.ndarray
The base array.
length : int
Length of the synthetic first axis to generate.
Returns
-------
out : np.ndarray
Example
-------
>>> from numpy import arange
>>> a = arange(25).reshape(5, 5)
>>> a
array([[ 0, 1, 2, 3, 4],
[ 5, 6, 7, 8, 9],
[10, 11, 12, 13, 14],
[15, 16, 17, 18, 19],
[20, 21, 22, 23, 24]])
>>> rolling_window(a, 2)
array([[[ 0, 1, 2, 3, 4],
[ 5, 6, 7, 8, 9]],
<BLANKLINE>
[[ 5, 6, 7, 8, 9],
[10, 11, 12, 13, 14]],
<BLANKLINE>
[[10, 11, 12, 13, 14],
[15, 16, 17, 18, 19]],
<BLANKLINE>
[[15, 16, 17, 18, 19],
[20, 21, 22, 23, 24]]])
"""
orig_shape = array.shape
if not orig_shape:
raise IndexError("Can't restride a scalar.")
elif orig_shape[0] <= length:
raise IndexError(
"Can't restride array of shape {shape} with"
" a window length of {len}".format(
shape=orig_shape,
len=length,
)
)
num_windows = (orig_shape[0] - length + 1)
new_shape = (num_windows, length) + orig_shape[1:]
new_strides = (array.strides[0],) + array.strides
return np.lib.stride_tricks.as_strided(array, new_shape, new_strides)
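
# Usage sketch for rolling_window (assumes the function above is in scope).
# The result is a strided *view*: windows overlap and share memory with the
# input array, so nothing is copied.
import numpy as np

a = np.arange(10)
windows = rolling_window(a, 3)
print(windows.shape)           # (8, 3)
print(windows[0], windows[1])  # [0 1 2] [1 2 3]
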
def _split_symbol_mappings(df):
"""Split out the symbol: sid mappings from the raw data.
Parameters
----------
df : pd.DataFrame
The dataframe with multiple rows for each symbol: sid pair.
Returns
-------
asset_info : pd.DataFrame
The asset info with one row per asset.
symbol_mappings : pd.DataFrame
The dataframe of just symbol: sid mappings. The index will be
the sid, then there will be three columns: symbol, start_date, and
end_date.
"""
mappings = df[list(mapping_columns)]
    ambiguous = {}
for symbol in mappings.symbol.unique():
persymbol = mappings[mappings.symbol == symbol]
intersections = list(intersecting_ranges(map(
from_tuple,
zip(persymbol.start_date, persymbol.end_date),
)))
if intersections:
            ambiguous[symbol] = (
intersections,
persymbol[['start_date', 'end_date']].astype('datetime64[ns]'),
)
    if ambiguous:
raise ValueError(
'Ambiguous ownership for %d symbol%s, multiple assets held the'
' following symbols:\n%s' % (
                len(ambiguous),
                '' if len(ambiguous) == 1 else 's',
'\n'.join(
'%s:\n intersections: %s\n %s' % (
symbol,
tuple(map(_format_range, intersections)),
# indent the dataframe string
'\n '.join(str(df).splitlines()),
)
for symbol, (intersections, df) in sorted(
                        ambiguous.items(),
key=first,
),
),
)
)
return (
df.groupby(level=0).apply(_check_asset_group),
df[list(mapping_columns)],
)
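
# Simplified stand-in (not zipline's from_tuple/intersecting_ranges helpers)
# for the ambiguity check above: two ownership ranges for the same symbol
# intersect when the next range starts before the previous one ends.
import pandas as pd

persymbol = pd.DataFrame({
    'start_date': pd.to_datetime(['2020-01-01', '2020-06-01']),
    'end_date': pd.to_datetime(['2020-07-01', '2020-12-31']),
}).sort_values('start_date')

has_overlap = (persymbol['start_date'].shift(-1) < persymbol['end_date']).any()
print(has_overlap)  # True: the second range starts inside the first
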
def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
"""
Asynchronous version of train
See Also
--------
train
"""
# Break apart Dask.array/dataframe into chunks/parts
data_parts = data.to_delayed()
label_parts = labels.to_delayed()
if isinstance(data_parts, np.ndarray):
assert data_parts.shape[1] == 1
data_parts = data_parts.flatten().tolist()
if isinstance(label_parts, np.ndarray):
assert label_parts.ndim == 1 or label_parts.shape[1] == 1
label_parts = label_parts.flatten().tolist()
# Arrange parts into pairs. This enforces co-locality
parts = list(map(delayed, zip(data_parts, label_parts)))
parts = client.compute(parts) # Start computation in the background
yield _wait(parts)
# Because XGBoost-python doesn't yet allow iterative training, we need to
# find the locations of all chunks and map them to particular Dask workers
key_to_part_dict = dict([(part.key, part) for part in parts])
who_has = yield client.scheduler.who_has(keys=[part.key for part in parts])
worker_map = defaultdict(list)
for key, workers in who_has.items():
worker_map[first(workers)].append(key_to_part_dict[key])
ncores = yield client.scheduler.ncores() # Number of cores per worker
# Start the XGBoost tracker on the Dask scheduler
host, port = parse_host_port(client.scheduler.address)
env = yield client._run_on_scheduler(start_tracker,
host.strip('/:'),
len(worker_map))
# Tell each worker to train on the chunks/parts that it has locally
futures = [client.submit(train_part, env,
assoc(params, 'nthread', ncores[worker]),
list_of_parts, workers=worker,
dmatrix_kwargs=dmatrix_kwargs, **kwargs)
for worker, list_of_parts in worker_map.items()]
# Get the results, only one will be non-None
results = yield client._gather(futures)
result = [v for v in results if v][0]
raise gen.Return(result)
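
# Sketch of the part-to-worker grouping above (worker addresses and part keys
# are made up): each chunk may be replicated on several workers, and
# first(workers) arbitrarily picks one so every chunk is trained by exactly
# one worker.
from collections import defaultdict
from toolz import first

who_has = {
    'xgb-part-0': ['tcp://10.0.0.1:8786', 'tcp://10.0.0.2:8786'],
    'xgb-part-1': ['tcp://10.0.0.2:8786'],
}
worker_map = defaultdict(list)
for key, workers in who_has.items():
    worker_map[first(workers)].append(key)
print(dict(worker_map))
# {'tcp://10.0.0.1:8786': ['xgb-part-0'], 'tcp://10.0.0.2:8786': ['xgb-part-1']}
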