python类idle_add()的实例源码

recipe-551774.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _add_watch(self, obj):
        """Install an I/O watch for ``obj`` or arrange to retry later.

        The return value follows the gobject source-callback convention:
        ``False`` removes the source that invoked this function, ``True``
        keeps it scheduled.

        Returns ``False`` after installing an I/O watch, or after replacing
        an ``'idle_add'`` source with a 200 ms timeout retry. Returns
        ``True`` when the mask is empty and the recorded source kind is not
        ``'idle_add'``, so the calling source keeps polling.
        """
        wanted = self._get_mask(obj)
        if not wanted:
            # This should be exceptional. The channel is still open, but is
            # neither readable nor writable. Retry until it is, but with a
            # timeout_add() to preserve CPU.
            if self._watch_map[obj][1] != 'idle_add':
                return True
            retry_id = gobject.timeout_add(200, self._add_watch, obj,
                                           priority=gobject.PRIORITY_LOW)
            self._watch_map[obj] = (retry_id, 'timeout_add')
            return False

        watch_id = gobject.io_add_watch(obj, wanted, self._handle_io,
                                        priority=IO_PRIORITY)
        self._watch_map[obj] = (watch_id, wanted)
        return False
micropi.py 文件源码 项目:Micro-Pi 作者: Bottersnike 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def uBitPoller():
    """Poll forever and keep each window's micro:bit indicator image current.

    Every 0.2 seconds, for each window in ``OPENWINDOWS``, this checks
    whether ``SETTINGS['mbitLocation']`` exists and schedules the matching
    indicator image through ``gobject.idle_add``. It updates the globals
    ``uBitFound`` and ``uBitUploading`` and never returns.
    """
    global uBitFound
    global uBitUploading

    def iconPath(name):
        # Indicator images live in the "data" directory under WORKINGDIR.
        return os.path.join(WORKINGDIR, "data", name)

    # Per-window (found, uploading) state recorded on the previous pass.
    seen = {}
    while True:
        for window in OPENWINDOWS:
            wasFound = seen.setdefault(window, (False, False))[0]
            uBitFound = os.path.exists(SETTINGS['mbitLocation'])
            if uBitUploading and uBitFound:
                gobject.idle_add(window.indicator.set_from_file, iconPath("uBitUploading.png"))
            elif uBitFound and not wasFound:
                gobject.idle_add(window.indicator.set_from_file, iconPath("uBitFound.png"))
            elif wasFound and not uBitFound:
                gobject.idle_add(window.indicator.set_from_file, iconPath("uBitNotFound.png"))
                # The location disappeared, so clear the uploading flag.
                uBitUploading = False
            seen[window] = (uBitFound, uBitUploading)
        time.sleep(0.2)
micropi.py 文件源码 项目:Micro-Pi 作者: Bottersnike 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def upload(self):
    """Copy the combined build ``.hex`` file to the micro:bit location.

    Reads ``microbit-build-combined.hex`` from the build tree under the
    global ``buildLocation`` and writes it into
    ``SETTINGS['mbitLocation']``. If the build file or the micro:bit
    location does not exist, a message is scheduled on ``self.message``
    through ``gobject.idle_add`` instead.

    The global ``mbedUploading`` is reset to ``False`` on the way out, even
    when the copy raises.
    """
    global mbedUploading
    hexPath = '%s/build/bbc-microbit-classic-gcc/source/microbit-build-combined.hex' % buildLocation
    try:
        if os.path.exists(hexPath):
            if os.path.exists(SETTINGS['mbitLocation']):
                # Context managers close both handles deterministically, so
                # the written data is flushed before this function returns.
                with open(hexPath) as source:
                    end = source.read()
                with open(
                    '%s/microbit-build-combined.hex' % SETTINGS['mbitLocation'],
                    'w'
                ) as target:
                    target.write(end)
            else:
                gobject.idle_add(self.message, """Cannot upload!
Micro:Bit not found!
Check it is plugged in and
Micro:Pi knows where to find it.""")
        else:
            gobject.idle_add(self.message, """No build files avaliable""")
    finally:
        # Always clear the flag so a failed copy cannot leave it set.
        mbedUploading = False
