def publish(self, topic, payload, qos=0, retain=False):
    """Publish a message and print the outcome reported by the client.

    When ``payload`` is a text string containing the literal ``{seq}``, it
    is passed through ``str.format`` with ``seq`` bound to the current
    ``self._msg_seq``. The counter is incremented only when the publish
    call reports ``mqtt.MQTT_ERR_SUCCESS``.

    Args:
        topic: Topic to publish to. If falsy, a notice is printed and
            nothing is published.
        payload: Message body. Any falsy value is replaced by ``None`` so
            that a zero-length message is sent. Non-text payloads are
            forwarded untouched.
        qos: Forwarded unchanged to the client's ``publish``.
        retain: Forwarded unchanged to the client's ``publish``.

    Returns:
        None. Results are reported on stdout.
    """
    if not topic:
        print("Topic must be specified")
        return

    if not payload:
        print("Payload not specified, so a zero-length message will be published.")
        payload = None
    elif isinstance(payload, str) and "{seq}" in payload:
        # Only text can carry the placeholder. Probing bytes or numeric
        # payloads with `"{seq}" in payload` raises TypeError, so the check
        # is restricted to str instead of "anything but bytearray".
        payload = payload.format(seq=self._msg_seq)

    (result, msg_id) = self._mqttclient.publish(topic=topic, payload=payload, qos=qos, retain=retain)
    print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))
    if result == mqtt.MQTT_ERR_SUCCESS:
        self._msg_seq += 1
# Python example source code for MQTT_ERR_SUCCESS
def publish_file(file_name, file_path):
    """Publish the text content of a file, using ``file_name`` as the topic.

    The file at ``file_path`` is read in text mode and handed to the
    module-level ``mqtt_client`` with QoS 0. The outcome is logged: a debug
    line on success, two warning lines when the returned ``rc`` is not
    ``mqtt.MQTT_ERR_SUCCESS``. Any exception raised while reading or
    publishing is logged as an error and not propagated.

    Args:
        file_name: Topic the content is published under.
        file_path: Path of the file to read.
    """
    log.debug('publish_file({})'.format(file_path))
    try:
        with open(file_path) as source:
            contents = source.read()
        outcome = mqtt_client.publish(topic=file_name, payload=contents, qos=0)
        if outcome.rc != mqtt.MQTT_ERR_SUCCESS:
            log.warning('MQTT failed to publish file: {}'.format(file_path))
            log.warning('MQTT failed to publish file. error: {}'.format(mqtt.error_string(outcome.rc)))
        else:
            log.debug('MQTT published file: {}'.format(file_path))
    except Exception as ex:
        # Best-effort publish: report the failure and carry on.
        log.error('Exception in publish_file(). ex: {}'.format(ex))
# This function will periodically publish device data to the MQTT Broker
def _publish(self, message):
    """Publish a WolkMQTTPublishMessage through ``self.client``.

    Args:
        message: Object exposing ``topic`` and ``payload``. It is published
            with the QoS taken from ``self.clientConfig.qos``.

    Returns:
        tuple: ``(True, "")`` when the publish call reports
        ``mqtt.MQTT_ERR_SUCCESS``; otherwise ``(False, reason)`` where
        ``reason`` is either a fixed text (no message given) or
        ``mqtt.error_string`` of the returned code.

    Raises:
        WolkMQTTClientException: If ``self.client`` is falsy.
    """
    logger.info("Publish %s", message)
    if not self.client:
        raise WolkMQTTClientException("No mqtt client")
    if not message:
        logger.warning("No message to publish")
        return (False, "No message to publish")

    info = self.client.publish(message.topic, message.payload, self.clientConfig.qos)
    # The previous fallback tested `info.is_published` without calling it.
    # On paho-mqtt's MQTTMessageInfo that attribute is a method, so the test
    # was always truthy and every failure was reported as success. The
    # return code alone decides the outcome now.
    if info.rc == mqtt.MQTT_ERR_SUCCESS:
        return (True, "")
    return (False, mqtt.error_string(info.rc))
