def test_run_add(self, m_k8s_add):
    vif = fake._fake_vif()
    m_k8s_add.return_value = vif
    m_fin = StringIO()
    m_fout = StringIO()
    env = {
        'CNI_COMMAND': 'ADD',
        'CNI_ARGS': 'foo=bar',
    }
    self.runner.run(env, m_fin, m_fout)
    self.assertTrue(m_k8s_add.called)
    self.assertEqual('foo=bar', m_k8s_add.call_args[0][0].CNI_ARGS)
    result = jsonutils.loads(m_fout.getvalue())
    self.assertDictEqual(
        {"cniVersion": "0.3.0",
         "dns": {"nameservers": ["192.168.0.1"]},
         "ip4": {"gateway": "192.168.0.1", "ip": "192.168.0.2/24"}},
        result)
def test_successful_call(self):
    def handle(event_arg, context_arg):
        # use the arguments run_lambda passes in, not the enclosing-scope
        # variables of the same name
        return event_arg["number"] + int(context_arg.memory_limit_in_mb)

    event = {"number": 5, "other": "ignored"}
    context = context_module.MockLambdaContext.Builder()\
        .set_function_name("test_handle")\
        .set_memory_limit_in_mb("100")\
        .build()
    result = call_module.run_lambda(handle, event, context)
    self.assertIsInstance(result, call_module.LambdaResult)
    self.assertFalse(result.timed_out)
    self.assertIsNone(result.exception)
    self.assertEqual(result.value, 105)
    self.assertTrue(result.summary.duration_in_millis >= 0)
    self.assertIsInstance(result.summary.duration_in_millis, int)
    self.assertTrue(result.summary.max_memory_used_in_mb >= 0)
    self.assertIsInstance(result.summary.log, str)
    output = six.StringIO()
    result.display(output)
def __init__(self, body, mimetype='application/octet-stream',
             chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

    DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
    the stream.

    Args:
      body: string, Bytes of body content.
      mimetype: string, Mime-type of the file or default of
        'application/octet-stream'.
      chunksize: int, File will be uploaded in chunks of this many bytes. Only
        used if resumable=True.
      resumable: bool, True if this is a resumable upload. False means upload
        in a single request.
    """
    fd = BytesIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
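The docstring above points at MediaIoBaseUpload as the replacement. A minimal sketch of that replacement, assuming the standard googleapiclient import path; the body bytes and mimetype here are illustrative:

import io

from googleapiclient.http import MediaIoBaseUpload

# wrap the in-memory body in a real file-like object and hand it over
fd = io.BytesIO(b'file contents')
media = MediaIoBaseUpload(fd, mimetype='application/octet-stream',
                          chunksize=1024 * 1024, resumable=True)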
def test_load(self):
    config = ReleaseConfig()
    stream = StringIO(
        'projects:\n dev: 123\n prod: 321\n'
        'images:\n dev: registry/user/project\n prod: user/project\n'
        'endpoints:\n dev: http://127.0.0.1/api/scrapyd/\n'
        'apikeys:\n default: abcde\n'
        'version: GIT')
    config.load(stream)
    assert getattr(config, 'projects') == {'dev': 123, 'prod': 321}
    assert getattr(config, 'endpoints') == {
        'default': 'https://app.scrapinghub.com/api/',
        'dev': 'http://127.0.0.1/api/scrapyd/'}
    assert config.images == {
        'dev': 'registry/user/project',
        'prod': 'user/project'}
    assert getattr(config, 'apikeys') == {'default': 'abcde'}
    assert getattr(config, 'version') == 'GIT'
def test_unseekable_file(self):
    def tell_fails():
        raise IOError()

    ticket = get_ticket(urls=[get_http_ticket(EXAMPLE_URL)])
    for num_retries in range(10):
        temp_file = StringIO()
        temp_file.tell = tell_fails
        with mock.patch("time.sleep") as mock_sleep, \
                mock.patch("logging.warning") as mock_warning:
            dm = RetryCountDownloadManager(
                ticket, temp_file, max_retries=num_retries)
            self.assertEqual(dm.max_retries, num_retries)
            self.assertRaises(exceptions.RetryableError, dm.run)
            self.assertEqual(dm.attempt_counts[EXAMPLE_URL], 1)
            self.assertEqual(mock_sleep.call_count, 0)
            self.assertEqual(mock_warning.call_count, 0)
def logger(test_case):
    log_buffer = StringIO()
    ROOT_LOGGER.handlers[:] = []
    stream_handler = logging.StreamHandler(stream=log_buffer)
    stream_handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(name)s#%(lineno)d - %(message)s')
    stream_handler.setFormatter(formatter)
    ROOT_LOGGER.addHandler(stream_handler)
    yield log_buffer
    if test_case.is_failed:
        log_path = os.path.join(test_case._test_report_dir, 'test.log')
        with test_case.log_exception("Attach test log"):
            # the buffer contents are encoded to bytes, so the file must be
            # opened in binary mode
            with open(log_path, 'wb') as log_file:
                log_file.write(log_buffer.getvalue().encode('utf-8'))
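A standalone sketch of the buffer-backed handler pattern this fixture relies on (framework-independent; the logger name is illustrative):

import logging
from io import StringIO

buf = StringIO()
log = logging.getLogger('demo')
log.addHandler(logging.StreamHandler(stream=buf))
log.warning('captured')          # record lands in the in-memory buffer
assert 'captured' in buf.getvalue()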
def test_plural_form(self):
    buf = StringIO(
        '<html><translate translate-plural="hello {$count$} worlds!">'
        'hello one world!</translate></html>')
    messages = list(extract_angular(buf, [], [], {}))
    self.assertEqual(
        [
            (1, 'ngettext',
             ('hello one world!',
              'hello {$count$} worlds!'),
             [])
        ], messages)
def test_multiple_comments(self):
    buf = StringIO(
        '<html><translate '
        'translate-comment="What a beautiful world" '
        'translate-comment="Another comment"'
        '>hello world!</translate></html>')
    messages = list(extract_angular(buf, [], [], {}))
    self.assertEqual(
        [
            (1, 'gettext', 'hello world!',
             [
                 'What a beautiful world',
                 'Another comment'
             ])
        ],
        messages)
def test_nested_variations(self):
    buf = StringIO(
        '''
        <p translate>To <a href="link">link</a> here</p>
        <p translate>To <!-- a comment!! --> here</p>
        <p translate>To trademark® > > here</p>
        ''')
    messages = list(extract_angular(buf, [], [], {}))
    self.assertEqual(
        [
            (2, u'gettext', 'To <a href="link">link</a> here', []),
            (3, u'gettext', 'To <!-- a comment!! --> here', []),
            (4, u'gettext', u'To trademark® > > here', []),
        ],
        messages)
def test_print_ascii(self):
    qr = qrcode.QRCode(border=0)
    f = six.StringIO()
    qr.print_ascii(out=f)
    printed = f.getvalue()
    f.close()
    expected = u'\u2588\u2580\u2580\u2580\u2580\u2580\u2588'
    self.assertEqual(printed[:len(expected)], expected)

    f = six.StringIO()
    f.isatty = lambda: True
    qr.print_ascii(out=f, tty=True)
    printed = f.getvalue()
    f.close()
    expected = (
        u'\x1b[48;5;232m\x1b[38;5;255m' +
        u'\xa0\u2584\u2584\u2584\u2584\u2584\xa0')
    self.assertEqual(printed[:len(expected)], expected)
def generate_handler_stub(router, handler_template=HANDLER_TEMPLATE):
    output = StringIO()
    func_name_to_operation = {}
    for path in router.get_paths():
        for operation in path.get_operations():
            snake_operation_id = camel_case_to_spaces(operation.id).replace(' ', '_')
            func_name_to_operation[snake_operation_id] = operation
    for func_name, operation in sorted(func_name_to_operation.items()):
        parameter_names = [p['name'] for p in operation.parameters]
        handler = handler_template.format(
            func_name=func_name,
            operation_id=operation.id,
            parameters=', '.join(parameter_names),
        )
        output.write(handler)
        output.write('\n\n\n')
    return output.getvalue()
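HANDLER_TEMPLATE is referenced but not defined in this excerpt. A hypothetical template, shaped only to satisfy the .format() call above, could look like:

# hypothetical stand-in for the missing HANDLER_TEMPLATE constant
HANDLER_TEMPLATE = (
    "def {func_name}({parameters}):\n"
    '    """Stub for operation {operation_id}."""\n'
    "    raise NotImplementedError\n"
)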
def read_content(queue):
    frame = yield queue.get()
    header = frame.payload
    children = []
    for i in range(header.weight):
        content = yield read_content(queue)
        children.append(content)
    size = header.size
    read = 0
    buf = six.StringIO()
    while read < size:
        body = yield queue.get()
        content = body.payload.content
        # if this is the first instance of real binary content, convert the
        # string buffer to BytesIO. Not a nice fix, but it preserves the
        # original behaviour.
        if six.PY3 and isinstance(content, bytes) and isinstance(buf, six.StringIO):
            buf = six.BytesIO()
        buf.write(content)
        read += len(content)
    defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
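The buffer swap above exists because six.StringIO is io.StringIO on Python 3, which rejects bytes. A minimal, framework-free illustration of the same fallback:

import six

buf = six.StringIO()           # text buffer on Python 3
try:
    buf.write(b'\x00\x01')     # writing bytes to a text buffer raises TypeError
except TypeError:
    buf = six.BytesIO()        # fall back to a byte buffer, as read_content does
    buf.write(b'\x00\x01')
assert buf.getvalue() == b'\x00\x01'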
def assert_csv_response(self, response, status_code, expected_lines=None):
    assert_that(response.headers["Content-Type"], starts_with("text/csv"))
    # always validate status code
    assert_that(response.status_code, is_(equal_to(status_code)))
    # expect CSV data except on 204
    if status_code == 204:
        response_lines = None
    else:
        response_lines = [row for row in reader(StringIO(response.data.decode("utf-8")))]
    # validate data only if provided; len(expected_lines) would raise on None
    if response_lines is not None and expected_lines is not None:
        assert_that(
            response_lines,
            has_length(len(expected_lines)),
        )
        for index, line in enumerate(response_lines):
            assert_that(
                line,
                contains(*expected_lines[index]),
            )
def create_plan(self, loom_input_tensor):
    p = plan.TrainPlan()
    foo = tf.get_variable('foo', [], tf.float32, tf.constant_initializer(12))
    p.compiler = block_compiler.Compiler.create(
        blocks.Scalar() >> blocks.Function(lambda x: x * foo),
        loom_input_tensor=loom_input_tensor)
    p.losses['foo'] = p.compiler.output_tensors[0]
    p.finalize_stats()
    p.train_op = tf.train.GradientDescentOptimizer(1.0).minimize(
        p.loss_total, global_step=p.global_step)
    p.logdir = self.get_temp_dir()
    p.dev_examples = [2]
    p.is_chief_trainer = True
    p.batch_size = 2
    p.epochs = 3
    p.print_file = six.StringIO()
    return p
def test_run_once(self):
    p = self._make_plan()
    p.save_best = False
    # We aren't using a managed session, so we need to run this ourselves.
    init_op = tf.global_variables_initializer()
    sv = p.create_supervisor()
    with self.test_session() as sess:
        p.run(sv, sess)
        log_str = p.print_file.getvalue()
        self.assertTrue(
            log_str.endswith('could not restore from %s\n' % p.logdir_restore),
            msg=log_str)
        p.print_file = six.StringIO()
        sess.run(init_op)
        tf.gfile.MkDir(p.logdir_restore)
        save_path = os.path.join(p.logdir_restore, 'model')
        sv.saver.save(sess, save_path, global_step=42)
        p.run(sv, sess)
        log_str = p.print_file.getvalue()
        expected_lines = ['restoring from %s-42' % save_path,
                          'step: 0 loss: 3.000e+00 foo: 4.200e+01']
        expected = '\n'.join(expected_lines) + '\n'
        self.assertTrue(log_str.endswith(expected), msg=log_str)
def _request(self, method, url, headers=None, data=None,
             content_length=None):
    call = build_call_record(method, sort_url_by_query_keys(url),
                             headers or {}, data)
    if content_length is not None:
        call = tuple(list(call) + [content_length])
    self.calls.append(call)
    fixture = self.fixtures[sort_url_by_query_keys(url)][method]
    data = fixture[1]
    if isinstance(fixture[1], six.string_types):
        try:
            data = json.loads(fixture[1])
        except ValueError:
            data = six.StringIO(fixture[1])
    return FakeResponse(fixture[0], fixture[1]), data
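The fixtures mapping this fake client reads from is not shown in the excerpt. Inferred from the lookups above, a hypothetical entry would be keyed by sorted URL and HTTP method and hold a (headers, body) pair:

# hypothetical fixture entry, shaped to match the lookups in _request
fixtures = {
    '/v1/images?limit=10': {
        'GET': (
            {'x-request-id': 'req-123'},   # fixture[0]: response headers
            '{"images": []}',              # fixture[1]: response body
        ),
    },
}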
def test_print_report(self):
    hook = memory_hooks.LineProfileHook()
    p = self.pool.malloc(1000)
    del p
    with hook:
        p1 = self.pool.malloc(1000)
        p2 = self.pool.malloc(2000)
        del p1
        del p2
    io = six.StringIO()
    hook.print_report(file=io)
    actual = io.getvalue()
    expect = r'\A_root \(3\.00KB, 2\.00KB\)'
    six.assertRegex(self, actual, expect)
    expect = r'.*\.py:[0-9]+:test_print_report \(1\.00KB, 0\.00B\)'
    six.assertRegex(self, actual, expect)
    expect = r'.*\.py:[0-9]+:test_print_report \(2\.00KB, 2\.00KB\)'
    six.assertRegex(self, actual, expect)
def test_print_report_max_depth(self):
    hook = memory_hooks.LineProfileHook(max_depth=1)
    with hook:
        p = self.pool.malloc(1000)
        del p
    io = six.StringIO()
    hook.print_report(file=io)
    actual = io.getvalue()
    # one report line plus a trailing newline
    self.assertEqual(2, len(actual.split('\n')))

    hook = memory_hooks.LineProfileHook(max_depth=2)
    with hook:
        p = self.pool.malloc(1000)
        del p
    io = six.StringIO()
    hook.print_report(file=io)
    actual = io.getvalue()
    # two report lines plus a trailing newline
    self.assertEqual(3, len(actual.split('\n')))
def _render(self, mode='human', close=False):
    if close:
        return
    outfile = StringIO() if mode == 'ansi' else sys.stdout
    row, col = self.s // self.ncol, self.s % self.ncol
    desc = self.desc.tolist()
    desc = [[c.decode('utf-8') for c in line] for line in desc]
    desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
    if self.lastaction is not None:
        outfile.write("  ({})\n".format(
            ["Left", "Down", "Right", "Up"][self.lastaction]))
    else:
        outfile.write("\n")
    outfile.write("\n".join(''.join(line) for line in desc) + "\n")
    return outfile
def setUp(self):
    super(PostArgParseSetupTest, self).setUp()
    self.config.debug = False
    self.config.max_log_backups = 1000
    self.config.quiet = False
    self.config.verbose_count = constants.CLI_DEFAULTS['verbose_count']
    self.devnull = open(os.devnull, 'w')

    from certbot.log import ColoredStreamHandler
    self.stream_handler = ColoredStreamHandler(six.StringIO())

    from certbot.log import MemoryHandler, TempHandler
    self.temp_handler = TempHandler()
    self.temp_path = self.temp_handler.path
    self.memory_handler = MemoryHandler(self.temp_handler)

    self.root_logger = mock.MagicMock(
        handlers=[self.memory_handler, self.stream_handler])
def _test_common(self, error_type, debug):
    """Returns the mocked logger and stderr output."""
    mock_err = six.StringIO()
    try:
        raise error_type(self.error_msg)
    except BaseException:
        exc_info = sys.exc_info()

    with mock.patch('certbot.log.logger') as mock_logger:
        with mock.patch('certbot.log.sys.stderr', mock_err):
            try:
                # pylint: disable=star-args
                self._call(
                    *exc_info, debug=debug, log_path=self.log_path)
            except SystemExit as exit_err:
                mock_err.write(str(exit_err))
            else:  # pragma: no cover
                self.fail('SystemExit not raised.')
    output = mock_err.getvalue()
    return mock_logger, output
def test_to_str_exclude():
    def exclude(h):
        if h[0].lower() == 'multi-line':
            return None
        return h

    sah = StatusAndHeadersParser(['HTTP/1.0']).parse(StringIO(status_headers_1))
    res = sah.to_str(exclude)
    # continuation lines stay at column 0 so the expected string carries no
    # leading whitespace
    exp = "\
HTTP/1.0 200 OK\r\n\
Content-Type: ABC\r\n\
Some: Value\r\n\
"
    assert res == exp
    assert sah.to_bytes(exclude) == (exp.encode('latin-1') + b'\r\n')
def test_simmatrix_import_run():
    output_fn = tmpname()
    tsv = '''frag_id1 frag_id2 score
2mlm_2W7_frag1 2mlm_2W7_frag2 0.5877164873731594
2mlm_2W7_frag2 3wvm_STE_frag1 0.4633096818493935
'''
    inputfile = StringIO(tsv)
    try:
        script.simmatrix_import_run(inputfile=inputfile,
                                    inputformat='tsv',
                                    simmatrixfn=output_fn,
                                    fragmentsdb='data/fragments.sqlite',
                                    nrrows=2)
        simmatrix = SimilarityMatrix(output_fn)
        result = [r for r in simmatrix]
        simmatrix.close()
        expected = [('2mlm_2W7_frag1', '2mlm_2W7_frag2', 0.5877),
                    ('2mlm_2W7_frag2', '3wvm_STE_frag1', 0.4633)]
        assert_array_almost_equal([r[2] for r in result], [r[2] for r in expected], 3)
        # compare identifiers against the expected pairs, not result to itself
        assert [(r[0], r[1]) for r in result] == [(r[0], r[1]) for r in expected]
    finally:
        if os.path.exists(output_fn):
            os.remove(output_fn)
def test_simmatrix_importfpneigh_run():
    output_fn = tmpname()
    tsv = '''Compounds similar to 2mlm_2W7_frag1:
2mlm_2W7_frag1 1.0000
2mlm_2W7_frag2 0.5877
Compounds similar to 2mlm_2W7_frag2:
2mlm_2W7_frag2 1.0000
3wvm_STE_frag1 0.4633
'''
    inputfile = StringIO(tsv)
    try:
        script.simmatrix_importfpneigh_run(inputfile=inputfile,
                                           simmatrixfn=output_fn,
                                           fragmentsdb='data/fragments.sqlite',
                                           nrrows=3)
        simmatrix = SimilarityMatrix(output_fn)
        rows = [r for r in simmatrix]
        simmatrix.close()
        expected = [(u'2mlm_2W7_frag1', u'2mlm_2W7_frag2', 0.5877),
                    (u'2mlm_2W7_frag2', u'3wvm_STE_frag1', 0.4633)]
        assert rows == expected
    finally:
        os.remove(output_fn)
def test_fpneigh2tsv_run():
    fpneigh_in = '''Compounds similar to 2mlm_2W7_frag1:
2mlm_2W7_frag1 1.0000
2mlm_2W7_frag2 0.5877
Compounds similar to 2mlm_2W7_frag2:
2mlm_2W7_frag2 1.0000
3wvm_STE_frag1 0.4633
'''
    inputfile = StringIO(fpneigh_in)
    outputfile = StringIO()
    script.fpneigh2tsv_run(inputfile, outputfile)
    expected = '''frag_id1\tfrag_id2\tscore
2mlm_2W7_frag1\t2mlm_2W7_frag2\t0.5877
2mlm_2W7_frag2\t3wvm_STE_frag1\t0.4633
'''
    assert outputfile.getvalue() == expected
def test_dump_pairs_astsv(self, bitsets, number_of_bits, label2id):
    out = StringIO()
    pairs.dump_pairs(bitsets,
                     bitsets,
                     'tsv',
                     'StringIO',
                     out,
                     number_of_bits,
                     0.4,
                     0.05,
                     label2id,
                     False,
                     True,
                     )
    result = out.getvalue()
    expected = "a\tc\t0.13556\n"
    assert result == expected
def test_dump_pairs_astsv_nomem(self, bitsets, number_of_bits, label2id):
    out = StringIO()
    pairs.dump_pairs(bitsets,
                     bitsets,
                     'tsv',
                     'StringIO',
                     out,
                     number_of_bits,
                     0.4,
                     0.05,
                     label2id,
                     True,
                     True,
                     )
    result = out.getvalue()
    expected = "a\tc\t0.13556\n"
    assert result == expected
def make_audio(tensor, sample_rate, length_frames, num_channels):
    """Convert a numpy representation of audio to an Audio protobuf."""
    # wave writes binary frames, so the buffer must hold bytes (BytesIO),
    # not text (StringIO)
    output = BytesIO()
    wav_out = wave.open(output, "w")
    wav_out.setframerate(float(sample_rate))
    wav_out.setsampwidth(2)
    wav_out.setcomptype('NONE', 'not compressed')
    wav_out.setnchannels(num_channels)
    wav_out.writeframes(tensor.astype("int16").tobytes())
    wav_out.close()
    output.flush()
    audio_string = output.getvalue()
    return Summary.Audio(sample_rate=float(sample_rate),
                         num_channels=num_channels,
                         length_frames=length_frames,
                         encoded_audio_string=audio_string,
                         content_type="audio/wav")