def initializer(self):
"""If set, the initializer function will be called after the subprocess
is started with the worker object as the first argument.
You can use this to, for example, set the process name suffix, to
distinguish between activity and workflow workers (when starting them
from the same process):
.. code-block:: python
from setproctitle import getproctitle, setproctitle
def set_worker_title(worker):
name = getproctitle()
if isinstance(worker, WorkflowWorker):
setproctitle(name + ' (WorkflowWorker)')
elif isinstance(worker, ActivityWorker):
setproctitle(name + ' (ActivityWorker)')
worker.initializer = set_worker_title
"""
try:
return self.__initializer
except AttributeError:
return lambda obj: None
python类getproctitle()的实例源码
def run(self):
if setproctitle:
oldproctitle = getproctitle()
setproctitle('[backing up %d: %s]' % (self.pk, self.friendly_name))
try:
self.run_rsync()
self.snapshot_rotate()
self.snapshot_create()
# Atomic update of size.
size = bfs.parse_backup_sizes(
self.dest_pool, self.hostgroup.name, self.friendly_name,
self.date_complete)['size']
size_mb = size[0:-6] or '0' # :P
HostConfig.objects.filter(pk=self.pk).update(
backup_size_mb=size_mb)
# Send signal that we're done.
self.signal_done(True)
except:
# Send signal that we've failed.
self.signal_done(False)
# Propagate.
raise
finally:
if setproctitle:
setproctitle(oldproctitle)
def post_worker_init(dummy_worker):
setproctitle.setproctitle(
settings.GUNICORN_WORKER_READY_PREFIX + setproctitle.getproctitle()
)
def run(self):
'''Main execute of the class'''
def cb_exit_gracefully(signum, frame):
'''Callback to exit gracefully'''
self.logger.info("Grace exit command received signum %d" % (signum))
for proc in self.current_subprocs:
if proc.poll() is None:
# Switching to a kill -9 as the nice option seems to require it.
# proc.send_signal(signal.SIGINT)
proc.terminate()
#subprocess.check_call("kill -9 " + proc.pid())
sys.exit(0)
compressor_workers = int(self.config.get("compression", "compressor_workers"))
self.logger.info("Compressor process starting up")
self.pool = ThreadPool(compressor_workers)
setproctitle("[compress] " + getproctitle())
signal.signal(signal.SIGINT, cb_exit_gracefully)
signal.signal(signal.SIGTERM, cb_exit_gracefully)
while True:
tocompress_dir = os.path.join(self.config.get(
"main", "working_directory"), "tocompress")
files = self.get_files(tocompress_dir, ".mak")
if files:
self.pool.map(self.compress_filename, files)
time.sleep(float(self.config.get(
"compression", "compression_check_interval")))
sys.exit(0)
def run(self):
'''Main class entrypoint'''
nice_level = self.config.get("consumer", "consumer_nice_level")
process = psutil.Process(os.getpid())
process.nice(int(nice_level))
setproctitle("[consumer" + self.consumer_id + "] " + getproctitle())
while not self.shutting_down:
try:
self.run_consumer()
except Exception as exe:
self.logger.error(
"Unexpected error with kafka consumer: " + str(exe))
self.logger.error(traceback.format_exc())
self.logger.error(
"Sleeping for 30 seconds before trying again")
if self.consumer != None:
self.consumer.commit()
self.consumer.close()
for part in self.partitions:
self.partitions[part].writer.close()
time.sleep(30)
# save all our offsets
self.consumer.commit()
def post_worker_init(dummy_worker):
setproctitle.setproctitle(
settings.GUNICORN_WORKER_READY_PREFIX + setproctitle.getproctitle()
)
def test_set_process_name(self, config):
consumer = KafkaConsumerBase(
'my_very_extraordinarily_elongated_topic_name',
config, ['1', '2', '3', '4', '5'])
with mock.patch(
'yelp_kafka.consumer.setproctitle',
) as mock_setproctitle:
consumer.set_process_name()
expected_name = \
'{procname}-my_very_extraordinarily_elongated_topic_name' \
'-{messages}'.format(
procname=getproctitle(),
messages=['1', '2', '3', '4', '5'],
)
mock_setproctitle.assert_called_with(expected_name)
def set_process_name(self):
"""Setup process name for consumer to include topic and
partitions to improve debuggability.
"""
process_name = '%s-%s-%s' % (getproctitle(), self.topic.decode(), self.partitions)
setproctitle(process_name)