def enqueue_periodic_jobs(queue_name):
    """Schedule every periodic job that targets ``queue_name``.

    Walks the jobs produced by ``get_periodic_jobs(queue_name)`` and, for
    each one whose ``'queue'`` entry equals ``queue_name``, enqueues it on an
    RQ queue bound to the Redis master connection.

    :param queue_name: name of the RQ queue to fill.
    :returns: a summary string reporting how many jobs were enqueued.
    """
    from pybossa.core import sentinel
    from rq import Queue

    master = sentinel.master
    candidates = get_periodic_jobs(queue_name)
    scheduled = 0
    target_queue = Queue(queue_name, connection=master)
    for spec in candidates:
        # Skip jobs that belong to a different queue.
        if not (spec['queue'] == queue_name):
            continue
        scheduled += 1
        target_queue.enqueue_call(func=spec['name'],
                                  args=spec['args'],
                                  kwargs=spec['kwargs'],
                                  timeout=spec['timeout'])
    return "%s jobs in %s have been enqueued" % (scheduled, queue_name)
# Example source code for the Python class Queue()
def tester_tasker(self):
if int(time.time())-int(self.time_log["tester"]) >= 1800:
logging.info('start enqueue tester')
#losd json
if os.path.exists('.topics_tester.json'):
with open('.topics_tester.json','r') as f:
tmp_topics=json.load(f)
else:
tmp_topics=list()
#main
sql="SELECT ID FROM TOPIC WHERE (time - created) < 345600 AND ID NOT IN (SELECT T_ID FROM STATUS) AND (STRFTIME('%s','now') - created) > 1209600;"
sleep_time=20
self.SQ.cursor.execute(sql)
topic_ids=[x[0] for x in self.SQ.cursor.fetchall()]
q=Queue('tester',connection=self.redis_conn)
for topic_id in topic_ids:
if topic_id not in tmp_topics:
q.enqueue(topic_tester.start,topic_id, sleep_time)
tmp_topics.append(topic_id)
#end
tmp_topics=list(set(tmp_topics))
with open('.topics_tester.json','w') as f:
json.dump(tmp_topics,f)
self.time_log["tester"]=str(int(time.time()))
return
def main(msg, config, silent=False):
    """Enqueue ``push_messenger(msg)`` on the 'low' queue.

    :param msg: str, the message handed to ``push_messenger``.
    :param config: mapping; ``config["queue"]["dsn"]`` is the Redis URL
        passed to ``redis.from_url``.
    :param silent: when exactly ``True`` the enqueue result is returned;
        otherwise it is printed.
    :return: the value returned by ``enqueue`` when ``silent is True``,
        else ``None``.
    """
    queue_dsn = config["queue"]["dsn"]
    redis_conn = redis.from_url(queue_dsn)
    q = Queue('low', connection=redis_conn)
    ret = q.enqueue(push_messenger, msg, result_ttl=60)
    if silent is True:
        return ret
    # Call form of print: same output as the Python 2 statement for a
    # single argument, and also valid on Python 3.
    print(ret)
def get_tasks_on_queue(queue_name):
    """List the tasks currently waiting on the RQ queue ``queue_name``.

    For every job on the queue a dict is built with:

    * ``date_enqueued`` -- ``str(process_date(<enqueued_at>))``
    * ``task_id`` -- the first 40-character ``[A-Z0-9]`` run found in the
      job description
    * ``hashes`` -- ``count_valid_hashes_in_task(task_id)``

    Jobs whose description contains no such id are skipped.

    :param queue_name: name of the queue to inspect.
    :returns: list of task dicts, in queue order.
    """
    q = Queue(queue_name, connection=Redis(host=envget('redis.host')))
    tasks = []
    for job in q.jobs:
        # Serialise the job once and reuse the dict. to_dict() returns
        # something like:
        #   {u'origin': u'task_no_vt', u'status': u'queued',
        #    u'description': u"Api.task.generic_task('N7UFZ56F...GIL4YEC')",
        #    u'created_at': '2017-03-03T20:14:47Z',
        #    u'enqueued_at': '2017-03-03T20:14:47Z',
        #    u'timeout': 31536000, u'data': '<pickled call>'}
        info = job.to_dict()
        task = {"date_enqueued": str(process_date(info.get('enqueued_at')))}
        # A missing description is treated as "no task id" instead of
        # letting re.search raise TypeError on None.
        task_id = re.search('[A-Z0-9]{40}', info.get('description') or '')
        if task_id is None:
            continue
        task['task_id'] = task_id.group(0)
        task['hashes'] = count_valid_hashes_in_task(task['task_id'])
        tasks.append(task)
    return tasks
