def __init__(self, app, db):
    """Set up one account entry and start its thread.

    :param app: indexable record; positions 0 to 3 are read as the account
        id, the account name, the key handed to
        ``Database.fetch_secret_code`` and the logo.
    :param db: stored on the instance as ``self.db``.
    """
    Thread.__init__(self)
    GObject.GObject.__init__(self)
    self.db = db
    # The counter starts at its maximum value.
    self.counter_max = 30
    self.counter = self.counter_max
    self.account_id = app[0]
    self.account_name = app[1]
    self.secret_code = Database.fetch_secret_code(app[2])
    if self.secret_code:
        self.code = Code(self.secret_code)
    else:
        self.code_generated = False
        # Adjacent string literals are joined without any separator, so
        # the space after the comma has to be written explicitly.
        logging.error("Could not read the secret code, "
                      "the keyring keys were reset manually")
    self.logo = app[3]
    # The thread is started straight from the constructor.
    self.start()
Python 类 GObject() 的实例源码
def __init__(self, parent, main_window):
    """Build the applications window from its UI resource and start the thread.

    :param parent: stored as ``self.parent``.
    :param main_window: window the applications window is made transient for.
    """
    self.parent = parent
    Thread.__init__(self)
    GObject.GObject.__init__(self)
    self.nom = "applications-db-reader"

    # Load the UI description and hook up its signal handlers.
    ui_resource = "/org/gnome/Authenticator/applications.ui"
    self.builder = Gtk.Builder.new_from_resource(ui_resource)
    signal_handlers = {
        "on_close": self.close_window,
        "on_key_press": self.on_key_press,
        "on_apply": self.select_application,
    }
    self.builder.connect_signals(signal_handlers)

    self.window = self.builder.get_object("ApplicationsWindow")
    self.window.set_transient_for(main_window)
    self.listbox = self.builder.get_object("ApplicationsList")
    self.generate_search_bar()

    # Make the "loadingstack" page the visible one.
    self.stack = self.builder.get_object("ApplicationsStack")
    loading_page = self.stack.get_child_by_name("loadingstack")
    self.stack.set_visible_child(loading_page)

    scrolled = self.builder.get_object("ApplicationListScrolled")
    scrolled.add_with_viewport(self.listbox)

    self.db = []
    self.start()
def import_scc(self, filename):
    """
    Imports simple, single-file scc-profile.
    Just loads it, checks for shell() actions and asks user to enter name.

    :param filename: path of the profile file to load.
    """
    files = self.builder.get_object("lstImportPackage")
    # Load profile
    profile = Profile(GuiActionParser())
    try:
        profile.load(filename)
    except Exception as e:
        # Profile cannot be parsed. Display error message and let user to quit
        # Error message reuses page from VDF import, because they are
        # basically the same
        # (`except ... as e` is valid on Python 2.6+ and on Python 3,
        # whereas the comma form is a SyntaxError on Python 3.)
        log.error(e)
        self.error(str(e))
        return
    # Suggested name: the file name without directory and last extension
    name = ".".join(os.path.split(filename)[-1].split(".")[0:-1])
    files.clear()
    # A bare GObject carries the loaded profile in the list row
    o = GObject.GObject()
    o.obj = profile
    files.append(( 2, name, name, _("(profile)"), o ))
    self.check_shell_commands()
def load_autoswitch(self):
    """ Transfers autoswitch settings from config to UI """
    tvItems = self.builder.get_object("tvItems")
    cbShowOSD = self.builder.get_object("cbShowOSD")
    conditions = AutoSwitcher.parse_conditions(self.app.config)
    model = tvItems.get_model()
    model.clear()
    # One row per condition; a bare GObject carries the condition and its
    # action next to the two description strings.
    for cond, action in conditions.items():
        o = GObject.GObject()
        o.condition = cond
        o.action = action
        a_str = o.action.describe(Action.AC_SWITCHER)
        model.append((o, o.condition.describe(), a_str))
    # The guard flag is raised while the UI is updated programmatically and
    # is always lowered again, even when one of the calls below raises.
    self._recursing = True
    try:
        self.on_tvItems_cursor_changed()
        cbShowOSD.set_active(bool(self.app.config['autoswitch_osd']))
    finally:
        self._recursing = False
