def __init__(
self,
chroot_directory=None,
working_directory='/',
umask=0,
uid=None,
gid=None,
prevent_core=True,
detach_process=None,
files_preserve=[], # changed default
loggers_preserve=[], # new
pidfile=None,
stdout_logger = None, # new
stderr_logger = None, # new
#stdin, omitted!
#stdout, omitted!
#sterr, omitted!
signal_map=None,
):
self.stdout_logger = stdout_logger
self.stderr_logger = stderr_logger
self.loggers_preserve = loggers_preserve
devnull_in = open(os.devnull, 'r+')
devnull_out = open(os.devnull, 'w+')
files_preserve.extend([devnull_in, devnull_out])
daemon.DaemonContext.__init__(self,
chroot_directory = chroot_directory,
working_directory = working_directory,
umask = umask,
uid = uid,
gid = gid,
prevent_core = prevent_core,
detach_process = detach_process,
files_preserve = files_preserve,
pidfile = pidfile,
stdin = devnull_in,
stdout = devnull_out,
stderr = devnull_out,
signal_map = signal_map)
python类DaemonContext()的实例源码
def handle_daemon(self, name, daemon):
"""
Executes the daemon command.
:param str name: The name of the daemon.
:param * daemon: The daemon, i.e. object with main method.
"""
self.output = EnarkshStyle(self.input, self.output)
log = logging.getLogger('enarksh')
log.setLevel(logging.INFO)
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
if self.option('daemonize'):
config = Config.get()
log_file_name = os.path.join(C.HOME, config.get_enarksh_log_dir(), name + '.log')
pid_file_name = os.path.join(C.HOME, config.get_enarksh_lock_dir(), name + '.pid')
log_handler = logging.handlers.RotatingFileHandler(log_file_name,
maxBytes=config.get_enarksh_max_log_size(),
backupCount=config.get_enarksh_log_back())
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(log_formatter)
log.addHandler(log_handler)
output = open(log_file_name, 'ab', 0)
context = DaemonContext()
context.working_directory = C.HOME
context.umask = 0o002
context.pidfile = PIDLockFile(pid_file_name, False)
context.stdout = output
context.stderr = output
context.files_preserve = [log_handler.stream]
with context:
daemon.main()
else:
log_handler = logging.StreamHandler(sys.stdout)
log_handler.setLevel(logging.DEBUG)
log_handler.setFormatter(log_formatter)
log.addHandler(log_handler)
daemon.main()
# ----------------------------------------------------------------------------------------------------------------------
def _run_job(name, config, gpu=None, prog_args=None, background=False):
import socket
import subprocess
import daemon
exper_dir = _expath(name)
runem_cmd = ([config['experiment']['prog']] +
config['experiment']['prog_args'] +
(prog_args or []))
env = os.environ
if gpu:
env['CUDA_VISIBLE_DEVICES'] = gpu
def _do_run_job():
try:
job = subprocess.Popen(runem_cmd, cwd=exper_dir, env=env,
stdin=sys.stdin, stdout=sys.stdout,
stderr=sys.stderr)
with shelve.open('.em', writeback=True) as emdb:
emdb[name] = {
'started': _tstamp(),
'status': 'running',
'pid': job.pid,
'hostname': socket.getfqdn(),
}
if gpu:
emdb[name]['gpu'] = gpu
job.wait()
with shelve.open('.em', writeback=True) as emdb:
status = 'completed' if job.returncode == 0 else 'error'
emdb[name]['status'] = status
except KeyboardInterrupt:
with shelve.open('.em', writeback=True) as emdb:
emdb[name]['status'] = 'interrupted'
finally:
with shelve.open('.em', writeback=True) as emdb:
emdb[name].pop('pid', None)
emdb[name]['ended'] = _tstamp()
if background:
curdir = osp.abspath(os.curdir)
with daemon.DaemonContext(working_directory=curdir):
_do_run_job()
else:
_do_run_job()
def worker(args):
env = os.environ.copy()
env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
# Celery worker
from airflow.executors.celery_executor import app as celery_app
from celery.bin import worker
worker = worker.worker(app=celery_app)
options = {
'optimization': 'fair',
'O': 'fair',
'queues': args.queues,
'concurrency': args.concurrency,
'hostname': args.celery_hostname,
}
if args.daemon:
pid, stdout, stderr, log_file = setup_locations("worker", args.pid, args.stdout, args.stderr, args.log_file)
handle = setup_logging(log_file)
stdout = open(stdout, 'w+')
stderr = open(stderr, 'w+')
ctx = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(pid, -1),
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
)
with ctx:
sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
worker.run(**options)
sp.kill()
stdout.close()
stderr.close()
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
worker.run(**options)
sp.kill()
def daemon_run(host="localhost", port="8080", pidfile=None, logfile=None,
keyfile='priv.key', certfile='pub.crt', cafile='ca.crt',
action="start"):
"""
Get the bottle 'run' function running in the background as a daemonized
process.
:host: The host interface to listen for connections on. Enter 0.0.0.0
to listen on all interfaces. Defaults to localhost.
:port: The host port to listen on. Defaults to 8080.
:pidfile: The file to use as the process id file. Defaults to "bottle.pid"
:logfile: The file to log stdout and stderr from bottle to. Defaults to "bottle.log"
"""
if pidfile is None:
pidfile = os.path.join(
os.getcwd(),
"bottle.pid"
)
if logfile is None:
logfile = os.path.join(
os.getcwd(),
"bottle.log"
)
if action == "start":
log = open(logfile, "w+")
context = daemon.DaemonContext(
pidfile=__locked_pidfile(pidfile),
stdout=log,
stderr=log
)
with context:
# bottle.run(host=host, port=port)
srv = SSLWSGIRefServer(host=host, port=port, keyfile=keyfile,
certfile=certfile, cafile=cafile)
bottle.run(server=srv)
else:
with open(pidfile, "r") as p:
pid = int(p.read())
os.kill(pid, signal.SIGTERM)
def worker(args):
env = os.environ.copy()
env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
# Celery worker
from airflow.executors.celery_executor import app as celery_app
from celery.bin import worker
worker = worker.worker(app=celery_app)
options = {
'optimization': 'fair',
'O': 'fair',
'queues': args.queues,
'concurrency': args.concurrency,
}
if args.daemon:
pid, stdout, stderr, log_file = setup_locations("worker", args.pid, args.stdout, args.stderr, args.log_file)
handle = setup_logging(log_file)
stdout = open(stdout, 'w+')
stderr = open(stderr, 'w+')
ctx = daemon.DaemonContext(
pidfile=TimeoutPIDLockFile(pid, -1),
files_preserve=[handle],
stdout=stdout,
stderr=stderr,
)
with ctx:
sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
worker.run(**options)
sp.kill()
stdout.close()
stderr.close()
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
sp = subprocess.Popen(['airflow', 'serve_logs'], env=env)
worker.run(**options)
sp.kill()