def add_task(requested):
    """Persist a new task and enqueue it on the queue matching its VT needs.

    A 40-character id is generated with ``id_generator``, the request is
    stored through ``save`` together with the enqueue date, and
    ``'Api.task.generic_task'`` is enqueued with the id as its only argument.

    Side effect: ``requested["document_name"]`` is set to ``""`` when it is
    missing or ``None``.

    :param requested: dict describing the request; the ``vt_samples`` and
        ``vt_av`` entries select the queue.
    :returns: the generated task id.
    """
    task_id = id_generator(40)
    if requested.get('document_name') is None:
        requested["document_name"] = ""
    response = {"requested": requested,
                "date_enqueued": datetime.datetime.now(),
                "task_id": task_id}
    save(response)
    if requested.get('vt_samples'):
        queue_name = "task_private_vt"  # task needs a private VT api
    elif requested.get('vt_av'):
        # vt_samples is already known to be falsy in this branch.
        queue_name = "task_public_vt"  # task needs a public VT api
    else:
        queue_name = "task_no_vt"  # task doesn't need VT
    q = Queue(queue_name, connection=Redis(host=envget('redis.host')))
    # 31536000 s == 365 days.
    q.enqueue('Api.task.generic_task', args=(task_id,), timeout=31536000)
    return task_id
def get_queue(name=DEFAULT_QUEUE_NAME):
    u'''
    Return the job queue called ``name``, creating it on first use.

    Queues are cached in the module-level ``_queues`` mapping under their
    prefixed name, so repeated calls return the same object.

    :param string name: The name of the queue. If not given then the
        default queue is returned.

    :returns: The job queue.
    :rtype: ``rq.queue.Queue``

    .. seealso:: :py:func:`get_all_queues`
    '''
    fullname = add_queue_name_prefix(name)
    if fullname in _queues:
        return _queues[fullname]
    # First request for this queue: connect and remember the new instance.
    log.debug(u'Initializing background job queue "{}"'.format(name))
    connection = _connect()
    created = rq.Queue(fullname, connection=connection)
    _queues[fullname] = created
    return created
def main():
    """Create random blob files and enqueue one ``process_blob`` job per blob.

    The number of blobs is read from ``sys.argv[1]`` and defaults to 1 when
    no argument is given. Each blob holds ``BLOB_SIZE`` random bytes and is
    stored in ``BLOB_DIR`` under its SHA-384 hex digest.
    """
    queue = Queue(connection=Redis())
    if not os.path.isdir(BLOB_DIR):
        os.mkdir(BLOB_DIR)
    count = int(sys.argv[1]) if len(sys.argv) > 1 else 1
    digests = []
    for _ in range(count):
        payload = os.urandom(BLOB_SIZE)
        digest = hashlib.sha384(payload).hexdigest()
        with open(os.path.join(BLOB_DIR, digest), 'wb') as out:
            out.write(payload)
        digests.append(digest)
    # Enqueue only after every blob has been written to disk.
    for digest in digests:
        queue.enqueue(process_blob, digest, 1)
