def make_default_signal_map():
    """Build the default signal map for the running system.

    Only signal names that the ``signal`` module defines on this platform
    end up in the result, so the map differs between systems.

    :return: A mapping from signal number to its target: ``None`` for
        SIGTSTP, SIGTTIN and SIGTTOU, and ``'terminate'`` for SIGTERM.
    """
    default_targets = (
        ('SIGTSTP', None),
        ('SIGTTIN', None),
        ('SIGTTOU', None),
        ('SIGTERM', 'terminate'),
    )
    signal_map = {}
    for signal_name, target in default_targets:
        # Skip names this platform does not provide.
        if not hasattr(signal, signal_name):
            continue
        signal_map[getattr(signal, signal_name)] = target
    return signal_map
# Example source code using Python's signal.SIGTSTP
def suspend_to_background(self, suspend_group=True):
    """
    Suspend the process by sending it ``SIGTSTP``.

    (Not thread safe -- to be called from inside the key bindings.)

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # Nothing to do when the operating system has no SIGTSTP
    # (e.g. Windows).
    if not hasattr(signal, 'SIGTSTP'):
        return

    def send_stop_signal():
        # PID 0 targets our whole process group, which handles the case
        # where input is piped from another process; otherwise only this
        # process is stopped.
        target_pid = 0 if suspend_group else os.getpid()
        os.kill(target_pid, signal.SIGTSTP)

    self.run_in_terminal(send_stop_signal)
def suspend_to_background(self, suspend_group=True):
    """
    Stop (suspend) this process via ``SIGTSTP``.

    (Not thread safe -- to be called from inside the key bindings.)

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # Suspending needs SIGTSTP, which not every operating system has
    # (Windows does not).
    if not hasattr(signal, 'SIGTSTP'):
        return

    def stop_self():
        # Usually the whole process group should be suspended (PID 0);
        # this handles the case when input is piped from another process.
        pid = 0 if suspend_group else os.getpid()
        os.kill(pid, signal.SIGTSTP)

    self.run_in_terminal(stop_self)
def suspend_to_background(self, suspend_group=True):
    """
    Suspend the process by sending it ``SIGTSTP``.

    (Not thread safe -- to be called from inside the key bindings.)

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # Without SIGTSTP (e.g. on Windows) suspending is not supported.
    if not hasattr(signal, 'SIGTSTP'):
        return

    def send_stop_signal():
        # PID 0 addresses the whole process group, which covers input
        # being piped from another process; otherwise stop only ourselves.
        target_pid = 0 if suspend_group else os.getpid()
        os.kill(target_pid, signal.SIGTSTP)

    # This variant hands the callback to the module-level
    # `run_in_terminal`, not to a method on `self`.
    run_in_terminal(send_stop_signal)
def __enter__(self):
    """Enter the context, trapping SIGQUIT, SIGTERM and SIGTSTP once.

    On first entry each of those signals gets a handler that calls
    ``self.__exit__(None, None, None)`` (so GPIO is cleaned up afterward)
    and then forwards to the handler that was installed before, when that
    handler is callable.  Later entries leave the handlers untouched.

    :return: ``self``.
    """
    if self.__signals_trapped:
        return self
    self.__signals_trapped = True

    def make_delegate(previous):
        # A factory gives every signal its *own* `previous`.  Defining the
        # handler directly in the loop would make all handlers share one
        # closure variable and forward to the last handler seen.
        def delegate(signum, stack):
            self.__exit__(None, None, None)
            if previous is not None:
                previous(signum, stack)
        return delegate

    for sig in [signal.SIGQUIT, signal.SIGTERM, signal.SIGTSTP]:
        previous = signal.getsignal(sig)
        # SIG_DFL, SIG_IGN and None are not callable: nothing to forward to.
        if not callable(previous):
            previous = None
        signal.signal(sig, make_delegate(previous))
    return self
def suspend_to_background(self, suspend_group=True):
    """
    Send ``SIGTSTP`` to suspend the process.

    (Not thread safe -- to be called from inside the key bindings.)

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # Bail out when the operating system does not support SIGTSTP
    # (this is the case on Windows).
    if not hasattr(signal, 'SIGTSTP'):
        return

    def suspend():
        # Stopping the whole process group (PID 0) is usually wanted: it
        # handles the case when input is piped from another process.
        who = 0 if suspend_group else os.getpid()
        os.kill(who, signal.SIGTSTP)

    self.run_in_terminal(suspend)
