def _add_watch(self, obj):
    """Install the GLib I/O watch for ``obj``, or arrange a retry.

    The function is registered as an idle/timeout callback (see the
    ``idle_add``/``timeout_add`` calls that pass ``self._add_watch``), so
    its return value is the "call me again" flag: ``False`` ends the
    current source, ``True`` keeps it scheduled.

    ``self._watch_map[obj]`` is updated to ``(source_id, mask)`` once a
    real watch exists, or ``(source_id, 'timeout_add')`` while retrying.
    """
    io_mask = self._get_mask(obj)
    if not io_mask:
        # This should be exceptional. The channel is still open, but is
        # neither readable nor writable. Retry until it is, but with a
        # timeout_add() to preserve CPU.
        if self._watch_map[obj][1] != 'idle_add':
            # Not running from the initial idle source: leave the current
            # source scheduled so it polls again.
            return True
        retry_id = gobject.timeout_add(200, self._add_watch, obj,
                                       priority=gobject.PRIORITY_LOW)
        self._watch_map[obj] = (retry_id, 'timeout_add')
        return False

    watch_id = gobject.io_add_watch(obj, io_mask, self._handle_io,
                                    priority=IO_PRIORITY)
    self._watch_map[obj] = (watch_id, io_mask)
    return False
python类idle_add()的实例源码
# Poll loop that keeps the micro:bit status icon of every open window current.
#
# On each pass it checks whether the path in SETTINGS['mbitLocation'] exists
# and queues, through gobject.idle_add, a call to the window's
# indicator.set_from_file with one of the "uBitFound.png", "uBitNotFound.png"
# or "uBitUploading.png" images from WORKINGDIR/data.
# `last` maps each window to the previous (uBitFound, uBitUploading) pair so
# the found / not-found icons are only queued when the found state flips.
# The loop is `while True`, so the function never returns; presumably it is
# run on a background thread -- TODO confirm against the caller.
#
# NOTE(review): block indentation is missing in this copy of the source. The
# nesting of `uBitUploading = False`, `last[self] = ...` and `time.sleep(0.2)`
# cannot be determined from here and must be restored from the upstream file
# before this code can run.
def uBitPoller():
# Both flags are module-level state and are rebound below.
global uBitFound
global uBitUploading
last = {}
while True:
# `self` is just the loop variable here: one entry of OPENWINDOWS per pass.
for self in OPENWINDOWS:
if self not in last:
last[self] = (False, False)
uBitFound = os.path.exists(SETTINGS['mbitLocation'])
# Taken unless an upload is in progress AND the device path exists.
if not (uBitUploading and uBitFound):
if uBitFound and not last[self][0]:
gobject.idle_add(self.indicator.set_from_file, os.path.join(WORKINGDIR, "data", "uBitFound.png"))
elif last[self][0] and not uBitFound:
gobject.idle_add(self.indicator.set_from_file, os.path.join(WORKINGDIR, "data", "uBitNotFound.png"))
uBitUploading = False
else:
gobject.idle_add(self.indicator.set_from_file, os.path.join(WORKINGDIR, "data", "uBitUploading.png"))
last[self] = (uBitFound, uBitUploading)
time.sleep(0.2)
def upload(self):
    """Copy the combined build ``.hex`` file onto the micro:bit.

    Reads ``microbit-build-combined.hex`` from the build tree under
    ``buildLocation`` and writes it to ``SETTINGS['mbitLocation']``.  If the
    build output or the device path does not exist, a message is queued for
    ``self.message`` through ``gobject.idle_add`` instead.  The module-level
    ``mbedUploading`` flag is cleared before returning.

    Both files are handled with ``with`` so they are closed (and the copy
    flushed) as soon as the transfer is done rather than whenever the
    garbage collector gets to them.
    """
    global mbedUploading
    hex_path = ('%s/build/bbc-microbit-classic-gcc/source/microbit-build-combined.hex'
                % buildLocation)
    if os.path.exists(hex_path):
        if os.path.exists(SETTINGS['mbitLocation']):
            with open(hex_path) as hex_file:
                firmware = hex_file.read()
            target_path = '%s/microbit-build-combined.hex' % SETTINGS['mbitLocation']
            with open(target_path, 'w') as target_file:
                target_file.write(firmware)
        else:
            gobject.idle_add(self.message, """Cannot upload!
Micro:Bit not found!
Check it is plugged in and
Micro:Pi knows where to find it.""")
    else:
        gobject.idle_add(self.message, """No build files avaliable""")
    mbedUploading = False