micropi.py 文件源码 项目:Micro-Pi 作者: Bottersnike 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def updateTitle():
    """Poll forever and keep each window's title in sync with its file.

    Every 0.1 seconds, for each window in ``OPENWINDOWS``, this builds a
    title of the form ``"<*>name - directory - Micro:Pi"`` (with a leading
    ``*`` when ``getModified()`` is true) and schedules ``set_title``
    through ``gobject.idle_add`` only when the title has changed.
    Never returns.
    """
    shownTitles = {}
    while True:
        for window in OPENWINDOWS:
            marker = '*' if window.getModified() else ''
            # os.path.split yields the same (dirname, basename) pair.
            folder, name = os.path.split(window.saveLocation)
            title = '%s%s - %s - %s' % (marker, name, folder, 'Micro:Pi')

            # A window seen for the first time compares against ''.
            if shownTitles.get(window, '') != title:
                gobject.idle_add(window.window.set_title, title)
            shownTitles[window] = title

        time.sleep(0.1)
micropi.py 文件源码 项目:Micro-Pi 作者: Bottersnike 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def serialPoller(self):
    """Forever read serial data and append its printable text to the console.

    While ``self.serialConnection`` is set, each ``read()`` result is
    filtered down to the characters in ``string.printable`` and appended to
    the ``self.consoleBody`` text buffer through ``gobject.idle_add``.
    While it is not set, the function tries to open ``self.serialLocation``
    every 0.1 seconds. Never returns.

    Errors from reading or connecting are deliberately ignored (best
    effort), but only ``Exception`` subclasses are caught, so
    ``SystemExit`` and ``KeyboardInterrupt`` are no longer swallowed.
    """
    def addText(self, text):
        tb = self.consoleBody.get_buffer()
        tb.insert(tb.get_end_iter(), text)
    while True:
        if self.serialConnection:
            try:
                data = self.serialConnection.read()
                # Drop anything that is not a printable character.
                d2 = ''.join(ch for ch in data if ch in string.printable)
                gobject.idle_add(addText, self, d2)
            except Exception:
                # Best effort: a failed read is retried on the next pass.
                pass
        else:
            try:
                self.serialConnection = serial.serial_for_url(self.serialLocation)
                self.serialConnection.baudrate = self.baudrate
            except Exception:
                # Device not available yet; try again after the sleep.
                pass
            time.sleep(0.1)
micropi.py 文件源码 项目:Micro-Pi 作者: Bottersnike 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def inlineSerialPoller(self):
    """Forever read serial data and append its printable text to the inline console.

    While ``self.serialConnection`` is set, each ``read()`` result is
    filtered down to the characters in ``string.printable`` and appended to
    the ``self.serialConsoleBody`` text buffer through ``gobject.idle_add``.
    While it is not set, the function tries to open ``self.serialLocation``
    every 0.1 seconds. Never returns.

    Errors from reading or connecting are deliberately ignored (best
    effort), but only ``Exception`` subclasses are caught, so
    ``SystemExit`` and ``KeyboardInterrupt`` are no longer swallowed.
    """
    def addText(self, text):
        tb = self.serialConsoleBody.get_buffer()
        tb.insert(tb.get_end_iter(), text)
    while True:
        if self.serialConnection:
            try:
                data = self.serialConnection.read()
                # Drop anything that is not a printable character.
                d2 = ''.join(ch for ch in data if ch in string.printable)
                gobject.idle_add(addText, self, d2)
            except Exception:
                # Best effort: a failed read is retried on the next pass.
                pass
        else:
            try:
                self.serialConnection = serial.serial_for_url(self.serialLocation)
                self.serialConnection.baudrate = self.baudrate
            except Exception:
                # Device not available yet; try again after the sleep.
                pass
            time.sleep(0.1)
