def publish(self, topic, payload, qos=0, retain=False):
    """Publish a message through the wrapped MQTT client and print the outcome.

    If ``payload`` is a text string containing the ``{seq}`` placeholder, the
    placeholder is replaced with the current value of ``self._msg_seq`` before
    the message is sent. The counter is incremented only when the client
    reports ``mqtt.MQTT_ERR_SUCCESS``.

    Args:
        topic: Topic to publish to. When falsy, a message is printed and
            nothing is published.
        payload: Message body. A falsy payload is replaced by ``None`` so a
            zero-length message is published. Non-text payloads are handed
            to the client unchanged.
        qos: Passed through to the client's ``publish()``.
        retain: Passed through to the client's ``publish()``.

    Returns:
        None. The result code and message id are printed, not returned.
    """
    if not topic:
        print("Topic must be specified")
        return

    if not payload:
        print("Payload not specified, so a zero-length message will be published.")
        payload = None
    elif isinstance(payload, str) and "{seq}" in payload:
        # Only text can carry the placeholder. Excluding just bytearray (as
        # before) let bytes/int/float payloads reach the `in` test, which
        # raises TypeError for them.
        try:
            payload = payload.format(seq=self._msg_seq)
        except (KeyError, IndexError, ValueError):
            # The payload contains other literal braces (e.g. JSON) that
            # str.format() cannot parse; substitute only the placeholder.
            payload = payload.replace("{seq}", str(self._msg_seq))

    (result, msg_id) = self._mqttclient.publish(topic=topic, payload=payload, qos=qos, retain=retain)
    print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))
    if result == mqtt.MQTT_ERR_SUCCESS:
        # Advance the sequence only for messages the client accepted.
        self._msg_seq += 1
# Example source code using Python's error_string()
def publish_file(file_name, file_path):
    """Read a file and publish its contents via the module-level MQTT client.

    The file at ``file_path`` is opened in text mode and its full contents
    are published with QoS 0 on the topic ``file_name``. The outcome is
    logged; any exception raised along the way is caught and logged as an
    error instead of propagating.

    Args:
        file_name: Used as the MQTT topic.
        file_path: Path of the file whose contents form the payload.
    """
    log.debug('publish_file({})'.format(file_path))

    try:
        with open(file_path) as source:
            body = source.read()
            outcome = mqtt_client.publish(topic=file_name, payload=body, qos=0)

            if outcome.rc != mqtt.MQTT_ERR_SUCCESS:
                log.warning('MQTT failed to publish file: {}'.format(file_path))
                log.warning('MQTT failed to publish file. error: {}'.format(mqtt.error_string(outcome.rc)))
            else:
                log.debug('MQTT published file: {}'.format(file_path))
    except Exception as ex:
        # Best effort: report the failure and carry on.
        log.error('Exception in publish_file(). ex: {}'.format(ex))
# This function will periodically publish device data to the MQTT Broker
def _publish(self, message):
    """Publish a WolkMQTTPublishMessage through the MQTT client.

    Args:
        message: Object providing ``topic`` and ``payload`` attributes. A
            falsy message is rejected without contacting the broker.

    Returns:
        A ``(success, reason)`` tuple: ``(True, "")`` when the client
        accepted or already published the message, otherwise ``False`` with
        a description of the failure.

    Raises:
        WolkMQTTClientException: If ``self.client`` is not set.
    """
    logger.info("Publish %s", message)

    if not self.client:
        raise WolkMQTTClientException("No mqtt client")

    if not message:
        logger.warning("No message to publish")
        return (False, "No message to publish")

    info = self.client.publish(message.topic, message.payload, self.clientConfig.qos)
    if info.rc == mqtt.MQTT_ERR_SUCCESS:
        return (True, "")

    # is_published is a method on paho's MQTTMessageInfo. Referencing it
    # without calling it is always truthy, which made the failure branch
    # below unreachable. It may raise for messages that were never queued,
    # so such errors are treated as "not published".
    try:
        published = info.is_published()
    except (ValueError, RuntimeError):
        published = False

    if published:
        return (True, "")
    return (False, mqtt.error_string(info.rc))
def subscribe(self, sub):
    """Subscribe to the topic described by ``sub`` and record it on success.

    Args:
        sub: Subscription (namedtuple) with ``topic`` and ``qos`` fields. A
            falsy ``qos`` is sent as 0. When ``topic`` is falsy, a message
            is printed and nothing is subscribed.

    Returns:
        None. The result code and message id are printed.
    """
    if not sub.topic:
        print("Topic must be specified")
        return

    requested_qos = sub.qos or 0
    result, msg_id = self._mqttclient.subscribe(topic=sub.topic, qos=requested_qos)
    print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))

    if result != mqtt.MQTT_ERR_SUCCESS:
        return

    # Avoid holding two Subscriptions for one topic with different qos values.
    self._discard_sub(sub.topic)
    self._subscriptions.add(sub)
def unsubscribe(self, topic):
    """Unsubscribe from ``topic`` and drop its recorded subscription on success.

    Args:
        topic: Topic to unsubscribe from. When falsy, a message is printed
            and nothing is sent to the client.

    Returns:
        None. The result code and message id are printed.
    """
    if not topic:
        print("Topic must be specified")
        return

    result, msg_id = self._mqttclient.unsubscribe(topic)
    print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))

    if result != mqtt.MQTT_ERR_SUCCESS:
        return
    self._discard_sub(topic)
def on_disconnect(self, client, userdata, rc):
    """Handle a disconnect: log the reason, stop the loop and exit.

    Args:
        client: Unused here; ``self.client`` is the one that gets stopped.
        userdata: Unused.
        rc: Disconnect result code, rendered with ``mqtt.error_string``.
    """
    self.is_connected = False

    reason = mqtt.error_string(rc)
    Log.info("[Exiting] Disconnected: %s", reason)

    self.client.loop_stop()
    # Always exits with status -1, whatever the value of rc.
    sys.exit(-1)
# The callback for when the client receives a CONNACK response from the server.
#@staticmethod
def _on_mqtt_connect(self, _, __, ___, result):
    """Handle the connect result and subscribe to the configured topics.

    Args:
        _: Unused.
        __: Unused.
        ___: Unused.
        result: Connection result code; any truthy value is treated as a
            failure and described with ``mqtt.connack_string``.

    Raises:
        WolkMQTTClientException: If ``result`` indicates a failed connection.
    """
    if result:
        errorMessage = "Error connecting to mqtt broker: " + mqtt.connack_string(result)
        logger.error(errorMessage)
        raise WolkMQTTClientException(errorMessage)

    logger.info("Connected %s to mqtt broker", self.clientConfig.username)

    qos = self.clientConfig.qos
    for topic in self.clientConfig.topics:
        status, _mid = self.client.subscribe(topic, qos)
        if status != 0:
            logger.error("Failed subscribing to topic: %s reason: %s", topic, mqtt.error_string(status))
        else:
            logger.info("Subscribed to topic: %s", topic)