def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
# Example source code using Python's raise_status()
def get_process_id_list(self, sentry_unit, process_name,
                        expect_success=True):
    """Get a list of process ID(s) from a single sentry juju unit
    for a single process name.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :param process_name: Process name
    :param expect_success: If False, expect the PID to be missing,
        raise if it is present.
    :returns: List of process IDs
    """
    # Quote the name so the shell passes it to pidof as one argument;
    # this matches the other copies of this helper in the file.
    cmd = 'pidof -x "{}"'.format(process_name)
    if not expect_success:
        # `a || exit 0 && exit 1` is evaluated left to right: a pidof hit
        # ends in `exit 1`, a pidof miss ends in `exit 0`.
        cmd += " || exit 0 && exit 1"
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)
    return str(output).split()
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
def get_process_id_list(self, sentry_unit, process_name,
                        expect_success=True):
    """Look up the PID(s) of a named process on one sentry juju unit.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :param process_name: Process name
    :param expect_success: If False, expect the PID to be missing,
        raise if it is present.
    :returns: List of process IDs
    """
    command = 'pidof -x "{}"'.format(process_name)
    if not expect_success:
        # Shell evaluates `a || exit 0 && exit 1` left to right, so a
        # pidof hit ends in `exit 1` and a pidof miss ends in `exit 0`.
        command = command + " || exit 0 && exit 1"

    stdout, rc = sentry_unit.run(command)
    if rc != 0:
        unit_name = sentry_unit.info['unit_name']
        failure = '{} `{}` returned {} {}'.format(unit_name, command,
                                                  rc, stdout)
        amulet.raise_status(amulet.FAIL, msg=failure)
    # Whitespace-separated PIDs become a list of strings.
    return str(stdout).split()