def test_max_retries_job(self):
    """Test if the job will fail after max_retries limit is reached"""
    http_requests = setup_mock_redmine_server(max_failures=2)
    args = {
        'url': REDMINE_URL,
        'api_token': 'AAAA',
        'max_issues': 3
    }
    # 'async' is a reserved word from Python 3.7 on, so writing it as a
    # literal keyword argument is a syntax error there. Dict unpacking
    # passes the same argument on every Python version.
    q = rq.Queue('queue', **{'async': False})
    with self.assertRaises(requests.exceptions.HTTPError):
        job = q.enqueue(execute_perceval_job,
                        backend='redmine', backend_args=args,
                        qitems='items', task_id='mytask',
                        max_retries=1)
        # NOTE(review): this line only runs if enqueue() returns; when the
        # expected HTTPError is raised it is skipped. It stays inside the
        # block because `job` is bound only when enqueue() returns.
        self.assertEqual(job.is_failed, True)
def test_job_caching_not_supported(self):
    """Check if it fails when caching is not supported"""
    args = {
        'uri': 'http://example.com/',
        'gitpath': os.path.join(self.dir, 'data/git_log.txt')
    }
    # 'async' is a reserved word from Python 3.7 on, so writing it as a
    # literal keyword argument is a syntax error there. Dict unpacking
    # passes the same argument on every Python version.
    q = rq.Queue('queue', **{'async': False})
    # Asking for a cache path must be rejected...
    with self.assertRaises(AttributeError):
        job = q.enqueue(execute_perceval_job,
                        backend='git', backend_args=args,
                        qitems='items', task_id='mytask',
                        cache_path=self.tmp_path)
    # ...and so must asking to fetch from the cache.
    with self.assertRaises(AttributeError):
        job = q.enqueue(execute_perceval_job,
                        backend='git', backend_args=args,
                        qitems='items', task_id='mytask',
                        fetch_from_cache=True)
def handle(self, message):
    """Route a ``set_variable`` message to its group, creating it if needed.

    Messages without a ``'key'`` entry, or whose key is not
    ``'set_variable'``, are ignored. Otherwise the group registered under
    ``message['group_id']`` in ``self.group_dict`` is looked up (a new
    ``Group`` backed by a fresh queue is created and stored on first sight)
    and the pair ``(transaction_id, 1)`` is added to it.
    """
    if 'key' not in message or message['key'] != 'set_variable':
        return
    group_id = message['group_id']
    train_id = message['train_id']
    parallel_count = message['parallel_count']
    variables = message['variables']
    transaction_id = message['transaction_id']
    groups = self.group_dict
    if group_id not in groups:
        # First message for this group: give it its own queue.
        groups[group_id] = Group(group_id, train_id, parallel_count,
                                 variables, Queue(connection=self.raw_conn))
    target = groups[group_id]
    target.add_message((transaction_id, 1))
def test_set_rq():
    """
    The config exposes an RQ ``Queue`` built from the Redis params it read.
    """
    config = get_config('read/set-rq')
    rq_queue = config.rq
    # An instance must have been created.
    assert isinstance(rq_queue, Queue)
    # Its connection must carry the configured arguments.
    conn_kwargs = rq_queue.connection.connection_pool.connection_kwargs
    assert conn_kwargs['host'] == 'host'
    assert conn_kwargs['port'] == 1337
    assert conn_kwargs['db'] == 1
def call_fn(fn):
    """Run function ``fn`` once in its docker image and return its result.

    The request's query arguments become the keyword arguments of a
    ``'handler.handler'`` job enqueued on the queue named ``fn``. The image
    registered for ``fn`` in ``images`` is then started with ``docker run``
    and, once that command exits, ``repr(job.result)`` is returned.

    :param fn: function name; used both as queue name and as key in
        ``images``.
    :raises KeyError: if ``fn`` has no entry in ``images``.
    :raises subprocess.CalledProcessError: if ``docker run`` exits non-zero.
    """
    # Resolve the image first so an unknown fn fails before a job is
    # enqueued on a queue nothing will ever consume.
    image_id = images[fn]
    kwargs = {name: request.args.get(name) for name in request.args}
    q = Queue(fn, connection=redis_conn)
    job = q.enqueue('handler.handler', kwargs=kwargs)
    subprocess.check_call(['docker', 'run', '--network', 'scrappyserverless_default', image_id])
    return repr(job.result)
