def test_unicode_sequence_form_value(resp_mock, mock):
http = mock.return_value
http.request.return_value = (Mock(), Mock())
data = {
"body": [u('\xe5'), u('\xe7')],
}
resources.make_request("POST", "http://www.example.com", data=data)
http.request.assert_called_with(
"http://www.example.com",
"POST",
headers=None,
body="body=%C3%A5&body=%C3%A7",
)
python类u()的实例源码
def encode_tile_key(self, project_info, resolution, x_index, y_index, z_index, t_index=0):
"""A method to create a tile key.
The tile key is the key used for each individual tile file.
Args:
project_info(list): A list of strings containing the project/data model information for where data belongs
resolution(int): The level of the resolution hierarchy. Typically 0
x_index(int): The x tile index
y_index(int): The y tile index
z_index(int): The z tile index
t_index(int): The time index
Returns:
(str): The object key to use for uploading to the tile bucket
"""
proj_str = six.u("&".join([str(x) for x in project_info]))
base_key = six.u("{}&{}&{}&{}&{}&{}".format(proj_str, resolution, x_index, y_index, z_index, t_index))
hashm = hashlib.md5()
hashm.update(base_key.encode())
return six.u("{}&{}".format(hashm.hexdigest(), base_key))
def test_encode_tile_key(self):
"""Test encoding an object key"""
b = BossBackend(self.example_config_data)
b.setup(self.api_token)
params = {"collection": 1,
"experiment": 2,
"channel": 3,
"resolution": 0,
"x_index": 5,
"y_index": 6,
"z_index": 1,
"t_index": 0,
}
proj = [str(params['collection']), str(params['experiment']), str(params['channel'])]
key = b.encode_tile_key(proj,
params['resolution'],
params['x_index'],
params['y_index'],
params['z_index'],
params['t_index'],
)
assert key == six.u("03ca58a12ec662954ac12e06517d4269&1&2&3&0&5&6&1&0")
def test_encode_tile_key(self):
"""Test encoding an object key"""
b = BossBackend(self.example_config_data)
b.setup(self.api_token)
params = {"collection": 1,
"experiment": 2,
"channel": 3,
"resolution": 0,
"x_index": 5,
"y_index": 6,
"z_index": 1,
"t_index": 0,
}
proj = [str(params['collection']), str(params['experiment']), str(params['channel'])]
key = b.encode_tile_key(proj,
params['resolution'],
params['x_index'],
params['y_index'],
params['z_index'],
params['t_index'],
)
assert key == six.u("03ca58a12ec662954ac12e06517d4269&1&2&3&0&5&6&1&0")
def test_task_init(self):
task = self.RandomTask(string=six.u('XYZ'), integer=10)
self.assertEqual(task.Input.string, six.u('XYZ'))
self.assertEqual(task.Input.integer, 10)
task.set_input(string=six.u('DEF'), integer=20)
self.assertEqual(task.Input.string, six.u('DEF'))
self.assertEqual(task.Input.integer, 20)
def test_task_fields(self):
task = self.RandomTask()
self.assertEqual(task.Input.string, six.u('ABCD'))
task.Input.string = six.u('XYZ')
self.assertEqual(task.Input.string, six.u('XYZ'))
with self.assertRaises(InvalidValueException):
task.Input.string = 10
self.assertEqual(task.Input.integer, 5)
task.Input.integer = 10
self.assertEqual(task.Input.integer, 10)
with self.assertRaises(InvalidValueException):
task.Input.integer = 'ABCD'
self.assertEqual(task.Output.floating, 1.5)
task.Output.floating = 5.5
self.assertEqual(task.Output.floating, 5.5)
with self.assertRaises(InvalidValueException):
task.Output.floating = 'ABCD'
task2 = self.RandomTask()
self.assertEqual(task2.Input.string, 'ABCD')
task2.Input.string = six.u('123')
self.assertEqual(task2.Input.string, six.u('123'))
# Test if the tasks share the same fields
self.assertEqual(task.Input.string, six.u('XYZ'))
# Check if it's cloned
self.assertNotEqual(task.Input.instream, task2.Input.instream)
def test_task_serialize(self):
task = self.RandomTask()
serialized = {
'ID': 'random',
'Input': {
'string': six.u('ABCD'),
'integer': 5,
'integer_array': [],
'anyf': six.u(base64.b64encode(pickle.dumps({1: 2})))
},
'Output': {
'floating': 1.5,
'none': None
}
}
self.assertEqual(task.serialize(), serialized)
task = self.RandomTask()
task.Input.string = six.u('XYZ')
task.Input.integer = 10
task.Input.integer_array = [1, 2, 3]
task.Input.anyf = ['hello', 'world']
task.Output.floating = 2.5
serialized = {
'ID': 'random',
'Input': {
'string': six.u('XYZ'),
'integer': 10,
'integer_array': [1, 2, 3],
'anyf': six.u(base64.b64encode(pickle.dumps(['hello', 'world'])))
},
'Output': {
'floating': 2.5,
'none': None
}
}
self.assertEqual(task.serialize(), serialized)
def test_task_deserialize(self):
serialized = {
'ID': 'random',
'Input': {
'string': six.u('ABCD'),
'integer': 5,
'integer_array': [1, 2, 3],
'anyf': six.u(base64.b64encode(pickle.dumps(100)))
},
'Output': {
'floating': 1.5,
'none': None
}
}
instance = Task.deserialize(data=serialized)
self.assertTrue(isinstance(instance, self.RandomTask))
self.assertEqual(instance.Input.string, six.u('ABCD'))
self.assertEqual(instance.Input.integer, 5)
self.assertEqual(instance.Output.floating, 1.5)
serialized = {
'ID': 'random',
'Input': {
'string': six.u('XYZ'),
'integer': 10
},
'Output': {
'floating': 2.5,
'none': None
}
}
instance = Task.deserialize(data=serialized)
self.assertTrue(isinstance(instance, self.RandomTask))
self.assertEqual(instance.Input.string, six.u('XYZ'))
self.assertEqual(instance.Input.integer, 10)
self.assertEqual(instance.Output.floating, 2.5)
def test_string_stream(self):
instance = StringStreamField().create()
with self.assertRaises(StreamNotAvailableException):
instance.write(six.u('hello'))
with self.assertRaises(StreamNotAvailableException):
instance.read()
class DummyStream(object):
def __init__(self):
self.buffer = six.u('')
def read(self, n=-1):
if n == -1:
n = len(self.buffer)
result = self.buffer[:n]
self.buffer = self.buffer[n:]
return result
def write(self, data):
self.buffer += data
instance.set_real_stream(DummyStream())
with self.assertRaises(InvalidValueException):
instance.write(six.b('hello'))
instance.write(six.u('hello'))
self.assertEqual(instance.read(), six.u('hello'))
def test_byte_stream(self):
instance = ByteStreamField().create()
with self.assertRaises(StreamNotAvailableException):
instance.write(six.u('hello'))
with self.assertRaises(StreamNotAvailableException):
instance.read()
class DummyStream(object):
def __init__(self):
self.buffer = six.b('')
def read(self, n=-1):
if n == -1:
n = len(self.buffer)
result = self.buffer[:n]
self.buffer = self.buffer[n:]
return result
def write(self, data):
self.buffer += data
instance.set_real_stream(DummyStream())
with self.assertRaises(InvalidValueException):
instance.write(six.u('hello'))
instance.write(six.b('hello'))
self.assertEqual(instance.read(), six.b('hello'))
with self.assertRaises(NotImplementedError):
StreamField().create() # Test for coverage :)
def to_son(self, value):
value = decimal.Decimal(value)
return six.u(str(value.quantize(self.precision, rounding=self.rounding)))
def ints_to_string(ints):
"""Convert a list of integers to a *|* separated string.
Args:
ints (list[int]|int): List of integer items to convert or single
integer to convert.
Returns:
str: Formatted string
"""
if not isinstance(ints, list):
return six.u(str(ints))
return '|'.join(six.u(str(l)) for l in ints)
def format(self, entry):
message = entry.message
timestamp = entry.timestamp.to("local")
source = entry.message_dict.get("source")
facility = entry.message_dict.get("facility")
custom_fields = list(self.fields)
log_level = LogLevel.find_by_syslog_code(entry.level)
args = {
'timestamp': timestamp.format(utils.DEFAULT_DATE_FORMAT),
'level': log_level['name'],
'message': self.encode_message(message),
'source': source or '',
'facility': facility or ''
}
for field in custom_fields:
if field not in self.DEFAULT_FIELDS:
args[field] = entry.message_dict.get(field, '')
log = six.u(self.format_template).format(**args)
if self.color:
return colored(log, log_level['color'], log_level['bg_color'])
else:
return log
_test_unittest2_with.py 文件源码
项目:devsecops-example-helloworld
作者: boozallen
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_assert_dict_unicode_error(self):
with catch_warnings(record=True):
# This causes a UnicodeWarning due to its craziness
one = ''.join(chr(i) for i in range(255))
# this used to cause a UnicodeDecodeError constructing the failure msg
with self.assertRaises(self.failureException):
self.assertDictContainsSubset({'foo': one}, {'foo': u('\uFFFD')})
_test_unittest2_with.py 文件源码
项目:devsecops-example-helloworld
作者: boozallen
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def test_formatMessage_unicode_error(self):
with catch_warnings(record=True):
# This causes a UnicodeWarning due to its craziness
one = ''.join(chr(i) for i in range(255))
# this used to cause a UnicodeDecodeError constructing msg
self._formatMessage(one, u('\uFFFD'))
def test_pickle_unpickle(self):
# Issue #7197: a TextTestRunner should be (un)pickleable. This is
# required by test_multiprocessing under Windows (in verbose mode).
stream = StringIO(u("foo"))
runner = unittest2.TextTestRunner(stream)
for protocol in range(2, pickle.HIGHEST_PROTOCOL + 1):
s = pickle.dumps(runner, protocol=protocol)
obj = pickle.loads(s)
# StringIO objects never compare equal, a cheap test instead.
self.assertEqual(obj.stream.getvalue(), stream.getvalue())
def testAssertRaisesRegex(self):
class ExceptionMock(Exception):
pass
def Stub():
raise ExceptionMock('We expect')
self.assertRaisesRegex(ExceptionMock, re.compile('expect$'), Stub)
self.assertRaisesRegex(ExceptionMock, 'expect$', Stub)
self.assertRaisesRegex(ExceptionMock, u('expect$'), Stub)
with self.assertWarns(DeprecationWarning):
self.assertRaisesRegex(ExceptionMock, 'expect$', None)
def writeln(self, arg=None):
if arg:
self.write(arg)
self.write(u('\n')) # text-mode streams translate to \r\n if needed
def addUnexpectedSuccess(self, test):
super(TextTestResult, self).addUnexpectedSuccess(test)
if self.showAll:
self.stream.writeln("unexpected success")
elif self.dots:
self.stream.write("u")
self.stream.flush()
def test_format_exception_only_undecodable__str__(self):
# This won't decode via the ascii codec.
X = Exception(u('\u5341').encode('shift-jis'))
err = traceback.format_exception_only(type(X), X)
self.assertEqual(len(err), 1)
str_value = "b'\\x8f\\\\'"
self.assertEqual(err[0], "Exception: %s\n" % str_value)