def test_delete_action_plan(self):
_, goal = self.client.show_goal("dummy")
_, audit_template = self.create_audit_template(goal['uuid'])
_, audit = self.create_audit(audit_template['uuid'])
self.assertTrue(test_utils.call_until_true(
func=functools.partial(self.has_audit_finished, audit['uuid']),
duration=30,
sleep_for=.5
))
_, action_plans = self.client.list_action_plans(
audit_uuid=audit['uuid'])
action_plan = action_plans['action_plans'][0]
_, action_plan = self.client.show_action_plan(action_plan['uuid'])
self.client.delete_action_plan(action_plan['uuid'])
self.assertRaises(exceptions.NotFound, self.client.show_action_plan,
action_plan['uuid'])
python类partial()的实例源码
def create_action_plan(cls, audit_template_uuid, **audit_kwargs):
"""Wrapper utility for creating a test action plan
:param audit_template_uuid: Audit template UUID to use
:param audit_kwargs: Dict of audit properties to set
:return: The action plan as dict
"""
_, audit = cls.create_audit(audit_template_uuid, **audit_kwargs)
audit_uuid = audit['uuid']
assert test_utils.call_until_true(
func=functools.partial(cls.has_audit_finished, audit_uuid),
duration=30,
sleep_for=.5
)
_, action_plans = cls.client.list_action_plans(audit_uuid=audit_uuid)
if len(action_plans['action_plans']) == 0:
return
return action_plans['action_plans'][0]
def _augment_module_post(net: nn.Module, callback_dict: dict) -> (dict, list):
backward_hook_remove_func_list = []
vis_param_dict = dict()
vis_param_dict['layer'] = None
vis_param_dict['index'] = None
vis_param_dict['method'] = GradType.NAIVE
for x, y in net.named_modules():
if not isinstance(y, nn.Sequential) and y is not net:
# I should add hook to all layers, in case they will be needed.
backward_hook_remove_func_list.append(
y.register_backward_hook(
partial(_backward_hook, module_name=x, callback_dict=callback_dict, vis_param_dict=vis_param_dict)))
def remove_handles():
for x in backward_hook_remove_func_list:
x.remove()
return vis_param_dict, remove_handles
def test_cascade(self):
# Register 2 functions and make sure the last registered
# function is executed first.
ret = pyrun(textwrap.dedent(
"""
import functools, os, imp
mod = imp.load_source("mod", r"{}")
def foo(s):
with open(r"{}", "ab") as f:
f.write(s)
mod.register_exit_fun(functools.partial(foo, b'1'))
mod.register_exit_fun(functools.partial(foo, b'2'))
""".format(os.path.abspath(__file__), TESTFN)
))
self.assertEqual(ret, 0)
with open(TESTFN, "rb") as f:
self.assertEqual(f.read(), b"21")
def __build_buttons(self):
self.__reset_button = tkinter.Button(self)
self.__reset_button['text'] = 'Reset'
self.__reset_button['command'] = self.__reset
self.__reset_button.grid(column=0, row=0,
columnspan=10, sticky=tkinter.EW)
self.__buttons = []
for y in range(self.__height):
row = []
for x in range(self.__width):
button = tkinter.Button(self)
button.grid(column=x, row=y+1)
button['text'] = '?'
command = functools.partial(self.__push, x, y)
button['command'] = command
row.append(button)
self.__buttons.append(row)
def __build_buttons(self):
self.__reset_button = tkinter.Button(self)
self.__reset_button['text'] = 'Reset'
self.__reset_button['command'] = self.__reset
self.__reset_button.grid(column=0, row=1,
columnspan=10, sticky=tkinter.EW)
self.__buttons = []
for y in range(self.__height):
row = []
for x in range(self.__width):
button = tkinter.Button(self, width=2, height=1, text='?')
button.grid(column=x, row=y+2)
command = functools.partial(self.__push, x, y)
button['command'] = command
row.append(button)
self.__buttons.append(row)
def __build_buttons(self):
self.__reset_button = tkinter.Button(self)
self.__reset_button['text'] = 'Reset'
self.__reset_button['command'] = self.__reset
self.__reset_button.grid(column=0, row=1,
columnspan=10, sticky=tkinter.EW)
self.__buttons = []
for y in range(self.__height):
row = []
for x in range(self.__width):
button = tkinter.Button(self, width=2, height=1, text='?')
button.grid(column=x, row=y+2)
command = functools.partial(self.__push, x, y)
button['command'] = command
row.append(button)
self.__buttons.append(row)
def __build_buttons(self):
self.__reset_button = tkinter.Button(self)
self.__reset_button['text'] = 'Reset'
self.__reset_button['command'] = self.__reset
self.__reset_button.grid(column=0, row=1,
columnspan=10, sticky=tkinter.EW)
self.__buttons = []
for y in range(self.__height):
row = []
for x in range(self.__width):
button = tkinter.Button(self, width=2, height=1, text='?')
button.grid(column=x, row=y+2)
command = functools.partial(self.__push, x, y)
button['command'] = command
row.append(button)
self.__buttons.append(row)
def __build_buttons(self):
self.__reset_button = tkinter.Button(self)
self.__reset_button['text'] = 'Reset'
self.__reset_button['command'] = self.__reset
self.__reset_button.grid(column=0, row=1,
columnspan=10, sticky=tkinter.EW)
self.__buttons = []
for y in range(self.__height):
row = []
for x in range(self.__width):
button = tkinter.Button(self, width=2, height=1, text='?')
button.grid(column=x, row=y+2)
command = functools.partial(self.__push, x, y)
button['command'] = command
row.append(button)
self.__buttons.append(row)
def __build_buttons(self):
self.__reset_button = tkinter.Button(self)
self.__reset_button['text'] = 'Reset'
self.__reset_button['command'] = self.__reset
self.__reset_button.grid(column=0, row=1,
columnspan=10, sticky=tkinter.EW)
self.__buttons = []
for y in range(self.__height):
row = []
for x in range(self.__width):
button = tkinter.Button(self, width=2, height=1, text='?')
button.grid(column=x, row=y+2)
command = functools.partial(self.__push, x, y)
button['command'] = command
row.append(button)
self.__buttons.append(row)
def data_parallel(f, input, params, stats, mode, device_ids, output_device=None):
if output_device is None:
output_device = device_ids[0]
if len(device_ids) == 1:
return f(input, params, stats, mode)
def replicate(param_dict, g):
replicas = [{} for d in device_ids]
for k,v in param_dict.items():
for i,u in enumerate(g(v)):
replicas[i][k] = u
return replicas
params_replicas = replicate(params, lambda x: Broadcast(device_ids)(x))
stats_replicas = replicate(stats, lambda x: comm.broadcast(x, device_ids))
replicas = [partial(f, params=p, stats=s, mode=mode)
for p,s in zip(params_replicas, stats_replicas)]
inputs = scatter([input], device_ids)
outputs = parallel_apply(replicas, inputs)
return gather(outputs, output_device)
def __init__(self, *args, **kwargs):
super(GirderSession, self).__init__(*args, **kwargs)
self.wait_for_success = functools.partial(
self.wait_for,
predicate=lambda j:
j['status'] == JobStatus.SUCCESS,
on_timeout=lambda j:
'Timed out waiting for job/%s to move into success state' % j['_id'])
self.wait_for_error = functools.partial(
self.wait_for,
predicate=lambda j:
j['status'] == JobStatus.ERROR,
on_timeout=lambda j:
'Timed out waiting for job/%s to move into error state' % j['_id'])
self.wait_for_canceled = functools.partial(
self.wait_for,
predicate=lambda j:
j['status'] == JobStatus.CANCELED,
on_timeout=lambda j:
'Timed out waiting for job/%s to move into canceled state' % j['_id'])
def executemany(self, query, args):
"""Must be used with 'yield' as 'n = yield cursor.executemany(stmt)'.
"""
yield self._sem.acquire()
self._thread_pool.async_task(self._exec_task,
partial_func(self._cursor.executemany, query, args))
def callproc(self, proc, args=()):
"""Must be used with 'yield' as 'yield cursor.callproc(proc)'.
"""
yield self._sem.acquire()
self._thread_pool.async_task(self._exec_task,
partial_func(self._cursor.callproc, proc, args))
def execute(self, query, args=None):
"""Must be used with 'yield' as 'n = yield cursor.execute(stmt)'.
"""
yield self._sem.acquire()
self._thread_pool.async_task(self._exec_task,
partial_func(self._cursor.execute, query, args))
def executemany(self, query, args):
"""Must be used with 'yield' as 'n = yield cursor.executemany(stmt)'.
"""
yield self._sem.acquire()
self._thread_pool.async_task(self._exec_task,
partial_func(self._cursor.executemany, query, args))
def callproc(self, proc, args=()):
"""Must be used with 'yield' as 'yield cursor.callproc(proc)'.
"""
yield self._sem.acquire()
self._thread_pool.async_task(self._exec_task,
partial_func(self._cursor.callproc, proc, args))
def lazycache(filename, module_globals):
"""Seed the cache for filename with module_globals.
The module loader will be asked for the source only when getlines is
called, not immediately.
If there is an entry in the cache already, it is not altered.
:return: True if a lazy load is registered in the cache,
otherwise False. To register such a load a module loader with a
get_source method must be found, the filename must be a cachable
filename, and the filename must not be already cached.
"""
if filename in cache:
if len(cache[filename]) == 1:
return True
else:
return False
if not filename or (filename.startswith('<') and filename.endswith('>')):
return False
# Try for a __loader__, if available
if module_globals and '__loader__' in module_globals:
name = module_globals.get('__name__')
loader = module_globals['__loader__']
get_source = getattr(loader, 'get_source', None)
if name and get_source:
get_lines = functools.partial(get_source, name)
cache[filename] = (get_lines,)
return True
return False
def parsers(self):
"""Metadata item name to parser function mapping."""
parse_list = self._parse_list
parse_list_semicolon = partial(self._parse_list, separator=';')
parse_bool = self._parse_bool
parse_dict = self._parse_dict
return {
'zip_safe': parse_bool,
'use_2to3': parse_bool,
'include_package_data': parse_bool,
'package_dir': parse_dict,
'use_2to3_fixers': parse_list,
'use_2to3_exclude_fixers': parse_list,
'convert_2to3_doctests': parse_list,
'scripts': parse_list,
'eager_resources': parse_list,
'dependency_links': parse_list,
'namespace_packages': parse_list,
'install_requires': parse_list_semicolon,
'setup_requires': parse_list_semicolon,
'tests_require': parse_list_semicolon,
'packages': self._parse_packages,
'entry_points': self._parse_file,
'py_modules': parse_list,
}
def parse_section_extras_require(self, section_options):
"""Parses `extras_require` configuration file section.
:param dict section_options:
"""
parse_list = partial(self._parse_list, separator=';')
self['extras_require'] = self._parse_section_to_dict(
section_options, parse_list)