def __init__(self, opts=None):
    """
    Set up the republishing side: a PUB socket fed by a PULL socket.

    Options are taken from ``opts`` when given; otherwise they are loaded
    with ``self.process_config(CONFIG_LOCATION)``. A PUB socket is bound on
    ``tcp://127.0.0.1:2000`` and a PULL socket on ``ipc:///tmp/reactor.ipc``;
    both are wrapped in ZMQStreams on the shared IOLoop, and every message
    arriving on the PULL stream is handed to ``self.republish``.

    :param opts: Ready-made options, or ``None`` to load them from the
        configuration location.
    """
    self.opts = self.process_config(CONFIG_LOCATION) if opts is None else opts

    self.ctx = zmq.Context()

    # Outgoing side: PUB socket bound on the loopback interface.
    self.pub_socket = publisher = self.ctx.socket(zmq.PUB)
    publisher.bind('tcp://127.0.0.1:2000')
    self.loop = zmq.eventloop.IOLoop.instance()
    self.pub_stream = zmq.eventloop.zmqstream.ZMQStream(publisher, self.loop)

    # Incoming side: PULL socket over IPC that listens to the reactor.
    self.pull_socket = puller = self.ctx.socket(zmq.PULL)
    puller.bind('ipc:///tmp/reactor.ipc')
    self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(puller, self.loop)
    self.pull_stream.on_recv(self.republish)
python类Context()的实例源码
def serviceA(context=None):
    """
    Run worker "A" on a DEALER socket until told to stop.

    The worker connects to ``tcp://localhost:5560`` with identity ``b'A'``
    and then loops on incoming messages:

    * ``Service A`` -- sleep for a short random time and send a reply;
    * ``END`` -- leave the loop;
    * anything else -- report wrong identities and leave the loop.

    The socket is closed when the loop ends, whatever the reason.

    :param context: ZMQ context to reuse. When falsy, the process-wide
        ``zmq.Context.instance()`` is used.
    """
    # Reuse the context if it exists, otherwise fall back to the shared one.
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    try:
        # Identify this worker.
        service.setsockopt(zmq.IDENTITY, b'A')
        service.connect("tcp://localhost:5560")
        while True:
            message = service.recv()
            with myLock:
                print("Service A got:")
                print(message)
            # recv() yields bytes; a str literal would never compare equal
            # on Python 3, so compare against bytes literals (these also
            # work unchanged on Python 2).
            if message == b"Service A":
                # Do some work.
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service A did your laundry")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Release the socket so it does not outlive the worker.
        service.close()
def frontendClient(context=None):
    """
    Run a REQ client that keeps asking for service A or service B.

    The client connects to ``tcp://localhost:5559`` and, once per second,
    sends a randomly chosen request and waits up to two seconds for the
    reply. It stops when a reply times out or when an empty reply arrives.
    The socket is closed on exit.

    :param context: ZMQ context to reuse. When falsy, the process-wide
        ``zmq.Context.instance()`` is used.
    """
    # Reuse the context if it exists, otherwise fall back to the shared one.
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    try:
        socket.connect("tcp://localhost:5559")
        socket.RCVTIMEO = 2000  # we will only wait 2s for a reply
        while True:
            # Randomly request either service A or service B.
            serviceRequest = random.choice([b'Service A', b'Service B'])
            with myLock:
                print("client wants %s" % serviceRequest)
            socket.send(serviceRequest)
            try:
                reply = socket.recv()
            except zmq.Again:
                # Only a receive timeout is a "timed out" condition; other
                # errors are left to propagate instead of being mislabelled.
                with myLock:
                    print("client timed out")
                break
            if not reply:
                break
            with myLock:
                print("Client got reply: ")
                print(reply)
                print("")
            # Take a nap.
            time.sleep(1)
    finally:
        # Drop any unsent request rather than letting it keep the socket
        # (and therefore the context) alive.
        socket.close(linger=0)
