def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly
))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
python类StreamToDict()的实例源码
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly
))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
def _find_failing(repo):
run = repo.get_failing()
case = run.get_test()
ids = []
def gather_errors(test_dict):
if test_dict['status'] == 'fail':
ids.append(test_dict['id'])
result = testtools.StreamToDict(gather_errors)
result.startTestRun()
try:
case.run(result)
finally:
result.stopTestRun()
return ids
def __init__(self, repository, partial=False, run_id=None):
# XXX: Perhaps should factor into a decorator and use an unaltered
# TestProtocolClient.
self._repository = repository
self._run_id = run_id
if not self._run_id:
fd, name = tempfile.mkstemp(dir=self._repository.base)
self.fname = name
stream = os.fdopen(fd, 'wb')
else:
self.fname = os.path.join(self._repository.base, self._run_id)
stream = open(self.fname, 'ab')
self.partial = partial
# The time take by each test, flushed at the end.
self._times = {}
self._test_start = None
self._time = None
subunit_client = testtools.StreamToExtendedDecorator(
TestProtocolClient(stream))
self.hook = testtools.CopyStreamResult([
subunit_client,
testtools.StreamToDict(self._handle_test)])
self._stream = stream
def startTestRun(self):
self._subunit = io.BytesIO()
self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
self.hook = testtools.CopyStreamResult([
testtools.StreamToDict(self._handle_test),
self.subunit_stream])
self.hook.startTestRun()
self.start_time = datetime.datetime.utcnow()
session = self.session_factory()
if not self._run_id:
self.run = db_api.create_run(session=session)
self._run_id = self.run.uuid
else:
int_id = db_api.get_run_id_from_uuid(self._run_id, session=session)
self.run = db_api.get_run_by_id(int_id, session=session)
session.close()
self.totals = {}
def trace(stdin, stdout, print_failures=False, failonly=False,
enable_diff=False, abbreviate=False, color=False, post_fails=False,
no_summary=False):
stream = subunit.ByteStreamToStreamResult(
stdin, non_subunit_name='stdout')
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, stdout,
print_failures=print_failures,
failonly=failonly,
enable_diff=enable_diff,
abbreviate=abbreviate,
enable_color=color))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([outcomes, summary])
result = testtools.StreamResultRouter(result)
cat = subunit.test_results.CatFiles(stdout)
result.add_rule(cat, 'test_id', test_id=None)
start_time = datetime.datetime.utcnow()
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
stop_time = datetime.datetime.utcnow()
elapsed_time = stop_time - start_time
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if post_fails:
print_fails(stdout)
if not no_summary:
print_summary(stdout, elapsed_time)
# NOTE(mtreinish): Ideally this should live in testtools streamSummary
# this is just in place until the behavior lands there (if it ever does)
if count_tests('status', '^success$') == 0:
print("\nNo tests were successful during the run")
return 1
return 0 if summary.wasSuccessful() else 1
def _prior_tests(self, run, failing_id):
"""Calculate what tests from the test run run ran before test_id.
Tests that ran in a different worker are not included in the result.
"""
if not getattr(self, '_worker_to_test', False):
case = run.get_test()
# Use None if there is no worker-N tag
# If there are multiple, map them all.
# (worker-N -> [testid, ...])
worker_to_test = {}
# (testid -> [workerN, ...])
test_to_worker = {}
def map_test(test_dict):
tags = test_dict['tags']
id = test_dict['id']
workers = []
for tag in tags:
if tag.startswith('worker-'):
workers.append(tag)
if not workers:
workers = [None]
for worker in workers:
worker_to_test.setdefault(worker, []).append(id)
test_to_worker.setdefault(id, []).extend(workers)
mapper = testtools.StreamToDict(map_test)
mapper.startTestRun()
try:
case.run(mapper)
finally:
mapper.stopTestRun()
self._worker_to_test = worker_to_test
self._test_to_worker = test_to_worker
failing_workers = self._test_to_worker[failing_id]
prior_tests = []
for worker in failing_workers:
worker_tests = self._worker_to_test[worker]
prior_tests.extend(worker_tests[:worker_tests.index(failing_id)])
return prior_tests
def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
if stdin:
p = subprocess.Popen(
"%s" % cmd, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate(stdin)
else:
p = subprocess.Popen(
"%s" % cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = p.communicate()
if not subunit:
self.assertEqual(
p.returncode, expected,
"Stdout: %s; Stderr: %s" % (out, err))
return (out, err)
else:
self.assertEqual(p.returncode, expected,
"Expected return code: %s doesn't match actual "
"return code of: %s" % (expected, p.returncode))
output_stream = io.BytesIO(out)
stream = subunit_lib.ByteStreamToStreamResult(output_stream)
starts = testtools.StreamResult()
summary = testtools.StreamSummary()
tests = []
def _add_dict(test):
tests.append(test)
outcomes = testtools.StreamToDict(functools.partial(_add_dict))
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
self.assertThat(len(tests), testtools.matchers.GreaterThan(0))
return (out, err)
def startTestRun(self):
self._subunit = BytesIO()
serialiser = subunit.v2.StreamResultToBytes(self._subunit)
self._hook = testtools.CopyStreamResult([
testtools.StreamToDict(self._handle_test),
serialiser])
self._hook.startTestRun()
def to_disk(argv=None, stdin=None, stdout=None):
if stdout is None:
stdout = sys.stdout
if stdin is None:
stdin = sys.stdin
parser = optparse.OptionParser(
description="Export a subunit stream to files on disk.",
epilog=dedent("""\
Creates a directory per test id, a JSON file with test
metadata within that directory, and each attachment
is written to their name relative to that directory.
Global packages (no test id) are discarded.
Exits 0 if the export was completed, or non-zero otherwise.
"""))
parser.add_option(
"-d", "--directory", help="Root directory to export to.",
default=".")
options, args = parser.parse_args(argv)
if len(args) > 1:
raise Exception("Unexpected arguments.")
if len(args):
source = io.open(args[0], 'rb')
else:
source = stdin
exporter = DiskExporter(options.directory)
result = StreamToDict(exporter.export)
run_tests_from_stream(source, result, protocol_version=2)
return 0