def _spawn_transform_service(self):
"""Launch transform service and get pid."""
status = 0
pid = os.fork()
if pid == 0:
try:
os.setsid()
# start transform service
launcher = oslo_service.service.launch(
self.conf,
transform_service.Transform(),
workers=1)
status = launcher.wait()
except SystemExit as exc:
traceback.print_exc()
status = exc.code
except BaseException:
try:
traceback.print_exc()
except BaseException:
print("Could not print traceback")
status = 2
os._exit(status or 0)
return pid
python类fork()的实例源码
def serve_forever(self, poll_interval=0.1):
"""Fork the current process and wait for all children to finish."""
if self.prefork is None or self.prefork <= 1:
return super(_SporkMixIn, self).serve_forever(
poll_interval=poll_interval)
pids = []
for dummy in range(self.prefork):
pid = os.fork()
if not pid:
super(_SporkMixIn, self).serve_forever(
poll_interval=poll_interval)
os._exit(0)
else:
self.log.info("Forked worker %s", pid)
pids.append(pid)
self.pids = pids
for pid in self.pids:
_eintr_retry(os.waitpid, pid, 0)
def run_daemon(server, pidfile, daemonize=True):
"""Run the server as a daemon
:param server: cutlery (a Spoon or Spork)
:param pidfile: the file to keep the parent PID
:param daemonize: if True fork the processes into
a daemon.
:return:
"""
logger = logging.getLogger(server.server_logger)
if daemonize:
detach(pidfile=pidfile, logger=logger)
elif pidfile:
with open(pidfile, "w+") as pidf:
pidf.write("%s\n" % os.getpid())
try:
server.serve_forever()
finally:
try:
os.remove(pidfile)
except OSError:
pass
def shutdown(self):
"""Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread, or it will
deadlock.
"""
self.__shutdown_request = True
self.__is_shut_down.wait()
# The distinction between handling, getting, processing and
# finishing a request is fairly arbitrary. Remember:
#
# - handle_request() is the top-level call. It calls
# select, get_request(), verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def process_request(self, request, client_address):
"""Fork a new subprocess to process the request."""
self.collect_children()
pid = os.fork()
if pid:
# Parent process
if self.active_children is None:
self.active_children = []
self.active_children.append(pid)
self.close_request(request) #close handle in parent process
return
else:
# Child process.
# This must never return, hence os._exit()!
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
os._exit(0)
except:
try:
self.handle_error(request, client_address)
self.shutdown_request(request)
finally:
os._exit(1)
def get_command_line():
'''
Returns prefix of command line used for spawning a child process
'''
if process.current_process()._identity==() and is_forking(sys.argv):
raise RuntimeError('''
Attempt to start a new process before the current process
has finished its bootstrapping phase.
This probably means that you are on Windows and you have
forgotten to use the proper idiom in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable.''')
if getattr(sys, 'frozen', False):
return [sys.executable, '--multiprocessing-fork']
else:
prog = 'from multiprocessing.forking import main; main()'
return [_python_exe, '-c', prog, '--multiprocessing-fork']
def __init__(self, cmd, bufsize=-1):
_cleanup()
self.cmd = cmd
p2cread, p2cwrite = os.pipe()
c2pread, c2pwrite = os.pipe()
self.pid = os.fork()
if self.pid == 0:
# Child
os.dup2(p2cread, 0)
os.dup2(c2pwrite, 1)
os.dup2(c2pwrite, 2)
self._run_child(cmd)
os.close(p2cread)
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
os.close(c2pwrite)
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
def __init__(self, tasks=None, workers=None, block=True, progressDialog=None, randomReseed=True, **kwds):
"""
=============== ===================================================================
**Arguments:**
tasks list of objects to be processed (Parallelize will determine how to
distribute the tasks). If unspecified, then each worker will receive
a single task with a unique id number.
workers number of worker processes or None to use number of CPUs in the
system
progressDialog optional dict of arguments for ProgressDialog
to update while tasks are processed
randomReseed If True, each forked process will reseed its random number generator
to ensure independent results. Works with the built-in random
and numpy.random.
kwds objects to be shared by proxy with child processes (they will
appear as attributes of the tasker)
=============== ===================================================================
"""
## Generate progress dialog.
## Note that we want to avoid letting forked child processes play with progress dialogs..
self.showProgress = False
if progressDialog is not None:
self.showProgress = True
if isinstance(progressDialog, basestring):
progressDialog = {'labelText': progressDialog}
from ..widgets.ProgressDialog import ProgressDialog
self.progressDlg = ProgressDialog(**progressDialog)
if workers is None:
workers = self.suggestedWorkerCount()
if not hasattr(os, 'fork'):
workers = 1
self.workers = workers
if tasks is None:
tasks = range(workers)
self.tasks = list(tasks)
self.reseed = randomReseed
self.kwds = kwds.copy()
self.kwds['_taskStarted'] = self._taskStarted
def start_process(cmd, target, env, *args):
try:
pid = os.fork()
except OSError as e:
logger.error(repr(e) + ' while fork')
raise
if pid == 0:
_close_fds()
args = list(args)
env = dict(os.environ, **env)
args.append(env)
os.execlpe(cmd, cmd, target, *args)
else:
_waitpid(pid)
def shutdown(self):
"""Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread, or it will
deadlock.
"""
self.__shutdown_request = True
self.__is_shut_down.wait()
# The distinction between handling, getting, processing and
# finishing a request is fairly arbitrary. Remember:
#
# - handle_request() is the top-level call. It calls
# select, get_request(), verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def process_request(self, request, client_address):
"""Fork a new subprocess to process the request."""
self.collect_children()
pid = os.fork()
if pid:
# Parent process
if self.active_children is None:
self.active_children = set()
self.active_children.add(pid)
self.close_request(request) #close handle in parent process
return
else:
# Child process.
# This must never return, hence os._exit()!
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
os._exit(0)
except:
try:
self.handle_error(request, client_address)
self.shutdown_request(request)
finally:
os._exit(1)
def __init__(self, fun, args=None, kwargs=None, nice_level=0,
child_on_start=None, child_on_exit=None):
if args is None:
args = []
if kwargs is None:
kwargs = {}
self.fun = fun
self.args = args
self.kwargs = kwargs
self.tempdir = tempdir = py.path.local.mkdtemp()
self.RETVAL = tempdir.ensure('retval')
self.STDOUT = tempdir.ensure('stdout')
self.STDERR = tempdir.ensure('stderr')
pid = os.fork()
if pid: # in parent process
self.pid = pid
else: # in child process
self.pid = None
self._child(nice_level, child_on_start, child_on_exit)
def daemonize():
# See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(077)
null=os.open('/dev/null', os.O_RDWR)
for i in range(3):
try:
os.dup2(null, i)
except OSError, e:
if e.errno != errno.EBADF:
raise
os.close(null)
def closed(self):
global old
log.msg('closed %s' % self)
log.msg(repr(self.conn.channels))
if not options['nocache']: # fork into the background
if os.fork():
if old:
fd = sys.stdin.fileno()
tty.tcsetattr(fd, tty.TCSANOW, old)
if (options['command'] and options['tty']) or \
not options['notty']:
signal.signal(signal.SIGWINCH, signal.SIG_DFL)
os._exit(0)
os.setsid()
for i in range(3):
try:
os.close(i)
except OSError, e:
import errno
if e.errno != errno.EBADF:
raise
def get_command_line():
'''
Returns prefix of command line used for spawning a child process
'''
if getattr(process.current_process(), '_inheriting', False):
raise RuntimeError('''
Attempt to start a new process before the current process
has finished its bootstrapping phase.
This probably means that you are on Windows and you have
forgotten to use the proper idiom in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce a Windows executable.''')
if getattr(sys, 'frozen', False):
return [sys.executable, '--multiprocessing-fork']
else:
prog = 'from multiprocessing.forking import main; main()'
opts = util._args_from_interpreter_flags()
return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
def __init__(self, cmd, bufsize=-1):
_cleanup()
self.cmd = cmd
p2cread, p2cwrite = os.pipe()
c2pread, c2pwrite = os.pipe()
self.pid = os.fork()
if self.pid == 0:
# Child
os.dup2(p2cread, 0)
os.dup2(c2pwrite, 1)
os.dup2(c2pwrite, 2)
self._run_child(cmd)
os.close(p2cread)
self.tochild = os.fdopen(p2cwrite, 'w', bufsize)
os.close(c2pwrite)
self.fromchild = os.fdopen(c2pread, 'r', bufsize)
def spawn(argv, master_read=_read, stdin_read=_read):
"""Create a spawned process."""
if type(argv) == type(''):
argv = (argv,)
pid, master_fd = fork()
if pid == CHILD:
os.execlp(argv[0], *argv)
try:
mode = tty.tcgetattr(STDIN_FILENO)
tty.setraw(STDIN_FILENO)
restore = 1
except tty.error: # This is the same as termios.error
restore = 0
try:
_copy(master_fd, master_read, stdin_read)
except (IOError, OSError):
if restore:
tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode)
os.close(master_fd)
def run(image_name, image_dir, container_dir, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child, we'll try to do some containment here
try:
contain(command, image_name, image_dir, container_id,
container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent, pid contains the PID of the forked process
# wait for the forked child, fetch the exit status
_, status = os.waitpid(pid, 0)
print('{} exited with status {}'.format(pid, status))
def run(image_name, image_dir, container_dir, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child, we'll try to do some containment here
try:
contain(command, image_name, image_dir, container_id,
container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent, pid contains the PID of the forked process
# wait for the forked child, fetch the exit status
_, status = os.waitpid(pid, 0)
print('{} exited with status {}'.format(pid, status))
def run(image_name, image_dir, container_dir, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child, we'll try to do some containment here
try:
contain(command, image_name, image_dir, container_id,
container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent, pid contains the PID of the forked process
# wait for the forked child, fetch the exit status
_, status = os.waitpid(pid, 0)
print('{} exited with status {}'.format(pid, status))
def run(image_name, image_dir, container_dir, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child, we'll try to do some containment here
try:
contain(command, image_name, image_dir, container_id, container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent, pid contains the PID of the forked process
# wait for the forked child, fetch the exit status
_, status = os.waitpid(pid, 0)
print('{} exited with status {}'.format(pid, status))
def run(image_name, image_dir, container_dir, command):
container_id = str(uuid.uuid4())
pid = os.fork()
if pid == 0:
# This is the child, we'll try to do some containment here
try:
contain(command, image_name, image_dir, container_id, container_dir)
except Exception:
traceback.print_exc()
os._exit(1) # something went wrong in contain()
# This is the parent, pid contains the PID of the forked process
# wait for the forked child, fetch the exit status
_, status = os.waitpid(pid, 0)
print('{} exited with status {}'.format(pid, status))
def init_host(self, tg, **kwargs):
LOG.info(_LI('Willing init host function.......'))
if CONF.is_all:
pid = os.fork()
if pid == 0:
child_started = False
while True:
enable_spawn = kwargs.get('enable_spawn', True)
if enable_spawn:
eventlet.spawn(self.get_all_user_all_weibo_info,
**kwargs)
child_started = True
else:
kwargs['tg'] = tg
self.get_all_user_all_weibo_info(**kwargs)
child_started = True
if not child_started:
break
os._exit(2)
LOG.debug(_LI('Started child %d' % pid))
# ?????? 14400s ????4?
def generate(self):
return textwrap.dedent("""
import pupy, os
if os.name == 'posix':
pupy.infos['daemonize']=True
if os.fork(): # launch child and...
os._exit(0) # kill off parent
os.setsid()
if os.fork(): # launch child and...
os._exit(0) # kill off parent again.
os.umask(022) # Don't allow others to write
null=os.open('/dev/null', os.O_RDWR)
for i in range(3):
try:
os.dup2(null, i)
except OSError, e:
if e.errno != errno.EBADF:
raise
os.close(null)
""")
def create_daemon(task_id, version):
pid = os.fork()
if pid == 0:
os.setsid()
sub_pid = os.fork()
if sub_pid == 0:
try:
run('supervisorctl restart corvus-agent:')
for _ in range(30):
if program_running('corvus-agent:corvus-agent-api') and \
program_running('corvus-agent:corvus-agent-task'):
break
time.sleep(1)
else:
raise TaskException('Agent updated but not running')
Task.set_status(task_id, Task.DONE)
except Exception:
Task.set_status(task_id, Task.FAILED,
reason=traceback.format_exc())
exit(0)
else:
os._exit(0)
else:
os._exit(0)
def fork_and_exec():
""" This routine forks and execs a new child process and keeps track of its PID. Before we fork,
set the current restart epoch in an env variable that processes can read if they care. """
global restart_epoch
os.environ['RESTART_EPOCH'] = str(restart_epoch)
print ("forking and execing new child process at epoch {}".format(restart_epoch))
restart_epoch += 1
child_pid = os.fork()
if child_pid == 0:
# Child process
os.execl(sys.argv[1], sys.argv[1])
else:
# Parent process
print ("forked new child process with PID={}".format(child_pid))
pid_list.append(child_pid)
def become_daemon(self, root_dir='/'):
if os.fork() != 0: # launch child and ...
os._exit(0) # kill off parent
os.setsid()
os.chdir(root_dir)
os.umask(0)
if os.fork() != 0: # fork again so we are not a session leader
os._exit(0)
sys.stdin.close()
sys.__stdin__ = sys.stdin
sys.stdout.close()
sys.stdout = sys.__stdout__ = _NullDevice()
sys.stderr.close()
sys.stderr = sys.__stderr__ = _NullDevice()
for fd in range(1024):
try:
os.close(fd)
except OSError:
pass
def _handle_child(child_socket, root_dir, in_dir, out_dir):
create_namespace()
pid = fork()
if pid != 0:
child_socket.close()
waitpid(pid, 0)
exit()
enter_namespace(root_dir, in_dir, out_dir)
socket_file = child_socket.makefile('rwb')
while True:
try:
func = cloudpickle.load(socket_file)
except EOFError:
exit()
try:
ret, err = func(), None
except Exception as e:
ret, err = None, e
data = cloudpickle.dumps((ret, err))
socket_file.write(pack('I', len(data)))
socket_file.write(data)
socket_file.flush()
def run(command):
child_pid = os.fork()
if child_pid == 0:
os.execlp(command[0], *command)
else:
while True:
try:
os.waitpid(child_pid, 0)
except OSError as error:
if error.errno == errno.ECHILD:
# No child processes.
# It has exited already.
break
elif error.errno == errno.EINTR:
# Interrupted system call.
# This happens when resizing the terminal.
pass
else:
# An actual error occurred.
raise
def create_child(*args):
parentfp, childfp = socket.socketpair()
pid = os.fork()
if not pid:
mitogen.core.set_block(childfp.fileno())
os.dup2(childfp.fileno(), 0)
os.dup2(childfp.fileno(), 1)
childfp.close()
parentfp.close()
os.execvp(args[0], args)
childfp.close()
# Decouple the socket from the lifetime of the Python socket object.
fd = os.dup(parentfp.fileno())
parentfp.close()
LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
pid, fd, os.getpid(), Argv(args))
return pid, fd