def __init__(self, name, retain_file=False, gen_flex=False):
    """Reset the bookkeeping state and start the in-memory source buffer.

    Args:
        name: Prefix used for the temporary ``.c`` file name; only
            consulted when ``retain_file`` is true.
        retain_file: When true, a named temporary ``.c`` file is opened
            with ``delete=False`` and its path is stored in
            ``self.filename``.
        gen_flex: When true, the flex include template is written to the
            buffer right after the standard include template.
    """
    self.num_kernels = 0
    self.module = None
    self.functions, self.arg_descs, self.cache = {}, {}, {}
    self.compiled = False
    self.retain_file = retain_file
    self.buffer = StringIO()

    # An on-disk file is only created when the caller asked to keep it.
    if self.retain_file:
        self.f = tempfile.NamedTemporaryFile(
            mode='w', suffix='.c', prefix=name, delete=False)
        self.filename = self.f.name

    # The buffer always starts with the include header(s).
    headers = [_includes_template]
    if gen_flex:
        headers.append(_flex_includes_template)
    for header in headers:
        self.buffer.write(header)
# Example source code using Python's StringIO() class
def test_parse_stream(self):
    """
    The parser accepts a file-like stream passed through ``file=``.
    """
    source = StringIO("""
@nice-blue: #5B83AD;
""")
    self.parser.parse(file=source)

    # Exactly one object is parsed: the expected variable.
    self.assertEqual(1, len(self.parser.result))
    # The stream carries no name, so the default target name is used.
    self.assertEqual('(stream)', self.parser.target)

    parsed_variable = self.parser.result[0]
    self.assertEqual('@nice-blue', parsed_variable.name)
    self.assertEqual(['#5b83ad'], parsed_variable.value)
def test_sign(self):
    """Sign a message with each algorithm and verify both signatures.

    For every entry in ``sig_algs`` the message is signed with
    ``self.private_key``; ``self.public_key`` must then verify the fresh
    signature as well as the stored reference signature at the same
    position in ``origin_sig``.
    """
    message = six.StringIO(u'Hello, World!')
    sig_algs = (hash_.ES256, hash_.ES384, hash_.ES521)
    origin_sig = (
        [88, 34, 78, 248, 120, 105, 162, 172, 14, 179, 252, 201, 193, 230, 124, 105, 227, 145, 236, 104, 31, 153, 215, 117,
         193, 126, 229, 98, 143, 88, 81, 216, 19, 67, 86, 127, 212, 195, 149, 200, 63, 117, 65, 155, 70, 6, 219, 234, 151,
         120, 229, 214, 193, 210, 165, 158, 135, 160, 244, 210, 41, 49, 132, 71],
        [65, 50, 99, 81, 100, 105, 169, 194, 75, 54, 38, 34, 69, 117, 22, 1, 105, 176, 138, 47, 254, 233, 225, 132, 39, 40,
         160, 2, 9, 208, 78, 11, 76, 20, 163, 57, 222, 176, 108, 93, 155, 163, 102, 185, 211, 138, 83, 92, 67, 92, 133, 51,
         119, 58, 20, 212, 51, 37, 20, 175, 202, 134, 85, 14],
        [112, 133, 177, 239, 79, 252, 240, 252, 28, 96, 174, 40, 109, 34, 109, 141, 183, 16, 246, 249, 102, 213, 128, 132,
         206, 148, 124, 150, 254, 248, 164, 62, 109, 95, 178, 120, 53, 52, 216, 135, 213, 193, 97, 109, 255, 0, 220, 219, 41,
         224, 112, 236, 14, 131, 170, 191, 223, 234, 36, 90, 152, 133, 138, 247]
    )
    for sig_alg, reference in zip(sig_algs, origin_sig):
        # Rewind before every read: the stream is consumed by sign/verify.
        message.seek(0)
        sig, alg = self.private_key.sign(message, sig_alg.hash_id)
        for candidate in (sig, bytes(bytearray(reference))):
            message.seek(0)
            self.assertTrue(self.public_key.verify(message, alg, candidate))
def sign(self, private_key, timestamp=None):
    """Sign the payload with ``private_key`` and record the signature.

    Args:
        private_key: Key object providing ``sign(stream, hash_id)`` and
            ``public_key()``.
        timestamp: Forwarded to ``self.protected_header``.

    Returns:
        The ``(sig_bytes, algorithm)`` pair returned by
        ``private_key.sign``.
    """
    protected = self.protected_header(timestamp=timestamp)
    payload_text = self.sign_bytes(protected).decode()
    sig_bytes, algorithm = private_key.sign(
        six.StringIO(payload_text), hash_.HashID.SHA256)

    # Store the signature together with the signer's public key.
    header = JsHeader(private_key.public_key(), algorithm)
    encoded_signature = util.jose_base64_url_encode(sig_bytes)
    self.signatures.append(JsSignature(header, encoded_signature, protected))
    return sig_bytes, algorithm
