def test_decode_argument(self):
# These urls all decode to the same thing
urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
"/decode_arg/%E9?foo=%E9&encoding=latin1",
"/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
]
for req_url in urls:
response = self.fetch(req_url)
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('unicode'), u('\u00e9')],
u('query'): [u('unicode'), u('\u00e9')],
})
response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('bytes'), u('c3a9')],
u('query'): [u('bytes'), u('c3a9')],
})
python类json_decode()的实例源码
def test_multipart_form(self):
# Encodings here are tricky: Headers are latin1, bodies can be
# anything (we use utf8 by default).
response = self.raw_fetch([
b"POST /multipart HTTP/1.0",
b"Content-Type: multipart/form-data; boundary=1234567890",
b"X-Header-encoding-test: \xe9",
],
b"\r\n".join([
b"Content-Disposition: form-data; name=argument",
b"",
u("\u00e1").encode("utf-8"),
b"--1234567890",
u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
b"",
u("\u00fa").encode("utf-8"),
b"--1234567890--",
b"",
]))
data = json_decode(response)
self.assertEqual(u("\u00e9"), data["header"])
self.assertEqual(u("\u00e1"), data["argument"])
self.assertEqual(u("\u00f3"), data["filename"])
self.assertEqual(u("\u00fa"), data["filebody"])
def test_chunked_request_body(self):
# Chunked requests are not widely supported and we don't have a way
# to generate them in AsyncHTTPClient, but HTTPServer will read them.
self.stream.write(b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded
4
foo=
3
bar
0
""".replace(b"\n", b"\r\n"))
read_stream_body(self.stream, self.stop)
headers, response = self.wait()
self.assertEqual(json_decode(response), {u('foo'): [u('bar')]})
def test_decode_argument(self):
# These urls all decode to the same thing
urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
"/decode_arg/%E9?foo=%E9&encoding=latin1",
"/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
]
for req_url in urls:
response = self.fetch(req_url)
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('unicode'), u('\u00e9')],
u('query'): [u('unicode'), u('\u00e9')],
})
response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('bytes'), u('c3a9')],
u('query'): [u('bytes'), u('c3a9')],
})
def test_get_argument(self):
response = self.fetch("/get_argument?foo=bar")
self.assertEqual(response.body, b"bar")
response = self.fetch("/get_argument?foo=")
self.assertEqual(response.body, b"")
response = self.fetch("/get_argument")
self.assertEqual(response.body, b"default")
# Test merging of query and body arguments.
# In singular form, body arguments take precedence over query arguments.
body = urllib_parse.urlencode(dict(foo="hello"))
response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
self.assertEqual(response.body, b"hello")
# In plural methods they are merged.
response = self.fetch("/get_arguments?foo=bar",
method="POST", body=body)
self.assertEqual(json_decode(response.body),
dict(default=['bar', 'hello'],
query=['bar'],
body=['hello']))
def test_decode_argument(self):
# These urls all decode to the same thing
urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
"/decode_arg/%E9?foo=%E9&encoding=latin1",
"/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
]
for req_url in urls:
response = self.fetch(req_url)
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('unicode'), u('\u00e9')],
u('query'): [u('unicode'), u('\u00e9')],
})
response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('bytes'), u('c3a9')],
u('query'): [u('bytes'), u('c3a9')],
})
def test_get_argument(self):
response = self.fetch("/get_argument?foo=bar")
self.assertEqual(response.body, b"bar")
response = self.fetch("/get_argument?foo=")
self.assertEqual(response.body, b"")
response = self.fetch("/get_argument")
self.assertEqual(response.body, b"default")
# Test merging of query and body arguments.
# In singular form, body arguments take precedence over query arguments.
body = urllib_parse.urlencode(dict(foo="hello"))
response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
self.assertEqual(response.body, b"hello")
# In plural methods they are merged.
response = self.fetch("/get_arguments?foo=bar",
method="POST", body=body)
self.assertEqual(json_decode(response.body),
dict(default=['bar', 'hello'],
query=['bar'],
body=['hello']))
def test_multipart_form(self):
# Encodings here are tricky: Headers are latin1, bodies can be
# anything (we use utf8 by default).
response = self.raw_fetch([
b"POST /multipart HTTP/1.0",
b"Content-Type: multipart/form-data; boundary=1234567890",
b"X-Header-encoding-test: \xe9",
],
b"\r\n".join([
b"Content-Disposition: form-data; name=argument",
b"",
u("\u00e1").encode("utf-8"),
b"--1234567890",
u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
b"",
u("\u00fa").encode("utf-8"),
b"--1234567890--",
b"",
]))
data = json_decode(response)
self.assertEqual(u("\u00e9"), data["header"])
self.assertEqual(u("\u00e1"), data["argument"])
self.assertEqual(u("\u00f3"), data["filename"])
self.assertEqual(u("\u00fa"), data["filebody"])
def test_chunked_request_body(self):
# Chunked requests are not widely supported and we don't have a way
# to generate them in AsyncHTTPClient, but HTTPServer will read them.
self.stream.write(b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded
4
foo=
3
bar
0
""".replace(b"\n", b"\r\n"))
read_stream_body(self.stream, self.stop)
headers, response = self.wait()
self.assertEqual(json_decode(response), {u('foo'): [u('bar')]})
def test_get_argument(self):
response = self.fetch("/get_argument?foo=bar")
self.assertEqual(response.body, b"bar")
response = self.fetch("/get_argument?foo=")
self.assertEqual(response.body, b"")
response = self.fetch("/get_argument")
self.assertEqual(response.body, b"default")
# Test merging of query and body arguments.
# In singular form, body arguments take precedence over query arguments.
body = urllib_parse.urlencode(dict(foo="hello"))
response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
self.assertEqual(response.body, b"hello")
# In plural methods they are merged.
response = self.fetch("/get_arguments?foo=bar",
method="POST", body=body)
self.assertEqual(json_decode(response.body),
dict(default=['bar', 'hello'],
query=['bar'],
body=['hello']))
def test_username_query(self):
username = "bobsmith"
positive_query = 'bobsm'
negative_query = 'nancy'
async with self.pool.acquire() as con:
await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS)
resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 1)
# ensure we got a tracking event
self.assertEqual((await self.next_tracking_event())[0], None)
resp = await self.fetch("/search/user?query={}".format(negative_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 0)
# ensure we got a tracking event
self.assertEqual((await self.next_tracking_event())[0], None)
def test_username_query_sql_inject_attampt(self):
username = "bobsmith"
inject_attempt = quote_plus("x'; delete from users; select * from users")
async with self.pool.acquire() as con:
await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS)
resp = await self.fetch("/search/user?query={}".format(inject_attempt), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 0)
async with self.pool.acquire() as con:
row = await con.fetchrow("SELECT COUNT(*) AS count FROM users")
self.assertEqual(row['count'], 1)
def test_underscore_username_query(self):
async with self.pool.acquire() as con:
await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)",
"wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001")
await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)",
"bob_smith", "Robert", "0x0000000000000000000000000000000000000002")
await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)",
"bob_jack", "Jackie", "0x0000000000000000000000000000000000000003")
await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)",
"user1234", "user1234", "0x0000000000000000000000000000000000000004")
for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]:
resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query))
for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]:
resp = await self.fetch("/search/user?query={}".format(negative_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
def test_inactive_username_query(self):
username = "bobsmith"
positive_query = 'bobsm'
async with self.pool.acquire() as con:
await con.execute("INSERT INTO users (username, toshi_id, active) VALUES ($1, $2, false)", username, TEST_ADDRESS)
resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 0)
resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={
"payment_address": TEST_ADDRESS
})
self.assertResponseCodeEqual(resp, 200)
resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 1)
def test_create_user(self):
resp = await self.fetch("/timestamp")
self.assertEqual(resp.code, 200)
resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST",
body={'payment_address': TEST_PAYMENT_ADDRESS})
self.assertResponseCodeEqual(resp, 200)
body = json_decode(resp.body)
self.assertEqual(body['toshi_id'], TEST_ADDRESS)
async with self.pool.acquire() as con:
row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS)
self.assertIsNotNone(row)
self.assertFalse(row['is_app'])
self.assertIsNotNone(row['username'])
# ensure we got a tracking event
self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
def test_create_app_user(self):
resp = await self.fetch("/timestamp")
self.assertEqual(resp.code, 200)
resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST",
body={"is_app": True, 'payment_address': TEST_PAYMENT_ADDRESS})
self.assertResponseCodeEqual(resp, 200)
body = json_decode(resp.body)
self.assertEqual(body['toshi_id'], TEST_ADDRESS)
async with self.pool.acquire() as con:
row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS)
self.assertIsNotNone(row)
self.assertTrue(row['is_app'])
# ensure we got a tracking event
self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
def test_set_unknown_categories_fails(self):
username = "toshibot"
name = "ToshiBot"
categories = await self.setup_categories()
async with self.pool.acquire() as con:
await con.execute("INSERT INTO users (username, toshi_id, name, is_app, is_public) VALUES ($1, $2, $3, true, true)",
username, TEST_ADDRESS, name)
resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={
"categories": [categories[-1][0] + 10, 'badcat']
})
self.assertResponseCodeEqual(resp, 400)
resp = await self.fetch("/user/{}".format(TEST_ADDRESS))
self.assertResponseCodeEqual(resp, 200)
body = json_decode(resp.body)
self.assertIn("categories", body)
self.assertEqual(len(body['categories']), 0)
def test_app_underscore_username_query(self):
async with self.pool.acquire() as con:
await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)",
"wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001")
await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)",
"bob_smith", "Robert", "0x0000000000000000000000000000000000000002")
await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)",
"bob_jack", "Jackie", "0x0000000000000000000000000000000000000003")
await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)",
"user1234", "user1234", "0x0000000000000000000000000000000000000004")
for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]:
resp = await self.fetch("/search/apps?query={}".format(positive_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query))
for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]:
resp = await self.fetch("/search/apps?query={}".format(negative_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
def fetch_definitions(self, term):
url = url_concat(UrbanDictionary.DEFINE_URL, dict(term=term))
request = HTTPRequest(
url = url,
headers = {
'X-Mashape-Key': self.api_key,
'Accept' : 'text/plain'
}
)
tornado_future = self.client.fetch(request)
future = to_asyncio_future(tornado_future)
response = await future
data = json_decode(response.body)
return data['list']
def access_token(self, code, state):
client = AsyncHTTPClient()
payload = (
('client_id', self.client_id),
('client_secret', self.client_secret),
('grant_type', 'authorization_code'),
('redirect_uri', Twitch.REDIRECT_URI),
('code', code),
('state', state),
)
url = Twitch.TOKEN_URL
request = HTTPRequest(
url = url,
method = 'POST',
body = urlencode(payload)
)
tornado_future = client.fetch(request)
future = to_asyncio_future(tornado_future)
response = await future
data = json_decode(response.body)
return data['access_token']
def test_username_query(self):
username = "TokenBot"
positive_query = 'enb'
negative_query = 'TickleFight'
async with self.pool.acquire() as con:
await con.execute("INSERT INTO apps (name, token_id) VALUES ($1, $2)", username, TEST_ADDRESS)
await con.execute("INSERT INTO sofa_manifests (token_id, payment_address) VALUES ($1, $2)", TEST_ADDRESS, TEST_ADDRESS)
resp = await self.fetch("/search/apps?query={}".format(positive_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['apps']), 1)
resp = await self.fetch("/search/apps?query={}".format(negative_query), method="GET")
self.assertEqual(resp.code, 200)
body = json_decode(resp.body)
self.assertEqual(len(body['apps']), 0)
def save_term_settings(term, location, session, settings):
"""
Saves the *settings* associated with the given *term*, *location*, and
*session* in the 'term_settings.json' file inside the user's session
directory.
When complete the given *callback* will be called (if given).
"""
if not session:
return # Just a viewer of a broadcast terminal
term = str(term) # JSON wants strings as keys
term_settings = RUDict()
term_settings[location] = {term: settings}
session_dir = os.path.join(getsettings('BASE_DIR'), 'sessions')
session_dir = os.path.join(session_dir, session)
settings_path = os.path.join(session_dir, 'term_settings.json')
# First we read in the existing settings and then update them.
if os.path.exists(settings_path):
with io.open(settings_path, encoding='utf-8') as f:
term_settings.update(json_decode(f.read()))
term_settings[location][term].update(settings)
with io.open(settings_path, 'w', encoding='utf-8') as f:
f.write(json_encode(term_settings))
def get_log_metadata(golog_path):
"""
Returns the metadata from the log at the given *golog_path* in the form of
a dict.
"""
metadata = {}
if not os.path.getsize(golog_path): # 0 bytes
return metadata # Nothing to do
try:
first_frame, distance = retrieve_first_frame(golog_path)
except IOError:
# Something wrong with the log... Probably still being written to
return metadata
if first_frame[14:].startswith('{'):
# This is JSON, capture metadata
metadata = json_decode(first_frame[14:])
return metadata # All done
def get_current_user(self):
"""
Mostly identical to the function of the same name in MainHandler. The
difference being that when API authentication is enabled the WebSocket
will expect and perform its own auth of the client.
"""
expiration = self.settings.get('auth_timeout', "14d")
# Need the expiration in days (which is a bit silly but whatever):
expiration = (
float(total_seconds(convert_to_timedelta(expiration)))
/ float(86400))
#user_json = self.get_secure_cookie(
#"gateone_user", max_age_days=expiration)
user_json = self.message.http_session.get('gateone_user',None)
#print 'user_json',user_json
#user_json {"upn": "ANONYMOUS", "session": "YmM5MDU5MDgyYmVjNDU0M2E5MDMzYTg5NWMzZTI5YTBkN"}
if not user_json:
if not self.settings['auth']:
# This can happen if the user's browser isn't allowing
# persistent cookies (e.g. incognito mode)
return {'upn': 'ANONYMOUS', 'session': generate_session_id()}
return None
user = json_decode(user_json)
#user['ip_address'] = self.request.remote_ip
return user
def clear_term_settings(self, term):
"""
Removes any settings associated with the given *term* in the user's
term_settings.json file (in their session directory).
"""
term = str(term)
self.term_log.debug("clear_term_settings(%s)" % term)
term_settings = RUDict()
term_settings[self.ws.location] = {term: {}}
#session_dir = options.session_dir
session_dir = self.settings['session_dir']
session_dir = os.path.join(session_dir, self.ws.session)
settings_path = os.path.join(session_dir, 'term_settings.json')
if not os.path.exists(settings_path):
return # Nothing to do
# First we read in the existing settings and then update them.
if os.path.exists(settings_path):
with io.open(settings_path, encoding='utf-8') as f:
term_settings.update(json_decode(f.read()))
del term_settings[self.ws.location][term]
with io.open(settings_path, 'w', encoding='utf-8') as f:
f.write(json_encode(term_settings))
self.trigger("terminal:clear_term_settings", term)
#@require(authenticated(), policies('terminal'))
httpserver_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_multipart_form(self):
# Encodings here are tricky: Headers are latin1, bodies can be
# anything (we use utf8 by default).
response = self.raw_fetch([
b"POST /multipart HTTP/1.0",
b"Content-Type: multipart/form-data; boundary=1234567890",
b"X-Header-encoding-test: \xe9",
],
b"\r\n".join([
b"Content-Disposition: form-data; name=argument",
b"",
u("\u00e1").encode("utf-8"),
b"--1234567890",
u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
b"",
u("\u00fa").encode("utf-8"),
b"--1234567890--",
b"",
]))
data = json_decode(response)
self.assertEqual(u("\u00e9"), data["header"])
self.assertEqual(u("\u00e1"), data["argument"])
self.assertEqual(u("\u00f3"), data["filename"])
self.assertEqual(u("\u00fa"), data["filebody"])
httpserver_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_chunked_request_body(self):
# Chunked requests are not widely supported and we don't have a way
# to generate them in AsyncHTTPClient, but HTTPServer will read them.
self.stream.write(b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded
4
foo=
3
bar
0
""".replace(b"\n", b"\r\n"))
read_stream_body(self.stream, self.stop)
headers, response = self.wait()
self.assertEqual(json_decode(response), {u('foo'): [u('bar')]})
def test_decode_argument(self):
# These urls all decode to the same thing
urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
"/decode_arg/%E9?foo=%E9&encoding=latin1",
"/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
]
for req_url in urls:
response = self.fetch(req_url)
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('unicode'), u('\u00e9')],
u('query'): [u('unicode'), u('\u00e9')],
})
response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {u('path'): [u('bytes'), u('c3a9')],
u('query'): [u('bytes'), u('c3a9')],
})
def test_get_argument(self):
response = self.fetch("/get_argument?foo=bar")
self.assertEqual(response.body, b"bar")
response = self.fetch("/get_argument?foo=")
self.assertEqual(response.body, b"")
response = self.fetch("/get_argument")
self.assertEqual(response.body, b"default")
# Test merging of query and body arguments.
# In singular form, body arguments take precedence over query arguments.
body = urllib_parse.urlencode(dict(foo="hello"))
response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
self.assertEqual(response.body, b"hello")
# In plural methods they are merged.
response = self.fetch("/get_arguments?foo=bar",
method="POST", body=body)
self.assertEqual(json_decode(response.body),
dict(default=['bar', 'hello'],
query=['bar'],
body=['hello']))
def graphql_request(self):
return json_decode(self.request.body)