def _add_watch(self, obj):
mask = self._get_mask(obj)
if mask:
source_id = gobject.io_add_watch(obj, mask, self._handle_io,
priority=IO_PRIORITY)
self._watch_map[obj] = (source_id, mask)
return False
# This should be exceptional. The channel is still open, but is neither
# readable nor writable. Retry until it is, but with a timeout_add() to
# preserve CPU.
if self._watch_map[obj][1] == 'idle_add':
source_id = gobject.timeout_add(200, self._add_watch, obj,
priority=gobject.PRIORITY_LOW)
self._watch_map[obj] = (source_id, 'timeout_add')
return False
return True
python类timeout_add()的实例源码
def play(args):
    """Show ``args['html']`` in a web view inside a new window.

    Keys read from ``args``: ``title``, ``width``, ``height``, ``html`` and
    ``base`` (required); ``window-type`` (defaults to ``'toplevel'``);
    ``x`` and ``y`` (the window is moved only when both are present);
    ``screenshot-file`` (when present, ``save_screenshot`` is scheduled
    1500 ms later with the window and that value).
    """
    window = main_window(
        args.get('window-type', 'toplevel'),
        args['title'],
        args['width']/2,
        args['height']/2,
    )
    window.resize(args['width'], args['height'])

    webview = create_webview()
    webview.load_string(args['html'], 'text/html', 'UTF-8', args['base'])

    if all(key in args for key in ('x', 'y')):
        window.move(args['x'], args['y'])

    window.add(webview)
    window.show_all()

    if 'screenshot-file' not in args:
        return
    # Imported lazily: only the screenshot path needs gobject here.
    import gobject
    gobject.timeout_add(1500, save_screenshot, window, args['screenshot-file'])
def doIteration(self, delay):
    """Run one iteration of the main context, waiting at most ``delay``.

    ``delay`` is multiplied by 1000 and handed to ``gobject.timeout_add``
    as the wake-up interval; a ``delay`` of 0 means "do not block".
    """
    # Flush some pending events and return if there was something to do.
    # Don't use the usual
    # "while self.context.pending(): self.context.iteration()" idiom
    # because lots of IO (in particular test_tcp's
    # ProperlyCloseFilesTestCase) can keep us from ever exiting.
    log.msg(channel='system', event='iteration', reactor=self)
    if self.__pending():
        self.__iteration(0)
        return

    # Nothing to do; a zero delay means we shouldn't wait, so just return.
    if delay == 0:
        return

    # Must delay: arm a timer so the blocking iteration below is bounded.
    wait_ms = int(delay * 1000)
    self.doIterationTimer = gobject.timeout_add(wait_ms,
                                                self.doIterationTimeout)
    # This will either wake up from IO or from a timeout.
    self.__iteration(1)  # block
    # note: with the .simulate timer below, delays > 0.1 will always be
    # woken up by the .simulate timer
    armed_timer = self.doIterationTimer
    if armed_timer:
        # if woken by IO, need to cancel the timer
        gobject.source_remove(armed_timer)
        self.doIterationTimer = None
def parse(self):
    """Start the collector's parser as a subprocess and begin polling it.

    Creates a ``ProgressBarDetails`` window titled after the collector,
    launches ``self.parserInputs`` with ``subprocess.Popen``, sets
    ``self.status`` to ``"running"`` and schedules
    ``self.update_textbuffer`` to run every 100 ms.
    """
    self.pb = ProgressBarDetails()
    self.pb.set_title(self.collector.name + " Parser Output")
    self.pb.appendText("Starting parser for " + self.collector.name + "...\n")
    # Keep a direct handle on the progress window's text buffer.
    self.text_buffer = self.pb.text_buffer
    if os.name == 'nt':
        # Windows: run through the shell, from this module's directory.
        # NOTE(review): the Popen object is not stored in this branch and
        # both pipes are captured but not read here -- confirm that
        # update_textbuffer does not rely on self.sub_proc on Windows.
        subprocess.Popen(
            self.parserInputs,
            cwd=os.path.dirname(os.path.realpath(__file__)),
            stdout=subprocess.PIPE, shell=True, stderr=subprocess.PIPE)
        self.status = "running"
    else:
        # Other platforms: keep the process handle in self.sub_proc.
        self.sub_proc = subprocess.Popen(self.parserInputs, stdout=subprocess.PIPE, shell=False)
        self.status = "running"
    # NOTE(review): the original indentation was lost; this call is assumed
    # to run for both branches -- confirm against the upstream file.
    gobject.timeout_add(100, self.update_textbuffer)
