def _async_call(f, args, kwargs, on_done):
    """Run ``f(*args, **kwargs)`` on a daemon thread and report the outcome.

    When the call finishes, ``on_done(result, error)`` is scheduled with
    ``GLib.idle_add``. ``result`` is the return value of ``f`` (``None`` if
    it raised); ``error`` is ``None`` on success, otherwise a string holding
    the formatted traceback.

    :param f: callable executed in the background thread.
    :param args: positional arguments passed to ``f``.
    :param kwargs: keyword arguments passed to ``f``.
    :param on_done: callable invoked as ``on_done(result, error)``.
    """
    def run(data):
        f, args, kwargs, on_done = data
        error = None
        result = None
        try:
            result = f(*args, **kwargs)
        except Exception as e:
            # Attach the formatted traceback to the exception object too.
            e.traceback = traceback.format_exc()
            error = 'Unhandled exception in asyn call:\n{}'.format(e.traceback)

        def deliver():
            on_done(result, error)
            # An idle source keeps being re-run while its callback returns a
            # truthy value; always return False so a truthy return value from
            # on_done cannot make the completion callback fire repeatedly.
            return False

        GLib.idle_add(deliver)

    data = f, args, kwargs, on_done
    thread = threading.Thread(target=run, args=(data,))
    thread.daemon = True
    thread.start()
python类idle_add()的实例源码
def InitSignal(gui):
    """Install SIGINT handling that stops ``gui``.

    For every signal name listed below that exists in the ``signal`` module,
    a Python-level handler is installed with ``signal.signal`` and a GLib
    unix-signal source is registered from an idle callback.

    :param gui: object exposing a ``stop()`` method, called when a handled
        signal arrives.
    """
    def signal_action(signum):
        # ``signum`` may be a plain int (Python signal handler) or a
        # ``signal.Signals`` member (GLib user data); compare by value.
        if int(signum) == int(signal.SIGINT):
            print("\r", end="")
            logging.debug("Caught signal SIGINT(2)")
        gui.stop()

    def handler(*args):
        signal_action(args[0])

    def idle_handler(*args):
        # Forward the signal number: signal_action requires it, and calling
        # it without an argument would raise TypeError inside the main loop.
        GLib.idle_add(signal_action, args[0], priority=GLib.PRIORITY_HIGH)

    def install_glib_handler(sig):
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, sig, handler, sig)

    sigs = [getattr(signal, s, None) for s in "SIGINT".split()]
    for sig in filter(None, sigs):
        signal.signal(sig, idle_handler)
        GLib.idle_add(install_glib_handler, sig, priority=GLib.PRIORITY_HIGH)
def _load_profiles(self):
    """
    Search for files containing lists of profiles and read them.

    This is done in thread, with crazy hope that it will NOT crash GTK
    in the process.

    Every entry of the "userdata" directory below ``self.STEAMPATH`` is
    checked for a ``self.PROFILE_LIST`` file; each file found is passed to
    ``self._parse_profile_list`` while ``self._lock`` is held. Parse errors
    are logged and do not stop the scan. ``self._load_finished`` is
    scheduled with ``GLib.idle_add`` once the scan is over.
    """
    p = os.path.join(os.path.expanduser(self.STEAMPATH), "userdata")
    i = 0
    if os.path.exists(p):
        for user in os.listdir(p):
            sharedconfig = os.path.join(p, user, self.PROFILE_LIST)
            if os.path.isfile(sharedconfig):
                self._lock.acquire()
                try:
                    log.debug("Loading sharedconfig from '%s'", sharedconfig)
                    try:
                        i = self._parse_profile_list(i, sharedconfig)
                    except Exception as e:
                        log.exception(e)
                finally:
                    # Release even if something other than Exception escapes.
                    self._lock.release()
    GLib.idle_add(self._load_finished)
