def load_previous(self, path=None):
    """Load previous copy of config from disk.

    In normal usage you don't need to call this method directly - it
    is called automatically at object initialization.

    The parsed file content is stored in ``self._prev_dict``.  Every key
    of that mapping which is not already present in ``self`` is added to
    ``self``; keys that are already set are left untouched.

    :param path:
        File path from which to load the previous config. If `None`,
        config is loaded from the default location. If `path` is
        specified, subsequent `save()` calls will write to the same
        path.
    """
    self.path = path or self.path
    with open(self.path) as f:
        self._prev_dict = json.load(f)
    # Deep-copy only the values that are actually adopted, so later
    # mutation of ``self`` can never leak back into ``_prev_dict``.
    for k, v in self._prev_dict.items():
        if k not in self:
            self[k] = copy.deepcopy(v)
# Example source code for Python's deepcopy()
def handle_results_collected(self, signal, sender, results, context, **kw):
    """Keep, per sender and result name, the largest value seen so far.

    Each entry of ``results`` is converted with ``self.to_name_value_pair``
    and the resulting names are checked with ``self.ensure_unique_names``.
    Values are stored in ``self.data[sender.id_][sender.type_name][name]``
    as ``Result`` objects carrying a deep copy of ``context``; an existing
    entry is replaced only when its ``value`` is smaller than the new one.
    """
    name_value_pairs = list(map(self.to_name_value_pair, results))
    self.ensure_unique_names(name_value_pairs)

    def get_data(*parts):
        # Walk the nested dicts under self.data, creating levels on demand.
        d = self.data
        for p in parts:
            d = d.setdefault(p, {})
        return d

    for name, result in name_value_pairs:
        d = get_data(sender.id_, sender.type_name)
        current = d.get(name)
        if current is None or current.value < result:
            # TODO: once serialization, no need to deepcopy
            d[name] = Result(value=result, context=copy.deepcopy(context))
def __exit__(self, exc_type, exc_val, exc_tb):
    """Send the collected results and surface receiver failures.

    ``self.before_exit()`` is called first, then the ``results_collected``
    signal is sent robustly with the results to send and a deep copy of
    the current context data.  If the ``with`` body itself did not raise,
    the first response that is an exception is re-raised: either through
    its ``clone_with_more_info(orig_tb=...)`` hook when it has one, or as
    a new exception of the same type whose message embeds the original
    traceback text.
    """
    self.before_exit()
    signal_responses = results_collected.send_robust(
        sender=self, results=self.get_results_to_send(),
        context=copy.deepcopy(context.current.data))
    if exc_type is not None:
        # An exception is already propagating out of the ``with`` body;
        # do not replace it with a receiver failure.
        return
    for _receiver, response in signal_responses:
        if not isinstance(response, BaseException):
            continue
        orig_tb = ''.join(
            traceback.format_tb(response.__traceback__))
        if hasattr(response, 'clone_with_more_info'):
            new_exc = response.clone_with_more_info(
                orig_tb=orig_tb)
        else:
            error_msg = '{}{}: {}'.format(
                orig_tb,
                type(response).__name__,
                str(response)
            )
            new_exc = type(response)(error_msg)
        raise new_exc
def list_kuryr_opts():
    """Return a list of oslo_config options available in Kuryr service.

    Each element of the list is a tuple. The first element is the name of the
    group under which the list of elements in the second element will be
    registered. A group name of None corresponds to the [DEFAULT] group in
    config files.

    This function is also discoverable via the 'kuryr' entry point under
    the 'oslo_config.opts' namespace.

    The purpose of this is to allow tools like the Oslo sample config file
    generator to discover the options exposed to users by Kuryr.

    :returns: a list of (group_name, opts) tuples
    """
    # Hand out deep copies so callers cannot mutate the module-level options.
    local_opts = [(k, copy.deepcopy(o)) for k, o in _kuryr_k8s_opts]
    return (local_opts +
            lib_opts.list_kuryr_opts() + _options.list_opts())
