python类source_remove()的实例源码

test_apthistory.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_apthistory_rescan_big(self):
        """Create a big history file and ensure a rescan still processes events.

        The test generates a large ``history.log.2`` file, rescans the
        history with the cache disabled while a 100 ms GLib timeout is
        installed, and then checks that more transactions were found and that
        consecutive entries of ``self._timeouts`` are at most 0.2 apart.
        """
        self._timeouts = []
        new_history = os.path.join(self.basedir, "history.log.2")
        compressed_history = new_history + ".gz"
        try:
            os.remove(compressed_history)
        except OSError:
            # best effort: nothing left over from an earlier run
            pass
        history = self._get_apt_history()
        self.assertEqual(len(history.transactions), 186)
        self._generate_big_history_file(new_history)
        timer_id = GObject.timeout_add(100, self._glib_timeout)
        try:
            with ExecutionTime("rescan %s byte file" %
                               os.path.getsize(compressed_history)):
                history._rescan(use_cache=False)
        finally:
            # never leave the timeout installed, even if the rescan raised
            GObject.source_remove(timer_id)
        # verify rescan
        self.assertTrue(len(history.transactions) > 186)
        # check the timeouts
        # NOTE(review): presumably self._glib_timeout appends a timestamp to
        # self._timeouts on every tick -- confirm against that helper
        self.assertTrue(len(self._timeouts) > 0)
        for earlier, later in zip(self._timeouts, self._timeouts[1:]):
            gap = abs(earlier - later)
            # a bare ``raise`` has no exception to re-raise here, so report
            # the violation as a regular test failure with a message instead
            if gap > 0.2:
                self.fail("gap between timeouts was %.3f, "
                          "expected at most 0.2" % gap)
        os.remove(compressed_history)
pendingstore.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def clear(self):
        """Empty the store and drop every id tracked in ``self._signals``.

        The parent class' ``clear()`` runs first; afterwards each tracked id
        is handed to ``GObject.source_remove`` and the tracking list is
        replaced by a fresh empty one.
        """
        super(PendingStore, self).clear()
        for tracked_id in self._signals:
            GObject.source_remove(tracked_id)
        self._signals = []
appview.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def on_entry_changed(widget, data):
    """Schedule a delayed refresh of the view for the text in *widget*.

    Every call removes the timeout whose id is stored in ``widget.stamp``
    (if any) and installs a new 250 ms timeout, so the refresh only runs
    once calls stop arriving for that long.

    widget -- object providing ``get_text()`` and a ``stamp`` attribute
    data -- ``(view, enquirer)`` tuple
    """

    def _work():
        # This callback returns False, so GLib removes the timeout source
        # itself. Forget the id up front: otherwise the next change event
        # would call source_remove() on an id that no longer exists. It is
        # cleared before the work (not after) so that a change event handled
        # during the nested main-loop iterations below keeps its new id.
        widget.stamp = 0
        new_text = widget.get_text()
        (view, enquirer) = data

        with ExecutionTime("total time"):
            with ExecutionTime("enquire.set_query()"):
                enquirer.set_query(get_query_from_search_entry(new_text),
                    limit=100 * 1000,
                    nonapps_visible=NonAppVisibility.ALWAYS_VISIBLE)

            store = view.tree_view.get_model()
            with ExecutionTime("store.clear()"):
                store.clear()

            with ExecutionTime("store.set_from_matches()"):
                store.set_from_matches(enquirer.matches)

            with ExecutionTime("model settle (size=%s)" % len(store)):
                while Gtk.events_pending():
                    Gtk.main_iteration()
        # one-shot: do not reschedule
        return False

    if widget.stamp:
        GObject.source_remove(widget.stamp)
    widget.stamp = GObject.timeout_add(250, _work)
exhibits.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cleanup_timeout(self):
        """Remove the timeout stored in ``self._timeout`` and reset it to 0.

        Does nothing when ``self._timeout`` is not greater than zero.
        """
        if not (self._timeout > 0):
            return
        GObject.source_remove(self._timeout)
        self._timeout = 0
