def update_queue_banners(self):
    """Rescale every banner image in the download queue to the current page width.

    The target width is one fifth of the library box width minus 60 pixels
    (``self.goglib_box_width`` unless it is still 1, otherwise
    ``self.mylib_box_width``).  The height follows from the 518x240 ratio used
    below.  Every image in the module-level ``queue_game_image_list`` is
    reloaded from ``goglib_banners`` (falling back to ``mylib_banners`` when no
    such file exists), scaled, and set back on the image widget.
    """
    if self.goglib_box_width != 1:
        box_width = self.goglib_box_width
    else:
        box_width = self.mylib_box_width

    # Pixbuf.scale_simple() takes integer pixel sizes.  Floor division keeps
    # the width an int on both Python 2 and 3, and the height is truncated
    # explicitly instead of being passed as a float.
    banner_width = (box_width - 60) // 5
    banner_height = int(240 * (float(banner_width) / 518))
    if banner_width <= 0 or banner_height <= 0:
        # The box is too narrow (e.g. still at its placeholder width of 1)
        # for a drawable banner; leave the images as they are.
        return

    for image in queue_game_image_list or ():
        name = image.get_name()
        banner_path = data_dir + '/images/goglib_banners/' + name + '.jpg'
        if not os.path.exists(banner_path):
            banner_path = data_dir + '/images/mylib_banners/' + name + '.jpg'
        pixbuf = GdkPixbuf.Pixbuf.new_from_file(banner_path)
        pixbuf = pixbuf.scale_simple(banner_width, banner_height, InterpType.BILINEAR)
        image.set_from_pixbuf(pixbuf)
#~ def timer_check_for_new_games(self):
#~ self.check_for_new_games()
#~ GObject.timeout_add(30000, self.timer_check_for_new_games)
python类timeout_add()的实例源码
def timer(self):
    """Track detached tab names and toggle the "add tab" button, then re-arm.

    Collects the names of all notebook pages found in
    ``self.additional_windows_list`` into ``self.detached_tabs_names``, hides
    ``self.button_add_tab`` when detached tabs plus main-notebook pages total
    exactly 5 (shows it otherwise), and schedules itself again in 1000 ms.
    """
    for extra_window in self.additional_windows_list:
        notebook = extra_window.get_child()
        if notebook is None:
            continue
        for index in range(notebook.get_n_pages()):
            name = notebook.get_nth_page(index).get_name()
            if name not in self.detached_tabs_names:
                self.detached_tabs_names.append(name)

    total_tabs = len(self.detached_tabs_names) + self.notebook.get_n_pages()
    self.button_add_tab.set_visible(total_tabs != 5)

    # The method returns None, so each timeout fires once; re-arm explicitly.
    GObject.timeout_add(1000, self.timer)
def timerFunc(self):
    """Show the next test image or the fixation image and schedule the next step.

    While images remain, calls alternate between showing
    ``self.filenames[self.imageIndex]`` (recording
    ``self.dataThread.samplesRead`` in ``self.timestamps``) and showing
    ``self.fixation``.  Each call schedules the next one with the matching
    duration.  Once all images have been shown, the fixation image is
    displayed, the call blocks for two seconds, and the data thread is stopped.

    Returns a falsy value in every case, so the timeout that invoked this
    call is not repeated.
    """
    if self.imageIndex >= len(self.filenames):
        self.image.set_from_pixbuf(self.fixation)
        # gather an extra ~2 seconds of data at the end for window completion reasons
        sleep(2)
        self.dataThread.stop()
        return False

    if not self.showFixation:
        pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
            './P300Photos/' + self.filenames[self.imageIndex],
            width=700, height=700, preserve_aspect_ratio=False)
        self.image.set_from_pixbuf(pixbuf)
        self.imageIndex += 1
        gobject.timeout_add(self.testImageDuration, self.timerFunc)
        self.timestamps.append(self.dataThread.samplesRead)
    else:
        self.image.set_from_pixbuf(self.fixation)
        gobject.timeout_add(self.fixationDuration, self.timerFunc)

    # Logical negation, not bitwise "~": ~True is -2 (still truthy), so the
    # old toggle only alternated when the flag happened to start as False/0.
    self.showFixation = not self.showFixation
