def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
python类PULL的实例源码
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket, zstr
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
a = zsocket.new(ctx, zmq.PUSH)
zsocket.bind(a, "inproc://a")
ctx2 = self.Context.shadow_pyczmq(ctx)
b = ctx2.socket(zmq.PULL)
b.connect("inproc://a")
zstr.send(a, b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
b.close()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
ca = zsocket.new(ctx, zmq.PUSH)
cb = zsocket.new(ctx, zmq.PULL)
a = zmq.Socket.shadow(ca)
b = zmq.Socket.shadow(cb)
a.bind("inproc://a")
b.connect("inproc://a")
a.send(b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
def test_cyclic_destroy(self):
"""ctx.destroy should succeed when cyclic ref prevents gc"""
# test credit @dln (GH #137):
class CyclicReference(object):
def __init__(self, parent=None):
self.parent = parent
def crash(self, sock):
self.sock = sock
self.child = CyclicReference(self)
def crash_zmq():
ctx = self.Context()
sock = ctx.socket(zmq.PULL)
c = CyclicReference()
c.crash(sock)
ctx.destroy()
crash_zmq()
def test_shadow_pyczmq(self):
try:
from pyczmq import zctx, zsocket, zstr
except Exception:
raise SkipTest("Requires pyczmq")
ctx = zctx.new()
a = zsocket.new(ctx, zmq.PUSH)
zsocket.bind(a, "inproc://a")
ctx2 = self.Context.shadow_pyczmq(ctx)
b = ctx2.socket(zmq.PULL)
b.connect("inproc://a")
zstr.send(a, b'hi')
rcvd = self.recv(b)
self.assertEqual(rcvd, b'hi')
b.close()
def execute_command_streamer():
from oldspeak.console.parsers.streamer import parser
args = parser.parse_args(get_sub_parser_argv())
bootstrap_conf_with_gevent(args)
device = Device(zmq.STREAMER, zmq.PULL, zmq.PUSH)
device.bind_in(args.pull)
device.bind_out(args.push)
if args.pull_hwm:
device.setsockopt_in(zmq.RCVHWM, args.pull_hwm)
if args.push_hwm:
device.setsockopt_out(zmq.SNDHWM, args.push_hwm)
print "oldspeak streamer started"
print "date", datetime.utcnow().isoformat()
print "pull", (getattr(args, 'pull'))
print "push", (getattr(args, 'push'))
device.start()
def consumer():
consumer_id = random.randrange(1,10005)
print "I am consumer #%s" % (consumer_id)
context = zmq.Context()
# recieve work
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")
# send work
consumer_sender = context.socket(zmq.PUSH)
consumer_sender.connect("tcp://127.0.0.1:5558")
while True:
work = consumer_receiver.recv_json()
data = work['num']
result = { 'consumer' : consumer_id, 'num' : data}
if data%2 == 0:
consumer_sender.send_json(result)
def __init__(self, name, send_qsize=0, mode='ipc'):
self._name = name
self._conn_info = None
self._context_lock = threading.Lock()
self._context = zmq.Context()
self._tosock = self._context.socket(zmq.ROUTER)
self._frsock = self._context.socket(zmq.PULL)
self._tosock.set_hwm(10)
self._frsock.set_hwm(10)
self._dispatcher = CallbackManager()
self._send_queue = queue.Queue(maxsize=send_qsize)
self._rcv_thread = None
self._snd_thread = None
self._mode = mode
assert mode in ('ipc', 'tcp')
def _setup_ipc(self):
'''
Subscribe to the pub IPC
and publish the messages
on the right transport.
'''
self.ctx = zmq.Context()
log.debug('Setting up the publisher puller')
self.sub = self.ctx.socket(zmq.PULL)
self.sub.bind(PUB_IPC_URL)
try:
self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
# zmq 2
except AttributeError:
# zmq 3
self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
def main():
try:
context = zmq.Context(1)
# Socket facing clients
frontend = context.socket(zmq.PULL)
frontend.bind("tcp://*:5559")
# Socket facing services
backend = context.socket(zmq.PUSH)
backend.bind("tcp://*:5560")
zmq.device(zmq.STREAMER, frontend, backend)
except Exception, e:
print e
print "bringing down zmq device"
finally:
pass
frontend.close()
backend.close()
context.term()
def init_recv_socket(self):
logger.info("Initalizing receive socket")
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PULL)
logger.info("Initalized receive socket")
while not self.isInterruptionRequested():
try:
time.sleep(0.1)
logger.info("Trying to get a connection to gnuradio...")
self.socket.connect("tcp://{0}:{1}".format(self.ip, self.gr_port))
logger.info("Got connection")
break
except (ConnectionRefusedError, ConnectionResetError):
continue
except Exception as e:
logger.error("Unexpected error", str(e))
def test_send_1k_push_pull(self):
down, up, port = self.create_bound_pair(zmq.PUSH, zmq.PULL)
eventlet.sleep()
done = eventlet.Event()
def tx():
tx_i = 0
while tx_i <= 1000:
tx_i += 1
down.send(str(tx_i).encode())
def rx():
while True:
rx_i = up.recv()
if rx_i == b"1000":
done.send(0)
break
eventlet.spawn(tx)
eventlet.spawn(rx)
final_i = done.wait()
self.assertEqual(final_i, 0)
def __init__(self, opts=None):
if opts is None:
self.opts = self.process_config(CONFIG_LOCATION)
else:
self.opts = opts
return
# General setup of ZeroMQ
self.ctx = zmq.Context()
self.loop = zmq.eventloop.IOLoop.instance()
# Begin setup of PULL socket
self.pull_socket = self.ctx.socket(zmq.PULL)
self.pull_socket.bind('tcp://127.0.0.1:2001')
self.pull_stream = zmq.eventloop.zmqstream.ZMQStream(self.pull_socket, self.loop)
self.pull_stream.on_recv(self.stream_decode)
# Begin setup of PUSH socket for IPC to publisher
self.push_socket = self.ctx.socket(zmq.PUSH)
self.push_socket.connect('ipc:///tmp/reactor.ipc')
self.push_stream = zmq.eventloop.zmqstream.ZMQStream(self.push_socket, self.loop)
self.actions = loader.load_actions(self.opts, '/home/mp/devel/eventdriventalk/actions')
self.registers = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/registers')
self.rules = loader.load_registers(self.opts, '/home/mp/devel/eventdriventalk/rules')
def msg_server(self):
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:%s"%config.ZMQ_PORT)
logging.info("zmq msg_server start...")
while not self.is_terminated:
# Wait for next request from client
message = socket.recv()
logging.info("new pull message: %s", message)
self.process_message(message)
time.sleep (1) # Do some 'work'
def main(self):
"""
The main of the logger.
"""
config = Config.get()
# Startup logger.
self.__startup()
# Register our socket for asynchronous incoming messages.
self.__message_controller.register_end_point('pull', zmq.PULL, config.get_logger_pull_end_point())
# Register supported message types
self.__message_controller.register_message_type(HaltMessage.MESSAGE_TYPE)
self.__message_controller.register_message_type(LogFileMessage.MESSAGE_TYPE)
# Register message received event handlers.
self.__message_controller.register_listener(HaltMessage.MESSAGE_TYPE, HaltMessageEventHandler.handle)
self.__message_controller.register_listener(LogFileMessage.MESSAGE_TYPE, LogFileMessageEventHandler.handle)
# Register other event handlers.
self.__event_controller.event_queue_empty.register_listener(self.__message_controller.receive_message)
# Run the event loop.
self.__event_controller.loop()
# Shutdown logger.
self.__shutdown()
# ------------------------------------------------------------------------------------------------------------------
def test_context_manager(self):
url = 'inproc://a'
with self.Context() as ctx:
with ctx.socket(zmq.PUSH) as a:
a.bind(url)
with ctx.socket(zmq.PULL) as b:
b.connect(url)
msg = b'hi'
a.send(msg)
rcvd = self.recv(b)
self.assertEqual(rcvd, msg)
self.assertEqual(b.closed, True)
self.assertEqual(a.closed, True)
self.assertEqual(ctx.closed, True)
def test_identity(self):
s = self.context.socket(zmq.PULL)
self.sockets.append(s)
ident = b'identity\0\0'
s.identity = ident
self.assertEqual(s.get(zmq.IDENTITY), ident)
def test_shadow(self):
p = self.socket(zmq.PUSH)
p.bind("tcp://127.0.0.1:5555")
p2 = zmq.Socket.shadow(p.underlying)
self.assertEqual(p.underlying, p2.underlying)
s = self.socket(zmq.PULL)
s2 = zmq.Socket.shadow(s.underlying)
self.assertNotEqual(s.underlying, p.underlying)
self.assertEqual(s.underlying, s2.underlying)
s2.connect("tcp://127.0.0.1:5555")
sent = b'hi'
p2.send(sent)
rcvd = self.recv(s2)
self.assertEqual(rcvd, sent)
def test_recv_multipart(self):
@gen.coroutine
def test():
a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
f = b.recv_multipart()
assert not f.done()
yield a.send(b'hi')
recvd = yield f
self.assertEqual(recvd, [b'hi'])
self.loop.run_sync(test)