exhibits.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def set_exhibits(self, exhibits_list):
        """Replace the shown exhibits and rebuild the paging dots.

        An empty or ``None`` *exhibits_list* leaves the current state
        untouched. Otherwise the list is stored, the cursor is reset to 0,
        the old paging dots are destroyed, one new dot per exhibit is packed
        into ``self.index_hbox`` (only when there is more than one exhibit),
        and the exhibit at the cursor is rendered.
        """
        if not exhibits_list:
            return

        self.exhibits = exhibits_list
        self.cursor = 0

        # The ids in self._dotsigs come from dot.connect() below, i.e. they
        # are signal handler ids, not main-loop source ids. Passing them to
        # GObject.source_remove() could remove an unrelated timeout/idle
        # source, so disconnect them from the widget that owns them instead.
        for child in self.index_hbox:
            for handler_id in self._dotsigs:
                if child.handler_is_connected(handler_id):
                    child.disconnect(handler_id)
            child.destroy()

        self._dotsigs = []
        exhibit_count = len(self.exhibits)
        if exhibit_count > 1:
            for position in range(exhibit_count):
                dot = ExhibitButton()
                dot.set_size_request(StockEms.LARGE, StockEms.LARGE)
                self._dotsigs.append(
                    dot.connect("clicked",
                                self.on_paging_dot_clicked,
                                exhibit_count - 1 - position)  # index
                )
                self.index_hbox.pack_end(dot, False, False, 0)
            # showing once after all dots are packed is enough
            self.index_hbox.show_all()

        self._render_exhibit_at_cursor()
searchentry.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _on_changed(self, widget):
        """
        Restart the search delay: refresh the style, drop the timeout that
        is still recorded (if any) and schedule ``_emit_terms_changed`` after
        ``SEARCH_TIMEOUT`` so the user can keep typing a longer search term.
        """
        self._check_style()
        pending_id = self._timeout_id
        if pending_id > 0:
            GObject.source_remove(pending_id)
        self._timeout_id = GObject.timeout_add(
            self.SEARCH_TIMEOUT, self._emit_terms_changed)
searchaid.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reset(self):
        """Destroy all tracked labels and forget their handler ids.

        Labels and handler ids are walked pairwise; each id is passed to
        ``GObject.source_remove`` before its label is destroyed. Both
        tracking lists are replaced by fresh empty lists afterwards.
        """
        # NOTE(review): if self._handlers holds ids returned by
        # label.connect(), source_remove() is the wrong call -- confirm
        # where these ids come from
        for tracked_label, handler_id in zip(self._labels, self._handlers):
            GObject.source_remove(handler_id)
            tracked_label.destroy()

        self._labels, self._handlers = [], []
thumbnail.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def set_app_details(self, app_details):
        """Switch to *app_details* and request its screenshots.

        The "screenshots-available" handler registered on the previously
        set details object (if any) is disconnected, the name attributes are
        copied from *app_details*, a new handler is connected and
        ``query_multiple_screenshots()`` is called on it.
        """
        # self._sig is the id returned by connect() further down, i.e. a
        # signal handler id owned by the *previous* details object. It must
        # be disconnected there; GObject.source_remove() works on main-loop
        # source ids and could remove an unrelated timeout/idle source.
        previous = getattr(self, "app_details", None)
        if self._sig > 0 and previous is not None:
            if previous.handler_is_connected(self._sig):
                previous.disconnect(self._sig)

        self.app_details = app_details
        self.appname = app_details.display_name
        self.pkgname = app_details.pkgname

        self._sig = self.app_details.connect(
            "screenshots-available", self._on_screenshots_available)

        self._screenshots = []
        self.app_details.query_multiple_screenshots()
thumbnail.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def clear(self):
        """Cancel pending work, drop tracked ids and destroy all children.

        ``self.cancel`` is cancelled and reset, every id in
        ``self._handlers`` is passed to ``GObject.source_remove``, and each
        child of this container is destroyed.
        """
        self.cancel.cancel()
        self.cancel.reset()

        # NOTE(review): if these ids come from widget.connect(), they are
        # signal handler ids and source_remove() is the wrong call -- confirm
        # where self._handlers is filled
        for handler_id in self._handlers:
            GObject.source_remove(handler_id)
        # forget the removed ids so a second clear() does not try to remove
        # them again
        self._handlers = []

        for child in self:
            child.destroy()