def updateTitle():
    """Keep the title bar of every window in ``OPENWINDOWS`` up to date.

    Loops forever: for each window it builds
    ``"<*?><file name> - <directory> - Micro:Pi"`` (the ``*`` is present
    when ``getModified()`` is true) and, when that differs from the title
    queued last time for that window, queues ``window.set_title`` through
    ``gobject.idle_add``.  Sleeps 0.1 s between passes.
    """
    shown = {}
    while True:
        for win in OPENWINDOWS:
            marker = '*' if win.getModified() else ''
            location = win.saveLocation
            title = '%s%s - %s - %s' % (marker,
                                        os.path.basename(location),
                                        os.path.dirname(location),
                                        'Micro:Pi')
            # Only touch the GUI when the text actually changed.
            if shown.get(win, '') != title:
                gobject.idle_add(win.window.set_title, title)
                shown[win] = title
        time.sleep(0.1)
# Endless loop that copies serial input into the console text view.
#
# While self.serialConnection is set it calls read() on it, keeps only the
# characters found in string.printable and queues the result for insertion at
# the end of self.consoleBody's buffer via gobject.idle_add.  While it is not
# set it tries to open self.serialLocation with serial.serial_for_url and
# applies self.baudrate.
# Never returns (`while True`); presumably run on a background thread --
# TODO confirm against the caller.
#
# NOTE(review): block indentation is missing in this copy of the source; in
# particular it cannot be told whether `time.sleep(0.1)` belongs to the
# reconnect branch or to every loop pass.  Restore the nesting from upstream.
# NOTE(review): both bare `except:` clauses discard every error, so a failing
# port is invisible to the user.
def serialPoller(self):
# NOTE(review): `start` is not read anywhere in the visible code.
start = True
# GUI-side helper: appends `text` at the end of the console buffer.
def addText(self, text):
tb = self.consoleBody.get_buffer()
tb.insert(tb.get_end_iter(), text)
while True:
if self.serialConnection:
try:
data = self.serialConnection.read()
# Filter out anything that is not in string.printable.
d2 = ''
for i in data:
if i in string.printable:
d2 += i
gobject.idle_add(addText, self, d2)
except:
pass
else:
# No connection yet: try to open one.
try:
self.serialConnection = serial.serial_for_url(self.serialLocation)
self.serialConnection.baudrate = self.baudrate
except:
pass
time.sleep(0.1)
# Endless loop that copies serial input into the inline serial console.
#
# While self.serialConnection is set it calls read() on it, keeps only the
# characters found in string.printable and queues the result for insertion at
# the end of self.serialConsoleBody's buffer via gobject.idle_add.  While it
# is not set it tries to open self.serialLocation with serial.serial_for_url
# and applies self.baudrate.
# Never returns (`while True`); presumably run on a background thread --
# TODO confirm against the caller.
#
# NOTE(review): block indentation is missing in this copy of the source; in
# particular it cannot be told whether `time.sleep(0.1)` belongs to the
# reconnect branch or to every loop pass.  Restore the nesting from upstream.
# NOTE(review): both bare `except:` clauses discard every error, so a failing
# port is invisible to the user.
def inlineSerialPoller(self):
# NOTE(review): `start` is not read anywhere in the visible code.
start = True
# GUI-side helper: appends `text` at the end of the serial console buffer.
def addText(self, text):
tb = self.serialConsoleBody.get_buffer()
tb.insert(tb.get_end_iter(), text)
while True:
if self.serialConnection:
try:
data = self.serialConnection.read()
# Filter out anything that is not in string.printable.
d2 = ''
for i in data:
if i in string.printable:
d2 += i
gobject.idle_add(addText, self, d2)
except:
pass
else:
# No connection yet: try to open one.
try:
self.serialConnection = serial.serial_for_url(self.serialLocation)
self.serialConnection.baudrate = self.baudrate
except:
pass
time.sleep(0.1)
def _execute(self, target, func):
    """Call ``func`` with this job's arguments and hand the outcome to ``self.cb``.

    ``func`` is invoked as ``func(*self.args, **self.kwargs)``.  The value
    passed on is its return value, or the exception instance if it raised.
    ``errors.InvalidMemoryLocation`` is passed on quietly; any other
    ``Exception`` is logged first, together with the job arguments and
    ``self.tb``.  When ``self.cb`` is set it is queued through
    ``gobject.idle_add`` as ``self.cb(result, *self.cb_args)``.

    ``target`` is accepted but not used by this method.
    """
    try:
        DBG("Running %s (%s %s)" % (self.func,
                                    str(self.args),
                                    str(self.kwargs)))
        DBG(self.desc)
        outcome = func(*self.args, **self.kwargs)
    except errors.InvalidMemoryLocation as err:
        outcome = err
    except Exception as err:
        LOG.error("Exception running RadioJob: %s" % err)
        log_exception()
        LOG.error("Job Args: %s" % str(self.args))
        LOG.error("Job KWArgs: %s" % str(self.kwargs))
        LOG.error("Job Called from:%s%s" %
                  (os.linesep, "".join(self.tb[:-1])))
        outcome = err

    if not self.cb:
        return
    gobject.idle_add(self.cb, outcome, *self.cb_args)
