def tasker(self):
    """Enqueue node-crawl jobs for every tier whose refresh interval has elapsed.

    Nodes are grouped into five tiers by their ``topics`` count. Each tier has
    its own SQL selection, per-job ``sleep_time``, RQ queue name, and a key
    into ``self.time_log`` that stores the time of the last enqueue run.
    Two schedules exist: the first interval is used when the local hour is
    ``>= 8`` or ``< 2``, the second one otherwise.

    For the ``node4`` / ``node5`` queues a node is only enqueued while its id
    is still contained in ``self.node_number``; the id is removed from that
    collection when the job is enqueued.
    """
    # (sql, sleep_time, interval for hours >= 8 or < 2, interval otherwise,
    #  time_log key, queue name)
    tiers = (
        ('SELECT ID FROM NODES WHERE topics >= 8000;',
         5, 900, 1800, '8000_node', 'node1'),
        ('SELECT ID FROM NODES WHERE topics BETWEEN 3000 AND 8000;',
         10, 1800, 3600, '4000_node', 'node2'),
        ('SELECT ID FROM NODES WHERE topics BETWEEN 1000 AND 3000;',
         20, 7200, 14400, '1000_node', 'node3'),
        ('SELECT ID FROM NODES WHERE topics BETWEEN 100 AND 1000;',
         90, 86400, 86400, '500_node', 'node4'),
        ('SELECT ID FROM NODES WHERE topics BETWEEN 1 AND 100;',
         90, 86400, 86400, '0_node', 'node5'),
    )
    # Queues whose nodes must still be listed in self.node_number.
    tracked_queues = ('node4', 'node5')

    # NOTE(review): this only rebinds the module attribute; presumably meant
    # to label local time as CST -- confirm it has the intended effect.
    time.tzname = ('CST', 'CST')

    # Read the hour once so both halves of the test see the same value even
    # when the call happens right on an hour boundary.
    hour = int(time.strftime('%H'))
    use_first_schedule = hour >= 8 or hour < 2

    for sql, sleep_time, first_interval, second_interval, time_log_name, queue_name in tiers:
        between_time = first_interval if use_first_schedule else second_interval
        q_node = Queue(queue_name, connection=self.redis_conn)

        if int(time.time()) - int(self.time_log[time_log_name]) < between_time:
            continue  # this tier was refreshed recently enough

        logging.info('start enqueue, queue name: %s', queue_name)
        self.SQ.cursor.execute(sql)
        tracked = queue_name in tracked_queues
        for row in self.SQ.cursor.fetchall():
            node_id = row[0]
            if tracked:
                if node_id not in self.node_number:
                    continue
                self.node_number.remove(int(node_id))
            q_node.enqueue(node_spider.start, node_id, sleep_time)
        # Remember when this tier was last enqueued (stored as a string).
        self.time_log[time_log_name] = str(int(time.time()))
    return
