from pyramid.paster import bootstrap
from rq import Connection, Worker


def worker(config_uri):
    """Console entry script that starts a worker process."""
    # TODO: import spacy's model to share it between workers
    pyramid_env = bootstrap(config_uri)
    # This conflicts with normal worker output.
    # TODO: solve logging for the console
    # Set up logging to allow log output from command methods:
    # from pyramid.paster import setup_logging
    # setup_logging(config_uri)
    try:
        qs = ['default']
        conn = redis_connection()  # project-local helper returning a Redis client
        with Connection(conn):
            w = Worker(qs)
            w.work()
    finally:
        pyramid_env['closer']()
# Method of a ``Worker`` subclass of ``rq.worker.Worker``.
def __init__(self, queues=None, *args, **kwargs):
    u'''
    Constructor.

    Accepts the same arguments as the constructor of
    ``rq.worker.Worker``. However, the behavior of the ``queues``
    parameter is different.

    :param queues: The job queue(s) to listen on. Can be a string
        with the name of a single queue or a list of queue names.
        If not given then the default queue is used.
    '''
    queues = queues or [DEFAULT_QUEUE_NAME]
    queues = [get_queue(q) for q in ensure_list(queues)]
    rq.worker.logger.setLevel(logging.INFO)
    super(Worker, self).__init__(queues, *args, **kwargs)
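
A minimal usage sketch for this subclass, assuming it is importable as ``Worker`` alongside the project's queue helpers (the queue names here are made up for illustration):

    # A plain string names a single queue.
    w = Worker(u'priority')
    w.work()

    # A list subscribes to several queues; omitting ``queues`` entirely
    # falls back to the default queue.
    w = Worker([u'priority', u'bulk'])
    w.work()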
def perform_job(self, *args, **kwargs):
    result = super(Worker, self).perform_job(*args, **kwargs)
    # rq.Worker.main_work_horse does a hard exit via os._exit directly
    # after its call to perform_job returns. Hence here is the correct
    # location to clean up.
    try:
        meta.Session.remove()
    except Exception:
        log.exception(u'Error while closing database session')
    try:
        meta.engine.dispose()
    except Exception:
        log.exception(u'Error while disposing database engine')
    return result
# adapted from ckanext.datastore.backend.postgres
import sys

from redis import Redis
from rq import Connection, Worker


def worker():
    with Connection(Redis("jobqueue.local")):
        # Queue names come from the command line, defaulting to 'default'.
        qs = sys.argv[1:] or ['default']
        print("foo")
        w = Worker(qs)
        w.work()
def get_worker(*queue_names):
    """
    Returns an RQ worker for all queues or specified ones.
    """
    queues = get_queues(*queue_names)
    return Worker(queues,
                  connection=queues[0].connection,
                  exception_handlers=get_exception_handlers() or None)
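
A usage sketch, assuming ``get_worker`` and its project-local helpers (``get_queues``, ``get_exception_handlers``) are importable; the queue names are illustrative:

    # Worker over every configured queue.
    worker = get_worker()
    worker.work()

    # Worker restricted to specific queues, highest priority first.
    worker = get_worker('high', 'default')
    worker.work()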
import argparse

from redis import StrictRedis
from rq import Connection, Queue, Worker


def main():
    """
    Initialize workers.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--queues', nargs='+', type=str, dest='queues', required=True)
    args = parser.parse_args()
    # ``settings.REDIS`` is a project-local mapping of Redis connection kwargs.
    with Connection(connection=StrictRedis(**settings.REDIS)):
        # ``--queues`` is required, so there is always at least one name.
        qs = [Queue(name) for name in args.queues]
        worker = Worker(qs)
        worker.work()
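
A hypothetical invocation of this launcher (the module path is an assumption):

    python -m myapp.worker --queues high default low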
from rq import Connection, Queue, Worker


def main():
    # ``redis_connection`` is a project-level Redis client instance.
    with Connection(redis_connection):
        worker = Worker(Queue('default'))
        worker.work()
def request_stop(self, *args, **kwargs):
    """When SIGINT is sent to the worker (e.g., if the Supervisor process
    group is restarted), immediately fail the running job and stop the
    worker. This avoids a scenario in which the worker gets shut down but
    not unregistered in Redis, causing it to get "marooned" in the admin.
    """
    job = self.get_current_job()
    if job:
        self.handle_job_failure(job)
        self.failed_queue.quarantine(job, 'Worker shutdown.')
    self.request_force_stop(*args, **kwargs)
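
A minimal sketch of launching a worker subclass that carries this override (the class name is hypothetical):

    from redis import Redis
    from rq import Connection

    with Connection(Redis()):
        # GracefulWorker is an assumed rq.Worker subclass defining the
        # request_stop override above; SIGINT then fails the current job
        # and deregisters the worker instead of marooning it in Redis.
        GracefulWorker(['default']).work()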
import redis
from rq import Connection, Queue, Worker


def main(config):
    global worker_config
    worker_config = config
    listen = config["listen"].values()
    queue_dsn = config["queue"]["dsn"]
    conn = redis.from_url(queue_dsn)
    with Connection(conn):
        worker = Worker([Queue(name) for name in listen])
        worker.work()
def main(config):
    listen = config["listen"].values()
    queue_dsn = config["queue"]["dsn"]
    conn = redis.from_url(queue_dsn)
    with Connection(conn):
        worker = Worker([Queue(name) for name in listen])
        worker.work()
def register_birth(self, *args, **kwargs):
    result = super(Worker, self).register_birth(*args, **kwargs)
    names = [remove_queue_name_prefix(n) for n in self.queue_names()]
    names = u', '.join(u'"{}"'.format(n) for n in names)
    log.info(u'Worker {} (PID {}) has started on queue(s) {}'.format(
        self.key, self.pid, names))
    return result
def execute_job(self, job, *args, **kwargs):
    # We shut down all database connections and the engine to make sure
    # that they are not shared with the child process and closed there
    # while still being in use in the main process, see
    #
    #     https://github.com/ckan/ckan/issues/3365
    #
    # Note that this rolls back any non-committed changes in the session.
    # Both `Session` and `engine` automatically re-initialize themselves
    # when they are used the next time.
    log.debug(u'Disposing database engine before fork')
    meta.Session.remove()
    meta.engine.dispose()
    # The original implementation performs the actual fork
    queue = remove_queue_name_prefix(job.origin)
    log.info(u'Worker {} starts job {} from queue "{}"'.format(
        self.key, job.id, queue))
    # HACK
    # for plugin in plugins.PluginImplementations(plugins.IForkObserver):
    #     plugin.before_fork()
    _dispose_engines()
    result = super(Worker, self).execute_job(job, *args, **kwargs)
    log.info(u'Worker {} has finished job {} from queue "{}"'.format(
        self.key, job.id, queue))
    return result
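
The same pre-fork pattern in generic form, as a sketch assuming a module-level SQLAlchemy ``engine`` (all names here are assumptions, not CKAN's API):

    from rq import Worker

    class PreForkDisposeWorker(Worker):
        def execute_job(self, job, queue):
            # Close pooled DB connections so the forked work horse does
            # not inherit sockets the parent process is still using.
            engine.dispose()  # assumed module-level SQLAlchemy engine
            return super().execute_job(job, queue)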
def register_death(self, *args, **kwargs):
    result = super(Worker, self).register_death(*args, **kwargs)
    log.info(u'Worker {} (PID {}) has stopped'.format(self.key, self.pid))
    return result
def handle_exception(self, job, *exc_info):
    log.exception(u'Job {} on worker {} raised an exception: {}'.format(
        job.id, self.key, exc_info[1]))
    return super(Worker, self).handle_exception(job, *exc_info)
from redis import Redis
from rq import Connection, Worker


def main():
    # ``settings`` is a project-local configuration mapping.
    with Connection(Redis(settings['redis server'])):
        qs = ['default']
        w = Worker(qs)
        w.work()
from redis import Redis
from rq import Connection, Worker


def runworker(app):
    REDIS_HOST = app.config['REDIS_HOST']
    REDIS_PORT = app.config['REDIS_PORT']
    REDIS_DB = app.config['REDIS_DB']
    QUEUES = app.config['QUEUES']
    redis_conn = Connection(Redis(REDIS_HOST,
                                  REDIS_PORT,
                                  REDIS_DB))
    with redis_conn:
        w = Worker(QUEUES)
        w.work()
from rq import Connection, Worker


def launch_rq_worker() -> None:
    """
    Blocking function to launch a worker using Python RQ's internal API.
    """
    # ``Connection()`` with no arguments targets the default local Redis.
    with Connection():
        w = Worker(get_available_rq_worker_name())  # project-local helper
        w.work()
from redis import Redis
from rq import Connection, Queue, Worker


def run_worker():
    """Initializes a slim rq task queue."""
    listen = ['default']
    conn = Redis(
        host=app.config['RQ_DEFAULT_HOST'],
        port=app.config['RQ_DEFAULT_PORT'],
        db=0,
        password=app.config['RQ_DEFAULT_PASSWORD'])
    with Connection(conn):
        worker = Worker([Queue(name) for name in listen])
        worker.work()
from rq import Connection, Queue, Worker


def work():
    print("Hello from the worker side.")
    # ``REDIS`` and ``QUEUES`` are module-level settings.
    with Connection(REDIS):
        worker = Worker([Queue(name) for name in QUEUES])
        worker.work()
from rq import Connection, Queue, Worker


def start_worker(queue_name):
    print("starting worker '{}'...".format(queue_name))
    with Connection(redis_rq_conn):
        worker = Worker(Queue(queue_name), exc_handler=failed_job_handler)
        worker.work()
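
The snippet references but never defines ``failed_job_handler``; a minimal sketch following RQ's exception-handler convention (the body is an assumption):

    def failed_job_handler(job, exc_type, exc_value, traceback):
        # Log the failure; returning True lets the remaining handlers
        # in the chain (including RQ's default) run as well.
        print("job {} failed: {}".format(job.id, exc_value))
        return True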