def test_handle():
s = HostHttpServer(host)
c = Client(s, Response)
s.start()
r = c.get('/')
assert r.status == '403 FORBIDDEN'
r = c.get('/?ticket=%s' % s.ticket_create())
assert r.status == '200 OK'
r = c.get('/foo/bar/?ticket=%s' % s.ticket_create())
assert r.status == '500 INTERNAL SERVER ERROR'
s.stop()
python类Client()的实例源码
def __init__(self, url, service_impl_cls):
self.url = url_endswith_slash(url)
self.scheme, self.host, self.path, _, _ = urllib.parse.urlsplit(
self.url
)
def null_app(environ, start_response):
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return ['Not Found']
wsgi_app = WsgiApp(service_impl_cls())
if self.path != '/':
self.wsgi_app = DispatcherMiddleware(
null_app, {self.path[:-1]: wsgi_app}
)
else:
self.wsgi_app = wsgi_app
self.wsgi_test_client = Client(self.wsgi_app, Response)
def client(wsgi_app):
app = StandardEndpointMiddleware(wsgi_app)
return Client(app, BaseResponse)
def test_base_request(self):
client = Client(request_demo_app, RequestTestResponse)
# get requests
response = client.get('/?foo=bar&foo=hehe')
self.assert_strict_equal(response['args'], MultiDict([('foo', u'bar'), ('foo', u'hehe')]))
self.assert_strict_equal(response['args_as_list'], [('foo', [u'bar', u'hehe'])])
self.assert_strict_equal(response['form'], MultiDict())
self.assert_strict_equal(response['form_as_list'], [])
self.assert_strict_equal(response['data'], b'')
self.assert_environ(response['environ'], 'GET')
# post requests with form data
response = client.post('/?blub=blah', data='foo=blub+hehe&blah=42',
content_type='application/x-www-form-urlencoded')
self.assert_strict_equal(response['args'], MultiDict([('blub', u'blah')]))
self.assert_strict_equal(response['args_as_list'], [('blub', [u'blah'])])
self.assert_strict_equal(response['form'], MultiDict([('foo', u'blub hehe'), ('blah', u'42')]))
self.assert_strict_equal(response['data'], b'')
# currently we do not guarantee that the values are ordered correctly
# for post data.
## self.assert_strict_equal(response['form_as_list'], [('foo', ['blub hehe']), ('blah', ['42'])])
self.assert_environ(response['environ'], 'POST')
# patch requests with form data
response = client.patch('/?blub=blah', data='foo=blub+hehe&blah=42',
content_type='application/x-www-form-urlencoded')
self.assert_strict_equal(response['args'], MultiDict([('blub', u'blah')]))
self.assert_strict_equal(response['args_as_list'], [('blub', [u'blah'])])
self.assert_strict_equal(response['form'],
MultiDict([('foo', u'blub hehe'), ('blah', u'42')]))
self.assert_strict_equal(response['data'], b'')
self.assert_environ(response['environ'], 'PATCH')
# post requests with json data
json = b'{"foo": "bar", "blub": "blah"}'
response = client.post('/?a=b', data=json, content_type='application/json')
self.assert_strict_equal(response['data'], json)
self.assert_strict_equal(response['args'], MultiDict([('a', u'b')]))
self.assert_strict_equal(response['form'], MultiDict())
def test_ie7_unc_path(self):
client = Client(form_data_consumer, Response)
data_file = join(dirname(__file__), 'multipart', 'ie7_full_path_request.txt')
data = get_contents(data_file)
boundary = '---------------------------7da36d1b4a0164'
response = client.post('/?object=cb_file_upload_multiple', data=data, content_type=
'multipart/form-data; boundary="%s"' % boundary, content_length=len(data))
lines = response.get_data().split(b'\n', 3)
self.assert_strict_equal(lines[0],
repr(u'Sellersburg Town Council Meeting 02-22-2010doc.doc').encode('ascii'))
def test_responder(self):
def foo(environ, start_response):
return BaseResponse(b'Test')
client = Client(wsgi.responder(foo), BaseResponse)
response = client.get('/')
self.assert_equal(response.status_code, 200)
self.assert_equal(response.data, b'Test')
def test_append_slash_redirect(self):
def app(env, sr):
return utils.append_slash_redirect(env)(env, sr)
client = Client(app, BaseResponse)
response = client.get('foo', base_url='http://example.org/app')
self.assert_equal(response.status_code, 301)
self.assert_equal(response.headers['Location'], 'http://example.org/app/foo/')
def test(self, worker_cls, publish_cls):
mock_worker = worker_cls.return_value = Mock()
mock_publisher = publish_cls.return_value = Mock()
app = make_app_from_environ()
c = Client(app, BaseResponse)
sent_data = {
'url': '/url/',
'referrer': '/referrer/',
'user_id': 1,
}
# /page/ endpoint.
resp = c.post('/page/', data=json.dumps(sent_data))
assert resp.status_code == 201
assert mock_worker.queue.call_count == 1
row = mock_worker.queue.call_args[0][0]
for key in sent_data.keys() + ['id', 'received_at', 'context', 'sent_at']:
assert key in row
# /events/ endpoint.
sent_data['event_name'] = 'click'
resp = c.post('/event/', data=json.dumps(sent_data))
assert resp.status_code == 201
assert mock_publisher.publish.call_count == 1
row = mock_publisher.publish.call_args[1]['data']
# Make sure the UUID format is valid.
UUID(bytes=b64decode(row['uuid']))
for key in ('timestamp', 'type', 'data'):
assert key in row
data = row['data']
for key in sent_data.keys() + ['received_at', 'context', 'sent_at']:
if key != 'event_name':
assert key in data
def setUp(self):
self.logdir = self.get_temp_dir()
self.addCleanup(shutil.rmtree, self.logdir)
self._generate_test_data(run_name='run1')
self.multiplexer = event_multiplexer.EventMultiplexer(
size_guidance=application.DEFAULT_SIZE_GUIDANCE,
purge_orphaned_data=True)
self._context = base_plugin.TBContext(
assets_zip_provider=get_test_assets_zip_provider(),
logdir=self.logdir,
multiplexer=self.multiplexer)
self.plugin = core_plugin.CorePlugin(self._context)
app = application.TensorBoardWSGIApp(
self.logdir, [self.plugin], self.multiplexer, 0, path_prefix='')
self.server = werkzeug_test.Client(app, wrappers.BaseResponse)
def _SetupWSGIApp(self):
multiplexer = event_multiplexer.EventMultiplexer(
size_guidance=application.DEFAULT_SIZE_GUIDANCE,
purge_orphaned_data=True)
context = base_plugin.TBContext(
logdir=self.log_dir, multiplexer=multiplexer)
self.plugin = projector_plugin.ProjectorPlugin(context)
wsgi_app = application.TensorBoardWSGIApp(
self.log_dir, [self.plugin], multiplexer, reload_interval=0,
path_prefix='')
self.server = werkzeug_test.Client(wsgi_app, wrappers.BaseResponse)
def setUp(self):
plugins = [
FakePlugin(
None, plugin_name='foo', is_active_value=True, routes_mapping={}),
FakePlugin(
None, plugin_name='bar', is_active_value=False, routes_mapping={}),
]
app = application.TensorBoardWSGI(plugins)
self.server = werkzeug_test.Client(app, wrappers.BaseResponse)
def test_basic(self):
resources = join(dirname(__file__), 'multipart')
client = Client(form_data_consumer, Response)
repository = [
('firefox3-2png1txt', '---------------------------186454651713519341951581030105', [
(u'anchor.png', 'file1', 'image/png', 'file1.png'),
(u'application_edit.png', 'file2', 'image/png', 'file2.png')
], u'example text'),
('firefox3-2pnglongtext', '---------------------------14904044739787191031754711748', [
(u'accept.png', 'file1', 'image/png', 'file1.png'),
(u'add.png', 'file2', 'image/png', 'file2.png')
], u'--long text\r\n--with boundary\r\n--lookalikes--'),
('opera8-2png1txt', '----------zEO9jQKmLc2Cq88c23Dx19', [
(u'arrow_branch.png', 'file1', 'image/png', 'file1.png'),
(u'award_star_bronze_1.png', 'file2', 'image/png', 'file2.png')
], u'blafasel öäü'),
('webkit3-2png1txt', '----WebKitFormBoundaryjdSFhcARk8fyGNy6', [
(u'gtk-apply.png', 'file1', 'image/png', 'file1.png'),
(u'gtk-no.png', 'file2', 'image/png', 'file2.png')
], u'this is another text with ümläüts'),
('ie6-2png1txt', '---------------------------7d91b03a20128', [
(u'file1.png', 'file1', 'image/x-png', 'file1.png'),
(u'file2.png', 'file2', 'image/x-png', 'file2.png')
], u'ie6 sucks :-/')
]
for name, boundary, files, text in repository:
folder = join(resources, name)
data = get_contents(join(folder, 'request.txt'))
for filename, field, content_type, fsname in files:
response = client.post('/?object=' + field, data=data, content_type=
'multipart/form-data; boundary="%s"' % boundary,
content_length=len(data))
lines = response.get_data().split(b'\n', 3)
self.assert_strict_equal(lines[0], repr(filename).encode('ascii'))
self.assert_strict_equal(lines[1], repr(field).encode('ascii'))
self.assert_strict_equal(lines[2], repr(content_type).encode('ascii'))
self.assert_strict_equal(lines[3], get_contents(join(folder, fsname)))
response = client.post('/?object=text', data=data, content_type=
'multipart/form-data; boundary="%s"' % boundary,
content_length=len(data))
self.assert_strict_equal(response.get_data(), repr(text).encode('utf-8'))
def setUp(self):
self.log_dir = tempfile.mkdtemp()
# We use numpy.random to generate images. We seed to avoid non-determinism
# in this test.
numpy.random.seed(42)
# Create old-style image summaries for run "foo".
tf.reset_default_graph()
sess = tf.Session()
placeholder = tf.placeholder(tf.uint8)
tf.summary.image(name="baz", tensor=placeholder)
merged_summary_op = tf.summary.merge_all()
foo_directory = os.path.join(self.log_dir, "foo")
writer = tf.summary.FileWriter(foo_directory)
writer.add_graph(sess.graph)
for step in xrange(2):
writer.add_summary(sess.run(merged_summary_op, feed_dict={
placeholder: (numpy.random.rand(1, 16, 42, 3) * 255).astype(
numpy.uint8)
}), global_step=step)
writer.close()
# Create new-style image summaries for run bar.
tf.reset_default_graph()
sess = tf.Session()
placeholder = tf.placeholder(tf.uint8)
summary.op(name="quux", images=placeholder,
description="how do you pronounce that, anyway?")
merged_summary_op = tf.summary.merge_all()
bar_directory = os.path.join(self.log_dir, "bar")
writer = tf.summary.FileWriter(bar_directory)
writer.add_graph(sess.graph)
for step in xrange(2):
writer.add_summary(sess.run(merged_summary_op, feed_dict={
placeholder: (numpy.random.rand(1, 6, 8, 3) * 255).astype(
numpy.uint8)
}), global_step=step)
writer.close()
# Start a server with the plugin.
multiplexer = event_multiplexer.EventMultiplexer({
"foo": foo_directory,
"bar": bar_directory,
})
context = base_plugin.TBContext(
logdir=self.log_dir, multiplexer=multiplexer)
plugin = images_plugin.ImagesPlugin(context)
wsgi_app = application.TensorBoardWSGIApp(
self.log_dir, [plugin], multiplexer, reload_interval=0, path_prefix='')
self.server = werkzeug_test.Client(wsgi_app, wrappers.BaseResponse)
self.routes = plugin.get_plugin_apps()
def setUp(self):
self.log_dir = tempfile.mkdtemp()
# We use numpy.random to generate audio. We seed to avoid non-determinism
# in this test.
numpy.random.seed(42)
# Create old-style audio summaries for run "foo".
tf.reset_default_graph()
sess = tf.Session()
placeholder = tf.placeholder(tf.float32)
tf.summary.audio(name="baz", tensor=placeholder, sample_rate=44100)
merged_summary_op = tf.summary.merge_all()
foo_directory = os.path.join(self.log_dir, "foo")
writer = tf.summary.FileWriter(foo_directory)
writer.add_graph(sess.graph)
for step in xrange(2):
# The floats (sample data) range from -1 to 1.
writer.add_summary(sess.run(merged_summary_op, feed_dict={
placeholder: numpy.random.rand(42, 22050) * 2 - 1
}), global_step=step)
writer.close()
# Create new-style audio summaries for run "bar".
tf.reset_default_graph()
sess = tf.Session()
audio_placeholder = tf.placeholder(tf.float32)
labels_placeholder = tf.placeholder(tf.string)
summary.op("quux", audio_placeholder, sample_rate=44100,
labels=labels_placeholder,
description="how do you pronounce that, anyway?")
merged_summary_op = tf.summary.merge_all()
bar_directory = os.path.join(self.log_dir, "bar")
writer = tf.summary.FileWriter(bar_directory)
writer.add_graph(sess.graph)
for step in xrange(2):
# The floats (sample data) range from -1 to 1.
writer.add_summary(sess.run(merged_summary_op, feed_dict={
audio_placeholder: numpy.random.rand(42, 11025, 1) * 2 - 1,
labels_placeholder: [
tf.compat.as_bytes('step **%s**, sample %s' % (step, sample))
for sample in xrange(42)
],
}), global_step=step)
writer.close()
# Start a server with the plugin.
multiplexer = event_multiplexer.EventMultiplexer({
"foo": foo_directory,
"bar": bar_directory,
})
context = base_plugin.TBContext(
logdir=self.log_dir, multiplexer=multiplexer)
self.plugin = audio_plugin.AudioPlugin(context)
wsgi_app = application.TensorBoardWSGIApp(
self.log_dir, [self.plugin], multiplexer, reload_interval=0,
path_prefix='')
self.server = werkzeug_test.Client(wsgi_app, wrappers.BaseResponse)