def __init__(self):
    """Bind the mode setting, build the button and menu, start the updater."""
    GObject.Object.__init__(self)
    # Keep the "ws-overview-index" setting in sync with self.mode_index.
    settings = Gio.Settings.new("org.ubuntubudgie.plugins.budgie-extras")
    settings.bind("ws-overview-index", self,
                  'mode_index',
                  Gio.SettingsBindFlags.DEFAULT)
    # general
    self.mode = modes[self.mode_index]
    self.appbutton = Gtk.Button.new()
    self.appbutton.set_relief(Gtk.ReliefStyle.NONE)
    icon = Gtk.Image.new_from_icon_name("1-wso", Gtk.IconSize.MENU)
    self.appbutton.set_image(icon)
    self.menu = Gtk.Menu()
    self.create_menu()
    self.update = Thread(target=self.show_seconds)
    # daemonize the thread to make the indicator stoppable; the `daemon`
    # attribute is used because setDaemon() is deprecated since Python 3.10
    self.update.daemon = True
    self.update.start()
def on_mcg_connect(self, connected):
    """Update the UI and the action states after a connection change.

    :param connected: truthy when connected, falsy when disconnected.
    """
    if connected:
        GObject.idle_add(self._connect_connected)
        # Trigger the client's playlist, album and status calls.
        self._mcg.load_playlist()
        self._mcg.load_albums()
        self._mcg.get_status()
    else:
        GObject.idle_add(self._connect_disconnected)

    # Every action below simply mirrors the connection state.
    is_connected = bool(connected)
    self._connect_action.set_state(GLib.Variant.new_boolean(is_connected))
    for action in (self._play_action,
                   self._clear_playlist_action,
                   self._panel_action):
        action.set_enabled(is_connected)
def on_mcg_status(self, state, album, pos, time, volume, error):
    """Forward a status update to the cover panel, header bar and info bar.

    :param state: 'play', 'pause' or 'stop'; other values leave the play
        state untouched.
    :param album: passed to the cover panel's ``set_album``.
    :param pos: passed with ``time`` to the cover panel's ``set_play``.
    :param time: see ``pos``.
    :param volume: passed to the header bar's ``set_volume``.
    :param error: ``None`` hides the info bar, anything else is shown.
    """
    cover_panel = self._panels[Window._PANEL_INDEX_COVER]
    header_bar = self._header_bar

    # Album
    GObject.idle_add(cover_panel.set_album, album)

    # State
    if state == 'play':
        GObject.idle_add(header_bar.set_play)
        GObject.idle_add(cover_panel.set_play, pos, time)
        self._play_action.set_state(GLib.Variant.new_boolean(True))
    elif state in ('pause', 'stop'):
        GObject.idle_add(header_bar.set_pause)
        GObject.idle_add(cover_panel.set_pause)
        self._play_action.set_state(GLib.Variant.new_boolean(False))

    # Volume
    GObject.idle_add(header_bar.set_volume, volume)

    # Error
    if error is not None:
        self._show_error(error)
    else:
        self._infobar.hide()
def __init__(self, builder):
    """Look up the header bar widgets and prepare the handler mapping.

    :param builder: object providing ``get_object``; it is also passed to
        ``StackSwitcher``.
    """
    GObject.GObject.__init__(self)
    self._changing_volume = self._setting_volume = False

    # Widgets
    self._header_bar = builder.get_object('headerbar')
    self._stack_switcher = StackSwitcher(builder)
    self._button_connect = builder.get_object('headerbar-connection')
    self._button_playpause = builder.get_object('headerbar-playpause')
    self._button_volume = builder.get_object('headerbar-volume')

    # Signals
    self._stack_switcher.connect('stack-switched', self.on_stack_switched)
    # Handler names mapped to the bound methods that serve them.
    handlers = {
        'on_headerbar-connection_active_notify':
            self.on_connection_active_notify,
        'on_headerbar-connection_state_set':
            self.on_connection_state_set,
        'on_headerbar-playpause_toggled':
            self.on_playpause_toggled,
        'on_headerbar-volume_value_changed':
            self.on_volume_changed,
        'on_headerbar-volume_button_press_event':
            self.on_volume_press,
        'on_headerbar-volume_button_release_event':
            self.on_volume_release,
    }
    self._button_handlers = handlers