def __init__(self, opts=None):
    """
    Set up the subscribing side and load the available actions.

    Options are taken from ``opts`` when given; otherwise they are loaded
    with ``self.process_config(CONFIG_LOCATION)``. A SUB socket is connected
    to ``tcp://localhost:2000`` and wrapped in a ZMQStream on the shared
    IOLoop, with ``act`` registered as the receive callback.

    :param opts: Ready-made options, or ``None`` to load them from the
        configuration location.
    """
    self.opts = self.process_config(CONFIG_LOCATION) if opts is None else opts

    # ZeroMQ plumbing.
    self.ctx = zmq.Context()
    self.socket = subscriber = self.ctx.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:2000')
    self.loop = zmq.eventloop.IOLoop.instance()
    self.stream = zmq.eventloop.zmqstream.ZMQStream(subscriber, self.loop)
    # NOTE(review): no SUBSCRIBE option is set in this method -- confirm a
    # subscription is installed elsewhere, otherwise nothing is delivered.
    self.stream.on_recv(act)

    # Actions are loaded from a fixed directory.
    self.actions = loader.load_actions(
        self.opts, '/home/mp/devel/eventdrivetalk/actions')
def run(self):
    """
    Entry point of the live plot when it runs as a separate process.

    Records the current process name, connects a SUB socket to
    ``tcp://localhost:<self.port>``, subscribes to the pickled
    ``self.var_name`` topic, calls ``self.init(**self.init_kwargs)`` and
    finally shows the figure with ``self.loop`` as the animation callback.
    """
    self.entity_name = current_process().name
    plogger.info("Starting new thread %s", self.entity_name)

    self.context = zmq.Context()
    self.socket = subscriber = self.context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:%d" % self.port)
    # The subscription topic is the pickled variable name.
    subscriber.setsockopt(
        zmq.SUBSCRIBE,
        pickle.dumps(self.var_name, protocol=pickle.HIGHEST_PROTOCOL),
    )
    plogger.info("Subscribed to topic %s on port %d", self.var_name, self.port)

    self.init(**self.init_kwargs)

    # Keep a reference to the animation for as long as the window is shown;
    # without it the garbage collector may reclaim the animation.
    # See: http://matplotlib.org/api/animation_api.html
    keep_alive = animation.FuncAnimation(self.fig, self.loop, interval=100)
    self.plt.show()
def notify_msg(self, type, price):
    """
    Push a ``{'type': ..., 'price': ...}`` JSON notification over ZeroMQ.

    A PUSH socket is connected to ``config.ZMQ_HOST``/``config.ZMQ_PORT``,
    the message is logged and sent, and the socket and context are released
    again. The call is best-effort: any failure is logged as a warning
    (with traceback) and not raised.

    :param type: Value stored under the ``'type'`` key of the message.
    :param price: Value stored under the ``'price'`` key of the message.
    """
    context = None
    socket = None
    try:
        import zmq

        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.connect("tcp://%s:%s" % (config.ZMQ_HOST, config.ZMQ_PORT))
        time.sleep(1)
        # Serialise once and reuse for both the log line and the wire.
        payload = json.dumps({'type': type, 'price': price})
        logging.info("notify message %s", payload)
        socket.send_string(payload)
    except Exception:
        # Keep the original best-effort behaviour, but record what failed.
        logging.warning("notify_msg Exception", exc_info=True)
    finally:
        # Release the per-call socket and context. The bounded linger gives
        # the queued message up to one second to leave without letting
        # term() block indefinitely when the peer is unreachable.
        if socket is not None:
            socket.close(linger=1000)
        if context is not None:
            context.term()
def rec(port):
    """
    Bind a SUB socket on ``port`` and feed every message to ``rec_frame``.

    The socket subscribes to all topics, is wrapped in a ZMQStream with
    ``rec_frame`` as the stream callback, and the shared IOLoop is started.

    :param port: TCP port to bind on all interfaces.
    """
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    endpoint = 'tcp://*:{port}'.format(port=port)
    subscriber.bind(endpoint)
    # An empty prefix subscribes to everything.
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")

    stream = ZMQStream(subscriber)
    stream.on_recv_stream(rec_frame)

    event_loop = ioloop.IOLoop.instance()
    event_loop.start()

    # NOTE(review): only reached once the IOLoop has stopped, after which
    # this spins forever -- confirm that is intended.
    while True:
        pass
