def handle_read(self, data):
logging.info("handle_read")
data = to_unicode(data)
if data == data.upper():
self.stream.write(b"error\talready capitalized\n")
else:
# data already has \n
self.stream.write(utf8("ok\t%s" % data.upper()))
self.stream.close()
python类to_unicode()的实例源码
def process_response(self, data):
status, message = re.match('(.*)\t(.*)\n', to_unicode(data)).groups()
if status == 'ok':
return message
else:
raise CapError(message)
def _handle_message(self, opcode, data):
if self.client_terminated:
return
if self._frame_compressed:
data = self._decompressor.decompress(data)
if opcode == 0x1:
# UTF-8 data
self._message_bytes_in += len(data)
try:
decoded = data.decode("utf-8")
except UnicodeDecodeError:
self._abort()
return
self._run_callback(self.handler.on_message, decoded)
elif opcode == 0x2:
# Binary data
self._message_bytes_in += len(data)
self._run_callback(self.handler.on_message, data)
elif opcode == 0x8:
# Close
self.client_terminated = True
if len(data) >= 2:
self.handler.close_code = struct.unpack('>H', data[:2])[0]
if len(data) > 2:
self.handler.close_reason = to_unicode(data[2:])
# Echo the received close code, if any (RFC 6455 section 5.5.1).
self.close(self.handler.close_code)
elif opcode == 0x9:
# Ping
self._write_frame(True, 0xA, data)
elif opcode == 0xA:
# Pong
self._run_callback(self.handler.on_pong, data)
else:
self._abort()
def test_bytes_apply(self):
def upper(s):
return utf8(to_unicode(s).upper())
template = Template(utf8(u("{% apply upper %}foo \u00e9{% end %}")))
self.assertEqual(template.generate(upper=upper), utf8(u("FOO \u00c9")))
def test_utf8_in_file(self):
tmpl = self.loader.load("utf8.html")
result = tmpl.generate()
self.assertEqual(to_unicode(result).strip(), u("H\u00e9llo"))
def decode_argument(self, value, name=None):
if type(value) != bytes:
raise Exception("unexpected type for value: %r" % type(value))
# use self.request.arguments directly to avoid recursion
if 'encoding' in self.request.arguments:
return value.decode(to_unicode(self.request.arguments['encoding'][0]))
else:
return value
def test_types(self):
cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
"asdf", "qwer"))
response = self.fetch("/typecheck/asdf?foo=bar",
headers={"Cookie": "asdf=" + cookie_value})
data = json_decode(response.body)
self.assertEqual(data, {})
response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
headers={"Cookie": "asdf=" + cookie_value},
body="foo=bar")
def test_url_unescape_unicode(self):
tests = [
('%C3%A9', u('\u00e9'), 'utf8'),
('%C3%A9', u('\u00c3\u00a9'), 'latin1'),
('%C3%A9', utf8(u('\u00e9')), None),
]
for escaped, unescaped, encoding in tests:
# input strings to url_unescape should only contain ascii
# characters, but make sure the function accepts both byte
# and unicode strings.
self.assertEqual(url_unescape(to_unicode(escaped), encoding), unescaped)
self.assertEqual(url_unescape(utf8(escaped), encoding), unescaped)
def process_response(self, data):
status, message = re.match('(.*)\t(.*)\n', to_unicode(data)).groups()
if status == 'ok':
return message
else:
raise CapError(message)
def _handle_message(self, opcode, data):
if self.client_terminated:
return
if self._frame_compressed:
data = self._decompressor.decompress(data)
if opcode == 0x1:
# UTF-8 data
self._message_bytes_in += len(data)
try:
decoded = data.decode("utf-8")
except UnicodeDecodeError:
self._abort()
return
self._run_callback(self.handler.on_message, decoded)
elif opcode == 0x2:
# Binary data
self._message_bytes_in += len(data)
self._run_callback(self.handler.on_message, data)
elif opcode == 0x8:
# Close
self.client_terminated = True
if len(data) >= 2:
self.handler.close_code = struct.unpack('>H', data[:2])[0]
if len(data) > 2:
self.handler.close_reason = to_unicode(data[2:])
# Echo the received close code, if any (RFC 6455 section 5.5.1).
self.close(self.handler.close_code)
elif opcode == 0x9:
# Ping
self._write_frame(True, 0xA, data)
elif opcode == 0xA:
# Pong
self._run_callback(self.handler.on_pong, data)
else:
self._abort()
def test_unicode_apply(self):
def upper(s):
return to_unicode(s).upper()
template = Template(utf8(u("{% apply upper %}foo \u00e9{% end %}")))
self.assertEqual(template.generate(upper=upper), utf8(u("FOO \u00c9")))
def test_utf8_in_file(self):
tmpl = self.loader.load("utf8.html")
result = tmpl.generate()
self.assertEqual(to_unicode(result).strip(), u("H\u00e9llo"))
def decode_argument(self, value, name=None):
if type(value) != bytes:
raise Exception("unexpected type for value: %r" % type(value))
# use self.request.arguments directly to avoid recursion
if 'encoding' in self.request.arguments:
return value.decode(to_unicode(self.request.arguments['encoding'][0]))
else:
return value
def test_types(self):
cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
"asdf", "qwer"))
response = self.fetch("/typecheck/asdf?foo=bar",
headers={"Cookie": "asdf=" + cookie_value})
data = json_decode(response.body)
self.assertEqual(data, {})
response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
headers={"Cookie": "asdf=" + cookie_value},
body="foo=bar")
def test_url_unescape_unicode(self):
tests = [
('%C3%A9', u('\u00e9'), 'utf8'),
('%C3%A9', u('\u00c3\u00a9'), 'latin1'),
('%C3%A9', utf8(u('\u00e9')), None),
]
for escaped, unescaped, encoding in tests:
# input strings to url_unescape should only contain ascii
# characters, but make sure the function accepts both byte
# and unicode strings.
self.assertEqual(url_unescape(to_unicode(escaped), encoding), unescaped)
self.assertEqual(url_unescape(utf8(escaped), encoding), unescaped)
def handle_read(self, data):
logging.info("handle_read")
data = to_unicode(data)
if data == data.upper():
self.stream.write(b"error\talready capitalized\n")
else:
# data already has \n
self.stream.write(utf8("ok\t%s" % data.upper()))
self.stream.close()
def _handle_message(self, opcode, data):
if self.client_terminated:
return
if self._frame_compressed:
data = self._decompressor.decompress(data)
if opcode == 0x1:
# UTF-8 data
self._message_bytes_in += len(data)
try:
decoded = data.decode("utf-8")
except UnicodeDecodeError:
self._abort()
return
self._run_callback(self.handler.on_message, decoded)
elif opcode == 0x2:
# Binary data
self._message_bytes_in += len(data)
self._run_callback(self.handler.on_message, data)
elif opcode == 0x8:
# Close
self.client_terminated = True
if len(data) >= 2:
self.handler.close_code = struct.unpack('>H', data[:2])[0]
if len(data) > 2:
self.handler.close_reason = to_unicode(data[2:])
# Echo the received close code, if any (RFC 6455 section 5.5.1).
self.close(self.handler.close_code)
elif opcode == 0x9:
# Ping
self._write_frame(True, 0xA, data)
elif opcode == 0xA:
# Pong
self._run_callback(self.handler.on_pong, data)
else:
self._abort()
template_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_unicode_apply(self):
def upper(s):
return to_unicode(s).upper()
template = Template(utf8(u("{% apply upper %}foo \u00e9{% end %}")))
self.assertEqual(template.generate(upper=upper), utf8(u("FOO \u00c9")))
template_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_bytes_apply(self):
def upper(s):
return utf8(to_unicode(s).upper())
template = Template(utf8(u("{% apply upper %}foo \u00e9{% end %}")))
self.assertEqual(template.generate(upper=upper), utf8(u("FOO \u00c9")))
template_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_utf8_in_file(self):
tmpl = self.loader.load("utf8.html")
result = tmpl.generate()
self.assertEqual(to_unicode(result).strip(), u("H\u00e9llo"))