def load_book_data(self, filename):
    """
    Start loading a book into the viewer.

    Starts the spinner, hides the viewer and adds the spinner to the right
    box.  A file whose upper-cased name ends with one of ``constants.NATIVE``
    is loaded directly; any other file is first imported on a background
    thread while ``__check_on_work`` is polled every 100 ms.

    :param filename: path of the book file to load
    """
    self.spinner.start()
    self.viewer.hide()
    self.right_box.add(self.spinner)
    self.filename = filename

    is_native = filename.upper().endswith(tuple(constants.NATIVE))
    if is_native:
        self.__continiue_book_loading(filename)
        return

    # Non-native format: convert in the background and poll for completion.
    self.job_running = True
    importer = threading.Thread(target=self.__bg_import_book, args=(filename,))
    importer.start()
    GObject.timeout_add(100, self.__check_on_work)
def start_app(datadir):
    """Show the splash window (unless disabled), load the CSS and run Gtk.

    Sets the module-level ``splash_win`` and ``solfege.splash_win`` to a new
    ``SplashWin`` or to ``None`` when ``options.no_splash`` is set, installs
    the style sheet read from ``solfege.css`` for the default screen,
    schedules ``start_gui(datadir)`` and enters ``Gtk.main()``.

    :param datadir: passed through unchanged to ``start_gui``.
    """
    global splash_win
    if not options.no_splash:
        solfege.splash_win = splash_win = SplashWin()
        time.sleep(0.1)
        Gdk.flush()
        # Let Gtk process pending events before continuing.
        while Gtk.events_pending():
            Gtk.main_iteration()
    else:
        solfege.splash_win = splash_win = None
    style_provider = Gtk.CssProvider()
    with open("solfege.css", "r") as f:
        css = f.read()
    try:
        style_provider.load_from_data(css)
    except GObject.GError as e:
        # A broken style sheet is reported but does not stop start-up.
        # "except ... as" and print(e) are valid on Python 2.6+ and 3 alike.
        print(e)
    Gtk.StyleContext.add_provider_for_screen(
        Gdk.Screen.get_default(), style_provider,
        Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
    GObject.timeout_add(1, start_gui, datadir)
    Gtk.main()
cryptocoin_indicator.py 文件源码
项目:cryptocoin-indicator
作者: MichelMichels
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self):
    """Create the module-level app indicator, schedule price updates, run gtk.

    Publishes the indicator through the global ``indicator``, restores the
    default Ctrl-C behaviour, registers the periodic and the initial price
    update callbacks on ``self.exchange_app`` and enters ``gtk.main()``.
    """
    global indicator

    # Applet icon
    indicator = appindicator.Indicator.new(
        APPINDICATOR_ID,
        self.exchange_app.dogecoin.icon,
        appindicator.IndicatorCategory.SYSTEM_SERVICES,
    )
    indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
    indicator.set_label('€ *.****', '100%')

    # Set the menu of the indicator (default: gtk.Menu())
    menu = self.build_menu()
    indicator.set_menu(menu)

    # Ctrl-C behaviour
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # Setup the refresh every 5 minutes
    refresh_interval_ms = 1000*60*5
    gobject.timeout_add(refresh_interval_ms, self.exchange_app.update_price, "timeout")

    # First price update within 1 second
    first_update_delay_ms = 1000
    gobject.timeout_add(first_update_delay_ms, self.exchange_app.first_update_price, "first_update")

    # Main loop
    gtk.main()