def main():
    """
    Run a toy learner that talks to an environment over a PAIR socket.

    After connecting to ``tcp://localhost:5556`` and sending the ``hello``
    handshake, the learner loops forever: it receives and prints a reward
    and an input message, then sends back a single-character reply. On even
    steps the reply is taken from a fixed bit pattern; on odd steps it is
    ``1``, except on multiples of seven where it is a random bit.
    """
    port = "5556"
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    socket.connect("tcp://localhost:%s" % port)
    socket.send_string('hello')
    message = '00101110'
    cnt = 0
    while True:
        reward = socket.recv()  # 1 or 0, or '-1' for None
        print(reward)
        msg_in = socket.recv()
        print(msg_in)
        # think...
        msg_out = str(random.getrandbits(1) if cnt % 7 == 0 else 1)
        if cnt % 2 == 0:
            msg_out = str(message[cnt % 8])
        # msg_out is text, so it has to go through send_string(); plain
        # send() rejects str objects on Python 3.
        socket.send_string(msg_out)
        cnt = cnt + 1
def __init__(self, cmd, port, address=None):
    """
    Bind a PAIR socket, optionally launch the learner, and wait for its
    handshake.

    :param cmd: Command line used to start the learner; the port is
        appended as its last argument. ``None`` starts nothing.
    :param port: TCP port to bind; ``None`` selects 5556.
    :param address: Interface to bind; ``None`` selects ``'*'``.
    :raises ImportError: if ``zmq`` cannot be imported.
    :raises ValueError: if ``port`` is outside 1..65535.
    :raises AssertionError: if the first message received is not ``hello``.
    """
    try:
        import zmq
    except ImportError:
        raise ImportError("Must have zeromq for remote learner.")

    if address is None:
        address = '*'
    if port is None:
        port = 5556
    elif not 1 <= int(port) <= 65535:
        raise ValueError("Invalid port number: %s" % port)

    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.PAIR)
    self.socket.bind("tcp://%s:%s" % (address, port))

    # launch learner
    if cmd is not None:
        subprocess.Popen((cmd + ' ' + str(port)).split())

    handshake_in = self.socket.recv().decode('utf-8')
    # Checked explicitly instead of with ``assert`` so the handshake is
    # still validated under ``python -O``; the exception type is kept.
    if handshake_in != 'hello':
        raise AssertionError(
            "unexpected handshake from learner: %r" % handshake_in)
# send to learner, and get response;
def __init__(self):
    """
    Object constructor.
    """
    Command.__init__(self)

    #: The ZMQ context.
    #:
    #: :type: Context
    self._zmq_context = None

    #: The end points of the Enarksh daemons.
    #:
    #: :type: dict[string,string]
    self.__end_points = {}
# ------------------------------------------------------------------------------------------------------------------
def serviceB(context=None):
    """
    Run worker "B" on a DEALER socket until told to stop.

    The worker connects to ``tcp://localhost:5560`` with identity ``b'B'``
    and then loops on incoming messages:

    * ``Service B`` -- sleep for a short random time and send a reply;
    * ``END`` -- leave the loop;
    * anything else -- report wrong identities and leave the loop.

    The socket is closed when the loop ends, whatever the reason.

    :param context: ZMQ context to reuse. When falsy, the process-wide
        ``zmq.Context.instance()`` is used.
    """
    # Reuse the context if it exists, otherwise fall back to the shared one.
    context = context or zmq.Context.instance()
    service = context.socket(zmq.DEALER)
    try:
        # Identify this worker.
        service.setsockopt(zmq.IDENTITY, b'B')
        service.connect("tcp://localhost:5560")
        while True:
            message = service.recv()
            with myLock:
                print("Service B got:")
                print(message)
            # recv() yields bytes; a str literal would never compare equal
            # on Python 3, so compare against bytes literals (these also
            # work unchanged on Python 2).
            if message == b"Service B":
                # Do some work.
                time.sleep(random.uniform(0, 0.5))
                service.send(b"Service B cleaned your room")
            elif message == b"END":
                break
            else:
                with myLock:
                    print("the server has the wrong identities!")
                break
    finally:
        # Release the socket so it does not outlive the worker.
        service.close()
