def worker(args):
    """Run a Celery worker alongside an ``airflow serve_logs`` subprocess.

    The ``serve_logs`` subprocess is started right before the Celery worker
    and is killed when the worker stops, whether it returns normally or
    exits by raising (including ``SystemExit``).

    :param args: parsed command-line arguments. The attributes read here are
        ``queues``, ``concurrency``, ``celery_hostname`` and ``daemon``;
        when ``daemon`` is truthy, ``pid``, ``stdout``, ``stderr`` and
        ``log_file`` are read as well.
    """
    # The serve_logs subprocess gets a copy of the current environment with
    # AIRFLOW_HOME set explicitly from settings.
    env = os.environ.copy()
    env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME

    # Celery worker
    from airflow.executors.celery_executor import app as celery_app
    # Aliased so the imported module does not shadow this function's own name.
    from celery.bin import worker as celery_worker_module

    celery_worker = celery_worker_module.worker(app=celery_app)
    options = {
        # NOTE(review): 'optimization' and 'O' look like the long and short
        # spellings of the same Celery option -- both are kept as-is; confirm.
        'optimization': 'fair',
        'O': 'fair',
        'queues': args.queues,
        'concurrency': args.concurrency,
        'hostname': args.celery_hostname,
    }

    def run_with_log_server():
        """Spawn ``airflow serve_logs``, run the worker, always kill the subprocess."""
        sp = subprocess.Popen(['airflow', 'serve_logs'], env=env, close_fds=True)
        try:
            celery_worker.run(**options)
        finally:
            # Runs on normal return and on any exception/SystemExit, so the
            # log server is never left behind as an orphan.
            sp.kill()

    if args.daemon:
        pid, stdout, stderr, log_file = setup_locations(
            "worker", args.pid, args.stdout, args.stderr, args.log_file)
        handle = setup_logging(log_file)
        # The context managers close both files even if daemonizing or the
        # worker itself fails.
        with open(stdout, 'w+') as stdout_file, open(stderr, 'w+') as stderr_file:
            ctx = daemon.DaemonContext(
                pidfile=TimeoutPIDLockFile(pid, -1),
                # Passed so the logging handle stays open across daemonization.
                files_preserve=[handle],
                stdout=stdout_file,
                stderr=stderr_file,
            )
            with ctx:
                run_with_log_server()
    else:
        # Foreground mode: install the module's handler for SIGINT and SIGTERM.
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        run_with_log_server()
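# Comment list (web-page navigation text; not part of the code)
# Table of contents (web-page navigation text; not part of the code)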