def test_run_async_exit_code(self):
def run_with_exit_code_0(process_idx):
sys.exit(0)
def run_with_exit_code_11(process_idx):
os.kill(os.getpid(), signal.SIGSEGV)
with warnings.catch_warnings(record=True) as w:
async.run_async(4, run_with_exit_code_0)
# There should be no warnings
assert len(w) == 0
with warnings.catch_warnings(record=True) as w:
async.run_async(4, run_with_exit_code_11)
# There should be 4 warnings
assert len(w) == 4
# Example source code using Python's SIGSEGV
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
checked = set()
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows.
# Issue #18396, only for signals without a C-level handler.
if signal.getsignal(sig) is not None:
signal.signal(sig, signal.signal(sig, handler))
checked.add(sig)
# Issue #18396: Ensure the above loop at least tested *something*
self.assertTrue(checked)
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def _default_handler(signum, *args):
    ''' The default signal handler: looks up the object mapped to
    signum and passes it to _sketch_raise_in_main. Signal numbers
    without a mapping fall back to DaemonikerSignal. Any extra
    positional arguments are ignored.

    Don't register with built-in signal.signal! This needs to be used
    on the subprocess await death workaround.
    '''
    # All valid cpython windows signals that get translated. SIGFPE,
    # SIGSEGV and SIGILL are deliberately left out: don't catch those.
    # Note that signal.CTRL_C_EVENT and signal.CTRL_BREAK_EVENT are
    # converted to SIGINT in _await_signal
    translations = {
        signal.SIGABRT: SIGABRT,
        signal.SIGINT: SIGINT,
        signal.SIGTERM: SIGTERM,
    }
    _sketch_raise_in_main(translations.get(signum, DaemonikerSignal))
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
checked = set()
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows.
# Issue #18396, only for signals without a C-level handler.
if signal.getsignal(sig) is not None:
signal.signal(sig, signal.signal(sig, handler))
checked.add(sig)
# Issue #18396: Ensure the above loop at least tested *something*
self.assertTrue(checked)
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
checked = set()
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows.
# Issue #18396, only for signals without a C-level handler.
if signal.getsignal(sig) is not None:
signal.signal(sig, signal.signal(sig, handler))
checked.add(sig)
# Issue #18396: Ensure the above loop at least tested *something*
self.assertTrue(checked)
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def _is_sigsegv(self, return_code):
"""
Check return code against SIGSEGV
"""
if return_code == -signal.SIGSEGV:
return True
return False
def _analyze(self):
if self.signum == SIGSEGV:
self.memoryFault()
elif self.signum == SIGFPE:
self.mathError()
elif self.signum == SIGCHLD:
self.childExit()
elif self.signum == SIGABRT:
self.error = Abort()
return self.error
def _run_trace(self, stdout_file=None):
    """
    accumulate a basic block trace using qemu

    Runs the ``bin/fakesingle`` launcher under ``timeout`` with
    ``self.payload`` on stdin, stores the exit status in
    ``self.returncode`` and sets ``self.crash_mode`` when the status
    indicates SIGSEGV or SIGILL.

    :param stdout_file: optional path; when given, the child's stdout is
                        written to that file instead of /dev/null
    """
    # Allow more time when several binaries are run together.
    timeout = 0.25 if len(self.binaries) > 1 else 0.05

    args = ["timeout", "-k", str(timeout), str(timeout)]
    args += [os.path.join(self.base_dir, "bin", "fakesingle")]
    args += ["-s", self.SEED_ALT if self.use_alt_flag else self.SEED]
    args += self.binaries

    with open('/dev/null', 'wb') as devnull:
        stdout_f = devnull if stdout_file is None else open(stdout_file, 'wb')
        try:
            l.debug("tracing as raw input")
            p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=stdout_f, stderr=devnull)
            p.communicate(self.payload)

            ret = p.wait()
            self.returncode = p.returncode
        finally:
            # Close the caller-requested output file even when spawning or
            # talking to the child fails; devnull is closed by the `with`.
            if stdout_f is not devnull:
                stdout_f.close()

    # did a crash occur?
    # A negative status is the number of the signal that killed the child;
    # 139 is 128 + 11, presumably a relayed SIGSEGV -- TODO confirm.
    if ret < 0 or ret == 139:
        if abs(ret) == signal.SIGSEGV or abs(ret) == signal.SIGILL or ret == 139:
            l.info("input caused a crash (signal %d) during dynamic tracing", abs(ret))
            l.debug("entering crash mode")
            self.crash_mode = True
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows
signal.signal(sig, signal.signal(sig, handler))
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def setup():
    """Register ``cleanup`` as the handler for SIGABRT, SIGILL, SIGINT, SIGSEGV and SIGTERM."""
    handled_signals = (
        signal.SIGABRT,
        signal.SIGILL,
        signal.SIGINT,
        signal.SIGSEGV,
        signal.SIGTERM,
    )
    for signum in handled_signals:
        signal.signal(signum, cleanup)
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows
signal.signal(sig, signal.signal(sig, handler))
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows
signal.signal(sig, signal.signal(sig, handler))
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def crashes(self, signals=(signal.SIGSEGV, signal.SIGILL)):
    """
    Return the crashing inputs discovered by AFL.

    Flag page leaks are detected via SIGUSR1 and are therefore not
    reported here as crashes; those can be found with the leaks function.

    :param signals: list of valid kill signal numbers to override the default (SIGSEGV and SIGILL)
    :return: a list of strings which are crashing inputs
    """
    crashing_inputs = self._get_crashing_inputs(signals)
    return crashing_inputs
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows
signal.signal(sig, signal.signal(sig, handler))
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def test_issue9324(self):
# Updated for issue #10003, adding SIGBREAK
handler = lambda x, y: None
for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
signal.SIGTERM):
# Set and then reset a handler for signals that work on windows
signal.signal(sig, signal.signal(sig, handler))
with self.assertRaises(ValueError):
signal.signal(-1, handler)
with self.assertRaises(ValueError):
signal.signal(7, handler)
def init():
    """Register ``cleanup`` as the handler for SIGABRT, SIGILL, SIGINT, SIGSEGV and SIGTERM."""
    # exit handlers
    exit_signals = (SIGABRT, SIGILL, SIGINT, SIGSEGV, SIGTERM)
    for signum in exit_signals:
        signal(signum, cleanup)
