def test_notification_pub_sub_mix(self):
"""
Test notification messaging pattern mixed with basic pub/sub calls.
"""
self.m.register_notification_endpoint(self._simple_subscribe_cbf1, "test.notification1")
self.m.subscribe(self._simple_subscribe_cbf1, "test.notification2")
time.sleep(0.5) # give broker some time to register subscriptions
# send publish to notify endpoint
self.m.publish("test.notification1", "my-notification1")
self.assertEqual(self.wait_for_messages()[0], "my-notification1")
# send notify to subscribe endpoint
self.m.notify("test.notification2", "my-notification2")
#res = self.wait_for_messages(n_messages=2)
self.assertTrue(self.wait_for_particular_messages("my-notification1"))
self.assertTrue(self.wait_for_particular_messages("my-notification2"))
#@unittest.skip("disabled")
评论列表
文章目录