def on_connect(client, userdata, flags, rc):
    """Announce the connection, subscribe, and send the start-up messages.

    Reads the module-level ``topic`` and calls the module-level
    ``publish_command`` helper. ``userdata``, ``flags`` and ``rc`` are
    accepted but not inspected here.
    """
    print("Connected to the {0} topic".format(topic))
    client.subscribe(topic)
    client.publish(
        topic=topic,
        payload="Listening to messages in the Paho Python Client")
    # Trailing positional arguments of each publish_command call, in order.
    startup_commands = (
        ("print_temperature_fahrenheit", "temperature_fahrenheit", 45),
        ("print_information_message", "text", "Python IoT"),
    )
    for command_args in startup_commands:
        publish_command(client, topic, *command_args)
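# Example source code for the Python class Client()
# File source: iot_python_chapter_09_06.py
# Project: Internet-of-Things-with-Python
# Author: PacktPublishing
# Project source
# File source
# Reads: 19
# Favorites: 0
# Likes: 0
# Comments: 0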
def initialize(self):
    """Create a new paho client on ``self._mqttc`` and attach the callbacks.

    No ``connect`` or ``loop_*`` call is made in this method. A
    ``TypeError`` raised while building or wiring the client is reported
    on stdout and swallowed.
    """
    try:
        if DEBUG_ON:
            print('connect again')
        client = mqtt.Client(None)
        self._mqttc = client
        client.on_message = self.mqtt_on_message
        client.on_connect = self.mqtt_on_connect
        client.on_subscribe = self.mqtt_on_subscribe
        client.on_publish = self.mqtt_on_publish
        client.on_disconnect = self.on_disconnect
        # loop_start() is deliberately left disabled here; the original
        # note says to enable it only for the snippet code, not the
        # stress run.
    except TypeError:
        print('Connect to mqtter error')
def setup_client(self):
    """Create and configure ``self.client`` if it does not exist yet.

    Returns:
        True when a new client was created and configured. False when a
        client already exists, or when ``HAVE_MQTT`` is falsy (paho-mqtt
        not available).
    """
    if self.client is not None:
        return False
    if not HAVE_MQTT:
        print("Please install paho-mqtt 'pip install paho-mqtt' "
              "to use this library")
        return False

    self.client = mqtt.Client(
        client_id=self.blid, clean_session=self.clean,
        protocol=mqtt.MQTTv311)
    # Assign event callbacks
    self.client.on_message = self.on_message
    self.client.on_connect = self.on_connect
    self.client.on_publish = self.on_publish
    self.client.on_subscribe = self.on_subscribe
    self.client.on_disconnect = self.on_disconnect
    # To enable debug messages, also assign self.on_log to
    # self.client.on_log.

    # paho-mqtt requires a certificate name even though the certificate is
    # not verified. Per the original note, v1.3 changed this API, so a
    # ValueError from tls_set() falls back to an explicit SSLContext.
    self.log.info("Seting TLS")
    try:
        self.client.tls_set(
            self.cert_name, cert_reqs=ssl.CERT_NONE,
            tls_version=ssl.PROTOCOL_TLSv1)
    except ValueError:  # try V1.3 version
        # Logger.warn is a deprecated alias of Logger.warning.
        # NOTE(review): assumes self.log is a logging.Logger - confirm.
        self.log.warning("TLS Setting failed - trying 1.3 version")
        self.client._ssl_context = None
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = ssl.CERT_NONE
        context.load_default_certs()
        self.client.tls_set_context(context)

    # disables peer verification
    self.client.tls_insecure_set(True)
    self.client.username_pw_set(self.blid, self.password)
    return True
def mqtt_passthrough_sub():
    """Run the pass-through subscriber until its network loop exits.

    Connects with fixed credentials to a fixed broker address, subscribes
    to ``001.passthrough_upstream`` at QoS 0 and then blocks in
    ``loop_forever()``. The callbacks are module-level functions.
    """
    # NOTE(review): credentials and broker address are hard-coded here.
    # "localhost" was the commented-out alternative broker.
    broker_host = "101.200.158.2"

    subscriber = mqtt.Client("001.passthrough_subscriber")
    subscriber.username_pw_set("iiot", "smartlinkcloud")
    subscriber.on_message = on_message
    subscriber.on_connect = on_connect
    subscriber.on_publish = on_publish
    subscriber.on_subscribe = on_subscribe
    subscriber.on_log = on_log

    subscriber.connect(broker_host, 1883, 60)
    subscriber.subscribe("001.passthrough_upstream", 0)
    subscriber.loop_forever()
def _mq_reconnect(self, force=False):
if force:
self.mq_connected = False
while not self.mq_connected:
try:
self.mq_client = mqtt.Client()
self.mq_client.connect(host=self.broker, keepalive=10)
self.mq_client.subscribe(MQ_COMMAND_TOPIC)
self.mq_client.subscribe(MQ_HA_NOTIFY_TOPIC)
self.mq_client.on_message = self.on_mq_message
self.on_mq_disconnect = self.on_mq_disconnect
self.mq_client.loop_start()
self.mq_connected = True
_LOG.info("Connected to MQ!")
except Exception as ex:
_LOG.error("Could not connect to MQ: {0}".format(ex))
_LOG.warning("Trying again in 5 seconds...")
time.sleep(5)
def pass_message(meg):
    """Publish ``meg`` on the ``movement`` topic through a new client.

    A new paho client is created on every call, connected to a fixed
    broker address, and its background loop is started before publishing.
    The client also subscribes to ``topic/sub`` on connect and prints any
    message it receives.
    """
    import paho.mqtt.client as mqtt

    def _subscribe_on_connect(client, userdata, flags, rc):
        # Subscribing from the connect callback; rc is not inspected.
        client.subscribe("topic/sub")

    def _print_incoming(client, userdata, msg):
        # NOTE(review): the "??" labels look like mis-encoded text in the
        # original; kept unchanged because they are runtime output.
        print("??:"+msg.topic+" ??:"+str(msg.payload))

    publisher = mqtt.Client()
    publisher.on_connect = _subscribe_on_connect
    publisher.on_message = _print_incoming
    # Broker address, port 1883, keepalive 60.
    publisher.connect("10.66.15.222", 1883, 60)
    publisher.loop_start()
    publisher.publish("movement", meg)
def connect_to_mqtt(self):
    """Create a new client on ``self._mqttc``, attach callbacks and connect.

    Connects to ``broker.pikabot.org`` on port 1883 with a keepalive of
    60. No network loop is started here. A ``TypeError`` raised along the
    way is reported on stdout and swallowed; other exceptions propagate.
    """
    try:
        if DEBUG_ON:
            # print() with one argument behaves the same on Python 2 and 3.
            print('connect again')
        self._mqttc = mqtt.Client(None)
        self._mqttc.on_message = self.mqtt_on_message
        self._mqttc.on_connect = self.mqtt_on_connect
        self._mqttc.on_subscribe = self.mqtt_on_subscribe
        self._mqttc.on_publish = self.mqtt_on_publish
        self._mqttc.on_disconnect = self.on_disconnect
        self._mqttc.connect("broker.pikabot.org", 1883, 60)
        # loop_start() is deliberately left disabled here; the original
        # note says to enable it only for the snippet code, not the
        # stress run.
    except TypeError:
        print('Connect to mqtter error')
    return
def __init__(self, host, port=1883, client_id="", client_username="", client_password=None, server_tls=False, server_cert=None, topics=None, mock_class=None):
    """Store the connection settings, build the client and connect.

    Args:
        host: broker host, stored as ``self.host``.
        port: broker port, stored as ``self.port``.
        client_id: id passed to the client constructor.
        client_username: user name; credentials are set on the client
            only when this is truthy.
        client_password: password passed with the user name.
        server_tls: stored as ``self.server_tls``; not used in this method.
        server_cert: stored as ``self.server_cert``; not used in this method.
        topics: list stored as ``self.topics``; a new empty list is used
            when omitted.
        mock_class: when truthy, ``MockMQTTClient`` is used in place of
            the paho client.
    """
    self.host = host
    self.port = port
    self.client_id = client_id
    # Previously this stored client_id, silently discarding the user name.
    self.client_username = client_username
    self.client_password = client_password
    # A fresh list per instance; a literal [] default would be shared.
    self.topics = [] if topics is None else topics
    self.server_tls = server_tls
    self.server_cert = server_cert
    # NOTE(review): mock_class is only used as a flag; the class that is
    # instantiated is always MockMQTTClient - confirm this is intended.
    if mock_class:
        self.client = MockMQTTClient(self.client_id)
    else:
        self.client = paho.Client(self.client_id)
    if self.client_username:
        self.client.username_pw_set(self.client_username, password=self.client_password)
    self._connect()
def __init__(self, host, port=1883, client_id="", client_username="", client_password=None, server_tls=False, server_cert=None, topics=None, mock_class=None):
    """Store the connection settings, build the client and connect.

    Args:
        host: broker host, stored as ``self.host``.
        port: broker port, stored as ``self.port``.
        client_id: id passed to the client constructor.
        client_username: user name; credentials are set on the client
            only when this is truthy.
        client_password: password passed with the user name.
        server_tls: stored as ``self.server_tls``; not used in this method.
        server_cert: stored as ``self.server_cert``; not used in this method.
        topics: list stored as ``self.topics``; a new empty list is used
            when omitted.
        mock_class: when truthy, ``MockMQTTClient`` is used in place of
            the paho client.
    """
    self.host = host
    self.port = port
    self.client_id = client_id
    # Previously this stored client_id, silently discarding the user name.
    self.client_username = client_username
    self.client_password = client_password
    # A fresh list per instance; a literal [] default would be shared.
    self.topics = [] if topics is None else topics
    self.server_tls = server_tls
    self.server_cert = server_cert
    # NOTE(review): mock_class is only used as a flag; the class that is
    # instantiated is always MockMQTTClient - confirm this is intended.
    if mock_class:
        self.client = MockMQTTClient(self.client_id)
    else:
        self.client = paho.Client(self.client_id)
    if self.client_username:
        self.client.username_pw_set(self.client_username, password=self.client_password)
    self._connect()
def __init__(self, hostname, topic, statsd_topic, statsd_type,
             statsd_client, port=1883, websocket=False, client_id=None,
             keepalive=60, will=None, auth=None, tls=None, qos=0):
    """Store the MQTT/statsd settings and build the paho client.

    Args:
        hostname: broker host, stored as ``self.hostname``.
        topic: stored as ``self.mqtt_topic``.
        statsd_topic, statsd_type, statsd_client: stored unchanged.
        port, client_id, keepalive, will, qos: stored unchanged; not
            otherwise used in this method.
        websocket: when truthy the client is created with the WebSockets
            transport instead of plain TCP.
        auth: optional mapping; ``auth['username']`` is required and
            ``'password'`` is optional when given.
        tls: optional mapping expanded as keyword arguments to
            ``tls_set``.
    """
    super(MQTTStat, self).__init__()
    self.hostname = hostname
    self.port = port
    self.client_id = client_id
    self.keepalive = keepalive
    self.mqtt_topic = topic
    self.will = will
    self.auth = auth
    self.tls = tls
    self.qos = qos
    # paho's transport names are "tcp" and "websockets"; the singular
    # "websocket" used previously is not an accepted value.
    transport = "websockets" if websocket else "tcp"
    self.statsd_client = statsd_client
    self.statsd_topic = statsd_topic
    self.statsd_type = statsd_type
    self.client = mqtt.Client(transport=transport)
    if tls:
        self.client.tls_set(**tls)
    if auth:
        self.client.username_pw_set(auth['username'],
                                    password=auth.get('password'))
def main():
    """Connect to the configured broker and run the network loop.

    Client name, server and port are read from section ``mqtt`` of the
    module-level ``parser``. On KeyboardInterrupt the module-level
    ``exit_me`` flag is set, the client is disconnected and the function
    sleeps for two seconds before returning.
    """
    global exit_me
    # NOTE(review): ``file`` is not defined in this block - confirm it is
    # a module-level object and not the Python 2 builtin type.
    client = mqtt.Client(parser.get('mqtt', 'clientname'),
                         userdata=file, clean_session=True)
    client.on_connect = on_connect
    client.on_message = on_message
    # Presumably parser is a ConfigParser, whose get() returns text;
    # connect() needs an integer port. int() is a no-op on an int.
    client.connect(parser.get('mqtt', 'server'),
                   int(parser.get('mqtt', 'port')), 60)
    # Loop forever
    try:
        client.loop_forever()
    except KeyboardInterrupt:  # SIGINT
        exit_me = True
        client.disconnect()
        logger.info("Exiting main thread")
        time.sleep(2.0)
def main():
    """Connect to the configured broker and run the network loop.

    Client name, server and port are read from section ``mqtt`` of the
    module-level ``parser``. On KeyboardInterrupt the module-level
    ``exit_me`` flag is set, the client is disconnected and the function
    sleeps for two seconds before returning.
    """
    global exit_me
    client = mqtt.Client(parser.get('mqtt', 'clientname'),
                         userdata=None, clean_session=True)
    client.on_connect = on_connect
    client.on_message = on_message
    # Presumably parser is a ConfigParser, whose get() returns text;
    # connect() needs an integer port. int() is a no-op on an int.
    client.connect(parser.get('mqtt', 'server'),
                   int(parser.get('mqtt', 'port')), 60)
    # Loop forever
    try:
        client.loop_forever()
    except KeyboardInterrupt:  # SIGINT
        exit_me = True
        client.disconnect()
        logger.info("Exiting main thread")
        time.sleep(2.0)
def connect(self):
    """Build the MQTT client, connect to the broker and attach callbacks.

    The client id is ``<client_id_prefix>:<pid>``. Credentials are applied
    only when ``self.username`` is truthy. The callbacks are attached
    after ``connect()`` has been called, as in the original ordering.
    """
    # TODO: maybe use UUIDs here?
    client_id = '{}:{}'.format(self.client_id_prefix, os.getpid())
    broker = mqtt.Client(client_id=client_id, clean_session=True,
                         userdata={'gateway': True})
    self.mqttc = broker

    # Handle authentication
    if self.username:
        broker.username_pw_set(self.username, self.password)

    # Connect to broker
    broker.connect(self.host, self.port, self.keepalive)

    # Attach MQTT callbacks
    broker.on_connect = self.on_connect
    broker.on_disconnect = self.on_disconnect
    broker.on_publish = self.on_publish
    broker.on_subscribe = self.on_subscribe
    broker.on_unsubscribe = self.on_unsubscribe
    broker.on_message = self.on_message
def on_connect(client, userdata, flags, rc):
    """Log the connection result and subscribe to the configured topics.

    Subscribing from this callback means the subscriptions are renewed
    whenever the client reconnects. Any exception raised by
    ``client.subscribe`` is logged and swallowed.
    """
    result_text = mqtt.connack_string(rc)
    log.debug("MQTT Client connection results: {}".format(result_text))

    # (topic, qos) pairs; both members are required in every tuple.
    #   QoS 0: delivered once, no confirmation.
    #   QoS 1: delivered at least once, confirmation required.
    #   QoS 2: delivered exactly once via a four step handshake.
    subscriptions = [
        (settings.GPS_TOPIC, 2),
        (settings.MODEM_TEMP_TOPIC, 1),
        (settings.WAN_CONNECTION_STATE_TOPIC, 0),
    ]
    try:
        client.subscribe(subscriptions)
    except Exception as ex:
        log.error('Client Subscribe exception. ex={}'.format(ex))
# Called when a message has been received on a topic that the client subscribes
# to and the message does not match an existing topic filter callback. Use
# message_callback_add() to define a callback that will be called for specific
# topic filters. on_message will serve as fallback when none matched.
def connect(self):
    """
    Connect to MQTT broker.

    Builds the paho client, forwards its connect/message/log callbacks
    through ``reactor.callFromThread`` and starts a ``LoopingCall`` around
    ``connect_with_retry`` with interval ``self.retry_interval`` and
    ``now=True``.
    """
    # TODO: This is currently done synchronously, which could block other
    #       subsystems in timeout situations.
    #       => Check if we can do asynchronous connection establishment.

    def via_reactor(handler_name):
        # The handler is looked up on self at call time, then the call is
        # handed over with callFromThread.
        def forward(*args):
            return reactor.callFromThread(getattr(self, handler_name), *args)
        return forward

    mqtt_client = mqtt.Client(client_id=self.name, clean_session=True,
                              userdata={'foo': 'bar'})
    self.client = mqtt_client

    if self.broker_username:
        mqtt_client.username_pw_set(self.broker_username, self.broker_password)

    mqtt_client.on_connect = via_reactor('on_connect')
    mqtt_client.on_message = via_reactor('on_message')
    mqtt_client.on_log = via_reactor('on_log')

    # Connect with retry
    retry_loop = LoopingCall(self.connect_with_retry)
    self.connect_loop = retry_loop
    retry_loop.start(self.retry_interval, now=True)
def __init__(self, wolkMQTTClientConfig):
    """Build and configure the paho client from ``wolkMQTTClientConfig``.

    Reads ``wolkClientId``, ``ca_cert``, ``set_insecure``, ``username``,
    ``password``, ``host``, ``port`` and ``qos`` from the config object.
    A last-will message is registered on topic ``lastwill/<username>``.
    No connection is opened here.
    """
    config = wolkMQTTClientConfig
    self.clientConfig = config

    # Setup MQTT client
    client = mqtt.Client(config.wolkClientId, True)
    self.client = client
    client.on_connect = self._on_mqtt_connect
    client.on_disconnect = self._on_mqtt_disconnect
    client.on_message = self._on_mqtt_message

    if config.ca_cert:
        client.tls_set(config.ca_cert)

    if config.set_insecure:
        client.tls_insecure_set(config.set_insecure)

    client.username_pw_set(config.username, config.password)
    self.host = config.host
    self.port = config.port

    will_topic = "lastwill/" + config.username
    will_payload = "Last will of serial:" + config.username
    client.will_set(will_topic, will_payload, config.qos, False)
    client.on_log = self._on_log
def main():
    """Parse the command line, then connect and run the logger loop.

    The parsed arguments are handed to ``init_settings``; host and port
    are then read back through ``get_host()`` / ``get_port()``. Blocks in
    ``loop_forever()``.
    """
    # (flag, dest, default, help) for every supported option.
    # NOTE(review): the two password options ship with fixed defaults.
    option_table = (
        ('--host', 'mqtt_host', 'localhost', 'Mqtt Broker URL'),
        ('--port', 'mqtt_port', 1883, 'Mqtt Broker Port'),
        ('--root', 'root_topic', 'logger/', 'Root topic for logger commands'),
        ('--mgpassword', 'management_password', 'admin1234', 'password for management options'),
        ('--qrpassword', 'query_password', 'query1234', 'password for query options'),
    )
    arg_parser = argparse.ArgumentParser(description='Sqlite Logger for MQTT broker')
    for flag, dest, default, help_text in option_table:
        arg_parser.add_argument(flag, dest=dest, default=default, help=help_text)
    init_settings(arg_parser.parse_args())

    broker = mqtt.Client()
    broker.on_connect = on_connect
    broker.on_message = on_message
    print(get_host() + ':' + str(get_port()))
    broker.connect(get_host(), int(get_port()), 60)
    broker.loop_forever()
def mqtt(self):
    """Return the MQTT client, creating it on first use.

    The client is cached on ``self.client``. When paho-mqtt cannot be
    imported, ``install_package("paho-mqtt")`` is called and the import is
    retried. The new client gets the module-level ``on_connect`` and
    ``on_message`` callbacks plus ``mbot`` and ``backend`` attributes.
    """
    if hasattr(self, "client"):
        return self.client

    try:
        import paho.mqtt.client as paho_client
    except ImportError:
        install_package("paho-mqtt")
        import paho.mqtt.client as paho_client

    self.client = paho_client.Client()
    self.client.on_connect = on_connect
    self.client.on_message = on_message
    # Back-references stored as extra attributes on the client object.
    self.client.mbot = self.bot
    self.client.backend = self
    return self.client
def __init__(self, options):
    """Open the control connection and block until ``self.ready`` is set.

    Args:
        options: mapping with ``control_topic``, ``proxy_control_topic``
            and ``control_connection`` (a ``host:port`` string).

    After starting the network loop this polls ``self.ready`` every 0.4
    seconds with no timeout; nothing in this method sets the flag.
    """
    control_topic = options["control_topic"]
    self.messages = []
    self.receiveTopic = control_topic+"/receive"
    self.sendTopic = control_topic+"/send"
    self.sendProxyTopic = options["proxy_control_topic"]+"/receive"

    self.client = mqtt.Client("control_connection")
    self.client.on_message = self.on_message
    self.client.on_connect = self.on_connect
    self.client.on_publish = self.on_publish

    broker_host, broker_port = options["control_connection"].split(":")
    self.controlBrokerHost = broker_host
    self.controlBrokerPort = int(broker_port)

    self.ready = False
    self.client.connect(self.controlBrokerHost, self.controlBrokerPort, 60)
    self.published = False
    self.client.loop_start()
    while True:
        if self.ready:
            break
        time.sleep(.4)
def _mqtt_connect(self):
    """Connect to the MQTT broker.

    Returns the value taken from ``self._connection_queue`` (waited for up
    to 10 seconds). When it is truthy, the current state is requested, the
    environmental sensor thread is started and the method blocks until
    both the state and the sensor queues have delivered one item.
    Otherwise the network loop is stopped.
    """
    client = mqtt.Client(userdata=self)
    self._mqtt = client
    client.on_message = self.on_message
    client.on_connect = self.on_connect
    client.username_pw_set(self._serial, self._credentials)
    client.connect(self._network_device.address,
                   self._network_device.port)
    client.loop_start()

    self._connected = self._connection_queue.get(timeout=10)
    if not self._connected:
        self._mqtt.loop_stop()
        return self._connected

    self.request_current_state()

    # Start Environmental thread
    self._request_thread = EnvironmentalSensorThread(
        self.request_environmental_state)
    self._request_thread.start()

    # Wait for first data
    self._state_data_available.get()
    self._sensor_data_available.get()
    self._device_available = True
    return self._connected
def mqtt_connect():
    """Create a client, connect with the module defaults and start its loop.

    Returns:
        The client, with ``on_message_cb`` installed as its message
        callback.

    Raises:
        struct.error: re-raised after its ``message`` attribute has been
            replaced with a longer description, which is also logged.
        Exception: raised with a fixed message for any other failure.
    """
    try:
        mqtt_client = mqtt.Client()
        mqtt_client.connect(DEFAULT_MQTT_IP, DEFAULT_MQTT_PORT, DEFAULT_MQTT_TIMEOUT)
        mqtt_client.loop_start()
        mqtt_client.on_message = on_message_cb
    except struct.error as err:
        # Format the exception itself: the ``message`` attribute exists
        # only on Python 2 exceptions.
        outermost = traceback.extract_stack()[0]
        err.message = 'Invalid argument value passed in %s at line no. %s\nError: %s' \
            % (outermost[0], outermost[1], err)
        logger.error('%s' % (err.message))
        raise
    except Exception as tx:  # "except Exception, tx" is Python 2-only syntax
        tx.message = 'Could not connect to the JET notification server'
        logger.error('%s' % (tx.message))
        raise Exception(tx.message)
    return mqtt_client
def mqtt_connect(self):
    """Connect to the configured broker when it is reachable.

    Reads ``mqtt_host``, ``mqtt_port``, ``mqtt_client_id``,
    ``mqtt_topic_prefix`` and optionally ``mqtt_user`` / ``mqtt_password``
    from ``self.config``. On success the client has subscribed to
    ``<mqtt_topic_prefix>/#`` and its background loop is running. When
    connecting fails, the traceback is reported through ``self.error`` and
    ``self.mqtt_client`` is reset to None. When the broker is not
    reachable only an error is reported.
    """
    reachable = self.mqtt_broker_reachable()
    # format() accepts a text or an integer port; '+' needed text.
    endpoint = '{}:{}'.format(self.config['mqtt_host'], self.config['mqtt_port'])
    if not reachable:
        self.error(endpoint + ' not reachable!')
        return

    self.verbose('Connecting to ' + endpoint)
    self.mqtt_client = mqtt.Client(self.config['mqtt_client_id'], clean_session=False)
    if 'mqtt_user' in self.config and 'mqtt_password' in self.config:
        self.mqtt_client.username_pw_set(self.config['mqtt_user'], self.config['mqtt_password'])
    self.mqtt_client.on_connect = self.mqtt_on_connect
    self.mqtt_client.on_disconnect = self.mqtt_on_disconnect
    self.mqtt_client.on_message = self.mqtt_on_message
    try:
        self.mqtt_client.connect(self.config['mqtt_host'], int(self.config['mqtt_port']), 10)
        self.mqtt_client.subscribe(self.config['mqtt_topic_prefix'] + '/#', self.mqtt_qos_subscribe)
        self.mqtt_client.loop_start()
    except Exception:
        # Not a bare except: KeyboardInterrupt and SystemExit must still
        # propagate.
        self.error(traceback.format_exc())
        self.mqtt_client = None
def Publish(self, target, channel, message):
    """Publish one QoS 1 message to the broker at ``target`` and wait for it.

    Args:
        target: broker host; port 1883 is always used.
        channel: topic to publish on.
        message: payload.

    A new client is created for every call. Its background loop is
    stopped and the connection closed even when publishing raises.
    """
    client = mqtt.Client()
    client.max_inflight_messages_set(200000)
    client.connect(target, 1883)
    client.loop_start()
    try:
        msg_info = client.publish(channel, message, qos=1)
        if not msg_info.is_published():
            msg_info.wait_for_publish()
    finally:
        # Release the connection and join the loop thread on every path.
        client.disconnect()
        client.loop_stop()
def Publish(target, channel, message):
    """Publish one QoS 1 message to the broker at ``target`` and wait for it.

    Args:
        target: broker host; port 1883 is always used.
        channel: topic to publish on.
        message: payload.

    A new client is created for every call. Its background loop is
    stopped and the connection closed even when publishing raises.
    """
    client = mqtt.Client()
    client.max_inflight_messages_set(200000)
    client.connect(target, 1883)
    client.loop_start()
    try:
        msg_info = client.publish(channel, message, qos=1)
        if not msg_info.is_published():
            msg_info.wait_for_publish()
    finally:
        # Release the connection and join the loop thread on every path.
        client.disconnect()
        client.loop_stop()
def Publish(target,channel,message):
    """Publish one QoS 1 message to ``target`` and print the result code.

    Args:
        target: broker host; port 1883 is always used.
        channel: topic to publish on.
        message: payload.
    """
    # NOTE(review): no loop_*() call and no disconnect() are made here, so
    # the client created for each call is left connected - confirm intent.
    client = mqtt.Client()
    client.max_inflight_messages_set(200000)
    client.connect(target, 1883)
    (rc, mid) = client.publish(channel, message, qos=1)
    # print() with one argument behaves the same on Python 2 and 3.
    print("DMQTT RESULT : "+str(rc))
def Publish(self, target, channel, message):
    """Publish one QoS 1 message to the broker at ``target`` and wait for it.

    Args:
        target: broker host; port 1883 is always used.
        channel: topic to publish on.
        message: payload.

    A new client is created for every call. Its background loop is
    stopped and the connection closed even when publishing raises.
    """
    client = mqtt.Client()
    client.max_inflight_messages_set(200000)
    client.connect(target, 1883)
    client.loop_start()
    try:
        msg_info = client.publish(channel, message, qos=1)
        if not msg_info.is_published():
            msg_info.wait_for_publish()
    finally:
        # Release the connection and join the loop thread on every path.
        client.disconnect()
        client.loop_stop()
def Publish(self, target, channel, message):
    """Publish one QoS 1 message to the broker at ``target`` and wait for it.

    Args:
        target: broker host; port 1883 is always used.
        channel: topic to publish on.
        message: payload.

    A new client is created for every call. Its background loop is
    stopped and the connection closed even when publishing raises.
    """
    client = mqtt.Client()
    client.max_inflight_messages_set(200000)
    client.connect(target, 1883)
    client.loop_start()
    try:
        msg_info = client.publish(channel, message, qos=1)
        if not msg_info.is_published():
            msg_info.wait_for_publish()
    finally:
        # Release the connection and join the loop thread on every path.
        client.disconnect()
        client.loop_stop()
def Publish(target,channel,message):
    """Publish one QoS 1 message to ``target`` and print the result code.

    Args:
        target: broker host; port 1883 is always used.
        channel: topic to publish on.
        message: payload.
    """
    # NOTE(review): no loop_*() call and no disconnect() are made here, so
    # the client created for each call is left connected - confirm intent.
    client = mqtt.Client()
    client.max_inflight_messages_set(200000)
    client.connect(target, 1883)
    (rc, mid) = client.publish(channel, message, qos=1)
    # print() with one argument behaves the same on Python 2 and 3.
    print("DMQTT RESULT : "+str(rc))
def Publish(self, target, channel, message):
    """Publish one QoS 1 message to ``target``, wait for it and print a report.

    Args:
        target: broker host; port 1883 is always used. Must be text, as
            it is concatenated into the report.
        channel: topic to publish on.
        message: payload; must be text, as it is concatenated into the
            report.

    A new client is created for every call. Its background loop is
    stopped and the connection closed even when publishing raises.
    """
    client = mqtt.Client()
    client.max_inflight_messages_set(200000)
    client.connect(target, 1883)
    client.loop_start()
    try:
        msg_info = client.publish(channel, message, qos=1)
        if not msg_info.is_published():
            msg_info.wait_for_publish()
    finally:
        # Release the connection and join the loop thread on every path.
        client.disconnect()
        client.loop_stop()
    # print() with one argument behaves the same on Python 2 and 3.
    print("MAP MQTT IS PUBLISH : "+str(msg_info.is_published()))
    print("MAP MQTT IP : "+target)
    print("MAP MQTT MESSAGE : "+message)
    print("")
def Publish(self, target, channel, message):
client = mqtt.Client()
#client.on_publish = self.on_publish
client.max_inflight_messages_set(200000)
client.connect(target, 1883)
client.loop_start()
msg_info = client.publish(channel, message, qos=1)
if msg_info.is_published() == False:
msg_info.wait_for_publish()
client.disconnect()
#time.sleep(0.01)