def vdf_import_confirmed(self, *a):
    """Finish an import: rename action-set references, save, register.

    The profile name is read from the "txName" widget. When the imported
    profile has more than one action set, every action-set switch is
    re-pointed at a generated name and each non-default set is saved to its
    own ".sccprofile" file. The profile is then handed to the application
    and the window is destroyed from an idle callback.
    """
    profile = self._profile
    name = self.builder.get_object("txName").get_text().strip()

    if len(profile.action_sets) > 1:
        # Update ChangeProfileActions with correct profile names
        for switch in profile.action_set_switches:
            set_id = int(switch._profile.split(":")[-1])
            switch._profile = self.gen_aset_name(
                name, profile.action_set_by_id(set_id))

        # Save action set profiles
        for key in profile.action_sets:
            if key == 'default':
                continue
            filename = self.gen_aset_name(name, key) + ".sccprofile"
            profile.action_sets[key].save(
                os.path.join(get_profiles_path(), filename))

    self.app.new_profile(profile, name)
    GLib.idle_add(self.window.destroy)
def setup_signals(self, signals, handler):
    """
    Connect ``handler`` to each of ``signals`` through GLib.

    This is a workaround to signal.signal(signal, handler)
    which does not work with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146
    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals
    """
    def register_unix_signal(signum):
        # handler receives the signal itself as its argument
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, handler, signum)

    # Each registration is deferred to a high-priority idle callback.
    for signum in signals:
        GLib.idle_add(register_unix_signal, signum,
                      priority=GLib.PRIORITY_HIGH)
# set the config
def setup_signals(self, signals, handler):
    """
    Connect ``handler`` to each of ``signals`` through GLib.

    This is a workaround to signal.signal(signal, handler)
    which does not work with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146
    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals
    """
    def register_unix_signal(signum):
        # handler receives the signal itself as its argument
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, handler, signum)

    # Each registration is deferred to a high-priority idle callback.
    for signum in signals:
        GLib.idle_add(register_unix_signal, signum,
                      priority=GLib.PRIORITY_HIGH)
# build the gui
def _async_call(f, args, kwargs, on_done):
    """Run ``f(*args, **kwargs)`` on a daemon thread and report the outcome.

    When the call finishes, ``on_done(result, error)`` is scheduled with
    ``GLib.idle_add``. ``result`` is the return value of ``f`` (``None`` if
    it raised); ``error`` is ``None`` on success, otherwise a string holding
    the formatted traceback.

    :param f: callable executed in the background thread.
    :param args: positional arguments passed to ``f``.
    :param kwargs: keyword arguments passed to ``f``.
    :param on_done: callable invoked as ``on_done(result, error)``.
    """
    def run(data):
        f, args, kwargs, on_done = data
        error = None
        result = None
        try:
            result = f(*args, **kwargs)
        except Exception as e:
            # Attach the formatted traceback to the exception object too.
            e.traceback = traceback.format_exc()
            error = 'Unhandled exception in asyn call:\n{}'.format(e.traceback)

        def deliver():
            on_done(result, error)
            # An idle source keeps being re-run while its callback returns a
            # truthy value; always return False so a truthy return value from
            # on_done cannot make the completion callback fire repeatedly.
            return False

        GLib.idle_add(deliver)

    data = f, args, kwargs, on_done
    thread = threading.Thread(target=run, args=(data,))
    thread.daemon = True
    thread.start()
def load_game_parm(self, fname):
    """Load the game stored in ``fname``.

    If the file cannot be opened an info box is shown and nothing else
    happens. Otherwise the window title is updated and, for ``.psn`` files,
    the header is read, the header labels are refreshed through
    ``GLib.idle_add`` when ``gv.show_header`` is set, and the game is loaded
    via ``self.psn.load_game_psn``.

    :param fname: path of the game file.
    """
    try:
        # Opened only to check the file is accessible; closed right away.
        with open(fname):
            pass
    except Exception:
        # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
        gv.gui.info_box("Error loading game - file not found")
        return
    gv.gui.window.set_title(NAME + " " + VERSION + " " + os.path.basename(fname))
    if fname.endswith(".psn"):
        self.get_header_from_file(fname)
        if gv.show_header:
            # Header labels show at most the first 50 characters.
            GLib.idle_add(gv.gui.header_lblsente.set_text, gv.sente[:50])
            GLib.idle_add(gv.gui.header_lblgote.set_text, gv.gote[:50])
            GLib.idle_add(gv.gui.header_lblevent.set_text, gv.event[:50])
            GLib.idle_add(gv.gui.header_lbldate.set_text, gv.gamedate[:50])
        self.psn.load_game_psn(fname)
        return