def _seek_show(self):
if not self._seek_available() or self.timepos_popup_visible:
return
self.timepos_popup_visible = True
self._seek_update_vals()
width, height = self.timepos_popup.get_size()
pheight = 55
self.timepos_popup.move(gtk.gdk.screen_width() - width,
gtk.gdk.screen_height() - height - pheight)
self.timepos_popup.show()
self.current_engine.handler_unblock(self.timepos_sync_pos_handler)
if not self.seek_autohide:
def f():
self._seek_hide()
self.seek_autohide = None
return False
self.seek_autohide = gobject.timeout_add(5000, f)
# _seek_show()
def doIteration(self, delay):
    """Process pending events once, or block for up to ``delay``.

    ``delay * 1000`` (truncated to an int) is used as the
    ``gobject.timeout_add`` interval that bounds the blocking iteration;
    ``delay == 0`` returns immediately when nothing is pending.
    """
    # Flush some pending events; return if there was something to do.
    # Don't use the usual
    # "while self.context.pending(): self.context.iteration()" idiom
    # because lots of IO (in particular test_tcp's
    # ProperlyCloseFilesTestCase) can keep us from ever exiting.
    log.msg(channel='system', event='iteration', reactor=self)
    if self.__pending():
        self.__iteration(0)
        return

    if delay == 0:
        # Nothing to do and we shouldn't delay, so just return.
        return

    # Nothing to do, must delay.
    timer_id = gobject.timeout_add(int(delay * 1000),
                                   self.doIterationTimeout)
    self.doIterationTimer = timer_id
    # This will either wake up from IO or from a timeout.
    self.__iteration(1)  # block
    # note: with the .simulate timer below, delays > 0.1 will always be
    # woken up by the .simulate timer
    if self.doIterationTimer:
        # if woken by IO, need to cancel the timer
        gobject.source_remove(self.doIterationTimer)
        self.doIterationTimer = None
def test_timeout_add(self):
    """
    A
    L{reactor.callLater<twisted.internet.interfaces.IReactorTime.callLater>}
    call scheduled from a C{gobject.timeout_add}
    call is run on time.
    """
    import gobject
    reactor = self.buildReactor()
    fired = []

    def callback():
        # Record the call and end the reactor run.
        fired.append(True)
        reactor.stop()

    def gschedule():
        # Runs from the gobject timeout; a falsy return makes it one-shot.
        reactor.callLater(0, callback)
        return 0

    reactor.callWhenRunning(gobject.timeout_add, 10, gschedule)
    self.runReactor(reactor, 5)
    self.assertEqual(fired, [True])
def _send_next_ssid(self):
    """Send the first SSID that has not been sent in the current round.

    Returns ``self.is_notifying`` when there are no SSIDs at all or after
    an SSID was sent. When every SSID has already been sent, the sent list
    is reset, ``self._start_send_ssids`` is scheduled after
    ``self._wait_time`` and ``False`` is returned.
    """
    if not self.ssids:
        logger.debug("No SSIDs available.")
        return self.is_notifying

    unsent = (ssid for ssid in self.ssids if ssid not in self._ssids_sent)
    next_ssid = next(unsent, '')

    if next_ssid:
        logger.debug("Sending next SSID: %s" % next_ssid)
        self.value_update(string_to_dbus_array(next_ssid))
        self._ssid_last_sent = next_ssid
        self._ssids_sent.append(next_ssid)
        return self.is_notifying

    # all SSIDs have been sent at least once, repeat:
    self._ssids_sent = []
    GObject.timeout_add(self._wait_time, self._start_send_ssids)
    return False