def _set_tracks(self, album):
    """Rebuild the marks of the songs scale for ``album``.

    One mark is added per track at the accumulated length of the tracks
    before it, plus a final mark labelled with the total as "MM:SS minutes".

    :param album: object providing ``get_length()`` and ``get_tracks()``.
    """
    scale = self._songs_scale
    scale.clear_marks()
    scale.set_range(0, album.get_length())

    elapsed = 0
    for track in album.get_tracks():
        # Marks strictly inside the range are shifted by one.
        if 0 < elapsed < album.get_length():
            mark_position = elapsed + 1
        else:
            mark_position = elapsed
        title = GObject.markup_escape_text(Utils.create_track_title(track))
        scale.add_mark(mark_position, Gtk.PositionType.RIGHT, title)
        elapsed += track.get_length()

    total_label = "{0[0]:02d}:{0[1]:02d} minutes".format(divmod(elapsed, 60))
    scale.add_mark(elapsed, Gtk.PositionType.RIGHT, total_label)
def _change_tracklist_size(self, size, notify=True, store=True):
    """Apply a tracklist size to the panel layout.

    :param size: ``TracklistSize`` value; anything other than LARGE or
        SMALL hides the info revealer.
    :param notify: emit 'tracklist-size-changed' when true.
    :param store: remember ``size`` in ``self._tracklist_size`` when true.
    """
    # Set tracklist size
    if size == TracklistSize.LARGE:
        homogeneous, reveal_info = True, True
    elif size == TracklistSize.SMALL:
        homogeneous, reveal_info = False, True
    else:
        homogeneous, reveal_info = False, False
    self._panel.set_homogeneous(homogeneous)
    self._info_revealer.set_reveal_child(reveal_info)

    # Store size
    if store:
        self._tracklist_size = size

    # Notify signals
    if notify:
        self.emit('tracklist-size-changed', size)

    # Resize image
    GObject.idle_add(self._resize_image)
def __init__(self, services=None, filters=None,
             interface=avahi.IF_UNSPEC, protocol=avahi.PROTO_INET):
    """Store the browse parameters and subscribe to Avahi owner changes.

    The process exits with status 1 when the D-Bus set-up raises
    ``dbus.DBusException``.

    :param services: list stored as ``self.services``; a new empty list
        when omitted.
    :param filters: list stored as ``self.filters``; defaults to a new
        ``[avahi.LOOKUP_RESULT_LOCAL]`` when omitted.
    :param interface: stored as ``self.interface``.
    :param protocol: stored as ``self.protocol``.
    """
    GObject.GObject.__init__(self)
    # Build the default lists per call: list literals in the signature
    # would be shared by every instance created without these arguments.
    self.filters = [avahi.LOOKUP_RESULT_LOCAL] if filters is None else filters
    self.services = [] if services is None else services
    self.interface = interface
    self.protocol = protocol
    try:
        self.system_bus = dbus.SystemBus()
        # Get notified when the owner of the Avahi bus name changes.
        self.system_bus.add_signal_receiver(
            self.avahi_dbus_connect_cb, "NameOwnerChanged",
            "org.freedesktop.DBus", arg0="org.freedesktop.Avahi")
    except dbus.DBusException as e:
        logger.error("Error Owning name on D-Bus: %s", e)
        sys.exit(1)
    self.db = ServiceTypeDatabase()
    self.service_browsers = {}
    self.started = False
def update_menu_item(self):
    """Queue the label, image and visibility update of the menu item.

    All three calls are scheduled through ``GObject.idle_add`` with
    ``GObject.PRIORITY_HIGH``.

    :return: None
    """
    label = "Ping: " + self.address + self.print_name() + self.state
    high = GObject.PRIORITY_HIGH
    GObject.idle_add(self.menu_item.set_label, label, priority=high)
    GObject.idle_add(self.menu_item.set_image, self.image, priority=high)
    GObject.idle_add(self.menu_item.show, priority=high)
def __init__(self):
    """Initialise the keybinding thread and its Gdk/X11/Wnck handles.

    Any exception raised during set-up is reported on stdout and leaves
    ``self.display`` set to ``None``.
    """
    try:
        GObject.GObject.__init__(self)
        threading.Thread.__init__(self)
        # `daemon` attribute instead of setDaemon(), which is deprecated
        # since Python 3.10.
        self.daemon = True
        self.keymap = Gdk.Keymap().get_default()
        self.display = Display()
        self.screen = self.display.screen()
        self.window = self.screen.root
        self.showscreen = Wnck.Screen.get_default()
        self.ignored_masks = self.get_mask_combinations(
            X.LockMask | X.Mod2Mask | X.Mod5Mask)
        self.map_modifiers()
        self.raw_keyval = None
        self.keytext = ""
    except Exception as cause:
        # Build one string so the message is printed as text rather than
        # as the repr of a tuple.
        print("init keybinding error: \n" + str(cause))
        self.display = None
