def _execute_command(self, command):
if len(self.job_servers) == 0:
app_log.error('there is no job server')
return
server = self.job_servers[self.job_server_index]
self.job_server_index = (self.job_server_index + 1) % len(self.job_servers)
context = zmq.Context.instance()
zmq_sock = context.socket(zmq.DEALER)
zmq_sock.linger = 1000
zmq_sock.identity = bytes(str(os.getpid()), 'ascii')
ip = server['ip']
if ip == '*':
ip = 'localhost'
url = 'tcp://{0}:{1}'.format(ip, server['zmq_port'])
app_log.info('connect %s', url)
zmq_sock.connect(url)
command = json_encode({'command': command})
app_log.info('command: %s', command)
zmq_sock.send_multipart([b'0', bytes(command, 'ascii')])
stream = ZMQStream(zmq_sock)
stream.on_recv(self.response_handler)
评论列表
文章目录