def command(self, cmd):
    """Write ``cmd`` to the engine's stdin and record it in the debug log.

    The logged line is prefixed with the side and the running engine's
    name, and is also printed when ``gv.verbose`` or ``gv.verbose_uci`` is
    set. If the write fails with AttributeError or IOError, a note that the
    engine process is not running is logged instead.
    """
    tag = self.side + "(" + self.get_running_engine().strip() + "):"
    log_line = "->" + tag + cmd.strip()
    if gv.verbose or gv.verbose_uci:
        print(log_line)
    GObject.idle_add(self.engine_debug.add_to_log, log_line)
    try:
        # write as string (not bytes) since universal_newlines=True
        self.p.stdin.write(cmd)
    except (AttributeError, IOError):
        GObject.idle_add(
            self.engine_debug.add_to_log,
            "# engine process is not running")
def nvim_request(self, nvim: object, sub: str, args: object) -> object:
    """Handle a request coming from neovim.

    This is called on neovim's event loop, so modifications to GTK widgets
    should be done through `GLib.idle_add()`.

    Only the `Gui` / `Clipboard` request is handled; it is forwarded to
    `update_clipboard` and that result is returned. Any other request
    yields None.

    :nvim: the `neovim.Nvim` instance.
    :sub: the request name.
    :args: the request arguments.
    """
    if sub != 'Gui' or args[0] != 'Clipboard':
        return None
    return self.update_clipboard(*args[1:])
def update_clipboard(self, method: str, data: list):
    """Handles a clipboard request.

    This is called on neovim's event loop, so modifications to GTK widgets
    should be done through `GLib.idle_add()`.

    :method: `get` to obtain the clipboard's contents or `set` to update
             them.
    :data: when the method is `set`, contains the lines to add to the
           clipboard, else None.
    :returns: the clipboard's contents as a list of lines when the method
              is `get` (empty list when there is no text), else None.
    """
    cb = Gtk.Clipboard.get_default(Gdk.Display.get_default())
    if method == 'set':
        text = '\n'.join(data[0])
        # The length argument is counted in bytes, not characters; passing
        # len(text) truncates any text containing multi-byte characters.
        # -1 lets GTK determine the length of the encoded string itself.
        return cb.set_text(text, -1)
    if method == 'get':
        text = cb.wait_for_text()
        return text.split('\n') if text else []
def switched(meta, builder):
    """React to the 'connect-switch' widget being activated.

    If the switch was off, it is set on (from an idle callback) and the
    connection is activated. If it was on, a notification is shown, the
    switch is set off and the provider is disconnected; when disconnecting
    fails an error dialog is shown, the switch is set back on and the
    exception is re-raised.
    """
    switch = builder.get_object('connect-switch')
    was_active = switch.get_active()
    logger.info("switch activated, old state {}".format(was_active))

    if was_active:
        notify("eduVPN disconnecting...", "Disconnecting from {}".format(meta.display_name))
        logger.info("setting switch OFF")
        GLib.idle_add(switch.set_active, False)
        try:
            disconnect_provider(meta.uuid)
        except Exception as e:
            window = builder.get_object('eduvpn-window')
            details = "{}: {}".format(type(e).__name__, str(e))
            error_helper(window, "can't disconnect", details)
            GLib.idle_add(switch.set_active, True)
            raise
        return

    logger.info("setting switch ON")
    GLib.idle_add(switch.set_active, True)
    activate_connection(meta=meta, builder=builder)
def _background(meta, oauth, dialog, builder):
    """Finalize a provider configuration and store it.

    A key pair and the profile configuration are fetched and stored on
    ``meta``; the provider is then stored and VPN monitoring is started.
    All user-interface work (error dialogs, hiding ``dialog``,
    notifications, refreshing the provider list) is scheduled with
    ``GLib.idle_add``.

    :param meta: provider metadata; ``cert``, ``key`` and ``config`` are set
        on it.
    :param oauth: session object passed to the API helpers.
    :param dialog: dialog that is hidden when the work ends.
    :param builder: UI builder passed on to the update helpers.
    :raises Exception: any failure is reported to the user and re-raised.
    """
    try:
        cert, key = create_keypair(oauth, meta.api_base_uri)
        meta.cert = cert
        meta.key = key
        meta.config = get_profile_config(oauth, meta.api_base_uri, meta.profile_id)
    except Exception as e:
        # Build the message now: the name bound by ``except ... as`` is
        # unbound when the except block exits, so a callback that reads it
        # later from the main loop would fail with NameError.
        details = "{}: {}".format(type(e).__name__, str(e))
        GLib.idle_add(error_helper, dialog, "can't finalize configuration", details)
        GLib.idle_add(dialog.hide)
        raise
    else:
        try:
            provider_uuid = store_provider(meta)
            monitor_vpn(uuid=provider_uuid,
                        callback=lambda *args, **kwargs: vpn_change(builder=builder))
            GLib.idle_add(notify, "eduVPN provider added",
                          "added provider '{}'".format(meta.display_name))
        except Exception as e:
            # Same as above: format eagerly, before the exception is unbound.
            details = "{} {}".format(type(e).__name__, str(e))
            GLib.idle_add(error_helper, dialog, "can't store configuration", details)
            GLib.idle_add(dialog.hide)
            raise
        else:
            GLib.idle_add(dialog.hide)
            GLib.idle_add(update_providers, builder)