def get_item_label(self, item):
    """
    retrieve item label property

    For a 'smartfield' item the label is assembled from the statement
    mapped to its condition id (or EMPTY_RESPONSE when there is none),
    followed by every option value wrapped in '|' characters; each value
    is translated through the standard or custom response maps when it is
    known there.  For any other item type the LABEL property is returned.

    :param item: single item in JSON format
    :return: label property
    """
    if get_json_property(item, TYPE) != 'smartfield':
        return get_json_property(item, LABEL)

    custom_response_id_to_label_map = self.audit_custom_response_id_to_label_map()
    label = EMPTY_RESPONSE
    conditional_id = get_json_property(item, 'options', 'condition')
    if conditional_id:
        label = copy.deepcopy(smartfield_conditional_id_to_statement_map.get(conditional_id)) or EMPTY_RESPONSE
    for value in get_json_property(item, 'options', 'values'):
        label += '|'
        if value in standard_response_id_map.keys():
            label += standard_response_id_map[value]
        elif value in custom_response_id_to_label_map.keys():
            label += custom_response_id_to_label_map[value]
        else:
            label += str(value)
        label += '|'
    return label
def test_file_load(self):
    """
    Load the simple filters by file
    """
    entry = copy(self.template_entry)
    fb = NNTPFilterBase(paths=join(self.var_dir, 'simple.nrf'))

    # Our hash will start at 0 (Zero)
    assert len(fb._regex_hash) == 0

    # But now we meet our score
    entry['subject'] = 'A great video called "blah.avi"'
    assert fb.blacklist(**entry) == False

    entry['subject'] = 'A malicious file because it is "blah.avi.exe"'
    assert fb.blacklist(**entry) == True

    # Now load the directory; it should just find the same nrf file.
    # Only construction is exercised here; the instance is not inspected.
    fbd = NNTPFilterBase(paths=self.var_dir)
def test_scoring_image_files(self):
    """
    Test that we correctly score image files
    """
    sf = NNTPSimpleFilter()
    entry = copy(self.template_entry)

    # Expected Score
    score = 15

    # Test against image files:
    for e in ['jpg', 'jpeg', 'gif', 'png', 'bmp']:
        # ``%`` binds tighter than ``+``, so only the second literal is
        # formatted; it is the one that contains the placeholder.
        entry['subject'] = 'What.A.Great.Image (1/1) ' +\
            '"what.a.great.image.%s" Yenc (1/1)' % e

        assert sf.score(**entry) == score
def _get_tags(data):
    """Group the firewall audit entries of ``data`` by their tag.

    ``data['firewall']`` maps a list name to a list of dicts, each mapping
    an audit id to its audit data.  For every audit entry a flat dict is
    built from a deep copy of its ``'data'`` sub-dict plus ``'type'`` (the
    list name), ``'tag'`` and ``'module'`` (always ``'firewall'``), then
    updated with the remaining top-level audit fields.

    The input is not modified, so the function can safely be called more
    than once on the same structure.

    :param data: mapping that may contain a ``'firewall'`` key
    :return: dict mapping each tag to the list of formatted entries
    :raises KeyError: if an audit entry has no ``'tag'`` in its ``'data'``
    """
    ret = {}
    for toplist, toplevel in data.get('firewall', {}).items():
        for audit_dict in toplevel:
            for audit_data in audit_dict.values():
                tags_dict = audit_data.get('data', {})
                # Look the tag up instead of popping it: popping mutated
                # the caller's data and broke any second call.
                tag = tags_dict['tag']
                formatted_data = copy.deepcopy(tags_dict)
                formatted_data['type'] = toplist
                formatted_data['tag'] = tag
                formatted_data['module'] = 'firewall'
                formatted_data.update(audit_data)
                # ``update`` pulled the nested 'data' dict back in; its
                # content is already flattened into formatted_data.
                formatted_data.pop('data', None)
                ret.setdefault(tag, []).append(formatted_data)
    return ret