database.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _on_axi_stamp_changed(self, monitor, afile, otherfile, event):
        """File-monitor callback that (re)schedules ``self.reopen``.

        Only ``ATTRIBUTE_CHANGED`` events are handled; any timeout that is
        still recorded in ``self._timeout_id`` is removed before a new 500 ms
        timeout is installed.
        """
        # only the utime() update done by update-a-x-i is of interest
        if event == Gio.FileMonitorEvent.ATTRIBUTE_CHANGED:
            LOG.info("afile '%s' changed" % afile)
            if self._timeout_id:
                GObject.source_remove(self._timeout_id)
                self._timeout_id = None
            self._timeout_id = GObject.timeout_add(500, self.reopen)
aptd.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _on_lowlevel_transactions_changed(self, watcher, current, pending):
        """Rebuild ``self.pending_transactions`` after a transaction change.

        watcher -- not used in this method
        current -- id of the running transaction; skipped when falsy
        pending -- list of queued transaction ids

        The progress handler is re-attached to the current transaction,
        ``self.pending_transactions`` is cleared and refilled from
        ``[current] + pending``, and the change is announced through
        ``inject_fake_transactions_and_emit_changed_signal()``.
        """
        # cleanup progress signal (to be sure to not leave dbus
        # matchers around)
        # NOTE(review): self._progress_signal is the value returned by
        # trans.connect() below; confirm that GObject.source_remove() is
        # the right call for that kind of id
        if self._progress_signal:
            GObject.source_remove(self._progress_signal)
            self._progress_signal = None
        # attach progress-changed signal for current transaction
        if current:
            try:
                trans = client.get_transaction(current)
                self._progress_signal = trans.connect("progress-changed",
                    self._on_progress_changed)
            except dbus.DBusException:
                # D-Bus failure: continue without progress updates
                pass

        # now update pending transactions
        self.pending_transactions.clear()
        for tid in [current] + pending:
            # current may be empty, skip such ids
            if not tid:
                continue
            try:
                trans = client.get_transaction(tid,
                    error_handler=lambda x: True)
            except dbus.DBusException:
                # transactions that cannot be fetched are left out
                continue
            trans_progress = TransactionProgress(trans)
            try:
                self.pending_transactions[trans_progress.pkgname] = \
                    trans_progress
            except KeyError:
                # if it's not a transaction from us (sc_pkgname) still
                # add it with the tid as key to get accurate results
                # (the key of pending_transactions is never directly
                #  exposed in the UI)
                self.pending_transactions[trans.tid] = trans_progress
        # emit signal
        self.inject_fake_transactions_and_emit_changed_signal()
pendingstore.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def clear(self):
        """Empty the store and drop every id tracked in ``self._signals``.

        The parent class' ``clear()`` runs first; afterwards each tracked id
        is handed to ``GObject.source_remove`` and the tracking list is
        replaced by a fresh empty one.
        """
        super(PendingStore, self).clear()
        for tracked_id in self._signals:
            GObject.source_remove(tracked_id)
        self._signals = []
appview.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def on_entry_changed(widget, data):
    """Schedule a delayed refresh of the view for the text in *widget*.

    Every call removes the timeout whose id is stored in ``widget.stamp``
    (if any) and installs a new 250 ms timeout, so the refresh only runs
    once calls stop arriving for that long.

    widget -- object providing ``get_text()`` and a ``stamp`` attribute
    data -- ``(view, enquirer)`` tuple
    """

    def _work():
        # This callback returns False, so GLib removes the timeout source
        # itself. Forget the id up front: otherwise the next change event
        # would call source_remove() on an id that no longer exists. It is
        # cleared before the work (not after) so that a change event handled
        # during the nested main-loop iterations below keeps its new id.
        widget.stamp = 0
        new_text = widget.get_text()
        (view, enquirer) = data

        with ExecutionTime("total time"):
            with ExecutionTime("enquire.set_query()"):
                enquirer.set_query(get_query_from_search_entry(new_text),
                    limit=100 * 1000,
                    nonapps_visible=NonAppVisibility.ALWAYS_VISIBLE)

            store = view.tree_view.get_model()
            with ExecutionTime("store.clear()"):
                store.clear()

            with ExecutionTime("store.set_from_matches()"):
                store.set_from_matches(enquirer.matches)

            with ExecutionTime("model settle (size=%s)" % len(store)):
                while Gtk.events_pending():
                    Gtk.main_iteration()
        # one-shot: do not reschedule
        return False

    if widget.stamp:
        GObject.source_remove(widget.stamp)
    widget.stamp = GObject.timeout_add(250, _work)