def setup_signals(self, signals, handler):
    """
    Connect ``handler`` to each of ``signals`` through GLib.

    This is a workaround to signal.signal(signal, handler)
    which does not work with a GLib.MainLoop() for some reason.
    Thanks to: http://stackoverflow.com/a/26457317/5433146
    args:
        signals (list of signal.SIG... signals): the signals to connect to
        handler (function): function to be executed on these signals
    """
    def register_unix_signal(signum):
        # handler receives the signal itself as its argument
        GLib.unix_signal_add(GLib.PRIORITY_HIGH, signum, handler, signum)

    # Each registration is deferred to a high-priority idle callback.
    for signum in signals:
        GLib.idle_add(register_unix_signal, signum,
                      priority=GLib.PRIORITY_HIGH)
# get an object from the builder
def paste_emoji(self, completer, model, tree_iter):
    """Paste the emoji selected in ``model`` and mark it as recently used.

    The row's short name (column 1) and hexadecimal codepoint string
    (column 3) are read. The codepoint string may hold several values
    separated by '-'; all of them are combined into the pasted string. The
    entry is cleared, the window hidden, the paste is scheduled with
    ``GLib.idle_add`` and the short name is moved to the front of
    ``shared.recent``.

    :returns: True.
    """
    shortname = model[tree_iter][1]
    codepoint = model[tree_iter][3]
    # Convert every part of the sequence, not just the first two, so longer
    # sequences are not truncated. A string without '-' yields one part.
    string = ''.join(chr(int(part, 16)) for part in codepoint.split('-'))
    self.entry.set_text('')
    # When selecting match with mouse window.close() doesn't seem to work
    self.hide_window(self, None)
    GLib.idle_add(shared.clipboard.paste, string)
    if shortname in shared.recent:
        shared.recent.remove(shortname)
    shared.recent.appendleft(shortname)
    shared.emoji.categories['recent'].get_model().refilter()
    # Need to trigger resort when emoji already in recent gets used again
    shared.emoji.categories['recent'].set_sort_func(
        1, shared.emoji.sort_recent, None)
    return True
def async_call(func, *args, callback=None):
    '''Run `func(*args)` on a daemon thread, then notify `callback` via GLib.

    `callback`, when given, is scheduled with `GLib.idle_add` and receives
    `(result, error)`: `result` is the return value of `func` (None if it
    raised) and `error` is None on success or the formatted traceback
    string on failure. Failures are also logged. Always check whether
    `error` is None.
    '''
    def worker():
        try:
            outcome, failure = func(*args), None
        except Exception:
            outcome, failure = None, traceback.format_exc()
            logger.error(failure)
        if callback:
            GLib.idle_add(callback, outcome, failure)

    threading.Thread(target=worker, daemon=True).start()
def _connect(self, device):
    """Schedule a connection attempt to ``device`` if none is queued.

    When ``self.queued_connections`` is zero, the device is printed, a
    connection attempt is scheduled with ``GLib.idle_add``, the device is
    flagged as connected and the counter is incremented. The scheduled
    attempt decrements the counter again once it has run.
    """
    def attempt_connect():
        try:
            pydbus.SystemBus().get(SERVICE_NAME, device.path).Connect()
        except KeyError:
            self._log.debug("The device has likely disappeared.", exc_info=True)
        except GLib.Error:
            self._log.debug("Connect() failed:", exc_info=True)
        else:
            self._log.info("Connection successful.")
        # NOTE(review): assumed to run whatever the outcome, so the counter
        # can return to zero - confirm against the original layout.
        self.queued_connections -= 1

    if self.queued_connections != 0:
        return
    print_device(device, "Connecting")
    GLib.idle_add(attempt_connect)
    device.connected = True
    self.queued_connections += 1
