def main():
    """Consume a subunit stream on stdin and report on it to stdout.

    The parsed byte stream is fanned out to three receivers: a ``Starts``
    reporter, a ``StreamToDict`` adapter that calls ``show_outcome`` with
    the command-line display flags bound, and a ``StreamSummary``.

    :return: 1 when ``count_tests('status', '.*')`` is zero or the summary
        is not successful, otherwise 0.
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_reporter = Starts(sys.stdout)
    # Bind the output stream and display flags once; StreamToDict supplies
    # the remaining argument when it invokes the callback.
    on_outcome = functools.partial(
        show_outcome,
        sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly,
    )
    outcome_collector = testtools.StreamToDict(on_outcome)
    run_summary = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult(
        [start_reporter, outcome_collector, run_summary])

    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # The run is always closed, even when reading the stream fails.
        fanout.stopTestRun()

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if run_summary.wasSuccessful():
        return 0
    return 1
python类CopyStreamResult()的实例源码
def main():
    """Read subunit from stdin, print per-test outcomes and a summary.

    :return: Process exit status: 1 if ``count_tests`` finds no tests or
        the ``StreamSummary`` reports failure, else 0.
    """
    cli = parse_args()
    byte_source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_sink = Starts(sys.stdout)
    outcome_sink = testtools.StreamToDict(
        functools.partial(
            show_outcome, sys.stdout,
            print_failures=cli.print_failures,
            failonly=cli.failonly))
    summary_sink = testtools.StreamSummary()
    combined = testtools.CopyStreamResult(
        [start_sink, outcome_sink, summary_sink])

    combined.startTestRun()
    try:
        byte_source.run(combined)
    finally:
        # stopTestRun must run regardless of how stream parsing ends.
        combined.stopTestRun()

    nothing_ran = count_tests('status', '.*') == 0
    if nothing_ran:
        print("The test run didn't actually run any tests")
        return 1

    if cli.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    exit_code = 0 if summary_sink.wasSuccessful() else 1
    return exit_code
def main():
    """Consume a subunit stream on stdin and report on it to stdout.

    The parsed byte stream is fanned out to three receivers: a ``Starts``
    reporter, a ``StreamToDict`` adapter that calls ``show_outcome`` with
    the command-line display flags bound, and a ``StreamSummary``.

    :return: 1 when ``count_tests('status', '.*')`` is zero or the summary
        is not successful, otherwise 0.
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_reporter = Starts(sys.stdout)
    # Bind the output stream and display flags once; StreamToDict supplies
    # the remaining argument when it invokes the callback.
    on_outcome = functools.partial(
        show_outcome,
        sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly,
    )
    outcome_collector = testtools.StreamToDict(on_outcome)
    run_summary = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult(
        [start_reporter, outcome_collector, run_summary])

    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # The run is always closed, even when reading the stream fails.
        fanout.stopTestRun()

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if run_summary.wasSuccessful():
        return 0
    return 1
def main():
    """Read subunit from stdin, print per-test outcomes and a summary.

    :return: Process exit status: 1 if ``count_tests`` finds no tests or
        the ``StreamSummary`` reports failure, else 0.
    """
    cli = parse_args()
    byte_source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_sink = Starts(sys.stdout)
    outcome_sink = testtools.StreamToDict(
        functools.partial(
            show_outcome, sys.stdout,
            print_failures=cli.print_failures,
            failonly=cli.failonly))
    summary_sink = testtools.StreamSummary()
    combined = testtools.CopyStreamResult(
        [start_sink, outcome_sink, summary_sink])

    combined.startTestRun()
    try:
        byte_source.run(combined)
    finally:
        # stopTestRun must run regardless of how stream parsing ends.
        combined.stopTestRun()

    nothing_ran = count_tests('status', '.*') == 0
    if nothing_ran:
        print("The test run didn't actually run any tests")
        return 1

    if cli.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    exit_code = 0 if summary_sink.wasSuccessful() else 1
    return exit_code