common.py 文件源码 项目:chirp_fork 作者: mach327 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _execute(self, target, func):
        """Run ``func`` with this job's arguments and report the outcome.

        ``func`` is called as ``func(*self.args, **self.kwargs)``. Any
        exception it raises is captured and used as the result instead of
        propagating; exceptions other than ``errors.InvalidMemoryLocation``
        are also logged together with the job arguments and ``self.tb``.

        If ``self.cb`` is set, it is scheduled through ``gobject.idle_add``
        with the result followed by ``self.cb_args``.

        ``target`` is accepted but not used by this method.
        """
        try:
            DBG("Running %s (%s %s)" % (self.func,
                                        str(self.args),
                                        str(self.kwargs)))
            DBG(self.desc)
            result = func(*self.args, **self.kwargs)
        # "except ... as" is valid on Python 2.6+ and Python 3, unlike the
        # comma form.
        except errors.InvalidMemoryLocation as e:
            result = e
        except Exception as e:
            LOG.error("Exception running RadioJob: %s" % e)
            log_exception()
            LOG.error("Job Args:   %s" % str(self.args))
            LOG.error("Job KWArgs: %s" % str(self.kwargs))
            LOG.error("Job Called from:%s%s" %
                      (os.linesep, "".join(self.tb[:-1])))
            result = e

        if self.cb:
            gobject.idle_add(self.cb, result, *self.cb_args)
dockyard_noblock.py 文件源码 项目:dockyard 作者: maidstone-hackspace 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def start_job(generator):
    """Drive a coroutine-style generator that yields tasks.

    Each yielded task is called with a ``task_return`` callable. When the
    task calls it with a result, the generator is resumed with that result
    from a ``gobject.idle_add`` callback. The first resume sends ``None``.

    Returns the same ``generator`` object that was passed in.
    """
    def _task_return(result):
        """Hand ``result`` back to the generator from an idle callback."""
        def _advance_generator():
            try:
                next_task = generator.send(result)
            except StopIteration:
                # The job is finished; there is nothing more to schedule.
                pass
            else:
                next_task(_task_return)
        # make sure the generator is advanced in the main thread
        gobject.idle_add(_advance_generator)

    _task_return(None)
    return generator

# 2 task examples: sleep_task, threaded_task
recipe-551774.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __setitem__(self, fd, obj):
        """Register ``obj`` under ``fd`` and schedule its watch installation.

        If ``fd`` is already mapped, the watch of the previously mapped
        object is removed first. ``_add_watch`` is then queued through
        ``gobject.idle_add`` and recorded with the ``'idle_add'`` tag.
        """
        if fd in self._fd_map:
            replaced = self._fd_map[fd]
            self._remove_watch(replaced)
        pending_id = gobject.idle_add(self._add_watch, obj,
                                      priority=IO_PRIORITY)
        self._watch_map[obj] = (pending_id, 'idle_add')
        self._fd_map[fd] = obj
recipe-551774.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _handle_io(self, obj, mask):
        """Dispatch an I/O event to ``obj`` and decide whether to keep the watch.

        Returns ``True`` to keep the current watch. Returns ``False`` when
        the object is no longer in ``_fd_map`` after the dispatch, or when
        its mask changed, in which case ``_add_watch`` is re-queued through
        ``gobject.idle_add``.
        """
        asyncore.readwrite(obj, mask)

        # Make sure objects removed during the readwrite() aren't re-added
        if obj._fileno not in self._fd_map:
            return False

        # Same mask as the one recorded: the existing watch is still right.
        if self._get_mask(obj) == self._watch_map[obj][1]:
            return True

        # Read-/writability has changed, so install a fresh watch.
        pending_id = gobject.idle_add(self._add_watch, obj,
                                      priority=IO_PRIORITY)
        self._watch_map[obj] = (pending_id, 'idle_add')
        return False