def verify(self):
    """Check every stored signature and collect the signing public keys.

    Returns:
        list: The public key (``header.jwk``) of each signature, in the
        order the signatures are stored.

    Raises:
        NotImplementedError: A signature header carries a certificate
            chain.
        JSONSignError: A header has neither a chain nor a JWK, or the
            key's ``verify`` explicitly reports ``False``.
        Exception: Anything raised by the key's ``verify`` propagates
            unchanged.
    """
    keys = []
    for sign in self.signatures:
        sign_bytes = self.sign_bytes(sign.protected)
        if sign.header.chain:
            raise NotImplementedError()
        elif sign.header.jwk:
            public_key = sign.header.jwk
        else:
            raise JSONSignError("missing public key")

        sig_bytes = util.jose_base64_url_decode(sign.signature)

        # No try/except here: catching and re-raising the same exception
        # added nothing and, on Python 2, discarded the original traceback.
        verified = public_key.verify(
            six.StringIO(sign_bytes.decode()),
            sign.header.algorithm,
            sig_bytes)
        # NOTE(review): the key may signal a bad signature by returning
        # False instead of raising -- confirm against the key classes.
        # Only an explicit False is rejected, so a key whose verify()
        # returns None on success keeps working.
        if verified is False:
            raise JSONSignError("invalid signature")

        keys.append(public_key)
    return keys
def __repr__(self, verbosity=0):
    """Return a multi-line text summary of the stored values.

    Args:
        verbosity: ``0`` lists only entries whose ``type_`` is
            ``"METAFEATURE"`` as ``name: value``; ``>= 1`` lists every
            entry with its value truncated to ten characters plus its
            time; ``> 1`` additionally prints a non-empty ``comment``.

    Returns:
        str: A header line followed by one line per reported entry.
    """
    out = StringIO()
    out.write("Metafeatures for dataset %s\n" % self.name)
    for name in self.values:
        entry = self.values[name]
        if verbosity == 0:
            if entry.type_ != "METAFEATURE":
                continue
            out.write(" %s: %s\n" % (str(name), str(entry.value)))
        elif verbosity >= 1:
            short_value = str(entry.value)[:10]
            out.write(" %s: %10s (%10fs)\n" %
                      (str(name), short_value, entry.time))
        # Add the reason for a crash if one happened!
        if verbosity > 1 and entry.comment:
            out.write(" %s\n" % entry.comment)
    return out.getvalue()
def _render(self, mode='human', close=False):
    """Write the grid, with the current cell highlighted, to a stream.

    Args:
        mode: ``'ansi'`` writes into a fresh ``StringIO``; any other
            value writes to ``sys.stdout``.
        close: When true, nothing is rendered and ``None`` is returned.

    Returns:
        The stream that was written to, or ``None`` when ``close`` is
        true.
    """
    if close:
        return

    if mode == 'ansi':
        outfile = StringIO()
    else:
        outfile = sys.stdout

    # self.s is a flat index; split it into grid coordinates.
    row, col = divmod(self.s, self.ncol)
    grid = [[cell.decode('utf-8') for cell in line]
            for line in self.desc.tolist()]
    grid[row][col] = utils.colorize(grid[row][col], "red", highlight=True)

    if self.lastaction is None:
        outfile.write("\n")
    else:
        action_name = ["Left","Down","Right","Up"][self.lastaction]
        outfile.write(" ({})\n".format(action_name))

    rendered_rows = [''.join(line) for line in grid]
    outfile.write("\n".join(rendered_rows)+"\n")
    return outfile
def test_run_invalid(self, *args):
    """Running with ``CNI_COMMAND`` set to ``INVALID`` must return 1."""
    env = {'CNI_COMMAND': 'INVALID', 'CNI_ARGS': 'foo=bar'}
    fin, fout = StringIO(), StringIO()
    returned = self.runner.run(env, fin, fout)
    self.assertEqual(1, returned)
def test_run_write_version(self, *args):
    """The ``VERSION`` command returns 0 and writes version info as JSON.

    The output stream must decode to an object whose
    ``supportedVersions`` and ``cniVersion`` match the constants on
    ``api.CNIRunner``.
    """
    env = {'CNI_COMMAND': 'VERSION', 'CNI_ARGS': 'foo=bar'}
    fin, fout = StringIO(), StringIO()
    code = self.runner.run(env, fin, fout)
    payload = jsonutils.loads(fout.getvalue())

    self.assertEqual(0, code)
    self.assertEqual(api.CNIRunner.SUPPORTED_VERSIONS,
                     payload['supportedVersions'])
    self.assertEqual(api.CNIRunner.VERSION, payload['cniVersion'])
