def run(self):
    """Drive the player in an endless send-state / receive-action loop.

    A PUSH socket connected to ``self.c2s`` sends serialized
    ``(identity, state, reward, isOver)`` tuples, and a DEALER socket
    connected to ``self.s2c`` receives the next action.  Both sockets use
    ``self.identity`` as their ZMQ identity.  The loop has no exit
    condition, so this method does not return normally.
    """
    player = self._build_player()
    ctx = zmq.Context()

    # Outgoing channel; its high-water mark is set to 2.
    push_sock = ctx.socket(zmq.PUSH)
    push_sock.setsockopt(zmq.IDENTITY, self.identity)
    push_sock.set_hwm(2)
    push_sock.connect(self.c2s)

    # Incoming channel; no high-water mark is configured on it.
    action_sock = ctx.socket(zmq.DEALER)
    action_sock.setsockopt(zmq.IDENTITY, self.identity)
    action_sock.connect(self.s2c)

    state = player.current_state()
    reward = 0
    isOver = False
    while True:
        payload = dumps((self.identity, state, reward, isOver))
        push_sock.send(payload, copy=False)
        # recv(copy=False) yields a frame object; its raw bytes are decoded.
        frame = action_sock.recv(copy=False)
        action = loads(frame.bytes)
        reward, isOver = player.action(action)
        state = player.current_state()
# compatibility
python类DEALER的实例源码
def start(self):
    """Bind the DEALER socket, install the request handler and run the IO loop.

    The socket identity is the ASCII-encoded process id.  When
    ``self.port`` is 0 a random port is chosen and stored in
    ``self.zmq_port``; otherwise the socket is bound to ``self.port`` and
    that value is stored.  The server info file is written before the loop
    starts and removed both at interpreter exit and when the loop ends.
    A ``KeyboardInterrupt`` raised by the loop is logged and swallowed.
    """
    self.pid = os.getpid()
    context = zmq.Context.instance()
    zmq_sock = context.socket(zmq.DEALER)
    zmq_sock.linger = 1000
    zmq_sock.identity = bytes(str(self.pid), 'ascii')
    if self.port == 0:
        self.zmq_port = zmq_sock.bind_to_random_port('tcp://{0}'.format(self.ip))
    else:
        # Socket.bind() does not return the port number (unlike
        # bind_to_random_port), so record the configured port explicitly
        # instead of storing bind()'s return value.
        zmq_sock.bind('tcp://{0}:{1}'.format(self.ip, self.port))
        self.zmq_port = self.port
    self.zmq_stream = zmqstream.ZMQStream(zmq_sock)
    self.zmq_stream.on_recv(self.request_handler)
    self.log_format = (u'%(color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d '
                       u'%(name)s-{0}]%(end_color)s %(message)s').format(self.pid)
    self.log.info('start %s', self)
    self.write_server_info_file()
    atexit.register(self.remove_server_info_file)
    self.io_loop = ioloop.IOLoop.current()
    try:
        self.io_loop.start()
    except KeyboardInterrupt:
        self.log.info('JobServer interrupted...')
    finally:
        # Runs whether the loop stopped normally or was interrupted.
        self.remove_server_info_file()
def test_attr(self):
    """Socket options can be set and read back as plain attributes."""
    sock = self.context.socket(zmq.DEALER)
    self.sockets.append(sock)

    expected_linger = 10
    sock.linger = expected_linger

    # The attribute and the explicit getsockopt call must agree.
    self.assertEqual(expected_linger, sock.linger)
    self.assertEqual(expected_linger, sock.getsockopt(zmq.LINGER))
    self.assertEqual(sock.fd, sock.getsockopt(zmq.FD))
def test_bad_attr(self):
    """Setting or getting an unknown attribute must raise AttributeError."""
    sock = self.context.socket(zmq.DEALER)
    self.sockets.append(sock)

    # Assignment to an unknown attribute name.
    try:
        setattr(sock, 'apple', 'foo')
    except AttributeError:
        pass
    else:
        self.fail("bad setattr should have raised AttributeError")

    # Lookup of the same unknown attribute name.
    try:
        getattr(sock, 'apple')
    except AttributeError:
        pass
    else:
        self.fail("bad getattr should have raised AttributeError")