easyevent.py 文件源码 项目:qr-lipsync 作者: UbiCastTeam 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def dispatch_event(self, event):
        """ Deliver a launched event to every listener registered for its type.

        For each listener, the event-specific handler (named by the
        listener's ``event_pattern``) is tried first, then the default
        handler (named by ``event_default``). Delivery depends on the
        module-level ``dispatcher``: ``'gobject'`` schedules the handler
        through ``gobject.idle_add`` at high priority, ``'callback'`` calls
        it directly.

        @param event: Event launched.
        @type event: C{L{Event}}
        @raise UnhandledEventError: a listener has no usable handler and
            its ``event_silent`` flag is false.
        """
        if event.type not in self.listeners or not self.listeners[event.type]:
            #logger.warning('No listener for the event type %r.', event.type)
            return

        def deliver(handler):
            # Hand the event over according to the configured dispatcher.
            if dispatcher == 'gobject':
                import gobject
                gobject.idle_add(handler, event, priority=gobject.PRIORITY_HIGH)
            elif dispatcher == 'callback':
                handler(event)

        for listener in self.listeners[event.type]:
            # Try to call event-specific handle method
            specific_name = listener.event_pattern %(event.type)
            if hasattr(listener, specific_name):
                handler = getattr(listener, specific_name)
                if callable(handler):
                    deliver(handler)
                    continue
                logger.warning('Event-specific handler found but not callable.')
            # Try to call default handle method
            if hasattr(listener, listener.event_default):
                handler = getattr(listener, listener.event_default)
                if callable(handler):
                    deliver(handler)
                    continue
            # No handle method found, raise error ?
            if not listener.event_silent:
                raise UnhandledEventError('%s has no method to handle %s' %(listener, event))
core.py 文件源码 项目:ns3-rdma 作者: bobzhuyb 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        """Simulation loop: advance the simulator each time ``self.go`` is set.

        Repeats until ``self.quit`` is true. On each pass it waits for the
        ``go`` signal, then, holding ``self.lock``, runs the simulator up to
        ``self.target_time``, collects the helper's pause messages into
        ``self.pause_messages`` and schedules ``self.viz.update_model``
        through ``gobject.idle_add``.
        """
        while not self.quit:
            #print "sim: Wait for go"
            self.go.wait() # wait until the main (view) thread gives us the go signal
            self.go.clear()
            # quit may have been set while we were waiting for the signal
            if self.quit:
                break
            #self.go.clear()
            #print "sim: Acquire lock"
            self.lock.acquire()
            try:
                # Disabled branch: "if 0" never executes.
                if 0:
                    if ns3.core.Simulator.IsFinished():
                        self.viz.play_button.set_sensitive(False)
                        break
                #print "sim: Current time is %f; Run until: %f" % (ns3.Simulator.Now ().GetSeconds (), self.target_time)
                #if ns3.Simulator.Now ().GetSeconds () > self.target_time:
                #    print "skipping, model is ahead of view!"
                self.sim_helper.SimulatorRunUntil(ns.core.Seconds(self.target_time))
                #print "sim: Run until ended at current time: ", ns3.Simulator.Now ().GetSeconds ()
                self.pause_messages.extend(self.sim_helper.GetPauseMessages())
                # Schedule the model update on the GLib main loop rather than
                # calling it from this loop.
                gobject.idle_add(self.viz.update_model, priority=PRIORITY_UPDATE_MODEL)
                #print "sim: Run until: ", self.target_time, ": finished."
            finally:
                # Release the lock even if the simulation step raised.
                self.lock.release()
            #print "sim: Release lock, loop."

