python类WebSocketApp()的实例源码

api.py 文件源码 项目:upstox-python 作者: upstox 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def start_websocket(self, run_in_background=False):
        socket_params = {}
        try:
            socket_params = self.get_socket_params()
        except requests.exceptions.HTTPError:
            print ("Can't Access Socket Params")
        ping_interval = 60
        ping_timeout = 10

        if 'pythonPingInterval' in socket_params.keys():
            ping_interval = socket_params['pythonPingInterval']

        if 'pythonPingTimeout' in socket_params.keys():
            ping_timeout = socket_params['pythonPingTimeout']

        url = self.config['socketEndpoint'].format(api_key=self.api_key, access_token=self.access_token)
        self.websocket = websocket.WebSocketApp(url,
                                                header={'Authorization: Bearer' + self.access_token},
                                                on_data=self._on_data,
                                                on_error=self._on_error,
                                                on_close=self._on_close)
        if run_in_background is True:
            self.ws_thread = threading.Thread(target=self.websocket.run_forever)
            self.ws_thread.daemon = True
            self.ws_thread.start()
        else:
            self.websocket.run_forever(ping_interval=ping_interval, ping_timeout=ping_timeout)
CliWebsocket.py 文件源码 项目:cli 作者: sparkl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, args, ws_url):
        """
        Opens the websocket url.
        """
        self.args = args
        self.ws_url = ws_url
        self.on_message = lambda args, data: None
        cookies = unpickle_cookies(args)
        session = cookies["ipaas_session"]

        self.ws_connection = websocket.WebSocketApp(
            ws_url,
            on_message=self.__ws_message,
            on_open=self.__ws_open,
            on_error=self.__ws_error,
            on_close=self.__ws_close,
            on_data=self.__ws_data,
            cookie="ipaas_session=" + session)
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def run(self):
        self.topic_type = wait_topic_ready(self.topic_name, self.url)
    #print str(self.topic_type)+"  self.topic_type"
        if not self.topic_type:
            rospy.logerr('Type of topic %s are not equal in the remote and local sides', self.topic_name)
            return

        topic_type_module, topic_type_name = tuple(self.topic_type.split('/'))
        try:       
            roslib.load_manifest(topic_type_module)
            msg_module = import_module(topic_type_module + '.msg')
            self.rostype = getattr(msg_module, topic_type_name)

            if self.test:
                self.publisher = rospy.Publisher(self.topic_name + '_rb', self.rostype, queue_size = self.queue_size)
            else: 
                self.publisher = rospy.Publisher(self.topic_name, self.rostype, queue_size = self.queue_size)

            self.ws = websocket.WebSocketApp(self.url, on_message = self.on_message, on_error = self.on_error, on_close = self.on_close, on_open = self.on_open)
            rospy.loginfo('Create connection to Rosbridge server %s for subscribed topic %s successfully', self.url, self.topic_name)
            self.ws.run_forever()
        except ResourceNotFound, e:
            rospy.logerr('Proxy for subscribed topic %s init falied. Reason: Could not find the required resource: %s', self.topic_name, str(e))
        except Exception, e:
            rospy.logerr('Proxy for subscribed topic %s init falied. Reason: %s', self.topic_name, str(e))
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        self.service_type, self.service_args = wait_service_ready(self.service_name, self.url)
        if not self.service_type:
            rospy.logerr('Type of service %s are not equal in the remote and local sides', self.service_type)
            return

        service_type_module, service_type_name = tuple(self.service_type.split('/'))
        try:       
            roslib.load_manifest(service_type_module)
            msg_module = import_module(service_type_module + '.srv')
            self.srvtype = getattr(msg_module, service_type_name)

            if self.test:
                self.caller = rospy.Service(self.service_name + '_rb', self.srvtype, self.callback)#, self.queue_size)
            else: 
                self.caller = rospy.Service(self.service_name, self.srvtype, self.callback)#, self.queue_size)

            self.ws = websocket.WebSocketApp(self.url, on_message = self.on_message, on_error = self.on_error, on_close = self.on_close, on_open = self.on_open)
            rospy.loginfo('Create connection to Rosbridge server %s for calling service %s successfully', self.url, self.service_name)
            self.ws.run_forever()
        except ResourceNotFound, e:
            rospy.logerr('Proxy for service %s init falied. Reason: Could not find the required resource: %s', self.service_name, str(e))
        except Exception, e:
            rospy.logerr('Proxy for service %s init falied. Reason: %s', self.service_name, str(e))