def start_job(generator):
    """Start a job (a coroutine that yields generic tasks).

    Every value the generator yields is treated as a task: a callable that
    receives a ``task_return`` function and calls it with its result when
    done.  That result is then sent back into the generator to obtain the
    next task.  Returns ``generator`` itself.
    """
    def _task_return(result):
        """Callback given to each task; feeds ``result`` to the generator."""
        def _step():
            try:
                next_task = generator.send(result)
            except StopIteration:
                # The coroutine finished: nothing more to schedule.
                pass
            else:
                next_task(_task_return)

        # make sure the generator is advanced in the main thread
        gobject.idle_add(_step)

    # Prime the coroutine: the first send() must carry None.
    _task_return(None)
    return generator
# 2 task examples: sleep_task, threaded_task
def __setitem__(self, fd, obj):
    """Register ``obj`` under file descriptor ``fd``.

    A watch already registered for ``fd`` is removed first.  The new watch
    is not installed directly: ``self._add_watch(obj)`` is queued through
    ``gobject.idle_add`` and ``self._watch_map[obj]`` records the pending
    source as ``(source_id, 'idle_add')``.
    """
    already_known = fd in self._fd_map
    if already_known:
        self._remove_watch(self._fd_map[fd])

    pending_id = gobject.idle_add(self._add_watch, obj,
                                  priority=IO_PRIORITY)
    self._watch_map[obj] = (pending_id, 'idle_add')
    self._fd_map[fd] = obj
def _handle_io(self, obj, mask):
    """I/O watch callback: let asyncore service ``obj`` for ``mask``.

    Returns ``True`` to keep the current watch, ``False`` to drop it --
    either because ``obj`` was unregistered while being serviced, or
    because its mask changed and a fresh watch has been queued through
    ``gobject.idle_add``.
    """
    asyncore.readwrite(obj, mask)

    # Make sure objects removed during the readwrite() aren't re-added
    if obj._fileno not in self._fd_map:
        return False

    # Same read-/writability as before: the existing watch is still right.
    if self._get_mask(obj) == self._watch_map[obj][1]:
        return True

    # If read-/writability has changed, change watch mask
    pending_id = gobject.idle_add(self._add_watch, obj,
                                  priority=IO_PRIORITY)
    self._watch_map[obj] = (pending_id, 'idle_add')
    return False
def dispatch_event(self, event):
    """ Dispatch a launched event to all affected listeners.

    For every listener registered for ``event.type`` the event-specific
    handler (named by ``obj.event_pattern % event.type``) is preferred,
    then the default handler named by ``obj.event_default``.  A listener
    with neither raises ``UnhandledEventError`` unless its
    ``event_silent`` attribute is true.

    @param event: Event launched.
    @type event: C{L{Event}}
    """
    def _deliver(handler):
        # Hand the event to ``handler`` using the module-level dispatcher
        # setting; any other setting delivers nothing.
        if dispatcher == 'gobject':
            import gobject
            gobject.idle_add(handler, event, priority=gobject.PRIORITY_HIGH)
        elif dispatcher == 'callback':
            handler(event)

    if event.type not in self.listeners or not self.listeners[event.type]:
        #logger.warning('No listener for the event type %r.', event.type)
        return

    for obj in self.listeners[event.type]:
        # Try to call event-specific handle method
        specific_name = obj.event_pattern %(event.type)
        if hasattr(obj, specific_name):
            specific = getattr(obj, specific_name)
            if callable(specific):
                _deliver(specific)
                continue
            logger.warning('Event-specific handler found but not callable.')

        # Try to call default handle method
        if hasattr(obj, obj.event_default):
            fallback = getattr(obj, obj.event_default)
            if callable(fallback):
                _deliver(fallback)
                continue

        # No handle method found, raise error ?
        if not obj.event_silent:
            raise UnhandledEventError('%s has no method to handle %s' %(obj, event))