def tearDown(self):
    """
    Close every tracked socket and terminate every context they used.

    Each context is terminated in a daemon thread that is given two
    seconds to finish.

    :raises RuntimeError: if a context fails to terminate in time.
    """
    to_terminate = {self.context}
    while self.sockets:
        sock = self.sockets.pop()
        # Sockets may belong to contexts created during the test.
        to_terminate.add(sock.context)
        sock.close(0)

    for ctx in to_terminate:
        terminator = Thread(target=ctx.term)
        terminator.daemon = True
        terminator.start()
        terminator.join(timeout=2)
        if not terminator.is_alive():
            continue
        # Reset Context.instance so a context that failed to terminate
        # does not corrupt subsequent tests.
        zmq.sugar.context.Context._instance = None
        raise RuntimeError("context could not terminate, open sockets likely remain in test")

    super(BaseZMQTestCase, self).tearDown()
def main():
    """
    Subscribe to the ``pupil`` topic and print every (topic, payload) pair.

    A SUB socket is connected to ``ipc_sub_url``; its monitor socket is
    polled until a CONNECTED event is seen. After that the function loops
    forever, reading a topic string followed by a serialized payload.
    """
    context = zmq.Context()
    socket = zmq.Socket(context, zmq.SUB)
    monitor = socket.get_monitor_socket()
    socket.connect(ipc_sub_url)

    # Drain monitor events until the connection is established; every other
    # event (including CONNECT_DELAYED) is ignored.
    while recv_monitor_message(monitor)['event'] != zmq.EVENT_CONNECTED:
        continue
    print('connected')

    socket.subscribe('pupil')
    while True:
        # Each message has two frames: the topic, then the payload.
        subject = socket.recv_string()
        body = serializer.loads(socket.recv(), encoding='utf-8')
        print(subject, body)
def main():
    """
    Run a queue device between a client-facing and a server-facing socket.

    The front end is bound on ``tcp://*:5559`` and the back end on
    ``tcp://*:5560``; ``zmq.device`` then forwards traffic between them.
    When the device stops because of an error or Ctrl-C, the exception
    details are printed. Whatever was created is then closed.
    """
    # Pre-bind the names so the cleanup below is safe even when setup
    # fails part-way (e.g. a port is already in use).
    context = frontend = backend = None
    try:
        context = zmq.Context(1)
        # Client-facing socket
        frontend = context.socket(zmq.XREP)
        frontend.bind("tcp://*:5559")
        # Server-facing socket
        backend = context.socket(zmq.XREQ)
        backend.bind("tcp://*:5560")
        zmq.device(zmq.QUEUE, frontend, backend)
    except (Exception, KeyboardInterrupt):
        # Ctrl-C is handled here as well, so interrupting the queue still
        # reports what stopped it.
        for val in sys.exc_info():
            print(val)
        print("Desativa a fila")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def test_reqrep_raw_zmq_outside(nsproxy):
    """
    Request-reply round trip between an agent and a plain ZMQ REQ socket.

    The agent binds a raw-serialized REP socket whose handler echoes the
    request; the test checks that the reply equals what was sent.
    """
    def rep_handler(agent, message):
        return message

    # Agent side: a REP socket that echoes whatever it receives.
    echo_agent = run_agent('a1')
    echo_agent.set_attr(received=None)
    addr = echo_agent.bind('REP', transport='tcp', handler=rep_handler,
                           serializer='raw')

    # Client side: a raw ZeroMQ REQ socket connected to the agent.
    context = zmq.Context()
    requester = context.socket(zmq.REQ)
    endpoint = 'tcp://%s:%s' % (addr.address.host, addr.address.port)
    requester.connect(endpoint)

    # Round trip.
    payload = b'Hello world'
    requester.send(payload)
    assert requester.recv() == payload

    requester.close()
    context.destroy()