proxy.py 文件源码 项目:Cloudroid 作者: cyberdb 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        self.topic_type = wait_topic_ready(self.topic_name, self.url)
        if not self.topic_type:
            rospy.logerr('Type of topic %s are not equal in the remote and local sides', self.topic_name)
            return

        topic_type_module, topic_type_name = tuple(self.topic_type.split('/'))
        try:      
            roslib.load_manifest(topic_type_module)
            msg_module = import_module(topic_type_module + '.msg')
            self.rostype = getattr(msg_module, topic_type_name)

            self.subscriber = rospy.Subscriber(self.topic_name, self.rostype, self.callback)

            self.ws = websocket.WebSocketApp(self.url, on_message = self.on_message, on_error = self.on_error, on_close = self.on_close, on_open = self.on_open)
            rospy.loginfo('Create connection to Rosbridge server for published topic %s successfully', self.topic_name)
            self.ws.run_forever()
        except ResourceNotFound, e:
            rospy.logerr('Could not find the required resource %s', str(e))
        except Exception, e:
            rospy.logerr('Proxy for published topic %s init falied. Reason: %s', self.topic_name, str(e))
handler.py 文件源码 项目:rancher-gen 作者: pitrho 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def start(self):
        header = {
            'Authorization': 'Basic {0}'.format(self.api_token)
        }
        protocol = 'wss' if self.ssl else 'ws'
        url = '{0}://{1}:{2}/v1/projects/{3}/subscribe?eventNames='\
            'resource.change&include=services'\
            .format(protocol, self.rancher_host, self.rancher_port,
                    self.project_id)
        self.ws = websocket.WebSocketApp(url, header=header,
                                         on_message=self._on_message,
                                         on_open=self._on_open,
                                         on_error=self._on_error,
                                         on_close=self._on_close)

        logger.info('Watching for rancher events')
        self.ws.run_forever()
__init__.py 文件源码 项目:marconibot 作者: s4w3d0ff 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def startWebsocket(self):
        """ Run the websocket in a thread """
        self._tick = {}
        iniTick = self.returnTicker()
        self._ids = {market: iniTick[market]['id'] for market in iniTick}
        for market in iniTick:
            self._tick[self._ids[market]] = iniTick[market]

        self._ws = WebSocketApp("wss://api2.poloniex.com/",
                                on_open=self._on_open,
                                on_message=self._on_message,
                                on_error=self._on_error,
                                on_close=self._on_close)
        self._t = _Thread(target=self._ws.run_forever)
        self._t.daemon = True
        self._t._running = True
        self._t.start()
        logger.info('Websocket thread started')
        logger.debug(self._ws.url)
test_websocket.py 文件源码 项目:Treehacks 作者: andrewsy97 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
test_websocket.py 文件源码 项目:Treehacks 作者: andrewsy97 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
PushServer.py 文件源码 项目:gRPC-Makerboards 作者: PeridotYouClod 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def serve():
  protoConfig = ProtoConfig.getConfig()
  server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
  pushServer = Push(accessToken=protoConfig.wioLinks['havok'].accessToken)
  sensors_pb2.add_PushServicer_to_server(pushServer, server)
  port = protoConfig.ports.pushPort
  server.add_insecure_port('[::]:%s' % port)
  server.start()
  print('Started Push Server on Port %s ' % port)

  websocket.enableTrace(True)
  ws = websocket.WebSocketApp(
    "wss://us.wio.seeed.io/v1/node/event",
    on_message = pushServer.on_message,
    on_error = pushServer.on_error,
    on_close = pushServer.on_close)
  ws.on_open = pushServer.on_open
  ws.run_forever()

  try:
    while True:
      time.sleep(_ONE_DAY_IN_SECONDS)
  except KeyboardInterrupt:
    server.stop(0)
reactbot.py 文件源码 项目:PantherBot 作者: PantherHackers 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect_to_slack(self, token):
        # Initiates connection to the server based on the token, receives websocket URL "bot_conn"
        logger.info("Starting RTM connection")
        bot_conn = self.SLACK_CLIENT.api_call(
            "rtm.start",
            token = token
        )

        logger.info("Initializing info")
        self.initialize_info()

        # Creates WebSocketApp based on the URL returned by the RTM API
        # Assigns local methods to websocket methods
        logger.info("Initializing WebSocketApplication")
        self.WEBSOCKET = websocket.WebSocketApp(bot_conn["url"],
                                on_message=self.on_message,
                                on_error=self.on_error,
                                on_close=self.on_close,
                                on_open=self.on_open)
test_websocket.py 文件源码 项目:tk-photoshopcc 作者: shotgunsoftware 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
health-check-redeploy.py 文件源码 项目:dockercloud-missing-tools 作者: apollo-13 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def start(self):

        print ">> starting redeploy"

        self.completedHealthChecks = 0
        self.containers = []

        for container in dockercloud.Container.list(service=self.service.resource_uri):
            if container.state in ['Running', 'Starting']:
                self.containers.append(container)

        self.containers.sort(key=lambda x: x.name)

        print "found containers %s" % ', '.join([container.name for container in self.containers])

        self.websocket = websocket.WebSocketApp(
            'wss://ws.cloud.docker.com/api/audit/v1/events',
            header=['Authorization: ' + dockercloud.auth.get_auth_header()['Authorization']],
            on_message=self.onMessage,
            on_error=self.onError,
            on_open=self.onOpen
        )
        self.websocket.run_forever()
