def __init__(self, oneconfviewpickler):
    """Controller of the installed pane.

    Opens the OneConf D-Bus connection, subscribes to its host, package
    list and sync timestamp signals, and schedules the first refreshes.
    `oneconfviewpickler` is stored on the instance.
    """
    LOG.debug("OneConf Handler init")
    super(OneConfHandler, self).__init__()

    # OneConf stuff: route every D-Bus signal to its handler
    self.oneconf = DbusConnect()
    hosts_proxy = self.oneconf.hosts_dbus_object
    signal_handlers = (
        ('hostlist_changed', self.refresh_hosts),
        ('packagelist_changed', self._on_store_packagelist_changed),
        ('latestsync_changed', self.on_new_latest_oneconf_sync_timestamp),
    )
    for signal_name, handler in signal_handlers:
        hosts_proxy.connect_to_signal(signal_name, handler)

    self.already_registered_hostids = []
    self.is_current_registered = False
    self.oneconfviewpickler = oneconfviewpickler

    # refresh host list
    self._refreshing_hosts = False
    GObject.timeout_add_seconds(
        MIN_TIME_WITHOUT_ACTIVITY, self.get_latest_oneconf_sync)
    GObject.idle_add(self.refresh_hosts)
# Python timeout_add_seconds() example source code
def __init__(self, oneconfviewpickler):
    """Controller of the installed pane.

    Connects to OneConf over D-Bus, hooks up the signal handlers and
    queues the initial host list refresh and sync timestamp lookup.
    The given `oneconfviewpickler` is kept on the instance.
    """
    LOG.debug("OneConf Handler init")
    super(OneConfHandler, self).__init__()

    # OneConf stuff
    self.oneconf = DbusConnect()
    subscribe = self.oneconf.hosts_dbus_object.connect_to_signal
    subscribe('hostlist_changed', self.refresh_hosts)
    subscribe('packagelist_changed', self._on_store_packagelist_changed)
    subscribe('latestsync_changed',
              self.on_new_latest_oneconf_sync_timestamp)

    self.already_registered_hostids = []
    self.is_current_registered = False
    self.oneconfviewpickler = oneconfviewpickler

    # refresh host list
    self._refreshing_hosts = False
    GObject.timeout_add_seconds(MIN_TIME_WITHOUT_ACTIVITY,
                                self.get_latest_oneconf_sync)
    GObject.idle_add(self.refresh_hosts)
def main():
    """Create the app indicator with a Quit menu and run the GTK main loop.

    Stores the indicator and its menu in the module globals `indicator`
    and `menu`, and schedules `check_caps` with the pubnub object on a
    one second timer before entering `gtk.main()`.
    """
    global indicator, menu

    indicator = appindicator.Indicator.new(
        APPINDICATOR_ID,
        os.path.abspath('closed.svg'),
        appindicator.IndicatorCategory.SYSTEM_SERVICES)
    indicator.set_status(appindicator.IndicatorStatus.ACTIVE)

    pubnub = set_up_pubnub()

    # menu with a single "Quit" entry; `die` receives pubnub as user data
    menu = gtk.Menu()
    quit_item = gtk.MenuItem('Quit')
    quit_item.connect('activate', die, pubnub)
    menu.append(quit_item)
    menu.show_all()

    indicator.set_menu(menu)
    indicator.set_icon(os.path.abspath("closed.svg"))

    notify.init(APPINDICATOR_ID)
    GObject.timeout_add_seconds(1, check_caps, pubnub)
    gtk.main()
def set_sbrick(self, sbrick):
    """Attach `sbrick` (or None) to this widget.

    The widget is made sensitive only when a brick is given.  With a
    brick, all values are refreshed immediately and
    `refresh_updating_values` is scheduled on a 5 second timer.
    """
    self.sbrick = sbrick
    has_brick = sbrick is not None
    self.set_sensitive(has_brick)
    if not has_brick:
        return
    self.refresh_all_values()
    GObject.timeout_add_seconds(5, self.refresh_updating_values)
