def bytes2pixbuf(data, width=icon_size['width'], height=icon_size['height'], display_name=None):
    """
    Decode raw image bytes into a GdkPixbuf.Pixbuf.

    args:
        data (bytes): encoded image data written to the pixbuf loader
        width (int): width requested from the loader via set_size()
        height (int): height requested from the loader via set_size()
        display_name: label interpolated into the error log message only
    returns:
        the loader's pixbuf, or None when GLib reports an error while
        writing to or closing the loader (the error is logged)
    """
    pixbuf_loader = GdkPixbuf.PixbufLoader()
    pixbuf_loader.set_size(width, height)
    try:
        pixbuf_loader.write(data)
        pixbuf_loader.close()
    except GLib.Error as err:
        message = "can't process icon for {}: {}".format(display_name, str(err))
        logger.error(message)
        return None
    return pixbuf_loader.get_pixbuf()
python类Error()的实例源码
def _connect(self, device):
    """
    Queue a D-Bus Connect() call for `device` unless one is already queued.

    When self.queued_connections is 0 the device is printed with the label
    "Connecting", cb_connect is handed to GLib.idle_add, device.connected is
    set to True and the counter is incremented. Otherwise nothing happens.

    NOTE(review): nesting of the statements after the try block and under
    the `if` is assumed (decrement belongs to cb_connect, the last four
    statements belong to the `if`) - confirm against the upstream file.
    """
    def cb_connect():
        # Idle callback: look the device up on the system bus and connect.
        try:
            bus = pydbus.SystemBus()
            proxy = bus.get(SERVICE_NAME, device.path)
            proxy.Connect()
        except KeyError:
            self._log.debug("The device has likely disappeared.", exc_info=True)
        except GLib.Error:
            self._log.debug("Connect() failed:", exc_info=True)
        else:
            self._log.info("Connection successful.")
        # The attempt is over (whatever its outcome): release the queue slot.
        self.queued_connections -= 1
    if self.queued_connections == 0:
        print_device(device, "Connecting")
        GLib.idle_add(cb_connect)
        # Marked as connected before cb_connect has actually run.
        device.connected = True
        self.queued_connections += 1
def refresh_devices(self):
    """
    Synchronise self.ble_devices with the adapter's current device list.

    For every (key, device) pair in self.adapter.devices:
      * a known device that is still connected is left untouched;
      * a known device that is no longer connected is removed;
      * a connected device without an entry is probed by each driver in
        self.drivers and the probe results are merged, per result key,
        into CompositeBleDevice objects.

    A GLib.Error is logged and skipped only when the first ':'-separated
    field of its message is "GDBus.Error" and the second starts with
    "org.bluez.Error"; every other GLib.Error is re-raised.
    """
    for dev_key, device in self.adapter.devices.items():
        try:
            if dev_key in self.ble_devices:
                if device.proxy.Connected:
                    continue
                del self.ble_devices[dev_key]
            if not device.proxy.Connected:
                continue
            for driver in self.drivers:
                for sub_key, found in driver.probe(device).items():
                    per_device = self.ble_devices.setdefault(dev_key, {})
                    per_device.setdefault(sub_key, CompositeBleDevice()).extend(found)
        except GLib.Error as err:
            parts = err.message.split(':')
            if parts[0] != "GDBus.Error" or not parts[1].startswith("org.bluez.Error"):
                raise
            log.error("{}: {}".format(device, ':'.join(parts[1:])))
def get_icon(icon_path, size=48):
    """
    Generate a GdkPixbuf image for an icon.

    :param icon_path: icon name or image path; any value containing the
        substring "symbolic" is not loaded from disk
    :param size: width and height, in pixels, the loaded image is scaled to
        when it does not already have that size (default 48)
    :return: the tuple ``(None, icon_path)`` for symbolic icons, a
        GdkPixbuf.Pixbuf for image files, or None when GLib fails to load
        the file
    """
    if "symbolic" in icon_path:
        # Nothing to load: hand the name back in the (pixbuf, name) shape.
        return (None, icon_path)
    try:
        icon = GdkPixbuf.Pixbuf.new_from_file(icon_path)
    except GLib.Error:
        return None
    if icon.get_width() != size or icon.get_height() != size:
        icon = icon.scale_simple(size, size, GdkPixbuf.InterpType.BILINEAR)
    return icon
