def __init__(self, addr="*", port="8080", logger=None):
    """Create the ZeroMQ context and record the address of this endpoint.

    :param addr: host part handed to ``Address`` (defaults to ``"*"``).
    :param port: port part handed to ``Address`` (defaults to ``"8080"``).
    :param logger: logger stored on the instance; ``None`` is stored as-is.
    """
    self.logger = logger

    # Context from which this object's sockets are created.
    self.context = zmq.Context()

    endpoint = Address(addr, port)
    self.complete_address = endpoint.complete_address
    self.sync_address = ''

    # Socket used to talk to the following node; not created here.
    self.list_communication_channel = None

    # Test-only mapping from listening port to sync port, kept disabled:
    # if port == '5555':
    #     self.sync_address = Address(addr, '5562').complete_address
    # elif port == '5556':
    #     self.sync_address = Address(addr, '5563').complete_address
    # elif port == '5557':
    #     self.sync_address = Address(addr, '5564').complete_address
# Example source code for the Python class Context()
def forward(self, data):
    """Send ``data`` to the following node over ``list_communication_channel``.

    :param data: payload passed unchanged to the channel's ``send``.
    :raises zmq.Again: re-raised (after logging) when the send reports
        ``zmq.Again``, so the caller can react to the dead recipient.

    ``zmq.NotDone`` is logged and the channel is closed. Any other
    ``zmq.ZMQError`` is logged and the context is destroyed and recreated.
    """
    try:
        self.list_communication_channel.send(data)
    except zmq.NotDone:
        self.logger.debug('my recipient is dead, not done')
        self.list_communication_channel.close()
    except zmq.Again:
        self.logger.debug('my recipient is dead')
        # A bare ``raise`` propagates the caught exception itself, keeping
        # its errno and traceback instead of building a fresh ``zmq.Again``.
        raise
    except zmq.ZMQError as a:
        # Lazy %-formatting yields the same text as concatenation but does
        # not fail if ``strerror`` is not a string.
        self.logger.debug("Error in message forward %s", a.strerror)
        self.context.destroy()
        self.context = zmq.Context()
def send_int_message(self, msg=b'ALIVE', timeout=TRACKER_INFINITE_TIMEOUT):
    """Send ``msg`` on ``list_communication_channel`` and wait for delivery.

    :param msg: payload to send (defaults to ``b'ALIVE'``).
    :param timeout: value handed to the message tracker's ``wait``.

    ``zmq.NotDone`` is logged and followed by a ``TRY_TIMEOUT`` sleep.
    Any other ``zmq.ZMQError`` is logged, the context is destroyed and
    recreated, and the internal client-side channel is regenerated.
    """
    try:
        self.logger.debug('sending message to {}'.format(self.sync_address))
        delivery = self.list_communication_channel.send(
            msg,
            copy=False,
            track=True,
        )
        # Block until the tracker reports completion or ``timeout`` expires.
        delivery.wait(timeout)
    except zmq.NotDone:
        self.logger.debug('Something went wrong with that message')
        time.sleep(TRY_TIMEOUT)
    except zmq.ZMQError as err:
        self.logger.debug(err.strerror)
        # Start over with a fresh context and a fresh channel.
        self.context.destroy()
        self.context = zmq.Context()
        self.generate_internal_channel_client_side()
# used when it's the first time to sync
def flash(self):
    """Rebuild per-process state when running under a new process id.

    Does nothing when ``self.pid`` already matches the current pid.
    Otherwise it stores the new pid, creates fresh ZeroMQ sockets,
    builds a new main context and, if ``self.main_param`` is set, loads
    that YAML file from ``self.main_folder`` into ``ctx.params``.

    NOTE(review): the source had lost its indentation; this assumes every
    statement belongs to the pid-changed branch. Confirm against upstream.
    """
    current_pid = str(os.getpid())
    if self.pid == current_pid:
        return

    # Reset the process pid.
    self.pid = current_pid

    # Update the zmq sockets (sockets could not be shared between
    # different processes).
    self.zmq_socket = zmq.Context().socket(zmq.REQ)
    self.zmq_file_socket = zmq.Context().socket(zmq.DEALER)

    # Update the context.
    ctx = main_context(self.main_file, self.main_folder)
    if self.main_param is not None:
        main_config_path = os.path.join(self.main_folder, self.main_param)
        # Close the file deterministically instead of leaking the handle.
        with open(main_config_path, 'r') as config_file:
            # NOTE(review): ``yaml.load`` can construct arbitrary objects and
            # newer PyYAML releases require an explicit ``Loader``. Consider
            # ``yaml.safe_load`` if the file holds only plain data.
            params = yaml.load(config_file)
        ctx.params = params
    self.context = ctx