def main():
    """Entry point of the mock tool.

    Parses the command line, then in this order: optionally prints the
    positional arguments, optionally prints one environment variable,
    optionally echoes standard input, optionally sends SIGSEGV to its own
    process, and finally exits with the requested exit code. All output
    goes to standard output unless ``--to-stderr`` is given.
    """
    parser = argparse.ArgumentParser(description='Mock tool with control over its output & termination.')
    parser.add_argument('--print-args', action='store_true',
                        help='print the (non-option) arguments')
    parser.add_argument('--print-env', metavar='VAR', type=str,
                        help='print an environment variable')
    parser.add_argument('--echo-stdin', action='store_true',
                        help='echo the standard input')
    parser.add_argument('--to-stderr', action='store_true',
                        help='write to standard error instead of standard output')
    parser.add_argument('--crash', action='store_true',
                        help='crash process after output')
    parser.add_argument('--exit-code', metavar='N', type=int, default=0,
                        help='terminate process with given exit code (default: %(default)s)')
    parser.add_argument('args', nargs='*',
                        help='arbitrary command line arguments')
    options = parser.parse_args()

    stream = sys.stderr if options.to_stderr else sys.stdout

    if options.print_args:
        for positional in options.args:
            print(positional, file=stream, flush=True)

    variable = options.print_env
    if variable is not None:
        # Unset variables are printed as an empty line.
        print(os.environ.get(variable, ''), file=stream, flush=True)

    if options.echo_stdin:
        for line in sys.stdin:
            # Lines keep their own newline, so none is appended.
            print(line, file=stream, end='', flush=True)

    if options.crash:
        os.kill(os.getpid(), signal.SIGSEGV)

    sys.exit(options.exit_code)
def _analyze(self):
if self.signum in (SIGSEGV, SIGBUS):
self.memoryFault()
elif self.signum == SIGFPE:
self.mathError()
elif self.signum == SIGCHLD:
self.childExit()
elif self.signum == SIGABRT:
self.reason = Abort()
return self.reason
def platformProcessEvent(self, event):
    """
    Handle a mach exception message

    ``event`` is unpacked into ``(threadid, excode, codes)``. The thread
    id and the raw event are stored as metadata, then the exception code
    decides how the event is reported:

    * EXC_SOFTWARE: the signal number in ``codes[1]`` is passed to
      handlePosixSignal
    * EXC_BAD_ACCESS: fired as SIGSEGV
    * EXC_BAD_INSTRUCTION: fired as SIGILL
    * EXC_CRASH: fired as an exit with code 0xffffffff
    * EXC_BREAKPOINT: passed to handlePosixSignal as SIGTRAP
    * anything else: printed and reported via the NOTIFY_SIGNAL notifiers
    """
    #if self.attaching:
    #self.useptrace = True
    #return self.handlePosixSignal(event)
    #self.useptrace = False
    # Some event states that need to be reset
    self.softexc = False
    threadid, excode, codes = event
    # Set the thread that signaled.
    self.setMeta('ThreadId', threadid)
    self.setMeta('StoppedThreadId', threadid)
    self.setMeta('MachException', event)
    if excode == EXC_SOFTWARE:
        # Software exceptions are expected to carry exactly two codes:
        # the EXC_SOFT_SIGNAL marker followed by the signal number.
        self.softexc = True
        assert( len(codes) == 2 )
        assert( codes[0] == EXC_SOFT_SIGNAL )
        sig = codes[1]
        self.handlePosixSignal(sig)
    elif excode == EXC_BAD_ACCESS:
        print('exc_bad_access',repr([hex(x) for x in codes ]))
        # Every bad access is currently reported as SIGSEGV; the SIGBUS
        # distinction below is disabled.
        signo = signal.SIGSEGV
        #if codes[0] == KERN_INVALID_ADDRESS:
        #signo = signal.SIGBUS
        self._fireSignal(signo)
    elif excode == EXC_BAD_INSTRUCTION:
        print('exc_bad_instruction',repr([hex(x) for x in codes ]))
        self._fireSignal(signal.SIGILL)
    elif excode == EXC_CRASH:
        print('exc_crash')
        print('Crash:',repr([hex(x) for x in codes]))
        self._fireExit(0xffffffff)
    elif excode == EXC_BREAKPOINT:
        print('exc_breakpoint',codes)
        self.handlePosixSignal(signal.SIGTRAP)
    else:
        print('Unprocessed Exception Type: %d' % excode)
        # NOTE(review): the source lost its indentation; this call is
        # assumed to belong to the fallback branch -- confirm.
        self.fireNotifiers(vtrace.NOTIFY_SIGNAL)
    return