def update_icon_preview_cb(self, dialog, preview):
    """
    Refresh the preview image of a file chooser dialog.

    Shows a preview of the file currently highlighted in `dialog` inside
    `preview` and reveals self.frame. When there is no highlighted file, it
    is not a regular file, or the image cannot be loaded, the preview is
    cleared and self.frame is hidden.

    :param dialog: object providing get_preview_filename()
    :param preview: image widget providing set_from_pixbuf() and clear()
    """
    filename = dialog.get_preview_filename()
    if filename is not None and os.path.isfile(filename):
        try:
            # Different widths make the dialog look really crappy as it
            # resizes - constrain the width and adjust the height to keep
            # perspective.
            pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_size(filename, 320, -1)
        except GLib.Error as e:
            # The arguments must be interpolated with %; passing them as
            # extra print() arguments printed the raw "%s" placeholders.
            print("Unable to generate preview for file '%s' - %s\n" % (filename, e.message))
        else:
            if pixbuf is not None:
                preview.set_from_pixbuf(pixbuf)
                self.frame.show()
                return
    preview.clear()
    self.frame.hide()
def _apply_css(config):
    """
    Install the OSD stylesheet selected in `config` for the default screen.

    Any provider previously stored in OSDWindow.css_provider is removed
    first. The stylesheet named by config["osd_style"] is read from the
    "osd_styles" directory under get_share_path(), filled in with the
    colors from config['osk_colors'] (keys prefixed with "osk_") and
    config['osd_colors'], and registered with user priority. If GLib
    rejects it, the error is logged and "Classic.gtkstyle.css" is loaded
    the same way instead.

    :param config: mapping providing 'osk_colors', 'osd_colors' and
        'osd_style'
    """
    if OSDWindow.css_provider:
        Gtk.StyleContext.remove_provider_for_screen(
            Gdk.Screen.get_default(), OSDWindow.css_provider)

    colors = {}
    for x in config['osk_colors']:
        colors["osk_%s" % (x,)] = config['osk_colors'][x]
    for x in config['osd_colors']:
        colors[x] = config['osd_colors'][x]
    colors = OSDCssMagic(colors)

    def install_style(style_name):
        # Read one stylesheet, substitute colors and register its provider.
        css_file = os.path.join(get_share_path(), "osd_styles", style_name)
        # `with open` closes the handle deterministically and, unlike the
        # Python-2-only file() builtin, also exists on Python 3.
        with open(css_file, "r") as css_handle:
            css = css_handle.read()
        # NOTE(review): strict ">" leaves Gtk 3.20 itself without
        # CSS_3_20 - confirm that this is intended.
        if (Gtk.get_major_version(), Gtk.get_minor_version()) > (3, 20):
            css += OSDWindow.CSS_3_20
        OSDWindow.css_provider = Gtk.CssProvider()
        OSDWindow.css_provider.load_from_data((css % colors).encode("utf-8"))
        Gtk.StyleContext.add_provider_for_screen(
            Gdk.Screen.get_default(),
            OSDWindow.css_provider,
            Gtk.STYLE_PROVIDER_PRIORITY_USER)

    try:
        install_style(config["osd_style"])
    except GLib.Error as e:
        log.error("Failed to apply css with user settings:")
        log.error(e)
        log.error("Retrying with default values")
        install_style("Classic.gtkstyle.css")
