test_08_MessagingJava.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:core-framework 作者: RedhawkSDR 项目源码 文件源码
def test_EventDevicePortConnectionJavaOnly(self):
        """Check that the Java receiver in MessageTestJava collects its messages.

        Launches the test_BasicTestDevice_node device manager, installs and
        creates the MessageTestJava waveform, starts it, waits two seconds,
        and then queries the ``received_messages`` property of the component
        whose identifier contains ``EventReceiveJava_1``.  The test passes
        when that property holds exactly six entries and each one contains
        the text ``test_message``.

        The created application is always released, even when an assertion
        fails part-way through.
        """
        self.localEvent = threading.Event()
        self.eventFlag = False

        self._devBooter, self._devMgr = self.launchDeviceManager("/nodes/test_BasicTestDevice_node/DeviceManager.dcd.xml", self._domMgr)
        self.assertNotEqual(self._devBooter, None)
        self._domMgr.installApplication("/waveforms/MessageTestJava/MessageTestJava.sad.xml")
        appFact = self._domMgr._get_applicationFactories()[0]
        self.assertNotEqual(appFact, None)
        app = appFact.create(appFact._get_name(), [], [])
        self.assertNotEqual(app, None)
        try:
            components = app._get_registeredComponents()
            app.start() # kick off events
            # Fixed delay before the receiver's property is inspected.
            time.sleep(2)

            # Locate the receiver first and query it once afterwards, so a
            # missing component is reported as a test failure rather than as
            # an unbound-variable error on the query result.
            receiver = None
            for component in components:
                identifier = component.componentObject._get_identifier()
                print(identifier)
                if 'EventReceiveJava_1' in identifier:
                    receiver = component.componentObject
            if receiver is None:
                self.fail("no registered component identifier contains 'EventReceiveJava_1'")

            stuff = receiver.query([CF.DataType("received_messages", any.to_any(None))])
            recval = any.from_any(stuff[0].value)
            self.assertEqual(6, len(recval))
            for val in recval:
                self.assertTrue('test_message' in val)
        finally:
            # Release on every exit path so a failed assertion does not leave
            # the application behind.
            app.releaseObject() # kill producer/consumer
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号