def main():
    """Entry point: set up logging, refuse to run as root, start the GTK app."""
    log_format = '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    log = logging.getLogger(__name__)

    running_as_root = geteuid() == 0
    if running_as_root:
        log.error("Running eduVPN client as root is not supported (yet)")
        exit(1)

    GObject.threads_init()

    if have_dbus():
        # Make the GLib main loop the default one for D-Bus.
        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)

    # Imported late so that logging is configured before the UI module loads.
    from eduvpn.ui import EduVpnApp

    application = EduVpnApp()
    application.run()
    Gtk.main()
# Example source code using Python's threads_init()
def __init__(self, device_id=None):
    """Create the application object.

    Steps performed:

    1. Initialise GObject/D-Bus threading and create the GLib main loop.
    2. Register this object on the system D-Bus under ``ukBaz.bluezero``.
    3. Start with an empty service list and create the adapter.

    :param device_id: forwarded unchanged to ``adapter.Adapter``.
    """
    # Main loop set-up
    GObject.threads_init()
    glib_integration = dbus.mainloop.glib
    glib_integration.threads_init()
    glib_integration.DBusGMainLoop(set_as_default=True)
    self.mainloop = GObject.MainLoop()

    # D-Bus registration; the object path is made unique with id(self)
    self.bus = dbus.SystemBus()
    self.path = '/ukBaz/bluezero/application{}'.format(id(self))
    self.bus_name = dbus.service.BusName('ukBaz.bluezero', self.bus)
    dbus.service.Object.__init__(self, self.bus_name, self.path)

    # Services offered by this application
    self.services = []
    self.dongle = adapter.Adapter(device_id)
def run(self):
    """Publish the service on the session bus and run the GLib loop.

    The loop is re-entered after every ``KeyboardInterrupt`` for as long as
    ``self.running`` is truthy.
    """
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    session_bus = dbus.SessionBus()
    well_known_name = dbus.service.BusName("com.spoppy", session_bus)
    super(SpoppyDBusService, self).__init__(well_known_name, "/com/spoppy")

    self._loop = GObject.MainLoop()
    while self.running:
        try:
            logger.debug('Starting dbus loop')
            self._loop.run()
        except KeyboardInterrupt:
            logger.debug('Loop interrupted, will restart')
def __init__(self, config=None):
    """Copy settings from ``config``, install cleanup hooks, init GStreamer.

    :param config: mapping indexed with the keys ``url``, ``port``,
        ``python``, ``receive``, ``name``, ``key``, ``record``,
        ``receive_from_ip`` and ``receive_from_port``.
    """
    # (attribute name, config key) pairs, copied in this order
    basic_settings = (
        ("config_server_url", "url"),
        ("port", "port"),
        ("python", "python"),
        ("receive", "receive"),
        ("name", "name"),
        ("key", "key"),
        ("record", "record"),
    )
    for attribute, config_key in basic_settings:
        setattr(self, attribute, config[config_key])

    self.upnp_client = miniupnpc.UPnP()
    self.connection_attempts = 0

    for attribute in ("receive_from_ip", "receive_from_port"):
        setattr(self, attribute, config[attribute])

    # self.cleanup is registered both for interpreter exit and for SIGINT
    atexit.register(self.cleanup)
    signal.signal(signal.SIGINT, self.cleanup)

    GObject.threads_init()
    Gst.init(None)
def __init__(self):
    """Load the user settings, build the GTK UI and create the MIDI dialog."""
    GObject.threads_init()

    # Settings live in ~/.config/gstation-edit/settings.cfg
    self.config = SafeConfigParser(allow_no_value=True)
    settings_dir = os.path.expanduser('~/.config/gstation-edit')
    if not os.path.isdir(settings_dir):
        os.makedirs(settings_dir)
    self.config_path = os.path.join(settings_dir, 'settings.cfg')
    self.config.read(self.config_path)

    # UI description, resolved relative to the current working directory
    ui_definition = os.path.join('gstation_edit/resources',
                                 'gstation-edit-one-window.ui')
    self.gtk_builder = Gtk.Builder()
    self.gtk_builder.add_from_file(ui_definition)

    program_name = sys.argv[0]
    self.js_sniffer = JStationSniffer(program_name)
    self.midi_dlg = MidiSelectDlg(
        self.gtk_builder,
        self.config,
        self,
        self.js_sniffer,
        self.on_connected,
        self.quit,
    )
    # Destroying the dialog invokes self.quit
    self.midi_dlg.gtk_dlg.connect('destroy', self.quit)