def on_fontsize_adjustment_value_changed(self, adjustment):
    '''
    Signal handler: the fontsize adjustment in the header bar changed.

    Stores the new value, saves the options, marks the UI busy and
    schedules the flowbox font change from an idle callback.

    :param adjustment: The adjustment used to change the fontsize
    :type adjustment: Gtk.Adjustment object
    '''
    new_size = adjustment.get_value()
    if _ARGS.debug:
        debug_line = (
            'on_fontsize_adjustment_value_changed() value = %s\n'
            % new_size)
        sys.stdout.write(debug_line)
    self._fontsize = new_size
    self._save_options()
    self._busy_start()
    GLib.idle_add(self._change_flowbox_font)
def on_fallback_check_button_toggled(self, check_button):
    '''
    Signal handler: the fallback check button in the header bar toggled.

    Stores the button state, saves the options, marks the UI busy and
    schedules the flowbox font change from an idle callback.

    :param check_button: The check button used to select whether
                         fallback fonts should be used.
    :type check_button: Gtk.CheckButton object
    '''
    self._fallback = check_button.get_active()
    if _ARGS.debug:
        debug_line = (
            'on_fallback_check_button_toggled() self._fallback = %s\n'
            % self._fallback)
        sys.stdout.write(debug_line)
    self._save_options()
    self._busy_start()
    GLib.idle_add(self._change_flowbox_font)