# PLAN: Build docker images from function code
# do not delete
def enqueue_job(job):
    """Enqueue a single job described by the ``job`` dict.

    The dict supplies the queue name (``'queue'``), the callable
    (``'name'``), its ``'args'`` / ``'kwargs'`` and the ``'timeout'``.

    :returns: always ``True``.
    """
    from pybossa.core import sentinel
    from rq import Queue

    target_queue = Queue(job['queue'], connection=sentinel.master)
    target_queue.enqueue_call(
        func=job['name'],
        args=job['args'],
        kwargs=job['kwargs'],
        timeout=job['timeout'],
    )
    return True
def check_failed():
    """Check the jobs that have failed and requeue them.

    For every job in RQ's failed queue a per-job counter is kept in Redis
    under ``pybossa:job:failed:<id>``. While the counter is below the
    ``FAILED_JOBS_RETRIES`` setting the job is requeued. After that the
    admins listed in the ``ADMINS`` setting are mailed, at most once per
    lifetime of the ``pybossa:job:failed:mailed:<id>`` marker key.

    :returns: a status string listing the failed job ids, or a fixed
        message when the failed queue is empty.
    """
    from rq import get_failed_queue, requeue_job
    from pybossa.core import sentinel

    fq = get_failed_queue()
    job_ids = fq.job_ids
    count = len(job_ids)
    FAILED_JOBS_RETRIES = current_app.config.get('FAILED_JOBS_RETRIES')
    for job_id in job_ids:
        KEY = 'pybossa:job:failed:%s' % job_id
        job = fq.fetch_job(job_id)
        # Take the counter from the write itself instead of re-reading it
        # from the replica, which may not have received the update yet (or
        # may not have the key at all, making int(None) raise).
        if sentinel.slave.exists(KEY):
            attempts = int(sentinel.master.incr(KEY))
        else:
            ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
            sentinel.master.setex(KEY, ttl, 1)
            attempts = 1
        if attempts < FAILED_JOBS_RETRIES:
            requeue_job(job_id)
        else:
            KEY = 'pybossa:job:failed:mailed:%s' % job_id
            if (not sentinel.slave.exists(KEY) and
                    current_app.config.get('ADMINS')):
                subject = "JOB: %s has failed more than 3 times" % job_id
                body = "Please, review the background jobs of your server."
                body += "\n This is the trace error\n\n"
                body += "------------------------------\n\n"
                # The job may be gone or carry no traceback; do not let
                # that prevent the notification from being sent.
                exc_info = job.exc_info if job is not None else None
                body += exc_info or ''
                mail_dict = dict(recipients=current_app.config.get('ADMINS'),
                                 subject=subject, body=body)
                send_mail(mail_dict)
                ttl = current_app.config.get('FAILED_JOBS_MAILS')*24*60*60
                sentinel.master.setex(KEY, ttl, True)
    if count > 0:
        return "JOBS: %s You have failed the system." % job_ids
    else:
        return "You have not failed the system"
def test_file_drop_GZHandler():
    """ Exercises the GZHandler of file_drop with mock events """
    handler = file_drop.GZHandler()

    class Event:
        """ Mock event object carrying the attributes set below """
        event_type = None
        src_path = None
        is_directory = None
        q = None
        r = None

        def __init__(self, event_type, src_path, is_directory):
            """ Stores the event fields and builds the queue / redis handles """
            self.event_type = event_type
            self.src_path = src_path
            self.is_directory = is_directory
            self.q = Queue(connection=Redis(host='localhost'),
                           default_timeout=86400)
            # NOTE(review): 'localhsot' looks like a typo for 'localhost';
            # left untouched in case the test relies on it -- confirm.
            self.r = StrictRedis(host='localhsot', port=6379, db=0)

    created_event = Event("created", "/dev/null", False)
    modified_event = Event("modified", "/etc/hosts", False)
    # process() is deliberately called three times on the same event.
    for _ in range(3):
        handler.process(created_event)
    handler.on_created(created_event)
    handler.on_modified(modified_event)