def __init__(self):
    """Load the user settings, build the GTK UI and create the main window."""
    GObject.threads_init()

    # Settings live in ~/.config/gstation-edit/settings.cfg
    self.config = SafeConfigParser(allow_no_value=True)
    user_config_dir = os.path.expanduser('~/.config/gstation-edit')
    if not os.path.isdir(user_config_dir):
        os.makedirs(user_config_dir)
    self.config_path = os.path.join(user_config_dir, 'settings.cfg')
    self.config.read(self.config_path)

    # UI description shipped under DATA_ROOT_DIR
    ui_definition = os.path.join(DATA_ROOT_DIR, 'gstation-edit-one-window.ui')
    self.gtk_builder = Gtk.Builder()
    self.gtk_builder.add_from_file(ui_definition)

    program_name = sys.argv[0]
    self.main_window = MainWindow(program_name, self.config, self.gtk_builder)
    # Destroying the window invokes self.quit
    self.main_window.gtk_window.connect('destroy', self.quit)
def main():
    """Resolve the core host, fetch its configuration and run the pipeline."""
    GObject.threads_init()
    Gst.init([])

    cli_args = get_args()
    core_address = socket.gethostbyname(cli_args.host)

    # get_server_conf returns the server caps and a replacement args object
    caps, effective_args = get_server_conf(
        core_address, cli_args.source_id, cli_args)

    gst_pipeline = mk_pipeline(effective_args, caps, core_address)
    net_clock = get_clock(core_address)
    run_pipeline(
        gst_pipeline,
        net_clock,
        effective_args.audio_delay,
        effective_args.video_delay,
    )
def main():
    """Start the indicator unless another instance already owns the bus name."""
    session_bus = dbus.SessionBus()
    reply = session_bus.request_name('es.atareao.PomodoroIndicator')
    is_primary_owner = reply == dbus.bus.REQUEST_NAME_REPLY_PRIMARY_OWNER
    if not is_primary_owner:
        print("application already running")
        exit(0)

    GObject.threads_init()
    Gst.init(None)
    Gst.init_check(None)
    Notify.init('pomodoro-indicator')

    Pomodoro_Indicator()
    Gtk.main()
def run():
    """Instantiate ``Game``, run the GTK main loop, and return 0 afterwards."""
    Game()
    GObject.threads_init()
    Gtk.main()
    exit_status = 0
    return exit_status
def __init__(self):
    """Create the GLib main loop and the ``Player`` owned by this object."""
    GObject.threads_init()
    main_loop = GObject.MainLoop()
    self._loop = main_loop
    self._player = Player()
def main():
    """Load the configuration, start the update thread and run GTK."""
    config.load()
    GObject.threads_init()

    window = MainWindow()
    updater = UpdateThread(window)
    updater.start()

    # Restore the default SIGINT action before entering the GTK main loop
    default_action = signal.SIG_DFL
    signal.signal(signal.SIGINT, default_action)
    Gtk.main()
def __init__(self, uuid):
    """Create the applet and embed the overview window's button in it.

    :param uuid: accepted but not used by this initialiser.
    """
    Budgie.Applet.__init__(self)
    self.app = WsOverviewWin()
    GObject.threads_init()

    panel_button = self.app.appbutton
    self.button = panel_button
    self.add(self.button)
    self.show_all()
def run(self):
    """Run the GTK main loop, then set ``gtkthread.quit`` once it returns."""
    for threads_init in (GObject.threads_init, Gdk.threads_init):
        threads_init()
    Gtk.main()
    # Reached only after Gtk.main() has returned
    gtkthread.quit.set()
def do_activate(self):
    """Show the single main window, creating it first if needed, and run GTK."""
    app_name = 'easy-ebook-viewer'

    GObject.threads_init()
    gettext.install(app_name, '/usr/share/easy-ebook-viewer/locale')

    # Only one window is allowed; an existing one is simply shown again.
    if not self.window:
        # Windows are associated with the application; when the last one
        # is closed the application shuts down.
        self.window = MainWindow(file_path=self.file_path)
        self.window.connect("delete-event", self.on_quit)
        self.window.set_wmclass(app_name, app_name)

    window = self.window
    window.show_all()
    if not window.book_loaded:
        window.header_bar_component.hide_jumping_navigation()
    Gtk.main()