def __init__(self, git_uri, window):
    """Load the branch UI, connect its handlers and show the window.

    :param git_uri: stored as ``self._git``.
    :param window: window the branch window is made transient for.
    """
    GObject.GObject.__init__(self)
    self._git = git_uri

    self._builder = builder = Gtk.Builder()
    builder.add_from_resource('/com/nautilus/git/ui/branch.ui')
    signal_handlers = {
        "on_cancel": self._close_window,
        "on_apply": self._update_branch,
        "branch_changed": self._validate_branch_name,
    }
    builder.connect_signals(signal_handlers)

    self._window = builder.get_object("window")
    self._window.set_transient_for(window)
    self._build_main_widget()
    self._window.show_all()
def __init__(self, folders):
    """Initialise the window, run the thread body and build the widgets.

    :param folders: stored as ``self._folders``.
    """
    GObject.GObject.__init__(self)
    Thread.__init__(self)
    Gtk.Window.__init__(self)
    # Here i assume that all folders got the same icon...
    self._folders = folders
    self.model = []
    self._flowbox = Gtk.FlowBox()
    # Threading stuff
    # `daemon` attribute instead of setDaemon(), which is deprecated since
    # Python 3.10.
    self.daemon = True
    # NOTE(review): run() executes the thread body synchronously in the
    # calling thread; start() would be needed to run it in the background.
    # Confirm which one is intended before changing it.
    self.run()
    # Window configurations
    self.set_default_size(650, 500)
    self.set_size_request(650, 500)
    self.set_resizable(True)
    self.set_position(Gtk.WindowPosition.CENTER_ON_PARENT)
    # Widgets & Accelerators
    self._build_header_bar()
    self._build_content()
    self._setup_accels()
def __init__(self, db, cache, icons, icon_size=48,
             global_icon_cache=False):
    """Store the data sources and parse the application categories.

    :param db: stored as ``self.db`` and handed to ``CategoriesParser``
        and ``get_review_loader``.
    :param cache: stored as ``self.cache`` and handed to
        ``get_review_loader``.
    :param icons: stored as ``self.icons``.
    :param icon_size: stored as ``self.icon_size``.
    :param global_icon_cache: when true the module-level
        ``_app_icon_cache`` is used, otherwise a new private dict.
    """
    GObject.GObject.__init__(self)
    self.db = db
    self.cache = cache

    # get all categories
    self.all_categories = CategoriesParser(db).parse_applications_menu(
        softwarecenter.paths.APP_INSTALL_PATH)

    # reviews stats loader
    self.review_loader = get_review_loader(cache, db)

    # icon jazz
    self.icons = icons
    self.icon_size = icon_size
    self._missing_icon = None  # delay this until actually needed
    self.icon_cache = _app_icon_cache if global_icon_cache else {}
def _category_translate(self, catname):
    """ helper that returns the translated name of a category: first
        from the parsed categories, then from plain gettext, and last
        from gettext on the markup-escaped name
    """
    # look into parsed categories that use .directory translation
    for category in self.all_categories:
        if category.untranslated_name != catname:
            continue
        return category.name

    # plain gettext comes next
    plain_translation = _(catname)
    if plain_translation != catname:
        return plain_translation

    # nothing found, so try the escaped form of the name (LP: #872760);
    # the parent expects the string unescaped
    escaped_translation = _(GObject.markup_escape_text(catname))
    return unescape(escaped_translation)
def __init__(self, notebook_view, options=None):
    """Create the search entry, navigation buttons and view registries.

    :param notebook_view: stored as ``self.notebook_view``.
    :param options: passed on to ``NavigationHistory``.
    """
    GObject.GObject.__init__(self)
    self.notebook_view = notebook_view

    # search entry and its handlers
    self.search_entry = SearchEntry()
    for signal_name, handler in (
            ("terms-changed", self.on_search_terms_changed),
            ("key-press-event", self.on_search_entry_key_press_event)):
        self.search_entry.connect(signal_name, handler)

    # back/forward button and its handlers
    self.back_forward = BackForwardButton()
    for signal_name, handler in (
            ("left-clicked", self.on_nav_back_clicked),
            ("right-clicked", self.on_nav_forward_clicked)):
        self.back_forward.connect(signal_name, handler)

    self.navhistory = NavigationHistory(self.back_forward, options)
    self.spinner = Gtk.Spinner()
    self.all_views = {}
    self.view_to_pane = {}
    self._globalise_instance()