def run(self):
    """Worker loop that advances the simulation on request.

    Each pass blocks on ``self.go`` and, once signalled, holds
    ``self.lock`` while it
      * runs the simulator up to ``self.target_time`` seconds,
      * appends the helper's pause messages to ``self.pause_messages``,
      * queues ``self.viz.update_model`` through ``gobject.idle_add``.
    The loop ends as soon as ``self.quit`` is true, checked both before
    waiting and right after waking up.

    The former ``if 0:`` branch could never execute (and referred to an
    ``ns3`` name that nothing else in this method uses), so it has been
    removed.
    """
    while not self.quit:
        # wait until the main (view) thread gives us the go signal
        self.go.wait()
        self.go.clear()
        if self.quit:
            break

        self.lock.acquire()
        try:
            self.sim_helper.SimulatorRunUntil(ns.core.Seconds(self.target_time))
            self.pause_messages.extend(self.sim_helper.GetPauseMessages())
            gobject.idle_add(self.viz.update_model, priority=PRIORITY_UPDATE_MODEL)
        finally:
            # Release even if the simulator raised.
            self.lock.release()
# enumeration
def start():
    """Launch the visualizer, or run the plain simulation without it.

    When the module-level ``_import_error`` is set, a notice is written to
    stderr and ``ns.core.Simulator.Run()`` is called directly.  Otherwise
    plugins are loaded, a ``Visualizer`` is created, every
    ``(hook, args)`` pair in ``initialization_hooks`` is queued through
    ``gobject.idle_add`` as ``hook(viz, *args)``, packet printing is
    enabled and the visualizer is started.
    """
    assert Visualizer.INSTANCE is None

    if _import_error is None:
        load_plugins()
        viz = Visualizer()
        for hook, hook_args in initialization_hooks:
            gobject.idle_add(hook, viz, *hook_args)
        ns.network.Packet.EnablePrinting()
        viz.start()
        return

    import sys
    notice = "No visualization support (%s)." % (str(_import_error),)
    sys.stderr.write(notice + "\n")
    ns.core.Simulator.Run()
def copy_url(url):
    """Copy the url into the clipboard.

    Back ends are tried in this order and the first usable one wins:

    1. ``win32clipboard`` (Windows),
    2. the ``pbcopy`` command (OS X),
    3. the PyGTK clipboard.

    If none of them is available the function returns silently.
    """
    # try windows first
    try:
        import win32clipboard
    except ImportError:
        pass
    else:
        win32clipboard.OpenClipboard()
        try:
            win32clipboard.EmptyClipboard()
            win32clipboard.SetClipboardText(url)
        finally:
            # Always give the clipboard back, even if setting the text
            # failed; otherwise it would stay opened by this process.
            win32clipboard.CloseClipboard()
        return

    # then give pbcopy a try.  do that before gtk because
    # gtk might be installed on os x but nobody is interested
    # in the X11 clipboard there.
    from subprocess import Popen, PIPE
    try:
        client = Popen(['pbcopy'], stdin=PIPE)
    except OSError:
        pass
    else:
        client.stdin.write(url)
        client.stdin.close()
        client.wait()
        return

    try:
        import pygtk
        pygtk.require('2.0')
        import gtk
        import gobject
    except ImportError:
        return
    gtk.clipboard_get(gtk.gdk.SELECTION_CLIPBOARD).set_text(url)
    # Run the main loop just long enough to process the queued quit.
    gobject.idle_add(gtk.main_quit)
    gtk.main()
def main(self, task=None,checkpoint=None):
    """Show the window and run the GTK main loop until it exits.

    Starts ``self.zarj_comm``, shows the window and registers
    ``self.gtk_idle_cb`` as an idle callback (its source id is kept in
    ``self.idle_id``).  When both ``task`` and ``checkpoint`` are given, a
    ``ZarjStartCommand(task, checkpoint, True)`` is pushed before the loop
    starts.  ``self.zarj_comm`` is stopped once ``gtk.main()`` returns.
    """
    self.zarj_comm.start()
    self.window.show_all()
    self.idle_id = gobject.idle_add(self.gtk_idle_cb)

    start_requested = not (task is None or checkpoint is None)
    if start_requested:
        self.zarj_comm.push_message(ZarjStartCommand(task, checkpoint, True))

    gtk.main()
    self.zarj_comm.stop()
