def init_workers(self, tornado_app):
    """
    Create the worker instances for the Tornado application *tornado_app*
    and register them in ``self.workers``.

    The interface description is read from
    ``tornado_app.settings['interface']``. A non-positive ``processes``
    value means "one worker per CPU" (``tornado.process.cpu_count()``).
    The listening sockets are bound once here and handed to every worker.
    """
    iface = tornado_app.settings['interface']
    iface_name = iface.name
    worker_count = iface.processes
    iface_host = iface.host
    iface_port = iface.port

    # Zero or negative means "use as many workers as there are CPUs".
    if worker_count <= 0:
        worker_count = tornado.process.cpu_count()

    self.logger.info(
        "Init %d worker(s) for interface '%s' (%s:%d)",
        worker_count, iface_name, iface_host, iface_port)

    # Bind once; the same sockets are passed to each worker.
    listen_sockets = tornado.netutil.bind_sockets(iface_port, iface_host)

    for _ in six.moves.range(worker_count):
        worker = Worker(
            name=iface_name,
            factory=get_worker_instance,
            args=(tornado_app, listen_sockets, self.pid),
        )
        self.workers.append(worker)
# Example source code for Python's process() class
def kill_spawned_process(self):
    """
    Kill the process that was spawned inside the container.

    Killing the `docker exec` process alone leaves the process it spawned
    inside the container running, so that inner process has to be killed
    first. Its PID is read from ``/tmp/sh.pid.<uuid>`` inside the container
    and signal 1 is sent to it via a second `docker exec`.
    """
    container = self.container_id
    kill_script = 'kill -1 $(cat /tmp/sh.pid.{})'.format(self.uuid)
    killer = PtyProcessUnicode.spawn(
        ['docker', 'exec', container, '/bin/sh', '-c', kill_script])

    # Poll once per second until the kill command has finished.
    while True:
        if not killer.isalive():
            break
        sleep(1)
    killer.close()
def _open_hpx(self, comp_id, freq_id, viewer):
    """
    Open the HPX image of a specified product using a sub-process

    NOTE
    ----
    Only allowed when accessing from the localhost

    Parameters
    ----------
    comp_id : str
        ID of the component whose product will be opened
    freq_id : int
        The frequency ID of the specific product within the component.
    viewer : str
        The executable name or path to the FITS viewer.

    Returns
    -------
    pid : int
        ID of the sub process which opened the HPX image.
        ``None`` if failed to open the image.
    error : str
        If failed, this ``error`` saves the details, otherwise, ``None``.
    """
    if not self.from_localhost:
        return (None, "Action 'open' only allowed from localhost")

    try:
        filepath = self.products.get_product_abspath(
            comp_id, freq_id, ptype="hpx")
        cmd = [viewer, filepath]
        p = tornado.process.Subprocess(cmd)
    except (ValueError, KeyError, OSError) as e:
        # OSError covers a viewer that is missing or not executable; it is
        # reported through ``error`` like the other failures, as the
        # documented contract promises, instead of escaping to the caller.
        return (None, str(e))

    pid = p.pid
    logger.info("(PID: %s) Opened HPX image: %s", pid, " ".join(cmd))
    return (pid, None)
def stop_child(http_server, parent_pid):
    """
    Periodic Tornado callback that watches the parent process.

    When ``os.getppid()`` no longer equals *parent_pid* (the parent has
    stopped), the HTTP server is stopped and a stop of the **IOLoop** is
    scheduled. Otherwise nothing happens.
    """
    if os.getppid() == parent_pid:
        # Parent is unchanged: keep serving.
        return

    # Stop accepting new requests first ...
    http_server.stop()
    # ... then schedule the IOLoop stop through a loop callback.
    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.add_callback(io_loop.stop)
def tornado_worker(tornado_app, sockets, parent_pid):
    """
    Worker entry point: serve HTTP requests for *tornado_app* on the
    already-bound *sockets* until SIGINT arrives or the parent process
    (identified by *parent_pid*) changes.
    """
    settings = tornado_app.settings
    title = "{:s}: worker {:s}".format(
        settings['context'].config.name,
        settings['interface'].name
    )
    setproctitle.setproctitle(title)
    settings['context'].config.configure_logging()

    # Attach the HTTP server to the sockets handed over by the caller.
    server = tornado.httpserver.HTTPServer(tornado_app)
    server.add_sockets(sockets)

    def sigint_handler(dummy_signum, dummy_frame):
        """
        On SIGINT stop the HTTP server and schedule the IOLoop stop.
        """
        # Stop accepting new requests, then stop the loop from a callback.
        server.stop()
        io_loop = tornado.ioloop.IOLoop.instance()
        io_loop.add_callback(io_loop.stop)

    signal.signal(signal.SIGINT, sigint_handler)

    # Check every 250 ms whether the parent process PID has changed.
    parent_check = functools.partial(stop_child, server, parent_pid)
    watchdog = tornado.ioloop.PeriodicCallback(parent_check, 250)
    watchdog.start()

    # Run the IOLoop until one of the stop paths above fires.
    tornado.ioloop.IOLoop.instance().start()
def command(self):
    """
    Run the master process: set the process title, create the workers for
    every Tornado application of the context and start them.
    """
    master_title = "{:s}: master process '{:s}'".format(
        self.context.config.name, " ".join(sys.argv)
    )
    setproctitle.setproctitle(master_title)

    # Create the workers of each interface.
    for app in get_tornado_apps(self.context, debug=False):
        self.init_workers(app)

    # Run the workers; a keyboard interrupt just ends the command quietly.
    try:
        start_workers(self.workers, max_restarts=100)
    except KeyboardInterrupt:
        pass
def get(self):
    """
    Serve the READ-ONLY operations on the products manifest.

    The ``action`` request argument (default ``"get"``) selects one of:
    - get: return the current products manifest
    - which: locate a command/program (check that it can be found in
             PATH and is executable)
    - open: open the HPX image of a specified product using a sub-process
            NOTE: Only allowed when accessing from the localhost

    On success the response is written as JSON; on failure a 400 error
    carrying the reason is sent.
    """
    action = self.get_argument("action", "get")
    response = None
    reason = None

    if action == "get":
        # Current manifest plus whether the client is on the localhost
        ok = True
        response = {
            "manifest": self.products.manifest,
            "localhost": self.from_localhost,
        }
    elif action == "which":
        # Look the command/program up in PATH
        cmd = json_decode(self.get_argument("cmd"))
        cmdpath = shutil.which(cmd)
        ok = bool(cmdpath)
        if ok:
            response = {
                "isExecutable": True,
                "cmdPath": cmdpath,
            }
        else:
            reason = "Cannot locate the executable for: {0}".format(cmd)
    elif action == "open":
        # Launch the viewer on the product's HPX image
        comp_id = json_decode(self.get_argument("compID"))
        freq_id = json_decode(self.get_argument("freqID"))
        viewer = json_decode(self.get_argument("viewer"))
        pid, error = self._open_hpx(comp_id, freq_id, viewer)
        ok = pid is not None
        if ok:
            response = {"pid": pid}
        else:
            reason = error
    else:
        # Unknown action
        ok = False
        reason = "Bad request action: {0}".format(action)

    if not ok:
        logger.warning("Request failed: {0}".format(reason))
        self.send_error(400, reason=reason)
        return

    logger.debug("Response: {0}".format(response))
    self.set_header("Content-Type", "application/json; charset=UTF-8")
    self.write(json_encode(response))