def test_EventDevicePortConnectionJavaOnly(self):
    """Check the messages reported by the Java receiver in MessageTestJava.

    Launches the test_BasicTestDevice_node device manager, installs and
    creates the MessageTestJava waveform, starts it, waits two seconds,
    and then queries the ``received_messages`` property of the component
    whose identifier contains ``EventReceiveJava_1``.  The test expects
    exactly 6 entries, each containing the text ``test_message``.

    The application is always released, even when a check fails, and the
    test fails if no matching receiver component is registered.
    """
    self.localEvent = threading.Event()
    self.eventFlag = False

    self._devBooter, self._devMgr = self.launchDeviceManager(
        "/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
    self.assertNotEqual(self._devBooter, None)

    self._domMgr.installApplication("/waveforms/MessageTestJava/MessageTestJava.sad.xml")
    appFact = self._domMgr._get_applicationFactories()[0]
    self.assertNotEqual(appFact, None)
    app = appFact.create(appFact._get_name(), [], [])
    self.assertNotEqual(app, None)

    try:
        components = app._get_registeredComponents()
        app.start()  # kick off events
        # Fixed delay before querying the receiver; presumably long enough
        # for all messages to arrive -- TODO confirm.
        time.sleep(2)

        receiver_checked = False
        for component in components:
            # Fetch the identifier once; it is used for both the log line
            # and the match below.
            identifier = component.componentObject._get_identifier()
            print(identifier)
            if 'EventReceiveJava_1' not in identifier:
                continue
            receiver_checked = True
            stuff = component.componentObject.query(
                [CF.DataType("received_messages", any.to_any(None))])
            recval = any.from_any(stuff[0].value)
            self.assertEqual(6, len(recval))
            for val in recval:
                self.assertTrue('test_message' in val)

        # Without this the test would pass vacuously when the receiver
        # component is missing from the application.
        self.assertTrue(receiver_checked,
                        "no component matching 'EventReceiveJava_1' was found")
    finally:
        app.releaseObject()  # kill producer/consumer
test_08_MessagingJava.py 文件源码
python
阅读 30
收藏 0
点赞 0
评论 0
评论列表
文章目录