def test_run_del(self, m_k8s_delete):
    """The ``DEL`` command calls the delete hook with the CNI arguments.

    Args:
        m_k8s_delete: Mock standing in for the delete hook; it is
            configured to return a fake VIF.
    """
    m_k8s_delete.return_value = fake._fake_vif()
    fin, fout = StringIO(), StringIO()
    env = dict(CNI_COMMAND='DEL', CNI_ARGS='foo=bar')

    self.runner.run(env, fin, fout)

    self.assertTrue(m_k8s_delete.called)
    first_positional = m_k8s_delete.call_args[0][0]
    self.assertEqual('foo=bar', first_positional.CNI_ARGS)
def _test_run(self, cni_cmd, path, m_post):
    """Run ``cni_cmd`` and assert the expected HTTP POST was issued.

    Args:
        cni_cmd: Value placed in ``CNI_COMMAND``.
        path: URL path expected after ``http://127.0.0.1:<port>/``.
        m_post: Mock of the POST function whose last call is checked.

    Returns:
        Whatever ``self.runner.run`` returned.
    """
    fin, fout = StringIO(), StringIO()
    env = dict(CNI_COMMAND=cni_cmd, CNI_ARGS='foo=bar')
    result = self.runner.run(env, fin, fout)

    expected_url = 'http://127.0.0.1:%d/%s' % (self.port, path)
    m_post.assert_called_with(
        expected_url, json=mock.ANY, headers={'Connection': 'close'})
    return result
def __init__(self, context):
    """Start capturing stdout and root-logger output for one request.

    Records the starting memory usage and start time, writes a ``START``
    line to an in-memory log, attaches a stream handler for that log to
    the root logger, and replaces ``sys.stdout`` with the log. The
    previous ``sys.stdout`` is kept in ``self._previous_stdout``.

    Args:
        context: Object providing ``aws_request_id`` and
            ``function_version``.
    """
    self._context = context
    self._start_mem = memory_profiler.memory_usage()[0]

    self._log = log = StringIO()
    start_line = "START RequestId: {r} Version: {v}\n".format(
        r=context.aws_request_id, v=context.function_version
    )
    log.write(start_line)

    self._start_time = timeit.default_timer()
    self._previous_stdout = sys.stdout

    # Send both logging output and plain prints into the same buffer.
    logging.getLogger().addHandler(logging.StreamHandler(stream=log))
    sys.stdout = log
def call(arguments):
    """Run ``main.main()`` with patched argv and return what it printed.

    Args:
        arguments: Value installed as ``sys.argv`` for the duration of
            the call.

    Returns:
        str: Everything written to ``sys.stdout`` during the call.
    """
    captured = six.StringIO()
    with mock.patch("sys.stdout", captured), \
            mock.patch("sys.argv", arguments):
        main.main()
    return str(captured.getvalue())
def log_stream():
    """Yield an in-memory stream and a logger that writes to it.

    The logger is the one named ``'test_logger'``, set to ``WARNING``
    level, with a ``StreamHandler`` bound to the yielded stream.

    Yields:
        tuple: ``(stream, logger)``.
    """
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    try:
        yield stream, logger
    finally:
        # getLogger() hands back the same named logger on every call, so
        # a handler left attached would pile up across uses and keep
        # receiving records meant for later streams. Detach it before
        # closing; the finally also covers the generator being closed or
        # thrown into.
        logger.removeHandler(handler)
        handler.close()
def log_stream():
    """Yield a stream that captures output of a temporary ``v1.logger``.

    ``v1.logger`` is replaced by the ``'test_logger'`` logger (level
    ``WARNING``) with a ``StreamHandler`` bound to the yielded stream,
    and is restored to its previous value afterwards.

    Yields:
        The in-memory stream receiving the log output.
    """
    stream = StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    prior_logger = v1.logger
    v1.logger = logger
    try:
        yield stream
    finally:
        # Always undo the module-level swap, and detach the handler so
        # the shared named logger does not accumulate handlers (or keep
        # a closed one) across uses.
        v1.logger = prior_logger
        logger.removeHandler(handler)
        handler.close()