exhibits.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def cleanup_timeout(self):
        """Remove the timeout stored in ``self._timeout`` and reset it to 0.

        Does nothing when ``self._timeout`` is not greater than zero.
        """
        if not (self._timeout > 0):
            return
        GObject.source_remove(self._timeout)
        self._timeout = 0
spinner.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def hide_spinner(self):
        """Stop the spinner and switch back to the content page.

        A timeout id remembered in ``self._last_timeout_id`` is removed and
        forgotten first.
        """
        pending_id = self._last_timeout_id
        if pending_id is not None:
            GObject.source_remove(pending_id)
            self._last_timeout_id = None
        self.spinner_view.stop_and_hide()
        self.set_current_page(self.CONTENT_PAGE)
searchentry.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _on_changed(self, widget):
        """
        Restart the search delay: refresh the style, drop the timeout that
        is still recorded (if any) and schedule ``_emit_terms_changed`` after
        ``SEARCH_TIMEOUT`` so the user can keep typing a longer search term.
        """
        self._check_style()
        pending_id = self._timeout_id
        if pending_id > 0:
            GObject.source_remove(pending_id)
        self._timeout_id = GObject.timeout_add(
            self.SEARCH_TIMEOUT, self._emit_terms_changed)
searchaid.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def reset(self):
        """Destroy all tracked labels and forget their handler ids.

        Labels and handler ids are walked pairwise; each id is passed to
        ``GObject.source_remove`` before its label is destroyed. Both
        tracking lists are replaced by fresh empty lists afterwards.
        """
        # NOTE(review): if self._handlers holds ids returned by
        # label.connect(), source_remove() is the wrong call -- confirm
        # where these ids come from
        for tracked_label, handler_id in zip(self._labels, self._handlers):
            GObject.source_remove(handler_id)
            tracked_label.destroy()

        self._labels, self._handlers = [], []
thumbnail.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def set_app_details(self, app_details):
        """Switch to *app_details* and request its screenshots.

        The "screenshots-available" handler registered on the previously
        set details object (if any) is disconnected, the name attributes are
        copied from *app_details*, a new handler is connected and
        ``query_multiple_screenshots()`` is called on it.
        """
        # self._sig is the id returned by connect() further down, i.e. a
        # signal handler id owned by the *previous* details object. It must
        # be disconnected there; GObject.source_remove() works on main-loop
        # source ids and could remove an unrelated timeout/idle source.
        previous = getattr(self, "app_details", None)
        if self._sig > 0 and previous is not None:
            if previous.handler_is_connected(self._sig):
                previous.disconnect(self._sig)

        self.app_details = app_details
        self.appname = app_details.display_name
        self.pkgname = app_details.pkgname

        self._sig = self.app_details.connect(
            "screenshots-available", self._on_screenshots_available)

        self._screenshots = []
        self.app_details.query_multiple_screenshots()
thumbnail.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def clear(self):
        """Cancel pending work, drop tracked ids and destroy all children.

        ``self.cancel`` is cancelled and reset, every id in
        ``self._handlers`` is passed to ``GObject.source_remove``, and each
        child of this container is destroyed.
        """
        self.cancel.cancel()
        self.cancel.reset()

        # NOTE(review): if these ids come from widget.connect(), they are
        # signal handler ids and source_remove() is the wrong call -- confirm
        # where self._handlers is filled
        for handler_id in self._handlers:
            GObject.source_remove(handler_id)
        # forget the removed ids so a second clear() does not try to remove
        # them again
        self._handlers = []

        for child in self:
            child.destroy()
aptcache.py 文件源码 项目:x-mario-center 作者: fossasia 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file,
        event):
        """File-monitor callback that (re)schedules ``self.open``.

        Only ``CHANGES_DONE_HINT`` events are handled; any timeout that is
        still recorded in ``self._timeout_id`` is removed before a new
        10 second timeout is installed.
        """
        if event == Gio.FileMonitorEvent.CHANGES_DONE_HINT:
            if self._timeout_id:
                GObject.source_remove(self._timeout_id)
                self._timeout_id = None
            self._timeout_id = GObject.timeout_add_seconds(10, self.open)


问题


面经


文章

微信
公众号

扫码关注公众号