def loop(self, timeout=1):
    """Run one round of network activity when used in non-blocking mode.

    Args:
        timeout: Value forwarded to the underlying client's ``loop``.

    Raises:
        Exception: If ``self._looping`` is set, i.e. the connection is
            being driven in blocking mode.
    """
    if self._looping:
        raise Exception("Connection in blocking mode, don't call loop")

    if not self._mqtt_client:
        return

    status = self._mqtt_client.loop(timeout)
    if status != mqtt.MQTT_ERR_SUCCESS:
        # Anything other than success triggers a reconnect attempt.
        LOGGER.debug("Attempting another reconnect for %s...", self._device_id)
        self._wrapped_reconnect()
def send_state(self, state, time_like=None):
    """Report ``state`` for this device, stamped in epoch milliseconds.

    Args:
        state: JSON-serialisable value sent under the ``"data"`` key.
        time_like: Timestamp for the report. A ``datetime.datetime`` is
            converted through its UTC time tuple, a ``time.struct_time`` is
            treated as local time, and any other truthy value is used as
            given. A falsy value means "now".

    Returns:
        bool: ``True`` only when a client exists and the publish call
        reports ``mqtt.MQTT_ERR_SUCCESS``.
    """
    LOGGER.debug("Sending state for %s", self._device_id)
    if not self._mqtt_client:
        return False

    if isinstance(time_like, datetime.datetime):
        # getting utc tuple, and so use timegm
        seconds = calendar.timegm(time_like.utctimetuple())
        millis = time_like.microsecond / 1000
        time_like = int(seconds * 1000 + millis)
    elif isinstance(time_like, time.struct_time):
        # don't know the timezone, assume it is local and use mktime
        time_like = int(time.mktime(time_like) * 1000)
    if not time_like:
        time_like = int(time.time() * 1000)

    payload = json.dumps({"time": time_like, "data": state}, sort_keys=True)
    result = self._mqtt_client.publish(self._state_topic(), payload)
    # paho's publish() returns an (rc, mid)-style result rather than a bare
    # code, so comparing the whole result to MQTT_ERR_SUCCESS was always
    # False. A plain int is still accepted for clients that return one.
    rc = result if isinstance(result, int) else result[0]
    return rc == mqtt.MQTT_ERR_SUCCESS
# ============================================================
# Private functions
# ============================================================
def _cb_client_disconnect(self, client, userdata, response_code):
    """Handle a disconnect notification from the MQTT client.

    Does nothing when no client is held. A ``response_code`` equal to
    ``mqtt.MQTT_ERR_SUCCESS`` is treated as an intentional disconnect: the
    client reference is dropped and a ``"close"`` event is fired. Any other
    code triggers a reconnect.

    Args:
        client: Unused here.
        userdata: Unused here.
        response_code: Disconnect result code.
    """
    if not self._mqtt_client:
        return

    intentional = response_code == mqtt.MQTT_ERR_SUCCESS
    if not intentional:
        LOGGER.debug("Connection abnormally ended for %s, reconnecting...", self._device_id)
        self._wrapped_reconnect()
        return

    # Intentional disconnect: forget the client before notifying listeners.
    self._mqtt_client = None
    LOGGER.debug("Connection closed for %s", self._device_id)
    self._fire_event("close")
def subscribe(self, sub):
    """Subscribe to the topic described by a Subscription (namedtuple).

    The request uses ``sub.topic`` and ``sub.qos`` (0 when falsy). The
    outcome is printed; on success any existing entry for the same topic is
    discarded before ``sub`` is recorded in ``self._subscriptions``.

    Args:
        sub: Object exposing ``topic`` and ``qos``.
    """
    if not sub.topic:
        print("Topic must be specified")
        return

    requested_qos = sub.qos or 0
    (result, msg_id) = self._mqttclient.subscribe(topic=sub.topic, qos=requested_qos)
    print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))
    if result != mqtt.MQTT_ERR_SUCCESS:
        return

    # do not want two Subscriptions with same topic, but different qos
    self._discard_sub(sub.topic)
    self._subscriptions.add(sub)