def test_106_swift_object_store_endpoint(self):
    """Check the keystone endpoint data for the swift object store.

    The keystone endpoint list is validated with the same port value
    ('8080') passed as admin, internal and public port; any validation
    result is reported as an amulet failure.
    """
    u.log.debug('Checking keystone endpoint for swift object store...')
    endpoint_list = self.keystone.endpoints.list()
    port = '8080'
    wanted = {
        'id': u.not_null,
        'region': 'RegionOne',
        'adminurl': u.valid_url,
        'internalurl': u.valid_url,
        'publicurl': u.valid_url,
        'service_id': u.not_null,
    }
    problem = u.validate_endpoint_data(endpoint_list, port, port, port,
                                       wanted)
    if not problem:
        return
    failure = 'object-store endpoint: {}'.format(problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_200_swift_proxy_identity_service_relation(self):
    """Check the swift-proxy side of the keystone identity relation.

    The relation data read from the swift-proxy sentry is validated
    against the expected swift/s3 service entries; any validation result
    is reported as an amulet failure.
    """
    u.log.debug('Checking swift-proxy:keystone identity relation...')
    sentry = self.swift_proxy_sentry
    relation_spec = ['identity-service', 'keystone:identity-service']
    wanted = {
        'swift_service': 'swift',
        'swift_region': 'RegionOne',
        'swift_public_url': u.valid_url,
        'swift_internal_url': u.valid_url,
        'swift_admin_url': u.valid_url,
        's3_service': 's3',
        's3_region': 'RegionOne',
        's3_public_url': u.valid_url,
        's3_internal_url': u.valid_url,
        's3_admin_url': u.valid_url,
        'private-address': u.valid_ip,
    }
    problem = u.validate_relation_data(sentry, relation_spec, wanted)
    if not problem:
        return
    failure = u.relation_error('swift-proxy identity-service', problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_202_keystone_identity_service_relation(self):
    """Check the keystone side of the swift-proxy identity relation.

    The relation data read from the keystone sentry is validated against
    the expected service/auth settings; any validation result is
    reported as an amulet failure.
    """
    u.log.debug('Checking keystone:swift-proxy identity relation...')
    sentry = self.keystone_sentry
    relation_spec = ['identity-service', 'swift-proxy:identity-service']
    wanted = {
        'service_protocol': 'http',
        'service_tenant': 'services',
        'admin_token': 'ubuntutesting',
        'service_password': u.not_null,
        'service_port': '5000',
        'auth_port': '35357',
        'auth_protocol': 'http',
        'private-address': u.valid_ip,
        'auth_host': u.valid_ip,
        'service_username': 's3_swift',
        'service_tenant_id': u.not_null,
        'service_host': u.valid_ip,
    }
    problem = u.validate_relation_data(sentry, relation_spec, wanted)
    if not problem:
        return
    failure = u.relation_error('keystone identity-service', problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_204_swift_storage_swift_storage_relation(self):
    """Check the swift-storage side of the swift-proxy storage relation.

    The relation data read from the swift-storage sentry is validated
    against the expected ports, zone and device; any validation result
    is reported as an amulet failure.
    """
    u.log.debug('Checking swift:swift-proxy swift-storage relation...')
    sentry = self.swift_storage_sentry
    relation_spec = ['swift-storage', 'swift-proxy:swift-storage']
    wanted = {
        'account_port': '6002',
        'zone': '1',
        'object_port': '6000',
        'container_port': '6001',
        'private-address': u.valid_ip,
        'device': 'vdb',
    }
    problem = u.validate_relation_data(sentry, relation_spec, wanted)
    if not problem:
        return
    failure = u.relation_error('swift-storage swift-storage', problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_206_swift_proxy_swift_storage_relation(self):
    """Check the swift-proxy side of the swift-storage relation.

    The relation data read from the swift-proxy sentry is validated
    against the expected address, trigger, rings URL and hash entries;
    any validation result is reported as an amulet failure.
    """
    u.log.debug('Checking swift-proxy:swift swift-storage relation...')
    sentry = self.swift_proxy_sentry
    relation_spec = ['swift-storage', 'swift-storage:swift-storage']
    wanted = {
        'private-address': u.valid_ip,
        'trigger': u.not_null,
        'rings_url': u.valid_url,
        'swift_hash': u.not_null,
    }
    problem = u.validate_relation_data(sentry, relation_spec, wanted)
    if not problem:
        return
    failure = u.relation_error('swift-proxy swift-storage', problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_300_swift_config(self):
    """Check the swift-hash section of /etc/swift/swift.conf.

    The 'swift_hash_path_suffix' value on the swift-storage unit must
    equal the 'swift_hash' value published by swift-proxy on the
    swift-storage relation.
    """
    u.log.debug('Checking swift config...')
    storage_sentry = self.swift_storage_sentry
    conf_path = '/etc/swift/swift.conf'
    proxy_relation_data = self.swift_proxy_sentry.relation(
        'swift-storage', 'swift-storage:swift-storage')
    wanted = {'swift_hash_path_suffix': proxy_relation_data['swift_hash']}
    problem = u.validate_config_data(storage_sentry, conf_path,
                                     'swift-hash', wanted)
    if not problem:
        return
    failure = "swift config error: {}".format(problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def _test_pause(self):
    """Run the "pause" action on swift-proxy and check its outcome.

    Calls ``self._assert_services`` with should_run=True before the
    action and should_run=False after it, then checks that the unit
    reports "maintenance" status with the expected paused message.
    """
    u.log.info("Testing pause action")
    self._assert_services(should_run=True)
    action_id = u.run_action(self.swift_proxy_sentry, "pause")
    assert u.wait_on_action(action_id), "Pause action failed."
    self._assert_services(should_run=False)

    unit_status, unit_message = u.status_get(self.swift_proxy_sentry)
    if unit_status != "maintenance":
        failure = ("Pause action failed to move unit to maintenance "
                   "status (got {} instead)").format(unit_status)
        amulet.raise_status(amulet.FAIL, msg=failure)

    paused_text = "Paused. Use 'resume' action to resume normal service."
    if unit_message != paused_text:
        failure = ("Pause action failed to set message"
                   " (got {} instead)").format(unit_message)
        amulet.raise_status(amulet.FAIL, msg=failure)
def _test_resume(self):
    """Run the "resume" action on swift-proxy and check its outcome.

    Calls ``self._assert_services`` with should_run=False before the
    action and should_run=True after it, then checks that the unit
    reports "active" status with the "Unit is ready" message.
    """
    u.log.info("Testing resume action")
    # _test_pause leaves the service paused, so start from that state.
    self._assert_services(should_run=False)
    action_id = u.run_action(self.swift_proxy_sentry, "resume")
    assert u.wait_on_action(action_id), "Resume action failed."
    self._assert_services(should_run=True)

    unit_status, unit_message = u.status_get(self.swift_proxy_sentry)
    if unit_status != "active":
        failure = ("Resume action failed to move unit to active "
                   "status (got {} instead)").format(unit_status)
        amulet.raise_status(amulet.FAIL, msg=failure)

    if unit_message != "Unit is ready":
        failure = ("Resume action failed to clear message"
                   " (got {} instead)").format(unit_message)
        amulet.raise_status(amulet.FAIL, msg=failure)
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
def get_process_id_list(self, sentry_unit, process_name,
                        expect_success=True):
    """Look up the PID(s) of a named process on one sentry juju unit.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :param process_name: Process name
    :param expect_success: If False, expect the PID to be missing,
        raise if it is present.
    :returns: List of process IDs
    """
    command = 'pidof -x "{}"'.format(process_name)
    if not expect_success:
        # Shell evaluates `a || exit 0 && exit 1` left to right, so a
        # pidof hit ends in `exit 1` and a pidof miss ends in `exit 0`.
        command = command + " || exit 0 && exit 1"

    stdout, rc = sentry_unit.run(command)
    if rc != 0:
        unit_name = sentry_unit.info['unit_name']
        failure = '{} `{}` returned {} {}'.format(unit_name, command,
                                                  rc, stdout)
        amulet.raise_status(amulet.FAIL, msg=failure)
    # Whitespace-separated PIDs become a list of strings.
    return str(stdout).split()
def _image_create(self):
    """Create the glance image used by the heat template and verify it.

    Fails through amulet when the create helper returns a falsy value
    or when the first image in the glance listing is not named
    IMAGE_NAME.
    """
    u.log.debug('Creating glance image ({})...'.format(IMAGE_NAME))

    # Create the image; a falsy result is treated as a failed create.
    created = u.create_cirros_image(self.glance, IMAGE_NAME)
    if not created:
        failure = 'glance image create failed'
        amulet.raise_status(amulet.FAIL, msg=failure)

    # The name check looks only at the first entry of the listing.
    listed = list(self.glance.images.list())
    if listed[0].name != IMAGE_NAME:
        failure = ('glance image create failed or unexpected '
                   'image name {}').format(listed[0].name)
        amulet.raise_status(amulet.FAIL, msg=failure)
def test_110_service_catalog(self):
    """Check the Keystone service catalog for the expected endpoints.

    The compute, orchestration, image and identity entries are all
    validated against the same endpoint template.
    """
    u.log.debug('Checking service catalog endpoint data...')
    endpoint_template = {
        'adminURL': u.valid_url,
        'region': 'RegionOne',
        'publicURL': u.valid_url,
        'internalURL': u.valid_url,
        'id': u.not_null,
    }
    # Each service type references the one shared template dict.
    wanted = {
        'compute': [endpoint_template],
        'orchestration': [endpoint_template],
        'image': [endpoint_template],
        'identity': [endpoint_template],
    }
    catalog = self.keystone.service_catalog.get_endpoints()
    problem = u.validate_svc_catalog_endpoint_data(wanted, catalog)
    if not problem:
        return
    amulet.raise_status(amulet.FAIL, msg=problem)
def test_120_heat_endpoint(self):
    """Check the heat api endpoint data registered in keystone.

    The expected port is '3333' when the deployed release compares
    below ``self.trusty_kilo`` and '8004' otherwise; the same port is
    passed as admin, internal and public port.
    """
    u.log.debug('Checking api endpoint data...')
    endpoint_list = self.keystone.endpoints.list()
    before_kilo = self._get_openstack_release() < self.trusty_kilo
    port = '3333' if before_kilo else '8004'
    wanted = {
        'id': u.not_null,
        'region': 'RegionOne',
        'adminurl': u.valid_url,
        'internalurl': u.valid_url,
        'publicurl': u.valid_url,
        'service_id': u.not_null,
    }
    problem = u.validate_endpoint_data(endpoint_list, port, port, port,
                                       wanted)
    if not problem:
        return
    failure = 'heat endpoint: {}'.format(problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_200_heat_mysql_shared_db_relation(self):
    """Check the heat side of the shared-db relation.

    The relation data read from the heat sentry is validated against
    the expected database, username and host entries; any validation
    result is reported as an amulet failure.
    """
    u.log.debug('Checking heat:mysql shared-db relation data...')
    sentry = self.heat_sentry
    relation_spec = ['shared-db', 'percona-cluster:shared-db']
    wanted = {
        'private-address': u.valid_ip,
        'heat_database': 'heat',
        'heat_username': 'heat',
        'heat_hostname': u.valid_ip,
    }
    problem = u.validate_relation_data(sentry, relation_spec, wanted)
    if not problem:
        return
    failure = u.relation_error('heat:mysql shared-db', problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_201_mysql_heat_shared_db_relation(self):
    """Check the database side of the heat shared-db relation.

    The relation data read from ``self.pxc_sentry`` is validated against
    the expected host, allowed-units and password entries; any
    validation result is reported as an amulet failure.
    """
    u.log.debug('Checking mysql:heat shared-db relation data...')
    sentry = self.pxc_sentry
    relation_spec = ['shared-db', 'heat:shared-db']
    wanted = {
        'private-address': u.valid_ip,
        'db_host': u.valid_ip,
        'heat_allowed_units': u.not_null,
        'heat_password': u.not_null,
    }
    problem = u.validate_relation_data(sentry, relation_spec, wanted)
    if not problem:
        return
    failure = u.relation_error('percona-cluster:heat shared-db', problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_203_keystone_heat_identity_relation(self):
    """Check the keystone side of the heat identity-service relation.

    The relation data read from the keystone sentry is validated against
    the expected service/auth settings; any validation result is
    reported as an amulet failure.
    """
    u.log.debug('Checking keystone:heat identity-service relation data...')
    sentry = self.keystone_sentry
    relation_spec = ['identity-service', 'heat:identity-service']
    wanted = {
        'service_protocol': 'http',
        'service_tenant': 'services',
        'admin_token': 'ubuntutesting',
        'service_password': u.not_null,
        'service_port': '5000',
        'auth_port': '35357',
        'auth_protocol': 'http',
        'private-address': u.valid_ip,
        'auth_host': u.valid_ip,
        'service_username': 'heat-cfn_heat',
        'service_tenant_id': u.not_null,
        'service_host': u.valid_ip,
    }
    problem = u.validate_relation_data(sentry, relation_spec, wanted)
    if not problem:
        return
    failure = u.relation_error('keystone:heat identity-service', problem)
    amulet.raise_status(amulet.FAIL, msg=failure)
def test_500_auth_encryption_key_same_on_units(self):
    """Test that the auth_encryption_key in heat.conf is the same on all of
    the units.

    Greps the key out of /etc/heat/heat.conf on every heat unit through
    ``self._run_arbitrary``, parses the JSON result and fails through
    amulet when the command errors or the per-unit values differ.
    """
    u.log.debug("Checking the 'auth_encryption_key' is the same on "
                "all units.")
    output, ret = self._run_arbitrary(
        "--application heat "
        "--format json "
        "grep auth_encryption_key /etc/heat/heat.conf")
    if ret:
        msg = "juju run returned error: ({}) -> {}".format(ret, output)
        amulet.raise_status(amulet.FAIL, msg=msg)
    output = json.loads(output)
    keys = {}
    for r in output:
        # Split on the first '=' only, so a key value that itself
        # contains '=' is compared in full rather than truncated.
        k = r['Stdout'].split('=', 1)[1].strip()
        keys[r['UnitId']] = k
    # A set collapses equal values; more than one member means at least
    # two units disagree. This avoids indexing dict.values(), which is
    # not subscriptable on Python 3.
    if len(set(keys.values())) > 1:
        # Join the per-unit pairs into text; formatting the generator
        # itself would only print its repr.
        per_unit = ", ".join(
            "{}={}".format(k, v) for k, v in keys.items())
        msg = ("'auth_encryption_key' is not identical on every unit: {}"
               .format(per_unit))
        amulet.raise_status(amulet.FAIL, msg=msg)
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
def get_ceph_pools(self, sentry_unit):
    """Return a dict of ceph pools from a single ceph unit.

    Runs ``sudo ceph osd lspools`` on the unit and parses the output.
    The comma-separated form (``0 data,1 metadata,2 rbd,``) is accepted,
    as is output that lists one ``<id> <name>`` pair per line.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :returns: Dict with pool name as keys, integer pool id as vals
    """
    cmd = 'sudo ceph osd lspools'
    output, code = sentry_unit.run(cmd)
    if code != 0:
        msg = ('{} `{}` returned {} '
               '{}'.format(sentry_unit.info['unit_name'],
                           cmd, code, output))
        amulet.raise_status(amulet.FAIL, msg=msg)

    pools = {}
    # Treat newlines as separators as well, so one-pool-per-line output is
    # parsed instead of being silently dropped.
    # NOTE(review): newer Ceph releases are believed to print this form;
    # confirm against the target release.
    for pool in str(output).replace('\n', ',').split(','):
        # split() with no argument tolerates padding around an entry.
        pool_id_name = pool.split()
        if len(pool_id_name) == 2:
            pool_id, pool_name = pool_id_name
            pools[pool_name] = int(pool_id)

    self.log.debug('Pools on {}: {}'.format(sentry_unit.info['unit_name'],
                                            pools))
    return pools
def get_process_id_list(self, sentry_unit, process_name,
                        expect_success=True):
    """Look up the PID(s) of a named process on one sentry juju unit.

    :param sentry_unit: Amulet sentry instance (juju unit)
    :param process_name: Process name
    :param expect_success: If False, expect the PID to be missing,
        raise if it is present.
    :returns: List of process IDs
    """
    command = 'pidof -x "{}"'.format(process_name)
    if not expect_success:
        # Shell evaluates `a || exit 0 && exit 1` left to right, so a
        # pidof hit ends in `exit 1` and a pidof miss ends in `exit 0`.
        command = command + " || exit 0 && exit 1"

    stdout, rc = sentry_unit.run(command)
    if rc != 0:
        unit_name = sentry_unit.info['unit_name']
        failure = '{} `{}` returned {} {}'.format(unit_name, command,
                                                  rc, stdout)
        amulet.raise_status(amulet.FAIL, msg=failure)
    # Whitespace-separated PIDs become a list of strings.
    return str(stdout).split()
def validate_sectionless_conf(self, file_contents, expected):
    """A crude conf parser. Useful to inspect configuration files which
    do not have section headers (as would be necessary in order to use
    the configparser). Such as openstack-dashboard or rabbitmq confs.

    Every ``key = value`` line whose key appears in ``expected`` is
    compared against the expected value; a mismatch is reported through
    ``amulet.raise_status``. Lines without '=' and keys not listed in
    ``expected`` are ignored.

    :param file_contents: Full text of the configuration file
    :param expected: Dict mapping option names to expected string values
    :returns: None
    """
    for line in file_contents.split('\n'):
        # partition() cuts at the first '=' only, so a value that itself
        # contains '=' (a URL query, base64 padding) is compared whole.
        key, sep, value = line.partition('=')
        if not sep:
            continue
        key = key.strip()
        value = value.strip()
        if key in expected and expected[key] != value:
            msg = ('Config mismatch. Expected, actual: {}, '
                   '{}'.format(expected[key], value))
            amulet.raise_status(amulet.FAIL, msg=msg)
def validate_keystone_tenants(self, client):
    """Check the tenants (or projects) known to keystone.

    Uses ``client.tenants.list()`` when ``self.keystone_api_version``
    is 2 and ``client.projects.list()`` otherwise, then validates the
    result against the expected services, demoTenant and admin entries.

    :param client: Keystone client exposing ``tenants`` or ``projects``
    """
    u.log.debug('Checking keystone tenants...')
    wanted = [
        {
            'name': 'services',
            'enabled': True,
            'description': 'Created by Juju',
            'id': u.not_null,
        },
        {
            'name': 'demoTenant',
            'enabled': True,
            'description': 'demo tenant',
            'id': u.not_null,
        },
        {
            'name': 'admin',
            'enabled': True,
            'description': 'Created by Juju',
            'id': u.not_null,
        },
    ]
    uses_v2 = self.keystone_api_version == 2
    found = client.tenants.list() if uses_v2 else client.projects.list()
    problem = u.validate_tenant_data(wanted, found)
    if not problem:
        return
    amulet.raise_status(amulet.FAIL, msg=problem)
def test_140_keystone_endpoint(self):
    """Check the keystone api endpoint data.

    The endpoint list from ``self.keystone_v2`` is validated with admin
    port '35357' and '5000' for both the internal and public ports.
    """
    u.log.debug('Checking keystone api endpoint data...')
    endpoint_list = self.keystone_v2.endpoints.list()
    admin = '35357'
    service = '5000'
    wanted = {
        'id': u.not_null,
        'region': 'RegionOne',
        'adminurl': u.valid_url,
        'internalurl': u.valid_url,
        'publicurl': u.valid_url,
        'service_id': u.not_null,
    }
    problem = u.validate_endpoint_data(endpoint_list, admin, service,
                                       service, wanted)
    if not problem:
        return
    amulet.raise_status(amulet.FAIL,
                        msg='keystone endpoint: {}'.format(problem))