def _source_selector_changed(self, wid):
    """React to a new selection in the source selector ``wid``.

    Does nothing when no row is active (negative index).  Otherwise
    column 1 of the active row becomes ``self.current_source`` and
    ``self.choose_file()`` is queued through ``gobject.idle_add``.
    """
    active_row = wid.get_active()
    if active_row < 0:
        return

    self.current_source = wid.get_model()[active_row][1]

    def _open_chooser():
        self.choose_file()
        # One-shot: do not run this idle callback again.
        return False

    gobject.idle_add(_open_chooser)
# _source_selector_changed()
def _do_play(self, wid):
    """Play handler: replay ``self.last_played`` or ask for a file.

    With nothing played yet ``self.choose_file()`` is called right away;
    otherwise ``self._delayed_play(self.last_played)`` is queued through
    ``gobject.idle_add``.  ``wid`` is not used.
    """
    if not self.last_played:
        self.choose_file()
        return
    gobject.idle_add(self._delayed_play, self.last_played)
# _do_play()
def choose_file(self):
    """Ask the current source for a URL and queue it for playback.

    Returns immediately when there is no ``self.current_source`` or when
    its ``get_url_from_chooser()`` gives back nothing; otherwise
    ``self._delayed_play(url)`` is queued through ``gobject.idle_add``.
    """
    source = self.current_source
    if not source:
        return

    chosen_url = source.get_url_from_chooser()
    if not chosen_url:
        return
    gobject.idle_add(self._delayed_play, chosen_url)
# choose_file()
def emit(self, *args):
    """Queue ``gobject.GObject.emit(self, *args)`` through ``gobject.idle_add``.

    The emission is deferred rather than performed by the caller.
    NOTE(review): the idle callback's return value is whatever ``emit``
    returns -- confirm the signals used here never return a true value,
    which would make the idle source fire again.
    """
    emit_call = (gobject.GObject.emit, self) + args
    gobject.idle_add(*emit_call)
def _main_window_setup(self, now=False):
    """Create the main window described by ``self.config['main_window']``.

    With ``now`` true the window is built immediately; otherwise the
    construction is queued through ``gobject.idle_add``.

    Recognised settings: ``style`` (only ``'default'`` is implemented),
    ``close_quits``, ``center``, ``width``/``height`` (default 360) and
    ``show``.  The window title comes from ``self.config['app_name']``.
    """
    def _build(self):
        settings = self.config['main_window']
        win = gtk.Window(gtk.WINDOW_TOPLEVEL)
        self.main_window = {'window': win}

        if wcfg_style(settings) != 'default':
            raise NotImplementedError('We only have one style atm.')
        self._main_window_default_style()

        if settings.get('close_quits'):
            win.connect("delete_event", lambda a1,a2: gtk.main_quit())
        else:
            # Closing only hides the window; returning True stops the
            # default handler from destroying it.
            win.connect('delete-event', lambda w, e: w.hide() or True)
        win.connect("destroy", lambda wid: gtk.main_quit())

        win.set_title(self.config.get('app_name', 'gui-o-matic'))
        win.set_decorated(True)
        if settings.get('center', False):
            win.set_position(gtk.WIN_POS_CENTER)
        win.set_size_request(
            settings.get('width', 360), settings.get('height',360))
        if settings.get('show'):
            win.show_all()

    def wcfg_style(settings):
        # Missing style means the default one.
        return settings.get('style', 'default')

    if not now:
        gobject.idle_add(_build, self)
    else:
        _build(self)
def hide_splash_screen(self, now=False):
    """Destroy the splash screen widgets and forget them.

    Every entry of ``self.splash`` that is not ``None`` gets ``destroy()``
    called on it, then ``self.splash`` is set to ``None``.  With ``now``
    true this happens immediately; otherwise it is queued through
    ``gobject.idle_add``.
    """
    def _teardown(self):
        for key in self.splash or []:
            widget = self.splash[key]
            if widget is not None:
                widget.destroy()
        self.splash = None

    if not now:
        gobject.idle_add(_teardown, self)
    else:
        _teardown(self)