def __init__(self, pathname=None, cache=None):
    """Record where the database lives and reset the per-thread state.

    :param pathname: database path; ``softwarecenter.paths.XAPIAN_PATH``
        when ``None``.
    :param cache: package cache; the result of ``get_pkg_info()`` when
        ``None``.
    """
    GObject.GObject.__init__(self)
    # initialize at creation time to avoid spurious AttributeError
    self._use_agent = self._use_axi = False

    self._db_pathname = (softwarecenter.paths.XAPIAN_PATH
                         if pathname is None else pathname)
    self._aptcache = get_pkg_info() if cache is None else cache
    self._additional_databases = []

    # the xapian values as read from /var/lib/apt-xapian-index/values
    self._axi_values = {}

    # we open one db per thread, thread names are reused eventually
    # so no memory leak
    self._db_per_thread = {}
    self._parser_per_thread = {}
    self._axi_stamp_monitor = None
def _gio_screenshots_json_download_complete_cb(self, source, result, path):
    """Finish the screenshots download and emit the parsed list.

    Emits "screenshots-available" with the sorted screenshot list. The
    list is empty when there is no content, the content is not valid JSON,
    the decoded value is not a dict, or the dict has no 'screenshots'
    entry. A read error returns without emitting anything.

    :param source: object whose ``load_contents_finish`` yields the content.
    :param result: handed to ``source.load_contents_finish``.
    :param path: not used by this callback.
    """
    try:
        _res, content, _etag = source.load_contents_finish(result)
    except GObject.GError:
        # ignore read errors, most likely transient
        return
    if content is not None:
        try:
            content = json.loads(content)
        except ValueError as e:
            LOG.error("can not decode: '%s' (%s)" % (content, e))
            content = None
    if isinstance(content, dict):
        # a list of screenshots as listed online; a dict without the
        # 'screenshots' key yields an empty list instead of a KeyError
        # that would abort the callback before the signal is emitted
        screenshot_list = content.get('screenshots', [])
    else:
        # fallback to a list of screenshots as supplied by the axi
        screenshot_list = []
    # save for later and emit
    self._screenshot_list = self._sort_screenshots_by_best_version(
        screenshot_list)
    self.emit("screenshots-available", self._screenshot_list)
def __init__(self, untranslated_name, name, iconname, query,
             only_unallocated=True, dont_display=False, flags=None,
             subcategories=None, sortmode=SortMethods.BY_ALPHABET,
             item_limit=0):
    """Describe one category and OR its query with its subcategories'.

    :param untranslated_name: stored as ``self.untranslated_name``.
    :param name: display name; stored UTF-8 encoded as ``self.name``.
    :param iconname: stored as ``self.iconname``.
    :param query: base query; combined with ``xapian.Query.OP_OR`` with
        the ``query`` of every subcategory.
    :param only_unallocated: stored as ``self.only_unallocated``.
    :param dont_display: stored as ``self.dont_display``.
    :param flags: list stored as ``self.flags``; a new empty list when
        omitted.
    :param subcategories: list stored as ``self.subcategories``; a new
        empty list when omitted.
    :param sortmode: stored as ``self.sortmode``.
    :param item_limit: stored as ``self.item_limit``.
    """
    GObject.GObject.__init__(self)
    # Build the default lists per call: list literals in the signature
    # would be shared by every instance created without these arguments.
    if flags is None:
        flags = []
    if subcategories is None:
        subcategories = []
    # NOTE(review): `unicode` only exists on Python 2 -- confirm the
    # target interpreter before porting this branch.
    if isinstance(name, str):
        self.name = unicode(name, 'utf8').encode('utf8')
    else:
        self.name = name.encode('utf8')
    self.untranslated_name = untranslated_name
    self.iconname = iconname
    for subcategory in subcategories:
        query = xapian.Query(xapian.Query.OP_OR, query, subcategory.query)
    self.query = query
    self.only_unallocated = only_unallocated
    self.subcategories = subcategories
    self.dont_display = dont_display
    self.flags = flags
    self.sortmode = sortmode
    self.item_limit = item_limit
