def testSockMaskKey(self):
    """Check that a WebSocketApp keeps the ``get_mask_key`` callable it is given.

    The app is run against 'ws://echo.websocket.org/'. Its ``on_open``
    callback records ``id(get_mask_key)`` of the object it is handed and
    closes the connection; the recorded id is then compared with the id of
    the function that was passed to the constructor.
    """

    def my_mask_key_func():
        pass

    def on_open(app_self, *args, **kwargs):
        """Remember the id of the app's mask-key callable, then close."""
        WebSocketAppTest.get_mask_key_id = id(app_self.get_mask_key)
        app_self.close()

    app = ws.WebSocketApp(
        'ws://echo.websocket.org/',
        on_open=on_open,
        get_mask_key=my_mask_key_func
    )
    app.run_forever()

    # Note: the functions are compared through 'id' because 'is' cannot be
    # used on them directly here.
    self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
python类WebSocketApp()的实例源码
def testSockMaskKey(self):
    """Check that a WebSocketApp keeps the ``get_mask_key`` callable it is given.

    The app is run against 'ws://echo.websocket.org/'. Its ``on_open``
    callback records ``id(get_mask_key)`` of the object it is handed and
    closes the connection; the recorded id is then compared with the id of
    the function that was passed to the constructor.
    """

    def my_mask_key_func():
        pass

    def on_open(opened_app, *args, **kwargs):
        """Remember the id of the app's mask-key callable, then close."""
        WebSocketAppTest.get_mask_key_id = id(opened_app.get_mask_key)
        opened_app.close()

    app = ws.WebSocketApp(
        'ws://echo.websocket.org/',
        on_open=on_open,
        get_mask_key=my_mask_key_func
    )
    app.run_forever()

    # Note: the functions are compared through 'id' because 'is' cannot be
    # used on them directly here.
    self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
def testSockMaskKey(self):
    """Check that a WebSocketApp keeps the ``get_mask_key`` callable it is given.

    The app is run against 'ws://echo.websocket.org/'. Its ``on_open``
    callback records ``id(get_mask_key)`` of the object it is handed and
    closes the connection; the recorded id is then compared with the id of
    the function that was passed to the constructor.
    """

    def my_mask_key_func():
        pass

    def on_open(ws_app, *args, **kwargs):
        """Remember the id of the app's mask-key callable, then close."""
        WebSocketAppTest.get_mask_key_id = id(ws_app.get_mask_key)
        ws_app.close()

    app = ws.WebSocketApp(
        'ws://echo.websocket.org/',
        on_open=on_open,
        get_mask_key=my_mask_key_func
    )
    app.run_forever()

    # Note: the functions are compared through 'id' because 'is' cannot be
    # used on them directly here.
    self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
def start(self):
    """Start the window thread, then keep a websocket session alive.

    ``self.window.start`` is run on a daemon thread. The calling thread then
    loops forever: it builds a fresh ``WebSocketApp`` for ``self.apiAddr``,
    runs it, and after that session ends waits ten seconds before connecting
    again. The method does not return normally; ``KeyboardInterrupt`` and
    ``SystemExit`` propagate to the caller.
    """
    self.windowThread = threading.Thread(target=self.window.start)
    self.windowThread.daemon = True
    self.windowThread.start()

    while True:
        try:
            self.ws = websocket.WebSocketApp(
                self.apiAddr,
                on_message=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close
            )
            self.ws.on_open = self.on_open
            self.ws.run_forever()
        except Exception:
            # Deliberately not a bare ``except``: Ctrl-C and interpreter
            # shutdown must still be able to break out of this loop.
            self.logger.error("websocket connection error, reconnecting...")
        # Throttle every reconnect attempt, whether the previous session
        # ended with an exception or simply returned, so a server that keeps
        # closing the connection is not hammered in a tight loop.
        time.sleep(10)
