dbussy.py 文件源码

python
阅读 24 收藏 0 点赞 0 评论 0

项目:dbussy 作者: ldo 项目源码 文件源码
def add_filter(self, function, user_data, free_data = None) :
        """Add a filter callback that gets to look at all incoming messages
        before they get to the dispatch system.

        The same function can be added multiple times as long as the
        user_data is different.

        function is invoked as function(connection, message, user_data),
        where connection is this object and message is a Message wrapping a
        new reference to the incoming libdbus message. It should return a
        DBUS.HANDLER_RESULT_xxx code. If it returns a coroutine object
        instead, that coroutine is scheduled as a task on self.loop and the
        message is reported to libdbus as DBUS.HANDLER_RESULT_HANDLED.

        free_data, if not None, is wrapped and handed to libdbus as the
        free function for the filter; when called, it receives user_data.

        Raises CallFailed if dbus_connection_add_filter reports failure.
        """

        def wrap_function(c_conn, message, _data) :
            # The raw pointers supplied by libdbus are not forwarded: the
            # callback gets this connection object and the caller's original
            # user_data, captured from the enclosing scope.
            result = function(self, Message(dbus.dbus_message_ref(message)), user_data)
            if isinstance(result, types.CoroutineType) :
                assert self.loop is not None, "no event loop to attach coroutine to"
                self.loop.create_task(result)
                # the coroutine finishes later, so claim the message now
                result = DBUS.HANDLER_RESULT_HANDLED
            #end if
            return \
                result
        #end wrap_function

        def wrap_free_data(_data) :
            # _data is only the key that was handed to libdbus; the caller's
            # free_data expects the original user_data object.
            free_data(user_data)
        #end wrap_free_data

    #begin add_filter
        filter_key = (function, data_key(user_data))
        filter_value = \
            {
                "function" : DBUS.HandleMessageFunction(wrap_function),
                "free_data" : DBUS.FreeFunction(wrap_free_data) if free_data is not None else None,
            }
        # pass user_data id because libdbus identifies filter entry by both function address and user data address
        if not dbus.dbus_connection_add_filter(self._dbobj, filter_value["function"], filter_key[1], filter_value["free_data"]) :
            raise CallFailed("dbus_connection_add_filter")
        #end if
        # Keep the wrapper objects referenced from this connection so they
        # don't disappear prematurely while libdbus may still call them.
        self._filters[filter_key] = filter_value
    #end add_filter
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号