python类WebSocketApp()的实例源码

test_websocket.py 文件源码 项目:bitcoinelasticsearch 作者: currentsea 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """Check that the ``get_mask_key`` callable given to WebSocketApp is
        the one visible on the object handed to the ``on_open`` callback.

        The open callback stores ``id()`` of the ``get_mask_key`` attribute on
        ``WebSocketAppTest`` and closes the connection; the ids are compared
        once ``run_forever()`` has returned.
        """

        def mask_key_stub():
            pass

        def record_and_close(opened, *args, **kwargs):
            # Remember which mask-key callable is attached, then shut down so
            # run_forever() returns.
            WebSocketAppTest.get_mask_key_id = id(opened.get_mask_key)
            opened.close()

        app = ws.WebSocketApp(
            'ws://echo.websocket.org/',
            on_open=record_and_close,
            get_mask_key=mask_key_stub)
        app.run_forever()

        # The functions are compared through their ids rather than with 'is'.
        expected_id = id(mask_key_stub)
        self.assertEqual(WebSocketAppTest.get_mask_key_id, expected_id)
test_websocket.py 文件源码 项目:bitcoinelasticsearch 作者: currentsea 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """Check that the ``get_mask_key`` callable given to WebSocketApp is
        the one visible on the object handed to the ``on_open`` callback.

        The open callback stores ``id()`` of the ``get_mask_key`` attribute on
        ``WebSocketAppTest`` and closes the connection; the ids are compared
        once ``run_forever()`` has returned.
        """

        def mask_key_stub():
            pass

        def record_and_close(opened, *args, **kwargs):
            # Remember which mask-key callable is attached, then shut down so
            # run_forever() returns.
            WebSocketAppTest.get_mask_key_id = id(opened.get_mask_key)
            opened.close()

        app = ws.WebSocketApp(
            'ws://echo.websocket.org/',
            on_open=record_and_close,
            get_mask_key=mask_key_stub)
        app.run_forever()

        # The functions are compared through their ids rather than with 'is'.
        expected_id = id(mask_key_stub)
        self.assertEqual(WebSocketAppTest.get_mask_key_id, expected_id)
test_websocket.py 文件源码 项目:deb-python-websocket-client 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """Check that the ``get_mask_key`` callable given to WebSocketApp is
        the one visible on the object handed to the ``on_open`` callback.

        The open callback stores ``id()`` of the ``get_mask_key`` attribute on
        ``WebSocketAppTest`` and closes the connection; the ids are compared
        once ``run_forever()`` has returned.
        """

        def mask_key_stub():
            pass

        def record_and_close(opened, *args, **kwargs):
            # Remember which mask-key callable is attached, then shut down so
            # run_forever() returns.
            WebSocketAppTest.get_mask_key_id = id(opened.get_mask_key)
            opened.close()

        app = ws.WebSocketApp(
            'ws://echo.websocket.org/',
            on_open=record_and_close,
            get_mask_key=mask_key_stub)
        app.run_forever()

        # The functions are compared through their ids rather than with 'is'.
        expected_id = id(mask_key_stub)
        self.assertEqual(WebSocketAppTest.get_mask_key_id, expected_id)
display.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def start(self):
        """Start the window in a daemon thread, then keep a websocket client
        running against ``self.apiAddr`` forever.

        Each loop iteration builds a fresh ``WebSocketApp``, attaches the
        ``on_open`` handler and blocks in ``run_forever()``. Whether that call
        returns or raises, the loop sleeps 10 seconds and connects again.
        This method never returns normally.
        """
        self.windowThread = threading.Thread(target=self.window.start)
        self.windowThread.daemon = True
        self.windowThread.start()
        while True:
            try:
                self.ws = websocket.WebSocketApp(
                    self.apiAddr,
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close)
                self.ws.on_open = self.on_open
                self.ws.run_forever()
            except Exception:
                # Deliberately not a bare ``except``: KeyboardInterrupt and
                # SystemExit must propagate, otherwise this infinite loop
                # could never be interrupted.
                self.logger.error("websocket connection error, reconnecting...")
            time.sleep(10)
