def get_container_id_mapping(pool, compose_cmd):
    """Return a ``{service_name: container_id}`` mapping for a compose project.

    Service names are read from ``<compose_cmd> config --services``; the ID of
    each service is then fetched concurrently by scheduling
    ``pool_container_id`` on *pool*.

    Args:
        pool: a ``multiprocessing.pool``-style pool with ``apply_async``.
        compose_cmd: list of argv tokens for the docker-compose command.

    Raises:
        RuntimeError: if the ID of any service cannot be obtained.
    """
    service_names = subprocess.check_output(
        compose_cmd + ["config", "--services"]
    )
    service_names = service_names.strip().decode("utf-8").split("\n")
    id_mapping = {
        name: pool.apply_async(pool_container_id, (name, compose_cmd))
        for name in service_names
    }
    # AsyncResult.get() blocks until the task completes, so the original
    # ready()-polling loop with time.sleep(0.1) was busy-waiting for nothing.
    # Chaining the worker's exception also preserves the failure cause.
    for name, future in id_mapping.items():
        try:
            id_mapping[name] = future.get()
        except Exception as exc:
            raise RuntimeError(
                "Cannot get ID of service {0}".format(name)) from exc
    return id_mapping
# Python apply_async() usage examples (original page heading, commented out so the file parses)
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Run *item* in a worker thread, bounded by max_timeout seconds."""
            # Use the pool as a context manager so it is terminated on exit;
            # the original leaked a fresh ThreadPool (and its worker thread)
            # on every call.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(item, args, kwargs)
                # raises multiprocessing.TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Run *item* in a worker thread, bounded by max_timeout seconds."""
            # Use the pool as a context manager so it is terminated on exit;
            # the original leaked a fresh ThreadPool (and its worker thread)
            # on every call.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(item, args, kwargs)
                # raises multiprocessing.TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
# (original comment was garbled by source encoding -- content unknown)
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Run *item* in a worker thread, bounded by max_timeout seconds."""
            # Use the pool as a context manager so it is terminated on exit;
            # the original leaked a fresh ThreadPool (and its worker thread)
            # on every call.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(item, args, kwargs)
                # raises multiprocessing.TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Run *item* in a worker thread, bounded by max_timeout seconds."""
            # Use the pool as a context manager so it is terminated on exit;
            # the original leaked a fresh ThreadPool (and its worker thread)
            # on every call.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(item, args, kwargs)
                # raises multiprocessing.TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Run *item* in a worker thread, bounded by max_timeout seconds."""
            # Use the pool as a context manager so it is terminated on exit;
            # the original leaked a fresh ThreadPool (and its worker thread)
            # on every call.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(item, args, kwargs)
                # raises multiprocessing.TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Run *item* in a worker thread, bounded by max_timeout seconds."""
            # Use the pool as a context manager so it is terminated on exit;
            # the original leaked a fresh ThreadPool (and its worker thread)
            # on every call.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(item, args, kwargs)
                # raises multiprocessing.TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
def process_main_files(pool, snapshot_dir, compose_cmd, container_ids):
    """Schedule collection of the top-level (non-service) snapshot files.

    All collectors are fired asynchronously on *pool*; the caller is
    responsible for joining the pool.  ``container_ids["admin"]`` must be
    present (the admin container hosts the monitoring results).
    """
    # Table-driven dispatch keeps the collector list in one place.
    tasks = (
        (collect_backup, [snapshot_dir, compose_cmd]),
        (collect_docker_info, [snapshot_dir]),
        (collect_docker_version, [snapshot_dir]),
        (collect_docker_compose_config, [snapshot_dir, compose_cmd]),
        (collect_all_logs, [snapshot_dir, compose_cmd]),
        (collect_monitoring_results, [snapshot_dir, container_ids["admin"]]),
    )
    for collector, collector_args in tasks:
        pool.apply_async(collector, collector_args)