def test_update_text(self):
    """Re-importing a set updates the text of an already imported card."""
    with open(os.path.join(self.FIXTURES_DIR, 'eyes_in_the_skies.json')) as f:
        final_data = json.load(f)
    card_data = final_data['RTR']['cards'][0]
    original_text = card_data['originalText']
    final_text = card_data['text']

    # Copy the data and munge it into its original state.
    original_data = copy.deepcopy(final_data)
    original_data['RTR']['cards'][0]['text'] = original_text

    # Import the original data.
    parse_data(original_data, ['RTR'])
    eyes_in_the_skies = Card.objects.first()
    self.assertEqual(eyes_in_the_skies.text, original_text)

    # Import the final, updated data.
    parse_data(final_data, ['RTR'])
    eyes_in_the_skies.refresh_from_db()
    self.assertEqual(eyes_in_the_skies.text, final_text)
def test_update_types(self):
    """Re-importing a set replaces a card's subtypes and drops stale ones."""
    with open(os.path.join(self.FIXTURES_DIR, 'jackal_pup.json')) as f:
        final_data = json.load(f)

    # Copy the data and munge the types.
    original_data = copy.deepcopy(final_data)
    original_subtype = 'Hound'
    original_data['TMP']['cards'][0]['subtypes'] = [original_subtype]

    # Import the original data.
    parse_data(original_data, ['TMP'])
    jackal_pup = Card.objects.first()
    self.assertEqual(jackal_pup.subtypes.count(), 1)
    self.assertEqual(jackal_pup.subtypes.first().name, original_subtype)

    # Import the final, updated data.
    parse_data(final_data, ['TMP'])
    jackal_pup.refresh_from_db()
    self.assertEqual(jackal_pup.subtypes.count(), 1)
    self.assertEqual(jackal_pup.subtypes.first().name, 'Jackal')

    # The Hound subtype has been deleted.
    self.assertFalse(CardSubtype.objects.filter(name=original_subtype).exists())
def test_update_loyalty(self):
    """
    Simulates the upgrade process from version 0.2 to version 0.4.

    A card is first imported without its loyalty value and then
    re-imported with it; the stored card must pick the value up.
    """
    with open(os.path.join(self.FIXTURES_DIR, 'vraska_the_unseen.json')) as f:
        final_data = json.load(f)

    # Copy the data and munge it to remove the loyalty.
    original_data = copy.deepcopy(final_data)
    del original_data['RTR']['cards'][0]['loyalty']

    # Import the original data.
    parse_data(original_data, ['RTR'])
    vraska = Card.objects.first()
    self.assertIsNone(vraska.loyalty)

    # Import the final, updated data.
    parse_data(final_data, ['RTR'])
    vraska.refresh_from_db()
    self.assertEqual(vraska.loyalty, 5)
# File source: test_pytest_selenium_pdiff.py
# Project: pytest-selenium-pdiff
# Author: rentlytics
# Project source
# File source
# Views: 22
# Favorites: 0
# Likes: 0
# Comments: 0
def test_pytest_report_header():
    """The report header names the enabled diff tool, or raises if none is."""
    old_settings = deepcopy(settings)
    try:
        settings['USE_IMAGEMAGICK'] = True
        settings['USE_PERCEPTUALDIFF'] = True
        assert 'ImageMagick' in pytest_report_header(None)

        settings['USE_IMAGEMAGICK'] = False
        assert 'perceptualdiff' in pytest_report_header(None)

        settings['USE_PERCEPTUALDIFF'] = False
        with pytest.raises(Exception):
            pytest_report_header(None)
    finally:
        # Restore the shared settings even when an assertion above fails,
        # so a failure here cannot leak into other tests.
        settings.update(old_settings)