rtm_loop.py 文件源码 项目:bearychat.py 作者: bearyinnovative 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, ws_host):
        """Prepare the queues, the websocket app and its worker thread.

        The worker thread is only created here, not started. Its target is the
        app's ``run_forever`` with ``sslopt`` set to ``sslopt_with_ca_certs``.

        :param ws_host: websocket URL passed to ``WebSocketApp``.
        """
        self._call_id = 0
        self._inbox = Queue()
        self._errors = Queue()

        callbacks = dict(
            on_open=self.on_open,
            on_message=self.on_message,
            on_close=self.on_close,
            on_error=self.on_error)
        self._ws = websocket.WebSocketApp(ws_host, **callbacks)

        run_options = {'sslopt': sslopt_with_ca_certs}
        self._worker = threading.Thread(target=self._ws.run_forever,
                                        kwargs=run_options)
hyperionplatform.py 文件源码 项目:AlexaPi 作者: alexa-pi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def init_connection(self):
        """Create the websocket app for ``self.service`` and run it in a
        daemon thread.

        Websocket tracing is switched on first when the ``verbose`` entry of
        ``self._pconfig`` is truthy.
        """
        logger.debug('Initializing connection')

        want_trace = self._pconfig['verbose']
        if want_trace:
            websocket.enableTrace(True)

        endpoint = self.service
        callbacks = {
            'on_message': self.on_socket_message,
            'on_close': self.on_socket_close,
            'on_error': self.on_socket_error,
        }
        self.socket = websocket.WebSocketApp(endpoint, **callbacks)

        # Background runner; marked daemon before it is started.
        self.socket_thread = threading.Thread(target=self.socket.run_forever)
        self.socket_thread.daemon = True
        self.socket_thread.start()
api.py 文件源码 项目:sensorbee-python 作者: kmaehashi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setup(self, **kwargs):
        """
        Low-level hook for callers that need to tune the underlying
        WebSocketApp: any keyword arguments are passed straight to its
        constructor next to the four built-in callbacks.
        """
        uri = self._uri
        opened, received, failed, closed = (
            self._on_open, self._on_message, self._on_error, self._on_close)
        # Callbacks stay explicit keywords (not merged into kwargs) so a
        # conflicting key in kwargs is still rejected by the call itself.
        self._app = websocket.WebSocketApp(
            uri, on_open=opened, on_message=received,
            on_error=failed, on_close=closed, **kwargs)
emoji_fetcher.py 文件源码 项目:slack-bomber 作者: setokinto 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def connect():
    """Open the Slack RTM websocket once and block in its event loop.

    The module-level ``connected`` flag is set before the connection is
    attempted; any later call returns immediately.
    """
    global connected
    if not connected:
        connected = True
        rtm_url = Slacker(API_TOKEN).rtm.start().body["url"]
        app = WebSocketApp(rtm_url,
                           on_message=on_message,
                           on_error=on_error,
                           on_close=on_close)
        app.on_open = on_open
        app.run_forever()
test_websocket.py 文件源码 项目:Treehacks 作者: andrewsy97 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testKeepRunning(self):
        """Check the ``keep_running`` flag of a WebSocketApp: it must read
        True inside the open callback and False inside the close callback.
        """

        def capture_on_open(opened, *args, **kwargs):
            # Record the flag as seen while open, then close right away.
            WebSocketAppTest.keep_running_open = opened.keep_running

            opened.close()

        def capture_on_close(closed, *args, **kwargs):
            # Record the flag as seen while closing.
            WebSocketAppTest.keep_running_close = closed.keep_running

        app = ws.WebSocketApp('ws://echo.websocket.org/',
                              on_open=capture_on_open,
                              on_close=capture_on_close)
        app.run_forever()

        # Both callbacks must have replaced the NotSetYet placeholder.
        for recorded in ('keep_running_open', 'keep_running_close'):
            self.assertFalse(isinstance(getattr(WebSocketAppTest, recorded),
                                        WebSocketAppTest.NotSetYet))

        self.assertEqual(True, WebSocketAppTest.keep_running_open)
        self.assertEqual(False, WebSocketAppTest.keep_running_close)
