agent_mt.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:volttron-applications 作者: VOLTTRON 项目源码 文件源码
def test():
    """Run ``main()`` alongside an in-process stub agent.

    A stub agent is launched on a daemon thread through
    ``utils.default_main``. The stub answers actuator lock-acquire and
    actuator set messages and publishes a fixed set of device readings on
    its ``periodic(5)`` schedule. Two ``settings`` values are overridden
    before ``main()`` is called.
    """
    from volttron.platform.agent import periodic

    def TestAgent(config_path, **kwargs):
        """Build the stub agent from the configuration at *config_path*.

        The configuration must provide 'agentid', 'campus', 'building' and
        'unit'; a missing key raises ``KeyError``. Remaining keyword
        arguments are forwarded to the agent constructor.
        """
        config = utils.load_config(config_path)
        # Not used below; the lookup still raises KeyError when the
        # configuration has no 'agentid' entry.
        agent_id = config['agentid']
        rtu_path = {key: config[key]
                    for key in ('campus', 'building', 'unit')}

        class Agent(PublishMixin, BaseAgent):
            """Stub that echoes actuator requests and emits fake readings."""

            def setup(self):
                super(Agent, self).setup()
                # Last damper value received through an actuator set message.
                self.damper = 0

            @matching.match_regex(topics.ACTUATOR_LOCK_ACQUIRE() + '(/.*)')
            def on_lock_result(self, topic, headers, message, match):
                """Answer a lock-acquire message with 'SUCCESS'."""
                _log.debug("Topic: {topic}, {headers}, Message: {message}".format(
                        topic=topic, headers=headers, message=message))
                # group(1) is the '/...' suffix captured after the acquire
                # prefix. group(0) is the whole matched text and already
                # starts with that prefix, so appending it to the result
                # prefix would yield a topic made of two prefixes.
                # NOTE(review): assumes `match` is the regex match object
                # produced for the decorator's pattern.
                self.publish(topics.ACTUATOR_LOCK_RESULT() + match.group(1),
                             headers, jsonapi.dumps('SUCCESS'))

            @matching.match_regex(topics.ACTUATOR_SET() + '(/.*/([^/]+))')
            def on_new_data(self, topic, headers, message, match):
                """Record a 'Damper' set request and echo the value back."""
                _log.debug("Topic: {topic}, {headers}, Message: {message}".format(
                        topic=topic, headers=headers, message=message))
                # group(2) is the last path segment of the topic.
                if match.group(2) == 'Damper':
                    self.damper = int(message[0])
                # group(1) is the path suffix after the set prefix; see the
                # note in on_lock_result for why group(0) is not used.
                self.publish(topics.ACTUATOR_VALUE() + match.group(1),
                             headers, message[0])

            @periodic(5)
            def send_data(self):
                """Publish constant temperatures plus the current damper."""
                data = {
                    'ReturnAirTemperature': 55,
                    'OutsideAirTemperature': 50,
                    'MixedAirTemperature': 45,
                    'Damper': self.damper
                }
                self.publish_ex(topics.DEVICES_VALUE(point='all', **rtu_path),
                                {}, ('application/json', jsonapi.dumps(data)))

        Agent.__name__ = 'TestAgent'
        return Agent(**kwargs)

    # Overrides applied before anything starts (presumably to shorten the
    # run -- confirm against the settings module).
    settings.afdd2_seconds_to_steady_state = 3
    settings.sync_trial_time = 10
    t = threading.Thread(target=utils.default_main, args=(TestAgent, 'test'))
    # Daemon thread: it does not keep the process alive once main() returns.
    t.daemon = True
    t.start()
    # Fixed pause before main(); presumably lets the stub agent come up.
    # There is no explicit readiness check.
    time.sleep(2)
    main()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号