def test_create_router_failure(self):
    """create_router raises MechanismDriverError on a bad-gateway reply."""
    router_info = copy.deepcopy(fake_router_object)
    router_info['router'].update({'status': 'ACTIVE',
                                  'id': fake_router_uuid})
    context = mock.Mock(current=fake_router_object)
    response = self._create_rest_response(requests.codes.bad_gateway)
    with mock.patch.object(ac_rest.RestClient, 'process_request',
                           return_value=response):
        with mock.patch.object(L3_NAT_db_mixin,
                               'create_router',
                               return_value=fake_router_db):
            acl3router = HuaweiACL3RouterPlugin()
            self.assertRaises(ml2_exc.MechanismDriverError,
                              acl3router.create_router,
                              context, router_info)
def test_add_router_interface_key_error_exception(self):
    """add_router_interface raises KeyError when 'port_id' is missing."""
    router_info = copy.deepcopy(fake_router_object)
    router_info['router'].update({'status': 'ACTIVE',
                                  'id': fake_router_uuid})
    context = mock.Mock(current=fake_router_object)
    # Build the interface info and then drop 'port_id', leaving an empty
    # dict: the missing key is what the plugin is expected to trip over.
    interface_info = {'port_id': fake_port_id}
    del interface_info['port_id']
    with mock.patch.object(L3_NAT_db_mixin, 'get_router',
                           return_value=fake_router_db):
        with mock.patch.object(L3_NAT_with_dvr_db_mixin,
                               'add_router_interface',
                               return_value=interface_info):
            acl3router = HuaweiACL3RouterPlugin()
            self.assertRaises(KeyError,
                              acl3router.add_router_interface,
                              context,
                              fake_router_db['id'],
                              interface_info)
def get_problem_instance(pid, tid):
    """
    Returns the problem instance dictionary that can be displayed to the user.

    The problem returned by ``get_problem`` is deep-copied before it is
    changed, so the object handed out by ``get_problem`` stays untouched.
    The copy gets a ``'solves'`` entry, loses its ``'instances'`` entry and
    is then updated with the instance data for this team.

    Args:
        pid: the problem id
        tid: the team id
    Returns:
        The problem instance
    """
    problem = deepcopy(get_problem(pid=pid, tid=tid))
    instance = get_instance_data(pid, tid)

    problem['solves'] = api.stats.get_problem_solves(pid=pid)
    problem.pop("instances")
    problem.update(instance)
    return problem
def setType(props, prop_id, _type, typeCheck=True):
    '''
    change the property type for the element in the props sequence with id of prop_id

    The props input is never modified.  When the property already has the
    requested type, props itself is returned unchanged; otherwise a deep
    copy of props with the converted property is returned.

    Raises BadValue if _type is not a key of properties.getTypeMap().
    '''
    if typeCheck:
        __typeCheck(props)
    # ``in`` instead of dict.has_key(), which no longer exists on Python 3.
    if _type not in properties.getTypeMap():
        raise BadValue('Type "'+_type+'" does not exist')
    if _type == getType(props, prop_id, False):
        return props
    prop_idx = __getPropIdx(props, prop_id)
    ret_props = copy.deepcopy(props)
    source_value = props[prop_idx].value
    if source_value._t._k == CORBA.tk_sequence:
        # For sequences only the element type code (middle slot of the
        # type descriptor) is swapped.
        ret_props[prop_idx].value._t._d = (source_value._t._d[0], tvCode(_type), source_value._t._d[2])
    else:
        ret_props[prop_idx].value = properties.to_tc_value(source_value, _type)
    return ret_props