# python类Queue()的实例源码 (example source code using the Python Queue() class)
def sign_up():
    """For new users only: sign-up page.

    A non-POST request renders the sign-up form. A POST request reads the
    form fields, and either redirects an already-registered e-mail address to
    the index page or creates the user, stores its id in the session, enqueues
    the background ``new_user_full_setup`` job and redirects to the
    recommendations page.
    """
    if request.method != 'POST':
        return render_template('sign-up.html')

    f_name = request.form.get('f_name')
    l_name = request.form.get('l_name')
    email = request.form.get('email')
    # NOTE(review): the password is handed to the model exactly as received;
    # confirm that hashing happens inside ``User``.
    password = request.form.get('password')

    # A missing or non-numeric id used to raise and abort the request;
    # show the form again instead.
    try:
        goodreads_uid = int(request.form.get('goodreads_uid'))
    except (TypeError, ValueError):
        flash("Please enter a valid numeric Goodreads user ID.")
        return render_template('sign-up.html')

    rec_frequency = 1
    # NOTE(review): every new user is created with user_id=1 -- confirm this
    # is intentional and not a leftover placeholder.
    user_id = 1

    if User.query.filter(User.email == email).all():
        flash("Looks like you've already signed up! Please log in.")
        return redirect(url_for('index'))

    new_user = User(email=email, password=password,
                    f_name=f_name, l_name=l_name,
                    goodreads_uid=goodreads_uid, rec_frequency=rec_frequency,
                    sign_up_date=dt.datetime.now(), paused=0, user_id=user_id)
    db.session.add(new_user)
    db.session.commit()
    flash("Welcome to NextBook!")
    session['current_user_id'] = new_user.user_id

    ## new user setup ###
    q = Queue(connection=Redis())
    # The original passed the undefined name ``gr_user_id`` here; the Goodreads
    # id parsed from the form is the value available in this function.
    results = q.enqueue_call(new_user_full_setup,
                             args=[goodreads_uid, new_user.user_id, goodreads_key],
                             ttl=86400)
    session['new_user_job_id'] = results.get_id()

    return redirect(url_for('recommendations'))
def fetchmails():
    """Enqueue a background job that processes the session user's messages.

    The job runs ``gmail.process_user_messages_async`` with the credentials
    stored in the Flask session. The response is a JSON object whose
    ``jobId`` field holds the id of the enqueued job.
    """
    work_queue = Queue(connection=Redis())
    credentials = flask.session['credentials']
    enqueued = work_queue.enqueue(gmail.process_user_messages_async, credentials)
    payload = {'jobId': enqueued.get_id()}
    return flask.jsonify(payload)
def checkstatus(job_id):
    """Report the status and result of a previously enqueued job as JSON.

    :param job_id: id of the job, as returned when it was enqueued.
    :returns: a JSON response with ``result`` and ``status`` keys. When the
        job cannot be found the response has HTTP status 404, ``result`` is
        ``null`` and ``status`` is ``"not_found"``.
    """
    q = Queue(connection=Redis())
    job = q.fetch_job(job_id)
    if job is None:
        # fetch_job yields None for ids that are unknown, expired or belong
        # to another queue; dereferencing it would raise AttributeError.
        response = flask.jsonify({'result': None, 'status': 'not_found'})
        response.status_code = 404
        return response
    return flask.jsonify({'result': job.return_value, 'status': job.get_status()})
def main(msg, config=None, silent=False):
    """
    Job enqueue

    Puts a ``push_messenger`` job for *msg* on the ``high`` queue of the
    Redis instance named by ``config["queue"]["dsn"]``.

    :param msg:str message handed to ``push_messenger``
    :param config:object mapping that provides ``config["queue"]["dsn"]``;
        required even though the signature defaults it to ``None``
    :param silent:bool when true, the enqueued job is not printed
    :return: the job object returned by ``Queue.enqueue``
    """
    queue_dsn = config["queue"]["dsn"]
    redis_conn = redis.from_url(queue_dsn)
    q = Queue('high', connection=redis_conn)
    ret = q.enqueue(push_messenger, msg, result_ttl=60)
    # ``silent`` was previously accepted but ignored; the call form of print
    # works on both Python 2 and Python 3.
    if not silent:
        print(ret)
    return ret
def main(config):
    """Run an RQ worker on the queues named in ``config["listen"]``.

    The configuration is also published through the module-level
    ``worker_config`` global. The Redis connection is built from
    ``config["queue"]["dsn"]``.
    """
    global worker_config
    worker_config = config

    queue_names = config["listen"].values()
    redis_conn = redis.from_url(config["queue"]["dsn"])
    with Connection(redis_conn):
        queues = [Queue(queue_name) for queue_name in queue_names]
        Worker(queues).work()
