def test_unicode_sockopts(self):
"""test setting/getting sockopts with unicode strings"""
topic = "tést"
if str is not unicode:
topic = topic.decode('utf8')
p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
self.assertEqual(s.send_unicode, s.send_unicode)
self.assertEqual(p.recv_unicode, p.recv_unicode)
self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
identb = s.getsockopt(zmq.IDENTITY)
identu = identb.decode('utf16')
identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
self.assertEqual(identu, identu2)
time.sleep(0.1) # wait for connection/subscription
p.send_unicode(topic,zmq.SNDMORE)
p.send_unicode(topic*2, encoding='latin-1')
self.assertEqual(topic, s.recv_unicode())
self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
评论列表
文章目录