def test_apthistory_rescan_big(self):
    """Create a big history file and ensure a rescan still processes events.

    A GLib timeout calling ``self._glib_timeout`` every 100 ms is installed
    while ``history._rescan()`` runs.  Afterwards the test checks that the
    rescan picked up more than the initial 186 transactions and that
    consecutive entries of ``self._timeouts`` are at most 0.2 apart.
    (Presumably ``_glib_timeout`` appends timestamps in seconds to
    ``self._timeouts`` -- confirm against that helper.)
    """
    self._timeouts = []
    new_history = os.path.join(self.basedir, "history.log.2")
    compressed_history = new_history + ".gz"
    # A previous (failed) run may have left the generated file behind.
    try:
        os.remove(compressed_history)
    except OSError:
        pass
    history = self._get_apt_history()
    self.assertEqual(len(history.transactions), 186)
    self._generate_big_history_file(new_history)
    try:
        timer_id = GObject.timeout_add(100, self._glib_timeout)
        try:
            with ExecutionTime("rescan %s byte file" %
                               os.path.getsize(compressed_history)):
                history._rescan(use_cache=False)
        finally:
            # Never leave the timer installed, even if the rescan blows up.
            GObject.source_remove(timer_id)
        # verify rescan
        self.assertTrue(len(history.transactions) > 186)
        # check the timeouts
        self.assertTrue(len(self._timeouts) > 0)
        for earlier, later in zip(self._timeouts, self._timeouts[1:]):
            # A bare ``raise`` has no active exception here, so report the
            # violation as a proper assertion failure instead.
            self.assertLessEqual(
                abs(earlier - later), 0.2,
                "main loop was blocked for more than 0.2s during rescan")
    finally:
        # Remove the generated file whether or not the checks passed.
        try:
            os.remove(compressed_history)
        except OSError:
            pass
# Python: example source code using source_remove()
def clear(self):
    """Empty the store and remove every id tracked in ``self._signals``.

    The parent class ``clear()`` runs first.  Each tracked id is then passed
    to ``GObject.source_remove()`` and the tracking list is replaced by a
    fresh empty list.
    """
    super(PendingStore, self).clear()
    # NOTE(review): source_remove() expects GLib main-loop source ids; verify
    # that _signals does not hold signal handler ids from connect().
    for tracked_id in self._signals:
        GObject.source_remove(tracked_id)
    self._signals = []
def on_entry_changed(widget, data):
    """Debounce entry changes and re-run the search 250 ms after the last one.

    Args:
        widget: the entry that changed; its text is the search term and its
            ``stamp`` attribute stores the id of the pending timeout.
        data: a ``(view, enquirer)`` tuple; the enquirer runs the query and
            the model behind ``view.tree_view`` is refilled with the matches.
    """

    def _work():
        # This one-shot timeout is destroyed as soon as the callback returns
        # False.  Forget its id right away so the next change does not call
        # source_remove() on a source that no longer exists.  This is done
        # before the main-loop iterations below, which may schedule a new
        # timeout and store its id in widget.stamp.
        widget.stamp = 0
        new_text = widget.get_text()
        (view, enquirer) = data
        with ExecutionTime("total time"):
            with ExecutionTime("enquire.set_query()"):
                enquirer.set_query(
                    get_query_from_search_entry(new_text),
                    limit=100 * 1000,
                    nonapps_visible=NonAppVisibility.ALWAYS_VISIBLE)
            store = view.tree_view.get_model()
            with ExecutionTime("store.clear()"):
                store.clear()
            with ExecutionTime("store.set_from_matches()"):
                store.set_from_matches(enquirer.matches)
            with ExecutionTime("model settle (size=%s)" % len(store)):
                # Drain pending GTK events so the timing includes the model
                # settling in the view.
                while Gtk.events_pending():
                    Gtk.main_iteration()
        # Returning False tells GLib not to call this timeout again.
        return False

    # Restart the debounce timer on every change.
    if widget.stamp:
        GObject.source_remove(widget.stamp)
    widget.stamp = GObject.timeout_add(250, _work)
def cleanup_timeout(self):
    """Remove the pending timeout source, if any, and reset its id to 0."""
    if not self._timeout > 0:
        # nothing scheduled
        return
    GObject.source_remove(self._timeout)
    self._timeout = 0