def test_pushpull_raw_zmq_outside(nsproxy):
    """
    Push-pull delivery from a plain ZMQ PUSH socket into an agent.

    The agent binds a raw-serialized PULL socket handled by
    ``set_received``; the test waits for the agent's ``received`` attribute
    to become the message that was pushed.
    """
    # Agent side: a PULL socket without serialization.
    sink_agent = run_agent('a1')
    sink_agent.set_attr(received=None)
    addr = sink_agent.bind('PULL', transport='tcp', handler=set_received,
                           serializer='raw')

    # Sender side: a raw ZeroMQ PUSH socket connected to the agent.
    context = zmq.Context()
    pusher = context.socket(zmq.PUSH)
    endpoint = 'tcp://%s:%s' % (addr.address.host, addr.address.port)
    pusher.connect(endpoint)

    # Deliver and verify.
    payload = b'Hello world'
    pusher.send(payload)
    assert wait_agent_attr(sink_agent, name='received', value=payload)

    pusher.close()
    context.destroy()
controller_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def connect(self):
    """
    Create a REQ socket, connect it to ``self.endpoint`` and register it
    with a fresh poller for incoming data.

    :raises RuntimeError: if the context or the socket turns out falsy.
    """
    context = zmq.Context()
    self.context = context
    if not context:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = context.socket(zmq.REQ)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.connect(self.endpoint)

    poller = zmq.Poller()
    self.poller = poller
    poller.register(sock, zmq.POLLIN)

    self.is_connected = True
database_proxy_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def connect(self):
    """
    Create a PULL socket, bind it to ``self.endpoint`` and register it
    with a fresh poller for incoming data.

    :raises RuntimeError: if the context or the socket turns out falsy.
    """
    context = zmq.Context()
    self.context = context
    if not context:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = context.socket(zmq.PULL)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.bind(self.endpoint)

    poller = zmq.Poller()
    self.poller = poller
    poller.register(sock, zmq.POLLIN)

    self.is_connected = True
master_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def connect(self):
    """
    Create a REP socket, bind it to ``self.endpoint`` and register it
    with a fresh poller for incoming data.

    :raises RuntimeError: if the context or the socket turns out falsy.
    """
    context = zmq.Context()
    self.context = context
    if not context:
        raise RuntimeError('Failed to create ZMQ context!')

    sock = context.socket(zmq.REP)
    self.socket = sock
    if not sock:
        raise RuntimeError('Failed to create ZMQ socket!')
    sock.bind(self.endpoint)

    poller = zmq.Poller()
    self.poller = poller
    poller.register(sock, zmq.POLLIN)

    self.is_connected = True
def __init__(self, repAddress, pubAddress):
    """
    Constructor.

    Binds a REP socket to ``repAddress`` and a PUB socket to ``pubAddress``
    on a new context, and prepares (without starting) a thread whose target
    is ``self.run``.
    """
    super(RpcServer, self).__init__()

    # Function registry, empty at construction.
    # NOTE(review): presumably maps function name -> callable; the original
    # comment was lost to mis-encoding -- confirm.
    self.__functions = {}

    # ZeroMQ sockets.
    ctx = zmq.Context()
    self.__context = ctx
    self.__socketREP = ctx.socket(zmq.REP)      # request/reply socket
    self.__socketREP.bind(repAddress)
    self.__socketPUB = ctx.socket(zmq.PUB)      # publish socket
    self.__socketPUB.bind(pubAddress)

    # Worker thread state.
    self.__active = False                       # initially not active
    worker = threading.Thread(target=self.run)  # created, not started here
    self.__thread = worker
#----------------------------------------------------------------------
def __init__(self, reqAddress, subAddress):
    """
    Constructor.

    Stores both addresses, creates (but does not connect) a REQ and a SUB
    socket on a new context, and prepares (without starting) a thread whose
    target is ``self.run``.
    """
    super(RpcClient, self).__init__()

    # ZeroMQ endpoints and sockets.
    self.__reqAddress = reqAddress
    self.__subAddress = subAddress

    ctx = zmq.Context()
    self.__context = ctx
    self.__socketREQ = ctx.socket(zmq.REQ)      # request socket
    self.__socketSUB = ctx.socket(zmq.SUB)      # subscribe socket

    # Worker thread state.
    self.__active = False                       # initially not active
    worker = threading.Thread(target=self.run)  # created, not started here
    self.__thread = worker