def __init__(self, title):
    """Build an undecorated, always-on-top window holding one progress bar.

    :param title: window title passed to ``Gtk.Window.__init__``.

    ``self.on_timeout`` is scheduled every 50 ms (called with ``None``) and
    its source id is kept in ``self.timeout_id``.
    """
    Gtk.Window.__init__(self, title=title)
    self.set_border_width(10)

    layout = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
    self.add(layout)

    bar = Gtk.ProgressBar()
    self.progressbar = bar
    bar.pulse()
    bar.set_show_text(True)
    layout.pack_start(bar, True, True, 0)

    self.timeout_id = GObject.timeout_add(50, self.on_timeout, None)

    self.set_position(Gtk.WindowPosition.CENTER_ALWAYS)
    # No close button, no decorations, no resizing.
    for disable in (self.set_deletable, self.set_decorated, self.set_resizable):
        disable(False)
    self.set_keep_above(True)

    self.fraction = 0.0
    self.pulse = True
def arm(self, manager):
    """Create a Trigger for this item and schedule it to fire at ``self.when``.

    Stores ``manager`` and the property dict on the instance, subscribes
    ``self.cleanup`` to the "Remove" signal on the trigger's path, creates the
    ``Trigger`` at ``PATH + '/Scheduler/Armed/' + self.hashed``, announces it
    through the manager and schedules ``trigger.Fire`` after the time
    remaining until ``self.when``.

    :param manager: object providing ``Trigger`` and ``InterfacesAdded``;
        it is also passed to the ``Trigger`` constructor.
    :returns: the created trigger.
    :raises Exception: whatever trigger creation or announcement raises,
        re-raised after printing a hint.
    """
    self.manager = manager
    props = dict(obj=self.remote.path, name=self.remote.item.name,
                 expected=self.when.isoformat(), **self.remote.item.fields)
    self.props = props
    new_path = PATH + '/Scheduler/Armed/' + self.hashed
    # timeout_add() expects a non-negative whole number of milliseconds.
    # A moment already in the past fires immediately instead of passing a
    # negative/float interval.
    remaining_seconds = (self.when - datetime.datetime.now()).total_seconds()
    delay_ms = max(0, int(remaining_seconds * 1000))
    self.remote.bus.add_signal_receiver(self.cleanup, "Remove", dbus_interface=Trigger.OWN_IFACE, bus_name=BUS, path=new_path)
    # manager.bus.add_signal_receiver(self.attrs, ack=self.on_success, error=self.on_error)
    trigger = None
    try:
        trigger = Trigger(new_path, manager, props, self)
        if trigger:
            trigger.Armed()
            self.manager.Trigger("Arming", trigger.path)
            self.trigger = trigger
            # %-formatting prints the same text on Python 2 and 3.
            print("DELAYING %s" % delay_ms)
            gobject.timeout_add(delay_ms, trigger.Fire)
            manager.InterfacesAdded(trigger.path, {Trigger.OWN_IFACE: props})
            self.manager.Trigger("Armed", trigger.path)
    except Exception:
        print("already exited?")
        raise
    return trigger
def __init__(self, win_id=None):
    """
    Builds a new GStreamer pipeline. If `win_id` is specified, it
    is used as a window ID to embed the video sink using the
    GstOverlay interface.

    On macOS ("Darwin") the build is scheduled through a GObject timeout and
    this constructor blocks until the event handed to ``_build`` is set.
    Elsewhere ``_build`` is called directly.

    :raises PlaybinError: chained from the error ``_build`` reported, on the
        macOS path only.
    """
    super().__init__()
    self._async_loop = asyncio.get_event_loop()
    self._async_response = []

    if platform.system() != 'Darwin':
        self._build(win_id, None, None)
        return

    # presumably _build sets the event and stores any failure in the
    # one-element list -- confirm against _build
    build_done = threading.Event()
    failure = [None]
    GObject.timeout_add(1, self._build, win_id, build_done, failure)
    build_done.wait()
    if failure[0]:
        raise PlaybinError from failure[0]