def set_status(self, status='startup', now=False):
    """Switch the indicator to ``status``.

    The icon is looked up in ``self.config['indicator']['icons']`` under
    ``status``, falling back to the ``'normal'`` entry; if an icon is found
    it is applied with ``_indicator_set_icon``.  The status itself is
    always applied with ``_indicator_set_status``.

    With ``now`` true the updates are performed directly; otherwise they
    are handed to ``gobject.idle_add``.
    """
    def _call_now(func, arg):
        return func(arg)

    do = _call_now if now else gobject.idle_add

    icons = self.config.get('indicator', {}).get('icons')
    if icons:
        icon = icons.get(status) or icons.get('normal')
        if icon:
            self._indicator_set_icon(icon, do=do)
    self._indicator_set_status(status, do=do)
def set_item_label(self, item=None, label=None):
    """Change the label of the entry registered as ``item``.

    Silently ignored when ``item`` is empty or not a key of ``self.items``;
    otherwise ``set_label(label)`` on that entry is queued through
    ``gobject.idle_add``.
    """
    if not item or item not in self.items:
        return
    gobject.idle_add(self.items[item].set_label, label)
def _indicator_set_icon(self, icon, do=gobject.idle_add):
    """Apply ``icon`` to the indicator.

    ``do`` receives the setter ``self.ind.set_icon`` and the image obtained
    from ``self._theme_image(icon)``; by default that is
    ``gobject.idle_add``, so the setter is queued rather than called here.
    """
    setter = self.ind.set_icon
    image = self._theme_image(icon)
    do(setter, image)
def _indicator_set_status(self, status, do=gobject.idle_add):
    """Apply ``status`` to the indicator.

    ``status`` is translated through ``self._STATUS_MODES``; names missing
    from that table map to ``appindicator.STATUS_ATTENTION``.  ``do``
    receives the setter ``self.ind.set_status`` and the translated mode; by
    default that is ``gobject.idle_add``.
    """
    setter = self.ind.set_status
    mode = self._STATUS_MODES.get(status, appindicator.STATUS_ATTENTION)
    do(setter, mode)
def load_preview(self, f, canon, *args):
    """Parse ``f`` into ``canon`` and refresh the preview.

    ``gcode.parse(f, canon, *args)`` yields ``(result, seq)``; the label is
    updated and ``self.loading_finished(self, result, seq)`` is queued
    through ``gobject.idle_add``.  When ``result`` is no greater than
    ``gcode.MIN_ERROR`` the progress advances one phase, the extents are
    recalculated and the four program/select display lists are marked
    stale.
    """
    self.set_canon(canon)
    result, seq = gcode.parse(f, canon, *args)
    self.label.set_text("Generating Preview. Please wait ...")
    gobject.idle_add(self.loading_finished, self, result, seq)

    if not result <= gcode.MIN_ERROR:
        return

    self.canon.progress.nextphase(1)
    canon.calc_extents()
    for dlist_name in ('program_rapids', 'program_norapids',
                       'select_rapids', 'select_norapids'):
        self.stale_dlist(dlist_name)
def _busthread_call(self, method, *args):
    """Queue ``method(*args)`` through ``gobject.idle_add``.

    Nothing is returned; the source id from ``idle_add`` is discarded.
    """
    queued_call = (method,) + args
    gobject.idle_add(*queued_call)
def busthread_call(self, method, *args):
    """Queue ``method(*args)`` through ``gobject.idle_add``.

    Nothing is returned; the source id from ``idle_add`` is discarded.
    """
    queued_call = (method,) + args
    gobject.idle_add(*queued_call)
def on_search_stop(self):
    """Put the search controls back into their idle state.

    Shows the search button, hides the stop button and queues a reset of
    the progress bar fraction to ``0.0`` through ``gobject.idle_add``.
    """
    self.search_button.show()
    self.stop_button.hide()
    reset_fraction = self.progress.set_fraction
    gobject.idle_add(reset_fraction, 0.0)
def update_progress(self, fraction=None):
    """Request a progress update, at most once every 0.25 seconds.

    Calls arriving less than 0.25 s after the last accepted one are
    dropped.  An accepted call records its time in
    ``self._last_update_progress`` and queues
    ``self.update_progress_immediate(fraction)`` through
    ``gobject.idle_add``.
    """
    now = time.time()
    elapsed = now - self._last_update_progress
    if elapsed < 0.25:
        # Too soon after the previous update: skip this one.
        return
    self._last_update_progress = now
    gobject.idle_add(self.update_progress_immediate, fraction)