def tearDown(self):
"""
Each test consumes a source, we need to rewind it.
"""
for _, source in itervalues(self.sim_and_source):
source.rewind()
python类itervalues()的实例源码
def everything_but(k, d):
"""
Return iterator of all values in d except the values in k.
"""
assert k in d
return concat(itervalues(keyfilter(ne(k), d)))
def test_update_timers(self):
images = self.getImages()
img_mocks = {}
for key in self.faker.pyiterable(10, True, str):
img = mock.Mock()
img.update_timer = mock.Mock()
img_mocks[key] = img
images.update(img_mocks)
images.update_timers()
for img in six.itervalues(img_mocks):
img.update_timer.assert_called_once_with()
def update_timers(self):
for image in six.itervalues(self):
image.update_timer()
def test_iter(self):
keys = ['first', 'middle', 'last']
values = list(range(len(keys)))
items = list(zip(keys, values))
om = OrderedMap(items)
itr = iter(om)
self.assertEqual(sum([1 for _ in itr]), len(keys))
self.assertRaises(StopIteration, six.next, itr)
self.assertEqual(list(iter(om)), keys)
self.assertEqual(list(six.iteritems(om)), items)
self.assertEqual(list(six.itervalues(om)), values)
def __check(self, key=None, oldvalue=None, newvalue=None, **kw):
"""Call all registered validation methods on this spec."""
for func in six.itervalues(self.__checks):
func(self, key, oldvalue, newvalue)
def complete_contexts(self):
'''
Returns a list of context interfaces that yield a complete context.
'''
interfaces = []
[interfaces.extend(i.complete_contexts())
for i in six.itervalues(self.templates)]
return interfaces
def get_incomplete_context_data(self, interfaces):
'''
Return dictionary of relation status of interfaces and any missing
required context data. Example:
{'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
'zeromq-configuration': {'related': False}}
'''
incomplete_context_data = {}
for i in six.itervalues(self.templates):
for context in i.contexts:
for interface in interfaces:
related = False
if interface in context.interfaces:
related = context.get_related()
missing_data = context.missing_data
if missing_data:
incomplete_context_data[interface] = {'missing_data': missing_data}
if related:
if incomplete_context_data.get(interface):
incomplete_context_data[interface].update({'related': True})
else:
incomplete_context_data[interface] = {'related': True}
else:
incomplete_context_data[interface] = {'related': False}
return incomplete_context_data
def complete_contexts(self):
'''
Returns a list of context interfaces that yield a complete context.
'''
interfaces = []
[interfaces.extend(i.complete_contexts())
for i in six.itervalues(self.templates)]
return interfaces
def get_incomplete_context_data(self, interfaces):
'''
Return dictionary of relation status of interfaces and any missing
required context data. Example:
{'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
'zeromq-configuration': {'related': False}}
'''
incomplete_context_data = {}
for i in six.itervalues(self.templates):
for context in i.contexts:
for interface in interfaces:
related = False
if interface in context.interfaces:
related = context.get_related()
missing_data = context.missing_data
if missing_data:
incomplete_context_data[interface] = {'missing_data': missing_data}
if related:
if incomplete_context_data.get(interface):
incomplete_context_data[interface].update({'related': True})
else:
incomplete_context_data[interface] = {'related': True}
else:
incomplete_context_data[interface] = {'related': False}
return incomplete_context_data
def complete_contexts(self):
'''
Returns a list of context interfaces that yield a complete context.
'''
interfaces = []
[interfaces.extend(i.complete_contexts())
for i in six.itervalues(self.templates)]
return interfaces
def complete_contexts(self):
'''
Returns a list of context interfaces that yield a complete context.
'''
interfaces = []
[interfaces.extend(i.complete_contexts())
for i in six.itervalues(self.templates)]
return interfaces
def get_incomplete_context_data(self, interfaces):
'''
Return dictionary of relation status of interfaces and any missing
required context data. Example:
{'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
'zeromq-configuration': {'related': False}}
'''
incomplete_context_data = {}
for i in six.itervalues(self.templates):
for context in i.contexts:
for interface in interfaces:
related = False
if interface in context.interfaces:
related = context.get_related()
missing_data = context.missing_data
if missing_data:
incomplete_context_data[interface] = {'missing_data': missing_data}
if related:
if incomplete_context_data.get(interface):
incomplete_context_data[interface].update({'related': True})
else:
incomplete_context_data[interface] = {'related': True}
else:
incomplete_context_data[interface] = {'related': False}
return incomplete_context_data
def complete_contexts(self):
'''
Returns a list of context interfaces that yield a complete context.
'''
interfaces = []
[interfaces.extend(i.complete_contexts())
for i in six.itervalues(self.templates)]
return interfaces
def complete_contexts(self):
'''
Returns a list of context interfaces that yield a complete context.
'''
interfaces = []
[interfaces.extend(i.complete_contexts())
for i in six.itervalues(self.templates)]
return interfaces
def get_incomplete_context_data(self, interfaces):
'''
Return dictionary of relation status of interfaces and any missing
required context data. Example:
{'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
'zeromq-configuration': {'related': False}}
'''
incomplete_context_data = {}
for i in six.itervalues(self.templates):
for context in i.contexts:
for interface in interfaces:
related = False
if interface in context.interfaces:
related = context.get_related()
missing_data = context.missing_data
if missing_data:
incomplete_context_data[interface] = {'missing_data': missing_data}
if related:
if incomplete_context_data.get(interface):
incomplete_context_data[interface].update({'related': True})
else:
incomplete_context_data[interface] = {'related': True}
else:
incomplete_context_data[interface] = {'related': False}
return incomplete_context_data
def override_worker_setting(self, setting_name, new_value):
old_value = list(values(self.setenv(setting_name, new_value)))[0]
try:
yield
finally:
self.setenv(setting_name, old_value)
def assert_ok_pidbox_response(self, replies):
for reply in values(replies):
if not reply['ok']:
raise RuntimeError(
'Worker remote control command raised: {0!r}'.format(
reply.get('error', reply)))
return replies
def contribute_to_model(self, model):
# type: (type) -> type
[event.connect_model(model) for event in values(self.events) if event]
model.webhooks = self
model.webhook_events = self # XXX remove for Thorn 2.0
return model
def override_worker_setting(self, setting_name, new_value):
old_value = list(values(self.setenv(setting_name, new_value)))[0]
try:
yield
finally:
self.setenv(setting_name, old_value)