def do_monitor(args):
    """ Handle "monitor" mode.

    When ``args.monitor`` is false this returns immediately. Otherwise the
    process forks: the child returns from this function to do the actual
    work, while the parent waits for it. If the child was killed by
    SIGSEGV a new child is forked; for any other outcome (or on
    KeyboardInterrupt while waiting) the parent calls ``sys.exit(0)``, so
    the parent never returns from this function.
    """
    # If we aren't running in monitor mode, then we are done.
    if not args.monitor:
        return

    # This is the main retry loop.
    while True:
        # Fork the process.
        logger.info('Forking child process.')
        pid = os.fork()

        # If we are the child, leave this function and work.
        if pid == 0:
            logger.debug('We are a newly spawned child process.')
            return

        logger.debug('Child process spawned: %d', pid)

        # Wait for the child to die. If we die first, kill the child.
        atexit.register(kill_process, pid)
        try:
            _, exit_status = os.waitpid(pid, 0)
        except KeyboardInterrupt:
            # kill_process stays registered so the child is killed at exit.
            break
        atexit.unregister(kill_process)

        # Process the exit code. The status is decoded with the os.W*
        # helpers so that the core-dump flag (0x80) is not mistaken for
        # part of the signal number.
        if os.WIFSIGNALED(exit_status):
            signal_number = os.WTERMSIG(exit_status)
            logger.info('Child process exited with signal %d (core dump: %s).',
                        signal_number, os.WCOREDUMP(exit_status))
        else:
            signal_number = 0
            logger.info('Child process exited with exit code: %d.',
                        os.WEXITSTATUS(exit_status))

        # Only a segmentation fault triggers another attempt.
        if signal_number != signal.SIGSEGV:
            break
        logger.error('Child process seg faulted.')

    sys.exit(0)
###############################################################################
def do_monitor(args):
    """ Handle "monitor" mode.

    When ``args.monitor`` is false this returns immediately. Otherwise the
    process forks: the child returns from this function to do the actual
    work, while the parent waits for it. If the child was killed by
    SIGSEGV a new child is forked; for any other outcome (or on
    KeyboardInterrupt while waiting) the parent calls ``sys.exit(0)``, so
    the parent never returns from this function.
    """
    # If we aren't running in monitor mode, then we are done.
    if not args.monitor:
        return

    # This is the main retry loop.
    while True:
        # Fork the process.
        logger.info('Forking child process.')
        pid = os.fork()

        # If we are the child, leave this function and work.
        if pid == 0:
            logger.debug('We are a newly spawned child process.')
            return

        logger.debug('Child process spawned: %d', pid)

        # Wait for the child to die. If we die first, kill the child.
        atexit.register(kill_process, pid)
        try:
            _, exit_status = os.waitpid(pid, 0)
        except KeyboardInterrupt:
            # kill_process stays registered so the child is killed at exit.
            break
        atexit.unregister(kill_process)

        # Process the exit code. The status is decoded with the os.W*
        # helpers so that the core-dump flag (0x80) is not mistaken for
        # part of the signal number.
        if os.WIFSIGNALED(exit_status):
            signal_number = os.WTERMSIG(exit_status)
            logger.info('Child process exited with signal %d (core dump: %s).',
                        signal_number, os.WCOREDUMP(exit_status))
        else:
            signal_number = 0
            logger.info('Child process exited with exit code: %d.',
                        os.WEXITSTATUS(exit_status))

        # Only a segmentation fault triggers another attempt.
        if signal_number != signal.SIGSEGV:
            break
        logger.error('Child process seg faulted.')

    sys.exit(0)
###############################################################################