# enumeration
core.py 文件源码 项目:ns3-rdma 作者: bobzhuyb 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def start():
    """Start the visualizer, or run the plain simulation if it is unavailable.

    If ``_import_error`` is set, a notice is written to stderr,
    ``ns.core.Simulator.Run()`` is called and the function returns.
    Otherwise plugins are loaded, a ``Visualizer`` is created, every entry
    of ``initialization_hooks`` is scheduled through ``gobject.idle_add``
    with the visualizer as first argument, packet printing is enabled and
    the visualizer is started.

    Asserts that no ``Visualizer`` instance exists yet.
    """
    assert Visualizer.INSTANCE is None
    if _import_error is not None:
        import sys
        # sys.stderr.write produces the same output on Python 2 and 3;
        # the "print >>" chevron form only works on Python 2.
        sys.stderr.write("No visualization support (%s).\n" % (str(_import_error),))
        ns.core.Simulator.Run()
        return
    load_plugins()
    viz = Visualizer()
    for hook, args in initialization_hooks:
        gobject.idle_add(hook, viz, *args)
    ns.network.Packet.EnablePrinting()
    viz.start()
lodgeit.py 文件源码 项目:yt 作者: yt-project 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def copy_url(url):
    """Copy the url into the clipboard.

    Backends are tried in this order: ``win32clipboard``, the ``pbcopy``
    command, then PyGTK. If none is available the function returns without
    doing anything.
    """
    # try windows first
    try:
        import win32clipboard
    except ImportError:
        # then give pbcopy a try.  do that before gtk because
        # gtk might be installed on os x but nobody is interested
        # in the X11 clipboard there.
        from subprocess import Popen, PIPE
        try:
            client = Popen(['pbcopy'], stdin=PIPE)
        except OSError:
            try:
                import pygtk
                pygtk.require('2.0')
                import gtk
                import gobject
            except ImportError:
                return
            gtk.clipboard_get(gtk.gdk.SELECTION_CLIPBOARD).set_text(url)
            # Run the main loop just long enough to reach the idle
            # callback, which quits it again.
            gobject.idle_add(gtk.main_quit)
            gtk.main()
        else:
            # The pipe carries bytes: pass byte strings through unchanged
            # and encode text explicitly.
            data = url if isinstance(url, bytes) else url.encode('utf-8')
            client.stdin.write(data)
            client.stdin.close()
            client.wait()
    else:
        win32clipboard.OpenClipboard()
        try:
            win32clipboard.EmptyClipboard()
            win32clipboard.SetClipboardText(url)
        finally:
            # Always close, so a failure cannot leave the clipboard open.
            win32clipboard.CloseClipboard()
oc.py 文件源码 项目:srcsim2017 作者: ZarjRobotics 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main(self, task=None, checkpoint=None):
        """Start communications, show the window and run the GTK main loop.

        When both ``task`` and ``checkpoint`` are given, a
        ``ZarjStartCommand`` for them is pushed before the main loop starts.
        Communications are stopped once ``gtk.main()`` returns.
        """
        self.zarj_comm.start()
        self.window.show_all()
        self.idle_id = gobject.idle_add(self.gtk_idle_cb)

        start_requested = task is not None and checkpoint is not None
        if start_requested:
            self.zarj_comm.push_message(
                ZarjStartCommand(task, checkpoint, True))

        gtk.main()
        self.zarj_comm.stop()
client.py 文件源码 项目:barbieri-playground 作者: barbieri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _source_selector_changed(self, wid):
        """Handle a change of the source selector widget.

        Does nothing when no row is active. Otherwise stores the selected
        row's second column as ``self.current_source`` and schedules a
        one-shot ``choose_file()`` call through ``gobject.idle_add``.
        """
        active = wid.get_active()
        if active < 0:
            return

        self.current_source = wid.get_model()[active][1]

        def choose_once():
            self.choose_file()
            # Returning False makes this a one-shot idle callback.
            return False
        gobject.idle_add(choose_once)
    # _source_selector_changed()