def test_router_dealer(self):
    """A single DEALER message reaches the ROUTER as two frames."""
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    payload = b'message1'
    dealer.send(payload)

    # First frame (called ``ident`` in the earlier version); its value is
    # not inspected, only that another frame follows it.
    self.recv(router)
    self.assertEqual(router.rcvmore, True)

    # Second frame carries the payload and is the last one.
    received = self.recv(router)
    self.assertEqual(payload, received)
    self.assertEqual(router.rcvmore, False)
def test_default_mq_args(self):
    """A ThreadMonitoredQueue built with default arguments can be started."""
    queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = queue

    # Zero linger on the in, out and monitor sockets.
    queue.setsockopt_in(zmq.LINGER, 0)
    queue.setsockopt_out(zmq.LINGER, 0)
    queue.setsockopt_mon(zmq.LINGER, 0)

    # start() raises if the default arguments are wrong.
    queue.start()
    self.teardown_device()
def test_mq_check_prefix(self):
    """monitoredqueue must raise TypeError when handed unicode strings."""
    in_sock = self.context.socket(zmq.ROUTER)
    out_sock = self.context.socket(zmq.DEALER)
    mon_sock = self.context.socket(zmq.PUB)
    self.sockets.extend([in_sock, out_sock, mon_sock])

    # NOTE(review): the unicode values are passed in the first two
    # positions (where the in/out sockets were created above), not as
    # extra prefix arguments -- confirm this is what the test intends.
    unicode_in = unicode('in')
    unicode_out = unicode('out')
    self.assertRaises(
        TypeError, devices.monitoredqueue, unicode_in, unicode_out, mon_sock)
def test_plain(self):
    """Exercise PLAIN authentication between two DEALER sockets."""
    server = self.socket(zmq.DEALER)
    server.identity = b'IDENT'
    client = self.socket(zmq.DEALER)

    # Credentials start out empty and read back after being assigned.
    self.assertEqual(client.plain_username, b'')
    self.assertEqual(client.plain_password, b'')
    client.plain_username = USER
    client.plain_password = PASS
    self.assertEqual(client.getsockopt(zmq.PLAIN_USERNAME), USER)
    self.assertEqual(client.getsockopt(zmq.PLAIN_PASSWORD), PASS)

    # Neither side is a PLAIN server until the server flag is switched on.
    self.assertEqual(client.plain_server, 0)
    self.assertEqual(server.plain_server, 0)
    server.plain_server = True
    self.assertEqual(server.mechanism, zmq.PLAIN)
    self.assertEqual(client.mechanism, zmq.PLAIN)
    assert not client.plain_server
    assert server.plain_server

    # Connect over loopback on a random port and bounce traffic with the
    # ZAP handler running.
    self.start_zap()
    iface = 'tcp://127.0.0.1'
    port = server.bind_to_random_port(iface)
    endpoint = "%s:%i" % (iface, port)
    client.connect(endpoint)
    self.bounce(server, client)
    self.stop_zap()