def main(config=None):
    """Start an RQ worker for every queue listed under ``config["listen"]``.

    ``config`` must provide ``config["listen"]`` (a mapping whose values are
    queue names) and ``config["queue"]["dsn"]`` (the Redis URL).
    """
    names = config["listen"].values()
    dsn = config["queue"]["dsn"]
    with Connection(redis.from_url(dsn)):
        listening_on = list(map(Queue, names))
        rq_worker = Worker(listening_on)
        rq_worker.work()
def add_hash_to_process_queue(sha1):
    """Enqueue ``process_hash.generic_process_hash(sha1)`` on the ``process`` queue.

    The Redis host is taken from ``envget('redis')``; the job is enqueued
    with a timeout of 70.
    """
    redis_host = envget('redis').get('host')
    process_queue = Queue('process', connection=Redis(host=redis_host))
    process_queue.enqueue(
        'process_hash.generic_process_hash',
        args=(sha1,),
        timeout=70,
    )
def remove_queue_name_prefix(name):
    u'''
    Strip the queue-name prefix from ``name``.

    :param name: a prefixed queue name.

    :returns: ``name`` without its leading prefix.

    :raises ValueError: if the given name is not prefixed.

    .. seealso:: :py:func:`add_queue_name_prefix`
    '''
    queue_prefix = _get_queue_name_prefix()
    if name.startswith(queue_prefix):
        return name[len(queue_prefix):]
    raise ValueError(u'Queue name "{}" is not prefixed.'.format(name))
def get_all_queues():
    u'''
    Collect every job queue whose name carries this instance's prefix.

    :returns: The queues.
    :rtype: List of ``rq.queue.Queue`` instances

    .. seealso:: :py:func:`get_queue`
    '''
    redis_conn = _connect()
    queue_prefix = _get_queue_name_prefix()
    own_queues = []
    for candidate in rq.Queue.all(connection=redis_conn):
        # Queues without the prefix are not ours and are left out.
        if candidate.name.startswith(queue_prefix):
            own_queues.append(candidate)
    return own_queues
def test_foreign_queues_are_ignored(self):
    u'''
    ``get_all_queues`` must report only the queues of this CKAN instance.
    '''
    # Two queues that belong to this CKAN instance
    for own_queue in (u'q1', u'q2'):
        self.enqueue(queue=own_queue)

    # A queue created while a different site id is configured
    with changed_config(u'ckan.site_id', u'some-other-ckan-instance'):
        self.enqueue(queue=u'q2')

    # A plain RQ queue created without going through CKAN
    foreign_queue = rq.Queue(u'q4')
    foreign_queue.enqueue_call(jobs.test_job)

    reported = set()
    for found in jobs.get_all_queues():
        reported.add(jobs.remove_queue_name_prefix(found.name))
    assert_equal(reported, {u'q1', u'q2'})
def main():
    """Empty the ``failed`` queue, wire up the prism server and run the reactor."""
    # Drop whatever is left in the failed-task queue.
    failed_queue = Queue(
        "failed",
        connection=get_redis_connection(settings['redis server']),
    )
    failed_queue.empty()

    # Tie the server's start/stop hooks to the reactor's lifecycle events.
    server = PrismServer()
    lifecycle_hooks = (
        ("startup", server.startService),
        ("shutdown", server.stopService),
    )
    for event_type, hook in lifecycle_hooks:
        reactor.addSystemEventTrigger("before", event_type, hook)

    # Optionally try to redistribute any local blobs.
    if settings['enqueue on startup']:
        enqueue_on_start()

    reactor.run()
def enqueue_stream(sd_hash, num_blobs_in_stream, db_dir, client_factory_class, redis_address=settings['redis server'],
                   host_infos=None):
    """Enqueue a ``process_stream`` job for the stream identified by ``sd_hash``.

    The job timeout grows with the stream size: 30 for each blob, plus 30
    more. The remaining arguments are forwarded to ``process_stream``
    unchanged.
    """
    job_timeout = 30 * (num_blobs_in_stream + 1)
    stream_queue = Queue(connection=get_redis_connection(redis_address))
    stream_queue.enqueue(
        process_stream,
        sd_hash,
        db_dir,
        client_factory_class,
        redis_address,
        host_infos,
        timeout=job_timeout,
    )