def __init__(self, ws_host):
    """Prepare a websocket client for ``ws_host`` without connecting.

    Sets up the call-id counter, the inbox and error queues, a
    ``WebSocketApp`` wired to this object's ``on_*`` callbacks, and a worker
    thread whose target is the app's ``run_forever`` with the module-level
    ``sslopt_with_ca_certs`` passed as ``sslopt``. The thread is only
    created here, not started.

    :param ws_host: URL handed to ``websocket.WebSocketApp``.
    """
    self._call_id = 0
    self._inbox = Queue()
    self._errors = Queue()

    callbacks = dict(
        on_open=self.on_open,
        on_message=self.on_message,
        on_close=self.on_close,
        on_error=self.on_error
    )
    self._ws = websocket.WebSocketApp(ws_host, **callbacks)

    run_options = {'sslopt': sslopt_with_ca_certs}
    self._worker = threading.Thread(target=self._ws.run_forever,
                                    kwargs=run_options)
def init_connection(self):
    """Create the websocket for ``self.service`` and run it in the background.

    Enables websocket tracing when ``self._pconfig['verbose']`` is truthy,
    stores the new ``WebSocketApp`` on ``self.socket`` and starts a daemon
    thread (kept on ``self.socket_thread``) that executes its
    ``run_forever``.
    """
    logger.debug('Initializing connection')

    verbose = self._pconfig['verbose']
    if verbose:
        websocket.enableTrace(True)

    self.socket = websocket.WebSocketApp(
        self.service,
        on_message=self.on_socket_message,
        on_close=self.on_socket_close,
        on_error=self.on_socket_error
    )

    runner = threading.Thread(target=self.socket.run_forever)
    runner.daemon = True
    self.socket_thread = runner
    runner.start()
def setup(self, **kwargs):
    """Build the underlying ``WebSocketApp`` and store it on ``self._app``.

    ``setup`` is a low-level interface for users who need to configure
    WebSocketApp details: every keyword argument is passed straight through
    to the ``websocket.WebSocketApp`` constructor, alongside ``self._uri``
    and this object's ``_on_open`` / ``_on_message`` / ``_on_error`` /
    ``_on_close`` callbacks.
    """
    app = websocket.WebSocketApp(self._uri,
                                 on_open=self._on_open,
                                 on_message=self._on_message,
                                 on_error=self._on_error,
                                 on_close=self._on_close,
                                 **kwargs)
    self._app = app