def test_curve(self):
    """Exercise CURVE encryption between two DEALER sockets.

    The test is skipped when enabling ``curve_server`` fails with
    ``EINVAL``.  Any other ``ZMQError`` raised while enabling it is
    re-raised so the failure is reported instead of being ignored.
    """
    server = self.socket(zmq.DEALER)
    server.identity = b'IDENT'
    client = self.socket(zmq.DEALER)
    self.sockets.extend([server, client])
    try:
        server.curve_server = True
    except zmq.ZMQError as e:
        # will raise EINVAL if not linked against libsodium
        if e.errno == zmq.EINVAL:
            raise SkipTest("CURVE unsupported")
        # Any other errno is unexpected; do not continue with a socket
        # that failed to enter CURVE server mode.
        raise

    # Generate a key pair per side and wire them up.
    server_public, server_secret = zmq.curve_keypair()
    client_public, client_secret = zmq.curve_keypair()

    server.curve_secretkey = server_secret
    server.curve_publickey = server_public
    client.curve_serverkey = server_public
    client.curve_publickey = client_public
    client.curve_secretkey = client_secret

    self.assertEqual(server.mechanism, zmq.CURVE)
    self.assertEqual(client.mechanism, zmq.CURVE)

    self.assertEqual(server.get(zmq.CURVE_SERVER), True)
    self.assertEqual(client.get(zmq.CURVE_SERVER), False)

    # Connect over loopback on a random port and bounce traffic with the
    # ZAP handler running.
    self.start_zap()

    iface = 'tcp://127.0.0.1'
    port = server.bind_to_random_port(iface)
    client.connect("%s:%i" % (iface, port))
    self.bounce(server, client)
    self.stop_zap()
def __init__(self, uri=None):
    """Create a ZMQ context and connect a DEALER socket to ``uri``.

    Args:
        uri: Endpoint to connect to.  When omitted (``None``), the
            endpoint is read from the ``.dh_uri`` file in the user's home
            directory, with surrounding whitespace stripped.

    Raises:
        IOError/OSError: if ``uri`` is omitted and the file cannot be read.

    The file used to be opened in the default-argument expression, which
    runs once at definition time: a missing file broke the import of the
    module and the file handle was never closed.  It is now read lazily,
    only when no ``uri`` is supplied.
    """
    if uri is None:
        uri_path = os.path.join(os.path.expanduser("~"), ".dh_uri")
        with open(uri_path, "r") as uri_file:
            uri = uri_file.read().strip()
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.DEALER)
    self.socket.connect(uri)
def run(self):
# Connect a DEALER socket to self.controller_uri, wrap it in a ZMQStream
# whose receive callback is self.on_rcv, and run the IO loop.
# NOTE(review): indentation is missing in this listing, so it is unclear
# whether self.ioloop.close() belongs to the KeyboardInterrupt handler or
# runs after the try statement -- confirm against the original file.
self.context = zmq.Context()
self.socket = self.context.socket(zmq.DEALER)
self.socket.connect (self.controller_uri)
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.on_rcv)
self.ioloop = ioloop.IOLoop.instance()
# self.on_start is queued as a callback on the loop before it is started.
self.ioloop.add_callback(self.on_start)
# Periodic self.on_ping with callback_time=1000 (milliseconds per the
# tornado PeriodicCallback documentation).
tornado.ioloop.PeriodicCallback(self.on_ping, 1000).start()
try:
self.ioloop.start()
except KeyboardInterrupt:
# An interrupt during the loop triggers self.shutdown().
self.shutdown()
self.ioloop.close()
def open_connection(self, address, status_port, data_port):
    """Fetch the plot description from the server and start a data listener.

    A short-lived DEALER socket sends ``WHATSUP`` to
    ``address:status_port`` and expects a two-part reply whose second part
    is a JSON document.  On success the plots are built from that document
    and a ``DataListener`` for ``address:data_port`` is started in a fresh
    ``QThread`` (an already running listener thread is stopped first).

    If the server does not respond or the reply cannot be decoded, a
    status-bar message is shown and the method returns without touching
    the plots or the listener.  The status socket is closed on every path.

    Args:
        address: Host name or IP of the server.
        status_port: Port of the status/description endpoint.
        data_port: Port handed to the ``DataListener``.
    """
    self.statusBar().showMessage("Open session to {}:{}".format(address, status_port), 2000)
    socket = self.context.socket(zmq.DEALER)
    try:
        socket.identity = "Matplotlib_Qt_Client".encode()
        socket.connect("tcp://{}:{}".format(address, status_port))
        socket.send(b"WHATSUP")

        # NOTE(review): the socket is polled for POLLOUT although a reply
        # is awaited; POLLIN looks like the intended event -- confirm
        # before changing, since it alters the timeout behaviour.
        poller = zmq.Poller()
        poller.register(socket, zmq.POLLOUT)
        time.sleep(0.1)

        evts = dict(poller.poll(100))
        if socket not in evts:
            # Previously this path fell through to construct_plots() with
            # ``desc`` unbound; bail out instead.
            self.statusBar().showMessage("Server did not respond.", 2000)
            return

        try:
            reply, desc = [e.decode() for e in socket.recv_multipart()]
            desc = json.loads(desc)
            self.statusBar().showMessage("Connection established. Pulling plot information.", 2000)
        except Exception:
            self.statusBar().showMessage("Could not connect to server.", 2000)
            return
    finally:
        # Close the status socket on success and on every failure path.
        socket.close()

    self.construct_plots(desc)

    # Actual data listener: stop any previous one before replacing it.
    if self.listener_thread:
        self.Datalistener.running = False
        self.listener_thread.quit()
        self.listener_thread.wait()

    self.listener_thread = QtCore.QThread()
    self.Datalistener = DataListener(address, data_port)
    self.Datalistener.moveToThread(self.listener_thread)
    self.listener_thread.started.connect(self.Datalistener.loop)
    self.Datalistener.message.connect(self.data_signal_received)
    self.Datalistener.finished.connect(self.stop_listening)

    # Start the thread via a zero-delay single-shot timer.
    QtCore.QTimer.singleShot(0, self.listener_thread.start)