test_websocket.py 文件源码 项目:Treehacks 作者: andrewsy97 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testKeepRunning(self):
        """Check the ``keep_running`` flag of a WebSocketApp: it must read
        True inside the open callback and False inside the close callback.
        """

        def capture_on_open(opened, *args, **kwargs):
            # Record the flag as seen while open, then close right away.
            WebSocketAppTest.keep_running_open = opened.keep_running

            opened.close()

        def capture_on_close(closed, *args, **kwargs):
            # Record the flag as seen while closing.
            WebSocketAppTest.keep_running_close = closed.keep_running

        app = ws.WebSocketApp('ws://echo.websocket.org/',
                              on_open=capture_on_open,
                              on_close=capture_on_close)
        app.run_forever()

        # Both callbacks must have replaced the NotSetYet placeholder.
        for recorded in ('keep_running_open', 'keep_running_close'):
            self.assertFalse(isinstance(getattr(WebSocketAppTest, recorded),
                                        WebSocketAppTest.NotSetYet))

        self.assertEqual(True, WebSocketAppTest.keep_running_open)
        self.assertEqual(False, WebSocketAppTest.keep_running_close)
ws_api_socket.py 文件源码 项目:BitcoinExchangeFH 作者: Aurora-Team 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self, url,
                on_message_handler=None,
                on_open_handler=None,
                on_close_handler=None,
                on_error_handler=None,
                reconnect_interval=10):
        """
        Register the given handlers and, unless a connection is already in
        progress or established, create the websocket app and start its
        thread.

        :param url: Url link
        :param on_message_handler: Message handler, called with the message
                           as the first argument
        :param on_open_handler: Socket open handler, called with the socket
                           as the first argument
        :param on_close_handler: Socket close handler, called with the socket
                           as the first argument
        :param on_error_handler: Socket error handler, called with the socket
                           as the first argument and the error as the second
        :param reconnect_interval: The time interval for reconnection
        :return: ``self.wst``, the websocket thread
        """
        Logger.info(self.__class__.__name__, "Connecting to socket <%s>..." % self.id)

        # Each supplied handler goes to its own list; a list is only looked
        # up when there is a handler to add.
        registrations = (
            (on_message_handler, 'on_message_handlers'),
            (on_open_handler, 'on_open_handlers'),
            (on_close_handler, 'on_close_handlers'),
            (on_error_handler, 'on_error_handlers'),
        )
        for handler, list_name in registrations:
            if handler is not None:
                getattr(self, list_name).append(handler)

        busy = self._connecting or self._connected
        if not busy:
            self._connecting = True
            self.ws = websocket.WebSocketApp(
                url,
                on_message=self.__on_message,
                on_close=self.__on_close,
                on_open=self.__on_open,
                on_error=self.__on_error)
            self.wst = threading.Thread(
                target=lambda: self.__start(reconnect_interval=reconnect_interval))
            self.wst.start()

        return self.wst
twitchbitsinfo.py 文件源码 项目:twitch-bits-info 作者: floweb 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def start(self):
        """Log in to Twitch, resolve the channel id, then run the PubSub
        websocket until it stops.

        Steps, in order:
        1. Create a ``TwitchSession``, log in and keep the access token.
        2. If ``self.channel_id`` is not set yet, fetch it and write it to
           the configuration.
        3. On a first run, write ``first_run = '0'`` to the configuration.
        4. Build the ``WebSocketApp``, create the ``ConsoleMini`` and block
           in ``run_forever()``.
        """
        # Standard REST API:
        # This is used to get the channel_id from a channel_name,
        # and the OAuth token needed for Websocket requests
        self.twitch = pytwitcherapi.TwitchSession()

        self.twitch_login()
        self.access_token = self.twitch.token['access_token']
        try:
            self.channel_id
        except AttributeError:
            self.channel_id = self.get_channel_id()
            self._write_config('channel_id', self.channel_id)

        if self.first_run:
            # First run was a success, we don't need to wait 45 seconds for user login
            # Set first_run param to 0 (== False)
            self._write_config('first_run', '0')

        # Websocket / PubSub:
        # This is used to get Twitch's Bits information stream
        if self.verbose:
            websocket.enableTrace(True)

        self.twitch.ws = websocket.WebSocketApp(
            self.ws_host,
            on_message=self.on_message,
            on_error=self.on_error,
            # Accept any number of positional arguments: depending on the
            # websocket-client version, on_close is called with just the app
            # or with the app plus close status code and message. A
            # one-argument lambda raises TypeError in the latter case.
            on_close=lambda *_: self.log.info("Terminating...")
        )

        self.cm = ConsoleMini(db_filepath=self.db_filepath, log=self.log)

        self.twitch.ws.on_open = self.on_open
        self.twitch.ws.run_forever()