test_websocket.py 文件源码 项目:siphon-cli 作者: getsiphon 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
pushover_open_client.py 文件源码 项目:py-pushover-open-client 作者: aeirsoul 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect(self, callback, traceRoute):
        """Connects the client to the websocket"""
        if(not self.isConnected()):
            if(traceRoute):
                #Enables tracing of connection
                self.trace = True
                websocket.enableTrace(True)
            #Set callback for received messages to go to
            self.callback = callback
            #Have to put this in here, otherwise respawned dies for some reason
            self.ws = websocket.WebSocketApp(WEBSOCKET_URL,
                                on_message = self.onRecieve,
                                on_error = self.onError,
                                on_close = self.onClose)
            self.ws.on_open = self.onOpen
            #Start the actual connection
            self.mainThread = threading.Thread(target = self.ws.run_forever, args=())
            self.mainThread.start()
        else:
            print ("Attempting to connect but already connected.")
chat.py 文件源码 项目:peristop 作者: marin-m 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
        self.ended = True
        self.hadCtrl = False
        self.reconnectTime = time()

        if self.timeout:
            self.timeout.cancel()

        ws = WebSocketApp(self.endpoint.replace('https:','wss:') + '/chatapi/v1/chatnow',
                          on_open = self.authentify,
                          on_message = self.parse,
                          on_error = self.error,
                          on_close = self.close, header={'User-Agent': 'ChatMan/1 (Android) '})

        self.timeout = Timer(80, self.ratamioche)
        self.timeout.daemon = True
        self.timeout.start()

        self.ws = ws
        ws.run_forever(sslopt=sslopt_ca_certs, ping_timeout=90)
test_websocket.py 文件源码 项目:cmdchallenge-site 作者: jarv 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
vnokcoin.py 文件源码 项目:bigfishtrader 作者: xingetouzi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def connect(self, host, apiKey, secretKey, trace=False):
        """?????"""
        self.host = host
        self.apiKey = apiKey
        self.secretKey = secretKey

        if self.host == OKCOIN_CNY:
            self.currency = CURRENCY_CNY
        else:
            self.currency = CURRENCY_USD

        websocket.enableTrace(trace)

        self.ws = websocket.WebSocketApp(host, 
                                         on_message=self.onMessage,
                                         on_error=self.onError,
                                         on_close=self.onClose,
                                         on_open=self.onOpen)        

        self.thread = Thread(target=self.ws.run_forever)
        self.thread.start()

    #----------------------------------------------------------------------
vnokcoin.py 文件源码 项目:bigfishtrader 作者: xingetouzi 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def reconnect(self):
        """????"""
        # ?????????
        self.close()

        # ???????
        self.ws = websocket.WebSocketApp(self.host, 
                                         on_message=self.onMessage,
                                         on_error=self.onError,
                                         on_close=self.onClose,
                                         on_open=self.onOpen)        

        self.thread = Thread(target=self.ws.run_forever)
        self.thread.start()

    #----------------------------------------------------------------------
client.py 文件源码 项目:matriz 作者: stressfm 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect(self):
        time.sleep(random.randrange(0, 2**self.connection_attempts))
        self.connection_attempts += 1
        # websocket.enableTrace(True)
        ws = websocket.WebSocketApp(self.config_server_url,
                                    on_message=self.on_message,
                                    on_error=self.on_error,
                                    on_close=self.on_close)
        ws.on_open = self.on_open
        if self.config_server_url.startswith("wss://"):
            ws.run_forever(sslopt={"cert_reqs": ssl.CERT_REQUIRED,
                                   "ca_certs": ca_cert,
                                   "ssl_version": ssl.PROTOCOL_TLSv1_2,
                                   "keyfile": client_pem,
                                   "certfile": client_crt})
        else:
            ws.run_forever()