def _send_next_ssid(self):
    """Send the first SSID not yet sent in this round.

    Returns ``self.is_notifying`` when there are no SSIDs or after sending
    one.  When every SSID has already been sent, the sent list is cleared,
    ``_start_send_ssids`` is scheduled after ``self._wait_time`` and
    ``False`` is returned.
    """
    if not self.ssids:
        logger.debug("No SSIDs available.")
        return self.is_notifying

    # First SSID not yet sent; '' means none is left (an empty SSID is
    # treated the same way).
    pending = next(
        (candidate for candidate in self.ssids
         if candidate not in self._ssids_sent),
        '',
    )

    if not pending:
        # all SSIDs have been sent at least once, repeat:
        self._ssids_sent = []
        GObject.timeout_add(self._wait_time, self._start_send_ssids)
        return False

    logger.debug("Sending next SSID: %s" % pending)
    self.value_update(string_to_dbus_array(pending))
    self._ssid_last_sent = pending
    self._ssids_sent.append(pending)
    return self.is_notifying
def praise_update(self):
    """Refresh the praise counter widgets for the first play-list row.

    Re-schedules itself every 2000 ms before doing anything else.  Nothing
    more happens unless ``self.in_mv`` is truthy and the play list has a
    first row id.  The fetched count is capped at 100 before it is shown,
    and ``comment_fetch`` is called afterwards.
    """
    GObject.timeout_add(2000, self.praise_update)

    if not self.in_mv:
        return None

    row_id = self.play_list_first_row_id()
    if row_id is None:
        return None

    fetched = self.req.praise_fetch(row_id)
    logging.debug(fetched)

    count = min(fetched[row_id], 100)
    self.label_praise.set_label(str(count))
    self.lb_praise.set_value(count)
    self.comment_fetch()
    # logging.debug(dir(self.lb_praise))
def test_availablepane(self):
    """Searching sorts by relevance, and a no-match search shrinks the model."""
    from softwarecenter.ui.gtk3.panes.availablepane import get_test_window
    window = get_test_window()
    pane = window.get_data("pane")
    self._p()

    pane.on_search_terms_changed(None, "the")
    self._p()
    active_sort = pane.app_view.sort_methods_combobox.get_active_text()
    self.assertEqual(active_sort, "By Relevance")

    model = pane.app_view.tree_view.get_model()
    rows_for_match = len(model)
    pane.on_search_terms_changed(None, "nosuchsearchtermforsure")
    self._p()
    rows_for_no_match = len(model)
    self.assertTrue(rows_for_no_match < rows_for_match)

    # Destroy the window after TIMEOUT, then enter the main loop.
    GObject.timeout_add(TIMEOUT, window.destroy)
    Gtk.main()
def test_custom_lists(self):
    """A comma-separated search yields exactly those packages plus an install button."""
    from softwarecenter.ui.gtk3.panes.availablepane import get_test_window
    window = get_test_window()
    pane = window.get_data("pane")
    self._p()

    pane.on_search_terms_changed(None, "ark,artha,software-center")
    self._p()
    model = pane.app_view.tree_view.get_model()

    # custom list should return three items
    self.assertTrue(len(model) == 3)

    # check package names, ordering is default "by relevance"
    expected_order = ("ark", "software-center", "artha")
    for position, pkgname in enumerate(expected_order):
        self.assertPkgInListAtIndex(position, model, pkgname)

    # check that the status bar offers to install the packages
    install_button = pane.action_bar.get_button(ActionButtons.INSTALL)
    self.assertNotEqual(install_button, None)

    # Destroy the window after TIMEOUT, then enter the main loop.
    GObject.timeout_add(TIMEOUT, window.destroy)
    Gtk.main()
def run(self, args):
    """Show the main window, schedule the cache open and show the requested packages.

    :param args: list of package-name arguments.  Entries of the form
        ``"pkg1,pkg2"`` are expanded in place: entries without a comma keep
        their position and the split names are appended after them.
    """
    # show window as early as possible
    self.window_main.show_all()
    # delay cache open
    GObject.timeout_add(1, self.cache.open)
    # support both "pkg1 pkg" and "pkg1,pkg2" (and pkg1,pkg2 pkg3)
    if args and any("," in arg for arg in args):
        # Build the new contents first and assign once.  Deleting by the
        # index of a copy while the list shrinks removed the wrong entries
        # as soon as two arguments contained a comma.
        plain = [arg for arg in args if "," not in arg]
        expanded = [name for arg in args if "," in arg
                    for name in arg.split(",")]
        args[:] = plain + expanded
    # FIXME: make this more predictable and less random
    # show args when the app is ready
    self.show_available_packages(args)
    atexit.register(self.save_state)
