def check_quality(result):
"""Run quality tests on the given generated output."""
for dirpath, _dirnames, filenames in os.walk(str(result.project)):
pylintrc = str(result.project.join('pylintrc'))
for filename in filenames:
name = os.path.join(dirpath, filename)
if not name.endswith('.py'):
continue
try:
sh.pylint(name, rcfile=pylintrc)
sh.pylint(name, py3k=True)
sh.pycodestyle(name)
if filename != 'setup.py':
sh.pydocstyle(name)
sh.isort(name, check_only=True)
except sh.ErrorReturnCode as exc:
pytest.fail(str(exc))
tox_ini = result.project.join('tox.ini')
docs_build_dir = result.project.join('docs/_build')
try:
# Sanity check the generated Makefile
sh.make('help')
# quality check docs
sh.doc8(result.project.join("README.rst"), ignore_path=docs_build_dir, config=tox_ini)
sh.doc8(result.project.join("docs"), ignore_path=docs_build_dir, config=tox_ini)
except sh.ErrorReturnCode as exc:
pytest.fail(str(exc))
python类fail()的实例源码
def test_all_iam_templates(template_name):
"""Verify all IAM templates render as proper JSON."""
*_, service_json = template_name.split('/')
service, *_ = service_json.split('.')
items = ['resource1', 'resource2']
if service == 'rds-db':
items = {
'resource1': 'user1',
'resource2': 'user2',
}
try:
rendered = render_policy_template(
account_number='',
app='coreforrest',
env='dev',
group='forrest',
items=items,
pipeline_settings={
'lambda': {
'vpc_enabled': False,
},
},
region='us-east-1',
service=service)
except json.decoder.JSONDecodeError:
pytest.fail('Bad template: {0}'.format(template_name), pytrace=False)
assert isinstance(rendered, list)
def _release(self, name):
attr = getattr(self, '_orig_' + name)
setattr(subprocess, name, attr)
delattr(self, '_orig_' + name)
responder = self.responders[name]
if not responder.is_satisfied:
pytest.fail('Unsatisfied responder: %s' % responder)
def test_ufodiff_delta_class_instantiation_commit():
try:
deltaobj = Delta('.', [], is_commit_test=True, commit_number='2')
assert deltaobj.is_commit_test is True
assert deltaobj.is_branch_test is False
assert deltaobj.commit_number == "2"
assert deltaobj.compare_branch_name is None
assert len(deltaobj.ufo_directory_list) == 0
except Exception as e:
pytest.fail(e)
def test_ufodiff_delta_class_instantiation_branch():
make_testing_branch()
try:
deltaobj = Delta('.', [], is_branch_test=True, compare_branch_name='testing_branch')
assert deltaobj.is_commit_test is False
assert deltaobj.is_branch_test is True
assert deltaobj.compare_branch_name == "testing_branch"
assert deltaobj.commit_number == "0"
assert len(deltaobj.ufo_directory_list) == 0
except Exception as e:
delete_testing_branch()
pytest.fail(e)
delete_testing_branch()
def test_ufodiff_delta_class_instantiation_commit_with_ufo_filter():
try:
deltaobj = Delta('.', ['Font-Regular.ufo'], is_commit_test=True, commit_number='2')
assert deltaobj.is_commit_test is True
assert deltaobj.is_branch_test is False
assert deltaobj.commit_number == "2"
assert deltaobj.compare_branch_name is None
assert len(deltaobj.ufo_directory_list) == 1
assert deltaobj.ufo_directory_list[0] == "Font-Regular.ufo"
except Exception as e:
pytest.fail(e)
def error_on_ResourceWarning():
"""This fixture captures ResourceWarning's and reports an "error"
describing the file handles left open.
This is shown regardless of how successful the test was, if a test fails
and leaves files open then those files will be reported. Ideally, even
those files should be closed properly after a test failure or exception.
Since only Python 3 and PyPy3 have ResourceWarning's, this context will
have no effect when running tests on Python 2 or PyPy.
Because of autouse=True, this function will be automatically enabled for
all test_* functions in this module.
This code is primarily based on the examples found here:
https://stackoverflow.com/questions/24717027/convert-python-3-resourcewarnings-into-exception
"""
try:
ResourceWarning
except NameError:
# Python 2, PyPy
yield
return
# Python 3, PyPy3
with warnings.catch_warnings(record=True) as caught:
warnings.resetwarnings() # clear all filters
warnings.simplefilter('ignore') # ignore all
warnings.simplefilter('always', ResourceWarning) # add filter
yield # run tests in this context
gc.collect() # run garbage collection (for pypy3)
if not caught:
return
pytest.fail('The following file descriptors were not closed properly:\n' +
'\n'.join((str(warning.message) for warning in caught)),
pytrace=False)
def yocto_id_from_ext4(self, filename):
try:
cmd = "debugfs -R 'cat %s' %s| sed -n 's/^%s=//p'" % (self.artifact_info_file, filename, self.artifact_prefix)
output = subprocess.check_output(cmd, shell=True).strip()
logger.info("Running: " + cmd + " returned: " + output)
return output
except subprocess.CalledProcessError:
pytest.fail("Unable to read: %s, is it a broken image?" % (filename))
except Exception, e:
pytest.fail("Unexpected error trying to read ext4 image: %s, error: %s" % (filename, str(e)))
def artifact_id_randomize(self, install_image, device_type="vexpress-qemu", specific_image_id=None):
if specific_image_id:
imageid = specific_image_id
else:
imageid = "mender-%s" % str(random.randint(0, 99999999))
config_file = r"""%s=%s""" % (self.artifact_prefix, imageid)
tfile = tempfile.NamedTemporaryFile(delete=False)
tfile.write(config_file)
tfile.close()
try:
cmd = "debugfs -w -R 'rm %s' %s" % (self.artifact_info_file, install_image)
logger.info("Running: " + cmd)
output = subprocess.check_output(cmd, shell=True).strip()
logger.info("Returned: " + output)
cmd = ("printf 'cd %s\nwrite %s %s\n' | debugfs -w %s"
% (os.path.dirname(self.artifact_info_file),
tfile.name, os.path.basename(self.artifact_info_file), install_image))
logger.info("Running: " + cmd)
output = subprocess.check_output(cmd, shell=True).strip()
logger.info("Returned: " + output)
except subprocess.CalledProcessError:
pytest.fail("Trying to modify ext4 image failed, probably because it's not a valid image.")
except Exception, e:
pytest.fail("Unexpted error trying to modify ext4 image: %s, error: %s" % (install_image, str(e)))
finally:
os.remove(tfile.name)
return imageid
def verify_reboot_performed(self, max_wait=60*30):
logger.info("waiting for system to reboot")
successful_connections = 0
timeout = time.time() + max_wait
while time.time() <= timeout:
try:
with settings(warn_only=True, abort_exception=FabricFatalException):
time.sleep(1)
if exists(self.tfile):
logger.debug("temp. file still exists, device hasn't rebooted.")
continue
else:
logger.debug("temp. file no longer exists, device has rebooted.")
successful_connections += 1
# try connecting 10 times before returning
if successful_connections <= 9:
continue
return
except (BaseException):
logger.debug("system exit was caught, this is probably because SSH connectivity is broken while the system is rebooting")
continue
if time.time() > timeout:
pytest.fail("Device never rebooted!")
def common_update_procedure(install_image,
regenerate_image_id=True,
device_type="vexpress-qemu",
broken_image=False,
verify_status=True,
signed=False,
devices=None,
scripts=[]):
with artifact_lock:
if broken_image:
artifact_id = "broken_image_" + str(random.randint(0, 999999))
elif regenerate_image_id:
artifact_id = Helpers.artifact_id_randomize(install_image)
logger.debug("randomized image id: " + artifact_id)
else:
artifact_id = Helpers.yocto_id_from_ext4(install_image)
# create atrifact
with tempfile.NamedTemporaryFile() as artifact_file:
created_artifact = image.make_artifact(install_image, device_type, artifact_id, artifact_file, signed=signed, scripts=scripts)
if created_artifact:
deploy.upload_image(created_artifact)
if devices is None:
devices = list(set([device["device_id"] for device in adm.get_devices_status("accepted")]))
deployment_id = deploy.trigger_deployment(name="New valid update",
artifact_name=artifact_id,
devices=devices)
else:
logger.warn("failed to create artifact")
pytest.fail("error creating artifact")
# wait until deployment is in correct state
if verify_status:
deploy.check_expected_status("inprogress", deployment_id)
return deployment_id, artifact_id
def test_ssl_only(self):
""" make sure we are not exposing any non-ssl connections in production environment """
done = False
sleep_time = 2
# start production environment
subprocess.call(["./production_test_env.py", "--start"])
# get all exposed ports from docker
for _ in range(3):
exposed_hosts = subprocess.check_output("docker ps | grep %s | grep -o -E '0.0.0.0:[0-9]*'" % ("testprod"), shell=True)
try:
for host in exposed_hosts.split():
with contextlib.closing(ssl.wrap_socket(socket.socket())) as sock:
logging.info("%s: connect to host with TLS" % host)
host, port = host.split(":")
sock.connect((host, int(port)))
done = True
except:
sleep_time *= 2
time.sleep(sleep_time)
continue
if done:
break
# tear down production env
subprocess.call(["./production_test_env.py", "--kill"])
if not done:
pytest.fail("failed to connect to production env. using SSL")
def test_token_token_expiration(self):
""" verify that an expired token is handled correctly (client gets a new, valid one)
and that deployments are still recieved by the client
"""
if not env.host_string:
execute(self.test_token_token_expiration,
hosts=get_mender_clients())
return
timeout_time = int(time.time()) + 60
while int(time.time()) < timeout_time:
with quiet():
output = run("journalctl -u mender -l --no-pager | grep \"received new authorization data\"")
time.sleep(1)
if output.return_code == 0:
logging.info("mender logs indicate new authorization data available")
break
if timeout_time <= int(time.time()):
pytest.fail("timed out waiting for download retries")
# this call verifies that the deployment process goes into an "inprogress" state
# which is only possible when the client has a valid token.
common_update_procedure(install_image=conftest.get_valid_image())
def mender_log_contains_aborted_string(self, mender_client_container="mender-client"):
expected_string = "deployment aborted at the backend"
for _ in range(60*5):
with settings(hide('everything'), warn_only=True):
out = run("journalctl -u mender | grep \"%s\"" % expected_string)
if out.succeeded:
return
else:
time.sleep(2)
pytest.fail("deployment never aborted.")
def perform_update(self, mender_client_container="mender-client", fail=False):
if fail:
execute(update_image_failed,
hosts=get_mender_client_by_container_name(mender_client_container))
else:
execute(update_image_successful,
install_image=conftest.get_valid_image(),
skip_reboot_verification=True,
hosts=get_mender_client_by_container_name(mender_client_container))
def wait_for_containers(expected_containers, defined_in):
for _ in range(60 * 5):
out = subprocess.check_output("docker-compose -p %s %s ps -q" % (conftest.docker_compose_instance, "-f " + " -f ".join(defined_in)), shell=True)
if len(out.split()) == expected_containers:
time.sleep(60)
return
else:
time.sleep(1)
pytest.fail("timeout: %d containers not running for docker-compose project: %s" % (expected_containers, conftest.docker_compose_instance))
def get_ursula_by_id(self, ursula_id):
try:
ursula = self._ursulas[ursula_id]
except KeyError:
pytest.fail("No Ursula with ID {}".format(ursula_id))
return ursula
def test_all_ursulas_know_about_all_other_ursulas(ursulas):
"""
Once launched, all Ursulas know about - and can help locate - all other Ursulas in the network.
"""
ignorance = []
for acounter, announcing_ursula in enumerate(blockchain_client._ursulas_on_blockchain):
for counter, propagating_ursula in enumerate(ursulas):
if not digest(announcing_ursula) in propagating_ursula.server.storage:
ignorance.append((counter, acounter))
if ignorance:
pytest.fail(str(["{} didn't know about {}".format(counter, acounter) for counter, acounter in ignorance]))
def test_execute_fails_with_graphs_with_isles():
g = make_a_graph_with_isles()
with pytest.raises(exceptions.GraphExecutionError):
g.execute()
pytest.fail()