def sendChangedPropertiesEvent(self):
    """Send one properties event for every eventable property that changed.

    For each property of the component whose definition reports
    ``isSendEventChange()``, the current value is compared with the value
    remembered from the previous call.  A property is reported when it has
    never been seen before or when ``compareValues(old, new)`` is true.
    A deep copy of the current value is remembered for the next call.
    ``sendPropertiesEvent`` is always called, possibly with an empty list.
    """
    props = self._component._props
    log = self._component._log
    last_state = self._last_property_event_state
    eventable_ids = []
    for prop_id in props.keys():
        prop_def = props.getPropDef(prop_id)
        if not prop_def.isSendEventChange():
            continue
        newValue = props[prop_id]
        # Only the lookup is guarded: a KeyError raised while copying the
        # value must not be mistaken for "never seen before".
        try:
            oldValue = last_state[prop_id]
        except KeyError:
            first_time = True
        else:
            first_time = False
        last_state[prop_id] = copy.deepcopy(newValue)
        if first_time:
            log.debug("Issuing event for the first time %s", prop_id)
            eventable_ids.append(prop_id)
        elif prop_def.compareValues(oldValue, newValue):
            log.debug("Issuing event for %s (%s != %s)", prop_id, oldValue, newValue)
            eventable_ids.append(prop_id)
    log.debug("Eventing for properties %s", eventable_ids)
    self.sendPropertiesEvent(eventable_ids)
def load_previous(self, path=None):
    """Load previous copy of config from disk.

    In normal usage you don't need to call this method directly - it
    is called automatically at object initialization.

    The parsed file content is stored in ``self._prev_dict``.  Every key
    of that mapping which is not already present in ``self`` is added to
    ``self``; keys that are already set are left untouched.

    :param path:
        File path from which to load the previous config. If `None`,
        config is loaded from the default location. If `path` is
        specified, subsequent `save()` calls will write to the same
        path.
    """
    self.path = path or self.path
    with open(self.path) as f:
        self._prev_dict = json.load(f)
    # Deep-copy only the values that are actually adopted, so later
    # mutation of ``self`` can never leak back into ``_prev_dict``.
    for k, v in self._prev_dict.items():
        if k not in self:
            self[k] = copy.deepcopy(v)
def load_previous(self, path=None):
    """Load previous copy of config from disk.

    In normal usage you don't need to call this method directly - it
    is called automatically at object initialization.

    The parsed file content is stored in ``self._prev_dict``.  Every key
    of that mapping which is not already present in ``self`` is added to
    ``self``; keys that are already set are left untouched.

    :param path:
        File path from which to load the previous config. If `None`,
        config is loaded from the default location. If `path` is
        specified, subsequent `save()` calls will write to the same
        path.
    """
    self.path = path or self.path
    with open(self.path) as f:
        self._prev_dict = json.load(f)
    # Deep-copy only the values that are actually adopted, so later
    # mutation of ``self`` can never leak back into ``_prev_dict``.
    for k, v in self._prev_dict.items():
        if k not in self:
            self[k] = copy.deepcopy(v)
def prepare(self):
    """Load the labels, optionally add flipped copies, shuffle and store them.

    When ``self.flipped`` is set, a deep copy of every loaded entry is
    appended with ``'flipped'`` set to True, its ``'label'`` array reversed
    along the second axis, and, for every cell within the first
    ``cell_size`` x ``cell_size`` positions whose channel 0 equals 1,
    channel 1 replaced by ``image_size - 1 - value``.

    The resulting list is shuffled in place with ``np.random.shuffle``,
    stored in ``self.gt_labels`` and returned.
    """
    gt_labels = self.load_labels()
    if self.flipped:
        print('Appending horizontally-flipped training examples ...')
        gt_labels_cp = copy.deepcopy(gt_labels)
        for entry in gt_labels_cp:
            entry['flipped'] = True
            entry['label'] = entry['label'][:, ::-1, :]
            # Same cells the original nested i/j loops visited, handled
            # with one boolean mask; ``cells`` is a view, so the
            # assignment writes into entry['label'].
            cells = entry['label'][:self.cell_size, :self.cell_size]
            occupied = cells[:, :, 0] == 1
            cells[occupied, 1] = self.image_size - 1 - cells[occupied, 1]
        gt_labels += gt_labels_cp
    np.random.shuffle(gt_labels)
    self.gt_labels = gt_labels
    return gt_labels