def _threaded_perform_search(self):
    """Run ``_blocking_perform_search`` in a thread and keep the UI alive.

    The default main context is iterated until
    ``self._perform_search_complete`` becomes true (presumably set by the
    worker -- verify in ``_blocking_perform_search``); then the thread is
    joined and "query-complete" is emitted.
    """
    self._perform_search_complete = False

    # generate a name and ensure we never have two threads
    # with the same name
    used_names = [thread.name for thread in threading.enumerate()]
    for index in range(threading.active_count() + 1, 0, -1):
        thread_name = 'ThreadedQuery-%s' % index
        if thread_name not in used_names:
            break

    # create and start it
    worker = threading.Thread(
        name=thread_name, target=self._blocking_perform_search)
    worker.start()

    # don't block the UI while the thread is running
    main_context = GObject.main_context_default()
    while not self._perform_search_complete:
        time.sleep(0.02)  # 50 fps
        while main_context.pending():
            main_context.iteration()
    worker.join()

    # call the query-complete callback
    self.emit("query-complete")
def wait_for_apt_cache_ready(f):
    """ decorator that delays a method call until self.cache.ready is
        true - the first positional argument must carry the cache

        While the cache is not ready the call is re-scheduled after
        500 ms with GObject.timeout_add; if the instance has an
        "app_view" with a window, that window shows self.busy_cursor in
        the meantime.

        The wrapper always returns False and discards the return value
        of the wrapped function.
    """
    import functools

    # functools.wraps keeps the name and docstring of the wrapped function
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        self = args[0]
        # check if the cache is ready and
        window = None
        if hasattr(self, "app_view"):
            window = self.app_view.get_window()
        if not self.cache.ready:
            if window:
                window.set_cursor(self.busy_cursor)
            # try again later with the very same arguments
            GObject.timeout_add(500, lambda: wrapper(*args, **kwargs))
            return False
        # cache ready now
        if window:
            window.set_cursor(None)
        f(*args, **kwargs)
        return False
    return wrapper
def __init__(self):
    """Create the PackageKit client, open the package info and watch transactions."""
    GObject.GObject.__init__(self)
    InstallBackend.__init__(self)

    # transaction details for setting as meta
    self.new_pkgname = ''
    self.new_appname = ''
    self.new_iconname = ''

    # this is public exposed
    self.pending_transactions = {}

    self.client = packagekit.Client()
    self.pkginfo = get_pkg_info()
    self.pkginfo.open()

    self._transactions_watcher = PackagekitTransactionsWatcher()
    self._transactions_watcher.connect(
        'lowlevel-transactions-changed',
        self._on_lowlevel_transactions_changed)
def __init__(self, oneconfviewpickler):
    '''Controller of the installed pane'''
    LOG.debug("OneConf Handler init")
    super(OneConfHandler, self).__init__()

    # OneConf stuff
    self.oneconf = DbusConnect()
    # D-Bus signal name -> handler on this object
    for signal_name, callback in (
            ('hostlist_changed', self.refresh_hosts),
            ('packagelist_changed', self._on_store_packagelist_changed),
            ('latestsync_changed',
             self.on_new_latest_oneconf_sync_timestamp)):
        self.oneconf.hosts_dbus_object.connect_to_signal(signal_name,
                                                         callback)

    self.already_registered_hostids = []
    self.is_current_registered = False
    self.oneconfviewpickler = oneconfviewpickler

    # refresh host list
    self._refreshing_hosts = False
    GObject.timeout_add_seconds(MIN_TIME_WITHOUT_ACTIVITY,
                                self.get_latest_oneconf_sync)
    GObject.idle_add(self.refresh_hosts)
def __init__(self, db, cache, icons, icon_size=48,
             global_icon_cache=False):
    """Store the data sources and parse the application categories.

    :param db: stored as ``self.db`` and handed to ``CategoriesParser``
        and ``get_review_loader``.
    :param cache: stored as ``self.cache`` and handed to
        ``get_review_loader``.
    :param icons: stored as ``self.icons``.
    :param icon_size: stored as ``self.icon_size``.
    :param global_icon_cache: when true the module-level
        ``_app_icon_cache`` is used, otherwise a new private dict.
    """
    GObject.GObject.__init__(self)
    self.db = db
    self.cache = cache

    # get all categories
    self.all_categories = CategoriesParser(db).parse_applications_menu(
        softwarecenter.paths.APP_INSTALL_PATH)

    # reviews stats loader
    self.review_loader = get_review_loader(cache, db)

    # icon jazz
    self.icons = icons
    self.icon_size = icon_size
    self._missing_icon = None  # delay this until actually needed
    self.icon_cache = _app_icon_cache if global_icon_cache else {}