def enqueue_blob(blob_hash, db_dir, client_factory_class, redis_address=settings['redis server'],
                 host_infos=None):
    """Enqueue a ``process_blob`` job for a single blob.

    :param blob_hash: hash of the blob to process.
    :param db_dir: forwarded to ``process_blob``.
    :param client_factory_class: forwarded to ``process_blob``.
    :param redis_address: address used to obtain the Redis connection; also
        forwarded to the job.
    :param host_infos: forwarded to ``process_blob``; defaults to ``None``.

    The job is enqueued with a timeout of 60.
    """
    redis_connection = get_redis_connection(redis_address)
    q = Queue(connection=redis_connection)
    # Forward the ``host_infos`` parameter: the original referenced the
    # undefined name ``host_getter`` and left this parameter unused.
    q.enqueue(process_blob, blob_hash, db_dir, client_factory_class, redis_address, host_infos, timeout=60)
def on_post(self, req, resp, doc_index):
    """Handle POST: validate the JSON body and queue it for writing.

    The decoded document is enqueued as a ``postDocument`` job on the
    ``document_write`` queue so that workers process it in the background.
    The response status is set to 202 and the body echoes the decoded JSON.

    :param req: request whose ``stream`` holds the JSON body.
    :param resp: response object to fill in.
    :param doc_index: passed to ``postDocument`` together with the document.
    :raises falcon.HTTPError: with status 400 when the body cannot be read or
        is not valid JSON.
    """
    try:
        raw_json = req.stream.read()
    except Exception as ex:
        # str(ex) rather than ex.message: exceptions have no ``message``
        # attribute on Python 3.
        raise falcon.HTTPError(falcon.HTTP_400,
                               'Error',
                               str(ex))

    try:
        # UTF-8 is already the default; the ``encoding`` keyword is gone from
        # ``json.loads`` in current Python versions.
        result_json = json.loads(raw_json)
    except ValueError:
        raise falcon.HTTPError(falcon.HTTP_400,
                               'Malformed JSON',
                               'Could not decode the request body. The JSON was incorrect.')

    # Enqueueing write request as jobs into document_write queue
    # and processing them in the background with workers.
    q = Queue('document_write', connection=self.db.connection())
    job = q.enqueue_call(
        func=postDocument, args=(result_json, doc_index), result_ttl=5000
    )
    LOG.info('POST request ' + str(job.get_id()))

    resp.status = falcon.HTTP_202
    # ``json.dumps`` takes no ``encoding`` keyword on Python 3.
    resp.body = json.dumps(result_json)
# This function handles DELETE requests
def on_delete(self, req, resp, doc_index):
    """
    Queue a ``delDocument`` job for ``doc_index`` on the document_delete
    queue so that a worker performs the deletion in the background, then
    log the id of the enqueued job.
    """
    delete_queue = Queue('document_delete', connection=self.db.connection())
    enqueued = delete_queue.enqueue_call(
        func=delDocument,
        args=(doc_index,),
        result_ttl=5000,
    )
    job_id = str(enqueued.get_id())
    LOG.info('DELETE request ' + job_id)
