def test_curve(self):
"""test CURVE encryption"""
server = self.socket(zmq.DEALER)
server.identity = b'IDENT'
client = self.socket(zmq.DEALER)
self.sockets.extend([server, client])
try:
server.curve_server = True
except zmq.ZMQError as e:
# will raise EINVAL if not linked against libsodium
if e.errno == zmq.EINVAL:
raise SkipTest("CURVE unsupported")
server_public, server_secret = zmq.curve_keypair()
client_public, client_secret = zmq.curve_keypair()
server.curve_secretkey = server_secret
server.curve_publickey = server_public
client.curve_serverkey = server_public
client.curve_publickey = client_public
client.curve_secretkey = client_secret
self.assertEqual(server.mechanism, zmq.CURVE)
self.assertEqual(client.mechanism, zmq.CURVE)
self.assertEqual(server.get(zmq.CURVE_SERVER), True)
self.assertEqual(client.get(zmq.CURVE_SERVER), False)
self.start_zap()
iface = 'tcp://127.0.0.1'
port = server.bind_to_random_port(iface)
client.connect("%s:%i" % (iface, port))
self.bounce(server, client)
self.stop_zap()
评论列表
文章目录