signalk-nextion.py 文件源码 项目:pynextion 作者: raffmont 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self):
        """Enable websocket tracing, connect to ``self.url`` and block in
        the app's event loop."""
        websocket.enableTrace(True)
        endpoint = self.url
        callbacks = dict(on_message=self.on_message,
                         on_error=self.on_error,
                         on_close=self.on_close)
        app = websocket.WebSocketApp(endpoint, **callbacks)
        # The open handler is attached after construction.
        app.on_open = self.on_open
        app.run_forever()
websocket.py 文件源码 项目:bitmex-websocket 作者: joliveros 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def init_websocket(self):
        """Build the websocket URL, log it and create ``self.ws``.

        The app gets the private message/close/open/error/ping/pong callbacks
        and the headers returned by ``__get_auth()``. It is only constructed
        here, not run.
        """
        endpoint = self.build_websocket_url()
        alog.debug("Connecting to %s" % (endpoint))
        options = dict(
            on_message=self.__on_message,
            on_close=self.__on_close,
            on_open=self.__on_open,
            on_error=self.__on_error,
            header=self.__get_auth(),
            on_ping=self.__on_ping,
            on_pong=self.__on_pong)
        self.ws = websocket.WebSocketApp(endpoint, **options)
websocket.py 文件源码 项目:python-muse 作者: aaroncox 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run_forever(self):
        """ Keep a websocket app running, moving to the next URL from
            ``self.urls`` on every pass of the loop.

            The attempt counter grows on every pass and is never reset.
            On ``WebSocketException``: raise ``NumRetriesReached`` once the
            counter exceeds a non-negative ``self.num_retries``; otherwise
            wait ``(attempt - 1) * 2`` seconds while the counter is below 10,
            else 10 seconds (a zero wait is skipped without logging).
            ``KeyboardInterrupt`` clears ``keep_running`` on the app and is
            re-raised; any other exception is logged as critical and the
            loop goes on.
        """
        attempt = 0
        while True:
            attempt += 1
            self.url = next(self.urls)
            log.debug("Trying to connect to node %s" % self.url)
            try:
                # websocket.enableTrace(True)
                self.ws = websocket.WebSocketApp(
                    self.url,
                    on_message=self.on_message,
                    # on_data=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open
                )
                self.ws.run_forever()
            except websocket.WebSocketException:
                out_of_retries = (self.num_retries >= 0 and
                                  attempt > self.num_retries)
                if out_of_retries:
                    raise NumRetriesReached()

                if attempt < 10:
                    delay = (attempt - 1) * 2
                else:
                    delay = 10
                if not delay:
                    # First attempt: retry at once, nothing to report.
                    continue

                lost_msg = (
                    "Lost connection to node during wsconnect(): %s (%d/%d) "
                    % (self.url, attempt, self.num_retries))
                retry_msg = "Retrying in %d seconds" % delay
                log.warning(lost_msg + retry_msg)
                time.sleep(delay)

            except KeyboardInterrupt:
                self.ws.keep_running = False
                raise

            except Exception as e:
                log.critical("{}\n\n{}".format(str(e), traceback.format_exc()))