gnome_connection_manager.py 文件源码
项目:gnome-connection-manager
作者: mjun
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def find_word(self, backwards=False):
    """Find the next line containing the search word and scroll to it.

    Reads ``self.search`` (keys ``index``, ``lines``, ``word``,
    ``terminal``). Lines are scanned starting at ``index`` and wrapping
    around once; with ``backwards`` the scan runs from ``index - 1`` down
    to 0 and then from the last line down to ``index``.

    On a hit, ``index`` becomes the matching line (backwards) or the line
    after it (forwards), the terminal adjustment is set to that line from
    a zero-delay timeout and a redraw is queued. With no hit, ``index`` is
    reset to ``len(lines)`` (backwards) or 0 (forwards).
    """
    start = self.search['index']
    total = len(self.search['lines'])
    # Build real lists: range() only has .reverse()/.extend() where it
    # returns a list, so the in-place form fails on Python 3.
    if backwards:
        order = list(reversed(range(0, start)))
        order += list(reversed(range(start, total)))
    else:
        order = list(range(start, total)) + list(range(0, start))

    pos = -1
    for i in order:
        pos = self.search['lines'][i].find(self.search['word'])
        if pos != -1:
            self.search['index'] = i if backwards else i + 1
            # Bind the line number now so the deferred call cannot see a
            # later value of the loop variable.
            gobject.timeout_add(0, lambda line=i: self.search['terminal'].get_adjustment().set_value(line))
            self.search['terminal'].queue_draw()
            break
    if pos == -1:
        self.search['index'] = total if backwards else 0
def start_scrsaver(self, saver_type):
    """start screen saver

    Records the saver state on ``self``, shows the ``'scrsaver'`` window
    and then either starts the slide timer (``'slideshow'`` / ``'movie'``)
    or runs the configured external command (``'launch_scr'``).
    """
    # pick the artwork widget configured for movies (setting is 1-based)
    artwork_index = self.WinMain.emu_ini.getint('movie_artwork_no') - 1
    self.video_artwork_widget = self._main_images[artwork_index]
    # slide duration is configured in seconds; timers take milliseconds
    self.slide_duration = self.WinMain.wahcade_ini.getint('slide_duration') * 1000
    self.running = True
    self.movie_type = ''
    self.saver_type = saver_type
    self.WinMain.show_window('scrsaver')

    if saver_type in ('slideshow', 'movie'):
        # start duration timer, then show first game straight away
        self.scr_timer_id = gobject.timeout_add(self.slide_duration, self.on_slide_timer)
        self.on_slide_timer()
        return
    if saver_type == 'launch_scr':
        # start external screen saver
        os.system(self.WinMain.emu_ini.get('scr_file'))
def start_timer(self, timer_type):
    """start given timer

    ``timer_type`` is one of ``'scrsave'``, ``'video'`` or ``'joystick'``;
    each is only started when its enabling attribute on ``self`` is truthy
    (``scrsave_delay > 0``, ``video_enabled``, ``joy``). Anything else is
    ignored.
    """
    # screen saver
    if timer_type == 'scrsave' and self.scrsave_delay > 0:
        if self.scrsave_timer:
            # drop the previous timer before re-arming
            gobject.source_remove(self.scrsave_timer)
        self.scrsave_timer = gobject.timeout_add(2500, self.on_scrsave_timer)
        return

    # video
    if timer_type == 'video' and self.video_enabled:
        # stop any playing vids first
        self.stop_video()
        # restart timer
        preview_delay_ms = self.delaymovieprev * 1000
        self.video_timer = gobject.timeout_add(preview_delay_ms, self.on_video_timer)
        return

    # joystick
    if timer_type == 'joystick' and self.joy:
        self.joystick_timer = gobject.timeout_add(50, self.joy.poll, self.on_winMain_key_press)
def run(self, installSignalHandlers=1):
    """Start the reactor, schedule the first ``simulate`` pass and run.

    ``installSignalHandlers`` is forwarded unchanged to ``startRunning``.
    """
    self.startRunning(installSignalHandlers=installSignalHandlers)
    # A zero-delay timeout: simulate() runs once the main loop is spinning.
    first_pass_delay = 0
    gobject.timeout_add(first_pass_delay, self.simulate)
    self.__run()
def simulate(self):
    """Run simulation loops and reschedule callbacks.

    Cancels the previously scheduled pass (tracked in the module-level
    ``_simtag``), runs ``self.runUntilCurrent()`` and schedules the next
    pass after at most 0.1 seconds.
    """
    global _simtag
    if _simtag is not None:
        gobject.source_remove(_simtag)
    self.runUntilCurrent()
    timeout = self.timeout()
    # self.timeout() can be None; test for that *before* min(), because
    # comparing None with a float raises TypeError on Python 3 and only
    # worked through Python 2's ordering of None.
    if timeout is None:
        timeout = 0.1
    else:
        timeout = min(timeout, 0.1)
    # grumble
    _simtag = gobject.timeout_add(int(timeout * 1010), self.simulate)