def on_banner_rendered(self, renderer):
    """Take over the rendered banner pixbuf, retrying while it is 1 pixel wide.

    :param renderer: object whose ``get_pixbuf()`` provides the banner image.
    :returns: ``None`` after scheduling a retry in 500 ms, ``False`` once
        the banner has been accepted.
    """
    self.image = renderer.get_pixbuf()
    not_ready = self.image.get_width() == 1
    if not_ready:
        # the offscreen window is not really as such content not
        # correctly rendered
        GObject.timeout_add(500, self.on_banner_rendered, renderer)
        return

    from gi.repository import Atk
    exhibit = self.exhibits[self.cursor]
    self.get_accessible().set_name(exhibit.title_translated)
    self.get_accessible().set_role(Atk.Role.PUSH_BUTTON)
    self._fade_in()
    self.queue_next()
    return False
def _fade_in(self, step=0.05):
    """Raise ``self.alpha`` from 0.0 to 1.0, redrawing every 50 ms.

    :param step: amount added to ``self.alpha`` on each tick.

    When alpha reaches 1.0 it is clamped, ``self.old_image`` is cleared and
    the tick callback stops by returning ``False``.
    """
    self.alpha = 0.0

    def advance():
        self.alpha += step
        finished = self.alpha >= 1.0
        if finished:
            self.alpha = 1.0
            self.old_image = None
        self.queue_draw()
        # Keep ticking only while the fade is still in progress.
        return not finished

    GObject.timeout_add(50, advance)
def _on_size_allocate(self, widget, allocation):
    """Continue a slide animation after a size change, or just redraw.

    :param widget: widget whose current allocation height is read.
    :param allocation: unused; see the FIXME below.
    """
    # FIXME: mvo: allocation.height is no longer accessable with gtk3
    #        THIS SEEMS TO BREAK THE ANIMATION
    #height = allocation.height
    height = widget.get_allocation().height

    if self._is_sliding_in:
        step_callback = self._slide_in_cb
    elif self._is_sliding_out:
        step_callback = self._slide_out_cb
    else:
        # No animation running: a plain redraw is enough.
        self.queue_draw()
        return

    self._current_height = height
    GObject.timeout_add(self.ANIMATE_STEP_INTERVAL,
                        step_callback,
                        priority=100)
def wait_for_apt_cache_ready(f):
    """ decorator that ensures that self.cache is ready before f runs.

        While ``self.cache.ready`` is false the call is retried through a
        500 ms GObject timeout, and the busy cursor is shown on the window of
        ``self.app_view`` when there is one.  Once the cache is ready the
        cursor is reset and ``f`` is called with the original arguments.

        The wrapper always returns False (so a timeout that invoked it is
        not repeated); the return value of ``f`` is discarded.
        Needs an object with a ``cache`` attribute as first argument.
    """
    import functools

    # functools.wraps keeps f's __name__/__doc__ on the wrapper.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        self = args[0]
        # check if the cache is ready and
        window = None
        if hasattr(self, "app_view"):
            window = self.app_view.get_window()
        if not self.cache.ready:
            if window:
                window.set_cursor(self.busy_cursor)
            GObject.timeout_add(500, lambda: wrapper(*args, **kwargs))
            return False
        # cache ready now
        if window:
            window.set_cursor(None)
        f(*args, **kwargs)
        return False
    return wrapper
def run(self, args):
    """Show the main window, schedule the cache open and show the requested packages.

    :param args: list of package-name arguments.  Entries of the form
        ``"pkg1,pkg2"`` are expanded in place: entries without a comma keep
        their position and the split names are appended after them.
    """
    # show window as early as possible
    self.window_main.show_all()
    # delay cache open
    GObject.timeout_add(1, self.cache.open)
    # support both "pkg1 pkg" and "pkg1,pkg2" (and pkg1,pkg2 pkg3)
    if args and any("," in arg for arg in args):
        # Build the new contents first and assign once.  Deleting by the
        # index of a copy while the list shrinks removed the wrong entries
        # as soon as two arguments contained a comma.
        plain = [arg for arg in args if "," not in arg]
        expanded = [name for arg in args if "," in arg
                    for name in arg.split(",")]
        args[:] = plain + expanded
    # FIXME: make this more predictable and less random
    # show args when the app is ready
    self.show_available_packages(args)
    atexit.register(self.save_state)