def search(self, iterc, mode):
    """
    Run racer in `mode` at the position of `iterc` and return its output.

    The full text of the buffer behind `iterc` is written to a temporary
    file inside Bracer.get_tmp_dir(); racer is then launched with
    `mode`, the 1-based line, the line offset and the temporary file name
    as arguments.

    :param iterc: text iterator providing get_buffer(), get_line() and
        get_line_offset()
    :param mode: racer sub-command passed through verbatim
    :return: racer's standard output, or None when it printed nothing or
        GLib reported an error while spawning/communicating
    """
    buff = iterc.get_buffer()
    begin, end = buff.get_bounds()
    doc_text = buff.get_text(begin, end, True)
    line = iterc.get_line() + 1
    column = iterc.get_line_offset()
    result = None
    # The context manager removes the temporary file even when something
    # other than GLib.Error is raised below.
    with tempfile.NamedTemporaryFile(dir=Bracer.get_tmp_dir()) as temp_file:
        temp_file.write(doc_text.encode('utf-8'))
        # racer opens the file by name, so the data must be on disk.
        temp_file.flush()
        try:
            launcher = Ide.SubprocessLauncher.new(Gio.SubprocessFlags.STDOUT_PIPE)
            launcher.push_argv(self.get_racer_path())
            launcher.push_argv(mode)
            launcher.push_argv(str(line))
            launcher.push_argv(str(column))
            launcher.push_argv(temp_file.name)
            launcher.set_run_on_host(True)
            sub_process = launcher.spawn()
            _success, stdout, _stderr = sub_process.communicate_utf8(None, None)
            if stdout:
                result = stdout
        except GLib.Error:
            # Best effort: a failing racer run simply yields no result.
            pass
    return result
def version(self):
    """
    Ask racer for its version string.

    Launches the executable returned by self.get_racer_path() with ``-V``
    and returns its standard output with every occurrence of "racer"
    removed and surrounding whitespace stripped. Returns None when nothing
    was printed or GLib reported an error.
    """
    try:
        racer = Ide.SubprocessLauncher.new(Gio.SubprocessFlags.STDOUT_PIPE)
        racer.push_argv(self.get_racer_path())
        racer.push_argv('-V')
        racer.set_run_on_host(True)
        process = racer.spawn()
        _ok, output, _errors = process.communicate_utf8(None, None)
    except GLib.Error:
        return None
    if not output:
        return None
    return output.replace('racer','').strip()
def __enter__(self):
    """
    Pick an adapter, restrict discovery to Bluetooth LE and start it.

    Stores the result of find_adapter() in self.adapter and returns self.
    A GLib.Error raised while setting up discovery is logged together with
    a hint about powering the controller on, then re-raised.
    """
    log = self._log
    log.debug("Choosing the first available Bluetooth adapter and "
              "starting device discovery.")
    log.debug("The discovery filter is set to Bluetooth LE only.")
    try:
        self.adapter = find_adapter()
        le_only = {"Transport": pydbus.Variant("s", "le")}
        self.adapter.SetDiscoveryFilter(le_only)
        self.adapter.StartDiscovery()
    except GLib.Error:
        log.exception("Is the bluetooth controller powered on? "
                      "Use `bluetoothctl`, `power on` otherwise.")
        raise
    return self
def _on_request_interest_complete(self, interface, res, interest):
    """
    Completion callback for an asynchronous "request interest" call.

    Finishes the call on `interface` and records the final segment from
    its reply. When GLib reports an error the interest is expressed again
    and the result of that call is returned; otherwise returns None.
    """
    logger.info('Consumer: request interest complete: %s, %s, %s', interface, res, interest)
    try:
        reply = interface.call_request_interest_finish(res)
        _name, final_segment, _fd_list = reply
        self._set_final_segment(final_segment)
    except GLib.Error:
        # XXX: needs real error handling; every failure is currently
        # assumed to be TryAgain and simply retried.
        return self.expressInterest(interest)
    return None
def _on_request_interest_complete(self, interface, res, interest):
    """
    Completion callback for an asynchronous "request interest" call.

    Finishes the call on `interface`; on success emits 'data' (with the
    interest and the received data) followed by 'complete'. When GLib
    reports an error the interest is expressed again and the result of
    that call is returned.
    """
    logger.info('call complete, %s', res)
    try:
        _name, payload = interface.call_request_interest_finish(res)
    except GLib.Error:
        # XXX: needs real error handling; every failure is currently
        # assumed to be TryAgain and simply retried.
        return self.expressInterest(interest)
    else:
        self.emit('data', interest, payload)
        self.emit('complete')