def prepare():
    """Initialise the module globals used by the files service.

    Connects a PUSH socket to the configured log URL, publishes its
    ``send_string`` as the global ``tee``, registers ``close_sockets`` to
    run at exit, and resolves the input and result directories.

    :return: the ``Config`` instance that was created.
    """
    global tee, input_files_dir, result_files_dir

    config = Config()

    # Log messages are pushed to the external log collector.
    zmq_context = zmq.Context()
    log_push = zmq_context.socket(zmq.PUSH)
    log_push.connect(config.server_log['external_url'])
    tee = log_push.send_string
    atexit.register(close_sockets, [log_push])

    files_conf = config.server_files
    input_files_dir = os.path.expanduser(files_conf['input_files_dir'])
    result_files_dir = os.path.expanduser(files_conf['result_files_dir'])

    tee('Started service files with pid {}'.format(os.getpid()))
    return config
def main():
    """Run an interactive request/reply loop against the configured server.

    Each command read from stdin is encoded with ``config.CODEC`` and sent
    over a REQ socket, and the decoded reply is printed. The loop ends when
    the first word of the reply (case-insensitive) is ``finished``.
    """
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    try:
        socket.connect("tcp://%s:%s" % (config.LISTEN_ON_IP, config.LISTEN_ON_PORT))
        while True:
            command = input("Command: ")
            socket.send(command.encode(config.CODEC))
            response = socket.recv().decode(config.CODEC)
            print(" ... %s" % response)
            words = shlex.split(response.lower())
            # An empty or whitespace-only reply has no status word; treat it
            # as "not finished" instead of failing with IndexError.
            if words and words[0] == "finished":
                print("Finished status received from robot")
                break
    finally:
        # Release the socket and context on every exit path. linger=0
        # drops any unsent request so ``term`` cannot block.
        socket.close(linger=0)
        context.term()
def zmq_streamer():
    """Run a blocking ZeroMQ STREAMER device between two bound sockets.

    A PUSH socket is bound on ``zmq_queue_port_push`` and a PULL socket on
    ``zmq_queue_port_pull``. Any exception is printed rather than
    propagated, and whatever was created is closed afterwards.
    """
    # Pre-bind the names so the cleanup below never hits an unbound local
    # when setup fails part-way through.
    context = None
    frontend = None
    backend = None
    try:
        context = zmq.Context()
        # Socket facing clients
        frontend = context.socket(zmq.PUSH)
        frontend.bind("tcp://*:%s" % (zmq_queue_port_push))
        # Socket facing services
        backend = context.socket(zmq.PULL)
        backend.bind("tcp://*:%s" % (zmq_queue_port_pull))
        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception as e:
        print(e)
        print("bringing down zmq device")
    finally:
        for sock in (frontend, backend):
            if sock is not None:
                sock.close()
        if context is not None:
            context.term()
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
    """Set up a worker: identity, ROUTER socket, poller, Redis and logging.

    :param data_dir: existing directory this worker uses for data.
    :param redis_url: URL handed to ``redis.from_url``.
    :param loglevel: level applied to the worker's child logger.
    :raises Exception: if ``data_dir`` is not an existing directory.
    """
    # ``isdir`` is already False for a missing path, so one check suffices.
    if not os.path.isdir(data_dir):
        raise Exception("Datadir %s is not a valid directory" % data_dir)

    self.worker_id = binascii.hexlify(os.urandom(8))
    self.node_name = socket.gethostname()
    self.data_dir = data_dir
    self.data_files = set()

    # ROUTER socket identified by the random worker id.
    zmq_context = zmq.Context()
    router = zmq_context.socket(zmq.ROUTER)
    self.socket = router
    router.setsockopt(zmq.LINGER, 500)
    router.identity = self.worker_id

    self.poller = zmq.Poller()
    self.poller.register(router, zmq.POLLIN | zmq.POLLOUT)

    self.redis_server = redis.from_url(redis_url)
    # Timestamps of when each controller was last spoken to.
    self.controllers = {}
    self.check_controllers()
    self.last_wrm = 0
    self.start_time = time.time()

    worker_logger = bqueryd.logger.getChild('worker ' + self.worker_id)
    self.logger = worker_logger
    worker_logger.setLevel(loglevel)
    self.msg_count = 0

    # ``term_signal`` is called here and its return value is installed as
    # the SIGTERM handler (presumably it returns the handler; verify).
    sigterm_handler = self.term_signal()
    signal.signal(signal.SIGTERM, sigterm_handler)