def make_queue(name='default'):
    """Return an RQ queue called ``name`` on a fresh ``redis_connection()``.

    :param name: queue name; defaults to ``'default'``.
    :returns: the new ``Queue`` instance.
    """
    # Pass the name through: it used to be ignored, so every caller got
    # the default queue whatever name it asked for.
    queue = Queue(name, connection=redis_connection())
    return queue
def get_failed_queue_index(name='default'):
    """
    Return the index of the 'failed' queue that shares the connection
    settings of the queue called ``name``.

    Indexes 0..99 are probed through ``get_queue_by_index``; ``None`` is
    returned when none of them matches.
    """
    wanted_kwargs = get_connection(name).connection_pool.connection_kwargs
    for index in range(100):
        candidate = get_queue_by_index(index)
        if candidate.name != 'failed':
            continue
        if candidate.connection.connection_pool.connection_kwargs == wanted_kwargs:
            return index
    return None
def get_queue_index(name='default'):
    """
    Return the index of the queue called ``name`` whose connection settings
    match those of ``get_connection(name)``.

    Indexes 0..99 are probed through ``get_queue_by_index``; ``None`` is
    returned when none of them matches.
    """
    wanted_kwargs = get_connection(name).connection_pool.connection_kwargs
    for index in range(100):
        candidate = get_queue_by_index(index)
        if not (candidate.name == name):
            continue
        if candidate.connection.connection_pool.connection_kwargs == wanted_kwargs:
            return index
    return None
def test_get_current_job(self):
    """
    A job that calls RQ's ``get_current_job`` must not end up in the
    failed queue after being processed by ``rqworker``.
    """
    default_queue = get_queue()
    queued = default_queue.enqueue(access_self)
    call_command('rqworker', '--burst')
    failed = Queue(name='failed', connection=default_queue.connection)
    self.assertFalse(queued.id in failed.job_ids)
    # Clean up the job created by this test.
    queued.delete()
def test_delete_job(self):
    """
    Deleting a job through the view must remove it from Redis and also
    drop its id from the queue.
    """
    queue_name = 'django_rq_test'
    test_queue = get_queue(queue_name)
    index = get_queue_index(queue_name)
    queued = test_queue.enqueue(access_self)
    delete_url = reverse('rq_delete_job', args=[index, queued.id])
    self.client.post(delete_url, {'post': 'yes'})
    self.assertFalse(Job.exists(queued.id, connection=test_queue.connection))
    self.assertNotIn(queued.id, test_queue.job_ids)
def test__get_queue(self, mod):
    """``mod._get_queue()`` must return an ``rq.Queue`` instance."""
    from rq import Queue

    returned = mod._get_queue()
    assert isinstance(returned, Queue)
def _get_queue():
    """Return the 'low' Redis Queue bound to the module-level ``conn``."""
    low_queue = Queue('low', connection=conn)
    return low_queue
# These are constants because other components may listen for these
# messages in logs:
def api_notifications():
    """Queue an incoming MTurk REST notification for ``worker_function``.

    The event type is required in the request values; the assignment id and
    participant id are optional. The queue length and job ids are logged
    after the job has been submitted.
    """
    values = request.values
    event_type = values['Event.1.EventType']
    assignment_id = values.get('Event.1.AssignmentId')
    participant_id = values.get('participant_id')

    # Hand the notification over to the background queue.
    db.logger.debug('rq: Queueing %s with id: %s for worker_function',
                    event_type, assignment_id)
    q.enqueue(worker_function, event_type, assignment_id, participant_id)
    queued_ids = ', '.join(q.job_ids)
    db.logger.debug('rq: Submitted Queue Length: %d (%s)', len(q), queued_ids)
    return success_response()