def run(self):
    """Start the file-system monitor and background tests, then run GTK."""
    GObject.threads_init()

    monitor = self.fsmonitor
    monitor.start()
    self.run_tests_in_background()

    gtk.main()
def initialize(self):
    """Set up D-Bus communication with bluez.

    Call this before making any other call on this object.
    """
    # Threading support first, then the default main loop; both need to be
    # in place before any other D-Bus call is issued.
    GObject.threads_init()
    glib_integration = dbus.mainloop.glib
    glib_integration.threads_init()
    self._mainloop = glib_integration.DBusGMainLoop(set_as_default=True)

    # System bus plus the ObjectManager interface of the root bluez object
    self._bus = dbus.SystemBus()
    bluez_root = self._bus.get_object('org.bluez', '/')
    self._bluez = dbus.Interface(bluez_root,
                                 'org.freedesktop.DBus.ObjectManager')
def initialize(self):
    """Prepare bluez D-Bus communication.

    This has to run before any other call is made on this object.
    """
    # GLib threading and the default main loop come first: every D-Bus
    # object created afterwards inherits that loop.
    GObject.threads_init()
    dbus.mainloop.glib.threads_init()
    default_loop = dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    self._mainloop = default_loop

    # Connect to the system bus and wrap the bluez root object
    system_bus = dbus.SystemBus()
    self._bus = system_bus
    self._bluez = dbus.Interface(
        system_bus.get_object('org.bluez', '/'),
        'org.freedesktop.DBus.ObjectManager',
    )
def run(self):
    """Install the menu, schedule periodic price updates and run GTK.

    Any ``Exception`` raised during start-up or from the main loop is
    logged with its traceback and then suppressed, so this method never
    propagates one to its caller.
    """
    import logging

    try:
        time.sleep(10)  # Give WiFi a chance to connect
        signal.signal(signal.SIGINT, signal.SIG_DFL)
        self.indicator.set_menu(self.__menu())
        # First refresh interval, converted from minutes to milliseconds
        refresh_ms = REFRESH_RATES[0] * 60 * 1000
        self.refresh_source = GObject.timeout_add(refresh_ms,
                                                  self.__update_price)
        self.__update_price()
        GObject.threads_init()
        gtk.main()
    except Exception:
        # Still best-effort (nothing is re-raised), but no longer silent:
        # record the failure so it can be diagnosed.
        logging.getLogger(__name__).exception("Indicator main loop failed")
def start_glib_loop(cls):
    """Run a GLib main loop in a separate thread.

    Intended for programs that do not already run the GLib loop: call this
    first. It initialises threads and GStreamer, then stores the loop as
    ``cls.glib_loop`` and the started thread as ``cls.glib_thread``.
    """
    GObject.threads_init()
    Gst.init(None)

    loop = GObject.MainLoop()
    cls.glib_loop = loop

    worker = threading.Thread(target=loop.run)
    cls.glib_thread = worker
    worker.start()
def __init__(self, obj, obj_hook):
    """Initialise GStreamer and create an empty pipeline in the NULL state.

    :param obj: stored as ``self.obj``.
    :param obj_hook: stored as ``self.obj_hook``.
    """
    GObject.threads_init()
    Gst.init(None)

    self.obj, self.obj_hook = obj, obj_hook
    self.state = Gst.State.NULL
    self.pipeline = Gst.Pipeline()
def __init__(self, **kwargs):
    """Initialise the Gtk application and bind the ``cozy`` translations.

    :param kwargs: accepted but not used by this initialiser.
    """
    self.ui = None
    GObject.threads_init()
    listen()

    Gtk.Application.__init__(self, application_id='com.github.geigi.cozy')
    GLib.setenv("PULSE_PROP_media.role", "music", True)

    # Translation set-up for both the locale and gettext modules
    import gettext
    text_domain = 'cozy'
    locale.bindtextdomain(text_domain, localedir)
    locale.textdomain(text_domain)
    gettext.install(text_domain, localedir)
def main():
    """Initialise translations and run ``EmuApplication`` with ``sys.argv``."""
    init_i18n()

    # Not needed since PyGObject 3.10.2; kept for very old installations.
    GObject.threads_init()

    application = EmuApplication()
    argv = sys.argv
    application.run(argv)