def unsubscribe(self, topic):
    """Unsubscribe from ``topic`` and print the outcome.

    On success the matching entry is removed through ``self._discard_sub``.

    Args:
        topic: Topic to unsubscribe from. If falsy, a notice is printed
            and no request is made.
    """
    if not topic:
        print("Topic must be specified")
        return

    (result, msg_id) = self._mqttclient.unsubscribe(topic)
    print("...msg_id={!r}, result={} ({})".format(msg_id, result, mqtt.error_string(result)))
    succeeded = result == mqtt.MQTT_ERR_SUCCESS
    if succeeded:
        self._discard_sub(topic)
def do_connect(self, arg):
    """Connect to the MQTT server, using the current connection parameters.

    The will, credentials and TLS settings from
    ``self.context.connection_args`` are applied to the client before
    connecting. If the connection succeeds, the network loop is started and
    the Messaging console runs until it returns; the client is then
    disconnected and the loop stopped.

    Args:
        arg: Unused here.
    """
    connected = None
    ca = self.context.connection_args
    try:
        if ca.will:
            self.context.mqttclient.will_set(topic=ca.will.topic, payload=ca.will.payload, qos=ca.will.qos, retain=ca.will.retain)
        if ca.username:
            self.context.mqttclient.username_pw_set(ca.username, ca.password)
        else:
            # Overwrite whatever credentials an earlier attempt configured.
            self.context.mqttclient.username_pw_set("", None)
        if ca.tls_args.ca_certs_filepath:
            ta = ca.tls_args
            self.context.mqttclient.tls_set(ca_certs=ta.ca_certs_filepath, certfile=ta.cert_filepath, keyfile=ta.key_filepath,
                                            cert_reqs=ta.cert_reqs, tls_version=ta.tls_version, ciphers=ta.ciphers)
            self.context.mqttclient.tls_insecure_set(ta.tls_insecure)
        rc = self.context.mqttclient.connect(host=ca.host, port=ca.port, keepalive=ca.keepalive, bind_address=ca.bind_address)
        connected = (rc == mqtt.MQTT_ERR_SUCCESS)
    except Exception as e:
        print(e)
        return

    if not connected:
        print("Unable to connect")
        return

    self.context.mqttclient.loop_start()
    try:
        # Initiate the "Messaging" console.
        MessagingConsole(self.context).cmdloop()
    finally:
        # Clean up even when the console exits through an exception, so the
        # connection and the loop started above are not left running.
        self.context.mqttclient.disconnect()
        self.context.mqttclient.loop_stop()
def _internal_send_message(self, topic, payload, queue):
    """Publish a retained message, optionally queueing it when offline.

    Args:
        topic: Topic to publish to.
        payload: Message body.
        queue: When truthy, a message rejected with ``MQTT_ERR_NO_CONN`` is
            appended to ``self.queue`` as ``[topic, payload]`` instead of
            being reported as a failure.

    Returns:
        bool: ``True`` when the message was published or queued, ``False``
        when the publish call reported any other error.
    """
    self.logger.debug("sending topic %s with value \"%s\"" % (topic, payload))
    result = self.mqtt.publish(topic, payload, retain=True)
    # The return code is the first element of the publish result. Comparing
    # the whole result object to MQTT_ERR_NO_CONN never matched, so the
    # queueing branch below could not be reached.
    rc = result[0]
    if rc == MQTT_ERR_NO_CONN and queue:
        self.logger.debug("no connection, saving message with topic %s to queue" % topic)
        self.queue.append([topic, payload])
    elif rc != MQTT_ERR_SUCCESS:
        self.logger.warning("failed sending message %s, mqtt error %s" % (topic, result))
        return False
    return True