def _get_thumb_path(self):
    """
    Return a path usable as thumbnail for this history item, or None.

    Only items whose kind is HistoryItemKind.FILE or HistoryItemKind.IMAGE
    and whose (user-expanded) self._raw path exists are considered. The
    file's 'thumbnail::path' attribute is preferred; without one, an image
    file that GdkPixbuf can load is used as its own thumbnail.

    Side effect: stores the file's content type in self._content_type.

    The lookup is best effort and does not raise for ordinary errors:
    GLib errors are ignored and any other Exception is logged at debug
    level. (The previous `finally: return` additionally swallowed
    KeyboardInterrupt/SystemExit; those now propagate.)
    """
    if self.kind not in (HistoryItemKind.FILE, HistoryItemKind.IMAGE):
        return None
    filename = os.path.expanduser(self._raw)
    if not os.path.exists(filename):
        return None

    result = None
    try:
        # Build the GFile from the path itself: pasting the path into a
        # 'file://%s' URI breaks on names containing '#', '%' or spaces.
        file_ = Gio.file_new_for_path(filename)
        info = file_.query_info(
            'standard::content-type,thumbnail::path',
            Gio.FileQueryInfoFlags.NONE
        )
        path = info.get_attribute_byte_string('thumbnail::path')
        self._content_type = info.get_content_type()
        if path:
            result = path
        elif self._content_type and self._content_type.startswith('image'):
            # Loading is only a validity check; the pixbuf is discarded.
            GdkPixbuf.Pixbuf.new_from_file_at_scale(filename, 80, 80, False)
            result = filename
    except GLib.Error:
        # No file info or unreadable image: there is simply no thumbnail.
        pass
    except Exception:
        import logging
        logging.getLogger(__name__).debug(
            'thumbnail lookup failed for %r', filename, exc_info=True)
    return result
def __init__(self):
    """
    Load the repository database from the bundled GResource.

    Parses 'resource:///org/gnome/IconRequests/repos.json' into
    self._repositories and then calls self._get_supported_themes().

    Raises:
        SystemExit: with status 1 when GLib reports an error during
            either step (the error is logged first).
    """
    try:
        repos_obj = Gio.File.new_for_uri('resource:///org/gnome/IconRequests/repos.json')
        self._repositories = loads(str(repos_obj.load_contents(None)[1].decode("utf-8")))
        self._get_supported_themes()
    except GLib.Error:
        logging.error("Error loading repository database file. Exiting")
        # exit() is a helper injected by the `site` module and reported
        # success (status 0); raise SystemExit directly with a failure code.
        raise SystemExit(1)
def uri_scheme_deny(self, request):
    """
    Refuse a URI scheme request by finishing it with a GLib.Error.
    """
    # FIXME
    # .. image:: http://example.org/foo.jpg
    # GLib-GObject-WARNING **: invalid cast from 'WebKitSoupRequestGeneric' to 'SoupRequestHTTP'
    # libsoup-CRITICAL **: soup_request_http_get_message: assertion 'SOUP_IS_REQUEST_HTTP (http)' failed
    # this is not called!
    denied_uri = request.get_uri()
    log.debug("uri scheme denied " + denied_uri)
    request.finish_error(GLib.Error("load cancelled: extern"))
def load_img(self, uri, request):
    """
    Answer `request` with the contents of the file at path `uri`.

    When GLib reports an error while opening the file or finishing the
    request, the request is finished with the bytes of "file not found"
    instead.
    """
    log.debug("load image " + uri)
    try:
        image_file = Gio.file_new_for_path(uri)
        request.finish(image_file.read(), -1, None)
    except GLib.Error:
        placeholder = Gio.MemoryInputStream.new_from_data("file not found".encode())
        request.finish(placeholder, -1, None)
def image_from_path(path, size=48, image=None):
    """
    Put the picture stored at `path` into a Gtk.Image.

    :param path: image file to load
    :param size: width and height passed to the scaled loader, which is
        asked to preserve the aspect ratio
    :param image: existing widget to update; a new Gtk.Image is created
        when omitted
    :return: the widget, left unchanged when GLib cannot load the file
    """
    if image is None:
        widget = Gtk.Image()
    else:
        widget = image
    try:
        widget.set_from_pixbuf(
            GdkPixbuf.Pixbuf.new_from_file_at_scale(path, size, size, True))
    except GLib.Error:
        # Loading failed: hand the widget back without a new pixbuf.
        pass
    return widget
