def test_relation_clear(self, local_unit,
                        relation_get,
                        relation_set):
    # relation_clear() is expected to reset custom keys to None while
    # passing both address settings through untouched.
    rid = 'relation:1'
    local_unit.return_value = 'local-unit'
    # Settings the local unit currently has on the relation.
    relation_get.return_value = {
        'private-address': '10.5.0.1',
        'foo': 'bar',
        'public-address': '146.192.45.6'
    }

    hookenv.relation_clear(rid)

    relation_get.assert_called_with(rid=rid, unit='local-unit')
    expected_settings = {
        'private-address': '10.5.0.1',
        'foo': None,
        'public-address': '146.192.45.6',
    }
    relation_set.assert_called_with(relation_id=rid, **expected_settings)
# Example source code for Python's local_unit()
def test_gets_execution_environment(self, os_, relations_get,
                                    relations, relation_id, local_unit,
                                    relation_type, config):
    # With a relation id available, every mocked source is expected to
    # show up in the result, including the relation-specific keys.
    os_.environ = 'some-environment'
    config.return_value = 'some-config'
    local_unit.return_value = 'some-unit'
    relation_type.return_value = 'some-type'
    relation_id.return_value = 'some-id'
    relations_get.return_value = 'some-relations'
    relations.return_value = 'all-relations'

    expected = {
        'conf': 'some-config',
        'reltype': 'some-type',
        'unit': 'some-unit',
        'relid': 'some-id',
        'rel': 'some-relations',
        'rels': 'all-relations',
        'env': 'some-environment',
    }
    self.assertEqual(hookenv.execution_environment(), expected)
def test_gets_execution_environment_no_relation(
        self, os_, relations_get, relations, relation_id,
        local_unit, relation_type, config):
    # When relation_id() yields None, the 'reltype', 'relid' and 'rel'
    # keys are expected to be absent from the result.
    os_.environ = 'some-environment'
    config.return_value = 'some-config'
    local_unit.return_value = 'some-unit'
    relation_type.return_value = 'some-type'
    relation_id.return_value = None
    relations_get.return_value = 'some-relations'
    relations.return_value = 'all-relations'

    expected = {
        'conf': 'some-config',
        'unit': 'some-unit',
        'rels': 'all-relations',
        'env': 'some-environment',
    }
    self.assertEqual(hookenv.execution_environment(), expected)
def setUp(self):
    # Start each test from a clean slate: empty the hookenv start/exit
    # hook lists, the hookenv cache and the coordinator singleton registry.
    del hookenv._atstart[:]
    del hookenv._atexit[:]
    hookenv.cache.clear()
    coordinator.Singleton._instances.clear()

    patchers = [
        patch.object(hookenv, 'local_unit', return_value='foo/1'),
        patch.object(hookenv, 'is_leader', return_value=False),
        patch.object(hookenv, 'metadata',
                     return_value={'peers': {'cluster': None}}),
        patch.object(hookenv, 'log'),
        # Ensure _timestamp always increases.
        patch.object(coordinator, '_utcnow', side_effect=self._utcnow),
    ]
    for patcher in patchers:
        patcher.start()
        # Undo the patch automatically when the test finishes.
        self.addCleanup(patcher.stop)
def test_acquire_leader(self):
    # A leader calling acquire() has to make its grant decision on the
    # spot; it cannot defer to a future hook because none will be
    # triggered.
    hookenv.is_leader.return_value = True
    coord = coordinator.Serial()  # Not Base. Test hooks into default_grant.
    lock_name = 'mylock'
    me = hookenv.local_unit()
    coord.grants = {}
    coord.requests = {me: {}}
    with patch.object(coord, 'default_grant') as grant_mock:
        # The first decision refuses the lock, the second grants it.
        grant_mock.side_effect = iter([False, True])

        self.assertFalse(coord.acquire(lock_name))
        requested_at = coord.request_timestamp(lock_name)

        # The retry is granted and keeps the original request timestamp.
        self.assertTrue(coord.acquire(lock_name))
        self.assertEqual(requested_at, coord.request_timestamp(lock_name))

        # Once it is granted, the leader doesn't make a decision again.
        self.assertTrue(coord.acquire(lock_name))
        self.assertEqual(requested_at, coord.request_timestamp(lock_name))

        self.assertEqual(grant_mock.call_count, 2)