def test_job_no_result(self):
    """Execute a Git backend job that will not produce any results"""

    # The start date lies after the data in the log file, so the backend
    # is expected to yield no items.
    args = {
        'uri': 'http://example.com/',
        'gitpath': os.path.join(self.dir, 'data/git_log_empty.txt'),
        'from_date': datetime.datetime(2020, 1, 1, 1, 1, 1)
    }

    # ``async`` is a reserved word since Python 3.7, so it cannot be used as
    # a keyword argument any more; RQ provides ``is_async`` for the same
    # purpose (the job runs synchronously inside ``enqueue``).
    q = rq.Queue('queue', is_async=False)
    job = q.enqueue(execute_perceval_job,
                    backend='git', backend_args=args,
                    qitems='items', task_id='mytask')

    result = job.return_value
    self.assertEqual(result.job_id, job.get_id())
    self.assertEqual(result.task_id, 'mytask')
    self.assertEqual(result.backend, 'git')
    self.assertEqual(result.last_uuid, None)
    self.assertEqual(result.max_date, None)
    self.assertEqual(result.nitems, 0)
    self.assertEqual(result.offset, None)
    self.assertEqual(result.nresumed, 0)

    # Nothing may have been pushed to the 'items' list.
    commits = self.conn.lrange('items', 0, -1)
    commits = [pickle.loads(c) for c in commits]
    self.assertListEqual(commits, [])
def __init__(self, queue=None, redis_url=None, queue_name='kaneda'):
    """Set up ``self.queue``, the RQ queue used by this object.

    :param queue: an existing RQ ``Queue`` instance to use as-is.
    :param redis_url: Redis URL used to build a new queue when ``queue`` is
        not given.
    :param queue_name: name of the queue created when ``queue`` is not given.
    :raises ImproperlyConfigured: if ``Redis`` or ``Queue`` is falsy (per the
        messages, the redis / rq libraries are not installed), or if
        ``queue`` is given but is not an RQ ``Queue`` instance.

    Without ``queue`` and ``redis_url`` the queue is created on a default
    ``Redis()`` connection.
    """
    if not Redis:
        raise ImproperlyConfigured('You need to install redis to use the RQ queue.')
    if not Queue:
        raise ImproperlyConfigured('You need to install rq library to use the RQ queue.')
    # Identity check instead of truthiness: truth-testing a queue object may
    # depend on how many jobs it holds, which would make an empty queue look
    # like "no queue passed".
    if queue is not None:
        if not isinstance(queue, Queue):
            raise ImproperlyConfigured('"queue" parameter is not an instance of RQ queue.')
        self.queue = queue
    elif redis_url:
        self.queue = Queue(queue_name, connection=Redis.from_url(redis_url))
    else:
        self.queue = Queue(queue_name, connection=Redis())
def __init__(self, data, conn):
    """Store the job description, create the queue and enqueue the first task.

    ``data`` must provide the keys ``parallel_count`` (converted to int),
    ``code_name`` and ``train_id``; ``conn`` is the connection handed to the
    queue.
    """
    self.data = data

    # Settings copied out of the job description.
    parallel = int(data['parallel_count'])
    self.parallel_count = parallel
    self.code_name = self.data['code_name']
    self.train_id = self.data['train_id']

    # Bookkeeping starts empty.
    self.cur_iter_id = 0
    self.q = Queue(connection=conn)
    self.tasks = []

    self.enqueue_task()
def main():
    """Enqueue ``slow_fib`` for a range of inputs and poll until all finish.

    The screen is cleared and redrawn every 0.2 seconds with each job's
    current result, or a placeholder while it is still ``None``.
    """
    # Range of Fibonacci numbers to compute
    fib_range = range(20, 34)

    # Kick off the tasks asynchronously
    queue = Queue()
    async_results = {n: queue.enqueue(slow_fib, n) for n in fib_range}

    started_at = time.time()
    while True:
        os.system('clear')
        print('Asynchronously: (now = %.2f)' % (time.time() - started_at,))
        pending = False
        for n in fib_range:
            value = async_results[n].return_value
            if value is None:
                # At least one job has no result yet: poll again.
                pending = True
                value = '(calculating)'
            print('fib(%d) = %s' % (n, value))
        print('')
        print('To start the actual in the background, run a worker:')
        print(' python examples/run_worker.py')
        time.sleep(0.2)
        if not pending:
            break

    print('Done')