def __init__(self, address=None, timeout=120, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO, retries=3):
    """Set up the RPC client and connect to a controller.

    :param address: controller address; when falsy, the candidates are read
        from the Redis set ``bqueryd.REDIS_SET_KEY`` and shuffled.
    :param timeout: stored on the instance as ``self.timeout``.
    :param redis_url: URL handed to ``redis.from_url``.
    :param loglevel: level applied to the ``rpc`` child logger.
    :param retries: stored on the instance as ``self.retries``.
    :raises Exception: if no address is given and the Redis set is empty.
    """
    rpc_logger = bqueryd.logger.getChild('rpc')
    rpc_logger.setLevel(loglevel)
    self.logger = rpc_logger

    self.context = zmq.Context()
    self.redis_url = redis_url
    redis_client = redis.from_url(redis_url)
    self.retries = retries
    self.timeout = timeout
    self.identity = binascii.hexlify(os.urandom(8))

    if address:
        candidates = [address]
    else:
        # Bind to a random controller
        candidates = list(redis_client.smembers(bqueryd.REDIS_SET_KEY))
        if not candidates:
            raise Exception('No Controllers found in Redis set: ' + bqueryd.REDIS_SET_KEY)
        random.shuffle(candidates)

    self.controllers = candidates
    self.connect_socket()
def __init__(self, push, pull, redis_conf):
    """Connect to the Redis cache and open the PUSH and PULL sockets.

    :param push: endpoint the PUSH socket connects to.
    :param pull: endpoint the PULL socket binds to.
    :param redis_conf: ``"host:port:db"`` string describing the Redis cache.
    """
    super(MinerClient, self).__init__()

    print("Connecting to Redis cache {} ...".format(redis_conf))
    host, port, db = redis_conf.split(":")
    self.redis = redis.StrictRedis(host=host, port=int(port), db=int(db))
    # Create the 'transaction' key only if it is not there yet.
    self.redis.setnx('transaction', 0)

    # NOTE: Expiration times for pending/processed tasks in seconds.
    self.transaction_expiration = 60 * 60
    self.result_expiration = 60 * 10

    zmq_context = zmq.Context()

    print("Connecting to push socket '{}' ...".format(push))
    push_socket = zmq_context.socket(zmq.PUSH)
    self.push = push_socket
    push_socket.connect(push)

    print("Binding to pull socket '{}' ...".format(pull))
    pull_socket = zmq_context.socket(zmq.PULL)
    self.pull = pull_socket
    pull_socket.bind(pull)
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """Report whether a ZeroMQ publisher at ``host:port`` delivers a message.

    :param host: host to connect to.
    :param port: TCP port to connect to (default 5555).
    :param user: unused; kept for interface compatibility.
    :param password: unused; kept for interface compatibility.
    :param db: unused; kept for interface compatibility.
    :return: ``True`` if a message arrives within one second, else ``False``.
    """
    context = zmq.Context()
    try:
        socket = context.socket(zmq.SUB)
        try:
            # Configure
            socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
            socket.setsockopt(zmq.LINGER, 0)  # Do not block on close
            socket.RCVTIMEO = 1000  # timeout: 1 sec
            # Connect
            socket.connect("tcp://%s:%s" % (host, port))
            # Try to receive
            try:
                socket.recv()
            except Exception:
                return False
            return True
        finally:
            socket.close()
    finally:
        # Terminate the context too, otherwise each call leaks one.
        context.term()
def handle_zmq(host, port=5555, extra_config=None):
    """Report whether a ZeroMQ publisher at ``host:port`` delivers a message.

    :param host: host to connect to.
    :param port: TCP port to connect to (default 5555).
    :param extra_config: unused; kept for interface compatibility.
    :return: ``True`` if a message arrives within one second, else ``False``.
    """
    # log.debug(" * Connection to ZeroMQ: %s : %s" % (host, port))
    context = zmq.Context()
    try:
        socket = context.socket(zmq.SUB)
        try:
            # Configure
            socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
            socket.setsockopt(zmq.LINGER, 0)  # Do not block on close
            socket.RCVTIMEO = 1000  # timeout: 1 sec
            # Connect
            socket.connect("tcp://%s:%s" % (host, port))
            # Try to receive
            try:
                socket.recv()
            except Exception:
                return False
            return True
        finally:
            socket.close()
    finally:
        # Terminate the context too, otherwise each call leaks one.
        context.term()
