def mandelbrot_main(w, h, max_iterations=1000, output='mandelbrot_celery.png'):
""" Main function for mandelbrot program with celery """
job = group([mandelbrot_calc_row.s(y, w, h, max_iterations) for y in range(h)])
result = job.apply_async()
image = Image.new('RGB', (w, h))
    for image_rows in result.join():
        for k, v in image_rows.items():
            # Results may arrive JSON-serialized: keys as strings, colors as lists.
            k = int(k)
            v = tuple(map(int, v))
            # Recover pixel coordinates from the flat index using the image width.
            x, y = k % w, k // w
            image.putpixel((x, y), v)
    image.save(output, 'PNG')
    print('Saved to', output)
# Python group() example source code
# Source file: celery_mandelbrot.py
# Project: Software-Architecture-with-Python (author: PacktPublishing)
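# The mandelbrot_calc_row task consumed above is not shown on this page; the
# result.join() loop only assumes it returns a mapping of flat pixel index to an
# RGB triple (the int()/tuple() coercions exist because a JSON result backend
# turns dict keys into strings and tuples into lists). Below is a minimal sketch
# of a compatible row task; the broker/backend URLs and the grayscale colouring
# are illustrative assumptions, not the book's implementation. The main function
# above additionally needs `from celery import group` and `from PIL import Image`.
from celery import Celery

app = Celery('mandelbrot', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def mandelbrot_calc_row(y, w, h, max_iterations=1000):
    """Compute one image row and return {flat_pixel_index: (r, g, b)}."""
    row = {}
    for x in range(w):
        # Map the pixel onto the complex plane (bounds chosen for illustration).
        c = complex(-2.5 + 3.5 * x / w, -1.25 + 2.5 * y / h)
        z = 0j
        n = 0
        while abs(z) <= 2 and n < max_iterations:
            z = z * z + c
            n += 1
        shade = 255 - int(255 * n / max_iterations)
        row[x + y * w] = (shade, shade, shade)
    return row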
def loads(payload):
if payload.get('type') != 'normal':
        raise Exception('celery task loader only supports normal mode')
tasks = payload.get('tasks', [])
cts = []
for task in tasks:
        # Only the first task in each chain receives the payload arguments.
        ops = [
            load(task_id, task.get('args') if i == 0 else None, task.get('on_error'))
            for i, task_id in enumerate(task['ids'])
        ]
cts.append(chain(ops))
callback = payload.get('callback')
if callback:
return chord(header=group(cts), body=func.load(callback).s())
return group(cts)
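# The payload shape that loads() expects can be read off the parsing above: a
# 'normal' type, a list of task descriptors whose ids are chained together (the
# payload arguments only go to the first link), and an optional callback that
# wraps the whole group in a chord. A hypothetical payload, with every id and
# value invented purely for illustration:
payload = {
    'type': 'normal',
    'tasks': [
        # Each descriptor becomes a chain; 'args' is applied to the first id only.
        {'ids': ['extract', 'transform', 'store'],
         'args': {'source': 's3://bucket/input.csv'},
         'on_error': 'notify_failure'},
        {'ids': ['cleanup_tmp'], 'args': None, 'on_error': None},
    ],
    # Optional: when present, the group of chains becomes a chord header and this
    # callback receives the list of chain results.
    'callback': 'aggregate_results',
}
canvas = loads(payload)   # a celery group (or chord), ready for .apply_async()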
def initialize_generation(db_path: str, scenario_id: int, generation: int, genotypes: List[Genome],
pair_selection_f: PAIR_SELECTION_F_T, fitness_f: FITNESS_F_T, neat_config: CPPNNEATConfig,
ca_config: CAConfig) -> None:
from celery import group, chord
grouped_tasks = group(handle_individual.s(
scenario_id=scenario_id,
generation=generation,
individual_number=i,
genotype=genotype,
fitness_f=fitness_f,
ca_config=ca_config,
) for i, genotype in enumerate(genotypes))
final_task = persist_results.subtask(
args=(db_path, scenario_id, generation, fitness_f, pair_selection_f, neat_config, ca_config),
)
chord(grouped_tasks, final_task)()
def __init__(self, headers, job_info):
self.values = {
'group_id': headers['group'] or '',
'root_id': headers['root_id'],
'task_id': headers['id'],
'task_name': headers['task'],
'task_args': headers['argsrepr'],
'task_kwargs': headers['kwargsrepr'],
'app_name': headers['task'].rsplit('.', 1)[0],
'plugin': job_info['Plugin'],
}
# set defaults
if self.values['group_id']:
job_info.setdefault('BatchName', '{plugin}-{group_id}')
job_info.setdefault('Name', '{plugin}-{task_name}-{task_id}')
def job(plugin_name, frames, job_info=None, plugin_info=None):
"""
Create a group of tasks for executing each of the frame packets for
the given Deadline plugin.
    Parameters
    ----------
    plugin_name : str
        Name of the Deadline plugin to run.
    frames : str
        Frame-range specification understood by `parse_frames`.
    job_info : dict, optional
        Extra Deadline job settings attached to each task signature.
    plugin_info : dict, optional
        Extra Deadline plugin settings attached to each task signature.
    Returns
    -------
    celery.group
    """
deadline_group_id = ObjectId()
return group(
[plugin_task.signature((plugin_name, frames, frame, i),
plugin_info=plugin_info,
job_info=job_info,
deadline_group_id=deadline_group_id)
for i, frame in enumerate(parse_frames(frames))])
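# A hedged usage sketch for the helper above; the plugin name, frame range and
# Deadline settings are invented placeholders, and the frame-string syntax is
# whatever parse_frames() accepts.
frame_group = job('MayaBatch', '1-10',
                  job_info={'Priority': 50},             # placeholder job settings
                  plugin_info={'SceneFile': 'shot.ma'})  # placeholder plugin settings
result = frame_group.apply_async()   # one plugin_task per frame packet
result.join(timeout=600)             # block until every frame task has finished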
# --
# FIXME: unused:
def list_to_set_reducer(self, groups):
"""Flatten nested lists to a set of items
Expected shape of input, List of group results, each item in
group results is list of items to reduce.
[[[item1, item2], [item2, item3]], [[item4, item5]]]
:param groups: List of group results. Each group result is expected to be
an itterable containing itterables of set members.
:returns: List of unique values from the input
:rtype: list
"""
# TODO does this assume too much knowledge of the shape of the input?
# print 'list_to_set_reducer: {}'.format(groups)
s = set()
for g in groups:
for i in g:
s.update(i)
return list(s)
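# The same reduction performed inline, for the exact input shape the docstring
# describes (a list of group results, each holding iterables of members); the
# output order is arbitrary because a set is used.
groups = [[['item1', 'item2'], ['item2', 'item3']],   # results from the first group
          [['item4', 'item5']]]                       # results from the second group
unique = set()
for group_result in groups:
    for members in group_result:
        unique.update(members)
print(sorted(unique))   # ['item1', 'item2', 'item3', 'item4', 'item5']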
def get_end_task(self, tasks, required_kwargs):
"""Accepts any number of tasks as returned by _get_pipeline.
    :param tasks: dictionary mapping task name (str) to that task's info from the registry
:param dict required_kwargs: Keyword arguments that some tasks require
:returns: celery.Signature, or celery.group, or None
"""
sigs = [
self.make_signature(info, required_kwargs)
for name, info in tasks.items()
if info['after'] is ALL
]
if not sigs:
return None
return sigs[0] if len(sigs) == 1 else group(sigs)
def test_create_chord_exclude_body(self):
"""If the body task of a chord is not a UserTask, it should be cleanly omitted from the status."""
chord([
sample_task.s(self.user.id, '1', user_task_name='Chord: 1 & 2'),
sample_task.s(self.user.id, '2', user_task_name='I should be ignored')
])(normal_task.s('3'))
assert UserTaskStatus.objects.count() == 4
chord_status = UserTaskStatus.objects.get(task_class='celery.chord')
assert chord_status.task_id
assert chord_status.parent is None
assert chord_status.is_container
assert chord_status.name == 'Chord: 1 & 2'
assert chord_status.total_steps == 2
verify_state(chord_status, False)
group_status = UserTaskStatus.objects.get(task_class='celery.group')
assert group_status.task_id
assert group_status.parent_id == chord_status.id
assert group_status.is_container
assert group_status.name == 'Chord: 1 & 2'
assert group_status.total_steps == 2
verify_state(group_status, False)
header_tasks = UserTaskStatus.objects.filter(parent=group_status)
assert len(header_tasks) == 2
for status in header_tasks:
assert status.task_id
assert status.parent_id == group_status.id
assert not status.is_container
assert status.name in ['SampleTask: 1', 'SampleTask: 2']
assert status.total_steps == 1
verify_state(status, False)
def _create_group(self, eager):
"""Create a celery group and verify some assertions about the corresponding status records"""
result = group(sample_task.s(self.user.id, '1'),
sample_task.s(self.user.id, '2', user_task_name='Group: 1, 2')).delay()
assert UserTaskStatus.objects.count() == 3
group_status = UserTaskStatus.objects.get(task_class='celery.group')
assert group_status.task_id == result.id
assert group_status.parent is None
assert group_status.is_container
assert group_status.name == 'Group: 1, 2'
assert group_status.total_steps == 2
verify_state(group_status, eager)
assert len(result.children) == 2
for result in result.children:
task_id = result.id
status = UserTaskStatus.objects.get(task_id=task_id)
assert status.parent_id == group_status.id
assert not status.is_container
assert status.name in ['SampleTask: 1', 'SampleTask: 2']
assert status.total_steps == 1
verify_state(status, eager)
def as_request_group(self, requests):
return group(
dispatch_requests.s([req.as_dict() for req in chunk])
for chunk in self.group_requests(requests)
)
def send(self, event, payload, sender,
timeout=None, context=None, **kwargs):
# the requests are sorted by url, so we group them into chunks
# each containing a list of requests for that host/port/scheme pair,
# with up to :setting:`THORN_CHUNKSIZE` requests each.
#
# this way requests have a good chance of reusing keepalive
# connections as requests with the same host are grouped together.
return self.as_request_group(self.prepare_requests(
event, payload, sender, timeout, context, **kwargs)).delay()
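# group_requests() itself is not shown here; the sketch below is only a rough
# illustration of the host-based, size-bounded chunking the comment describes,
# assuming each request object exposes a .url attribute and with a plain
# chunksize argument standing in for the THORN_CHUNKSIZE setting. It is not
# Thorn's actual implementation.
from itertools import groupby
from urllib.parse import urlparse

def chunk_requests_by_host(requests, chunksize=10):
    """Yield lists of requests sharing a host, each at most chunksize long.
    Assumes requests are already sorted by URL, as in send() above."""
    for _host, host_requests in groupby(requests, key=lambda r: urlparse(r.url).netloc):
        bucket = []
        for request in host_requests:
            bucket.append(request)
            if len(bucket) == chunksize:
                yield bucket
                bucket = []
        if bucket:
            yield bucket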
def batch_update_user_data():
"""
Create sub tasks to update user data like enrollments,
certificates and grades from edX platform.
"""
expiration = now_in_utc() + timedelta(hours=5)
lock = Lock(LOCK_ID, expiration)
if not lock.acquire():
# Lock should have expired by now
log.error("Unable to acquire lock for batch_update_user_data")
return
users_to_refresh = calculate_users_to_refresh_in_bulk()
jobs = release_batch_update_user_data_lock.s(token=lock.token.decode())
try:
if len(users_to_refresh) > 0:
user_id_chunks = chunks(users_to_refresh)
job = group(
batch_update_user_data_subtasks.s(user_id_chunk, expiration.timestamp())
for user_id_chunk in user_id_chunks
)
jobs = job | jobs
finally:
jobs.delay()
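# The jobs = job | jobs line leans on a canvas rule worth making explicit:
# chaining a group into a single following signature promotes the pair to a
# chord, so the lock-release task only runs once every
# batch_update_user_data_subtasks chunk has completed (which also means a result
# backend is required). A minimal, self-contained illustration of the promotion,
# with placeholder task names:
from celery import Celery, group, chord

app = Celery('demo')   # broker/backend omitted; only the canvas is built here

@app.task
def update_chunk(chunk):
    return len(chunk)

@app.task
def release_lock(results):
    """Chord body: receives the list of header results once all chunks finish."""

header = group(update_chunk.s(chunk) for chunk in ([1, 2], [3], [4, 5, 6]))
workflow = header | release_lock.s()
print(isinstance(workflow, chord))   # True: group | signature was promoted to a chord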
def get_recommendations(self):
from . import tasks
identifiers = Identifier.objects.filter(
source='isbn',
manifestations__in=self.manifestations.all())
grouped_identifier_ids = group(
tasks.get_recommendations.s(i.pk)
for i in identifiers)().get()
result_identifier_ids = itertools.chain.from_iterable(grouped_identifier_ids)
works = Work.objects.filter(pk__in=result_identifier_ids)
return works
def produce_hot_repo_report(period, ref_date=None):
# 1. parse date
ref_date_str = strf_date(period, ref_date=ref_date)
# 2. fetch and join
fetch_jobs = group([
fetch_hot_repos.s(ref_date_str, 100, 1),
fetch_hot_repos.s(ref_date_str, 100, 2),
fetch_hot_repos.s(ref_date_str, 100, 3),
fetch_hot_repos.s(ref_date_str, 100, 4),
fetch_hot_repos.s(ref_date_str, 100, 5)
])
# 3. group by language and
# 4. create csv
return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get()
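# One detail this pattern relies on: the chord prepends the list of header
# results to the partial arguments of the body signature, which is why
# build_report_task below takes results before ref_date. A small self-contained
# illustration with placeholder task names, using eager mode so it runs without
# a broker:
from celery import Celery, chord

app = Celery('demo')
app.conf.task_always_eager = True   # run inline, purely for illustration

@app.task
def fetch_page(page):
    return ['repo-%d' % page]   # placeholder payload

@app.task
def build_report(results, ref_date):
    # results holds the header return values, in header order.
    return (ref_date, sum(results, []))

print(chord(fetch_page.s(p) for p in range(1, 4))(build_report.s('2020-01-01')).get())
# ('2020-01-01', ['repo-1', 'repo-2', 'repo-3'])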
def build_report_task(results, ref_date):
all_repos = []
for repos in results:
all_repos += [Repository(repo) for repo in repos]
# 3. group by language
grouped_repos = {}
for repo in all_repos:
if repo.language in grouped_repos:
grouped_repos[repo.language].append(repo.name)
else:
grouped_repos[repo.language] = [repo.name]
# 4. create csv
lines = []
for lang in sorted(grouped_repos.keys()):
lines.append([lang] + grouped_repos[lang])
filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date)
return make_csv(filename, lines)
def fetch_all_huts():
headers = {'x-api-key': settings.API_KEY}
subtasks = []
    try:
        r = requests.get(settings.API_HUTS_BASE_URL, headers=headers, timeout=settings.API_TIMEOUT)
    except requests.exceptions.RequestException as e:
        logger.exception(str(e))
        return  # without a response there is nothing to fan out
    if r.status_code == 200:
for h in r.json():
subtasks.append(fetch_hut.s(h['assetId']))
else:
logger.error("Failed huts request with status %s, %s", str(r.status_code), r.json()['message'])
results = group(subtasks)() # in parallel
results.get()
def start_sending(self):
from .tasks import send_mail
with transaction.atomic():
        # First, handle all ignored emails (optouts)
now = timezone.now()
for i in self.mails.not_legit_for(self):
MailStatus.objects.create(
mail=i, status=MailStatus.IGNORED,
creation_date=timezone.now(),
raw_msg='Ignored because of previous optout {}'.format(
i.get_related_optout()))
# Then, handle the legit ones
if not self.send_date:
self.send_date = timezone.now()
self.save()
self.notify(Message.SENDING)
# locking ?
legit_mails = self.mails.legit_for(self)
now = timezone.now()
MailStatus.objects.bulk_create([
MailStatus(
mail=i, status=MailStatus.QUEUED,
creation_date=now, raw_msg='Enqueued in celery')
for i in legit_mails])
# bulk_create do not update Mail (too high db cost)
legit_mails.update(
curstatus=MailStatus.QUEUED, latest_status_date=now)
# end_locking ?
tasks = celery.group([send_mail.s(m.pk) for m in legit_mails])
log.info('Starting sending {} (#{}) to {} recipients.'.format(
self, self.pk, len(tasks)))
tasks.apply_async()
def manyshort(self):
self.join(group(add.s(i, i) for i in range(1000))(),
timeout=10, propagate=True)
def always_timeout(self):
self.join(
group(sleeping.s(1).set(time_limit=0.1)
for _ in range(100))(),
timeout=10, propagate=False,
)
def alwayskilled(self):
g = group(kill.s() for _ in range(10))
self.join(g(), timeout=10)
def alwaysexits(self):
g = group(exiting.s() for _ in range(10))
self.join(g(), timeout=10)
def _evil_groupmember(self, evil_t, *eargs, **opts):
g1 = group(add.s(2, 2).set(**opts), evil_t.s(*eargs).set(**opts),
add.s(4, 4).set(**opts), add.s(8, 8).set(**opts))
g2 = group(add.s(3, 3).set(**opts), add.s(5, 5).set(**opts),
evil_t.s(*eargs).set(**opts), add.s(7, 7).set(**opts))
self.join(g1(), timeout=10)
self.join(g2(), timeout=10)
def _revoketerm(self, wait=None, terminate=True,
joindelay=True, data=BIG):
g = group(any_.s(data, sleep=wait) for i in range(8))
r = g()
if terminate:
if joindelay:
sleep(random.choice(range(4)))
r.revoke(terminate=True)
self.join(r, timeout=10)
def test_commit(repo, ref, tag):
    chain = (
        start_test.s(repo, ref)
        | group(test_board.s(ref=ref, repo=repo, tag=tag, board=board)
                for board in config["devices"])
        | finish_test.s(repo, ref)
    )
chain.delay()
# Adapted from: https://gist.github.com/andrewgross/8ba32af80ecccb894b82774782e7dcd4
def train_networks(self, networks):
"""
    Train each network on the cluster servers.
    :param networks: list of network definitions
    :return: the networks, updated with accuracy results
"""
    try:
        tasks = []
        #i = inspect()
        #if (i.active() == None):
        if self.debug_mode:
            # For debugging you can run all tasks in the Django process.
            for network in networks:
                if network['flag']:
                    continue
                result = train(network.get('nn_id'), str(network.get('nn_wf_ver_id')))
                key = '_'.join([network['nn_id'], str(network['nn_wf_ver_id'])])
                network['acc'] = result[key].get('accuracy')
                network['flag'] = True
        else:
            # Use the cluster servers for faster hyperparameter searching:
            # the genetic algorithm's training jobs are distributed with celery.
            for network in networks:
                if network['flag']:
                    continue
                tasks.append(train.subtask((network.get('nn_id'), str(network.get('nn_wf_ver_id')))))
            results = group(tasks).apply_async()
            results = results.join()
            for result in results:
                for network in networks:
                    key = '_'.join([network['nn_id'], str(network['nn_wf_ver_id'])])
                    if key in result and result[key] is not None and result[key].get('accuracy') is not None:
                        network['acc'] = result[key].get('accuracy')
                        network['flag'] = True
        return networks
    except Exception as e:
        logging.error("Error on training: {0}".format(e))
def test():
# print add.delay(2, 2)
from celery import group, chain
job_info = {
'Name': '{task_name}{task_args}',
'BatchName': 'celery-{root_id}'
}
job = group([add.s(2, 2), add.s(4, 4)])
result = job.apply_async(job_info=job_info)
print("waiting for results:")
for x in result.iterate(propagate=False):
print("result is:" % x)
def test_fail():
from celery import group
job = group([add.s(2, 2), fail.s(), add.s(4, 4)])
# job_info or plugin_info must be passed to trigger submission to deadline
result = job.apply_async(job_info={})
print("waiting for results:")
# failure aborts iteration when propagate=True
for x in result.iterate():
print("result is:" % x)
def __handle_processed_messages(self):
"""
Perform any final housekeeping once all of the messages in the queue have been processed.
:return: None
"""
from tasknode.tasks import handle_scanning_order_from_pubsub
logger.debug(
"Now handling all processed messages (%s keys in targets)."
% (len(self._targets),)
)
    for k, v in self._targets.items():
if len(v) > 0:
targets = list(set(v))
task_sig = handle_scanning_order_from_pubsub.si(
org_uuid=k,
targets=targets,
)
task_sig.options["queue"] = config.celery_priority_queue_name
self._tasks.append(task_sig)
self._messages.append(
"Total of %s targets defined for organization %s."
% (len(targets), k)
)
logger.debug(
"Total number of tasks to kick off is %s."
% (len(self._tasks),)
)
if len(self._tasks) > 0:
canvas_sig = group(self._tasks)
canvas_sig.apply_async()
logger.debug("Tasks kicked off.")
def gather_data_for_domain_name(
self,
org_uuid=None,
domain_uuid=None,
domain_scan_uuid=None,
domain_name=None,
order_uuid=None,
):
"""
Perform all data gathering for the given domain name.
:param org_uuid: The UUID of the organization to retrieve data for.
:param domain_uuid: The UUID of the parent domain name that is being investigated.
:param domain_scan_uuid: The UUID of the domain name scan that this task is a part of.
    :param domain_name: The domain name to collect data for.
    :param order_uuid: The UUID of the order that this scan is a part of.
    :return: None
"""
logger.info(
"Now gathering information for domain name %s (parent domain %s)."
% (domain_name, domain_uuid)
)
record_types = get_dns_record_types_for_scan()
task_sigs = []
for record_type, do_scanning in record_types:
task_sigs.append(resolve_domain_name_for_organization.si(
org_uuid=org_uuid,
domain_uuid=domain_uuid,
domain_scan_uuid=domain_scan_uuid,
record_type=record_type,
order_uuid=order_uuid,
))
canvas_sig = group(task_sigs)
self.finish_after(signature=canvas_sig)
#USED
def get_whois_data_for_ip_address(
self,
org_uuid=None,
ip_address_uuid=None,
ip_address_scan_uuid=None,
order_uuid=None,
):
"""
Retrieve WHOIS data for the given IP address.
:param org_uuid: The UUID of the organization to perform data retrieval on behalf of.
:param ip_address_uuid: The UUID of the IP address to retrieve data about.
    :param ip_address_scan_uuid: The UUID of the IP address scan to associate retrieved data with.
    :param order_uuid: The UUID of the order that this scan is a part of.
    :return: None
"""
logger.info(
"Now retrieving WHOIS information for IP address %s."
% (ip_address_uuid,)
)
task_sigs = []
task_kwargs = {
"org_uuid": org_uuid,
"ip_address_uuid": ip_address_uuid,
"ip_address_scan_uuid": ip_address_scan_uuid,
"order_uuid": order_uuid,
}
task_sigs.append(get_arin_whois_data_for_ip_address.si(**task_kwargs))
if len(task_sigs) > 1:
collection_sig = group(task_sigs)
else:
collection_sig = task_sigs[0]
self.finish_after(signature=collection_sig)
#USED