python类error_string()的实例源码

mqtt_client_shell.py 文件源码 项目:python-mqtt-client-shell 作者: bapowell 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def publish(self, topic, payload, qos=0, retain=False):
        """Publish a message, with the given parameters.
        Substitute in the _msg_seq if the payload contains {seq}."""
        if not topic:
            print("Topic must be specified")
        else:
            if not payload: 
                print("Payload not specified, so a zero-length message will be published.")
                payload = None
            elif (not isinstance(payload, bytearray)) and ("{seq}" in payload):
                payload = payload.format(seq=self._msg_seq)

            (result, msg_id) = self._mqttclient.publish(topic=topic, payload=payload, qos=qos, retain=retain)
            print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))
            if result == mqtt.MQTT_ERR_SUCCESS:
                self._msg_seq += 1
mqtt_app.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def publish_file(file_name, file_path):
    global mqtt_client
    log.debug('publish_file({})'.format(file_path))
    try:
        with open(file_path) as fh:
            file_contents = fh.read()
        ret_obj = mqtt_client.publish(topic=file_name, payload=file_contents, qos=0)

        if ret_obj.rc == mqtt.MQTT_ERR_SUCCESS:
            log.debug('MQTT published file: {}'.format(file_path))
        else:
            log.warning('MQTT failed to publish file: {}'.format(file_path))
            log.warning('MQTT failed to publish file. error: {}'.format(mqtt.error_string(ret_obj.rc)))

    except Exception as ex:
        log.error('Exception in publish_file(). ex: {}'.format(ex))


# This function will periodically publish device data to the MQTT Broker
WolkMQTT.py 文件源码 项目:WolkConnect-Python- 作者: Wolkabout 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _publish(self, message):
        """ publish WolkMQTTPublishMessage
        """
        logger.info("Publish %s", message)

        if not self.client:
            raise WolkMQTTClientException("No mqtt client")

        if not message:
            logger.warning("No message to publish")
            return(False, "No message to publish")

        info = self.client.publish(message.topic, message.payload, self.clientConfig.qos)
        if info.rc == mqtt.MQTT_ERR_SUCCESS:
            return(True, "")
        elif info.is_published:
            return(True, "")
        else:
            return(False, mqtt.error_string(info.rc))
mqtt_client_shell.py 文件源码 项目:python-mqtt-client-shell 作者: bapowell 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def subscribe(self, sub):
        """Subscribe to a topic, using the Subscription (namedtuple)."""
        if not sub.topic:
            print("Topic must be specified")
        else:
            (result, msg_id) = self._mqttclient.subscribe(topic=sub.topic, qos=(sub.qos or 0))
            print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))
            if result == mqtt.MQTT_ERR_SUCCESS:
                self._discard_sub(sub.topic)   # do not want two Subscriptions with same topic, but different qos
                self._subscriptions.add(sub)
mqtt_client_shell.py 文件源码 项目:python-mqtt-client-shell 作者: bapowell 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def unsubscribe(self, topic):
        """Unsubscribe from a topic."""
        if not topic:
            print("Topic must be specified")
        else:
            (result, msg_id) = self._mqttclient.unsubscribe(topic)
            print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))
            if result == mqtt.MQTT_ERR_SUCCESS:
                self._discard_sub(topic)
communicator.py 文件源码 项目:hapi 作者: mayaculpa 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def on_disconnect(self, client, userdata, rc):
        self.is_connected = False
        Log.info("[Exiting] Disconnected: %s", mqtt.error_string(rc))
        self.client.loop_stop()
        sys.exit(-1)

    # The callback for when the client receives a CONNACK response from the server.
    #@staticmethod
WolkMQTT.py 文件源码 项目:WolkConnect-Python- 作者: Wolkabout 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _on_mqtt_connect(self, _, __, ___, result):
        if result:
            errorMessage = "Error connecting to mqtt broker: " + mqtt.connack_string(result)
            logger.error(errorMessage)
            raise WolkMQTTClientException(errorMessage)
        else:
            logger.info("Connected %s to mqtt broker", self.clientConfig.username)

        for topic in self.clientConfig.topics:
            (res, ____) = self.client.subscribe(topic, self.clientConfig.qos)
            if res == 0:
                logger.info("Subscribed to topic: %s", topic)
            else:
                logger.error("Failed subscribing to topic: %s reason: %s", topic, mqtt.error_string(res))


问题


面经


文章

微信
公众号

扫码关注公众号