def _parse_pem_key(raw_key_input):
    """Identify and extract PEM keys.

    Determines whether the given key is in the format of a PEM key, and
    extracts the relevant part of the key if it is.

    Args:
        raw_key_input: The contents of a private key file (either PEM or
            PKCS12).

    Returns:
        string, The actual key if the contents are from a PEM file, or
        else None.
    """
    offset = raw_key_input.find(b'-----BEGIN ')
    if offset != -1:
        return raw_key_input[offset:]
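
# A minimal usage sketch for _parse_pem_key (the byte strings below are
# illustrative, not real keys): PEM input returns everything from the
# '-----BEGIN ' marker onward, while non-PEM input (e.g. PKCS12/DER bytes)
# falls through and returns None.
pem = b'junk before the marker\n-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n'
assert _parse_pem_key(pem).startswith(b'-----BEGIN PRIVATE KEY-----')
assert _parse_pem_key(b'\x30\x82\x01\x02') is None
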
# Python six.b() usage examples
# Source: test_run_no_updates_available.py (project: pyupdater-wx-demo, author: wettenhj)
def setUp(self):
    tempFile = tempfile.NamedTemporaryFile()
    self.fileServerDir = tempFile.name
    tempFile.close()
    os.mkdir(self.fileServerDir)
    os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
    privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                    encoding='base64')
    signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                encoding='base64').decode()
    VERSIONS['signature'] = signature
    keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
    with gzip.open(keysFilePath, 'wb') as keysFile:
        # gzip files opened in 'wb' mode require bytes, not str
        keysFile.write(six.b(json.dumps(KEYS, sort_keys=True)))
    versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
    with gzip.open(versionsFilePath, 'wb') as versionsFile:
        versionsFile.write(six.b(json.dumps(VERSIONS, sort_keys=True)))
    os.environ['WXUPDATEDEMO_TESTING'] = 'True'
    from wxupdatedemo.config import CLIENT_CONFIG
    self.clientConfig = CLIENT_CONFIG
    self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
    self.clientConfig.APP_NAME = APP_NAME
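
# A hedged sketch of the signing round-trip exercised by setUp above, using a
# freshly generated keypair instead of the module-level PRIVATE_KEY (assumes
# the python-ed25519 package whose SigningKey API appears in setUp):
import json

import ed25519
import six

signing_key, verifying_key = ed25519.create_keypair()
payload = six.b(json.dumps({'0.0.1': 'example-hash'}, sort_keys=True))
sig = signing_key.sign(payload, encoding='base64')
verifying_key.verify(sig, payload, encoding='base64')  # raises ed25519.BadSignatureError if tampered
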
def test_clear(self):
    keyspace = 'keyspace'
    routing_key = 'routing_key'
    custom_payload = {'key': six.b('value')}
    ss = SimpleStatement('whatever', keyspace=keyspace, routing_key=routing_key, custom_payload=custom_payload)

    batch = BatchStatement()
    batch.add(ss)

    self.assertTrue(batch._statements_and_parameters)
    self.assertEqual(batch.keyspace, keyspace)
    self.assertEqual(batch.routing_key, routing_key)
    self.assertEqual(batch.custom_payload, custom_payload)

    batch.clear()
    self.assertFalse(batch._statements_and_parameters)
    self.assertIsNone(batch.keyspace)
    self.assertIsNone(batch.routing_key)
    self.assertFalse(batch.custom_payload)

    batch.add(ss)
def test_non_frozen_udts(self):
    """
    Test to ensure that non-frozen UDTs work with C* > 3.6.

    @since 3.7.0
    @jira_ticket PYTHON-498
    @expected_result Non-frozen UDTs are supported
    @test_category data_types, udt
    """
    self.session.execute("USE {0}".format(self.keyspace_name))
    self.session.execute("CREATE TYPE user (state text, has_corn boolean)")
    self.session.execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(self.function_table_name))
    User = namedtuple('user', ('state', 'has_corn'))
    self.cluster.register_user_type(self.keyspace_name, "user", User)
    self.session.execute("INSERT INTO {0} (a, b) VALUES (%s, %s)".format(self.function_table_name), (0, User("Nebraska", True)))
    self.session.execute("UPDATE {0} SET b.has_corn = False where a = 0".format(self.function_table_name))
    result = self.session.execute("SELECT * FROM {0}".format(self.function_table_name))
    self.assertFalse(result[0].b.has_corn)
    table_sql = self.cluster.metadata.keyspaces[self.keyspace_name].tables[self.function_table_name].as_cql_query()
    self.assertNotIn("<frozen>", table_sql)
def test_raise_error_on_nonexisting_udts(self):
    """
    Test to ensure that an error is raised when operating on a nonexistent
    UDT or an invalid keyspace.
    """
    c = Cluster(protocol_version=PROTOCOL_VERSION)
    s = c.connect(self.keyspace_name, wait_for_all_pools=True)
    User = namedtuple('user', ('age', 'name'))

    with self.assertRaises(UserTypeDoesNotExist):
        c.register_user_type("some_bad_keyspace", "user", User)
    with self.assertRaises(UserTypeDoesNotExist):
        c.register_user_type("system", "user", User)
    with self.assertRaises(InvalidRequest):
        s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")
    c.shutdown()
def test_cluster_column_ordering_reversed_metadata(self):
    """
    Simple test to ensure that the metadata associated with cluster ordering
    is surfaced correctly. Creates a table with a few clustering keys, then
    checks the clustering order associated with the clustering columns and
    ensures it is set correctly.

    @since 3.0.0
    @jira_ticket PYTHON-402
    @expected_result is_reversed is True on DESC order, and False on ASC
    @test_category metadata
    """
    create_statement = self.make_create_statement(["a"], ["b", "c"], ["d"], compact=True)
    create_statement += " AND CLUSTERING ORDER BY (b ASC, c DESC)"
    self.session.execute(create_statement)
    tablemeta = self.get_table_metadata()

    b_column = tablemeta.columns['b']
    self.assertFalse(b_column.is_reversed)
    c_column = tablemeta.columns['c']
    self.assertTrue(c_column.is_reversed)
def test_cql_compatibility(self):
    if CASS_SERVER_VERSION >= (3, 0):
        raise unittest.SkipTest("CQL compatibility does not apply to Cassandra 3.0+")

    # having more than one non-PK column is okay if there aren't any
    # clustering columns
    create_statement = self.make_create_statement(["a"], [], ["b", "c", "d"], compact=True)
    self.session.execute(create_statement)
    tablemeta = self.get_table_metadata()

    self.assertEqual([u'a'], [c.name for c in tablemeta.partition_key])
    self.assertEqual([], tablemeta.clustering_key)
    self.assertEqual([u'a', u'b', u'c', u'd'], sorted(tablemeta.columns.keys()))
    self.assertTrue(tablemeta.is_cql_compatible)

    # ... but if there are clustering columns, it's not CQL compatible.
    # This is a hacky way to simulate having clustering columns.
    tablemeta.clustering_key = ["foo", "bar"]
    tablemeta.columns["foo"] = None
    tablemeta.columns["bar"] = None
    self.assertFalse(tablemeta.is_cql_compatible)
def test_indexes(self):
    create_statement = self.make_create_statement(["a"], ["b", "c"], ["d", "e", "f"])
    create_statement += " WITH CLUSTERING ORDER BY (b ASC, c ASC)"
    execute_until_pass(self.session, create_statement)

    d_index = "CREATE INDEX d_index ON %s.%s (d)" % (self.keyspace_name, self.function_table_name)
    e_index = "CREATE INDEX e_index ON %s.%s (e)" % (self.keyspace_name, self.function_table_name)
    execute_until_pass(self.session, d_index)
    execute_until_pass(self.session, e_index)

    tablemeta = self.get_table_metadata()
    statements = tablemeta.export_as_string().strip()
    statements = [s.strip() for s in statements.split(';')]
    statements = list(filter(bool, statements))
    self.assertEqual(3, len(statements))
    self.assertIn(d_index, statements)
    self.assertIn(e_index, statements)

    # make sure indexes are included in KeyspaceMetadata.export_as_string()
    ksmeta = self.cluster.metadata.keyspaces[self.keyspace_name]
    statement = ksmeta.export_as_string()
    self.assertIn('CREATE INDEX d_index', statement)
    self.assertIn('CREATE INDEX e_index', statement)
def test_non_size_tiered_compaction(self):
    """
    Test options for a non-size-tiered compaction strategy.

    Creates a table with LeveledCompactionStrategy, specifying one non-default
    option. Verifies that the option is present in the generated CQL, and that
    other legacy table parameters (min_threshold, max_threshold) are not
    included.

    @since 2.6.0
    @jira_ticket PYTHON-352
    @expected_result the options map for LeveledCompactionStrategy does not contain min_threshold or max_threshold
    @test_category metadata
    """
    create_statement = self.make_create_statement(["a"], [], ["b", "c"])
    create_statement += " WITH COMPACTION = {'class': 'LeveledCompactionStrategy', 'tombstone_threshold': '0.3'}"
    self.session.execute(create_statement)

    table_meta = self.get_table_metadata()
    cql = table_meta.export_as_string()
    self.assertIn("'tombstone_threshold': '0.3'", cql)
    self.assertIn("LeveledCompactionStrategy", cql)
    self.assertNotIn("min_threshold", cql)
    self.assertNotIn("max_threshold", cql)
def test_simple(self):
    task1 = TaskRegistry.create('test_executor_add')
    task1.Input.a = 10
    task1.Input.b = 20
    task2 = TaskRegistry.create('test_executor_add')
    task2.Input.a = 100
    task2.Input.b = 200

    executor1 = TaskExecutor()
    executor1.set_task(task1)
    executor1.execute()
    executor2 = TaskExecutor()
    executor2.set_task(task2)
    executor2.execute()

    self.assertEqual(task1.Output.c, 30)
    self.assertEqual(task2.Output.c, 300)
def test_stream(self):
    with open('test.tmp', 'wb') as fd:
        fd.write(six.b('Hello World ') * 100)
    task = TaskRegistry.create('test_executor_stream_copy')
    task.Input.stream = open('test.tmp', 'rb')
    task.Output.stream = open('test.out', 'wb')
    executor = TaskExecutor()
    executor.set_task(task)
    executor.execute()
    with open('test.out', 'rb') as fd:
        self.assertEqual(fd.read(), six.b('Hello World ') * 100)
    os.unlink('test.tmp')
    os.unlink('test.out')
def test_string_field(self):
    instance = StringField()
    self.assertEqual(instance.get_initial(), None)
    with self.assertRaises(InvalidValueException):
        instance.validate(1)
    with self.assertRaises(InvalidValueException):
        instance.validate(1.5)
    with self.assertRaises(InvalidValueException):
        instance.validate(six.b('ABC'))

    instance = StringField(default=six.u('ABC'))
    self.assertEqual(instance.get_initial(), six.u('ABC'))
    instance.validate(six.u('hello'))
def test_byte_field(self):
    instance = ByteField()
    self.assertEqual(instance.get_initial(), None)
    with self.assertRaises(InvalidValueException):
        instance.validate(1)
    with self.assertRaises(InvalidValueException):
        instance.validate(1.5)
    with self.assertRaises(InvalidValueException):
        instance.validate(six.u('ABC'))

    instance = ByteField(default=six.b('ABC'))
    self.assertEqual(instance.get_initial(), six.b('ABC'))
    instance.validate(six.b('hello'))
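
# For reference, the six.b()/six.u() helpers these field tests rely on: on
# Python 3, six.b('ABC') returns b'ABC' (latin-1 encoded) and six.u('ABC')
# returns 'ABC'; on Python 2 they return str and unicode respectively, which
# is why ByteField accepts six.b values and rejects six.u values on both
# interpreters.
import six

assert isinstance(six.b('ABC'), bytes)
assert isinstance(six.u('ABC'), six.text_type)
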
def test_struct_field(self):
    instance = StructField(a=IntegerField(), b=FloatField())
    val = instance.create()
    val.a = 100
    val.b = 3.14
    self.assertEqual(val.a, 100)

    nested_instance = StructField(
        a=IntegerField(),
        b=StructField(
            c=FloatField(),
            d=StringField(default=six.u('hello world'))
        )
    )
    val = nested_instance.create()
    val.a = 100
    val.b.c = 3.14
    self.assertEqual(val.b.c, 3.14)
    self.assertEqual(val.b.d, six.u('hello world'))
def test_single_io_write_stream_encode_output(
        tmpdir,
        patch_aws_encryption_sdk_stream,
        patch_json_ready_header,
        patch_json_ready_header_auth
):
    patch_aws_encryption_sdk_stream.return_value = io.BytesIO(DATA)
    patch_aws_encryption_sdk_stream.return_value.header = MagicMock(encryption_context=sentinel.encryption_context)
    target_file = tmpdir.join('target')
    mock_source = MagicMock()
    kwargs = GOOD_IOHANDLER_KWARGS.copy()
    kwargs['encode_output'] = True
    handler = io_handling.IOHandler(**kwargs)

    with open(str(target_file), 'wb') as destination_writer:
        handler._single_io_write(
            stream_args={
                'mode': 'encrypt',
                'a': sentinel.a,
                'b': sentinel.b
            },
            source=mock_source,
            destination_writer=destination_writer
        )

    assert target_file.read('rb') == base64.b64encode(DATA)
def test_should_write_file_does_exist(tmpdir, patch_input, interactive, no_overwrite, user_input, expected):
    target_file = tmpdir.join('target')
    target_file.write(b'')
    patch_input.return_value = user_input
    kwargs = GOOD_IOHANDLER_KWARGS.copy()
    kwargs.update(dict(
        interactive=interactive,
        no_overwrite=no_overwrite
    ))
    handler = io_handling.IOHandler(**kwargs)

    should_write = handler._should_write_file(str(target_file))

    if expected:
        assert should_write
    else:
        assert not should_write
def write_metadata(self, **metadata):
    # type: (**Any) -> Optional[int]
    """Writes metadata to the output stream if output is not suppressed.

    :param **metadata: JSON-serializable metadata kwargs to write
    """
    if self.suppress_output:
        return 0  # wrote 0 bytes

    metadata_line = json.dumps(metadata, sort_keys=True) + os.linesep
    metadata_output = ''  # type: Union[str, bytes]
    if 'b' in self._output_mode:
        metadata_output = metadata_line.encode('utf-8')
    else:
        metadata_output = metadata_line
    return self._output_stream.write(metadata_output)
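
# A minimal usage sketch for write_metadata (hedged: _FakeHandler is a
# hypothetical stand-in for the real handler class, which sets
# suppress_output, _output_mode and _output_stream in its __init__):
import io
import json
import os


class _FakeHandler(object):
    suppress_output = False
    _output_mode = 'w'  # text mode, so the JSON line is written as str

    def __init__(self):
        self._output_stream = io.StringIO()


_FakeHandler.write_metadata = write_metadata  # borrow the method above
fake = _FakeHandler()
fake.write_metadata(mode='encrypt', output='target')
assert fake._output_stream.getvalue().startswith('{"mode": "encrypt"')
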
def optimal_data_chunks(data, minimum=4):
    """
    An iterator returning QRData chunks optimized to the data content.

    :param data: The data to split into chunks.
    :param minimum: The minimum number of bytes in a row to split as a chunk.
    """
    data = to_bytestring(data)
    re_repeat = (
        six.b('{') + six.text_type(minimum).encode('ascii') + six.b(',}'))
    num_pattern = re.compile(six.b(r'\d') + re_repeat)
    num_bits = _optimal_split(data, num_pattern)
    alpha_pattern = re.compile(
        six.b('[') + re.escape(ALPHA_NUM) + six.b(']') + re_repeat)
    for is_num, chunk in num_bits:
        if is_num:
            yield QRData(chunk, mode=MODE_NUMBER, check_data=False)
        else:
            for is_alpha, sub_chunk in _optimal_split(chunk, alpha_pattern):
                if is_alpha:
                    mode = MODE_ALPHA_NUM
                else:
                    mode = MODE_8BIT_BYTE
                yield QRData(sub_chunk, mode=mode, check_data=False)
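
# A hedged usage sketch (assumes the python-qrcode package this helper comes
# from, where QRData chunks can be appended to a QRCode one at a time):
import qrcode

qr = qrcode.QRCode()
for chunk in optimal_data_chunks(b'HELLO 123456789 hello'):
    qr.add_data(chunk)  # digit runs get MODE_NUMBER, upper-case runs MODE_ALPHA_NUM
img = qr.make_image()
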
def rotate_vector(v, axis, theta):
    """
    Rotate vector v counterclockwise about the given axis by theta radians,
    using the rotation matrix built from the quaternion (a, b, c, d).
    """
    axis = np.asarray(axis)
    axis = axis / math.sqrt(np.dot(axis, axis))  # normalize the axis
    a = math.cos(theta / 2.0)
    b, c, d = -axis * math.sin(theta / 2.0)
    aa, bb, cc, dd = a * a, b * b, c * c, d * d
    bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d
    R = np.array([[aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)],
                  [2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)],
                  [2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc]])
    return np.dot(R, v)
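
# Quick sanity check of rotate_vector: rotating the x unit vector by pi/2
# about the z axis should yield (approximately) the y unit vector.
import math

import numpy as np

rotated = rotate_vector([1, 0, 0], axis=[0, 0, 1], theta=math.pi / 2)
assert np.allclose(rotated, [0, 1, 0])
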
def test_format_locals(self):
    def some_inner(k, v):
        a = 1
        b = 2
        return traceback.StackSummary.extract(
            traceback.walk_stack(None), capture_locals=True, limit=1)
    s = some_inner(3, 4)
    self.assertEqual(
        ['  File "' + FNAME + '", line 651, '
         'in some_inner\n'
         '    traceback.walk_stack(None), capture_locals=True, limit=1)\n'
         '    a = 1\n'
         '    b = 2\n'
         '    k = 3\n'
         '    v = 4\n'
         ], s.format())
def test_host_override(self):
    loader = load_from_dict(
        port_forwarding=dict(
            host="service",
        )
    )
    client = self.init(loader)
    response = client.get(
        "/",
        headers={
            "X-Forwarded-Port": "8080",
        },
    )
    assert_that(response.status_code, is_(equal_to(200)))
    assert_that(response.data, is_(equal_to(b("http://service/"))))
# Source: test_requests_staticmock.py (project: requests-staticmock, author: tonybaloney)
def test_class_context_manager_good_factory():
    class_session = Session()

    class TestMockClass(BaseMockClass):
        def _test_json(self, request):
            return StaticResponseFactory.GoodResponse(
                request=request,
                body=b("it's my life"),
                headers={'now': 'never'},
                status_code=201
            )

    with mock_session_with_class(class_session, TestMockClass, 'http://test.com'):
        response = class_session.get('http://test.com/test.json')
        assert response.text == "it's my life"
        assert 'now' in response.headers.keys()
        assert response.headers['now'] == 'never'
        assert response.status_code == 201
def test_class_context_manager_bad_factory():
    class_session = Session()

    class TestMockClass(BaseMockClass):
        def _test_json(self, request):
            return StaticResponseFactory.BadResponse(
                request=request,
                body=b("it's not over"),
                headers={'now': 'never'},
            )

    with mock_session_with_class(class_session, TestMockClass, 'http://test.com'):
        response = class_session.get('http://test.com/test.json')
        assert response.text == "it's not over"
        assert 'now' in response.headers.keys()
        assert response.headers['now'] == 'never'
        assert response.status_code == DEFAULT_BAD_STATUS_CODE
def __call__(self, environ, start_response):
    for err_str in self.app_iter:
        err = {}
        try:
            err = json.loads(err_str.decode('utf-8'))
        except ValueError:
            pass

    links = {'rel': 'help', 'href': 'https://developer.openstack.org'
                                    '/api-guide/compute/microversions.html'}

    err['max_version'] = self.max_version
    err['min_version'] = self.min_version
    err['code'] = "zun.microversion-unsupported"
    err['links'] = [links]
    err['title'] = "Requested microversion is unsupported"
    # json.dumps returns text; six.b() turns it into the bytes WSGI expects
    self.app_iter = [six.b(json.dumps(err))]
    self.headers['Content-Length'] = str(len(self.app_iter[0]))

    return super(HTTPNotAcceptableAPIVersion, self).__call__(
        environ, start_response)
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    The supplied UUID must be a version 4 UUID object.
    """
    if isinstance(source_uuid, six.string_types):
        source_uuid = uuid.UUID(source_uuid)
    if source_uuid.version != 4:
        raise ValueError(_('Invalid UUID version (%d)') % source_uuid.version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC 4122, Section 4.4)
    random_bytes = _to_byte_string(source_uuid.time, 60)
    # The first 12 bytes (= 60 bits) of base32-encoded output is our data
    encoded = base64.b32encode(six.b(random_bytes))[:12]

    if six.PY3:
        return encoded.lower().decode('utf-8')
    else:
        return encoded.lower()
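
# Example usage (hedged: relies on this module's _to_byte_string helper, and
# the 12-character output itself depends on the random UUID):
import uuid

short_id = get_id(uuid.uuid4())
assert len(short_id) == 12
get_id(str(uuid.uuid4()))  # the string form is accepted and converted internally
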
def check(self, fmt, value):
    from random import randrange
    # build a buffer which is surely big enough to contain what we need
    # and check:
    #   1) that we correctly write the bytes we expect
    #   2) that we do NOT write outside the bounds
    #
    pattern = [six.int2byte(randrange(256)) for _ in range(256)]
    pattern = b''.join(pattern)
    buf = bytearray(pattern)
    buf2 = bytearray(pattern)
    offset = 16
    pack_into(ord(fmt), buf, offset, value)
    struct.pack_into(fmt, buf2, offset, value)
    assert buf == buf2
    #
    # check that it raises if it's out of bound
    out_of_bound = 256 - struct.calcsize(fmt) + 1
    pytest.raises(IndexError, "pack_into(ord(fmt), buf, out_of_bound, value)")
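
# For reference, the stdlib behaviour check() mirrors: struct.pack_into writes
# exactly struct.calcsize(fmt) bytes at the given offset (note the stdlib
# raises struct.error, not IndexError, when the buffer is too small).
import struct

buf = bytearray(8)
struct.pack_into('i', buf, 2, 0x01020304)
assert buf[2:6] == struct.pack('i', 0x01020304)
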
def test_dumps_alignment():
    class Person(Struct):
        pass

    buf = b('\x20\x00\x00\x00\x00\x00\x00\x00'   # age=32
            '\x01\x00\x00\x00\x2a\x00\x00\x00'   # name=ptr
            'J' 'o' 'h' 'n' '\x00\x00\x00\x00')  # John
    p = Person.from_buffer(buf, 0, data_size=1, ptrs_size=1)
    msg = dumps(p)
    exp = b('\x00\x00\x00\x00\x04\x00\x00\x00'   # message header: 1 segment, size 4 words
            '\x00\x00\x00\x00\x01\x00\x01\x00'   # ptr to payload
            '\x20\x00\x00\x00\x00\x00\x00\x00'   # age=32
            '\x01\x00\x00\x00\x2a\x00\x00\x00'   # name=ptr
            'J' 'o' 'h' 'n' '\x00\x00\x00\x00')  # John
    assert msg == exp