dummy_tv.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:home-assistant-dlna-dmr 作者: StevenLooman 项目源码 文件源码
def async_notify_listeners(self, **kwargs):
        """Send a LastChange NOTIFY for this state variable to every subscriber.

        Generator-based coroutine (uses ``yield from``). It builds an RCS
        ``<Event>`` document holding one element named after this object's
        class, with ``val`` set to ``str(self.value)``, embeds that document
        as the text of ``<LastChange>`` inside an ``e:propertyset`` envelope,
        and issues an HTTP ``NOTIFY`` request to each callback URL registered
        in ``SUBSCRIBED_CLIENTS[self.SERVICE_NAME]``.

        Args:
            **kwargs: Extra XML attributes to put on the state-variable
                element. A ``val`` key, if given, is overridden by the
                current value.

        Raises:
            Whatever the HTTP request or the 10 second timeout raises; the
            first failure propagates and later subscribers are not notified.
        """
        # The state variable is named after the concrete class.
        variable_name = str(self.__class__.__name__)
        value = str(self.value)

        LOGGER.debug('async_notify_listeners(): %s -> %s', variable_name, value)

        event_base = '<Event xmlns="urn:schemas-upnp-org:metadata-1-0/RCS/">' \
                     '<InstanceID val="0" />' \
                     '</Event>'
        el_event = ET.fromstring(event_base)
        el_instance_id = el_event.find('.//rcs:InstanceID', NS)
        # 'val' always reflects the current value, even if the caller passed one.
        attributes = dict(kwargs, val=value)
        # NOTE(review): the tag is the literal string 'rcs:<name>', not a
        # '{namespace}name' qualified tag; presumably the 'rcs' prefix is
        # registered elsewhere in the module -- confirm.
        ET.SubElement(el_instance_id, 'rcs:' + variable_name, **attributes)

        notify_base = '<?xml version="1.0" encoding="utf-8"?>' \
                      '<e:propertyset xmlns:e="urn:schemas-upnp-org:event-1-0">' \
                      '<e:property>' \
                      '<LastChange />' \
                      '</e:property>' \
                      '</e:propertyset>'
        el_notify = ET.fromstring(notify_base)
        el_last_change = el_notify.find('.//LastChange', NS)
        # The event document travels as (escaped) text inside <LastChange>.
        el_last_change.text = ET.tostring(el_event).decode('utf-8')

        # The payload is identical for every subscriber; serialize it once.
        data = ET.tostring(el_notify)

        # Iterate over a snapshot: this coroutine suspends at 'yield from', and
        # a subscribe/unsubscribe handled meanwhile would otherwise raise
        # "dictionary changed size during iteration".
        subscribers = list(SUBSCRIBED_CLIENTS[self.SERVICE_NAME].items())
        for sid, url in subscribers:
            headers = {
                'SID': sid
            }
            with ClientSession(loop=asyncio.get_event_loop()) as session:
                with async_timeout.timeout(10):
                    LOGGER.debug('Calling: %s', url)
                    yield from session.request('NOTIFY', url, headers=headers, data=data)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号