# test the retry logic with a custom logger
def _get_auth(self, ims_host, ims_endpoint_jwt,
              tech_acct_id=None, api_key=None, client_secret=None,
              private_key_file=None, private_key_data=None,
              **kwargs):
    """Build an ``Auth`` object from the supplied credentials.

    Args:
        ims_host: Host name; used for the JWT and for the token URL.
        ims_endpoint_jwt: Path appended to ``https://<ims_host>``.
        tech_acct_id: Account id; falls back to ``kwargs["tech_acct"]``.
        api_key: API key.
        client_secret: Client secret.
        private_key_file: Path to the private key; falls back to
            ``kwargs["priv_key_path"]``.
        private_key_data: Private key text; takes precedence over
            ``private_key_file`` when non-empty.
        **kwargs: Only the two fallback spellings above are read.

    Returns:
        ``Auth(api_key, <access token>)``.

    Raises:
        ArgumentError: A required credential is missing or empty.
    """
    # Accept the alternate keyword spellings when the primary is empty.
    if not tech_acct_id:
        tech_acct_id = kwargs.get("tech_acct")
    if not private_key_file:
        private_key_file = kwargs.get("priv_key_path")

    key_supplied = private_key_data or private_key_file
    if not tech_acct_id or not api_key or not client_secret or not key_supplied:
        raise ArgumentError("Connector create: not all required auth parameters were supplied; please see docs")

    def build_jwt(key_stream):
        return JWT(self.org_id, tech_acct_id, ims_host, api_key, key_stream)

    if private_key_data:
        jwt = build_jwt(six.StringIO(private_key_data))
    else:
        with open(private_key_file, 'r') as private_key_stream:
            jwt = build_jwt(private_key_stream)

    token_url = "https://" + ims_host + ims_endpoint_jwt
    token = AccessRequest(token_url, api_key, client_secret, jwt())
    return Auth(api_key, token())
def _run_test(self, filename):
    """Run the type detector over ``filename`` and return the document.

    A ``detect_type.Subscriber`` is built on a mock pipeline, configured
    from ``config.yml`` in the current working directory, and asked to
    consume the document together with a dummy stream.

    Args:
        filename: Passed to ``document.get_document``.

    Returns:
        The document object after the detector has consumed it.
    """
    mock_pipeline = test_helper.get_mock_pipeline([
        helper.DOCUMENT, helper.PICTURE,
        helper.ARCHIVE, helper.DISKIMAGE])
    detector = detect_type.Subscriber(mock_pipeline)
    with open('config.yml') as inp:
        # safe_load instead of yaml.load: calling load() without a Loader
        # is deprecated since PyYAML 5.1 and a TypeError in 6.0, and the
        # default loader could construct arbitrary Python objects.
        detector.setup(yaml.safe_load(inp))
    doc = document.get_document(filename)
    detector.consume(doc, StringIO('dummy'))
    return doc
def from_string(cls, key, password='notasecret'):
    """Build an RsaSigner from a PEM-encoded private key.

    Args:
        key: string, private key in PEM format.
        password: string, password for private key file. Not used by
            this implementation.

    Returns:
        RsaSigner instance.

    Raises:
        ValueError: the key is neither a PKCS#1 nor a PKCS#8 PEM block,
            or a PKCS#8 block leaves undecoded trailing bytes.
    """
    pem_text = _from_bytes(key)  # pem expects str in Py3
    marker_id, key_bytes = pem.readPemBlocksFromFile(
        six.StringIO(pem_text), _PKCS1_MARKER, _PKCS8_MARKER)

    if marker_id not in (0, 1):
        raise ValueError('No key could be detected.')

    if marker_id == 0:
        # First marker matched: the block is already a PKCS#1 DER key.
        der_bytes = key_bytes
    else:
        # Second marker matched: unwrap the inner private key.
        key_info, remaining = decoder.decode(
            key_bytes, asn1Spec=_PKCS8_SPEC)
        if remaining != b'':
            raise ValueError('Unused bytes', remaining)
        der_bytes = key_info.getComponentByName('privateKey').asOctets()

    pkey = rsa.key.PrivateKey.load_pkcs1(der_bytes, format='DER')
    return cls(pkey)
def test_genbank_record_fixing(self):
    """Round-tripping features must reproduce the fixed GenBank file.

    The first record of ``sample.gb`` has every feature passed through
    ``convert_feature`` then ``unconvert_feature``; writing it back as
    GenBank must equal the contents of ``sample.fixed.gb``.
    """
    sample_path = os.path.join(FILES_PATH, 'sample.gb')
    with open(sample_path) as handle:
        first_record = next(SeqIO.parse(handle, 'genbank'))

    first_record.features = [
        unconvert_feature(convert_feature(feature))
        for feature in first_record.features
    ]

    output = StringIO()
    SeqIO.write(first_record, output, "genbank")

    fixed_path = os.path.join(FILES_PATH, 'sample.fixed.gb')
    with open(fixed_path) as handle:
        self.assertEqual(output.getvalue(), handle.read())
    output.close()