client.py 文件源码 项目:barbieri-playground 作者: barbieri 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _do_play(self, wid):
        """Replay the last played item, or ask for a file if there is none.

        ``wid`` is accepted but not used.
        """
        if not self.last_played:
            self.choose_file()
            return
        gobject.idle_add(self._delayed_play, self.last_played)
    # _do_play()
client.py 文件源码 项目:barbieri-playground 作者: barbieri 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def choose_file(self):
        """Ask the current source for a URL and schedule playback of it.

        Does nothing when there is no current source or when the source's
        chooser returns a false value.
        """
        if self.current_source:
            chosen = self.current_source.get_url_from_chooser()
            if chosen:
                gobject.idle_add(self._delayed_play, chosen)
    # choose_file()
monitor_dir.py 文件源码 项目:gpvdm 作者: roderickmackenzie 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def emit(self, *args):
        """Schedule ``gobject.GObject.emit(self, *args)`` via ``gobject.idle_add``.

        The signal is not emitted synchronously by this call; the emission
        is queued as an idle callback.
        """
        gobject.idle_add(gobject.GObject.emit,self,*args)
gtkbase.py 文件源码 项目:gui-o-matic 作者: mailpile 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _main_window_setup(self, now=False):
        """Create and configure the main GTK window.

        With ``now`` true the window is built immediately; otherwise the
        construction is scheduled through ``gobject.idle_add``. Settings
        are read from ``self.config['main_window']``.
        """
        def create(self):
            settings = self.config['main_window']

            win = gtk.Window(gtk.WINDOW_TOPLEVEL)
            self.main_window = {'window': win}

            if settings.get('style', 'default') != 'default':
                raise NotImplementedError('We only have one style atm.')
            self._main_window_default_style()

            if settings.get('close_quits'):
                win.connect("delete_event", lambda widget, event: gtk.main_quit())
            else:
                # Hide instead of closing; the true result comes from
                # "hide() or True".
                win.connect('delete-event',
                            lambda widget, event: widget.hide() or True)
            win.connect("destroy", lambda widget: gtk.main_quit())

            win.set_title(self.config.get('app_name', 'gui-o-matic'))
            win.set_decorated(True)
            if settings.get('center', False):
                win.set_position(gtk.WIN_POS_CENTER)
            width = settings.get('width', 360)
            height = settings.get('height',360)
            win.set_size_request(width, height)
            if settings.get('show'):
                win.show_all()

        if not now:
            gobject.idle_add(create, self)
            return
        create(self)
gtkbase.py 文件源码 项目:gui-o-matic 作者: mailpile 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def hide_splash_screen(self, now=False):
        """Destroy the splash screen widgets and clear ``self.splash``.

        With ``now`` true this happens immediately; otherwise it is
        scheduled through ``gobject.idle_add``.
        """
        def hide(self):
            keys = self.splash or []
            for key in keys:
                widget = self.splash[key]
                if widget is not None:
                    widget.destroy()
            self.splash = None

        if not now:
            gobject.idle_add(hide, self)
            return
        hide(self)
gtkbase.py 文件源码 项目:gui-o-matic 作者: mailpile 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def set_status(self, status='startup', now=False):
        """Update the indicator icon and status for ``status``.

        With ``now`` true the indicator setters are called directly;
        otherwise they are scheduled through ``gobject.idle_add``. The icon
        is looked up under ``config['indicator']['icons']``, falling back
        to the ``'normal'`` icon; the icon is left alone when neither
        exists.
        """
        if now:
            def do(func, arg):
                return func(arg)
        else:
            do = gobject.idle_add

        icons = self.config.get('indicator', {}).get('icons')
        if icons:
            icon = icons.get(status) or icons.get('normal')
            if icon:
                self._indicator_set_icon(icon, do=do)
        self._indicator_set_status(status, do=do)