#----------------------------------------------------------------------
def router_main(_, pidx, args):
    """
    Proxy messages from a TCP PULL socket to an IPC PUSH socket.

    The PULL side is bound on ``tcp://*:5000`` and the PUSH side on
    ``ipc://example-events``. ``zmq.proxy`` runs until interrupted; a
    keyboard interrupt ends it quietly and any other exception is logged.
    Both sockets and the context are released afterwards.

    :param _: Unused.
    :param pidx: Passed to ``get_logger`` together with the logger name.
    :param args: Unused.
    """
    logger = get_logger('examples.zmqserver.extra', pidx)

    context = zmq.Context()
    context.linger = 0

    puller = context.socket(zmq.PULL)
    puller.bind('tcp://*:5000')
    pusher = context.socket(zmq.PUSH)
    pusher.bind('ipc://example-events')

    try:
        logger.info('router proxy started')
        zmq.proxy(puller, pusher)
    except KeyboardInterrupt:
        pass
    except BaseException:
        # Everything else is logged and swallowed.
        logger.exception('unexpected error')
    finally:
        logger.info('router proxy terminated')
        for sock in (puller, pusher):
            sock.close()
        context.term()
def reset(self):
    """
    Return to the READY state with fresh sockets and zeroed counters.

    A new context is created with a PUSH socket bound to
    ``self._address1`` and a PULL socket bound to ``self._address2``. Both
    use a high-water mark of 32, and the PULL socket has a 1 ms receive
    timeout. All drain flags and send/receive counters are reset.
    """
    self.status = READY
    context = zmq.Context()

    self._socket1 = context.socket(zmq.PUSH)
    # Set the high-water mark before binding, exactly as done for the PULL
    # socket below: older libzmq releases only apply HWM changes to
    # subsequent bind/connect calls.
    self._socket1.set_hwm(32)
    self._socket1.bind(self._address1)

    self._socket2 = context.socket(zmq.PULL)
    self._socket2.set_hwm(32)
    self._socket2.RCVTIMEO = 1
    self._socket2.bind(self._address2)

    self._prev_drained = False
    self._sub_drained = False
    self._conn1_send_count = 0
    self._conn1_recv_count = {}
    self._conn2_send_count = {}
    self._conn2_recv_count = 0
    self._retry_count = 0
def reset(self):
    """
    Return to the READY state and handshake with the main process.

    A PULL socket with a 1 ms receive timeout is created, the method waits
    until both the ``conn1`` and ``sync_conn1`` ports are known, connects
    the PULL socket, and sends one msgpack-encoded ``SYNC`` packet over a
    short-lived PUSH socket. The receive counter and drain flag are reset.
    """
    self.status = READY
    context = zmq.Context()

    self._socket = context.socket(zmq.PULL)
    self._socket.RCVTIMEO = 1
    sync_socket = context.socket(zmq.PUSH)

    # Poll until both port numbers have been filled in.
    while not (self._ports['conn1'] is not None
               and self._ports['sync_conn1'] is not None):
        sleep(0.01)

    # Handshake with main process
    data_endpoint = self._address + ':' + str(self._ports['conn1'])
    self._socket.connect(data_endpoint)
    sync_endpoint = self._address + ':' + str(self._ports['sync_conn1'])
    sync_socket.connect(sync_endpoint)
    sync_socket.send(msgpack.dumps(b'SYNC'))
    sync_socket.close()

    self._num_recv = 0
    self._drained = False
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_pub(self):
    """Publish log messages. bind() to PUB socket.

    Binds a PUB socket on ``self.sub_port``, publishes ``self.send_count``
    messages and then waits for ``self.watcher`` to finish before closing
    the socket and terminating the context.
    """
    # pylint: disable=E1101
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    try:
        pub.bind('tcp://*:{}'.format(self.sub_port))
    except zmq.ZMQError as error:
        print(error)
    # Short pause before the first message is published.
    time.sleep(0.1)

    send_count = self.send_count
    for i in range(send_count):
        pub.send_string('hi there {}'.format(i))
        time.sleep(1e-5)
    sys.stdout.flush()

    # Wait for the watcher thread to exit. ``is_alive()`` replaces
    # ``isAlive()``, which no longer exists on Python 3.9+.
    while self.watcher.is_alive():
        self.watcher.join(timeout=1e-5)

    pub.close()
    context.term()