def __init__(self, uuid):
    """Build the clock applet: panel icon, popover grid and update thread.

    :param uuid: stored as ``self.uuid``.
    """
    Budgie.Applet.__init__(self)
    self.uuid = uuid
    self.connect("destroy", Gtk.main_quit)
    self.settings = cw.settings
    icon = Gtk.Image.new_from_icon_name(
        "budgie-clockworks-panel", Gtk.IconSize.MENU
    )
    self.provider = Gtk.CssProvider.new()
    self.provider.load_from_data(css_data.encode())

    # Main grid that is placed inside the popover
    self.maingrid = Gtk.Grid()
    self.maingrid.set_row_spacing(2)
    self.maingrid.attach(Gtk.Label(" " * 10), 0, 0, 1, 1)
    self.maingrid.set_column_spacing(5)
    self.clocklist = {}

    # Recreate the stored clocks, or create one default clock if none exist
    currcl_data = self.read_datafile()
    if currcl_data:
        for cl in currcl_data:
            self.create_newclock(offset=cl[1], clockname=cl[2])
    else:
        self.create_newclock(offset=0)

    # "Add" and "search" buttons
    self.dashbuttonbox = Gtk.Box()
    self.add_button = Gtk.Button()
    self.add_button.set_relief(Gtk.ReliefStyle.NONE)
    self.add_icon = Gtk.Image.new_from_icon_name(
        "list-add-symbolic", Gtk.IconSize.MENU
    )
    self.add_button.set_image(self.add_icon)
    self.search_button = Gtk.Button()
    self.search_button.set_relief(Gtk.ReliefStyle.NONE)
    self.search_icon = Gtk.Image.new_from_icon_name(
        "system-search-symbolic", Gtk.IconSize.MENU
    )
    self.search_button.set_image(self.search_icon)
    self.add_button.connect("clicked", self.create_newclock)
    self.search_button.connect("clicked", self.run_search)
    self.maingrid.attach(self.add_button, 100, 3, 1, 1)
    self.maingrid.attach(self.search_button, 100, 4, 1, 1)

    # Panel icon in an event box, with the grid shown in a popover
    self.box = Gtk.EventBox()
    self.box.add(icon)
    self.add(self.box)
    self.popover = Budgie.Popover.new(self.box)
    self.popover.add(self.maingrid)
    self.popover.get_child().show_all()
    self.box.show_all()
    self.show_all()
    self.box.connect("button-press-event", self.on_press)
    self.settings.connect("changed", self.update_clockcolor)

    # Background updater. It is a daemon thread so that it does not keep
    # the process alive; ``daemon=True`` replaces the deprecated
    # ``Thread.setDaemon(True)`` call.
    GObject.threads_init()
    self.update = Thread(target=self.update_gmt, daemon=True)
    self.update.start()
def __init__(self, id, name, address, update_rate, number_of_pings,
             show_indicator, show_text, is_activated=None):
    """Create a ping object and its menu widgets.

    :param id: stored as ``self.id``.
    :param name: display name; ``address`` is used when it is empty.
    :param address: target address, also shown in the menu item label.
    :param update_rate: stored as ``self.update_rate``.
    :param number_of_pings: stored as ``self.number_of_pings``.
    :param show_indicator: stored as ``self.show_indicator``.
    :param show_text: stored as ``self.show_text``.
    :param is_activated: stored as given; ``None`` is treated as ``True``.
    """
    # GObject set-up
    GObject.GObject.__init__(self)
    GObject.type_register(PingObject)
    GObject.threads_init()

    # Ping object properties
    self.id = id
    self.name = name or address
    self.address = address
    self.update_rate = update_rate
    self.number_of_pings = number_of_pings
    self.show_indicator = show_indicator
    self.show_text = show_text
    self.is_activated = True if is_activated is None else is_activated
    self.icon = "icon_red" if self.is_activated else "icon_grey"
    self.ping_warning = 50.0

    # Indicator menu item
    menu_label = "Ping: " + address
    self.menu_item = gtk.ImageMenuItem(menu_label)

    # Last result, starting out as "no response"
    self.result = PingStruct(RESULT_NO_RESPONSE, 0.0, 0.0, 0.0, 0.0)

    # Image shown next to the menu item
    self.image = gtk.Image()
    self.image.set_from_file(resource.image_path("icon_gray", theme.THEME))

    # Human-readable state
    self.state = "Ping: " + self.address + " initializing."

    # Subprocess handle, lock and worker-thread bookkeeping
    self.process = None
    self.mutex = threading.Lock()
    self.thread = None
    self.is_running = False

    # Event object created for the interruptible sleep
    self.stop_event = threading.Event()

    # Timestamp taken at construction
    self.time_last = time.time()