monitor.py 文件源码 项目:matriz 作者: stressfm 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main():
    # websocket.enableTrace(True)
    if len(sys.argv) < 2:
        host = ws_url
    else:
        host = sys.argv[1]
    ws = websocket.WebSocketApp(host,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    if host.startswith("wss://"):
        ws.run_forever(sslopt={"cert_reqs": ssl.CERT_REQUIRED,
                               "ca_certs": ca_cert,
                               "ssl_version": ssl.PROTOCOL_TLSv1_2,
                               "keyfile": client_pem,
                               "certfile": client_crt})
    else:
        ws.run_forever()
xc_session.py 文件源码 项目:ReactiveXComponent.py 作者: xcomponent 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def init(self, xc_api, server_url, callback):
        self.xc_api = xc_api
        self.websocket = WebSocket.WebSocketApp(server_url)

        # pylint: disable=unused-argument
        def on_message(websocket, message):
            print(message)

        def on_open(websocket):
            callback(SUCCESS, self)

        def on_error(websocket, error):
            callback(error, None)

        def on_close(websocket):
            print('### session %s closed ###' % server_url)
        # pylint: enable=unused-argument

        self.websocket.on_message = on_message
        self.websocket.on_open = on_open
        self.websocket.on_error = on_error
        self.websocket.on_close = on_close

        self.websocket.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
son_sp.py 文件源码 项目:son-cli 作者: sonata-nfv 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, url, vnf_name=None, metric=None, vm_id=None,
                 desc='exported metric from SP', print=True):

        self.vnf_name = vnf_name
        self.metric = metric
        self.vc_id = vm_id #the unique identifier of the vm, used by OpenStack
        self.desc = desc
        self.print = print

        self.metric_received = False
        self.prometheus_metric = None

        websocket.WebSocketApp.__init__(self, url,
                                        on_message=self._on_message,
                                        on_error=self._on_error,
                                        on_close=self._on_close,
                                        on_open=self._on_open
                                        )
test_websocket.py 文件源码 项目:Flask-SocketIO 作者: cutedogspark 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
shell.py 文件源码 项目:mist.api 作者: mistio 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def command(self, cmd):
        self.cmd = self._wrap_command(cmd)
        log.error(self.cmd)

        self.ws = websocket.WebSocketApp(self.uri,
                                         on_message=self._on_message,
                                         on_error=self._on_error,
                                         on_close=self._on_close)

        log.error(self.ws)
        self.ws.on_open = self._on_open
        self.ws.run_forever(ping_interval=3, ping_timeout=10)
        self.ws.close()
        retval = 0
        output = self.buffer.split("\n")[1:-1]
        return retval, "\n".join(output)
websocket_bot_sanple.py 文件源码 项目:reversi-ai-place 作者: naari3 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, board_id, access_token, user_id):
        self.board_id = board_id
        self.access_token = access_token
        self.user_id = user_id
        self.turn = None
        self.v_board = ReversiBoard()
        ws = websocket.WebSocketApp(
            f"ws://localhost:8000/v1/board/{self.board_id}/ws",
            header=[f"Authorization: Bearer {self.access_token}"],
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        try:
            ws.run_forever()
        except KeyboardInterrupt:
            ws.close()
test_websocket.py 文件源码 项目:smileybot 作者: sillylyn 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))
websocket.py 文件源码 项目:octoclient 作者: hroncok 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        """
        Runs thread, which listens on socket.
        Executes given callbacks on events
        """
        def on_message(ws, data):
            if data.startswith('m'):
                self.on_message(ws, json.loads(data[1:]))
            elif data.startswith('a'):
                for msg in json.loads(data[1:]):
                    self.on_message(ws, msg)

        self.socket = websocket.WebSocketApp(self.url,
                                             on_open=self.on_open,
                                             on_close=self.on_close,
                                             on_message=on_message)
        self.thread = Thread(target=self.socket.run_forever)
        self.thread.daemon = True
        self.thread.start()
api.py 文件源码 项目:bonsai-cli 作者: BonsaiAI 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(self):
        ws = websocket.WebSocketApp(
            self._ws_url,
            on_message=self._on_message,
            on_error=self._on_error,
            on_close=LogStreamHandler.on_close,
            header=['Authorization: {}'.format(self._access_key)]
        )
        ws.on_open = self._on_open
        log.debug("Starting websocket connection for bonsai log --follow...")

        proxy = self._get_proxy()
        log.debug('proxy: %s', proxy)

        try:
            ws.run_forever(**proxy)
        except KeyboardInterrupt as e:
            log.debug("Handling user Ctrl+C")
test_websocket.py 文件源码 项目:bitcoinelasticsearch 作者: currentsea 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testSockMaskKey(self):
        """ A WebSocketApp should forward the received mask_key function down
        to the actual socket.
        """

        def my_mask_key_func():
            pass

        def on_open(self, *args, **kwargs):
            """ Set the value so the test can use it later on and immediately
            close the connection.
            """
            WebSocketAppTest.get_mask_key_id = id(self.get_mask_key)
            self.close()

        app = ws.WebSocketApp('ws://echo.websocket.org/', on_open=on_open, get_mask_key=my_mask_key_func)
        app.run_forever()

        # Note: We can't use 'is' for comparing the functions directly, need to use 'id'.
        self.assertEqual(WebSocketAppTest.get_mask_key_id, id(my_mask_key_func))


问题


面经


文章

微信
公众号

扫码关注公众号