ilc_agent.py source file

python

Project: volttron-applications  Author: VOLTTRON
def create_curtailment_publish(self, current_time_str, device_name, meta):
        """Publish the curtailment status of every subdevice of a device."""
        try:
            headers = {
                "Date": current_time_str,
                "min_compatible_version": "3.0",
                "MessageType": "Control"
            }
            # Look the device up once instead of once per subdevice.
            device = self.curtailment.get_device(device_name)

            for subdevice in device.command_status:
                currently_curtailed = device.currently_curtailed[subdevice]
                # device_name is indexed, so only its first element goes into the topic.
                curtailment_topic = "/".join([self.update_base_topic, device_name[0], subdevice])
                curtailment_status = "Active" if currently_curtailed else "Inactive"
                # Message is a [values, metadata] pair keyed by point name.
                curtailment_message = [
                    {
                        "DeviceState": curtailment_status
                    },
                    {
                        "DeviceState": {"tz": "US/Pacific", "type": "string"}
                    }
                ]
                # Block until the publish is acknowledged, for at most 15 seconds.
                self.vip.pubsub.publish('pubsub', curtailment_topic, headers=headers, message=curtailment_message).get(timeout=15.0)
        except:
            # Best-effort publish: log any failure (including a publish timeout) and continue.
            _log.debug("Unable to publish device/subdevice curtailment status message.", exc_info=True)
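
The topic and payload built inside the loop above can be tried out without a running VOLTTRON platform. The following is a minimal sketch, not part of ilc_agent.py: the helper name build_curtailment_message and the sample topic values are hypothetical, and it assumes device_name is a tuple whose first element is the device path (suggested by the device_name[0] indexing, but not confirmed by this excerpt).

```python
def build_curtailment_message(base_topic, device_name, subdevice, currently_curtailed):
    """Return (topic, message) for one subdevice's curtailment status.

    Hypothetical helper mirroring the loop body of create_curtailment_publish.
    Assumes device_name is a tuple whose first element is the device path.
    """
    topic = "/".join([base_topic, device_name[0], subdevice])
    status = "Active" if currently_curtailed else "Inactive"
    # Same [values, metadata] layout as the original message.
    message = [
        {"DeviceState": status},
        {"DeviceState": {"tz": "US/Pacific", "type": "string"}},
    ]
    return topic, message


# Sample values are made up for illustration only.
topic, message = build_curtailment_message(
    "record/ilc", ("campus/building/rtu1", "actuator"), "vav1", True
)
print(topic)
print(message[0])
```

Separating message construction from the publish call keeps the formatting logic testable on its own; the agent would still pass the resulting topic and message to self.vip.pubsub.publish as in the method above.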