async def process_deployment(self, base):
    """
    Check for deployments, download them, verify checksum and trigger
    RAUC install operation.

    Returns immediately (after logging) when self.action_id is already
    set. Otherwise the action id and resource are parsed from the
    deploymentBase link in `base`, the deployment description is fetched,
    its first chunk's first artifact is downloaded via
    self.download_artifact() and self.install() is awaited, shielded from
    cancellation. self.action_id is set right before the install starts.

    Args:
        base: mapping containing ['_links']['deploymentBase']['href']

    Raises:
        APIError: when the deploymentBase URL has an unexpected form, the
            deployment has no chunks or no artifacts, or the installation
            raises GLib.Error. In the last three cases negative feedback
            is sent to HawkBit first.
    """
    if self.action_id is not None:
        self.logger.info('Deployment is already in progress')
        return

    async def report_failure(failed_action_id, msg):
        # send negative feedback to HawkBit
        await self.ddi.deploymentBase[failed_action_id].feedback(
            DeploymentStatusExecution.closed,
            DeploymentStatusResult.failure, [msg])

    # retrieve action id and resource parameter from URL
    deployment = base['_links']['deploymentBase']['href']
    # Raw string: '\?' is an invalid escape in a normal string literal.
    match = re.search(r'/deploymentBase/(.+)\?c=(.+)$', deployment)
    if match is None:
        # Without this guard a malformed link crashed with AttributeError.
        raise APIError('Unexpected deploymentBase URL: {}'.format(deployment))
    action_id, resource = match.groups()
    self.logger.info('Deployment found for this target')

    # fetch deployment information
    deploy_info = await self.ddi.deploymentBase[action_id](resource)
    try:
        chunk = deploy_info['deployment']['chunks'][0]
    except IndexError:
        msg = 'Deployment without chunks found. Ignoring'
        await report_failure(action_id, msg)
        raise APIError(msg)

    try:
        artifact = chunk['artifacts'][0]
    except IndexError:
        msg = 'Deployment without artifacts found. Ignoring'
        await report_failure(action_id, msg)
        raise APIError(msg)

    # download artifact, check md5 and report feedback
    download_url = artifact['_links']['download-http']['href']
    md5_hash = artifact['hashes']['md5']
    self.logger.info('Starting bundle download')
    await self.download_artifact(action_id, download_url, md5_hash)

    # download successful, start install
    self.logger.info('Starting installation')
    try:
        self.action_id = action_id
        # do not interrupt install call
        await asyncio.shield(self.install())
    except GLib.Error as e:
        await report_failure(action_id, str(e))
        raise APIError(str(e)) from e
def uri_scheme_file(self, request):
if log.isEnabledFor(logging.INFO):
self.time_start = datetime.datetime.now()
uri = request.get_uri()
log.debug("----------")
log.debug("loading")
log.debug("URI " + uri)
log.debug("state " + str(self.load_state))
# handle non ascii
uri = urllib.request.unquote(uri)
# handle spaces in links
uri = uri.replace(rechar, " ")
uri, ext = uri2path(uri, os.path.dirname(self.current_file), startdir)
log.debug("URI " + uri)
if self.load_state == 0:
if uri.endswith(".rst") and not ext:
if self.tvbuffer.get_modified():
log.debug("cancel due to modified")
err = GLib.Error("load cancelled: open file modified")
request.finish_error(err)
self.saved_request = request.get_uri()
self.info.set_reveal_child(True)
self.info_box_button_ok.grab_focus()
return
self.load_rst(uri, request)
self.state_file.set_label("")
self.state.set_label("")
return
self.open_uri(uri)
err = GLib.Error("load cancelled: file opened externally")
request.finish_error(err)
return
if self.load_state == 1:
(typ, enc) = mimetypes.guess_type(uri)
log.debug(typ)
if typ and typ.startswith("image"):
self.load_img(uri, request)
return