def simulate(self):
    """Run simulation loops and reschedule callbacks.

    Cancels the previously scheduled pass (tracked in the module-level
    ``_simtag``), runs ``self.iterate()`` and schedules the next pass
    after at most 0.1 seconds.
    """
    global _simtag
    if _simtag is not None:
        gobject.source_remove(_simtag)
    self.iterate()
    timeout = self.timeout()
    # self.timeout() can be None; test for that *before* min(), because
    # comparing None with a float raises TypeError on Python 3 and only
    # worked through Python 2's ordering of None.
    if timeout is None:
        timeout = 0.1
    else:
        timeout = min(timeout, 0.1)
    # grumble
    _simtag = gobject.timeout_add(int(timeout * 1010), self.simulate)
def _start_update_timer(self):
    """(Re)arm the periodic view-update timer.

    Any timer recorded in ``self._update_timeout_id`` is removed first.
    The interval is ``SAMPLE_PERIOD`` divided by ``min(self.speed, 1)``,
    scaled by 1e3 and truncated to an int.
    """
    previous_id = self._update_timeout_id
    if previous_id is not None:
        gobject.source_remove(previous_id)
    #print "start_update_timer"
    interval = int(SAMPLE_PERIOD/min(self.speed, 1)*1e3)
    self._update_timeout_id = gobject.timeout_add(
        interval,
        self.update_view_timeout,
        priority=PRIORITY_UPDATE_VIEW)
def start(self):
    """Scan the topology, start the simulation and enter the GTK main loop.

    When the ``__IPYTHON__`` name is defined, ``_monkey_patch_ipython`` is
    called before the main loop starts.
    """
    self.scan_topology()
    self.window.connect("delete-event", self._quit)
    #self._start_update_timer()
    gobject.timeout_add(200, self.autoscale_view)
    self.simulation.start()

    # Probe for IPython: the bare name only resolves when it is defined.
    try:
        __IPYTHON__
    except NameError:
        inside_ipython = False
    else:
        inside_ipython = True
    if inside_ipython:
        self._monkey_patch_ipython()

    gtk.main()
def _update_hr_msrmt_simulation(self):
print('Update HR Measurement Simulation')
if not self.notifying:
return
GObject.timeout_add(1000, self.hr_msrmt_cb)
def __init__(self, bus, index, service):
    """Set up the characteristic with ``BATTERY_LVL_UUID``.

    The characteristic is created with the ``read`` and ``notify`` flags,
    starts with notifications off and a level of 100, and schedules
    ``self.drain_battery`` every 5000 ms.
    """
    flags = ['read', 'notify']
    Characteristic.__init__(self, bus, index, self.BATTERY_LVL_UUID,
                            flags, service)
    self.notifying = False
    self.battery_lvl = 100
    GObject.timeout_add(5000, self.drain_battery)
def _on_disconnect(self, client, userdata, rc):
    """Handle a lost broker connection.

    Removes the socket watch, if one is recorded, and schedules
    ``exit_on_error(self._reconnect)`` to be called after 5000 ms.
    ``client``, ``userdata`` and ``rc`` are accepted but not used here.
    """
    logging.error('[Disconnected] Lost connection to broker')

    watch_id = self._socket_watch
    if watch_id is not None:
        gobject.source_remove(watch_id)
        self._socket_watch = None

    logging.info('[Disconnected] Set timer')
    gobject.timeout_add(5000, exit_on_error, self._reconnect)
def on_btn_startBenchmark_clicked(self, widget):
    """Start the benchmark and switch the dialog to its countdown state.

    The duration is read from the ``seconds_adjustment`` object; half of
    it is passed to ``start_benchmark``. ``update_time_left`` is then
    scheduled every 1000 ms. ``widget`` is not used.
    """
    seconds = int(self.get('seconds_adjustment').get_value())
    # Half time for upload speed and half for download
    self.start_benchmark(seconds/2)
    self.timeleft = seconds

    # Lock the inputs while the benchmark runs.
    self.get('seconds_spinbox').set_sensitive(False)
    self.get('hbox_buttons').set_visible(False)

    time_left_label = self.get('time_left_label')
    message = _("Benchmark finishing in %d seconds...") % self.timeleft
    time_left_label.set_text(message)

    self.countdown_event = gobject.timeout_add(1000, self.update_time_left)
    self.get('hbox_status').set_visible(True)
