def disconnect(self):
    """Tear down the ZMQ socket and context if currently connected.

    Does nothing when ``self.is_connected`` is falsy. Otherwise the socket
    (if any) is unregistered from the poller (if any) and closed, the
    context (if any) is terminated, and ``self.is_connected`` is cleared.
    """
    if not self.is_connected:
        return

    sock = self.socket
    if sock:
        # LINGER 0: discard pending messages so close() does not wait.
        sock.setsockopt(zmq.LINGER, 0)
        poller = self.poller
        if poller:
            poller.unregister(sock)
        sock.close()

    ctx = self.context
    if ctx:
        ctx.term()

    self.is_connected = False
python类LINGER的实例源码
base_handler.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def __init__(self, data_dir=bqueryd.DEFAULT_DATA_DIR, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.DEBUG):
    """Set up a worker: validate the data dir, open a ROUTER socket, connect to redis.

    Args:
        data_dir: Directory that must already exist.
        redis_url: URL handed to ``redis.from_url``.
        loglevel: Level applied to this worker's child logger.

    Raises:
        Exception: If ``data_dir`` does not exist or is not a directory.
    """
    if not (os.path.exists(data_dir) and os.path.isdir(data_dir)):
        raise Exception("Datadir %s is not a valid directory" % data_dir)

    # Identity of this worker and the files it serves.
    self.worker_id = binascii.hexlify(os.urandom(8))
    self.node_name = socket.gethostname()
    self.data_dir = data_dir
    self.data_files = set()

    # ZMQ plumbing: a ROUTER socket identified by the worker id.
    zmq_context = zmq.Context()
    self.socket = zmq_context.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, 500)
    self.socket.identity = self.worker_id
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)

    self.redis_server = redis.from_url(redis_url)
    # Keep a dict of timestamps when you last spoke to controllers
    self.controllers = {}
    self.check_controllers()
    self.last_wrm = 0
    self.start_time = time.time()

    self.logger = bqueryd.logger.getChild('worker ' + self.worker_id)
    self.logger.setLevel(loglevel)
    self.msg_count = 0

    # NOTE(review): term_signal() is *called* here, so it presumably returns
    # the actual handler callable -- confirm against its definition.
    signal.signal(signal.SIGTERM, self.term_signal())