test_zmq_pub_sub.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_pub(self):
    """Publish log messages. connect() to PUB socket.

    Connects a PUB socket to ``self.sub_host``/``self.sub_port``, publishes
    ``self.send_count`` messages and then waits for ``self.watcher`` to
    finish before closing the socket and terminating the context.
    """
    # pylint: disable=E1101
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    try:
        _address = 'tcp://{}:{}'.format(self.sub_host, self.sub_port)
        pub.connect(_address)
    except zmq.ZMQError as error:
        print('ERROR:', error)
    # Short pause before the first message is published.
    time.sleep(0.1)

    send_count = self.send_count
    for i in range(send_count):
        pub.send_string('hi there {}'.format(i))
        time.sleep(1e-5)

    # Wait for the watcher thread to exit. ``is_alive()`` replaces
    # ``isAlive()``, which no longer exists on Python 3.9+.
    while self.watcher.is_alive():
        self.watcher.join(timeout=1e-5)

    pub.close()
    context.term()
logging_handlers.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 29
收藏 0
点赞 0
评论 0
def to(cls, channel, host='127.0.0.1',
       port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
       level=logging.NOTSET):
    """Build a handler whose PUB socket is connected to a ZMQ subscriber.

    Args:
        channel (string): Logging channel name, passed on to the handler.
        host (string): Hostname / ip address of the subscriber to publish
            to.
        port (int, string): Port on which to publish messages.
        level (int): Logging level

    Returns:
        The object produced by ``cls(channel, publisher, level=level)``.
    """
    publisher = zmq.Context().socket(zmq.PUB)
    publisher.connect('tcp://{}:{}'.format(host, port))
    # Brief pause after connecting; hopefully avoids losing the first
    # messages to the slow-joiner problem.
    time.sleep(0.1)
    return cls(channel, publisher, level=level)
def __init__(self):
    """
    Spawn the server in the background and connect two REQ sockets to it.

    ``self.main_socket`` is connected to ``tcp://localhost:5555`` and
    ``self.freeze_socket`` to ``tcp://localhost:6666``.
    """
    context = zmq.Context()

    # Try to start the server in the background; the result of the shell
    # command is not checked.
    os.system("justdb serve &")

    requester = context.socket(zmq.REQ)
    requester.connect("tcp://localhost:5555")

    freezer = context.socket(zmq.REQ)
    freezer.connect("tcp://localhost:6666")

    self.main_socket, self.freeze_socket = requester, freezer
def create_server():
    """
    Bind two REP sockets and answer every request with an empty reply.

    The sockets listen on ports 5555 and 6666. If either bind fails with a
    ZMQ error, a message is printed and the process exits. Otherwise the
    function serves the two sockets alternately, forever.
    """
    context = zmq.Context()
    try:
        main_socket = context.socket(zmq.REP)
        main_socket.bind("tcp://*:5555")
        freeze_socket = context.socket(zmq.REP)
        freeze_socket.bind("tcp://*:6666")
    except zmq.error.ZMQError:
        print("JustDB already running, this is no error.")
        sys.exit()
    print("Successfully started \033[92mjustdb\033[0m")

    responders = (main_socket, freeze_socket)
    while True:  # pragma: no cover
        # Strictly alternate: one request on the main socket, then one on
        # the freeze socket; request bodies are discarded.
        for responder in responders:
            responder.recv()
            responder.send(b"")
def create_socket(port):
    """
    Create a zmq SUB socket bound on ``port`` and subscribed to everything.

    If the bind fails with a ZMQ error, a message is printed and the
    process exits with status 1.

    :param port: TCP port to bind on all interfaces.
    :return: ``(socket, context)`` tuple.
    """
    context = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    endpoint = "tcp://*:%s" % port
    try:
        subscriber.bind(endpoint)
    except zmq.error.ZMQError:
        print("Address already in use")
        sys.exit(1)
    # An empty prefix subscribes to all messages.
    subscriber.setsockopt(zmq.SUBSCRIBE, b"")
    print("Start node-masternode Subscribe")
    return subscriber, context