def _init_websock(self):
    """Open the Tornado websocket client and keep it on ``self._websock``.

    The host and port are read from the ``param_server_host`` and
    ``param_server_port`` entries of ``self._param_bc.value``.

    NOTE(review): this function yields the connect call, so it is a
    generator; presumably it is driven by a coroutine decorator that is not
    visible in this snippet -- confirm against the full file.
    """
    config = self._param_bc.value
    endpoint = 'ws://%s:%d' % (config['param_server_host'],
                               config['param_server_port'])
    # The connect timeout passed here is 3600 (same value as before).
    self._websock = yield websocket.websocket_connect(
        endpoint, connect_timeout=3600)
python类websocket_connect()的实例源码
async def init_websocket(self, url):
    """Initiate a new WebSocket connection.

    The address passed to ``websocket_connect`` is ``self.get_url(url)``.
    The resulting connection is stored on ``self._websocket`` and returned.

    Declared ``async`` because the body awaits the connection; ``await`` is
    a syntax error inside a plain ``def``.
    """
    self._websocket = await websocket_connect(self.get_url(url))
    return self._websocket
def _create_client(self):
    """Start a websocket connection to the local server on ``self.port``.

    The username and password are passed in the query string. The value
    returned is whatever ``websocket_connect`` returns; it is not awaited
    or yielded here.
    """
    address = 'ws://localhost:{}/?username=joe&password=secret'.format(self.port)
    return websocket_connect(address)
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def ws_connect(self, path, compression_options=None):
    """Connect a websocket client to ``path`` on the test HTTP port.

    ``compression_options`` is forwarded unchanged to ``websocket_connect``.
    The connected client is handed back through ``gen.Return``.
    """
    target = 'ws://127.0.0.1:%d%s' % (self.get_http_port(), path)
    connection = yield websocket_connect(
        target, compression_options=compression_options)
    raise gen.Return(connection)
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_websocket_callbacks(self):
    """Callback-style round trip: connect, send 'hello', expect it back, close."""
    echo_url = 'ws://127.0.0.1:%d/echo' % self.get_http_port()
    websocket_connect(echo_url, io_loop=self.io_loop, callback=self.stop)
    client = self.wait().result()
    client.write_message('hello')
    client.read_message(self.stop)
    echoed = self.wait().result()
    self.assertEqual(echoed, 'hello')
    # Stop waiting as soon as the close future completes.
    self.close_future.add_done_callback(lambda _done: self.stop())
    client.close()
    self.wait()
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def test_websocket_network_fail(self):
    """Connecting to a port whose socket was just closed must raise IOError."""
    placeholder, closed_port = bind_unused_port()
    placeholder.close()
    target = 'ws://127.0.0.1:%d/' % closed_port
    # Managers are entered left to right, exactly like the nested form.
    with self.assertRaises(IOError), ExpectLog(gen_log, ".*"):
        yield websocket_connect(
            target, io_loop=self.io_loop, connect_timeout=3600)
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def test_websocket_close_buffered_data(self):
    """Write two messages, close the raw stream, then wait for close_future."""
    echo_url = 'ws://127.0.0.1:%d/echo' % self.get_http_port()
    conn = yield websocket_connect(echo_url)
    for text in ('hello', 'world'):
        conn.write_message(text)
    # Close the underlying stream.
    conn.stream.close()
    yield self.close_future
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_websocket_headers(self):
    """Ensure that arbitrary headers can be passed through websocket_connect.

    Sends ``X-Test: hello`` to the ``/header`` endpoint and expects the
    first message read from the connection to be 'hello'.
    """
    request = HTTPRequest(
        'ws://127.0.0.1:%d/header' % self.get_http_port(),
        headers={'X-Test': 'hello'})
    conn = yield websocket_connect(request)
    reply = yield conn.read_message()
    self.assertEqual(reply, 'hello')
    yield self.close(conn)
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_check_origin_valid_with_path(self):
    """An Origin on the same host and port, with a path, is accepted.

    The connection must succeed and return the 'hello' written to it.
    """
    port = self.get_http_port()
    request = HTTPRequest(
        'ws://127.0.0.1:%d/echo' % port,
        headers={'Origin': 'http://127.0.0.1:%d/something' % port})
    conn = yield websocket_connect(request, io_loop=self.io_loop)
    conn.write_message('hello')
    reply = yield conn.read_message()
    self.assertEqual(reply, 'hello')
    yield self.close(conn)
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def test_check_origin_invalid_partial_url(self):
    """An Origin header without a scheme must be rejected with HTTP 403."""
    port = self.get_http_port()
    request = HTTPRequest(
        'ws://127.0.0.1:%d/echo' % port,
        headers={'Origin': '127.0.0.1:%d' % port})
    with self.assertRaises(HTTPError) as raised:
        yield websocket_connect(request, io_loop=self.io_loop)
    # Checked after the with-block so the captured exception is available.
    self.assertEqual(raised.exception.code, 403)
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_check_origin_invalid(self):
    """An Origin from a different domain must be rejected with HTTP 403."""
    port = self.get_http_port()
    # Host is 127.0.0.1, which should not be accessible from some other
    # domain
    request = HTTPRequest(
        'ws://127.0.0.1:%d/echo' % port,
        headers={'Origin': 'http://somewhereelse.com'})
    with self.assertRaises(HTTPError) as raised:
        yield websocket_connect(request, io_loop=self.io_loop)
    # Checked after the with-block so the captured exception is available.
    self.assertEqual(raised.exception.code, 403)