def on_banner_rendered(self, renderer):
    """Take over the rendered banner pixbuf, retrying while it is 1 pixel wide.

    :param renderer: object whose ``get_pixbuf()`` provides the banner image.
    :returns: ``None`` after scheduling a retry in 500 ms, ``False`` once
        the banner has been accepted.
    """
    self.image = renderer.get_pixbuf()
    not_ready = self.image.get_width() == 1
    if not_ready:
        # the offscreen window is not really as such content not
        # correctly rendered
        GObject.timeout_add(500, self.on_banner_rendered, renderer)
        return

    from gi.repository import Atk
    exhibit = self.exhibits[self.cursor]
    self.get_accessible().set_name(exhibit.title_translated)
    self.get_accessible().set_role(Atk.Role.PUSH_BUTTON)
    self._fade_in()
    self.queue_next()
    return False
def interrupt_build_and_wait(f):
    """ decorator that ensures that a build of the categorised installed apps
        is interrupted before a new build commences.
        expects self._build_in_progress and self._halt_build as properties

        While a build is in progress ``self._halt_build`` is set and the call
        is retried through a 200 ms GObject timeout.  Afterwards
        ``_halt_build`` is cleared and ``f`` is called.  The wrapper always
        returns False; the return value of ``f`` is discarded.
    """
    import functools

    # functools.wraps keeps f's __name__/__doc__ on the wrapper.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        self = args[0]
        if self._build_in_progress:
            LOG.debug('Waiting for build to exit...')
            self._halt_build = True
            GObject.timeout_add(200, lambda: wrapper(*args, **kwargs))
            return False
        # ready now
        self._halt_build = False
        f(*args, **kwargs)
        return False
    return wrapper
def __init__(self):
    """Set up the battery source, the app indicator and the periodic updates.

    ``update_battery`` and ``update_log`` are scheduled every second and
    ``update_chart`` every 30 seconds.  ``log_last_update`` starts one full
    log period (5 minutes) in the past.
    """
    self.battery = battery.Battery()
    self.battery.new_params = None
    self.battery.register_callback(self.battery_update_callback)
    self.battery.update()

    self.indicator = appindicator.Indicator.new(
        APPINDICATOR_ID, self.get_icon(), CATEGORY)
    self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
    self.indicator.set_menu(self.build_menu())
    self.window = None

    period = timedelta(minutes=5)
    self.log_update_period = period
    self.log_last_update = datetime.now() - period
    self.battery_data = None

    self.update_battery()
    self.update_chart()

    ms_per_second = 1000
    schedule = (
        (1, self.update_battery),
        (1, self.update_log),
        (30, self.update_chart),
    )
    for seconds, callback in schedule:
        gobject.timeout_add(seconds * ms_per_second, callback)
def __init__(self, tokens_filename, audio, log):
    """Store collaborators, open the alert stores and start background work.

    :param tokens_filename: stored as ``self._tokens_filename``.
    :param audio: stored as ``self._audio``.
    :param log: stored as ``self._log``.

    Creates the ``alerts`` directories under ``/tmp``, opens the two Shove
    stores, starts ``eventQueueThread`` on a daemon thread and schedules
    ``alertCheck`` every 500 ms.
    """
    self._log = log
    self._audio = audio
    self._tokens_filename = tokens_filename
    self._eventQueue = queue.Queue()
    persist_path = "/tmp"
    for directory in ("alerts", "alerts/all", "alerts/active"):
        # exist_ok avoids the check-then-create race of
        # "if not exists: mkdir".
        os.makedirs(os.path.join(persist_path, directory), exist_ok=True)
    # would prefer to use sqlite, but that complains about
    # our threads accessing the same connection - and dbm seems to not
    # store any changes.
    self.allAlerts = Shove("file:///tmp/alerts/all")
    self.activeAlerts = Shove("file:///tmp/alerts/active")
    #print(list(self.allAlerts.values()))
    self._last_user_activity = datetime.datetime.now()
    t = threading.Thread(target=self.eventQueueThread, daemon=True)
    t.start()
    GObject.timeout_add(500, self.alertCheck)