def suspend_to_background(self, suspend_group=True):
    """
    Suspend the process by sending it ``SIGTSTP``.

    (Not thread safe -- to be called from inside the key bindings.)

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # Only operating systems that define SIGTSTP can suspend
    # (Windows cannot).
    if not hasattr(signal, 'SIGTSTP'):
        return

    def deliver_sigtstp():
        # With PID 0 the signal reaches the entire process group, which
        # handles input piped from another process; otherwise it goes to
        # this process alone.
        recipient = 0 if suspend_group else os.getpid()
        os.kill(recipient, signal.SIGTSTP)

    self.run_in_terminal(deliver_sigtstp)
def suspend_to_background(self, suspend_group=True):
    """
    Suspend the running process with ``SIGTSTP``.

    (Not thread safe -- to be called from inside the key bindings.)

    :param suspend_group: When true, suspend the whole process group.
        (This is the default, and probably what you want.)
    """
    # No SIGTSTP on this operating system (e.g. Windows): do nothing.
    if not hasattr(signal, 'SIGTSTP'):
        return

    def raise_sigtstp():
        # The process group (PID 0) is the usual target; that handles the
        # case when input is piped from another process.
        destination = 0 if suspend_group else os.getpid()
        os.kill(destination, signal.SIGTSTP)

    self.run_in_terminal(raise_sigtstp)
def handleInput(self, char):
    """Pass one typed character through, handling escape sequences.

    ``self.escapeMode`` tracks the state: 0 is mid-line, 1 means a newline
    was just seen (an escape may start here) and 2 means the escape
    character ``options['escape']`` was just typed.  In state 2 the next
    character selects a command:

    * ``.``  -- disconnect
    * ``^Z`` -- suspend this process
    * ``R``  -- rekey the connection
    * ``#``  -- list the open channels

    Any other character in state 2 is written out prefixed with ``~``.

    :param char: The single character that was typed.
    """
    if char in ('\n', '\r'):
        self.escapeMode = 1
        self.write(char)
    elif self.escapeMode == 1 and char == options['escape']:
        self.escapeMode = 2
    elif self.escapeMode == 2:
        self.escapeMode = 1  # so we can chain escapes together
        if char == '.':  # disconnect
            log.msg('disconnecting from escape')
            stopConnection()
            return
        elif char == '\x1a':  # ^Z, suspend
            def suspend():
                # Leave raw mode while stopped and re-enter it once the
                # process has been resumed.
                _leaveRawMode()
                sys.stdout.flush()
                sys.stdin.flush()
                os.kill(os.getpid(), signal.SIGTSTP)
                _enterRawMode()
            reactor.callLater(0, suspend)
            return
        elif char == 'R':  # rekey connection
            log.msg('rekeying connection')
            self.conn.transport.sendKexInit()
            return
        elif char == '#':  # display connections
            self.stdio.write('\r\nThe following connections are open:\r\n')
            # sorted() rather than keys().sort(): on Python 3 keys() is a
            # view without a sort() method.
            for channelId in sorted(self.conn.channels.keys()):
                self.stdio.write(' #%i %s\r\n' % (channelId, str(self.conn.channels[channelId])))
            return
        self.write('~' + char)
    else:
        self.escapeMode = 0
        self.write(char)
def _setup_signal_handlers(self):
    """Setup the signal handlers for daemon mode.

    Every signal listed below is passed to ``self.signals.ignore``, but
    only when the ``signal`` module defines it on this platform.
    """
    ignored_names = (
        # Terminal I/O signals
        'SIGTTOU',
        'SIGTTIN',
        'SIGTSTP',
        # USR signals
        'SIGUSR1',
        'SIGUSR2',
    )
    registry = self.signals
    for signal_name in ignored_names:
        if hasattr(signal, signal_name):
            registry.ignore(getattr(signal, signal_name))
def shutdown_on_signals(self, signals=None):
    """Install ``self._handle_signal_shutdown`` as handler for signals.

    :param signals: Iterable of signal numbers to trap.  When ``None``,
        SIGHUP, SIGINT, SIGQUIT, SIGTERM and SIGTSTP are trapped.
    """
    trapped = signals if signals is not None else [
        signal.SIGHUP,
        signal.SIGINT,
        signal.SIGQUIT,
        signal.SIGTERM,
        signal.SIGTSTP,
    ]
    for signum in trapped:
        signal.signal(signum, self._handle_signal_shutdown)
def handleInput(self, char):
    """Pass one typed character through, handling escape sequences.

    ``self.escapeMode`` tracks the state: 0 is mid-line, 1 means a newline
    was just seen (an escape may start here) and 2 means the escape
    character ``options['escape']`` was just typed.  In state 2 the next
    character selects a command:

    * ``.``  -- disconnect
    * ``^Z`` -- suspend this process
    * ``R``  -- rekey the connection
    * ``#``  -- list the open channels

    Any other character in state 2 is written out prefixed with ``~``.

    :param char: The single character that was typed.
    """
    if char in ('\n', '\r'):
        self.escapeMode = 1
        self.write(char)
    elif self.escapeMode == 1 and char == options['escape']:
        self.escapeMode = 2
    elif self.escapeMode == 2:
        self.escapeMode = 1  # so we can chain escapes together
        if char == '.':  # disconnect
            log.msg('disconnecting from escape')
            stopConnection()
            return
        elif char == '\x1a':  # ^Z, suspend
            def suspend():
                # Leave raw mode while stopped and re-enter it once the
                # process has been resumed.
                _leaveRawMode()
                sys.stdout.flush()
                sys.stdin.flush()
                os.kill(os.getpid(), signal.SIGTSTP)
                _enterRawMode()
            reactor.callLater(0, suspend)
            return
        elif char == 'R':  # rekey connection
            log.msg('rekeying connection')
            self.conn.transport.sendKexInit()
            return
        elif char == '#':  # display connections
            self.stdio.write('\r\nThe following connections are open:\r\n')
            # sorted() rather than keys().sort(): on Python 3 keys() is a
            # view without a sort() method.
            for channelId in sorted(self.conn.channels.keys()):
                self.stdio.write(' #%i %s\r\n' % (channelId, str(self.conn.channels[channelId])))
            return
        self.write('~' + char)
    else:
        self.escapeMode = 0
        self.write(char)
def _subproc_pre():
    """Move the calling process into its own group and pause on SIGTSTP.

    Calls ``os.setpgrp()`` and then installs a SIGTSTP handler that blocks
    in ``signal.pause()`` until another signal is received.
    """
    def pause_on_tstp(signum, frame):
        return signal.pause()

    os.setpgrp()
    signal.signal(signal.SIGTSTP, pause_on_tstp)
def ignore_sigtstp():
    """Make this process ignore SIGTSTP."""
    stop_signal = signal.SIGTSTP
    signal.signal(stop_signal, signal.SIG_IGN)
def handleInput(self, char):
    """Pass one typed character through, handling escape sequences.

    ``self.escapeMode`` tracks the state: 0 is mid-line, 1 means a newline
    was just seen (an escape may start here) and 2 means the escape
    character ``options['escape']`` was just typed.  In state 2 the next
    character selects a command:

    * ``.``  -- disconnect
    * ``^Z`` -- suspend this process
    * ``R``  -- rekey the connection
    * ``#``  -- list the open channels

    Any other character in state 2 is written out prefixed with ``~``.

    :param char: The single character that was typed.
    """
    if char in ('\n', '\r'):
        self.escapeMode = 1
        self.write(char)
    elif self.escapeMode == 1 and char == options['escape']:
        self.escapeMode = 2
    elif self.escapeMode == 2:
        self.escapeMode = 1  # so we can chain escapes together
        if char == '.':  # disconnect
            log.msg('disconnecting from escape')
            stopConnection()
            return
        elif char == '\x1a':  # ^Z, suspend
            def suspend():
                # Leave raw mode while stopped and re-enter it once the
                # process has been resumed.
                _leaveRawMode()
                sys.stdout.flush()
                sys.stdin.flush()
                os.kill(os.getpid(), signal.SIGTSTP)
                _enterRawMode()
            reactor.callLater(0, suspend)
            return
        elif char == 'R':  # rekey connection
            log.msg('rekeying connection')
            self.conn.transport.sendKexInit()
            return
        elif char == '#':  # display connections
            self.stdio.write('\r\nThe following connections are open:\r\n')
            # sorted() rather than keys().sort(): on Python 3 keys() is a
            # view without a sort() method.
            for channelId in sorted(self.conn.channels.keys()):
                self.stdio.write(' #%i %s\r\n' % (channelId, str(self.conn.channels[channelId])))
            return
        self.write('~' + char)
    else:
        self.escapeMode = 0
        self.write(char)
def getch():
    """Read a single character from stdin with the terminal in raw mode.

    Raw mode turns off the terminal's own handling of ^C and ^Z, so both
    are re-implemented here: ^C raises ``KeyboardInterrupt`` and ^Z stops
    the process with SIGTSTP.  The terminal settings that were active on
    entry are always restored before returning or raising.

    :return: The character that was read (for ^Z, once the process has
        been resumed).
    :raises KeyboardInterrupt: If ^C was typed.
    """
    fd = sys.stdin.fileno()
    old_settings = termios.tcgetattr(fd)
    try:
        tty.setraw(fd)
        ch = sys.stdin.read(1)
        # restore behavior for special things
        if ch == '\x03':  # ^C
            raise KeyboardInterrupt
        elif ch == '\x1a':  # ^Z
            # Leave raw mode *before* stopping, so the terminal is not
            # left in raw mode while this process is suspended.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
            os.kill(os.getpid(), signal.SIGTSTP)
    finally:
        termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    return ch
def ignore_signals():
    """Ignore Ctrl-Z and Ctrl-C, except on Windows where nothing is done."""
    if platform.system() == 'Windows':
        return
    # signal.SIGTSTP is the signal for Ctrl-Z (Unix only).
    # signal.SIGINT is the signal for Ctrl-C.
    # signal.SIG_IGN as the handler means the signal is ignored.
    for signum in (signal.SIGTSTP, signal.SIGINT):
        signal.signal(signum, signal.SIG_IGN)