def __init__(self, notebook_view, options=None):
    """Create the search entry, navigation buttons and view registries.

    :param notebook_view: stored as ``self.notebook_view``.
    :param options: passed on to ``NavigationHistory``.
    """
    GObject.GObject.__init__(self)
    self.notebook_view = notebook_view

    # search entry and its handlers
    self.search_entry = SearchEntry()
    for signal_name, handler in (
            ("terms-changed", self.on_search_terms_changed),
            ("key-press-event", self.on_search_entry_key_press_event)):
        self.search_entry.connect(signal_name, handler)

    # back/forward button and its handlers
    self.back_forward = BackForwardButton()
    for signal_name, handler in (
            ("left-clicked", self.on_nav_back_clicked),
            ("right-clicked", self.on_nav_forward_clicked)):
        self.back_forward.connect(signal_name, handler)

    self.navhistory = NavigationHistory(self.back_forward, options)
    self.spinner = Gtk.Spinner()
    self.all_views = {}
    self.view_to_pane = {}
    self._globalise_instance()
def __init__(self, pathname=None, cache=None):
    """Record where the database lives and reset the per-thread state.

    :param pathname: database path; ``softwarecenter.paths.XAPIAN_PATH``
        when ``None``.
    :param cache: package cache; the result of ``get_pkg_info()`` when
        ``None``.
    """
    GObject.GObject.__init__(self)
    # initialize at creation time to avoid spurious AttributeError
    self._use_agent = self._use_axi = False

    self._db_pathname = (softwarecenter.paths.XAPIAN_PATH
                         if pathname is None else pathname)
    self._aptcache = get_pkg_info() if cache is None else cache
    self._additional_databases = []

    # the xapian values as read from /var/lib/apt-xapian-index/values
    self._axi_values = {}

    # we open one db per thread, thread names are reused eventually
    # so no memory leak
    self._db_per_thread = {}
    self._parser_per_thread = {}
    self._axi_stamp_monitor = None
def _gio_screenshots_json_download_complete_cb(self, source, result, path):
    """Finish the screenshots download and emit the parsed list.

    Emits "screenshots-available" with the sorted screenshot list. The
    list is empty when there is no content, the content is not valid JSON,
    the decoded value is not a dict, or the dict has no 'screenshots'
    entry. A read error returns without emitting anything.

    :param source: object whose ``load_contents_finish`` yields the content.
    :param result: handed to ``source.load_contents_finish``.
    :param path: not used by this callback.
    """
    try:
        _res, content, _etag = source.load_contents_finish(result)
    except GObject.GError:
        # ignore read errors, most likely transient
        return
    if content is not None:
        try:
            content = json.loads(content)
        except ValueError as e:
            LOG.error("can not decode: '%s' (%s)" % (content, e))
            content = None
    if isinstance(content, dict):
        # a list of screenshots as listed online; a dict without the
        # 'screenshots' key yields an empty list instead of a KeyError
        # that would abort the callback before the signal is emitted
        screenshot_list = content.get('screenshots', [])
    else:
        # fallback to a list of screenshots as supplied by the axi
        screenshot_list = []
    # save for later and emit
    self._screenshot_list = self._sort_screenshots_by_best_version(
        screenshot_list)
    self.emit("screenshots-available", self._screenshot_list)
def __init__(self, cache, db):
    """
    Init a AppEnquire object

    :Parameters:
    - `cache`: apt cache (for stuff like the overlay icon)
    - `db`: a xapian.Database that contains the applications
    """
    GObject.GObject.__init__(self)

    # data sources
    self.cache = cache
    self.db = db
    self.distro = get_distro()

    # query settings, starting from an empty search
    self.search_query = SearchQuery(None)
    self.nonblocking_load = True
    self.sortmode = SortMethods.UNSORTED
    self.nonapps_visible = NonAppVisibility.MAYBE_VISIBLE
    self.limit = DEFAULT_SEARCH_LIMIT
    self.filter = None
    self.exact = False

    # result bookkeeping
    self.nr_pkgs = self.nr_apps = 0
    self._matches = []
    self.match_docids = set()