def setUp(self):
    def install(*args, **kw):
        # Start a patch.object() patch and undo it when the test ends.
        patcher = patch.object(*args, **kw)
        patcher.start()
        self.addCleanup(patcher.stop)

    def fake_relation_ids(relname):
        # Two relation ids per relation name: '<name>:9' and '<name>:10'.
        return ['{}:{}'.format(relname, number) for number in (9, 10)]

    def fake_related_units(relid):
        # Exactly one remote unit per relation id, e.g. 'svc_rel/9'.
        return ['svc_' + relid.replace(':', '/')]

    install(hookenv, 'relation_types', return_value=['rel', 'pear'])
    install(hookenv, 'peer_relation_id', return_value='pear:9')
    install(hookenv, 'relation_ids', side_effect=fake_relation_ids)
    install(hookenv, 'related_units', side_effect=fake_related_units)
    install(hookenv, 'local_unit', return_value='foo/1')
    install(hookenv, 'relation_get')
    install(hookenv, 'relation_set')
    # NOTE(review): the original leaves this patch disabled:
    # install(hookenv, 'is_leader', return_value=False)
def local_unit_name(self):
    """
    Return the local unit name with every '/' replaced by '-'.

    @return local unit name
    """
    unit = hookenv.local_unit()
    return unit.replace('/', '-')
def create_sync_src_info_file(self):
    """Touch a marker file that records which unit a sync file came from.

    The file is named ``juju-zone-src-<unit>`` inside ``ZONE_DIR``, where
    ``<unit>`` is the local unit name with '/' replaced by '_'.

    :returns: None
    """
    sanitized_unit = hookenv.local_unit().replace('/', '_')
    marker_path = '{}/juju-zone-src-{}'.format(ZONE_DIR, sanitized_unit)
    # Mode 'w+' creates the file, or truncates it if it already exists.
    with open(marker_path, 'w+'):
        pass
def prepare_tls_certificates(tls):
    """Request a server certificate for this unit through ``tls``.

    The unit's public IP is used as the common name. The subject
    alternative names are the public IP, the private IP and the local
    hostname. The certificate is named after the local unit, with '/'
    replaced by '_'.
    """
    status_set('maintenance', 'Requesting tls certificates.')
    common_name = hookenv.unit_public_ip()
    sans = [
        hookenv.unit_public_ip(),
        hookenv.unit_private_ip(),
        socket.gethostname(),
    ]
    certificate_name = hookenv.local_unit().replace('/', '_')
    tls.request_server_cert(common_name, sans, certificate_name)
def test_gets_the_local_unit(self, os_):
    # local_unit() is expected to return the JUJU_UNIT_NAME value found in
    # the (mocked) process environment.
    fake_environ = {'JUJU_UNIT_NAME': 'foo'}
    os_.environ = fake_environ
    self.assertEqual(hookenv.local_unit(), 'foo')
def test_gets_relations(self, relation_get, related_units,
                        relation_ids, relation_types, local_unit):
    # relations() is expected to nest the data as type -> relation id ->
    # unit -> settings, with the local unit listed next to the remote ones.
    local_unit.return_value = 'u0'
    relation_types.return_value = ['t1', 't2']
    relation_ids.return_value = ['i1']
    related_units.return_value = ['u1', 'u2']
    relation_get.return_value = {'key': 'val'}

    # Every unit reports the same mocked settings under every type.
    all_units = ('u0', 'u1', 'u2')
    expected = {
        rel_type: {'i1': {unit: {'key': 'val'} for unit in all_units}}
        for rel_type in ('t1', 't2')
    }
    self.assertEqual(hookenv.relations(), expected)
def test_sets_relation_with_kwargs(self, check_call_, check_output,
                                   local_unit):
    # Keyword arguments are expected to become key=value arguments of the
    # relation-set command.
    expected_cmd = ['relation-set', 'foo=bar']
    hookenv.relation_set(foo="bar")
    check_call_.assert_called_with(expected_cmd)