def brute_zmq(host, port=5555, user=None, password=None, db=0):
    """Probe a ZeroMQ publisher by subscribing and waiting for one message.

    Args:
        host: Host name or address to connect to.
        port: TCP port of the publisher.
        user: Accepted for signature compatibility; not used here.
        password: Accepted for signature compatibility; not used here.
        db: Accepted for signature compatibility; not used here.

    Returns:
        True if a message arrives within the 1 second receive timeout,
        False if receiving fails for any reason (including the timeout).
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not wait for pending messages on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec
        # Connect
        socket.connect("tcp://%s:%s" % (host, port))
        # Try to receive
        try:
            socket.recv()
            return True
        except Exception:
            return False
    finally:
        # Release the socket *and* its context; the original leaked one
        # context (with its I/O thread) per call.
        socket.close(linger=0)
        context.term()
def handle_zmq(host, port=5555, extra_config=None):
    """Check whether a ZeroMQ publisher at ``host:port`` delivers a message.

    Args:
        host: Host name or address to connect to.
        port: TCP port of the publisher.
        extra_config: Accepted for signature compatibility; not used here.

    Returns:
        True if a message arrives within the 1 second receive timeout,
        False if receiving fails for any reason (including the timeout).
    """
    # log.debug(" * Connection to ZeroMQ: %s : %s" % (host, port))
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # Configure
        socket.setsockopt(zmq.SUBSCRIBE, b"")  # All topics
        socket.setsockopt(zmq.LINGER, 0)  # Do not wait for pending messages on close
        socket.RCVTIMEO = 1000  # timeout: 1 sec
        # Connect
        socket.connect("tcp://%s:%s" % (host, port))
        # Try to receive
        try:
            socket.recv()
            return True
        except Exception:
            return False
    finally:
        # Release the socket *and* its context; the original leaked one
        # context (with its I/O thread) per call.
        socket.close(linger=0)
        context.term()
def __init__(self, bind_address, linger=-1, poll_timeout=2, loop=None):
    """Create an asyncio ROUTER socket and bind it to ``bind_address``.

    Args:
        bind_address: Endpoint passed to ``socket.bind``.
        linger: Value for the socket's ``zmq.LINGER`` option.
        poll_timeout: Stored on the instance as ``self.poll_timeout``.
        loop: Stored on the instance as ``self.loop``.
    """
    self.bind_address = bind_address
    self.loop = loop
    self.context = zmq.asyncio.Context()
    self.poll_timeout = poll_timeout
    self.socket = self.context.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, linger)
    self.in_poller = zmq.asyncio.Poller()
    self.in_poller.register(self.socket, zmq.POLLIN)
    self.socket.bind(self.bind_address)
    # Log only after bind() succeeded so the message is never a false claim.
    log.info('Bound to: ' + self.bind_address)
    self._kill = False
def _frame_worker(self):
    """Receive packets over a SUB socket and queue each completed frame.

    Returns immediately when no ``_frame_class`` is set. Otherwise loops
    while ``self._running`` is truthy: waits up to one second for data,
    feeds each packet to the current frame object, and when the frame
    reports ``_complete`` puts it on ``self._queue`` and starts a new one.
    The socket is always closed on exit.
    """
    if not getattr(self, '_frame_class', None):
        return

    ctx = zmq.Context.instance()
    skt = ctx.socket(zmq.SUB)
    try:
        # Socket options must be set before connect(): options such as
        # RCVHWM only apply to connections made after they are set.
        skt.setsockopt_string(zmq.SUBSCRIBE, u'')
        skt.setsockopt(zmq.RCVHWM, 8)
        skt.setsockopt(zmq.LINGER, 5000)
        skt.connect("tcp://%s:27185" % self._moku._ip)

        fr = self._frame_class(**self._frame_kwargs)

        while self._running:
            # 1 second select timeout so a cleared _running flag is noticed.
            if skt in zmq.select([skt], [], [], 1.0)[0]:
                d = skt.recv()
                fr.add_packet(d)

                if fr._complete:
                    self._queue.put_nowait(fr)
                    fr = self._frame_class(**self._frame_kwargs)
    finally:
        skt.close()
def reset_socket(self):
    """Recreate the PUB socket and (re)start the subscriber side.

    Any existing ``self.pubsock`` is closed first. If the subscriber is not
    running yet it is started via ``self._subscribe()``; otherwise
    ``self.reset_sub`` is set. Finally ``self.running`` is set and
    ``self._poke_server()`` is called.
    """
    # Close things if necessary
    old_sock = self.pubsock
    if old_sock is not None:
        old_sock.close()

    print("Nutmeg connecting")
    print("\tPublishing to:", self.pub_address)

    self.pubsock = self.context.socket(zmq.PUB)
    self.pubsock.connect(self.pub_address)

    if self.sub_running:
        # Subscriber already active: flag it for a reset instead.
        self.reset_sub = True
    else:
        self.sub_running = True
        self._subscribe()

    self.running = True
    self._poke_server()
def zcreate_pipe(ctx, hwm=1000):
    """Create a connected pair of PAIR sockets over a random inproc endpoint.

    Args:
        ctx: Context passed to ``zsocket.ZSocket``.
        hwm: High-water mark applied to both sockets.

    Returns:
        ``(frontend, backend)``; frontend is bound, backend is connected.

    Raises:
        zmq.ZMQError: If binding fails for any reason other than the
            randomly chosen endpoint already being in use.
    """
    backend = zsocket.ZSocket(ctx, zmq.PAIR)
    frontend = zsocket.ZSocket(ctx, zmq.PAIR)

    for sock in (backend, frontend):
        sock.set_hwm(hwm)
        # close immediately on shutdown
        sock.setsockopt(zmq.LINGER, 0)

    while True:
        # randrange keeps each id within four hex digits (0x0000-0xffff).
        # NOTE(review): the trailing "\n" in the endpoint name is kept
        # as-is from the original; it looks accidental -- confirm.
        endpoint = "inproc://zactor-%04x-%04x\n" % (
            random.randrange(0x10000), random.randrange(0x10000))
        try:
            frontend.bind(endpoint)
        except zmq.ZMQError as err:
            # Only a name collision is worth retrying; anything else would
            # previously spin forever behind a bare ``except``.
            if err.errno != zmq.EADDRINUSE:
                raise
        else:
            break

    backend.connect(endpoint)
    return (frontend, backend)
def serve_data(ds, addr):
    """Serve datapoints from ``ds`` forever over a PUSH socket bound to ``addr``.

    Args:
        ds: Data source; wrapped in ``RepeatedData(ds, -1)`` before use.
        addr: Endpoint the PUSH socket binds to.

    The socket and context are torn down on any exit, including a failed bind.
    """
    ctx = zmq.Context()
    socket = ctx.socket(zmq.PUSH)
    try:
        # Configure and bind inside the try so a bind failure (e.g. address
        # already in use) still runs the cleanup below.
        socket.set_hwm(10)
        socket.bind(addr)
        ds = RepeatedData(ds, -1)
        ds.reset_state()
        logger.info("Serving data at {}".format(addr))
        while True:
            for dp in ds.get_data():
                socket.send(dumps(dp), copy=False)
    finally:
        # Drop anything still queued so close/destroy cannot block.
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
        if not ctx.closed:
            ctx.destroy(0)
def socket_fitness(self, chrom):
    """Ask the peer on the REQ socket for a fitness value, else compute it locally.

    The request is a ';'-joined string built from ``self.func`` getters,
    ``self.targetPath`` and ``chrom``. If no reply is readable within
    100 ms the socket is closed and ``self.func(chrom)`` is returned.

    Args:
        chrom: Iterable of values; each is stringified into the request.

    Returns:
        The reply parsed as ``float``, or the result of ``self.func(chrom)``.
    """
    if self.socket.closed:
        # A previous call timed out and closed the socket; recreate it.
        self.socket = self.context.socket(zmq.REQ)
        self.socket.bind(self.socket_port)
        self.poll.register(self.socket, zmq.POLLIN)

    request = ';'.join([
        self.func.get_Driving(),
        self.func.get_Follower(),
        self.func.get_Link(),
        self.func.get_Target(),
        self.func.get_ExpressionName(),
        self.func.get_Expression(),
        ','.join("{}:{}".format(e[0], e[1]) for e in self.targetPath),
        ','.join(str(e) for e in chrom),
    ])
    self.socket.send_string(request)

    # Poll once: the original ``while True`` returned on its first pass
    # from either branch, so it never actually looped.
    socks = dict(self.poll.poll(100))
    if socks.get(self.socket) == zmq.POLLIN:
        return float(self.socket.recv().decode('utf-8'))

    # No reply in time: unregister before closing, then fall back.
    self.poll.unregister(self.socket)
    self.socket.setsockopt(zmq.LINGER, 0)
    self.socket.close()
    return self.func(chrom)
def socket_fitness(self, chrom):
    """Ask the peer on the REQ socket for a fitness value, else compute it locally.

    The request is a ';'-joined string built from ``self.func`` getters,
    ``self.targetPath`` and ``chrom``. If no reply is readable within
    100 ms the socket is closed and ``self.func(chrom)`` is returned.

    Args:
        chrom: Iterable of values; each is stringified into the request.

    Returns:
        The reply parsed as ``float``, or the result of ``self.func(chrom)``.
    """
    if self.socket.closed:
        # A previous call timed out and closed the socket; recreate it.
        self.socket = self.context.socket(zmq.REQ)
        self.socket.bind(self.socket_port)
        self.poll.register(self.socket, zmq.POLLIN)

    request = ';'.join([
        self.func.get_Driving(),
        self.func.get_Follower(),
        self.func.get_Link(),
        self.func.get_Target(),
        self.func.get_ExpressionName(),
        self.func.get_Expression(),
        ','.join("{}:{}".format(e[0], e[1]) for e in self.targetPath),
        ','.join(str(e) for e in chrom),
    ])
    self.socket.send_string(request)

    # Poll once: the original ``while True`` returned on its first pass
    # from either branch, so it never actually looped.
    socks = dict(self.poll.poll(100))
    if socks.get(self.socket) == zmq.POLLIN:
        return float(self.socket.recv().decode('utf-8'))

    # No reply in time: unregister before closing, then fall back.
    self.poll.unregister(self.socket)
    self.socket.setsockopt(zmq.LINGER, 0)
    self.socket.close()
    return self.func(chrom)
def socket_fitness(self, chrom):
    """Ask the peer on the REQ socket for a fitness value, else compute it locally.

    The request is a ';'-joined string built from ``self.func`` getters,
    ``self.targetPath`` and ``chrom``. If no reply is readable within
    100 ms the socket is closed and ``self.func(chrom)`` is returned.

    Args:
        chrom: Iterable of values; each is stringified into the request.

    Returns:
        The reply parsed as ``float``, or the result of ``self.func(chrom)``.
    """
    if self.socket.closed:
        # A previous call timed out and closed the socket; recreate it.
        self.socket = self.context.socket(zmq.REQ)
        self.socket.bind(self.socket_port)
        self.poll.register(self.socket, zmq.POLLIN)

    request = ';'.join([
        self.func.get_Driving(),
        self.func.get_Follower(),
        self.func.get_Link(),
        self.func.get_Target(),
        self.func.get_ExpressionName(),
        self.func.get_Expression(),
        ','.join("{}:{}".format(e[0], e[1]) for e in self.targetPath),
        ','.join(str(e) for e in chrom),
    ])
    self.socket.send_string(request)

    # Poll once: the original ``while True`` returned on its first pass
    # from either branch, so it never actually looped.
    socks = dict(self.poll.poll(100))
    if socks.get(self.socket) == zmq.POLLIN:
        return float(self.socket.recv().decode('utf-8'))

    # No reply in time: unregister before closing, then fall back.
    self.poll.unregister(self.socket)
    self.socket.setsockopt(zmq.LINGER, 0)
    self.socket.close()
    return self.func(chrom)
def test_bad_sockopts(self):
    """Invalid socket options must raise the appropriate error."""
    pub = self.context.socket(zmq.PUB)
    self.sockets.append(pub)
    pub.setsockopt(zmq.LINGER, 0)

    unknown_opt = 9999
    # unrecognized int sockopts pass through to libzmq, and should raise EINVAL
    self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
    self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
    # but only int sockopts are allowed through this way, otherwise raise a TypeError
    self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
    # some sockopts are valid in general, but not on every socket:
    self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
def test_sockopt_roundtrip(self):
    """A value written with setsockopt is read back by getsockopt."""
    pub = self.context.socket(zmq.PUB)
    self.sockets.append(pub)
    linger_ms = 11
    pub.setsockopt(zmq.LINGER, linger_ms)
    self.assertEqual(pub.getsockopt(zmq.LINGER), linger_ms)
def test_attr(self):
    """Socket options can be set and read as attributes."""
    dealer = self.context.socket(zmq.DEALER)
    self.sockets.append(dealer)

    expected = 10
    dealer.linger = expected
    # Attribute access and getsockopt must agree.
    self.assertEqual(expected, dealer.linger)
    self.assertEqual(expected, dealer.getsockopt(zmq.LINGER))
    self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
def test_term_hang(self):
    """Terminate the context after sending on a zero-linger DEALER and closing both ends."""
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_default_mq_args(self):
    """A ThreadMonitoredQueue starts with its default arguments."""
    dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = dev
    # Zero linger on the in, out and monitor sockets, in that order.
    for set_option in (dev.setsockopt_in, dev.setsockopt_out, dev.setsockopt_mon):
        set_option(zmq.LINGER, 0)
    # this will raise if default args are wrong
    dev.start()
    self.teardown_device()
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
    """Return a (bound, connected) socket pair joined over a random port.

    Both sockets get LINGER 0 and are appended to ``self.sockets``.
    """
    bound = self.context.socket(type1)
    bound.setsockopt(zmq.LINGER, 0)
    port = bound.bind_to_random_port(interface)

    connected = self.context.socket(type2)
    connected.setsockopt(zmq.LINGER, 0)
    connected.connect('%s:%s' % (interface, port))

    self.sockets.extend([bound, connected])
    return bound, connected
def connect(self):
    """Bind or connect to ZMQ socket. Requires package zmq.

    REP sockets bind to ``tcp://<address>:<port>``; every other socket type
    connects to it. The endpoint is printed afterwards.
    """
    self.socket = zmq.Context().socket(self.socket_type)
    self.socket.setsockopt(zmq.LINGER, 1)

    endpoint = 'tcp://{}:{}'.format(self.address, self.port)
    # Pick the attach method by socket type, then apply it once.
    attach = self.socket.bind if self.socket_type == zmq.REP else self.socket.connect
    attach(endpoint)

    print('python thread connected to ' + endpoint)
def setup_socket(self):
    """Create the REQ socket; the caller is expected to connect it.

    The component inheriting from BaseComponent should call
    ``self.socket.connect`` with the appropriate address.
    """
    self.socket = zmq.Context().socket(zmq.REQ)
    # LINGER 0: unsent messages are discarded as soon as the socket closes.
    self.socket.setsockopt(zmq.LINGER, 0)
    # RCVTIMEO bounds how long socket.recv may block.
    self.socket.setsockopt(zmq.RCVTIMEO, 500)  # milliseconds
def test_forwarder(forwarder, tcp_sender, context):
    """Data sent over TCP is received on both the monitor and output endpoints."""

    def _subscriber(endpoint):
        # SUB socket subscribed to everything, zero linger, connected to endpoint.
        sub = context.socket(zmq.SUB)
        sub.setsockopt_string(zmq.SUBSCRIBE, "")
        sub.setsockopt(zmq.LINGER, 0)
        sub.connect(endpoint)
        return sub

    mon = _subscriber("tcp://localhost:6500")
    recv = _subscriber("tcp://localhost:6002")

    tcp_sender.connect(('localhost', 6001))

    # waiting for warmup
    time.sleep(1)

    tcp_sender.sendall(b"test test")
    monitored = mon.recv()
    assert monitored is not None

    tcp_sender.sendall(b"test test")
    forwarded = recv.recv()
    assert forwarded is not None

    tcp_sender.close()
    forwarder.terminate()
def sender(context):
    """Return a zero-linger PUSH socket bound to tcp port 6800, after a one second pause."""
    push = context.socket(zmq.PUSH)
    push.setsockopt(zmq.LINGER, 0)
    push.bind("tcp://*:6800")
    time.sleep(1)
    return push
def __init__(self, redis_url='redis://127.0.0.1:6379/0', loglevel=logging.INFO):
    """Set up a controller: redis client, bound ROUTER socket, run files, bookkeeping.

    Args:
        redis_url: URL handed to ``redis.from_url``.
        loglevel: Level applied to this controller's child logger.

    Writes the bound address and the current PID to files under
    ``RUNFILES_LOCATION``.
    """
    self.redis_url = redis_url
    self.redis_server = redis.from_url(redis_url)

    # ROUTER socket with short linger/send timeouts.
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.ROUTER)
    self.socket.setsockopt(zmq.LINGER, 500)
    self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1)  # Paranoid for debugging purposes
    self.socket.setsockopt(zmq.SNDTIMEO, 1000)  # Short timeout
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN | zmq.POLLOUT)

    self.node_name = socket.gethostname()
    self.address = bind_to_random_port(
        self.socket, 'tcp://' + get_my_ip(),
        min_port=14300, max_port=14399, max_tries=100)

    # Record where this controller listens and which process owns it.
    address_path = os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.address')
    with open(address_path, 'w') as address_file:
        address_file.write(self.address)
    pid_path = os.path.join(RUNFILES_LOCATION, 'bqueryd_controller.pid')
    with open(pid_path, 'w') as pid_file:
        pid_file.write(str(os.getpid()))

    self.logger = bqueryd.logger.getChild('controller').getChild(self.address)
    self.logger.setLevel(loglevel)

    self.msg_count_in = 0
    # buffer of results that are ready to be returned to callers
    self.rpc_results = []
    # Certain RPC calls get split and divided over workers, this dict tracks the original RPCs
    self.rpc_segments = {}
    # maintain a list of connected workers TODO get rid of unresponsive ones...
    self.worker_map = {}
    # shows on which workers a file is available on
    self.files_map = {}
    # A dict of buffers, used to round-robin based on message affinity
    self.worker_out_messages = {None: []}
    # used to round-robin the outgoing messages
    self.worker_out_messages_sequence = [None]
    self.is_running = True
    self.last_heartbeat = 0
    # A dict of other Controllers running on other DQE nodes
    self.others = {}
    self.start_time = time.time()
def connect_socket(self):
    """Connect to the first controller in ``self.controllers`` that answers a ping.

    Each candidate gets a fresh REQ socket with a 2 second receive timeout.
    On success the socket is kept as ``self.controller``, its address is
    stored in ``self.address``, and the receive timeout is switched to
    ``self.timeout`` seconds.

    Raises:
        Exception: If no controller produced a reply.
    """
    reply = None
    for c in self.controllers:
        self.logger.debug('Establishing socket connection to %s' % c)
        tmp_sock = self.context.socket(zmq.REQ)
        tmp_sock.setsockopt(zmq.RCVTIMEO, 2000)
        tmp_sock.setsockopt(zmq.LINGER, 0)
        tmp_sock.identity = self.identity
        tmp_sock.connect(c)
        # first ping the controller to see if it responds at all
        msg = RPCMessage({'payload': 'ping'})
        tmp_sock.send_json(msg)
        try:
            reply = msg_factory(tmp_sock.recv_json())
        except Exception:
            # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
            # propagate. Close the dead socket instead of leaking one per
            # unresponsive controller.
            traceback.print_exc()
            tmp_sock.close()
            continue
        self.address = c
        break
    if reply:
        # Now set the timeout to the actual requested
        self.logger.debug("Connection OK, setting network timeout to %s milliseconds", self.timeout*1000)
        self.controller = tmp_sock
        self.controller.setsockopt(zmq.RCVTIMEO, self.timeout*1000)
    else:
        raise Exception('No controller connection')
def test_bad_sockopts(self):
    """Invalid socket options must raise the appropriate error."""
    pub = self.context.socket(zmq.PUB)
    self.sockets.append(pub)
    pub.setsockopt(zmq.LINGER, 0)

    unknown_opt = 9999
    # unrecognized int sockopts pass through to libzmq, and should raise EINVAL
    self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, unknown_opt, 5)
    self.assertRaisesErrno(zmq.EINVAL, pub.getsockopt, unknown_opt)
    # but only int sockopts are allowed through this way, otherwise raise a TypeError
    self.assertRaises(TypeError, pub.setsockopt, unknown_opt, b"5")
    # some sockopts are valid in general, but not on every socket:
    self.assertRaisesErrno(zmq.EINVAL, pub.setsockopt, zmq.SUBSCRIBE, b'hi')
def test_sockopt_roundtrip(self):
    """A value written with setsockopt is read back by getsockopt."""
    pub = self.context.socket(zmq.PUB)
    self.sockets.append(pub)
    linger_ms = 11
    pub.setsockopt(zmq.LINGER, linger_ms)
    self.assertEqual(pub.getsockopt(zmq.LINGER), linger_ms)
def test_attr(self):
    """Socket options can be set and read as attributes."""
    dealer = self.context.socket(zmq.DEALER)
    self.sockets.append(dealer)

    expected = 10
    dealer.linger = expected
    # Attribute access and getsockopt must agree.
    self.assertEqual(expected, dealer.linger)
    self.assertEqual(expected, dealer.getsockopt(zmq.LINGER))
    self.assertEqual(dealer.fd, dealer.getsockopt(zmq.FD))
def test_term_hang(self):
    """Terminate the context after sending on a zero-linger DEALER and closing both ends."""
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_default_mq_args(self):
    """A ThreadMonitoredQueue starts with its default arguments."""
    dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = dev
    # Zero linger on the in, out and monitor sockets, in that order.
    for set_option in (dev.setsockopt_in, dev.setsockopt_out, dev.setsockopt_mon):
        set_option(zmq.LINGER, 0)
    # this will raise if default args are wrong
    dev.start()
    self.teardown_device()
def create_bound_pair(self, type1=zmq.PAIR, type2=zmq.PAIR, interface='tcp://127.0.0.1'):
    """Return a (bound, connected) socket pair joined over a random port.

    Both sockets get LINGER 0 and are appended to ``self.sockets``.
    """
    bound = self.context.socket(type1)
    bound.setsockopt(zmq.LINGER, 0)
    port = bound.bind_to_random_port(interface)

    connected = self.context.socket(type2)
    connected.setsockopt(zmq.LINGER, 0)
    connected.connect('%s:%s' % (interface, port))

    self.sockets.extend([bound, connected])
    return bound, connected