def Delay(self, seconds, reply_handler, error_handler):
    """Call `reply_handler(seconds)` once, `seconds` seconds from now.

    seconds -- delay in seconds; also the value passed to the reply
    reply_handler -- callable invoked with `seconds` when the delay is over
    error_handler -- accepted for interface compatibility, never called
    """
    # Parenthesised form prints the same text under Python 2 and 3.
    print("Sleeping for %ds" % seconds)

    def _send_reply():
        reply_handler(seconds)
        # A timeout callback is re-armed for as long as it returns a true
        # value; always return False so the reply is delivered exactly
        # once regardless of what reply_handler returns.
        return False

    gobject.timeout_add_seconds(seconds, _send_reply)
def _ask_and_repair_broken_cache(self):
    """Ask the user whether to repair a broken cache, and repair it if so.

    While the main window is not visible yet, the method re-schedules
    itself to run again in one second and returns without asking.
    """
    # wait until the main window is available
    if not self.window_main.props.visible:
        GObject.timeout_add_seconds(1, self._ask_and_repair_broken_cache)
        return
    if dialogs.confirm_repair_broken_cache(self.window_main, self.datadir):
        self.backend.fix_broken_depends()
def initiate_purchase(self, app, iconname, url=None, html=None):
    """
    initiates the purchase workflow inside the embedded webkit window
    for the item specified

    Returns False (after emitting "terms-of-service-declined") when the
    terms of service are not accepted, True otherwise.  The page shown is
    `url` if given, else `html` if given, else DUMMY_HTML.
    """
    tos_accepted = self._ask_for_tos_acceptance_if_needed()
    if not tos_accepted:
        self.emit("terms-of-service-declined")
        return False

    self.init_view()
    self.app = app
    self.iconname = iconname

    # show the loading placeholder and process pending GTK events
    # before the real content is requested
    self.wk.webkit.load_html_string(self.LOADING_HTML, "file:///")
    self.wk.show()
    while Gtk.events_pending():
        Gtk.main_iteration()

    if url:
        self.wk.webkit.load_uri(url)
    else:
        self.wk.webkit.load_html_string(html or DUMMY_HTML, "file:///")
    self.pack_start(self.wk, True, True, 0)

    # only for debugging
    if os.environ.get("SOFTWARE_CENTER_DEBUG_BUY"):
        GObject.timeout_add_seconds(1, _generate_events, self)
    return True
def get_test_window_purchaseview():
    """Build and show a test window that hosts a PurchaseView.

    The purchase URL defaults to a test subscription page below
    BUY_SOMETHING_HOST; the first command line argument, if present,
    overrides it.  Returns the Gtk.Window, which has already been shown.
    """
    from softwarecenter.enums import BUY_SOMETHING_HOST
    url = BUY_SOMETHING_HOST + "/subscriptions/en/ubuntu/maverick/+new/?%s" % (
        urllib.urlencode({
            'archive_id': "mvo/private-test",
            'arch': "i386",
        }))
    # use cmdline if available
    if len(sys.argv) > 1:
        url = sys.argv[1]

    widget = PurchaseView()
    # the view's config is replaced by a Mock for this test window
    from mock import Mock
    widget.config = Mock()

    win = Gtk.Window()
    win.set_data("view", widget)
    win.add(widget)
    win.set_size_request(600, 500)
    win.set_position(Gtk.WindowPosition.CENTER)
    win.show_all()
    win.connect('destroy', Gtk.main_quit)

    widget.initiate_purchase(app=None, iconname=None, url=url)
    return win
def queue_next(self):
    """Replace any pending timeout with a fresh one for `next_exhibit`.

    Calls `cleanup_timeout()` first, then schedules `next_exhibit` after
    TIMEOUT_SECONDS, stores the source id in `self._timeout` and returns
    it.
    """
    self.cleanup_timeout()
    self._timeout = GObject.timeout_add_seconds(self.TIMEOUT_SECONDS,
                                                self.next_exhibit)
    return self._timeout