gtkbase.py 文件源码 项目:gui-o-matic 作者: mailpile 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def set_item_label(self, item=None, label=None):
        """Schedule a label change for a known item.

        Does nothing when ``item`` is false or is not a key of
        ``self.items``; otherwise ``set_label(label)`` is queued through
        ``gobject.idle_add``.
        """
        if not item or item not in self.items:
            return
        gobject.idle_add(self.items[item].set_label, label)
unity.py 文件源码 项目:gui-o-matic 作者: mailpile 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _indicator_set_icon(self, icon, do=gobject.idle_add):
        """Set the indicator icon to the themed image for ``icon``.

        ``do`` is called as ``do(setter, image)``; by default this is
        ``gobject.idle_add``, which schedules the setter call.
        """
        setter = self.ind.set_icon
        image = self._theme_image(icon)
        do(setter, image)
unity.py 文件源码 项目:gui-o-matic 作者: mailpile 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _indicator_set_status(self, status, do=gobject.idle_add):
        """Set the indicator status mode that corresponds to ``status``.

        Statuses missing from ``_STATUS_MODES`` map to
        ``appindicator.STATUS_ATTENTION``. ``do`` is called as
        ``do(setter, mode)``; by default this is ``gobject.idle_add``.
        """
        setter = self.ind.set_status
        mode = self._STATUS_MODES.get(status, appindicator.STATUS_ATTENTION)
        do(setter, mode)
glcanon.py 文件源码 项目:hazzy 作者: KurtJacobson 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def load_preview(self, f, canon, *args):
        """Parse ``f`` with ``canon`` and prepare the preview.

        Schedules ``self.loading_finished`` through ``gobject.idle_add``
        with the parse result. When the result is at most
        ``gcode.MIN_ERROR``, the extents are computed and the four
        program/select display lists are marked stale.
        """
        self.set_canon(canon)
        result, seq = gcode.parse(f, canon, *args)

        self.label.set_text("Generating Preview. Please wait ...")
        gobject.idle_add(self.loading_finished, self, result, seq)

        if result > gcode.MIN_ERROR:
            return

        self.canon.progress.nextphase(1)
        canon.calc_extents()
        for dlist in ('program_rapids', 'program_norapids',
                      'select_rapids', 'select_norapids'):
            self.stale_dlist(dlist)
master.py 文件源码 项目:rebus 作者: airbus-seclab 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _busthread_call(self, method, *args):
        """Schedule ``method(*args)`` via ``gobject.idle_add``.

        The call is queued as an idle callback rather than executed
        directly; nothing is returned.
        """
        gobject.idle_add(method, *args)
slave.py 文件源码 项目:rebus 作者: airbus-seclab 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def busthread_call(self, method, *args):
        """Schedule ``method(*args)`` via ``gobject.idle_add``.

        The call is queued as an idle callback rather than executed
        directly; nothing is returned.
        """
        gobject.idle_add(method, *args)
smalictrace.py 文件源码 项目:linux-pentest-util 作者: fikr4n 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def on_search_stop(self):
        """Restore the search UI once a search has stopped.

        Shows the search button, hides the stop button and schedules a
        reset of the progress fraction to 0.0 through ``gobject.idle_add``.
        """
        self.search_button.show()
        self.stop_button.hide()
        # The progress reset is queued as an idle callback, not applied here.
        gobject.idle_add(self.progress.set_fraction, 0.0)
smalictrace.py 文件源码 项目:linux-pentest-util 作者: fikr4n 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def update_progress(self, fraction=None):
        """Schedule a progress update, throttled to one per 0.25 seconds.

        Calls arriving less than 0.25 seconds after the last accepted one
        are dropped. Accepted calls record the current time and queue
        ``update_progress_immediate(fraction)`` through ``gobject.idle_add``.
        """
        now = time.time()
        if now - self._last_update_progress >= 0.25:
            self._last_update_progress = now
            gobject.idle_add(self.update_progress_immediate, fraction)


问题


面经


文章

微信
公众号

扫码关注公众号