test.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:cloudcam 作者: revmischa 项目源码 文件源码
def init_mqtt_client(self):
        endpoint = self.iot_client.describe_endpoint()
        use_websocket = True if self.credentials else False
        endpoint_port = 443 if use_websocket else 8883
        self.mqtt_client = AWSIoTMQTTClient("testo", useWebsocket=use_websocket)
        self.mqtt_client.configureEndpoint(endpoint['endpointAddress'], endpoint_port)
        self.mqtt_client.configureOfflinePublishQueueing(-1)
        self.mqtt_client.configureConnectDisconnectTimeout(10)
        self.mqtt_client.configureMQTTOperationTimeout(10)
        self.configure_credentials()
        log.debug("OpenSSL version {}".format(ssl.OPENSSL_VERSION))
        log.debug("Connecting MQTT client to {} on port {}...".format(endpoint['endpointAddress'], endpoint_port))
        try:
            self.mqtt_client.connect()
            log.debug("MQTT client connected")
        except connectTimeoutException:
            log.error("Failed to connect MQTT client - timeout (check policy)")
            self.mqtt_client = None
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号