def main():
    """Consume a subunit stream on stdin and report on it to stdout.

    The parsed byte stream is fanned out to three receivers: a ``Starts``
    reporter, a ``StreamToDict`` adapter that calls ``show_outcome`` with
    the command-line display flags bound, and a ``StreamSummary``.

    :return: 1 when ``count_tests('status', '.*')`` is zero or the summary
        is not successful, otherwise 0.
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_reporter = Starts(sys.stdout)
    # Bind the output stream and display flags once; StreamToDict supplies
    # the remaining argument when it invokes the callback.
    on_outcome = functools.partial(
        show_outcome,
        sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly,
    )
    outcome_collector = testtools.StreamToDict(on_outcome)
    run_summary = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult(
        [start_reporter, outcome_collector, run_summary])

    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # The run is always closed, even when reading the stream fails.
        fanout.stopTestRun()

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if run_summary.wasSuccessful():
        return 0
    return 1
def main():
    """Read subunit from stdin, print per-test outcomes and a summary.

    :return: Process exit status: 1 if ``count_tests`` finds no tests or
        the ``StreamSummary`` reports failure, else 0.
    """
    cli = parse_args()
    byte_source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_sink = Starts(sys.stdout)
    outcome_sink = testtools.StreamToDict(
        functools.partial(
            show_outcome, sys.stdout,
            print_failures=cli.print_failures,
            failonly=cli.failonly))
    summary_sink = testtools.StreamSummary()
    combined = testtools.CopyStreamResult(
        [start_sink, outcome_sink, summary_sink])

    combined.startTestRun()
    try:
        byte_source.run(combined)
    finally:
        # stopTestRun must run regardless of how stream parsing ends.
        combined.stopTestRun()

    nothing_ran = count_tests('status', '.*') == 0
    if nothing_ran:
        print("The test run didn't actually run any tests")
        return 1

    if cli.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    exit_code = 0 if summary_sink.wasSuccessful() else 1
    return exit_code
def main():
    """Consume a subunit stream on stdin and report on it to stdout.

    The parsed byte stream is fanned out to three receivers: a ``Starts``
    reporter, a ``StreamToDict`` adapter that calls ``show_outcome`` with
    the command-line display flags bound, and a ``StreamSummary``.

    :return: 1 when ``count_tests('status', '.*')`` is zero or the summary
        is not successful, otherwise 0.
    """
    options = parse_args()
    source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_reporter = Starts(sys.stdout)
    # Bind the output stream and display flags once; StreamToDict supplies
    # the remaining argument when it invokes the callback.
    on_outcome = functools.partial(
        show_outcome,
        sys.stdout,
        print_failures=options.print_failures,
        failonly=options.failonly,
    )
    outcome_collector = testtools.StreamToDict(on_outcome)
    run_summary = testtools.StreamSummary()
    fanout = testtools.CopyStreamResult(
        [start_reporter, outcome_collector, run_summary])

    fanout.startTestRun()
    try:
        source.run(fanout)
    finally:
        # The run is always closed, even when reading the stream fails.
        fanout.stopTestRun()

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if options.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    if run_summary.wasSuccessful():
        return 0
    return 1
def main():
    """Read subunit from stdin, print per-test outcomes and a summary.

    :return: Process exit status: 1 if ``count_tests`` finds no tests or
        the ``StreamSummary`` reports failure, else 0.
    """
    cli = parse_args()
    byte_source = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    start_sink = Starts(sys.stdout)
    outcome_sink = testtools.StreamToDict(
        functools.partial(
            show_outcome, sys.stdout,
            print_failures=cli.print_failures,
            failonly=cli.failonly))
    summary_sink = testtools.StreamSummary()
    combined = testtools.CopyStreamResult(
        [start_sink, outcome_sink, summary_sink])

    combined.startTestRun()
    try:
        byte_source.run(combined)
    finally:
        # stopTestRun must run regardless of how stream parsing ends.
        combined.stopTestRun()

    nothing_ran = count_tests('status', '.*') == 0
    if nothing_ran:
        print("The test run didn't actually run any tests")
        return 1

    if cli.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)

    exit_code = 0 if summary_sink.wasSuccessful() else 1
    return exit_code
def __init__(self, repository, partial=False, run_id=None):
    """Open the backing file for a run and build the event fan-out.

    :param repository: Object whose ``base`` attribute is the directory
        in which run files live.
    :param partial: Stored verbatim on ``self.partial``.
    :param run_id: When truthy, the file ``<base>/<run_id>`` is opened for
        binary append; otherwise a fresh temporary file is created in
        ``base`` and opened for binary write.
    """
    # XXX: Perhaps should factor into a decorator and use an unaltered
    # TestProtocolClient.
    self._repository = repository
    self._run_id = run_id
    if self._run_id:
        self.fname = os.path.join(self._repository.base, self._run_id)
        sink = open(self.fname, 'ab')
    else:
        handle, temp_path = tempfile.mkstemp(dir=self._repository.base)
        self.fname = temp_path
        sink = os.fdopen(handle, 'wb')
    self.partial = partial
    # Time taken by each test; flushed at the end.
    self._times = {}
    self._test_start = None
    self._time = None
    protocol_client = TestProtocolClient(sink)
    subunit_client = testtools.StreamToExtendedDecorator(protocol_client)
    dict_adapter = testtools.StreamToDict(self._handle_test)
    # Every event goes both to the on-disk serialiser and to _handle_test.
    self.hook = testtools.CopyStreamResult([subunit_client, dict_adapter])
    self._stream = sink
def startTestRun(self):
    """Prepare in-memory subunit capture and load or create the run row.

    Events sent to ``self.hook`` are copied both to ``self._handle_test``
    (via ``StreamToDict``) and to a subunit v2 serialiser writing into
    ``self._subunit``. If no run id is set yet, a new run is created
    through ``db_api`` and its ``uuid`` is remembered; otherwise the
    existing run is looked up by that uuid.

    The session obtained from ``self.session_factory()`` is closed even
    when one of the ``db_api`` calls raises, so a failed lookup or create
    does not leak the session.
    """
    self._subunit = io.BytesIO()
    self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
    self.hook = testtools.CopyStreamResult([
        testtools.StreamToDict(self._handle_test),
        self.subunit_stream])
    self.hook.startTestRun()
    self.start_time = datetime.datetime.utcnow()
    session = self.session_factory()
    try:
        if not self._run_id:
            self.run = db_api.create_run(session=session)
            self._run_id = self.run.uuid
        else:
            # The stored id is a uuid; db_api resolves it to the integer
            # id that get_run_by_id expects.
            int_id = db_api.get_run_id_from_uuid(
                self._run_id, session=session)
            self.run = db_api.get_run_by_id(int_id, session=session)
    finally:
        session.close()
    self.totals = {}
def trace(stdin, stdout, print_failures=False, failonly=False,
          enable_diff=False, abbreviate=False, color=False, post_fails=False,
          no_summary=False):
    """Parse a subunit stream from ``stdin`` and report on it to ``stdout``.

    :param stdin: Byte stream containing subunit input.
    :param stdout: Stream handed to ``show_outcome``, ``CatFiles``,
        ``print_fails`` and ``print_summary``.
    :param print_failures: Forwarded to ``show_outcome``.
    :param failonly: Forwarded to ``show_outcome``.
    :param enable_diff: Forwarded to ``show_outcome``.
    :param abbreviate: Forwarded to ``show_outcome``.
    :param color: Forwarded to ``show_outcome`` as ``enable_color``.
    :param post_fails: When true, ``print_fails`` is called after the run.
    :param no_summary: When true, ``print_summary`` is skipped.
    :return: 1 if no tests ran, if no test has a status matching
        ``^success$``, or if the summary is unsuccessful; otherwise 0.
    """
    source = subunit.ByteStreamToStreamResult(
        stdin, non_subunit_name='stdout')
    on_outcome = functools.partial(
        show_outcome,
        stdout,
        print_failures=print_failures,
        failonly=failonly,
        enable_diff=enable_diff,
        abbreviate=abbreviate,
        enable_color=color,
    )
    outcome_sink = testtools.StreamToDict(on_outcome)
    run_summary = testtools.StreamSummary()
    router = testtools.StreamResultRouter(
        testtools.CopyStreamResult([outcome_sink, run_summary]))
    # Events with no test id are routed to CatFiles instead of the
    # outcome/summary pair.
    file_dumper = subunit.test_results.CatFiles(stdout)
    router.add_rule(file_dumper, 'test_id', test_id=None)

    began = datetime.datetime.utcnow()
    router.startTestRun()
    try:
        source.run(router)
    finally:
        router.stopTestRun()
    ended = datetime.datetime.utcnow()
    elapsed_time = ended - began

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1

    if post_fails:
        print_fails(stdout)
    if not no_summary:
        print_summary(stdout, elapsed_time)

    # NOTE(mtreinish): Ideally this should live in testtools streamSummary
    # this is just in place until the behavior lands there (if it ever does)
    if count_tests('status', '^success$') == 0:
        print("\nNo tests were successful during the run")
        return 1

    if run_summary.wasSuccessful():
        return 0
    return 1
def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
    """Run ``cmd`` through the shell and assert on its exit status.

    :param cmd: Command line, formatted with ``"%s"`` and run with
        ``shell=True``.
    :param expected: Exit status the process must return.
    :param subunit: When true, stdout is additionally parsed as a subunit
        stream and must contain at least one test.
    :param stdin: Optional data piped to the process; a falsy value means
        no stdin pipe is attached.
    :return: ``(out, err)`` as captured from the process.
    """
    feed_input = bool(stdin)
    proc = subprocess.Popen(
        "%s" % cmd, shell=True,
        stdin=subprocess.PIPE if feed_input else None,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = proc.communicate(stdin if feed_input else None)

    if subunit:
        self.assertEqual(proc.returncode, expected,
                         "Expected return code: %s doesn't match actual "
                         "return code of: %s" % (expected, proc.returncode))
        parsed = subunit_lib.ByteStreamToStreamResult(io.BytesIO(out))
        start_sink = testtools.StreamResult()
        summary_sink = testtools.StreamSummary()
        collected = []
        # StreamToDict calls back once per test; keep every dict so the
        # count can be checked afterwards.
        outcome_sink = testtools.StreamToDict(collected.append)
        fanout = testtools.CopyStreamResult(
            [start_sink, outcome_sink, summary_sink])
        fanout.startTestRun()
        try:
            parsed.run(fanout)
        finally:
            fanout.stopTestRun()
        self.assertThat(len(collected), testtools.matchers.GreaterThan(0))
    else:
        self.assertEqual(
            proc.returncode, expected,
            "Stdout: %s; Stderr: %s" % (out, err))
    return (out, err)
def startTestRun(self):
    """Begin a run, capturing events as subunit v2 bytes in memory.

    ``self._hook`` copies every event to ``self._handle_test`` (through
    ``StreamToDict``) and to a serialiser writing into ``self._subunit``.
    """
    buffer = BytesIO()
    self._subunit = buffer
    to_bytes = subunit.v2.StreamResultToBytes(buffer)
    dict_sink = testtools.StreamToDict(self._handle_test)
    self._hook = testtools.CopyStreamResult([dict_sink, to_bytes])
    self._hook.startTestRun()
def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.
        Specifically:
         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.
        Specifically:
         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.
        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)
    source = ByteStreamToStreamResult(original, non_subunit_name='stdout')

    class Tagger(CopyStreamResult):
        """Rewrite ``test_tags`` on each status event before forwarding."""

        def status(self, **kwargs):
            # Work on a private copy: the incoming set belongs to the
            # sender and may be shared with other results (or be an
            # immutable frozenset), so it must not be mutated in place.
            tags = set(kwargs.get('test_tags') or ())
            tags.update(new_tags)
            tags.difference_update(gone_tags)
            # An empty tag set is reported as None, not as an empty set.
            kwargs['test_tags'] = tags or None
            super(Tagger, self).status(**kwargs)

    output = Tagger([StreamResultToBytes(filtered)])
    source.run(output)
    return 0
def run_tests_from_stream(input_stream, result, passthrough_stream=None,
    forward_stream=None, protocol_version=1, passthrough_subunit=True):
    """Run tests from a subunit input stream through 'result'.

    Non-test events - top level file attachments - are expected to be
    dropped by v2 StreamResults at the present time (as all the analysis code
    is in ExtendedTestResult API's), so to implement passthrough_stream they
    are diverted and copied directly when that is set.

    :param input_stream: A stream containing subunit input.
    :param result: A TestResult that will receive the test events.
        NB: This should be an ExtendedTestResult for v1 and a StreamResult for
        v2.
    :param passthrough_stream: All non-subunit input received will be
        sent to this stream.  If not provided, uses the ``TestProtocolServer``
        default, which is ``sys.stdout``.
    :param forward_stream: All subunit input received will be forwarded
        to this stream. If not provided, uses the ``TestProtocolServer``
        default, which is to not forward any input. Do not set this when
        transforming the stream - items would be double-reported.
    :param protocol_version: What version of the subunit protocol to expect.
    :param passthrough_subunit: If True, passthrough should be as subunit
        otherwise unwrap it. Only has effect when forward_stream is None.
        (when forwarding as subunit non-subunit input is always turned into
        subunit)
    :raises Exception: If ``protocol_version`` is neither 1 nor 2.
    """
    if protocol_version == 1:
        test = ProtocolTestCase(
            input_stream, passthrough=passthrough_stream,
            forward=forward_stream)
    elif protocol_version == 2:
        # In all cases we encapsulate unknown inputs.
        forwarding = forward_stream is not None
        passing_through = passthrough_stream is not None
        if forwarding:
            # Send events to forward_stream as subunit.
            forward_result = StreamResultToBytes(forward_stream)
            if passing_through:
                # Non-subunit is being passed through: copy all events to
                # forward_result.
                result = CopyStreamResult([forward_result, result])
            else:
                # Not passing non-test events - split them off to nothing.
                router = StreamResultRouter(forward_result)
                router.add_rule(StreamResult(), 'test_id', test_id=None)
                result = CopyStreamResult([router, result])
        elif passing_through:
            if passthrough_subunit:
                passthrough_result = StreamResultToBytes(passthrough_stream)
            else:
                # Route non-test events to passthrough_stream, unwrapping
                # them for display.
                passthrough_result = CatFiles(passthrough_stream)
            result = StreamResultRouter(result)
            result.add_rule(passthrough_result, 'test_id', test_id=None)
        test = ByteStreamToStreamResult(
            input_stream, non_subunit_name='stdout')
    else:
        raise Exception("Unknown protocol version.")

    result.startTestRun()
    test.run(result)
    result.stopTestRun()