def test_sets_relation_with_dict(self, check_call_, check_output,
                                 local_unit):
    # A relation_settings dict is expected to produce the same command
    # line as the equivalent keyword arguments.
    settings = {"foo": "bar"}
    expected_cmd = ['relation-set', 'foo=bar']
    hookenv.relation_set(relation_settings=settings)
    check_call_.assert_called_with(expected_cmd)
def test_sets_relation_with_relation_id(self, check_call_, check_output,
                                        local_unit):
    # An explicit relation id is expected to be passed through with -r,
    # ahead of the key=value settings.
    expected_cmd = ['relation-set', '-r', 'foo', 'bar=baz']
    hookenv.relation_set(relation_id="foo", bar="baz")
    check_call_.assert_called_with(expected_cmd)
def test_sets_relation_with_missing_value(self, check_call_, check_output,
                                          local_unit):
    # A None value is expected to be rendered as an empty 'key=' argument.
    expected_cmd = ['relation-set', 'foo=']
    hookenv.relation_set(foo=None)
    check_call_.assert_called_with(expected_cmd)
def test_granted(self):
    coord = coordinator.BaseCoordinator()
    me = hookenv.local_unit()
    lock_name = 'mylock'
    stamp = coordinator._timestamp()
    coord.grants = {}

    # A request on its own, without a matching grant, is not granted.
    coord.requests = {me: {lock_name: stamp}}
    self.assertFalse(coord.granted(lock_name))

    # The leader grants a request by mirroring its timestamp.
    coord.grants = {me: {lock_name: stamp}}
    self.assertTrue(coord.granted(lock_name))

    # Removing the request is how the unit releases the lock.
    coord.requests = {me: {}}
    self.assertFalse(coord.granted(lock_name))

    # A fresh request made before the leader has done its housekeeping
    # carries a different timestamp, so the stale grant does not count.
    stamp = coordinator._timestamp()
    coord.requests = {me: {lock_name: stamp}}
    self.assertFalse(coord.granted(lock_name))

    # It counts again once the leader mirrors the new timestamp.
    coord.grants = {me: {lock_name: stamp}}
    self.assertTrue(coord.granted(lock_name))
def test_requested(self):
    # requested() is expected to flip from False to True once acquire()
    # has been called for the lock.
    coord = coordinator.BaseCoordinator()
    lock_name = 'mylock'
    coord.requests = {hookenv.local_unit(): {}}
    coord.grants = {}

    self.assertFalse(coord.requested(lock_name))
    coord.acquire(lock_name)
    self.assertTrue(coord.requested(lock_name))
def test_request_timestamp(self):
    coord = coordinator.BaseCoordinator()
    lock_name = 'mylock'
    me = hookenv.local_unit()
    coord.requests = {me: {}}
    coord.grants = {}

    # Without an outstanding request there is no timestamp to report.
    self.assertIsNone(coord.request_timestamp(lock_name))

    # A stored, formatted timestamp is expected to parse back to the
    # datetime it was rendered from.
    now = datetime.utcnow()
    encoded = now.strftime(coordinator._timestamp_format)
    coord.requests = {hookenv.local_unit(): {lock_name: encoded}}
    self.assertEqual(coord.request_timestamp(lock_name), now)
def test_load_state(self, leader_get):
    coord = coordinator.BaseCoordinator()
    me = hookenv.local_unit()

    # The grants are simply the decoded leader_get payload.
    leader_get.return_value = '{"json": true}'
    coord._load_state()
    self.assertDictEqual(coord.grants, {'json': True})

    # No relid means no peer relation, so the request state comes from
    # the local stash.
    with patch.object(coord, '_load_local_state') as local_loader:
        local_loader.return_value = {'local': True}
        coord._load_state()
        self.assertDictEqual(coord.requests, {me: {'local': True}})

    # With a relid, requests come from the peer relation. When the peer
    # relation holds nothing for the local unit, this is the first time
    # we have joined, so the local stash still supplies our own entry.
    coord.relid = 'cluster:1'
    with patch.object(coord, '_load_local_state') as local_loader, \
            patch.object(coord, '_load_peer_state') as peer_loader:
        local_loader.return_value = {'local': True}
        peer_loader.return_value = {'foo/2': {'mylock': 'whatever'}}
        coord._load_state()
        self.assertDictEqual(coord.requests,
                             {me: {'local': True},
                              'foo/2': {'mylock': 'whatever'}})

    # When the peer relation already has an entry for the local unit,
    # the local stash is ignored.
    with patch.object(coord, '_load_local_state') as local_loader, \
            patch.object(coord, '_load_peer_state') as peer_loader:
        local_loader.return_value = {'local': True}
        peer_loader.return_value = {me: {},
                                    'foo/2': {'mylock': 'whatever'}}
        coord._load_state()
        self.assertDictEqual(coord.requests,
                             {me: {},
                              'foo/2': {'mylock': 'whatever'}})