test_websocket.py 文件源码 项目:tk-photoshopcc 作者: shotgunsoftware 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def testKeepRunning(self):
        """Check the ``keep_running`` flag of a WebSocketApp: it must read
        True inside the open callback and False inside the close callback.
        """

        def capture_on_open(opened, *args, **kwargs):
            # Record the flag as seen while open, then close right away.
            WebSocketAppTest.keep_running_open = opened.keep_running

            opened.close()

        def capture_on_close(closed, *args, **kwargs):
            # Record the flag as seen while closing.
            WebSocketAppTest.keep_running_close = closed.keep_running

        app = ws.WebSocketApp('ws://echo.websocket.org/',
                              on_open=capture_on_open,
                              on_close=capture_on_close)
        app.run_forever()

        # Both callbacks must have replaced the NotSetYet placeholder.
        for recorded in ('keep_running_open', 'keep_running_close'):
            self.assertFalse(isinstance(getattr(WebSocketAppTest, recorded),
                                        WebSocketAppTest.NotSetYet))

        self.assertEqual(True, WebSocketAppTest.keep_running_open)
        self.assertEqual(False, WebSocketAppTest.keep_running_close)
stream.py 文件源码 项目:cli 作者: riseml 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def stream(self):
        """Connect a websocket app to ``self.url`` with the message, error
        and close callbacks, and block in its event loop."""
        endpoint = self.url
        callbacks = {
            'on_message': self._on_message,
            'on_error': self._on_error,
            'on_close': self._on_close,
        }
        # FIXME: {'Authorization': os.environ.get('RISEML_APIKEY')}
        websocket.WebSocketApp(endpoint, **callbacks).run_forever()
client.py 文件源码 项目:mopidy-json-client 作者: ismailof 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _ws_connect(self):
        """Create the websocket app for ``self.ws_url`` and run it in a
        daemon thread named ``WSA-Thread``."""
        # Initialize websocket parameters
        self.wsa = websocket.WebSocketApp(
            url=self.ws_url,
            on_message=self._server_response,
            on_error=self._ws_error,
            on_open=self._ws_open,
            on_close=self._ws_close)

        # Run the websocket in parallel thread
        self.wsa_thread = threading.Thread(
            target=self.wsa.run_forever,
            name='WSA-Thread')
        # Set the ``daemon`` attribute directly: Thread.setDaemon() is
        # deprecated since Python 3.10.
        self.wsa_thread.daemon = True
        self.wsa_thread.start()
main.py 文件源码 项目:wpiao 作者: chuqingq 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_websocket():
    """Enable websocket tracing, connect to the runner endpoint and block in
    the app's event loop."""
    websocket.enableTrace(True)
    callbacks = dict(on_message=on_message,
                     on_error=on_error,
                     on_close=on_close)
    app = websocket.WebSocketApp("ws://192.168.31.71:8080/api/ws/runner",
                                 **callbacks)
    # The open handler is attached after construction.
    app.on_open = on_open
    app.run_forever()

# print('start_websocket...')
# t = threading.Thread(target=start_websocket, args=())
# t.start()
OctoprintStatus.py 文件源码 项目:OctoPrint-Dashboard 作者: meadowfrey 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run_with_id(self, printer_id):
        """
        Create the websocket app for ``self.url`` and start a daemon thread
        running ``self.run_forever``.

        Incoming frames are dispatched on their first character: 'm' carries
        one JSON message, 'a' carries a JSON list of messages. Other frames
        are ignored. On a socket error an "Offline/Unreachable" status is
        emitted to the room named after ``printer_id``.
        """

        def handle_frame(ws, data):
            if data.startswith('m'):
                # NOTE(review): printer_id is not passed here although the
                # 'a' branch below passes it - confirm this is intended.
                self.on_message(ws, json.loads(data[1:]))
                return
            if data.startswith('a'):
                for item in json.loads(data[1:]):
                    self.on_message(ws, item, printer_id)

        def handle_error(ws, exception):
            offline_status = {
                "id": printer_id,
                "state": {"text": "Offline/Unreachable"},
            }
            socketio.emit("status", offline_status, room=str(printer_id))

        endpoint = self.url
        callbacks = dict(on_open=self.on_open,
                         on_close=self.on_close,
                         on_message=handle_frame,
                         on_error=handle_error)
        self.socket = websocket.WebSocketApp(endpoint, **callbacks)

        self.thread = Thread(target=self.run_forever)
        self.thread.daemon = True
        self.thread.start()


问题


面经


文章

微信
公众号

扫码关注公众号