def main():
    """
    Init workers

    Parses the required ``--queues`` option (one or more names) and runs an
    RQ worker on those queues, using a Redis connection built from
    ``settings.REDIS``.
    """
    parser = argparse.ArgumentParser()
    # NOTE(review): `unicode` is a Python 2 builtin; this line needs
    # attention if the module is ever run on Python 3.
    parser.add_argument('--queues', nargs='+', type=unicode, dest='queues', required=True)
    args = parser.parse_args()

    with Connection(connection=StrictRedis(**settings.REDIS)):
        # A list, not map(): on Python 3 a map object is always truthy and
        # can only be iterated once, which breaks the `or` fallback.
        qs = [Queue(name) for name in args.queues] or [Queue()]
        worker = Worker(qs)
        worker.work()
def main():
    """Build the application, attach a Redis-backed queue and start it."""
    app = Application(urls)
    redis_url = app.config['redis']['url']
    app.q = Queue(connection=redis.from_url(redis_url))
    app.start()
def get_osf_queue(queue_name):
    """Return the queue ``queue_name`` using ``DEFAULT_JOB_TIMEOUT`` as default timeout."""
    return Queue(
        queue_name,
        connection=get_redis_connection(),
        default_timeout=DEFAULT_JOB_TIMEOUT,
    )
def main():
    """Run a worker on the 'default' queue inside ``redis_connection``."""
    with Connection(redis_connection):
        default_queue = Queue('default')
        Worker(default_queue).work()
def get_job_queue(redis_client=None):
    """Return a job queue on ``redis_client``.

    When ``redis_client`` is falsy, ``get_redis_client()`` supplies the
    connection instead.
    """
    if not redis_client:
        redis_client = get_redis_client()
    return Queue(connection=redis_client)
def __init__(self):
    """Initialise both bases, the 'scrape_topics' queue and the scheduler."""
    NwRedis.__init__(self)
    NwParser.__init__(self)
    # Queue used for topic-scraping jobs; default job timeout is 200.
    self.topic_job_queue = rq.Queue(
        name='scrape_topics',
        default_timeout=200,
        connection=self.redis_connection,
    )
    self.schedule = BlockingScheduler()
def gen_topic_queue(self):
    """Enqueue spider jobs for RSS topics that are not known yet.

    Nothing happens unless ``self.topics_id_sqlite()`` returns more than
    2000 ids. Otherwise every topic from ``self.topics_id_rss()`` that is
    neither among those ids nor recorded in ``.topics_all.json`` gets a
    ``topic_spider.start(int(topic), self.topic_sleep_time)`` job on the
    'topic' queue. Finally ``.topics_all.json`` is rewritten with the union
    of all three id collections.

    :returns: None in every case.
    """
    logging.debug('start topic enqueue')
    topics_sql = self.topics_id_sqlite()
    if len(topics_sql) <= 2000:
        return
    topics_rss = self.topics_id_rss()
    # Everything already known: ids from the database plus ids saved by
    # earlier runs. A set keeps the per-topic test below O(1) instead of
    # scanning two lists for each RSS topic.
    known_topics = set(topics_sql)
    if os.path.exists('.topics_all.json'):
        with open('.topics_all.json', 'r') as f:
            known_topics.update(json.load(f))
    t_queue = Queue('topic', connection=self.redis_conn)
    # Enqueue the topics seen for the first time.
    for topic in topics_rss:
        if topic not in known_topics:
            topic_id = int(topic)
            t_queue.enqueue(topic_spider.start, topic_id, self.topic_sleep_time)
    # Save the de-duplicated union of saved, RSS and database ids.
    known_topics.update(topics_rss)
    with open('.topics_all.json', 'w') as f:
        json.dump(list(known_topics), f)
    return