websocket_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def test_check_origin_invalid_subdomains(self):
    """An Origin on a subdomain of the host must be rejected with HTTP 403."""
    port = self.get_http_port()
    # Subdomains should be disallowed by default.  If we could pass a
    # resolver to websocket_connect we could test sibling domains as well.
    request = HTTPRequest(
        'ws://localhost:%d/echo' % port,
        headers={'Origin': 'http://subtenant.localhost'})
    with self.assertRaises(HTTPError) as raised:
        yield websocket_connect(request, io_loop=self.io_loop)
    # Checked after the with-block so the captured exception is available.
    self.assertEqual(raised.exception.code, 403)
def _setup(self):
    """Authenticate against Slack and open the real-time websocket.

    Steps, in order:
      1. Read the token from the ``SLACK_TOKEN`` environment variable.
      2. Call the ``rtm.start`` API method via ``self.api``.
      3. Connect to the websocket URL given in the response.
      4. Cache the bot identity, users and channels from the response.

    Raises:
        InvalidOptions: if ``SLACK_TOKEN`` is unset/empty, or the response's
            ``ok`` field is falsy.
        CoreException: if the ``rtm.start`` call itself raises.

    NOTE(review): the function yields, so it is a generator; presumably it
    runs under a coroutine decorator not visible in this snippet.
    """
    self._token = os.getenv('SLACK_TOKEN')
    if not self._token:
        raise InvalidOptions('SLACK_TOKEN required for slack engine.')

    log.info('Authenticating...')
    try:
        response = yield self.api('rtm.start')
    except Exception as e:
        raise CoreException('API call "rtm.start" to Slack failed: %s' % e)

    # Guard clause: bail out early when Slack reports a failed login.
    if not response['ok']:
        reason = response.get('error', 'No error specified')
        log.error('Login failed. Reason: "{}". Payload dump: {}'.format(
            reason, response))
        raise InvalidOptions('Login failed')
    log.info('Logged in!')

    self.socket_url = response['url']
    self.connection = yield websocket.websocket_connect(self.socket_url)

    identity = response['self']
    self._user_id = identity['id']
    self._user_name = identity['name']
    self._users = response['users']
    # The 'groups' entries are appended to the same list object that the
    # response holds under 'channels'.
    self._channels = response['channels']
    self._channels.extend(response['groups'])
    self._too_fast_warning = False
def test_websocket_gen(self):
    """Generator-style round trip: connect, send 'hello', expect it back."""
    echo_url = 'ws://localhost:%d/echo' % self.get_http_port()
    conn = yield websocket_connect(echo_url, io_loop=self.io_loop)
    conn.write_message('hello')
    reply = yield conn.read_message()
    self.assertEqual(reply, 'hello')
    conn.close()
    # Wait for the close future before finishing.
    yield self.close_future
def test_websocket_callbacks(self):
    """Callback-style connect and read, then yield on the close future."""
    echo_url = 'ws://localhost:%d/echo' % self.get_http_port()
    websocket_connect(echo_url, io_loop=self.io_loop, callback=self.stop)
    client = self.wait().result()
    client.write_message('hello')
    client.read_message(self.stop)
    echoed = self.wait().result()
    self.assertEqual(echoed, 'hello')
    client.close()
    yield self.close_future
def test_websocket_http_fail(self):
    """Connecting to the ``/notfound`` path must raise HTTPError with code 404."""
    missing_url = 'ws://localhost:%d/notfound' % self.get_http_port()
    with self.assertRaises(HTTPError) as raised:
        yield websocket_connect(missing_url, io_loop=self.io_loop)
    # Checked after the with-block so the captured exception is available.
    self.assertEqual(raised.exception.code, 404)
def test_websocket_http_success(self):
    """Connecting to the ``/non_ws`` path must raise WebSocketError."""
    plain_url = 'ws://localhost:%d/non_ws' % self.get_http_port()
    with self.assertRaises(WebSocketError):
        yield websocket_connect(plain_url, io_loop=self.io_loop)
def test_websocket_network_timeout(self):
    """Connecting to a just-closed port with a 0.01 timeout must raise HTTPError 599."""
    placeholder, closed_port = bind_unused_port()
    placeholder.close()
    target = 'ws://localhost:%d/' % closed_port
    # Managers are entered left to right, exactly like the nested form.
    with self.assertRaises(HTTPError) as raised, ExpectLog(gen_log, ".*"):
        yield websocket_connect(
            target, io_loop=self.io_loop, connect_timeout=0.01)
    # Checked after the with-block so the captured exception is available.
    self.assertEqual(raised.exception.code, 599)
def test_websocket_close_buffered_data(self):
    """Write two messages, close the raw stream, then wait for close_future."""
    echo_url = 'ws://localhost:%d/echo' % self.get_http_port()
    conn = yield websocket_connect(echo_url)
    for text in ('hello', 'world'):
        conn.write_message(text)
    # Close the stream object directly rather than calling conn.close().
    conn.stream.close()
    yield self.close_future
def test_websocket_headers(self):
    """Ensure that arbitrary headers can be passed through websocket_connect.

    Sends ``X-Test: hello`` to the ``/header`` endpoint and expects the
    first message read from the connection to be 'hello'.
    """
    request = HTTPRequest(
        'ws://localhost:%d/header' % self.get_http_port(),
        headers={'X-Test': 'hello'})
    conn = yield websocket_connect(request)
    reply = yield conn.read_message()
    self.assertEqual(reply, 'hello')
    conn.close()
    yield self.close_future