def clustering_plot_func(chart, sample_properties, sample_data, plot_func, args=None, kwargs=None):
    """Call ``plot_func`` once per clustering and collect the charts it returns.

    Returns None when the sample has more than one genome or when
    ``sample_data.analysis`` is None.  Otherwise ``plot_func`` is called for
    every clustering with a deep copy of ``chart``, the positional ``args``
    and ``kwargs`` extended by ``clustering``, ``original_cluster_sizes``
    and ``diff_expr``.  Every non-None chart gets a ``'filters'`` entry
    naming the clustering and is appended to the returned list.

    ``args`` and ``kwargs`` default to "no extra arguments".  The mapping
    passed as ``kwargs`` is copied, so neither the caller's dict nor a
    shared default is modified.
    """
    if len(sample_properties['genomes']) > 1:
        return None

    analysis = sample_data.analysis
    if analysis is None:
        return None

    call_args = () if args is None else tuple(args)
    call_kwargs = {} if kwargs is None else dict(kwargs)

    new_charts = []
    for clustering_key, clustering in analysis.clusterings.items():
        call_kwargs['clustering'] = clustering
        call_kwargs['original_cluster_sizes'] = sample_data.original_cluster_sizes[clustering_key]
        call_kwargs['diff_expr'] = analysis.differential_expression[clustering_key]
        new_chart = plot_func(copy.deepcopy(chart), *call_args, **call_kwargs)
        if new_chart is not None:
            new_chart['filters'] = {ws_gex_constants.CLUSTERS_FILTER_TITLE: clustering.description}
            new_charts.append(new_chart)
    return new_charts
def build_charts(sample_properties, chart_dicts, sample_data, module=None):
    """Build the charts described by ``chart_dicts``.

    Each chart dict is deep-copied; its ``'function'`` entry names the
    builder, which is looked up first in ``module`` (when given) and then
    in this module's globals.  ``module`` is used through ``.get(name)``,
    so it has to be a mapping of names to callables.  The builder is
    called with the remaining chart dict, ``sample_properties``,
    ``sample_data`` and the chart's optional ``'kwargs'``.  It may return
    None (skipped), one chart, or a list of charts.

    :return: tuple ``(charts, filters)``
    :raises TypeError: if a chart names a function that cannot be found
    """
    modules = [module, globals()] if module else [globals()]

    filters = make_chart_filters(sample_properties, sample_data.analysis)

    charts = []
    for chart_dict in chart_dicts:
        chart_dict = copy.deepcopy(chart_dict)
        function = chart_dict.pop('function')
        f = None
        # ``namespace`` rather than ``module``: the loop must not rebind
        # the ``module`` parameter.
        for namespace in modules:
            f = namespace.get(function)
            if f is not None:
                break
        if f is None:
            # Calling None raised TypeError before as well; keep the type
            # but say which chart function is missing.
            raise TypeError('chart function %r not found' % (function,))

        kwargs = chart_dict.pop('kwargs', {})

        new_chart_obj = f(chart_dict, sample_properties, sample_data, **kwargs)
        if new_chart_obj is None:
            continue

        new_charts = new_chart_obj if isinstance(new_chart_obj, list) else [new_chart_obj]
        charts.extend(new_charts)

    return charts, filters
def build_web_summary_json(sample_properties, sample_data, pipeline):
    """Assemble the view dict for the web summary of one sample.

    The view starts as a deep copy of ``sample_properties``.  The
    ``'tables'``, ``'alarms'``, ``'charts'``, ``'filters'`` and ``'info'``
    entries are added only when the corresponding builder returned
    something truthy.
    """
    view = copy.deepcopy(sample_properties)

    metrics, alarms, charts, all_prefixes = get_constants_for_pipeline(pipeline)

    # ``alarms`` is rebound here: alarm definitions go in, built alarms
    # come out.
    tables, alarms = build_tables(sample_properties, metrics, alarms, sample_data, all_prefixes=all_prefixes)
    if tables:
        view['tables'] = tables
    if alarms:
        view['alarms'] = alarms

    # Likewise ``charts``: chart definitions in, built charts out.
    charts, filters = build_charts(sample_properties, charts,
                                   sample_data=sample_data)
    if charts:
        view['charts'] = charts
    if filters:
        view['filters'] = filters

    # Selected metrics that the web summary template needs
    info = build_info_dict(sample_properties, sample_data, pipeline)
    if info:
        view['info'] = info

    return view