def Listen(self, milliseconds=None, dialogid=None):
    """Switch the microphone pipeline to PLAYING unless it already is.

    :param milliseconds: when truthy, ``cancelListen`` is scheduled after
        this many milliseconds and the timer id is kept in
        ``self._listening_timeout_timer``.
    :param dialogid: stored on the microphone pipeline as ``dialogid``.
    :returns: ``False`` when the pipeline was already playing, otherwise
        ``None``.
    """
    _ok, mic_state, _pending = self._mic_pipeline.get_state(Gst.CLOCK_TIME_NONE)
    already_listening = mic_state == Gst.State.PLAYING
    if already_listening:
        self._log.info("Was already listening")
        aplay(TIMEOUT_BEEP, _bg=True)
        return False

    self._log.info("Starting listening, timeout milliseconds=%s, dialog=%s", milliseconds, dialogid)

    # Set both playback elements to READY and wait until nothing is playing.
    for playback in (self.audio_pipeline, self.audio_player):
        playback.set_state(Gst.State.READY)
    self._waitForNothingPlaying()

    if milliseconds:
        self._listening_timeout_timer = GObject.timeout_add(milliseconds, self.cancelListen)

    aplay(LISTENING_BEEP, _bg=True)
    self._mic_pipeline.dialogid = dialogid
    self.speaking_started = False
    self.silence_count = 0
    self._mic_pipeline.set_state(Gst.State.PLAYING)
def check_gogcom_tab(self):
    """Keep the embedded page on gog.com, opening other URIs via ``self.webbrowser``.

    When the current URI does not start with ``https://www.gog.com`` the
    page navigates back and the URI is passed to
    ``self.webbrowser.open_new``.  The check re-schedules itself every
    1000 ms.
    """
    uri = self.webpage.get_uri()
    left_gog = uri is not None and not uri.startswith('https://www.gog.com')
    if left_gog:
        self.webpage.go_back()
        self.webbrowser.open_new(uri)
    GObject.timeout_add(1000, self.check_gogcom_tab)
def update_mylib_interface(self):
    """Once-a-second refresh of the "my library" page.

    Starts installing the head of ``mylib_installation_queue`` when it
    changes, resizes the filters pane, rebuilds the grid and banners when
    the page width changes, shows the status box only while the queue is
    non-empty, and re-schedules itself in 1000 ms.
    """
    queue_not_empty = len(mylib_installation_queue) > 0
    if queue_not_empty and mylib_installation_queue[0] != self.mylib_now_installing:
        next_game = mylib_installation_queue[0]
        self.mylib_now_installing = next_game
        self.mylib_install_game(next_game)
        for setup_button in mylib_setup_buttons_list:
            if setup_button.get_name() == self.mylib_now_installing:
                setup_button.set_label(_("Installing"))

    # A height of 1 is skipped (presumably "not laid out yet" -- confirm).
    filters_height = self.mylib_filters_box.get_allocation().height
    if filters_height != 1:
        self.scrolledwindow_mylib_filters.set_property(
            'height_request', filters_height + 20)

    page_width = self.box_mylib_page.get_allocation().width
    if page_width != self.mylib_box_width:
        self.mylib_box_width = page_width
        self.update_mylib_grid()
        self.update_queue_banners()

    self.mylib_installation_status_box.set_visible(
        len(mylib_installation_queue) != 0)

    GObject.timeout_add(1000, self.update_mylib_interface)
