def fx_local(local_commit, remote):
"""Local git repository with branches, light tags, and annotated tags pushed to remote.
:param local_commit: local fixture.
:param remote: local fixture.
:return: Path to repo root.
:rtype: py.path.local
"""
run(local_commit, ['git', 'tag', 'light_tag'])
run(local_commit, ['git', 'tag', '--annotate', '-m', 'Tag annotation.', 'annotated_tag'])
run(local_commit, ['git', 'checkout', '-b', 'feature'])
run(local_commit, ['git', 'checkout', 'master'])
run(local_commit, ['git', 'remote', 'add', 'origin', remote])
run(local_commit, ['git', 'push', 'origin', 'master', 'feature', 'light_tag', 'annotated_tag'])
return local_commit
python类fixture()的实例源码
def testing_workdir(tmpdir, request):
""" Create a workdir in a safe temporary folder; cd into dir above before test, cd out after
:param tmpdir: py.test fixture, will be injected
:param request: py.test fixture-related, will be injected (see pytest docs)
"""
saved_path = os.getcwd()
tmpdir.chdir()
# temporary folder for profiling output, if any
tmpdir.mkdir('prof')
def return_to_saved_path():
if os.path.isdir(os.path.join(saved_path, 'prof')):
profdir = tmpdir.join('prof')
files = profdir.listdir('*.prof') if profdir.isdir() else []
for f in files:
f.rename(os.path.join(saved_path, 'prof', f.basename))
os.chdir(saved_path)
request.addfinalizer(return_to_saved_path)
return str(tmpdir)
def x_series_device():
system = nidaqmx.system.System.local()
for device in system.devices:
if (not device.dev_is_simulated and
device.product_category == ProductCategory.X_SERIES_DAQ and
len(device.ao_physical_chans) >= 2 and
len(device.ai_physical_chans) >= 4 and
len(device.do_lines) >= 8 and
(len(device.di_lines) == len(device.do_lines)) and
len(device.ci_physical_chans) >= 4):
return device
raise NoFixtureDetectedError(
'Could not detect a device that meets the requirements to be an '
'X Series fixture. Cannot proceed to run tests.')
def multi_threading_test_devices():
system = nidaqmx.system.System.local()
devices = []
for device in system.devices:
if (device.dev_is_simulated and
device.product_category == ProductCategory.X_SERIES_DAQ and
len(device.ai_physical_chans) >= 1):
devices.append(device)
if len(devices) == 4:
return devices
raise NoFixtureDetectedError(
'Could not detect 4 simulated X Series devices so as to meet the '
'requirements to be a multi-threading test test fixture. Cannot '
'proceed to run tests. Import the NI MAX configuration file located '
'at nidaqmx\\tests\\max_config\\nidaqmxMaxConfig.ini to create these '
'devices.')
def neo4j_test_ws_dir(datafiles):
return datafiles
# @pytest.fixture(scope="session")
# def workspace(request, data_directory):
# wsconf_file = data_directory.join("workspace.yaml")
# temp_root = tempfile.mkdtemp()
# ws = Workspace("saapy-test-ws",
# temp_root,
# "saapy-test-ws",
# configuration_text=wsconf_file.read_text("utf-8"))
#
# def fin():
# shutil.rmtree(temp_root)
#
# request.addfinalizer(fin)
# return ws # provide the fixture value
def fx_local_light(tmpdir, local, remote):
"""Light-weight local repository similar to how Travis/AppVeyor clone repos.
:param tmpdir: pytest fixture.
:param local: local fixture.
:param remote: local fixture.
:return: Path to repo root.
:rtype: py.path.local
"""
assert local # Ensures local pushes feature branch before this fixture is called.
local2 = tmpdir.ensure_dir('local2')
run(local2, ['git', 'clone', '--depth=1', '--branch=feature', remote, '.'])
sha = run(local2, ['git', 'rev-parse', 'HEAD']).strip()
run(local2, ['git', 'checkout', '-qf', sha])
return local2
def outdate_local(tmpdir, local_light, remote):
"""Clone remote to other directory and push changes. Causes `local` fixture to be outdated.
:param tmpdir: pytest fixture.
:param local_light: local fixture.
:param remote: local fixture.
:return: Path to repo root.
:rtype: py.path.local
"""
assert local_light # Ensures local_light is setup before this fixture pushes to remote.
local_ahead = tmpdir.ensure_dir('local_ahead')
run(local_ahead, ['git', 'clone', remote, '.'])
run(local_ahead, ['git', 'checkout', '-b', 'un_pushed_branch'])
local_ahead.join('README').write('changed')
run(local_ahead, ['git', 'commit', '-am', 'Changed new branch'], environ=author_committer_dates(1))
run(local_ahead, ['git', 'tag', 'nb_tag'])
run(local_ahead, ['git', 'checkout', '--orphan', 'orphaned_branch'])
local_ahead.join('README').write('new')
run(local_ahead, ['git', 'add', 'README'])
run(local_ahead, ['git', 'commit', '-m', 'Added new README'], environ=author_committer_dates(2))
run(local_ahead, ['git', 'tag', '--annotate', '-m', 'Tag annotation.', 'ob_at'])
run(local_ahead, ['git', 'push', 'origin', 'nb_tag', 'orphaned_branch', 'ob_at'])
return local_ahead
def create_bot():
""" Fixture - create the bot object given the bot_class and then add the
endpoint object using the http_class.
Using the fixture because at the end of the test the webserver is
closed no matter the test outcome
"""
fixture = dict()
def create(bot, endpoint):
""" The real function that creates the bot.
This is needed because the fixture cannot accept parameters.
"""
fixture['bot'] = bot
fixture['ep'] = endpoint
fixture['bot'].add_endpoint(fixture['ep'])
fixture['bot'].run()
return fixture['bot']
yield create
fixture['bot'].stop()
def nopovm(request, local_dim, rgen):
"""Provide different POVMs and non-POVMs for testing
We provide instances of :class:`povm.localpovm.POVM` with the
following elements:
* `pauli`: Generated by :func:`povm.pauli_povm()`
* `random`: Random (non-Hermitian, non-positive) elements for
testing. (These elements do not constitute a POVM. We use them
to distinguish elem.conj() from elem.T in our code.)
"""
nopovm_name = request.param
if nopovm_name == 'pauli':
return povm.pauli_povm(local_dim)
elif nopovm_name == 'random':
d = local_dim
return povm.localpovm.POVM(factory._zrandn((2 * d**2, d, d), rgen))
else:
raise ValueError('Unknown fixture name {}'.format(nopovm_name))
def pytest_addoption(parser):
group = parser.getgroup("general")
group.addoption(
'--lf', '--last-failed', action='store_true', dest="lf",
help="rerun only the tests that failed "
"at the last run (or all if none failed)")
group.addoption(
'--ff', '--failed-first', action='store_true', dest="failedfirst",
help="run all tests but run the last failures first. "
"This may re-order tests and thus lead to "
"repeated fixture setup/teardown")
group.addoption(
'--cache-show', action='store_true', dest="cacheshow",
help="show cache contents, don't perform collection or tests")
group.addoption(
'--cache-clear', action='store_true', dest="cacheclear",
help="remove all cache contents at start of test run.")
def makefile(self, ext, *args, **kwargs):
"""Create a new file in the testdir.
ext: The extension the file should use, including the dot.
E.g. ".py".
args: All args will be treated as strings and joined using
newlines. The result will be written as contents to the
file. The name of the file will be based on the test
function requesting this fixture.
E.g. "testdir.makefile('.txt', 'line1', 'line2')"
kwargs: Each keyword is the name of a file, while the value of
it will be written as contents of the file.
E.g. "testdir.makefile('.ini', pytest='[pytest]\naddopts=-rs\n')"
"""
return self._makefile(ext, args, kwargs)
def monkeypatch(request):
"""The returned ``monkeypatch`` fixture provides these
helper methods to modify objects, dictionaries or os.environ::
monkeypatch.setattr(obj, name, value, raising=True)
monkeypatch.delattr(obj, name, raising=True)
monkeypatch.setitem(mapping, name, value)
monkeypatch.delitem(obj, name, raising=True)
monkeypatch.setenv(name, value, prepend=False)
monkeypatch.delenv(name, value, raising=True)
monkeypatch.syspath_prepend(path)
monkeypatch.chdir(path)
All modifications will be undone after the requesting
test function or fixture has finished. The ``raising``
parameter determines if a KeyError or AttributeError
will be raised if the set/deletion operation has no target.
"""
mpatch = MonkeyPatch()
request.addfinalizer(mpatch.undo)
return mpatch
def __init__(self, fixturemanager, baseid, argname, func, scope, params,
unittest=False, ids=None):
self._fixturemanager = fixturemanager
self.baseid = baseid or ''
self.has_location = baseid is not None
self.func = func
self.argname = argname
self.scope = scope
self.scopenum = scope2index(
scope or "function",
descr='fixture {0}'.format(func.__name__),
where=baseid
)
self.params = params
startindex = unittest and 1 or None
self.argnames = getfuncargnames(func, startindex=startindex)
self.unittest = unittest
self.ids = ids
self._finalizer = []
def record_xml_property(request):
"""Add extra xml properties to the tag for the calling test.
The fixture is callable with ``(name, value)``, with value being automatically
xml-encoded.
"""
request.node.warn(
code='C3',
message='record_xml_property is an experimental feature',
)
xml = getattr(request.config, "_xml", None)
if xml is not None:
node_reporter = xml.node_reporter(request.node.nodeid)
return node_reporter.add_property
else:
def add_property_noop(name, value):
pass
return add_property_noop
def app(request, mock_team):
"""Flask application test fixture"""
app_ = bcjoy_app.setup(
TESTING=True
)
ctx = app_.app_context()
ctx.push()
g.bcjoy_team = mock_team
def teardown():
ctx.pop()
request.addfinalizer(teardown)
return app_
def test_make_summaries(make_summaries):
"""Verify our fixture maker works as expected."""
expected_values = [
(10, 0), # 0
(9, 1), # 1 - 1
(8, 2), # 2 - 1
(8, 2), # 3 - 0
(7, 3), # 4 - 1
(6, 4), # 5 - 1
(6, 4), # 6 - 0
(5, 5), # 7 - 1
(4, 6), # 8 - 1
(4, 6), # 9 - 0
]
backlog_start = 10
zero_work_modulo = 3
step = 1
summaries = make_summaries(backlog_start, step, zero_work_modulo)
actual = [(s.incomplete, s.complete) for s in summaries]
assert expected_values == actual
def test_fs():
"""A convenience fixture for retrieving data from test files"""
import pytest_pootle
class TestFs(object):
def path(self, path):
return os.path.join(
os.path.dirname(pytest_pootle.__file__),
path)
def open(self, paths, *args, **kwargs):
if isinstance(paths, (list, tuple)):
paths = os.path.join(*paths)
return open(self.path(paths), *args, **kwargs)
return TestFs()
def test_load_configfile_valid(self):
lbag = LsdiBagger()
# use a Mock to simulate argparse options
lbag.options = Mock(item_ids=[], gen_config=False, digwf_url=None,
output=None)
# load fixture that should work
lbag.options.config = os.path.join(FIXTURE_DIR, 'lsdi-bagger.cfg')
lbag.load_configfile()
# value from the config fixture
assert lbag.options.digwf_url == 'http://example.co:3100/digwf_api/'
assert lbag.options.output == '/tmp/bags'
assert lbag.options.fedora_url == 'http://server.edu:8080/fedora/'
# if output is specified on command line, that takes precedence
lbag.options.output = '/i/want/bags/somewhere/else'
lbag.load_configfile()
assert lbag.options.output != '/tmp/bags'
def main_loop():
loop = GLib.MainLoop()
timeout = GLib.Timeout(RUN_TIMEOUT)
timeout.set_callback(lambda loop: loop.quit(), loop)
timeout.attach()
return loop
# https://github.com/Mirantis/ceph-lcm/blob/1b95e76503d9869da4bf4e91e24c848d3f683624/tests/controller/test_mainloop.py
# @pytest.fixture(scope='function')
# def main_loop_threading():
# thread = threading.Thread(target=mainloop.main)
# thread.start()
#
# return thread
#
#
# def test_shutdown_callback(main_loop_threading):
# time.sleep(.5)
#
# assert main_loop_threading.is_alive()
# mainloop.shutdown_callback()
#
# time.sleep(.5)
# assert not main_loop_threading.is_alive()
def create_example_fixture(example):
"""Create a pytest fixture to run the example in pty subprocess & cleanup.
:param example: relative path like 'examples/input.py'
:return: pytest fixture
"""
@pytest.fixture
def example_app():
p = SimplePty.spawn(['python', example])
yield p
# it takes some time to collect the coverage data
# if the main process exits too early the coverage data is not available
time.sleep(p.delayafterterminate)
p.sendintr() # in case the subprocess was not ended by the test
p.wait() # without wait() the coverage info never arrives
return example_app
def driver():
"""
Selenium driver fixture
"""
# Start a selenium server running chrome
capabilities = DesiredCapabilities.CHROME.copy()
capabilities['chromeOptions'] = {
'binary': os.getenv('CHROME_BIN', '/usr/bin/google-chrome-stable'),
'args': ['--no-sandbox'],
}
driver = Remote(
os.getenv('SELENIUM_URL', 'http://chrome:5555/wd/hub'),
capabilities,
)
driver.implicitly_wait(10)
yield driver
driver.close()
def needle(request, selenium):
"""Visual regression testing fixture
:param request: pytest request
:param selenium: Selenium web driver
:return:
"""
options = {
'cleanup_on_success': request.config.getoption('needle_cleanup_on_success'),
'save_baseline': request.config.getoption('needle_save_baseline'),
'needle_engine': request.config.getoption('needle_engine'),
'baseline_dir': request.config.getoption('baseline_dir'),
'output_dir': request.config.getoption('output_dir'),
'viewport_size': request.config.getoption('viewport_size')
}
return NeedleDriver(selenium, **options)
def testing_workdir(tmpdir, request):
""" Create a workdir in a safe temporary folder; cd into dir above before test, cd out after
:param tmpdir: py.test fixture, will be injected
:param request: py.test fixture-related, will be injected (see pytest docs)
"""
saved_path = os.getcwd()
tmpdir.chdir()
# temporary folder for profiling output, if any
tmpdir.mkdir('prof')
def return_to_saved_path():
if os.path.isdir(os.path.join(saved_path, 'prof')):
profdir = tmpdir.join('prof')
files = profdir.listdir('*.prof') if profdir.isdir() else []
for f in files:
f.rename(os.path.join(saved_path, 'prof', f.basename))
os.chdir(saved_path)
request.addfinalizer(return_to_saved_path)
return str(tmpdir)
def _test_validate(self, schema, expect_failure, input_files, input):
"""validates input yaml against schema.
:param schema: schema yaml file
:param expect_failure: should the validation pass or fail.
:param input_files: pytest fixture used to access the test input files
:param input: test input yaml doc filename"""
schema_dir = pkg_resources.resource_filename('drydock_provisioner',
'schemas')
schema_filename = os.path.join(schema_dir, schema)
schema_file = open(schema_filename, 'r')
schema = yaml.safe_load(schema_file)
input_file = input_files.join(input)
instance_file = open(str(input_file), 'r')
instance = yaml.safe_load(instance_file)
if expect_failure:
with pytest.raises(ValidationError):
jsonschema.validate(instance['spec'], schema['data'])
else:
jsonschema.validate(instance['spec'], schema['data'])
def force_serialization_computations(monkeypatch):
"""
This integration test fixture breaks a few tests as false positives (whenever there are
interactions between multiple computations in a single transformer), so it is designed to be an
aid for widely testing serialization and not a true integration test that must pass on every
merge.
"""
if pytest.config.getoption("--serialization_integration_test"):
original_computation = ngt.Transformer.add_computation
def monkey_add_computation(self, comp):
if comp.name.startswith('init'):
return original_computation(self, comp)
ser_comp = serde.serialize_graph([comp], only_return_handle_ops=True)
deser_comp = serde.deserialize_graph(ser_comp)
assert len(deser_comp) == 1
return original_computation(self, deser_comp[0])
monkeypatch.setattr(ngt.Transformer, 'add_computation', monkey_add_computation)
def profiler(request, tmpdir):
if request.config.option.profiler == 'cpu':
from raiden.utils.profiling.cpu import CpuProfiler
profiler = CpuProfiler(str(tmpdir))
profiler.start()
yield
profiler.stop()
elif request.config.option.profiler == 'sample':
from raiden.utils.profiling.sampler import SampleProfiler
profiler = SampleProfiler(str(tmpdir))
profiler.start()
yield
profiler.stop()
else:
# do nothing, but yield a valid generator otherwise the autouse fixture
# will fail
yield
def random_marker():
""" A random marker used to identify a pytest run.
Some tests will spawn a private chain, the private chain will be one or
more ethereum nodes on a new subprocesss. These nodes may fail to start on
concurrent test runs, mostly because of port number conflicts, but even
though the test fails to start its private chain it may run interacting
with the geth process from a different test run! This leads to
unreasonable test errors.
This fixture creates a random marker used to distinguish pytest runs and
avoid test failures. Note this could fail for other reasons and fail to
detect unwanted interations if the user sets the PYTHONHASHSEED to the same
value.
"""
random_hex = hex(random.getrandbits(100))
# strip the leading 0x and trailing L
return random_hex[2:-1]
def api_backend(rest_api_port_number):
# Initializing it without raiden_service.api here since that is a
# function scope fixture. We will inject it to rest_api object later
rest_api = RestAPI(None)
api_server = APIServer(rest_api)
api_server.flask_app.config['SERVER_NAME'] = 'localhost:{}'.format(rest_api_port_number)
# TODO: Find out why tests fail with debug=True
server = Greenlet.spawn(
api_server.run,
port=rest_api_port_number,
debug=False,
use_evalex=False,
)
# Fixes flaky test, were requests are done prior to the server initializing
# the listening socket.
# https://github.com/raiden-network/raiden/issues/389#issuecomment-305551563
wait_for_listening_port(rest_api_port_number)
yield api_server, rest_api
server.kill(block=True, timeout=10)
def wait_for_partners(app_list, sleep=0.5, timeout=10):
waiting = list(app_list)
while waiting:
# Poll the events to register the new channels
waiting[0].raiden.poll_blockchain_events()
all_healthy = all(
status == NODE_NETWORK_REACHABLE
for status in waiting[0].raiden.protocol.nodeaddresses_networkstatuses.values()
)
if timeout <= 0:
raise RuntimeError('fixture setup failed, nodes are unreachable')
if all_healthy:
waiting.pop(0)
else:
timeout -= sleep
gevent.sleep(sleep)
def response():
"""Sample pytest fixture.
See more at: http://doc.pytest.org/en/latest/fixture.html
"""
# import requests
# return requests.get('https://github.com/audreyr/cookiecutter-pypackage')