def set_exhibits(self, exhibits_list):
    """Install a new list of exhibits and rebuild the paging dots.

    An empty (or ``None``) *exhibits_list* is ignored and leaves the current
    state untouched.  Otherwise the cursor is reset to 0, every child of
    ``self.index_hbox`` is destroyed, one ``ExhibitButton`` dot per exhibit
    is created when there is more than one exhibit, and the exhibit at the
    cursor is rendered.

    Args:
        exhibits_list: the exhibits to show.
    """
    if not exhibits_list:
        return
    self.exhibits = exhibits_list
    self.cursor = 0
    for child in self.index_hbox:
        child.destroy()
    # The ids kept in _dotsigs come from dot.connect(), i.e. they are signal
    # handler ids.  They must not be handed to GObject.source_remove(), which
    # removes main-loop *sources* and could kill an unrelated timeout that
    # happens to share the number.  The old dots were destroyed above, so
    # simply forget their handler ids.
    self._dotsigs = []
    exhibit_count = len(self.exhibits)
    if exhibit_count > 1:
        # Dots are packed from the end, so the click index runs from
        # exhibit_count - 1 down to 0.
        for index in reversed(range(exhibit_count)):
            dot = ExhibitButton()
            dot.set_size_request(StockEms.LARGE, StockEms.LARGE)
            self._dotsigs.append(
                dot.connect("clicked",
                            self.on_paging_dot_clicked,
                            index)
            )
            self.index_hbox.pack_end(dot, False, False, 0)
        # Showing once after all dots are packed gives the same end result
        # as showing after every single dot.
        self.index_hbox.show_all()
    self._render_exhibit_at_cursor()
def _on_changed(self, widget):
    """
    Restart the search timeout whenever the entry changes.

    After refreshing the style, any pending timeout is removed and a new one
    is scheduled that calls ``self._emit_terms_changed`` after
    ``self.SEARCH_TIMEOUT``, so the user can finish typing a longer term.
    """
    self._check_style()
    pending_id = self._timeout_id
    if pending_id > 0:
        GObject.source_remove(pending_id)
    self._timeout_id = GObject.timeout_add(
        self.SEARCH_TIMEOUT, self._emit_terms_changed)
def reset(self):
    """Remove every label/handler pair and start over with empty lists.

    Labels and handler ids are walked pairwise: the id is passed to
    ``GObject.source_remove()`` and the label is destroyed.
    """
    # NOTE(review): source_remove() expects GLib source ids; confirm that
    # _handlers does not contain signal handler ids from connect().
    for lbl, handler_id in zip(self._labels, self._handlers):
        GObject.source_remove(handler_id)
        lbl.destroy()
    self._labels, self._handlers = [], []
def set_app_details(self, app_details):
    """Switch to a new application and request its screenshots.

    The "screenshots-available" handler on the previously set details object
    is disconnected, the name/package attributes are refreshed from
    *app_details*, the handler is connected on the new object and a
    screenshot query is started.

    Args:
        app_details: object providing ``display_name``, ``pkgname``,
            ``connect()``/``disconnect()`` and
            ``query_multiple_screenshots()``.
    """
    if self._sig > 0:
        # self._sig is a signal handler id returned by connect() below, so it
        # has to be disconnected on the object it was connected to.
        # GObject.source_remove() only knows main-loop source ids and would
        # leave the old handler connected.
        previous = getattr(self, "app_details", None)
        if previous is not None:
            previous.disconnect(self._sig)
        self._sig = 0
    self.app_details = app_details
    self.appname = app_details.display_name
    self.pkgname = app_details.pkgname
    self._sig = self.app_details.connect(
        "screenshots-available", self._on_screenshots_available)
    self._screenshots = []
    self.app_details.query_multiple_screenshots()
def clear(self):
    """Abort pending work, remove the tracked handlers, destroy all children.

    ``self.cancel`` is cancelled and reset, every id in ``self._handlers`` is
    passed to ``GObject.source_remove()`` and each child obtained by
    iterating ``self`` is destroyed.
    """
    self.cancel.cancel()
    self.cancel.reset()
    # NOTE(review): _handlers is not emptied here, so a second clear() would
    # try to remove the same ids again -- confirm whether that is intended.
    for handler_id in self._handlers:
        GObject.source_remove(handler_id)
    for widget in self:
        widget.destroy()