def process_service_files(pool, name, container_id, snapshot_dir, compose_cmd):
    """Schedule collection of per-service snapshot files for service *name*.

    Each collector writes into ``snapshot_dir/<name>`` and is fired
    asynchronously on *pool*; the caller is responsible for joining the pool.
    Some collectors address the service through docker-compose, others
    directly by *container_id*.
    """
    service_snapshot_dir = os.path.join(snapshot_dir, name)
    # Two argument shapes cover every collector below.
    compose_args = [service_snapshot_dir, name, compose_cmd]
    container_args = [service_snapshot_dir, name, container_id]
    tasks = (
        (collect_service_log, compose_args),
        (collect_service_date, compose_args),
        (collect_service_unix_timestamp, compose_args),
        (collect_service_packages_os, compose_args),
        (collect_service_ps, compose_args),
        (collect_service_docker_inspect, container_args),
        (collect_service_docker_stats, container_args),
        (collect_service_config, container_args),
        (collect_service_git_release, container_args),
        (collect_service_decapod_release, container_args),
        (collect_service_packages_npm, container_args),
        (collect_service_packages_python2, container_args),
        (collect_service_packages_python3, container_args),
        (collect_service_ansible_config, container_args),
        (collect_service_private_key_sha1sum, compose_args),
    )
    for collector, collector_args in tasks:
        pool.apply_async(collector, collector_args)
def timeout(max_timeout):
    """Timeout decorator for methods, parameter in seconds.

    The decorated method may be called with an extra keyword argument
    ``timeout_max_timeout`` to override *max_timeout* for that single call
    (falsy override values fall back to *max_timeout*).
    """
    def timeout_decorator(f):
        """Wrap the original method."""
        @functools.wraps(f)
        def func_wrapper(self, *args, **kwargs):
            """Run *f* in a worker thread and bound its execution time."""
            # BUG FIX: pop the override BEFORE dispatching.  The original
            # popped it after apply_async(), so the wrapped function received
            # an unexpected 'timeout_max_timeout' kwarg and raised TypeError.
            wait = kwargs.pop('timeout_max_timeout', max_timeout) or max_timeout
            # Context manager terminates the pool so worker threads are not
            # leaked on every call.
            with multiprocessing.pool.ThreadPool(processes=1) as pool:
                async_result = pool.apply_async(f, (self,) + args, kwargs)
                # raises multiprocessing.TimeoutError if execution exceeds the limit
                return async_result.get(wait)
        return func_wrapper
    return timeout_decorator
def main():
    """Benchmark three message-transport paths with the same payload volume.

    Pushes ``test_queue_size`` one-byte messages through, in order:
      1. a ``multiprocessing.Manager`` queue consumed by ``consume_queue``,
      2. a ZeroMQ PAIR socket consumed by ``consume_zmq_pair``,
      3. a ZeroMQ PUSH socket feeding the streamer device started by
         ``zmq_streamer`` and consumed by ``consume_zmq_streamer``.

    NOTE(review): ``zmq_streamer``, ``consume_queue``, ``consume_zmq_pair``,
    ``consume_zmq_streamer``, ``test_queue_size``, ``zmq_port`` and
    ``zmq_queue_port_pull`` are presumably defined elsewhere in this file --
    not visible in this chunk.
    """
    # 'spawn' context so worker processes start from a clean interpreter.
    process_pool_context = multiprocessing.get_context('spawn')
    pool = multiprocessing.pool.Pool(
        processes=4,
        context=process_pool_context,
    )
    # Start the streamer device first; it runs for the life of the pool and
    # its result is intentionally never collected (fire-and-forget).
    pool.apply_async(
        func=zmq_streamer,
    )
    # --- path 1: Manager queue -------------------------------------------
    multiprocessing_manager = multiprocessing.Manager()
    multiprocessing_queue = multiprocessing_manager.Queue(
        maxsize=test_queue_size,
    )
    # Pre-fill the queue so the consumer measures pure drain time.
    for i in range(test_queue_size):
        multiprocessing_queue.put(b'1')
    res = pool.apply_async(
        func=consume_queue,
        args=(multiprocessing_queue,),
    )
    res.get()
    # --- path 2: ZeroMQ PAIR socket --------------------------------------
    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    res = pool.apply_async(
        func=consume_zmq_pair,
    )
    # Give the consumer process time to bind before we connect.
    # NOTE(review): a fixed sleep is racy; a ready-handshake would be safer.
    time.sleep(1)
    socket.connect("tcp://localhost:%s" % zmq_port)
    for i in range(test_queue_size):
        socket.send(b'1')
    res.get()
    socket.close()
    # --- path 3: ZeroMQ streamer device ----------------------------------
    # NOTE(review): the previous Context is dropped without term(), and the
    # pool is never close()d/join()ed -- relies on process-exit cleanup.
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    res = pool.apply_async(
        func=consume_zmq_streamer,
    )
    time.sleep(1)
    socket.connect("tcp://localhost:%s" % zmq_queue_port_pull)
    for i in range(test_queue_size):
        socket.send(b'1')
    # wait() (not get()) -- the consumer's return value is discarded here.
    res.wait()
    socket.close()