def _memoryTask(settings, logger, master, url_setFrontend, url_getFrontend, url_getBackend, url_setBackend):
    """Build the cache, bind the get-side sockets, start workers, run the setter.

    Slab size, preallocated pool size and getter-thread count are read from
    ``settings``.  A ROUTER socket is bound on ``url_getFrontend`` and a
    DEALER socket on ``url_getBackend``; both are handed to the proxy
    thread.  Getter, metricator and slave-setter threads are then started,
    and ``_setThread`` is finally called in the calling thread.
    ``url_setBackend`` is accepted but not used in this function.
    """
    from Cache import Slab, CacheSlubLRU

    # Settings that size the cache and the getter pool.
    slab_size = settings.getSlabSize()
    pool_size = settings.getPreallocatedPool()
    getter_count = settings.getGetterThreadNumber()

    cache = CacheSlubLRU(pool_size, slab_size, logger)
    logger.debug("Memory Process initialized:" + str(pool_size) + "B, get# = " + str(getter_count))

    # Get-side sockets: ROUTER towards clients, DEALER towards workers.
    context = zmq.Context.instance()
    get_frontend = context.socket(zmq.ROUTER)
    get_frontend.bind(url_getFrontend)
    get_backend = context.socket(zmq.DEALER)
    get_backend.bind(url_getBackend)

    # Shared timing record; one "getters" slot is appended per getter
    # thread just before that thread is started.
    timing = {"getters": [], "setters": [-1]}

    proxy_args = (logger, master, get_frontend, get_backend, url_getFrontend, url_getBackend)
    Thread(name='MemoryGetProxy', target=_proxyThread, args=proxy_args).start()

    for index in range(getter_count):
        timing["getters"].append(-1)
        getter_args = (index, logger, settings, cache, master, url_getBackend, timing)
        Thread(name='MemoryGetter', target=_getThread, args=getter_args).start()

    slave_set_queue = Queue.Queue()
    host_state = {"current": None}

    metric_args = (logger, cache, settings, master, timing)
    Thread(name='MemoryPerformanceMetricator', target=_memoryMetricatorThread, args=metric_args).start()

    slave_args = (logger, settings, cache, master, url_getBackend, slave_set_queue, host_state)
    Thread(name='MemorySlaveSetter', target=_setToSlaveThread, args=slave_args).start()

    # The setter runs in the calling thread.
    _setThread(logger, settings, cache, master, url_setFrontend, slave_set_queue, host_state, timing)