# signal definition returns (id, name, icon name, show_indicator)
def __init__(self):
    """Initialise the indicator: ping objects, menu and app indicator."""
    # GObject set-up
    GObject.GObject.__init__(self)
    GObject.type_register(AnyPingIndicator)
    GObject.threads_init()

    self.mutex = threading.Lock()

    # Counter used to produce distinct icon names, and the icon tuples
    self.icon_count = 0
    self.list_of_icon_tuple = []

    # One PingObject per configured tuple; its id is its list position
    self.ping_objects_tuple = config.ping_object_tuples
    self.ping_objects = []
    for index, entry in enumerate(self.ping_objects_tuple):
        ping_object = PingObject(
            index,
            entry.name,
            entry.address,
            entry.update_rate,
            entry.number_of_pings,
            entry.show_indicator,
            entry.show_text,
            entry.is_activated,
        )
        self.ping_objects.append(ping_object)
        ping_object.set_ping_warning(config.ping_warning)

    self.update_list_of_icon_tuples()

    # Windows are not created here
    self.preferences_window = None
    self.about_dialog = None

    self.check_autostart()

    notify.init(APPINDICATOR_ID)

    # Indicator menu
    self.menu = gtk.Menu()
    self.build_menu()

    # The indicator itself, initially showing the red icon
    initial_icon = resource.image_path("icon_red", theme.THEME)
    self.indicator = appindicator.Indicator.new(
        APPINDICATOR_ID,
        initial_icon,
        appindicator.IndicatorCategory.SYSTEM_SERVICES,
    )
    self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
    self.indicator.set_menu(self.menu)

    self.update_indicator_icon()
    self.start_ping_objects()
def gst_rtp_pipeline(test_params):
    """Run a GStreamer RTP/H.264 receive pipeline around a single ``yield``.

    Builds ``udpsrc -> rtph264depay -> h264parse -> avdec_h264 -> appsink``,
    sets it to PLAYING, starts a ``GstMainLoopThread`` and yields that
    thread. When the generator is resumed, closed, or has an exception
    thrown into it, the loop is quit and the pipeline is set to NULL.

    :param test_params: object whose ``rtp_port`` attribute is the UDP port
        to listen on.
    """
    print("starting gstreamer rtp pipeline..")
    GObject.threads_init()
    Gst.init(None)

    # Pipeline that receives RTP video and decodes it; the appsink's
    # "new-sample" signal is routed to loop_thread.gst_new_buffer below.
    udpsrc = Gst.ElementFactory.make("udpsrc", None)
    udpsrc.set_property('port', test_params.rtp_port)
    udpsrc.set_property('caps', Gst.caps_from_string('application/x-rtp, encoding-name=H264, payload=96'))
    rtph264depay = Gst.ElementFactory.make("rtph264depay", None)
    h264parse = Gst.ElementFactory.make("h264parse", None)
    avdec_h264 = Gst.ElementFactory.make("avdec_h264", None)
    sink = Gst.ElementFactory.make("appsink", None)

    # Elements in stream order: add all of them, then link each to the next
    elements = (udpsrc, rtph264depay, h264parse, avdec_h264, sink)
    pipeline = Gst.Pipeline.new("rtp-pipeline")
    for element in elements:
        pipeline.add(element)
    for upstream, downstream in zip(elements, elements[1:]):
        upstream.link(downstream)

    # gst event loop/bus
    loop_thread = GstMainLoopThread()
    sink.set_property("emit-signals", True)
    sink.connect("new-sample", loop_thread.gst_new_buffer, sink)
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", loop_thread.gst_bus_call, loop_thread.loop)

    # Start playback and listen for events
    pipeline.set_state(Gst.State.PLAYING)
    loop_thread.start()
    try:
        yield loop_thread
    finally:
        # Runs however the generator is left, so the loop and the pipeline
        # are not leaked when the consumer fails.
        print("stopping gstreamer rtp pipeline..")
        loop_thread.loop.quit()
        pipeline.set_state(Gst.State.NULL)