def test_save_state(self, leader_set, relation_set):
    coord = coordinator.BaseCoordinator()
    me = hookenv.local_unit()
    coord.grants = {'directdump': True}
    coord.requests = {me: 'data1', 'foo/2': 'data2'}

    # The grants are dumped to the leadership settings, but only when
    # this unit is the leader.
    with patch.object(coord, '_save_local_state'):
        coord._save_state()
        self.assertFalse(leader_set.called)
        hookenv.is_leader.return_value = True
        coord._save_state()
        leader_set.assert_called_once_with(
            {coord.key: '{"directdump": true}'})

    # Without a relation id, the local unit's requests are dumped to a
    # local stash.
    with patch.object(coord, '_save_local_state') as local_saver:
        coord._save_state()
        local_saver.assert_called_once_with('data1')

    # With a relation id, the local unit's requests are dumped to the
    # peer relation instead.
    with patch.object(coord, '_save_local_state') as local_saver:
        coord.relid = 'cluster:1'
        coord._save_state()
        self.assertFalse(local_saver.called)
        relation_set.assert_called_once_with(
            coord.relid,
            relation_settings={coord.key: '"data1"'})  # JSON encoded
def test_release_granted(self):
    coord = coordinator.BaseCoordinator()
    me = hookenv.local_unit()
    stamp = sentinel.ts
    # The local unit requests two locks but only 'lock1' has been granted.
    coord.requests = {me: {'lock1': stamp, 'lock2': stamp},
                      'foo/2': {'lock1': stamp}}
    coord.grants = {me: {'lock1': stamp},
                    'foo/2': {'lock1': stamp}}

    coord._release_granted()

    # Only the local unit's granted lock is dropped; its ungranted request
    # and the other unit's request stay in place.
    remaining = {me: {'lock2': stamp},
                 'foo/2': {'lock1': stamp}}
    self.assertDictEqual(coord.requests, remaining)
def test_relation(self):
    # Checks the attributes exposed by a single relation object taken
    # from context.Relations().
    rel = context.Relations()['rel']['rel:9']
    self.assertEqual(rel.relid, 'rel:9')
    self.assertEqual(rel.relname, 'rel')
    self.assertEqual(rel.service, 'svc_rel')
    self.assertIsInstance(rel.local, context.RelationInfo)
    self.assertEqual(rel.local.unit, hookenv.local_unit())
    self.assertIsInstance(rel.peers, context.OrderedDict)
    # The original read assertTrue(len(rel.peers), 2). assertTrue() takes
    # the second argument as the failure message, so that line only
    # checked that peers was non-empty.
    # NOTE(review): the expected count of 1 assumes peers is built from the
    # related_units() mock for the peer relation, which yields one unit
    # per relation id - confirm against context.
    self.assertEqual(len(rel.peers), 1)
    self.assertIsInstance(rel.peers['svc_pear/9'], context.RelationInfo)
    # I use this in my log messages. Relation id for identity
    # plus service name for ease of reference.
    self.assertEqual(str(rel), 'rel:9 (svc_rel)')
def test_relationinfo_local(self):
    relid = 'rel:10'
    key = sentinel.key
    info = context.RelationInfo(relid, hookenv.local_unit())

    # Assigning a standard string is written through to relation_set.
    info[key] = 'value'
    hookenv.relation_set.assert_called_once_with(relid, {key: 'value'})

    # Python 2 unicode strings are accepted as well.
    hookenv.relation_set.reset_mock()
    info[key] = six.u('value')
    hookenv.relation_set.assert_called_once_with(
        relid, {key: six.u('value')})

    # Byte strings are rejected under Python 3.
    if six.PY3:
        with self.assertRaises(ValueError):
            info[key] = six.b('value')

    # Deleting a key sets it to None on the relation.
    del info[key]
    hookenv.relation_set.assert_called_with(relid, {key: None})

    # Writing a non-string value is rejected.
    with self.assertRaises(ValueError):
        info[key] = 42