def test_attr(self):
    """Socket options can be set and read back as plain attributes."""
    sock = self.context.socket(zmq.DEALER)
    self.sockets.append(sock)

    expected_linger = 10
    sock.linger = expected_linger

    # The attribute and the explicit getsockopt call must agree.
    self.assertEqual(expected_linger, sock.linger)
    self.assertEqual(expected_linger, sock.getsockopt(zmq.LINGER))
    self.assertEqual(sock.fd, sock.getsockopt(zmq.FD))
def test_bad_attr(self):
    """Setting or getting an unknown attribute must raise AttributeError."""
    sock = self.context.socket(zmq.DEALER)
    self.sockets.append(sock)

    # Assignment to an unknown attribute name.
    try:
        setattr(sock, 'apple', 'foo')
    except AttributeError:
        pass
    else:
        self.fail("bad setattr should have raised AttributeError")

    # Lookup of the same unknown attribute name.
    try:
        getattr(sock, 'apple')
    except AttributeError:
        pass
    else:
        self.fail("bad getattr should have raised AttributeError")
def test_term_hang(self):
    """Terminate the context after sending with zero linger and closing."""
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)

    # Zero linger is set on the sending socket before the message is sent.
    dealer.setsockopt(zmq.LINGER, 0)
    dealer.send(b'hello', copy=False)

    # Close the sender first, then the receiver, then terminate.
    for sock in (dealer, router):
        sock.close()
    self.context.term()
def test_router_dealer(self):
    """A single DEALER message reaches the ROUTER as two frames."""
    router, dealer = self.create_bound_pair(zmq.ROUTER, zmq.DEALER)
    payload = b'message1'
    dealer.send(payload)

    # First frame (called ``ident`` in the earlier version); its value is
    # not inspected, only that another frame follows it.
    self.recv(router)
    self.assertEqual(router.rcvmore, True)

    # Second frame carries the payload and is the last one.
    received = self.recv(router)
    self.assertEqual(payload, received)
    self.assertEqual(router.rcvmore, False)
def test_default_mq_args(self):
    """A ThreadMonitoredQueue built with default arguments can be started."""
    queue = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
    self.device = queue

    # Zero linger on the in, out and monitor sockets.
    queue.setsockopt_in(zmq.LINGER, 0)
    queue.setsockopt_out(zmq.LINGER, 0)
    queue.setsockopt_mon(zmq.LINGER, 0)

    # start() raises if the default arguments are wrong.
    queue.start()
    self.teardown_device()
def test_mq_check_prefix(self):
    """monitoredqueue must raise TypeError when handed unicode strings."""
    in_sock = self.context.socket(zmq.ROUTER)
    out_sock = self.context.socket(zmq.DEALER)
    mon_sock = self.context.socket(zmq.PUB)
    self.sockets.extend([in_sock, out_sock, mon_sock])

    # NOTE(review): the unicode values are passed in the first two
    # positions (where the in/out sockets were created above), not as
    # extra prefix arguments -- confirm this is what the test intends.
    unicode_in = unicode('in')
    unicode_out = unicode('out')
    self.assertRaises(
        TypeError, devices.monitoredqueue, unicode_in, unicode_out, mon_sock)
def test_null(self):
    """Exercise the NULL (default) security mechanism."""
    server = self.socket(zmq.DEALER)
    client = self.socket(zmq.DEALER)

    # Fresh sockets report the NULL mechanism and are not PLAIN servers.
    self.assertEqual(client.MECHANISM, zmq.NULL)
    self.assertEqual(server.mechanism, zmq.NULL)
    self.assertEqual(client.plain_server, 0)
    self.assertEqual(server.plain_server, 0)

    # Connect over loopback on a random port and bounce traffic.
    iface = 'tcp://127.0.0.1'
    port = server.bind_to_random_port(iface)
    endpoint = "%s:%i" % (iface, port)
    client.connect(endpoint)
    self.bounce(server, client, False)