def testIter(self):
count = 2
for rsvp in ws.create_connection('ws://stream.meetup.com/2/rsvps'):
count -= 1
if count == 0:
break
python类create_connection()的实例源码
def testNext(self):
sock = ws.create_connection('ws://stream.meetup.com/2/rsvps')
self.assertEqual(str, type(next(sock)))
def testWebSocket(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
s.send(u"??????????")
result = s.recv()
self.assertEqual(result, "??????????")
s.close()
def testPingPong(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.ping("Hello")
s.pong("Hi")
s.close()
def testSecureWebSocket(self):
if 1:
import ssl
s = ws.create_connection("wss://echo.websocket.org/")
self.assertNotEqual(s, None)
self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
s.send(u"??????????")
result = s.recv()
self.assertEqual(result, "??????????")
s.close()
#except:
# pass
def testAfterClose(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.close()
self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
def testSockOpt(self):
sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt)
self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)
s.close()
def testIter(self):
count = 2
for rsvp in ws.create_connection('ws://stream.meetup.com/2/rsvps'):
count -= 1
if count == 0:
break
def testNext(self):
sock = ws.create_connection('ws://stream.meetup.com/2/rsvps')
self.assertEqual(str, type(next(sock)))
def testWebSocket(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
s.send(u"??????????")
result = s.recv()
self.assertEqual(result, "??????????")
s.close()
def testSecureWebSocket(self):
if 1:
import ssl
s = ws.create_connection("wss://echo.websocket.org/")
self.assertNotEqual(s, None)
self.assertTrue(isinstance(s.sock, ssl.SSLSocket))
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
s.send(u"??????????")
result = s.recv()
self.assertEqual(result, "??????????")
s.close()
#except:
# pass
def testWebSocketWihtCustomHeader(self):
s = ws.create_connection("ws://echo.websocket.org/",
headers={"User-Agent": "PythonWebsocketClient"})
self.assertNotEqual(s, None)
s.send("Hello, World")
result = s.recv()
self.assertEqual(result, "Hello, World")
s.close()
def testAfterClose(self):
s = ws.create_connection("ws://echo.websocket.org/")
self.assertNotEqual(s, None)
s.close()
self.assertRaises(ws.WebSocketConnectionClosedException, s.send, "Hello")
self.assertRaises(ws.WebSocketConnectionClosedException, s.recv)
def testSockOpt(self):
sockopt = ((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),)
s = ws.create_connection("ws://echo.websocket.org", sockopt=sockopt)
self.assertNotEqual(s.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY), 0)
s.close()
def _process_data(self):
self.conn = create_connection(self.addr, timeout=4)
for pair in self.pairs:
payload = [{'event': 'addChannel',
'channel': 'ok_sub_spotusd_%s_ticker' % pair},
{'event': 'addChannel',
'channel': 'ok_sub_spotusd_%s_depth_60' % pair},
{'event': 'addChannel',
'channel': 'ok_sub_spotusd_%s_trades' % pair},
{'event': 'addChannel',
'channel': 'ok_sub_spotusd_%s_kline_1min' % pair}]
log.debug(payload)
self.conn.send(json.dumps(payload))
while self.running:
try:
data = json.loads(self.conn.recv())
except (WebSocketTimeoutException, ConnectionResetError):
self._controller_q.put('restart')
if 'data' in data:
pair = ''.join(data['channel'].split('spot')[1].split('_')[:2]).upper()
self.data_q.put((data['channel'], pair, data['data'],
time.time()))
else:
log.debug(data)
self.conn = None
def _process_data(self):
self.conn = create_connection(self.addr, timeout=4)
payload = json.dumps({'type': 'subscribe', 'product_ids': self.pairs})
self.conn.send(payload)
while self.running:
try:
data = json.loads(self.conn.recv())
except (WebSocketTimeoutException, ConnectionResetError):
self._controller_q.put('restart')
if 'product_id' in data:
self.data_q.put(('order_book', data['product_id'],
data, time.time()))
self.conn = None
def start(self):
"""
Start the websocket client threads
:return:
"""
super(BitfinexWSS, self).start()
log.info("BitfinexWSS.start(): Initializing Websocket connection..")
while self.conn is None:
try:
self.conn = create_connection(self.addr, timeout=10)
except WebSocketTimeoutException:
self.conn = None
print("Couldn't create websocket connection - retrying!")
log.info("BitfinexWSS.start(): Initializing receiver thread..")
if not self.receiver_thread:
self.receiver_thread = Thread(target=self.receive, name='Receiver Thread')
self.receiver_thread.start()
else:
log.info("BitfinexWSS.start(): Thread not started! "
"self.receiver_thread is populated!")
log.info("BitfinexWSS.start(): Initializing processing thread..")
if not self.processing_thread:
self.processing_thread = Thread(target=self.process, name='Processing Thread')
self.processing_thread.start()
else:
log.info("BitfinexWSS.start(): Thread not started! "
"self.processing_thread is populated!")
self.setup_subscriptions()
def _subscription_thread(self, endpoint):
"""
Thread Method, running the connection for each endpoint.
:param endpoint:
:return:
"""
try:
conn = create_connection(self.addr + endpoint, timeout=5)
except WebSocketTimeoutException:
self.restart_q.put(endpoint)
return
while self.threads_running[endpoint]:
try:
msg = conn.recv()
except WebSocketTimeoutException:
self._controller_q.put(endpoint)
log.debug("%s, %s", endpoint, msg)
ep, pair = endpoint.split('/')
log.debug("_subscription_thread(): Putting data on q..")
try:
self.data_q.put((ep, pair, msg, time.time()), timeout=1)
except TimeoutError:
continue
finally:
log.debug("_subscription_thread(): Data Processed, looping back..")
conn.close()
log.debug("_subscription_thread(): Thread Loop Ended.")
def ws_connect(self, path):
return create_connection(self.format_path('ws', path))
def connect(self):
""" Initialize a websocket handshake. """
tc_header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:52.0) Gecko/20100101 Firefox/52.0',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Sec-WebSocket-Protocol': 'tc',
'Sec-WebSocket-Extensions': 'permessage-deflate'
}
# Comment out next 2 lines to not
# have debug info from websocket show in console.
if config.DEBUG_MODE:
websocket.enableTrace(True)
self._ws = websocket.create_connection(
'wss://wss.tinychat.com',
header=tc_header,
origin='https://tinychat.com'
)
if self._ws.connected:
log.info('connecting to: %s' % self.room_name)
if self.send_join_msg():
self.is_connected = True
self.__callback()