def connect():
    """Open the websocket session, unless a previous call already did.

    The module-level ``connected`` flag is set before connecting and acts as
    a guard: any later call returns immediately. The websocket URL is taken
    from the ``"url"`` field of the ``rtm.start()`` response of a ``Slacker``
    client built with ``API_TOKEN``. The call blocks inside ``run_forever``.
    """
    global connected
    if connected:
        return
    connected = True

    rtm_url = Slacker(API_TOKEN).rtm.start().body["url"]
    app = WebSocketApp(
        rtm_url,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    app.on_open = on_open
    app.run_forever()
def testKeepRunning(self):
    """Check the value of ``keep_running`` seen by the open and close callbacks.

    A WebSocketApp should keep running as long as its ``keep_running`` is not
    False (in the boolean context). The flag is captured in ``on_open``
    (which then closes the connection) and again in ``on_close``; it is
    expected to be True at open time and False at close time.
    """

    def on_open(app_self, *args, **kwargs):
        """Capture keep_running at open time and close straight away."""
        WebSocketAppTest.keep_running_open = app_self.keep_running
        app_self.close()

    def on_close(app_self, *args, **kwargs):
        """Capture keep_running at close time."""
        WebSocketAppTest.keep_running_close = app_self.keep_running

    app = ws.WebSocketApp(
        'ws://echo.websocket.org/',
        on_open=on_open,
        on_close=on_close
    )
    app.run_forever()

    # Both callbacks must have fired, i.e. neither placeholder is left.
    for flag_name in ('keep_running_open', 'keep_running_close'):
        recorded = getattr(WebSocketAppTest, flag_name)
        self.assertFalse(isinstance(recorded, WebSocketAppTest.NotSetYet))

    self.assertEqual(True, WebSocketAppTest.keep_running_open)
    self.assertEqual(False, WebSocketAppTest.keep_running_close)
def testKeepRunning(self):
    """Check the value of ``keep_running`` seen by the open and close callbacks.

    A WebSocketApp should keep running as long as its ``keep_running`` is not
    False (in the boolean context). The flag is captured in ``on_open``
    (which then closes the connection) and again in ``on_close``; it is
    expected to be True at open time and False at close time.
    """

    def on_open(opened_app, *args, **kwargs):
        """Capture keep_running at open time and close straight away."""
        WebSocketAppTest.keep_running_open = opened_app.keep_running
        opened_app.close()

    def on_close(closed_app, *args, **kwargs):
        """Capture keep_running at close time."""
        WebSocketAppTest.keep_running_close = closed_app.keep_running

    app = ws.WebSocketApp(
        'ws://echo.websocket.org/',
        on_open=on_open,
        on_close=on_close
    )
    app.run_forever()

    # Both callbacks must have fired, i.e. neither placeholder is left.
    for flag_name in ('keep_running_open', 'keep_running_close'):
        recorded = getattr(WebSocketAppTest, flag_name)
        self.assertFalse(isinstance(recorded, WebSocketAppTest.NotSetYet))

    self.assertEqual(True, WebSocketAppTest.keep_running_open)
    self.assertEqual(False, WebSocketAppTest.keep_running_close)
def connect(self, url,
            on_message_handler=None,
            on_open_handler=None,
            on_close_handler=None,
            on_error_handler=None,
            reconnect_interval=10):
    """Register the given handlers and start the socket thread if needed.

    Each handler that is not None is appended to the matching
    ``self.on_*_handlers`` list. A new ``WebSocketApp`` and worker thread
    are created only when the socket is neither connecting nor connected.

    :param url: Url link
    :param on_message_handler: Message handler which takes the message as
                               the first argument
    :param on_open_handler: Socket open handler which takes the socket as
                            the first argument
    :param on_close_handler: Socket close handler which takes the socket as
                             the first argument
    :param on_error_handler: Socket error handler which takes the socket as
                             the first argument and the error as the second
                             argument
    :param reconnect_interval: The time interval for reconnection
    :return: ``self.wst``, the worker thread attribute
    """
    Logger.info(self.__class__.__name__, "Connecting to socket <%s>..." % self.id)

    # Attribute names are resolved lazily so a handler list is only touched
    # when there is actually something to append to it.
    registrations = (
        (on_message_handler, 'on_message_handlers'),
        (on_open_handler, 'on_open_handlers'),
        (on_close_handler, 'on_close_handlers'),
        (on_error_handler, 'on_error_handlers'),
    )
    for handler, list_name in registrations:
        if handler is not None:
            getattr(self, list_name).append(handler)

    already_active = self._connecting or self._connected
    if not already_active:
        self._connecting = True
        self.ws = websocket.WebSocketApp(
            url,
            on_message=self.__on_message,
            on_close=self.__on_close,
            on_open=self.__on_open,
            on_error=self.__on_error
        )
        self.wst = threading.Thread(
            target=lambda: self.__start(reconnect_interval=reconnect_interval)
        )
        self.wst.start()

    return self.wst
def start(self):
    """Log in to Twitch, resolve the channel id and run the PubSub websocket.

    Creates the ``pytwitcherapi`` session, logs in and stores the OAuth
    access token. If ``self.channel_id`` is not set yet it is fetched with
    ``get_channel_id()`` and written to the config. After a successful first
    run the ``first_run`` config flag is cleared. Finally the websocket app
    for ``self.ws_host`` is created and run; this call blocks inside
    ``run_forever``.
    """
    # Standard REST API:
    # This is used to get the channel_id from a channel_name,
    # and the OAuth token needed for Websocket requests
    self.twitch = pytwitcherapi.TwitchSession()
    self.twitch_login()
    self.access_token = self.twitch.token['access_token']

    try:
        self.channel_id
    except AttributeError:
        # No channel id known yet: look it up once and persist it.
        self.channel_id = self.get_channel_id()
        self._write_config('channel_id', self.channel_id)

    if self.first_run:
        # First run was a success, we don't need to wait 45 seconds for user login
        # Set first_run param to 0 (== False)
        self._write_config('first_run', '0')

    # Websocket / PubSub:
    # This is used to get Twitch's Bits information stream
    if self.verbose:
        websocket.enableTrace(True)
    self.twitch.ws = websocket.WebSocketApp(
        self.ws_host,
        on_message=self.on_message,
        on_error=self.on_error,
        # Accept any number of positional arguments: depending on the
        # websocket-client release, on_close is invoked with just the app or
        # with (app, close_status_code, close_msg).
        on_close=lambda *_: self.log.info("Terminating...")
    )
    self.cm = ConsoleMini(db_filepath=self.db_filepath, log=self.log)
    self.twitch.ws.on_open = self.on_open
    self.twitch.ws.run_forever()
def run(self):
    """Run a websocket session against ``self.url`` with tracing enabled.

    The app is wired to this object's ``on_message``, ``on_error``,
    ``on_close`` and ``on_open`` callbacks; the call blocks inside
    ``run_forever``.
    """
    websocket.enableTrace(True)

    app = websocket.WebSocketApp(
        self.url,
        on_message=self.on_message,
        on_error=self.on_error,
        on_close=self.on_close
    )
    app.on_open = self.on_open
    app.run_forever()
def init_websocket(self):
    """Create ``self.ws`` for the URL returned by ``build_websocket_url()``.

    The app receives this object's private message/close/open/error/ping/pong
    callbacks and the result of ``__get_auth()`` as its ``header``. It is
    only constructed here, not run.
    """
    target_url = self.build_websocket_url()
    alog.debug("Connecting to %s" % (target_url))

    auth_header = self.__get_auth()
    self.ws = websocket.WebSocketApp(
        target_url,
        on_message=self.__on_message,
        on_close=self.__on_close,
        on_open=self.__on_open,
        on_error=self.__on_error,
        header=auth_header,
        on_ping=self.__on_ping,
        on_pong=self.__on_pong
    )
def run_forever(self):
    """Run the websocket app continuously, cycling through ``self.urls``.

    Each iteration takes the next URL, builds a ``WebSocketApp`` with this
    object's callbacks and runs it. On a ``WebSocketException`` the attempt
    counter decides whether to give up (``NumRetriesReached`` once the
    counter exceeds a non-negative ``self.num_retries``) or to sleep before
    the next attempt. ``KeyboardInterrupt`` stops the app and is re-raised;
    any other exception is logged as critical and the loop continues.
    """
    attempt = 0
    while True:
        attempt += 1
        self.url = next(self.urls)
        log.debug("Trying to connect to node %s" % self.url)
        try:
            # websocket.enableTrace(True)
            self.ws = websocket.WebSocketApp(
                self.url,
                on_message=self.on_message,
                # on_data=self.on_message,
                on_error=self.on_error,
                on_close=self.on_close,
                on_open=self.on_open
            )
            self.ws.run_forever()
        except websocket.WebSocketException:
            retries_exhausted = (
                self.num_retries >= 0 and attempt > self.num_retries
            )
            if retries_exhausted:
                raise NumRetriesReached()

            # Delay grows by two seconds per attempt for the first nine
            # attempts, then stays at ten seconds.
            if attempt < 10:
                delay = (attempt - 1) * 2
            else:
                delay = 10

            # The very first attempt yields a zero delay: retry at once,
            # without a warning.
            if delay:
                lost_part = (
                    "Lost connection to node during wsconnect(): %s (%d/%d) "
                    % (self.url, attempt, self.num_retries)
                )
                retry_part = "Retrying in %d seconds" % delay
                log.warning(lost_part + retry_part)
                time.sleep(delay)
        except KeyboardInterrupt:
            self.ws.keep_running = False
            raise
        except Exception as e:
            log.critical("{}\n\n{}".format(str(e), traceback.format_exc()))
def testKeepRunning(self):
    """Check the value of ``keep_running`` seen by the open and close callbacks.

    A WebSocketApp should keep running as long as its ``keep_running`` is not
    False (in the boolean context). The flag is captured in ``on_open``
    (which then closes the connection) and again in ``on_close``; it is
    expected to be True at open time and False at close time.
    """

    def on_open(ws_app, *args, **kwargs):
        """Capture keep_running at open time and close straight away."""
        WebSocketAppTest.keep_running_open = ws_app.keep_running
        ws_app.close()

    def on_close(ws_app, *args, **kwargs):
        """Capture keep_running at close time."""
        WebSocketAppTest.keep_running_close = ws_app.keep_running

    app = ws.WebSocketApp(
        'ws://echo.websocket.org/',
        on_open=on_open,
        on_close=on_close
    )
    app.run_forever()

    # Both callbacks must have fired, i.e. neither placeholder is left.
    for flag_name in ('keep_running_open', 'keep_running_close'):
        recorded = getattr(WebSocketAppTest, flag_name)
        self.assertFalse(isinstance(recorded, WebSocketAppTest.NotSetYet))

    self.assertEqual(True, WebSocketAppTest.keep_running_open)
    self.assertEqual(False, WebSocketAppTest.keep_running_close)
def stream(self):
    """Run a websocket session against ``self.url`` until it ends.

    Messages, errors and the close event are dispatched to this object's
    ``_on_message``, ``_on_error`` and ``_on_close`` callbacks.
    """
    callbacks = dict(
        on_message=self._on_message,
        on_error=self._on_error,
        on_close=self._on_close
    )
    app = websocket.WebSocketApp(self.url, **callbacks)
    # FIXME: {'Authorization': os.environ.get('RISEML_APIKEY')}
    app.run_forever()
def _ws_connect(self):
    """Create the websocket app and run it on a background daemon thread.

    The app for ``self.ws_url`` is stored on ``self.wsa`` and wired to this
    object's ``_server_response``, ``_ws_error``, ``_ws_open`` and
    ``_ws_close`` callbacks. Its ``run_forever`` is executed by a thread
    named 'WSA-Thread', kept on ``self.wsa_thread``.
    """
    # Initialize websocket parameters
    self.wsa = websocket.WebSocketApp(
        url=self.ws_url,
        on_message=self._server_response,
        on_error=self._ws_error,
        on_open=self._ws_open,
        on_close=self._ws_close)

    # Run the websocket in parallel thread
    self.wsa_thread = threading.Thread(
        target=self.wsa.run_forever,
        name='WSA-Thread')
    # Use the ``daemon`` attribute: ``Thread.setDaemon()`` is deprecated
    # since Python 3.10 and emits a DeprecationWarning there.
    self.wsa_thread.daemon = True
    self.wsa_thread.start()
def start_websocket():
    """Run a traced websocket session against the runner endpoint.

    Uses the module-level ``on_message``, ``on_error``, ``on_close`` and
    ``on_open`` callbacks and blocks inside ``run_forever``.
    """
    websocket.enableTrace(True)

    runner_app = websocket.WebSocketApp(
        "ws://192.168.31.71:8080/api/ws/runner",
        on_message=on_message,
        on_error=on_error,
        on_close=on_close
    )
    runner_app.on_open = on_open
    runner_app.run_forever()
# print('start_websocket...')
# t = threading.Thread(target=start_websocket, args=())
# t.start()
def run_with_id(self, printer_id):
    """Start a daemon thread that listens on the socket for ``printer_id``.

    Incoming frames are dispatched on their first character: an 'm' frame
    carries one JSON message, an 'a' frame carries a JSON list of messages;
    each decoded message is forwarded to ``self.on_message``. Socket errors
    are reported by emitting a "status" event with an
    "Offline/Unreachable" state to the room named after the printer id.
    """

    def on_message(sock, frame):
        """Decode one raw frame and forward its message(s)."""
        if frame.startswith('m'):
            # NOTE(review): this branch calls self.on_message without
            # printer_id while the 'a' branch below passes it -- confirm
            # that the difference is intended.
            self.on_message(sock, json.loads(frame[1:]))
        elif frame.startswith('a'):
            for message in json.loads(frame[1:]):
                self.on_message(sock, message, printer_id)

    def on_error(sock, exception):
        """Tell the printer's room that the printer cannot be reached."""
        offline_status = {
            "id": printer_id,
            "state": {
                "text": "Offline/Unreachable"
            }
        }
        socketio.emit("status", offline_status, room=str(printer_id))

    self.socket = websocket.WebSocketApp(
        self.url,
        on_open=self.on_open,
        on_close=self.on_close,
        on_message=on_message,
        on_error=on_error
    )

    listener = Thread(target=self.run_forever)
    listener.daemon = True
    self.thread = listener
    listener.start()