def test_EventDevicePortConnectionFromPython(self):
    """Run the MessageTestPyJava waveform and check the messages it reports.

    Launches the test_BasicTestDevice_node device manager, installs and
    creates the MessageTestPyJava application, starts it, waits two
    seconds, then queries the component whose identifier contains the
    target DCE id. The first queried property must hold six values, each
    containing the text 'test_message'.

    The application is released even when a check fails, so a failing run
    does not leave it alive.
    """
    self.localEvent = threading.Event()
    self.eventFlag = False

    self._devBooter, self._devMgr = self.launchDeviceManager(
        "/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
    self.assertNotEqual(self._devBooter, None)

    self._domMgr.installApplication("/waveforms/MessageTestPyJava/MessageTestPyJava.sad.xml")
    appFact = self._domMgr._get_applicationFactories()[0]
    self.assertNotEqual(appFact, None)
    app = appFact.create(appFact._get_name(), [], [])
    self.assertNotEqual(app, None)

    target_id = 'DCE:b1fe6cc1-2562-4878-9a69-f191f89a6ef8'
    try:
        app.start()  # kick off events
        time.sleep(2)  # give the started application time to run before querying

        # NOTE(review): the original nesting of the checks below was lost;
        # they are applied once, to the last component matching target_id.
        stuff = None
        for component in app._get_registeredComponents():
            identifier = component.componentObject._get_identifier()
            # Single-argument form prints the same on Python 2 and 3.
            print(identifier)
            if target_id in identifier:
                stuff = component.componentObject.query([])

        # Fail with a clear message instead of a NameError (or a silent
        # pass) when the expected component is not registered.
        self.assertTrue(stuff is not None,
                        "no registered component matched %s" % target_id)

        recval = any.from_any(stuff[0].value)
        self.assertEqual(6, len(recval))
        for val in recval:
            self.assertTrue('test_message' in val)
    finally:
        app.releaseObject()  # kill producer/consumer
test_08_MessagingJava.py 文件源码
python
阅读 27
收藏 0
点赞 0
评论 0
评论列表
文章目录