def _on_axi_stamp_changed(self, monitor, afile, otherfile, event):
    """Schedule ``self.reopen`` 500 ms after the monitored stamp file changed.

    Only ``ATTRIBUTE_CHANGED`` events are handled; any other event is
    ignored.  A timeout that is still pending is removed first so that rapid
    changes collapse into a single call.
    """
    # we only care about the utime() update from update-a-x-i
    is_attribute_change = event == Gio.FileMonitorEvent.ATTRIBUTE_CHANGED
    if not is_attribute_change:
        return
    LOG.info("afile '%s' changed" % afile)
    if self._timeout_id:
        GObject.source_remove(self._timeout_id)
        self._timeout_id = None
    self._timeout_id = GObject.timeout_add(500, self.reopen)
def _on_lowlevel_transactions_changed(self, watcher, current, pending):
    """Rebuild ``self.pending_transactions`` from the given transaction ids.

    Args:
        watcher: the object that emitted the change (unused here).
        current: id of the running transaction, or a false value if none.
        pending: list of ids of the queued transactions.

    Transactions that cannot be fetched because of a ``dbus.DBusException``
    are skipped.  Finally
    ``inject_fake_transactions_and_emit_changed_signal()`` is called.
    """
    # cleanup progress signal (to be sure to not leave dbus
    # matchers around)
    # NOTE(review): _progress_signal is assigned from trans.connect() below,
    # i.e. it looks like a signal handler id, yet it is removed with
    # source_remove() -- confirm this really drops the handler.
    if self._progress_signal:
        GObject.source_remove(self._progress_signal)
        self._progress_signal = None

    # attach progress-changed signal for current transaction
    if current:
        try:
            current_trans = client.get_transaction(current)
            self._progress_signal = current_trans.connect(
                "progress-changed", self._on_progress_changed)
        except dbus.DBusException:
            pass

    # now update pending transactions
    self.pending_transactions.clear()
    known_tids = [tid for tid in [current] + pending if tid]
    for tid in known_tids:
        try:
            trans = client.get_transaction(
                tid, error_handler=lambda x: True)
        except dbus.DBusException:
            continue
        progress = TransactionProgress(trans)
        try:
            self.pending_transactions[progress.pkgname] = progress
        except KeyError:
            # if its not a transaction from us (sc_pkgname) still
            # add it with the tid as key to get accurate results
            # (the key of pending_transactions is never directly
            # exposed in the UI)
            self.pending_transactions[trans.tid] = progress

    # emit signal
    self.inject_fake_transactions_and_emit_changed_signal()
def clear(self):
    """Empty the store and remove every id tracked in ``self._signals``.

    The parent class ``clear()`` runs first.  Each tracked id is then passed
    to ``GObject.source_remove()`` and the tracking list is replaced by a
    fresh empty list.
    """
    super(PendingStore, self).clear()
    # NOTE(review): source_remove() expects GLib main-loop source ids; verify
    # that _signals does not hold signal handler ids from connect().
    for tracked_id in self._signals:
        GObject.source_remove(tracked_id)
    self._signals = []
def on_entry_changed(widget, data):
    """Debounce entry changes and re-run the search 250 ms after the last one.

    Args:
        widget: the entry that changed; its text is the search term and its
            ``stamp`` attribute stores the id of the pending timeout.
        data: a ``(view, enquirer)`` tuple; the enquirer runs the query and
            the model behind ``view.tree_view`` is refilled with the matches.
    """

    def _work():
        # This one-shot timeout is destroyed as soon as the callback returns
        # False.  Forget its id right away so the next change does not call
        # source_remove() on a source that no longer exists.  This is done
        # before the main-loop iterations below, which may schedule a new
        # timeout and store its id in widget.stamp.
        widget.stamp = 0
        new_text = widget.get_text()
        (view, enquirer) = data
        with ExecutionTime("total time"):
            with ExecutionTime("enquire.set_query()"):
                enquirer.set_query(
                    get_query_from_search_entry(new_text),
                    limit=100 * 1000,
                    nonapps_visible=NonAppVisibility.ALWAYS_VISIBLE)
            store = view.tree_view.get_model()
            with ExecutionTime("store.clear()"):
                store.clear()
            with ExecutionTime("store.set_from_matches()"):
                store.set_from_matches(enquirer.matches)
            with ExecutionTime("model settle (size=%s)" % len(store)):
                # Drain pending GTK events so the timing includes the model
                # settling in the view.
                while Gtk.events_pending():
                    Gtk.main_iteration()
        # Returning False tells GLib not to call this timeout again.
        return False

    # Restart the debounce timer on every change.
    if widget.stamp:
        GObject.source_remove(widget.stamp)
    widget.stamp = GObject.timeout_add(250, _work)