def get_results(self):
    """Arm the output poller and the overall result timeout.

    ``get_more_output`` is scheduled every 200 ms, and
    ``show_results(True)`` after ``self.timeout * 1000`` ms; both source
    ids are stored on ``self``.
    """
    poll_interval = 200
    self.more_output = gobject.timeout_add(poll_interval, self.get_more_output)
    deadline = self.timeout*1000
    self.output_timeout = gobject.timeout_add(deadline, self.show_results, True)
def gotScreenshot(self, handle, reply):
    """Store a client's screenshot reply and schedule the next request.

    ``reply`` is expected to be ``"<rowstride>\\n<width>x<height>\\n<pixels>"``.
    If ``handle`` matches a row of ``self.cstore``, the next
    ``askScreenshot`` is scheduled and, when the reply parses, the pixbuf
    is stored in ``self.currentScreenshots`` and in the row. An empty or
    malformed reply is ignored. If no row matches, any cached screenshot
    for ``handle`` is dropped.
    """
    for i in self.cstore:
        if handle == i[C_SESSION_HANDLE]:
            # We want to ask for thumbnails every 5 sec after the last one.
            # So if the client is too stressed and needs 7 secs to
            # send a thumbnail, we'll ask for one every 12 secs.
            gobject.timeout_add(5000, self.askScreenshot, handle)
            # print "I got a screenshot from %s." % handle
            if not reply:
                return
            try:
                rowstride, size, pixels = reply.split('\n', 2)
                rowstride = int(rowstride)
                width, height = [int(part) for part in size.split('x')]
            except (AttributeError, TypeError, ValueError):
                # Not a well-formed "rowstride\nWxH\npixels" reply: skip
                # this thumbnail instead of raising out of the callback.
                return
            pxb = gtk.gdk.pixbuf_new_from_data(pixels,
                gtk.gdk.COLORSPACE_RGB, False, 8, width, height,
                rowstride)
            self.currentScreenshots[handle] = pxb
            self.cstore[i.path][C_PIXBUF] = pxb
            return
    # That handle is no longer in the cstore, remove it
    self.currentScreenshots.pop(handle, None)
def on_preferences_changed(self, preferences):
    """Re-dock the widget according to ``preferences``.

    Does nothing when ``self.widget`` is ``None``. Otherwise the widget is
    removed from the window (a ``ValueError`` from that is ignored),
    re-added as a tab in ``preferences['pane']``, its line display is
    updated from ``preferences['show_lines']`` and ``_init_index_ext`` is
    scheduled 100 ms later.
    """
    if self.widget is None:
        return

    try:
        self.window.remove(self.widget)
    except ValueError:
        pass

    self.window.add_tab(_('icIndex'), self.widget, preferences['pane'])
    self.widget.show_lines(preferences['show_lines'])

    def init_index(*a):
        # Extra callback arguments are accepted and ignored.
        return self._init_index_ext(preferences)

    # Start after a while to give time to close previous window.
    gobject.timeout_add(100, init_index)
    self.widget.show_all()