def get_test_window():
    """Build and show a test window around a OneConfViews pane.

    Registers a handful of sample computers, schedules one more to be
    registered after 5 seconds, prints every "computer-changed"
    selection and removes the "DDDDD" entry again.  Returns the window.
    """
    pane = OneConfViews(Gtk.IconTheme.get_default())
    pane.show()

    window = Gtk.Window()
    window.set_data("pane", pane)
    window.add(pane)
    window.set_size_request(400, 600)
    window.connect("destroy", lambda _widget: Gtk.main_quit())

    # init the view
    sample_computers = (
        ("AAAAA", "NameA"),
        ("ZZZZZ", "NameZ"),
        ("DDDDD", "NameD"),
        ("CCCCC", "NameC"),
        ("", "This computer should be first"),
    )
    for computer_id, computer_name in sample_computers:
        pane.register_computer(computer_id, computer_name)
    pane.select_first()

    GObject.timeout_add_seconds(5, pane.register_computer, "EEEEE", "NameE")

    def print_selected_hostid(widget, hostid, hostname):
        print("%s selected for %s" % (hostid, hostname))

    pane.connect("computer-changed", print_selected_hostid)
    pane.remove_computer("DDDDD")

    window.show_all()
    return window
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file,
                                   event):
    """Restart the 10 second timer for `self.open` on a finished change.

    Only CHANGES_DONE_HINT events are handled; a timer that is already
    pending is removed before the new one is installed.
    """
    if event != Gio.FileMonitorEvent.CHANGES_DONE_HINT:
        return

    pending_id = self._timeout_id
    if pending_id:
        GObject.source_remove(pending_id)
        self._timeout_id = None
    self._timeout_id = GObject.timeout_add_seconds(10, self.open)