def update_goglib_interface(self):
    """Once-a-second refresh of the GOG library page.

    Calls ``update_goglib`` when new games are listed and the main window
    is visible, starts downloading the head of ``goglib_installation_queue``
    when it changes, resizes the filters pane, rebuilds the grid and banners
    when the page width changes, shows the status box only while the queue
    is non-empty, and re-schedules itself in 1000 ms.
    """
    if len(self.goglib_new_games_list) != 0 and self.main_window.get_visible():
        self.update_goglib()

    queue_not_empty = len(goglib_installation_queue) > 0
    if queue_not_empty and goglib_installation_queue[0] != self.goglib_now_installing:
        next_game = goglib_installation_queue[0]
        self.goglib_now_installing = next_game
        self.goglib_download_game(next_game)
        for setup_button in goglib_setup_buttons_list:
            if setup_button.get_name() == self.goglib_now_installing:
                setup_button.set_label(_("Installing"))

    # A height of 1 is skipped (presumably "not laid out yet" -- confirm).
    filters_height = self.box_goglib_filters.get_allocation().height
    if filters_height != 1:
        self.scrolledwindow_filters.set_property(
            'height_request', filters_height + 20)

    page_width = self.box_goglib_page.get_allocation().width
    if page_width != self.goglib_box_width:
        self.goglib_box_width = page_width
        self.update_goglib_grid()
        self.update_queue_banners()

    self.box_goglib_installation_status.set_visible(
        len(goglib_installation_queue) != 0)

    GObject.timeout_add(1000, self.update_goglib_interface)
def _set_repeat(self, delay=None, interval=None):
    """Store the ``(delay, interval)`` pair and start or stop the tick timer.

    A 10 ms tick calling ``self._tick_cb`` is installed when a delay is set
    and none was stored before; the stored tick source is removed when the
    delay is cleared after one had been stored.
    """
    had_delay = self.__repeat[0] is not None
    has_delay = delay is not None

    if has_delay and not had_delay:
        self.__tick_id = GObject.timeout_add(10, self._tick_cb)
    elif had_delay and not has_delay:
        GObject.source_remove(self.__tick_id)

    self.__repeat = (delay, interval)
def activate_polling_of_property_on_element(self, element_name="whatever", property="property", interval_ms=1000):
    """Schedule ``self.poll_property`` every ``interval_ms`` and set ``self.do_poll``.

    :param element_name: first extra argument passed to ``poll_property``.
    :param property: second extra argument passed to ``poll_property``.
    :param interval_ms: timeout interval in milliseconds.
    """
    callback_args = (element_name, property)
    GObject.timeout_add(interval_ms, self.poll_property, *callback_args)
    self.do_poll = True
def go_clicked(self, widget):
    """Start the clock for the side to move and, for a computer player, its move thread.

    :param widget: not used by this method.

    For a "Human" player the status bar is set to "ready" and the method
    returns.  Otherwise ``computer_move`` is started on a new thread whose
    identifier is kept in ``self.ct``.
    """
    # update time control prior to move
    gv.tc.update_gui_time_control(self.stm)

    # side to move
    self.stm = self.get_side_to_move()

    # start a timer to display the time left while the player is thinking
    gv.tc.start_clock(self.stm)
    if not self.timer_active:
        GObject.timeout_add(1000, gv.tc.show_time)

    gui = gv.gui
    gui.disable_menu_items()
    gui.disable_go_button()
    gui.enable_stop_button()

    self.stopped = False

    if gv.verbose:
        print("#")
        print("# " + self.get_side_to_move_string(self.stm) + " to move")
        print("#")

    gv.gui.apply_drag_and_drop_settings(self.player[self.stm], self.stm)

    # gv.board.reduce_board_history(self.movelist)
    for side in ("w", "b"):
        self.engine_output.clear(side, " ")

    if self.player[self.stm] == "Human":
        gv.gui.set_status_bar_msg(_("ready"))
        return

    gv.gui.set_status_bar_msg(_("Thinking ..."))
    # it's the computers turn to move. kick off a separate thread for
    # computers move so that gui is still useable
    self.ct = _thread.start_new_thread(self.computer_move, ())