def collapse_recursive(self, buf, level):
    """Merge ``buf`` into ``self.buffer[level]`` and push the result upwards.

    The merged result of ``self.collapse`` is stored (as a deep copy) in
    the first empty buffer above ``level``.  While the next level is
    occupied, the merge is repeated one level higher.  A new empty level
    is appended, and ``self.b`` incremented, whenever ``level + 1`` does
    not exist yet.
    """
    merged = self.collapse(self.buffer[level], buf)
    next_level = level + 1
    if next_level >= len(self.buffer):
        self.buffer.append([])
        self.b += 1
    if len(self.buffer[next_level]) == 0:
        # Free slot: park the merged data here and stop.
        self.buffer[next_level] = copy.deepcopy(merged)
        return
    self.collapse_recursive(merged, next_level)
def _deepcopy_tuple(x, memo):
    """Deep-copy the tuple ``x``, reusing ``x`` itself when nothing changed.

    Every element is deep-copied first.  If copying the elements already
    put a copy of ``x`` into ``memo`` (a self-referencing structure), that
    copy is returned.  Otherwise a new tuple is built only when at least
    one element copy is a different object; if all copies are identical to
    the originals, ``x`` is returned as is.  The result is recorded in
    ``memo`` under ``id(x)``.
    """
    y = [deepcopy(a, memo) for a in x]
    d = id(x)
    try:
        return memo[d]
    except KeyError:
        pass
    for original, copied in zip(x, y):
        if original is not copied:
            y = tuple(y)
            break
    else:
        y = x
    memo[d] = y
    return y
def _deepcopy_inst(x, memo):
    """Deep-copy the instance ``x``.

    Order of preference:
    * ``x.__deepcopy__(memo)`` when defined - its result is returned as is;
    * otherwise a new object is created, either by calling the class with
      deep-copied ``__getinitargs__()`` or as an ``_EmptyClass`` instance
      whose ``__class__`` is switched to that of ``x``.

    The new object is registered in ``memo`` before the state is copied.
    The state comes from ``__getstate__()`` when available, else from
    ``__dict__``; it is deep-copied and applied through ``__setstate__``
    when the copy has one, else merged into its ``__dict__``.
    """
    if hasattr(x, '__deepcopy__'):
        return x.__deepcopy__(memo)
    if hasattr(x, '__getinitargs__'):
        args = deepcopy(x.__getinitargs__(), memo)
        y = x.__class__(*args)
    else:
        y = _EmptyClass()
        y.__class__ = x.__class__
    # Register before copying the state so that references back to ``x``
    # inside the state resolve to ``y``.
    memo[id(x)] = y
    if hasattr(x, '__getstate__'):
        state = x.__getstate__()
    else:
        state = x.__dict__
    state = deepcopy(state, memo)
    if hasattr(y, '__setstate__'):
        y.__setstate__(state)
    else:
        y.__dict__.update(state)
    return y
def __deepcopy__(self, memo):
    """Return a deep copy of a set; used by copy module."""
    # This pre-creates the result and inserts it in the memo
    # early, in case the deep copy recurses into another reference
    # to this same set.  A set can't be an element of itself, but
    # it can certainly contain an object that has a reference to
    # itself.
    from copy import deepcopy
    result = self.__class__()
    memo[id(self)] = result
    data = result._data
    for elt in self:
        # Elements are the keys of the backing dict; every value is True.
        data[deepcopy(elt, memo)] = True
    return result