def __init__(self):
    """Create the 'apart' window and start the core.

    A status listener and the core are created; their callbacks are
    forwarded to ``on_status_msg`` / ``on_delete`` through
    ``GLib.idle_add``. The window starts out showing the loading body.
    """
    Gtk.Window.__init__(self, title='apart')
    self.dying = False
    self.status_listener = MessageListener(
        on_message=lambda m: GLib.idle_add(self.on_status_msg, m),
        message_predicate=lambda m: m['type'] == 'status')
    self.core = ApartCore(listeners=[self.status_listener],
                          on_finish=lambda code: GLib.idle_add(self.on_delete))
    self.sources = None
    self.sources_interest = []  # array of callbacks on sources update
    # Floor division keeps the 16:9 width an int; ``300 * 16/9`` is a float
    # on Python 3 and the size arguments are integers.
    self.set_default_size(height=300, width=300 * 16 // 9)
    self.loading_body = LoadingBody()
    self.clone_body = None
    self.add(self.loading_body)
    self.connect('delete-event', self.on_delete)
    self.set_icon_name('apart')
def try_bridge_connection(self, cb):
    """Keep trying to connect to the selected bridge, then schedule ``cb``.

    While ``phue`` reports a registration exception the attempt is repeated
    after a one second pause. Any other exception is printed and ends the
    loop without setting ``self.bridge``. On success the bridge is stored
    in ``self.bridge``. ``cb`` is scheduled with ``GLib.idle_add`` in every
    case.
    """
    import phue

    while True:
        try:
            connected = phue.Bridge(ip=self.selected_bridge)
        except phue.PhueRegistrationException:
            time.sleep(1)
            continue
        except Exception:
            import traceback
            traceback.print_exc()
        else:
            self.bridge = connected
        break

    GLib.idle_add(cb)
def do_activate(self):
    """Present the active window, or create and show a new one.

    Timing marks are recorded along the way through ``mark_time``; one more
    is scheduled with ``GLib.idle_add``.
    """
    mark_time("in app activate")
    existing = self.get_active_window()
    if existing is not None:
        existing.present()
        return

    window = Window(self)
    mark_time("have window")
    self.add_window(window)
    mark_time("added window")
    window.show()
    mark_time("showed window")
    GLib.idle_add(mark_time, "in main loop")
    mark_time("app activate done")
def multithread(handler):
    """Multithread decorator.

    The decorated callable sets a WATCH cursor on its first positional
    argument and runs ``handler`` on a daemon thread while holding
    ``global_threading_lock``. Afterwards the cursor is reset from an idle
    callback and, if ``handler`` returned a callable, that callable is
    invoked there too. Exceptions raised by ``handler`` are printed.

    :param handler: callable to run in the background; its first positional
        argument is the object whose cursor is changed.
    :returns: the wrapping function (returns None immediately when called).
    """
    import functools

    def on_done(result, inst):
        set_cursor(inst)
        if callable(result):
            result()

    def action(*args, **kwargs):
        with global_threading_lock:
            try:
                result = handler(*args, **kwargs)
            except Exception as e:
                print("Error in multithreading:\n%s" % str(e))
                # Still reset the cursor; otherwise it stays on WATCH
                # after a failed handler.
                result = None
            GLib.idle_add(on_done, result, args[0])

    @functools.wraps(handler)
    def wrapper(*args, **kwargs):
        set_cursor(args[0], Gdk.Cursor(Gdk.CursorType.WATCH))
        thread = threading.Thread(target=action, args=args, kwargs=kwargs)
        thread.daemon = True
        thread.start()

    return wrapper
def toggle_cursor(widget, hide=False):
    """Hide or restore the cursor over ``widget``'s window.

    Does nothing when the widget has no window. Otherwise a blank cursor
    (``hide`` true) or ``None`` (``hide`` false) is applied to the window
    from an idle callback.
    """
    window = widget.get_window()
    if not window:
        return
    if hide:
        cursor = Gdk.Cursor(Gdk.CursorType.BLANK_CURSOR)
    else:
        cursor = None
    GLib.idle_add(window.set_cursor, cursor)
def __init__(self, app):
    """Build the channels view from 'ui/channels.ui' and attach it.

    The UI is loaded into a builder stored in ``self.channels``, its
    signals are connected to this object, the channels box is added to the
    application's channels stack and the initial setup is scheduled from
    an idle callback.
    """
    self.app = app
    self.stack = app.channels_stack
    self.filter = None

    builder = Gtk.Builder()
    self.channels = builder
    builder.add_from_file(relative_path('ui/channels.ui'))
    builder.connect_signals(self)

    self.channels_box = builder.get_object('box_channels')
    self.stack.add_named(self.channels_box, 'channels_container')

    self.channels_filters = builder.get_object('list_box_channels_filters')
    self.channels_list = builder.get_object('flow_box_channels_list')
    self.channels_list.set_filter_func(self.on_channels_list_row_changed)

    GLib.idle_add(self.do_initial_setup)
def run(self):
    """Process display events until ``self.running`` becomes false.

    For every button press the pointer position is checked; if no check
    reports the pointer inside, ``self.idle`` is scheduled with
    ``GLib.idle_add``. The pointer event is replayed in all cases. Errors
    raised while handling an event are printed and the loop continues.
    """
    self.running = True
    while self.running:
        event = self.display.next_event()
        try:
            if event.type == X.ButtonPress:
                # Check if pointer is inside monitored windows
                for w in self.windows:
                    if (Gtk.MAJOR_VERSION, Gtk.MINOR_VERSION) >= (3, 20):
                        pdevice = Gdk.Display.get_default().get_default_seat().get_pointer()
                    else:
                        pdevice = Gdk.Display.get_default().get_device_manager().get_client_pointer()
                    # NOTE(review): position and size are queried on self,
                    # not on the loop variable w - confirm this is intended.
                    p = self.get_window().get_device_position(pdevice)
                    g = self.get_size()
                    if 0 <= p.x <= g.width and 0 <= p.y <= g.height:
                        break
                else:
                    # Is outside, so activate
                    GLib.idle_add(self.idle)
                self.display.allow_events(X.ReplayPointer, event.time)
            else:
                self.display.allow_events(X.ReplayPointer, X.CurrentTime)
        except Exception as e:
            # Function-call form is valid on both Python 2 and Python 3.
            print("Unexpected error: " + str(e))
def load_feeds(self):
    """Scrape all feeds in worker processes and schedule their tabs.

    Each feed is cut down to its first 10 entries and a tab for it is
    scheduled with ``GLib.idle_add``. The kept entries of all feeds are
    then sorted by their ``updated`` attribute and scheduled as the
    "Frontpage" tab.
    """
    combined = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        for scraped in pool.map(scrape_rss, self.feed_manager.get_feeds()):
            scraped.entries = scraped.entries[:10]  # Only get 1st 10
            GLib.idle_add(self.add_new_feed_tab,
                          scraped.feed.title, scraped.entries)
            # Load into frontpage-o
            combined.extend(scraped.entries)

    combined.sort(key=operator.attrgetter('updated'))
    GLib.idle_add(self.add_new_feed_tab, "Frontpage", combined)
def on_video_search1_activated(self, searchbox):
    """Start searching when the user presses enter.

    The current list is replaced with a spinner and a "searching" label,
    then the backend search runs on a daemon thread. When the search ends
    the spinner and label are removed, and every result of kind
    "youtube#video" is added to the list. All widget changes made from the
    worker go through ``GLib.idle_add``.

    :param searchbox: entry widget holding the search text.
    """
    # Show a spinner to the user while the results are being retrieved
    spinner = Gtk.Spinner()
    searching = Gtk.Label("Buscando...")
    # Remove every item currently shown in the list
    self.video_list.foreach(lambda child: self.video_list.remove(child))
    # Show the spinner and searching label
    self.video_list.add(spinner)
    self.video_list.add(searching)
    spinner.start()
    # Update the changes
    self.video_list.show_all()

    # Read the query here, in the signal handler, so the worker thread
    # never has to touch the widget.
    query = searchbox.get_text()

    # We spawn a new thread to consume the api asynchronously
    def start_search():
        try:
            # For now we use a single backend
            self.search_response = self.backend.search(query)
        finally:
            # Remove the placeholders exactly once, also when the search
            # fails or returns no video results.
            GLib.idle_add(spinner.destroy)
            GLib.idle_add(searching.destroy)
        for search_result in self.search_response.get("items", []):
            if search_result["id"]["kind"] == "youtube#video":
                GLib.idle_add(self.update_list, search_result)
                GLib.idle_add(self.video_list.show)

    self.download_thread = threading.Thread(target=start_search)
    self.download_thread.daemon = True
    self.download_thread.start()
def download_video_thread(self,video_id, filename, video_iter):
    """Download ``video_id`` to ``filename`` on a daemon thread.

    The backend is configured with the video id, the file name and a
    progress callback, then ``self.backend.download()`` runs in the thread.
    The callback updates columns 1 (status text) and 2 (percentage) of the
    ``video_iter`` row in ``self.download_queue_store`` via
    ``GLib.idle_add`` and schedules ``self.finish_download`` whenever the
    reported status is not 'downloading'.
    """
    def update_progress(progress):
        """ Depending on the download status update the dialog or notify the user """
        # NOTE(review): `item` is never used; as laid out here everything
        # below runs once per row of the store - confirm the intended nesting.
        for item in self.download_queue_store:
            def get_status(status):
                # Maps the backend status to the text shown to the user;
                # any other status raises KeyError.
                return {
                    'downloading' : 'Descargando',
                    'finished' : 'Terminado',
                    'converting' : 'Convirtiendo'
                }[status]
            # This mumbo-jumbo exists because messing with the progress bar without using Glib.idle_add caused crashes with huge files.
            def update_progressbar(progress):
                self.download_queue_store[video_iter][1] = get_status(progress['status'])
                # Prefer the estimated total when the backend reports one.
                if 'total_bytes_estimate' in progress:
                    self.download_queue_store[video_iter][2] = progress['downloaded_bytes'] / progress['total_bytes_estimate'] * 100
                else:
                    self.download_queue_store[video_iter][2] = progress['downloaded_bytes'] / progress['total_bytes'] * 100
            GLib.idle_add(update_progressbar, progress)
            if progress['status'] != 'downloading':
                GLib.idle_add(self.finish_download)
    """ Spawn a new thread and download """
    def start_download():
        # Configure the shared backend object, then start the download.
        self.backend.video = video_id
        self.backend.filename = filename
        self.backend.download_callback = update_progress
        self.backend.download()
    self.download_thread = threading.Thread(target=start_download)
    self.download_thread.daemon = True
    self.download_thread.start()
def emit(self, *args):
    """Schedule ``GObject.GObject.emit(self, *args)`` via ``GLib.idle_add``.

    The signal is therefore emitted from an idle callback, not at the
    moment this method is called.
    """
    base_emit = GObject.GObject.emit
    GLib.idle_add(base_emit, self, *args)