def __init__(self):
# Applet icon
global indicator
indicator = appindicator.Indicator.new(APPINDICATOR_ID, self.exchange_app.dogecoin.icon, appindicator.IndicatorCategory.SYSTEM_SERVICES)
indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
indicator.set_label('€ *.****', '100%')
# Set the menu of the indicator (default: gtk.Menu())
indicator.set_menu(self.build_menu())
# Ctrl-C behaviour
signal.signal(signal.SIGINT, signal.SIG_DFL)
# Setup the refresh every 5 minutes
gobject.timeout_add(1000*60*5, self.exchange_app.update_price, "timeout")
# First price update within 1 second
gobject.timeout_add(1000, self.exchange_app.first_update_price, "first_update")
# Main loop
gtk.main()
python类SIG_DFL的实例源码
cryptocoin_indicator.py 文件源码
项目:cryptocoin-indicator
作者: MichelMichels
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def init_signals(self):
# reset signaling
[signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
signal.signal(signal.SIGWINCH, self.handle_winch)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGABRT, self.handle_abort)
# Don't let SIGTERM and SIGUSR1 disturb active requests
# by interrupting system calls
if hasattr(signal, 'siginterrupt'): # python >= 2.6
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGUSR1, False)
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(self.PIPE[1])
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def test_signal_handler(self, mock_signal, mock_os):
# pylint: disable=protected-access
mock_signal.getsignal.return_value = signal.SIG_DFL
self.handler.set_signal_handlers()
signal_handler = self.handler._signal_handler
for signum in self.signals:
mock_signal.signal.assert_any_call(signum, signal_handler)
signum = self.signals[0]
signal_handler(signum, None)
self.init_func.assert_called_once_with(*self.init_args,
**self.init_kwargs)
mock_os.kill.assert_called_once_with(mock_os.getpid(), signum)
self.handler.reset_signal_handlers()
for signum in self.signals:
mock_signal.signal.assert_any_call(signum, signal.SIG_DFL)
def init_signals(self):
# reset signaling
[signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
signal.signal(signal.SIGWINCH, self.handle_winch)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGABRT, self.handle_abort)
# Don't let SIGTERM and SIGUSR1 disturb active requests
# by interrupting system calls
if hasattr(signal, 'siginterrupt'): # python >= 2.6
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGUSR1, False)
def __init__(self, default_handler):
self.called = False
self.original_handler = default_handler
if isinstance(default_handler, int):
if default_handler == signal.SIG_DFL:
# Pretend it's signal.default_int_handler instead.
default_handler = signal.default_int_handler
elif default_handler == signal.SIG_IGN:
# Not quite the same thing as SIG_IGN, but the closest we
# can make it: do nothing.
def default_handler(unused_signum, unused_frame):
pass
else:
raise TypeError("expected SIGINT signal handler to be "
"signal.SIG_IGN, signal.SIG_DFL, or a "
"callable object")
self.default_handler = default_handler
def __init__(self, default_handler):
self.called = False
self.original_handler = default_handler
if isinstance(default_handler, int):
if default_handler == signal.SIG_DFL:
# Pretend it's signal.default_int_handler instead.
default_handler = signal.default_int_handler
elif default_handler == signal.SIG_IGN:
# Not quite the same thing as SIG_IGN, but the closest we
# can make it: do nothing.
def default_handler(unused_signum, unused_frame):
pass
else:
raise TypeError("expected SIGINT signal handler to be "
"signal.SIG_IGN, signal.SIG_DFL, or a "
"callable object")
self.default_handler = default_handler
def init_signals(self):
# reset signaling
[signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
signal.signal(signal.SIGWINCH, self.handle_winch)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGABRT, self.handle_abort)
# Don't let SIGTERM and SIGUSR1 disturb active requests
# by interrupting system calls
if hasattr(signal, 'siginterrupt'): # python >= 2.6
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGUSR1, False)
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(self.PIPE[1])
def __init__(self, default_handler):
self.called = False
self.original_handler = default_handler
if isinstance(default_handler, int):
if default_handler == signal.SIG_DFL:
# Pretend it's signal.default_int_handler instead.
default_handler = signal.default_int_handler
elif default_handler == signal.SIG_IGN:
# Not quite the same thing as SIG_IGN, but the closest we
# can make it: do nothing.
def default_handler(unused_signum, unused_frame):
pass
else:
raise TypeError("expected SIGINT signal handler to be "
"signal.SIG_IGN, signal.SIG_DFL, or a "
"callable object")
self.default_handler = default_handler
def closed(self):
global old
log.msg('closed %s' % self)
log.msg(repr(self.conn.channels))
if not options['nocache']: # fork into the background
if os.fork():
if old:
fd = sys.stdin.fileno()
tty.tcsetattr(fd, tty.TCSANOW, old)
if (options['command'] and options['tty']) or \
not options['notty']:
signal.signal(signal.SIGWINCH, signal.SIG_DFL)
os._exit(0)
os.setsid()
for i in range(3):
try:
os.close(i)
except OSError, e:
import errno
if e.errno != errno.EBADF:
raise
def __call__(self, *args, **kwargs):
"""Trap ``SIGTERM`` and call wrapped function."""
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def safe_call(function, *args, **kwargs):
"""
If we're not in the main process, sets the default signal handler
(`signal.SIG_DFL`) for the ``SIGINT`` signal before calling *function*
using the given *args* and *kwargs*. Otherwise *function* will just be
called and returned normally.
The point being to prevent loads of unnecessary tracebacks from being
printed to stdout when the user executes a :kbd:`Ctrl-C` on a running
gateone.py process.
.. note::
This function is only meant to be used to wrap calls made inside of
`MultiprocessRunner` instances.
"""
if os.getpid() != PID:
signal.signal(signal.SIGINT, signal.SIG_DFL)
return function(*args, **kwargs)
def main(self):
self.indicator = appindicator.Indicator.new(self.APPINDICATOR_ID, self.ICON_OFF,
appindicator.IndicatorCategory.SYSTEM_SERVICES)
self.indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
self.indicator.set_menu(self.build_menu())
# This sets the handler for “INT” signal processing
#- the one issued by the OS when “Ctrl+C” is typed.
#The handler we assign to it is the “default” handler, which,
#in case of the interrupt signal, is to stop execution.
signal.signal(signal.SIGINT, signal.SIG_DFL) #listen to quit signal
notify.init(self.APPINDICATOR_ID)
self.update()
glib.timeout_add_seconds(self.UPDATE_FREQUENCY, self.update)
gtk.main()
def __call__(self, *args, **kwargs):
self._caught_signal = None
# Register handler for SIGTERM, then call `self.func`
self.old_signal_handler = signal.getsignal(signal.SIGTERM)
signal.signal(signal.SIGTERM, self.signal_handler)
self.func(*args, **kwargs)
# Restore old signal handler
signal.signal(signal.SIGTERM, self.old_signal_handler)
# Handle any signal caught during execution
if self._caught_signal is not None:
signum, frame = self._caught_signal
if callable(self.old_signal_handler):
self.old_signal_handler(signum, frame)
elif self.old_signal_handler == signal.SIG_DFL:
sys.exit(0)
def process(self):
""" Run the interactive shell after displaying the software version information
@raise SystemExit
"""
self._display_banner()
while 1:
try:
self.cmdloop()
except kbd_exception.CtrlD:
signal.signal(signal.SIGINT, signal.SIG_DFL)
return
except kbd_exception.CtrlC:
# cancel the current line and get a new one
pass
def __init__(self, default_handler):
self.called = False
self.original_handler = default_handler
if isinstance(default_handler, int):
if default_handler == signal.SIG_DFL:
# Pretend it's signal.default_int_handler instead.
default_handler = signal.default_int_handler
elif default_handler == signal.SIG_IGN:
# Not quite the same thing as SIG_IGN, but the closest we
# can make it: do nothing.
def default_handler(unused_signum, unused_frame):
pass
else:
raise TypeError("expected SIGINT signal handler to be "
"signal.SIG_IGN, signal.SIG_DFL, or a "
"callable object")
self.default_handler = default_handler
def do_startup(self):
Gtk.Application.do_startup(self)
signal.signal(signal.SIGINT, signal.SIG_DFL)
action = Gio.SimpleAction.new('about', None)
action.connect('activate', self.on_about)
self.add_action(action)
action = Gio.SimpleAction.new('quit', None)
action.connect('activate', self.on_quit)
self.add_action(action)
self.add_accelerator('<Primary>q', 'app.quit')
app_menu = Gio.Menu.new()
app_menu.append(_('About'), 'app.about')
app_menu.append(_('Quit'), 'app.quit')
self.set_app_menu(app_menu)
def set_blocking_signal_threshold(self, seconds, action):
"""Sends a signal if the ioloop is blocked for more than s seconds.
Pass seconds=None to disable. Requires python 2.6 on a unixy
platform.
The action parameter is a python signal handler. Read the
documentation for the python 'signal' module for more information.
If action is None, the process will be killed if it is blocked for
too long.
"""
if not hasattr(signal, "setitimer"):
logging.error("set_blocking_signal_threshold requires a signal module "
"with the setitimer method")
return
self._blocking_signal_threshold = seconds
if seconds is not None:
signal.signal(signal.SIGALRM,
action if action is not None else signal.SIG_DFL)
def init_signals(self):
# reset signaling
[signal.signal(s, signal.SIG_DFL) for s in self.SIGNALS]
# init new signaling
signal.signal(signal.SIGQUIT, self.handle_quit)
signal.signal(signal.SIGTERM, self.handle_exit)
signal.signal(signal.SIGINT, self.handle_quit)
signal.signal(signal.SIGWINCH, self.handle_winch)
signal.signal(signal.SIGUSR1, self.handle_usr1)
signal.signal(signal.SIGABRT, self.handle_abort)
# Don't let SIGTERM and SIGUSR1 disturb active requests
# by interrupting system calls
if hasattr(signal, 'siginterrupt'): # python >= 2.6
signal.siginterrupt(signal.SIGTERM, False)
signal.siginterrupt(signal.SIGUSR1, False)
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(self.PIPE[1])