def clear_token_from_ubuntu_sso_sync(appname):
    """ ask the com.ubuntu.sso service over dbus to clear the
        credentials for the given appname, e.g. _("Ubuntu Software Center")
        and wait for it to finish (or 2s)
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
    )
    mainloop = GObject.MainLoop()
    session_bus = dbus.SessionBus()
    sso_object = session_bus.get_object(
        bus_name=DBUS_BUS_NAME,
        object_path=DBUS_CREDENTIALS_PATH,
        follow_name_owner_changes=True)
    credentials = dbus.Interface(
        object=sso_object,
        dbus_interface=DBUS_CREDENTIALS_IFACE)

    # any of these answers ends the wait
    for signal_name in ("CredentialsCleared",
                        "CredentialsNotFound",
                        "CredentialsError"):
        credentials.connect_to_signal(signal_name, mainloop.quit)
    credentials.clear_credentials(appname, {})

    # ensure we don't hang forever here
    GObject.timeout_add_seconds(2, mainloop.quit)
    # run the mainloop until the credentials are clear
    mainloop.run()
def __init__(self, db):
    """Store `db`, look up distro and install backend, and start watching.

    The backend's "channels-changed" signal is connected to
    `_remove_no_longer_needed_extra_channels`, and
    `_check_for_channel_updates_timer` is scheduled on a 60 second timer.
    """
    self.db = db
    self.distro = get_distro()
    self.backend = get_install_backend()

    # extra channels from e.g. external sources
    self.extra_channels = []
    self._logger = LOG

    self.backend.connect(
        "channels-changed",
        self._remove_no_longer_needed_extra_channels)
    # kick off a background check for changes that may have been made
    # in the channels list
    GObject.timeout_add_seconds(
        60, self._check_for_channel_updates_timer)
# external API
def _ask_and_repair_broken_cache(self):
    """Offer to repair a broken cache and fix the dependencies on consent.

    If the main window is not visible yet, nothing is asked; the method
    schedules itself to be called again after one second instead.
    """
    # wait until the main window is available
    if not self.window_main.props.visible:
        GObject.timeout_add_seconds(1, self._ask_and_repair_broken_cache)
        return
    repair_confirmed = dialogs.confirm_repair_broken_cache(
        self.window_main, self.datadir)
    if repair_confirmed:
        self.backend.fix_broken_depends()
def initiate_purchase(self, app, iconname, url=None, html=None):
    """
    initiates the purchase workflow inside the embedded webkit window
    for the item specified

    Emits "terms-of-service-declined" and returns False if the terms of
    service are not accepted; otherwise loads `url`, or `html`, or
    DUMMY_HTML (in that order of preference) and returns True.
    """
    if not self._ask_for_tos_acceptance_if_needed():
        self.emit("terms-of-service-declined")
        return False

    self.init_view()
    self.app = app
    self.iconname = iconname

    base_uri = "file:///"
    self.wk.webkit.load_html_string(self.LOADING_HTML, base_uri)
    self.wk.show()
    # process pending GTK events before loading the real content
    while Gtk.events_pending():
        Gtk.main_iteration()

    if url:
        self.wk.webkit.load_uri(url)
    elif html:
        self.wk.webkit.load_html_string(html, base_uri)
    else:
        self.wk.webkit.load_html_string(DUMMY_HTML, base_uri)
    self.pack_start(self.wk, True, True, 0)

    # only for debugging
    debug_buy = os.environ.get("SOFTWARE_CENTER_DEBUG_BUY")
    if debug_buy:
        GObject.timeout_add_seconds(1, _generate_events, self)
    return True
def get_test_window_purchaseview():
    """Create, show and return a test window containing a PurchaseView.

    By default the view is pointed at a test subscription page below
    BUY_SOMETHING_HOST; if a command line argument is given, it is used
    as the URL instead.
    """
    from softwarecenter.enums import BUY_SOMETHING_HOST
    url = BUY_SOMETHING_HOST + "/subscriptions/en/ubuntu/maverick/+new/?%s" % (
        urllib.urlencode({
            'archive_id': "mvo/private-test",
            'arch': "i386",
        }))
    # use cmdline if available
    if len(sys.argv) > 1:
        url = sys.argv[1]

    widget = PurchaseView()
    # stand-in config object so the view can be used on its own
    from mock import Mock
    widget.config = Mock()

    win = Gtk.Window()
    win.set_data("view", widget)
    win.add(widget)
    win.set_size_request(600, 500)
    win.set_position(Gtk.WindowPosition.CENTER)
    win.show_all()
    win.connect('destroy', Gtk.main_quit)

    widget.initiate_purchase(app=None, iconname=None, url=url)
    return win
def queue_next(self):
    """Schedule `next_exhibit` after TIMEOUT_SECONDS and return the id.

    `cleanup_timeout()` is called before the new timeout is added; the
    new source id is remembered in `self._timeout`.
    """
    self.cleanup_timeout()
    self._timeout = GObject.timeout_add_seconds(
        self.TIMEOUT_SECONDS,
        self.next_exhibit,
    )
    return self._timeout
def get_test_spinner_window():
    """Build and show a test window with a SpinnerNotebook.

    The spinner is shown right away and `hide_spinner` is scheduled on
    a one second timer.  Returns the window.
    """
    content = Gtk.Label("foo")
    notebook = SpinnerNotebook(content, "random msg")

    test_window = Gtk.Window()
    test_window.add(notebook)
    test_window.set_size_request(600, 500)
    test_window.set_position(Gtk.WindowPosition.CENTER)
    test_window.show_all()
    test_window.connect('destroy', Gtk.main_quit)

    notebook.show_spinner("Loading for 1s ...")
    GObject.timeout_add_seconds(1, notebook.hide_spinner)
    return test_window
def _on_apt_finished_stamp_changed(self, monitor, afile, other_file,
                                   event):
    """Schedule `self.open` 10 seconds after a CHANGES_DONE_HINT event.

    Other file monitor events are ignored.  An already pending timeout
    is removed first, so repeated events keep pushing the call back.
    """
    changes_done = (event == Gio.FileMonitorEvent.CHANGES_DONE_HINT)
    if not changes_done:
        return
    if self._timeout_id:
        # drop the previous timer before installing the new one
        GObject.source_remove(self._timeout_id)
        self._timeout_id = None
    self._timeout_id = GObject.timeout_add_seconds(10, self.open)
def clear_token_from_ubuntu_sso_sync(appname):
    """ call the com.ubuntu.sso service via dbus to clear the
        credentials for the given appname, e.g. _("Ubuntu Software Center")
        and wait for it to finish (or 2s)
    """
    from ubuntu_sso import (
        DBUS_BUS_NAME,
        DBUS_CREDENTIALS_IFACE,
        DBUS_CREDENTIALS_PATH,
    )
    wait_loop = GObject.MainLoop()
    stop_waiting = wait_loop.quit

    bus = dbus.SessionBus()
    remote = bus.get_object(bus_name=DBUS_BUS_NAME,
                            object_path=DBUS_CREDENTIALS_PATH,
                            follow_name_owner_changes=True)
    iface = dbus.Interface(object=remote,
                           dbus_interface=DBUS_CREDENTIALS_IFACE)

    # success, "nothing stored" and failure all end the wait
    iface.connect_to_signal("CredentialsCleared", stop_waiting)
    iface.connect_to_signal("CredentialsNotFound", stop_waiting)
    iface.connect_to_signal("CredentialsError", stop_waiting)
    iface.clear_credentials(appname, {})

    # ensure we don't hang forever here
    GObject.timeout_add_seconds(2, stop_waiting)
    # run the mainloop until the credentials are clear
    wait_loop.run()
def __init__(self, db):
    """Remember `db`, fetch distro and backend, and watch the channels.

    Connects `_remove_no_longer_needed_extra_channels` to the backend's
    "channels-changed" signal and schedules
    `_check_for_channel_updates_timer` on a 60 second timer.
    """
    self.db = db
    self.distro = get_distro()

    install_backend = get_install_backend()
    self.backend = install_backend
    install_backend.connect("channels-changed",
                            self._remove_no_longer_needed_extra_channels)

    # kick off a background check for changes that may have been made
    # in the channels list
    check_interval = 60
    GObject.timeout_add_seconds(check_interval,
                                self._check_for_channel_updates_timer)

    # extra channels from e.g. external sources
    self.extra_channels = []
    self._logger = LOG
# external API
def change_app_icon(self):
    """Set the indicator icon from `get_icon()` and schedule a status check.

    `update_server_status_icon` is scheduled on a 60 second timer.
    """
    self.indicator.set_icon(self.get_icon())
    gobject.timeout_add_seconds(60, self.update_server_status_icon)
def start_session(self, host, port, session, username, password, wait):
    """ Start a session using qtnx

    Writes a qtnx configuration file for the session below ~/.qtnx,
    starts a timer that emits "progress" signals, and launches the qtnx
    binary.

    host, port -- server to connect to; substituted into the
        configuration template and used in the profile name
    session -- session name; "/" is replaced by "_" in the profile name
    username, password -- passed to qtnx as command line arguments
    wait -- if false, qtnx is spawned in the background and
        `_on_qtnx_exit` is called with the configuration file name when
        it exits; otherwise this call blocks until qtnx has finished
    """
    self.state = "connecting"

    config_dir = os.path.expanduser('~/.qtnx')
    if not os.path.exists(config_dir):
        os.mkdir(config_dir)

    # The profile name is shared by the file name and the WL_NAME entry,
    # so compute it once.
    safe_session = session.replace("/", "_")
    profile = "%s-%s-%s" % (host, port, safe_session)

    # Generate qtnx's configuration file.  The name is formatted before
    # it is joined to the expanded directory, so a "%" in the home
    # directory path cannot break the string formatting.
    filename = "%s/%s.nxml" % (config_dir, profile)
    config = self.NXML_TEMPLATE
    config = config.replace("WL_NAME", profile)
    config = config.replace("WL_SERVER", host)
    config = config.replace("WL_PORT", str(port))
    config = config.replace("WL_COMMAND", "weblive-session %s" % session)
    # "with" closes the file even if the write fails
    with open(filename, "w") as nxml:
        nxml.write(config)

    # Prepare qtnx call
    # NOTE(review): the password is handed over on the command line and
    # is therefore visible in the process list - confirm this is
    # acceptable.
    cmd = [self.BINARY_PATH,
           '%s-%s-%s' % (str(host), str(port), safe_session),
           username,
           password]

    def qtnx_countdown():
        """ Send progress events every two seconds """
        if self.helper_progress == 10:
            self.state = "connected"
            self.emit("connected", False)
            # returning False stops the timer
            return False
        else:
            self.emit("progress", self.helper_progress * 10)
            self.helper_progress += 1
            return True

    def qtnx_start_timer():
        """ As we don't have a way of knowing the connection
            status, we countdown from 20s
        """
        self.helper_progress = 0
        qtnx_countdown()
        GObject.timeout_add_seconds(2, qtnx_countdown)

    qtnx_start_timer()

    if not wait:
        # Start in the background and attach a watch for when it exits
        (self.helper_pid, _stdin, _stdout, _stderr) = GObject.spawn_async(
            cmd, standard_input=True, standard_output=True,
            standard_error=True, flags=GObject.SPAWN_DO_NOT_REAP_CHILD)
        GObject.child_watch_add(self.helper_pid, self._on_qtnx_exit,
                                filename)
    else:
        # Start it and wait till it finishes
        p = subprocess.Popen(cmd)
        p.wait()