async def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REQ socket exchanges one request/reply with a pyzmq REP peer.

    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await asyncio.wait_for(
                socket.send_multipart([b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'your', b'answer']
async def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq REP socket answers one request from a pyzmq REQ peer.

    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.
    """
    req_socket = socket_factory.create(zmq.REQ)
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REP)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([b'your', b'answer']),
                1,
            )
async def test_tcp_dealer_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq DEALER socket talks to a pyzmq REP peer.

    The DEALER side sends and expects an explicit empty first frame.
    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.DEALER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await asyncio.wait_for(
                socket.send_multipart([b'', b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'', b'your', b'answer']
async def test_tcp_router_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq ROUTER socket receives from and replies to a pyzmq REQ peer.

    The first received frame must equal the peer's identity (``b'abcd'``).
    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.
    """
    req_socket = socket_factory.create(zmq.REQ)
    req_socket.identity = b'abcd'
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.ROUTER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            # Strip the routing identity before checking the payload.
            identity = frames.pop(0)
            assert identity == req_socket.identity
            assert frames == [b'', b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([identity, b'', b'your', b'answer']),
                1,
            )
async def test_tcp_xpub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XPUB socket sees the subscribe/unsubscribe frames of a SUB peer.

    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.

    NOTE(review): the source had lost its indentation; both sends are
    assumed to sit inside the ``while`` loop. Confirm against upstream.
    """
    sub_socket = socket_factory.create(zmq.SUB)
    sub_socket.setsockopt(zmq.SUBSCRIBE, b'a')
    connect_or_bind(sub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        frames = sub_socket.recv_multipart()
        assert frames == [b'a', b'message']

    with run_in_background(run) as thread_done_event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XPUB)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\1a']

            # Keep publishing until the background peer has finished.
            while not thread_done_event.is_set():
                await socket.send_multipart([b'a', b'message'])
                await socket.send_multipart([b'b', b'wrong'])

            sub_socket.close()
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'\0a']
async def test_tcp_sub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq SUB socket subscribes to ``b'a'`` and receives from an XPUB peer.

    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): ``connect_or_bind`` is called as a function above
        # but compared to a string here; confirm the fixture supports both.
        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.SUB)
            await socket.subscribe(b'a')
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_xsub_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq XSUB socket subscribes with a raw frame and receives from XPUB.

    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.
    """
    xpub_socket = socket_factory.create(zmq.XPUB)
    connect_or_bind(xpub_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Wait one second for the subscription to arrive.
        assert xpub_socket.poll(1000) == zmq.POLLIN
        topic = xpub_socket.recv_multipart()
        assert topic == [b'\x01a']
        xpub_socket.send_multipart([b'a', b'message'])

        # NOTE(review): ``connect_or_bind`` is called as a function above
        # but compared to a string here; confirm the fixture supports both.
        if connect_or_bind == 'connect':
            assert xpub_socket.poll(1000) == zmq.POLLIN
            topic = xpub_socket.recv_multipart()
            assert topic == [b'\x00a']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.XSUB)
            # The subscription is sent as an explicit b'\x01' + topic frame.
            await socket.send_multipart([b'\x01a'])
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'a', b'message']
async def test_tcp_push_socket(event_loop, socket_factory, connect_or_bind):
    """An azmq PUSH socket delivers one multipart message to a pyzmq PULL peer.

    Declared ``async`` because the body awaits; a plain ``def`` containing
    ``await`` / ``async with`` does not compile.
    """
    pull_socket = socket_factory.create(zmq.PULL)
    connect_or_bind(pull_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side, executed in the background.
        assert pull_socket.poll(1000) == zmq.POLLIN
        message = pull_socket.recv_multipart()
        assert message == [b'hello', b'world']

    with run_in_background(run) as event:
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.PUSH)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            await socket.send_multipart([b'hello', b'world'])

            # Keep the context open until the background peer has finished.
            while not event.is_set():
                await asyncio.sleep(0.1)