def cleanup_timeout(self):
    """Remove the pending timeout source, if any, and reset its id to 0."""
    if not self._timeout > 0:
        # nothing scheduled
        return
    GObject.source_remove(self._timeout)
    self._timeout = 0
def hide_spinner(self):
    """ hide the spinner page again and show the content page

    A timeout that is still pending (``self._last_timeout_id``) is removed
    first, then the spinner view is stopped and hidden.
    """
    pending_id = self._last_timeout_id
    if pending_id is not None:
        GObject.source_remove(pending_id)
        self._last_timeout_id = None
    self.spinner_view.stop_and_hide()
    self.set_current_page(self.CONTENT_PAGE)
def _on_changed(self, widget):
    """
    Restart the search timeout whenever the entry changes.

    After refreshing the style, any pending timeout is removed and a new one
    is scheduled that calls ``self._emit_terms_changed`` after
    ``self.SEARCH_TIMEOUT``, so the user can finish typing a longer term.
    """
    self._check_style()
    pending_id = self._timeout_id
    if pending_id > 0:
        GObject.source_remove(pending_id)
    self._timeout_id = GObject.timeout_add(
        self.SEARCH_TIMEOUT, self._emit_terms_changed)
def reset(self):
    """Remove every label/handler pair and start over with empty lists.

    Labels and handler ids are walked pairwise: the id is passed to
    ``GObject.source_remove()`` and the label is destroyed.
    """
    # NOTE(review): source_remove() expects GLib source ids; confirm that
    # _handlers does not contain signal handler ids from connect().
    for lbl, handler_id in zip(self._labels, self._handlers):
        GObject.source_remove(handler_id)
        lbl.destroy()
    self._labels, self._handlers = [], []
def set_app_details(self, app_details):
    """Switch to a new application and request its screenshots.

    The "screenshots-available" handler on the previously set details object
    is disconnected, the name/package attributes are refreshed from
    *app_details*, the handler is connected on the new object and a
    screenshot query is started.

    Args:
        app_details: object providing ``display_name``, ``pkgname``,
            ``connect()``/``disconnect()`` and
            ``query_multiple_screenshots()``.
    """
    if self._sig > 0:
        # self._sig is a signal handler id returned by connect() below, so it
        # has to be disconnected on the object it was connected to.
        # GObject.source_remove() only knows main-loop source ids and would
        # leave the old handler connected.
        previous = getattr(self, "app_details", None)
        if previous is not None:
            previous.disconnect(self._sig)
        self._sig = 0
    self.app_details = app_details
    self.appname = app_details.display_name
    self.pkgname = app_details.pkgname
    self._sig = self.app_details.connect(
        "screenshots-available", self._on_screenshots_available)
    self._screenshots = []
    self.app_details.query_multiple_screenshots()
def clear(self):
    """Abort pending work, remove the tracked handlers, destroy all children.

    ``self.cancel`` is cancelled and reset, every id in ``self._handlers`` is
    passed to ``GObject.source_remove()`` and each child obtained by
    iterating ``self`` is destroyed.
    """
    self.cancel.cancel()
    self.cancel.reset()
    # NOTE(review): _handlers is not emptied here, so a second clear() would
    # try to remove the same ids again -- confirm whether that is intended.
    for handler_id in self._handlers:
        GObject.source_remove(handler_id)
    for widget in self:
        widget.destroy()
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file,
                                   event):
    """Schedule ``self.open`` 10 seconds after the monitored file settled.

    Only ``CHANGES_DONE_HINT`` events are handled; any other event is
    ignored.  A timeout that is still pending is removed first so that
    repeated events collapse into a single call.
    """
    changes_done = event == Gio.FileMonitorEvent.CHANGES_DONE_HINT
    if not changes_done:
        return
    if self._timeout_id:
        GObject.source_remove(self._timeout_id)
        self._timeout_id = None
    self._timeout_id = GObject.timeout_add_seconds(10, self.open)