def granted(self, lock):
    '''Return True if a previously requested lock has been granted'''
    me = hookenv.local_unit()
    requested_ts = self.requests[me].get(lock)
    if not requested_ts:
        # No outstanding request for this lock, so nothing to match.
        return False
    # The grant only counts when it equals the value stored in our request.
    granted_ts = self.grants.get(me, {}).get(lock)
    return bool(granted_ts == requested_ts)
def requested(self, lock):
    '''Return True if we are in the queue for the lock'''
    my_requests = self.requests[hookenv.local_unit()]
    return lock in my_requests
def request_timestamp(self, lock):
    '''Return the timestamp of our outstanding request for lock, or None.

    The stored string is parsed with _timestamp_format, giving a
    datetime.datetime() UTC timestamp with no tzinfo attribute.
    '''
    raw = self.requests[hookenv.local_unit()].get(lock)
    if raw is None:
        return None
    return datetime.strptime(raw, _timestamp_format)
def _load_state(self):
    '''Populate self.grants and self.requests from their stored state.

    Grants are decoded from the JSON held in the leadership settings
    under self.key. Requests come from the peer relation when self.relid
    is set, otherwise from the local state.
    '''
    # The original called .format(self._name()) on this literal, which has
    # no placeholder, so the argument was silently discarded. The emitted
    # message is unchanged.
    self.msg('Loading state')

    # All responses must be stored in the leadership settings.
    # The leader cannot use local state, as a different unit may
    # be leader next time. Which is fine, as the leadership
    # settings are always available.
    self.grants = json.loads(hookenv.leader_get(self.key) or '{}')

    local_unit = hookenv.local_unit()

    # All requests must be stored on the peers relation. This is
    # the only channel units have to communicate with the leader.
    # Even the leader needs to store its requests here, as a
    # different unit may be leader by the time the request can be
    # granted.
    if self.relid is None:
        # The peers relation is not available. Maybe we are early in
        # the unit's lifecycle. Maybe this unit is standalone.
        # Fall back to using local state.
        self.msg('No peer relation. Loading local state')
        self.requests = {local_unit: self._load_local_state()}
    else:
        self.requests = self._load_peer_state()
        if local_unit not in self.requests:
            # The peers relation has just been joined. Update any state
            # loaded from our peers with our local state.
            self.msg('New peer relation. Merging local state')
            self.requests[local_unit] = self._load_local_state()
def _emit_state(self):
    '''Report, via self.msg, the status of each lock this unit requested.'''
    my_requests = self.requests[hookenv.local_unit()]
    # Sorted so the messages come out in a stable order.
    for lock in sorted(my_requests.keys()):
        template = 'Granted {}' if self.granted(lock) else 'Waiting on {}'
        self.msg(template.format(lock))
def _load_peer_state(self):
    '''Return {unit: decoded requests} read from the peer relation.

    Covers every related unit plus the local unit. Units with no data
    stored under self.key are left out of the result.
    '''
    members = set(hookenv.related_units(self.relid))
    members.add(hookenv.local_unit())

    loaded = {}
    for member in members:
        payload = hookenv.relation_get(self.key, member, self.relid)
        if not payload:
            # Nothing published by this unit under our key.
            continue
        loaded[member] = json.loads(payload)
    return loaded
def _release_granted(self):
    '''Drop every request of the local unit whose lock has been granted.'''
    # Locks granted to this unit are released at the end of every hook.
    # A hook that never used what it asked for simply has to ask again.
    # The release must be implicit: a standalone unit may see no future
    # triggers in which a manual release could happen.
    me = hookenv.local_unit()
    my_requests = self.requests[me]
    # Iterate over a snapshot of the keys, as entries are deleted below.
    for lock in list(my_requests.keys()):
        if not self.granted(lock):
            continue
        self.msg('Released local {} lock'.format(lock))
        del my_requests[lock]