# Standard set operations: union, intersection, both differences.
# Each has an operator version (e.g. __or__, invoked with |) and a
# method version (e.g. union).
# Subtle: Each pair requires distinct code so that the outcome is
# correct when the type of other isn't suitable. For example, if
# we did "union = __or__" instead, then Set().union(3) would return
# NotImplemented instead of raising TypeError (albeit that *why* it
# raises TypeError as-is is also a bit subtle).
def _update_entire_object(self):
    """Create or update this object on the server, sending all of its info.

    A new object is created when the primary key is among the dirty
    attributes or when the object has no unique id yet; otherwise the
    existing object is updated.  In both cases the payload is wrapped as
    ``{self.info_key: ...}``.  For a new object the primary key is removed
    from a deep copy of the info unless ``_new_object_needs_primary_key``
    is set.

    Returns whatever ``self._refresh_if_needed`` returns for the response.
    """
    model_cls = self.__class__
    if model_cls.primary_key in self._dirty_attributes.keys() or self._model_unique_id is None:
        new_object_info = deepcopy(self._info)
        try:
            if not self._new_object_needs_primary_key:
                del new_object_info[model_cls.primary_key]
        except (AttributeError, KeyError, TypeError):
            # Best effort: the flag may be undefined, the key may be
            # absent, or the info may not be a mapping.  In all of these
            # cases the payload is sent as it is.
            pass
        log.debug("Creating a new {0:s} object".format(model_cls.__name__))
        ret = self._cb.api_json_request(model_cls._new_object_http_method, self.urlobject,
                                        data={self.info_key: new_object_info})
    else:
        log.debug("Updating {0:s} with unique ID {1:s}".format(model_cls.__name__, str(self._model_unique_id)))
        ret = self._cb.api_json_request(model_cls._change_object_http_method,
                                        self._build_api_request_uri(), data={self.info_key: self._info})

    return self._refresh_if_needed(ret)
def _update_object(self):
    """Create or update this object on the server.

    A new object is created when the primary key is among the dirty
    attributes or when the object has no unique id yet; otherwise the
    existing object is updated, and the request URI is built for the
    change HTTP method.  The info dict is sent as the payload directly.
    For a new object the primary key is removed from a deep copy of the
    info unless ``_new_object_needs_primary_key`` is set.

    Returns whatever ``self._refresh_if_needed`` returns for the response.
    """
    model_cls = self.__class__
    if model_cls.primary_key in self._dirty_attributes.keys() or self._model_unique_id is None:
        new_object_info = deepcopy(self._info)
        try:
            if not self._new_object_needs_primary_key:
                del new_object_info[model_cls.primary_key]
        except (AttributeError, KeyError, TypeError):
            # Best effort: the flag may be undefined, the key may be
            # absent, or the info may not be a mapping.  In all of these
            # cases the payload is sent as it is.
            pass
        log.debug("Creating a new {0:s} object".format(model_cls.__name__))
        ret = self._cb.api_json_request(model_cls._new_object_http_method, self.urlobject,
                                        data=new_object_info)
    else:
        log.debug("Updating {0:s} with unique ID {1:s}".format(model_cls.__name__, str(self._model_unique_id)))
        http_method = model_cls._change_object_http_method
        ret = self._cb.api_json_request(http_method, self._build_api_request_uri(http_method=http_method),
                                        data=self._info)

    return self._refresh_if_needed(ret)
def nested_derivations(self, style):
    # type: (Style) -> List[Style]
    """Return variants of ``style`` with nested options switched on.

    For every (option, value) pair in the internal list a deep copy of
    ``style`` with that option set is produced, provided the style
    definition knows the option, the value is one of its configurations,
    and ``style`` does not already use that value.
    """
    options = [('BreakBeforeBraces', 'Custom')]
    nstyles = []
    for optionname, value in options:
        optdef = styledef_option(self.styledefinition, optionname)
        # We can only use this nested option if the clang version in use supports it.
        if optdef is None:
            continue
        if value not in option_configs(optdef):
            continue
        if style.get(optionname) != value:
            nstyle = Style(copy.deepcopy(style))
            set_option(nstyle, optionname, value)
            nstyles.append(nstyle)
    return nstyles