def _clear_pagenames_cache(self):
if not self._clear_cache_scheduled:
def _clear():
'''Clear tags cache.'''
if len(self._pagenames_cache) > 200:
self._pagenames_cache = {}
self._clear_cache_scheduled = False
return False # to not call again
gobject.timeout_add(500, _clear)
self._clear_cache_scheduled = True
def _clear_pagenames_cache(self):
if not self._clear_cache_scheduled:
def _clear():
'''Clear tags cache.'''
if len(self._pagenames_cache) > 200:
self._pagenames_cache = {}
self._clear_cache_scheduled = False
return False # to not call again
gobject.timeout_add(500, _clear)
self._clear_cache_scheduled = True
def __init__(self, w, h, speed):
    """Create a ``w`` x ``h`` screen whose ``tick`` runs every ``speed``.

    ``speed`` is passed straight to ``gobject.timeout_add`` as the
    interval.
    """
    super(Screen, self).__init__()
    ## Old fashioned way to connect expose.
    self.connect("expose_event", self.do_expose_event)
    ## We want to know where the mouse is:
    self.connect("motion_notify_event", self._mouseMoved)
    ## Unmask the button and pointer-motion events.
    wanted_events = (gdk.BUTTON_PRESS_MASK
                     | gdk.BUTTON_RELEASE_MASK
                     | gdk.POINTER_MOTION_MASK)
    self.add_events(wanted_events)
    ## This is what gives the animation life!
    gobject.timeout_add(speed, self.tick)
    self.width = w
    self.height = h
    self.set_size_request(w, h)
    # unlikely first coord to prevent false hits.
    self.x = 11110
    self.y = 11111110
def _seekbar_changed(self, seekbar):
if self.seekbar_seek_timeout:
return
def f():
self._do_seek()
self.seekbar_seek_timeout = None
return False
self.seekbar_seek_timeout = gobject.timeout_add(200, f)
# _seekbar_changed()
def _register_state_timeout(self):
    """Poll the player every 500 ms for media length and position.

    Installs a timeout whose handler asks for the media length (at most
    20 attempts, until one succeeds) and for the current time position,
    emitting ``"media-info"`` and ``"pos"`` with what it reads. The source
    id is stored in ``self.state_timeout``.
    """
    # Remaining attempts to read the media length.
    self.length_tries = 20
    def handler():
        # Returning False stops the timer; True keeps it running.
        if not self.proc or not self.playing:
            return False
        else:
            if self.length is None and self.length_tries:
                # Length still unknown: spend one attempt asking for it.
                self.length_tries -= 1
                self._cmd("get_time_length")
                v = self._read_ans("ANS_LENGTH=", timeout=1000)
                if v:
                    self.length = float(v)
                    self.info["length"] = self.length
                    self.emit("media-info", self.info)
            self._cmd("get_time_pos")
            v = self._read_ans("ANS_TIME_POSITION=", timeout=1000)
            if not v:
                # No answer this round; keep polling.
                return True
            self.pos = float(v)
            self.emit("pos", self.pos)
            return True
    self.state_timeout = gobject.timeout_add(500, handler)
# _register_state_timeout()
def start(self):
    """Schedule ``self.tick`` every ``self.step * 1000`` (truncated) ms.

    The source id is stored in ``self.timeout_id``.
    """
    interval = int(self.step * 1000)
    self.timeout_id = gobject.timeout_add(interval, self.tick)
def __init__(self):
    """Set up the drawing area: graph, input handlers and view state.

    Connects the mouse, scroll, resize and key handlers, schedules
    ``self.update`` every 1000 ms and initialises the view state
    attributes to their defaults.
    """
    gtk.DrawingArea.__init__(self)
    # Start with an empty graph and no file opened.
    self.graph = Graph()
    self.openfilename = None
    # Allow the widget to take keyboard focus.
    self.set_flags(gtk.CAN_FOCUS)
    # Mouse button presses and releases.
    self.add_events(gtk.gdk.BUTTON_PRESS_MASK | gtk.gdk.BUTTON_RELEASE_MASK)
    self.connect("button-press-event", self.on_area_button_press)
    self.connect("button-release-event", self.on_area_button_release)
    # Pointer motion (with motion hints).
    self.add_events(gtk.gdk.POINTER_MOTION_MASK | gtk.gdk.POINTER_MOTION_HINT_MASK | gtk.gdk.BUTTON_RELEASE_MASK)
    self.connect("motion-notify-event", self.on_area_motion_notify)
    self.connect("scroll-event", self.on_area_scroll_event)
    self.connect("size-allocate", self.on_area_size_allocate)
    self.connect('key-press-event', self.on_key_press_event)
    self.last_mtime = None
    # Call self.update once per second.
    gobject.timeout_add(1000, self.update)
    # Initial view position and zoom.
    self.x, self.y = 0.0, 0.0
    self.zoom_ratio = 1.0
    self.zoom_to_fit_on_resize = False
    # No animation or drag in progress initially.
    self.animation = NoAnimation(self)
    self.drag_action = NullAction(self)
    self.presstime = None
    self.highlight = None