def results(self):
    """Start the flow, block until completion, and return the results.

    The result pipe is given a fresh container before execution: a
    manager-backed list under the multiprocessing backend, a plain list
    otherwise.

    Raises:
        Exception: if the flow has already been started.
    """
    if self._started_operating:
        raise Exception("You cannot start a pipe flow that has already been run")

    sink = self._result_pipe()
    sink._results = (
        multiprocessing.Manager().list()
        if is_backend(Backend.MULTIPROCESSING)
        else []
    )
    self.execute()

    collected = sink._results
    # A manager list is only a proxy; copy it into a real list for the caller.
    return list(collected) if is_backend(Backend.MULTIPROCESSING) else collected
# Example source code using Python's multiprocessing Manager() class
def get_best_servers(server_list, ping_attempts, valid_protocols):
    """Run ``compare_server`` on every server in parallel and return the shared dict.

    Each element of ``server_list`` is handed to ``compare_server`` in a pool
    worker, together with a manager-backed dict (``best_servers``) and the
    ``ping_attempts`` / ``valid_protocols`` keyword arguments.

    Args:
        server_list: sequence of servers; its length sizes the pool via
            ``get_num_processes``.
        ping_attempts: forwarded unchanged to ``compare_server``.
        valid_protocols: forwarded unchanged to ``compare_server``.

    Returns:
        The manager dict proxy that was passed to the workers.
    """
    manager = multiprocessing.Manager()
    best_servers = manager.dict()
    num_processes = get_num_processes(len(server_list))
    pool = multiprocessing.Pool(num_processes, maxtasksperchild=1)
    compare = partial(
        compare_server,
        best_servers=best_servers,
        ping_attempts=ping_attempts,
        valid_protocols=valid_protocols,
    )
    try:
        pool.map(compare, server_list)
    finally:
        # Release the worker processes even when a worker raised; close()
        # alone does not wait for them to exit.
        pool.close()
        pool.join()
    return best_servers
def main():
    """Put 2, 3 and 4 on a manager queue and run three ``myTask`` processes on it."""
    manager = multiprocessing.Manager()
    shared_queue = manager.Queue()
    for value in (2, 3, 4):
        shared_queue.put(value)

    workers = []
    for _ in range(3):
        worker = multiprocessing.Process(target=myTask, args=(shared_queue,))
        worker.start()
        workers.append(worker)

    # Do not return before every worker has finished.
    for worker in workers:
        worker.join()
def __init__(self):
    """Register VIF objects, set up clients and add the watcher and server services.

    Both services are added with one worker each and receive the same
    manager-backed dict as their only argument.
    """
    super(CNIDaemonServiceManager, self).__init__()
    # TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload.

    # TODO(vikasc): Should be done using dynamically loadable OVO types
    #               plugin.
    objects.register_locally_defined_vifs()
    os_vif.initialize()
    clients.setup_kubernetes_client()

    self.manager = multiprocessing.Manager()
    # Shared dict used for Watcher->Server communication.
    shared_registry = self.manager.dict()
    for service_cls in (CNIDaemonWatcherService, CNIDaemonServerService):
        self.add(service_cls, workers=1, args=(shared_registry,))
    self.register_hooks(on_terminate=self.terminate)
def _import_mp():
    """Bind the multiprocessing primitives used by this module to module globals.

    ``Process`` is imported directly; ``Queue``, ``Pool``, ``Event``, ``Value``
    and ``Array`` are taken from a freshly created ``Manager``. If
    multiprocessing cannot be imported, a ``RuntimeWarning`` is issued and the
    globals are left untouched.
    """
    global Process, Queue, Pool, Event, Value, Array
    try:
        from multiprocessing import Manager, Process
        # Prevent the server process created in the manager, which holds
        # Python objects and allows other processes to manipulate them using
        # proxies, from being interrupted by SIGINT (KeyboardInterrupt), so
        # that the communication channel between subprocesses and the main
        # process is still usable after ctrl+C is received in the main
        # process.
        old = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            m = Manager()
        finally:
            # Reset it back so the main process will receive a
            # KeyboardInterrupt exception on ctrl+c -- also when Manager()
            # itself failed, otherwise SIGINT would stay ignored.
            signal.signal(signal.SIGINT, old)
        Queue, Pool, Event, Value, Array = (
            m.Queue, m.Pool, m.Event, m.Value, m.Array
        )
    except ImportError:
        warn("multiprocessing module is not available, multiprocess plugin "
             "cannot be used", RuntimeWarning)
def __init__(self, core_classes_map):
    """Creates a Pipeline object.

    Args:
        core_classes_map (list[dict]): Each element in the list corresponds to a Core. The element must be a
            dictionary with the key Pipeline.KEY_CLASS and value the class that should be instantiated (the Core
            subclass). You can provide arguments to the constructor using the key Pipeline.KEY_KWARGS; when that
            key is absent the class is instantiated without keyword arguments.
    """
    self.input_pipe, self.output_pipe = self._construct_pipes(core_classes_map)

    # Instantiate the core classes, connecting them with the created pipes.
    # KEY_KWARGS is optional, so fall back to no keyword arguments instead of
    # failing with a KeyError.
    self.cores = [
        core_class[self.KEY_CLASS](**core_class.get(self.KEY_KWARGS, {}))
        for core_class in core_classes_map
    ]

    self.started = False
    # Results are kept in a manager-backed dict fed by a PipeConsumer that
    # reads from the output pipe.
    self.results_manager = Manager()
    self.results = self.results_manager.dict()
    self.results_producer = PipeConsumer(self.output_pipe, self.results)
def process_pool():
p=Pool(10)
start=time.time()
#q1=Queue.Queue()
manager=Manager()
q=manager.Queue()
print "main start ",start
for i in xrange(10):
p.apply_async(sub_pool,args=(q,))
p.close()
p.join()
end=time.time()
print "process done at ",end
#print q
print q.get()
'''
while q1.empty() ==False:
d= q1.get(True)
print d
'''
def create_ic_relations_to_db(num_workers, to_db=False):
    """
    Compute intercity relations and optionally store them in the database.

    When storing is requested but no database connection is available, an
    error is logged and nothing is started. Otherwise producer and consumer
    workers are launched around a shared manager queue and joined before
    returning.

    :param num_workers: The number of workers to use for computing the
                        relation scores. This is a read-only operation.
    :param to_db: Defaults to false. If true, the relations are stored.
    """
    storing_without_db = to_db and not db_utils.connected_to_db()
    if storing_without_db:
        LOGGER.error('No database connection!')
        return

    factory = workers.Workers()
    manager = Manager()
    shared_queue = manager.Queue()
    producers = factory.run_compute_ic_rels_workers(
        num_workers, shared_queue, join=False)
    consumers = factory.run_store_ic_rels_worker(
        shared_queue, join=False, to_db=to_db)

    # Block until producers and consumers are done.
    _join_ic_rel_workers(factory, producers, consumers)
def use_virustotal(args):
    """
    Use Virustotal to download the environment malware

    Starts ``args.nconcurrent`` processes running ``download_worker_function``
    on a bounded joinable queue, feeds the queue with the ``sha256`` of every
    row from ``get_sample_hashes()``, then sends one "STOP" sentinel per
    worker and waits for the queue and the workers to finish.

    :param args: namespace providing ``nconcurrent`` (queue size and number of
                 workers) and ``vtapikey`` (passed to every worker)
    """
    m = multiprocessing.Manager()
    download_queue = m.JoinableQueue(args.nconcurrent)

    archive_procs = [
        multiprocessing.Process(
            target=download_worker_function,
            args=(download_queue, args.vtapikey))
        for _ in range(args.nconcurrent)
    ]
    for w in archive_procs:
        w.start()

    for row in get_sample_hashes():
        download_queue.put(row["sha256"])

    # Exactly one sentinel per started worker: sending a different number
    # (the code previously used args.narchiveprocs) either leaves workers
    # without a sentinel or leaves unconsumed sentinels on the queue, and
    # the joins below can then block forever.
    for _ in archive_procs:
        download_queue.put("STOP")

    download_queue.join()
    for w in archive_procs:
        w.join()
def bootstrap(diffs, B):
    """Run ``B`` bootstrap samples in a process pool and return smoothed p-values.

    ``bs_one`` is mapped over ``B`` copies of ``(diffs, queue)``; each sample
    is expected to leave one indexable result on the queue. The results are
    summed per position (``len(diffs[0])`` positions) and each sum ``c`` is
    returned as ``(c + 1.0) / (B + 1.0)``.
    """
    manager = multiprocessing.Manager()
    sample_queue = manager.Queue()
    pool = multiprocessing.Pool()
    pending = pool.map_async(bs_one, [(diffs, sample_queue) for _ in xrange(B)])
    pool.close()  # No more work

    # Poll once per second until every sample has been processed.
    while not pending.ready():
        log.info('Waiting for %d bootstrap samples to finish...' % (B - sample_queue.qsize()))
        time.sleep(1)
    assert sample_queue.qsize() == B, "qsize=%d, B=%d" % (sample_queue.qsize(), B)

    width = len(diffs[0])
    count = [0] * width
    for _ in xrange(B):
        sample = sample_queue.get()
        for col in xrange(width):
            count[col] += sample[col]
    assert sample_queue.empty()

    # smoothed p-value
    return [(total + 1.0) / (B + 1.0) for total in count]
def __init__(self, n_walkers, n_workers=None, gpu_indices=None):
    """Set up the worker bookkeeping and a shared results list.

    Either ``gpu_indices`` is given (and fixes the number of workers) or
    ``n_workers`` must be given, in which case the indices are
    ``range(n_workers)``. The results list holds one ``None`` slot per walker.
    """
    if gpu_indices is None:
        assert n_workers, "If gpu_indices are not given the n_workers must be given"
        self.n_workers = n_workers
        self.gpu_indices = range(n_workers)
    else:
        self.gpu_indices = gpu_indices
        self.n_workers = len(gpu_indices)

    # Queue of free workers; a worker is popped off while it is in use.
    self.free_workers = mulproc.Queue()

    # The semaphore provides the locks on the workers.
    self.lock = mulproc.Semaphore(self.n_workers)

    # Shared list for results, pre-filled with one empty slot per walker.
    self.results_list = mulproc.Manager().list([None for _ in range(n_walkers)])

    # Register the worker indices (not device/gpu indices) as free.
    for worker_idx in range(self.n_workers):
        self.free_workers.put(worker_idx)
def __init__(self, bot=None, machines=None, **kwargs):
    """
    Implements a sequence of multiple machines

    :param bot: stored as ``self.bot``

    :param machines: the sequence of machines to be run
    :type machines: list of Machine

    Any additional keyword arguments are passed to ``on_init()``.
    """
    self.bot = bot
    self.machines = machines

    self.lock = Lock()

    # prevent Manager() process to be interrupted
    handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        self.mutables = Manager().dict()
    finally:
        # restore current handler for the rest of the program, also when
        # Manager() failed -- otherwise SIGINT would stay ignored
        signal.signal(signal.SIGINT, handler)

    self.on_init(**kwargs)
def __init__(self, settings=None, filter=None):
    """
    Stores settings across multiple independent processing units

    :param settings: the set of variables managed in this context
    :type settings: dict

    :param filter: a function to interpret values on check()
    :type filter: callable

    When ``filter`` is not given, ``self._filter`` is used. When ``settings``
    is given and non-empty, it is passed to ``apply()``.
    """
    # prevent Manager() process to be interrupted
    handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        self.lock = Lock()
        self.values = Manager().dict()
    finally:
        # restore current handler for the rest of the program, also when
        # the setup above failed -- otherwise SIGINT would stay ignored
        signal.signal(signal.SIGINT, handler)

    self.filter = filter if filter else self._filter

    if settings:
        self.apply(settings)
def __init__(self, env, workers):
    """Initialize WorkloadInterrupted object instance.

    Args:
        env(testlib.common3.Environment):  TAF environment instance
        workers:  workload settings, resolved through ``get_workers()``;
            a missing or zero 'time' entry is replaced by WORKLOAD_TIME

    """
    self.env = env

    # Only devices exposing hw.stress_tool_attributes take part in the workload.
    self.devices = []
    for dev in self.env.id_map.values():
        if hasattr(dev, 'hw') and hasattr(dev.hw, 'stress_tool_attributes'):
            self.devices.append(dev)

    # One manager-backed result list per participating device.
    manager = Manager()
    self.workload_results = {
        dev.id: manager.list([])  # pylint: disable=no-member
        for dev in self.devices
    }
    self.pool = ThreadPool(len(self.devices))

    self.workers = get_workers(workers)
    if not self.workers:
        self.workers = {'time': WORKLOAD_TIME}
    elif not int(self.workers.get('time', 0)):
        self.workers['time'] = WORKLOAD_TIME
def test_answer_challenge_auth_failure(self):
class _FakeConnection(object):
def __init__(self):
self.count = 0
def recv_bytes(self, size):
self.count += 1
if self.count == 1:
return multiprocessing.connection.CHALLENGE
elif self.count == 2:
return b'something bogus'
return b''
def send_bytes(self, data):
pass
self.assertRaises(multiprocessing.AuthenticationError,
multiprocessing.connection.answer_challenge,
_FakeConnection(), b'abc')
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
def test_answer_challenge_auth_failure(self):
class _FakeConnection(object):
def __init__(self):
self.count = 0
def recv_bytes(self, size):
self.count += 1
if self.count == 1:
return multiprocessing.connection.CHALLENGE
elif self.count == 2:
return b'something bogus'
return b''
def send_bytes(self, data):
pass
self.assertRaises(multiprocessing.AuthenticationError,
multiprocessing.connection.answer_challenge,
_FakeConnection(), b'abc')
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
def assembly(overlap_length, percent_identity, threads, wd, verbose):
    """
    Run ``iAssembler`` on every file found under ``wd`` with a process pool.

    One task tuple ``(file name, percent_identity, overlap_length, wd,
    verbose, queue)`` is built per file returned by ``os.walk(wd)``. While
    the tasks run, a progress bar is refreshed once per second from the size
    of the shared queue (presumably each finished task puts one item on it --
    confirm against ``iAssembler``).

    :param overlap_length: forwarded to ``iAssembler``
    :param percent_identity: forwarded to ``iAssembler``
    :param threads: number of pool processes (converted with ``int``)
    :param wd: directory that is walked for input files
    :param verbose: forwarded to ``iAssembler``
    """
    manage = Manager()
    queue = manage.Queue()
    new_commands = [
        (fasta_file, percent_identity, overlap_length, wd, verbose, queue)
        for _root, _dirs, files in os.walk(wd)
        for fasta_file in files
    ]

    pool = Pool(processes=int(threads), maxtasksperchild=10)
    try:
        results = pool.map_async(iAssembler, new_commands)
        pool.close()  # no more work will be submitted
        with progressbar.ProgressBar(max_value=len(new_commands)) as bar:
            while not results.ready():
                bar.update(queue.qsize())
                time.sleep(1)
        pool.join()
    finally:
        # Make sure no worker process outlives this call, including when the
        # wait loop is interrupted; harmless after a normal join().
        pool.terminate()
def _import_mp():
    """Bind the multiprocessing primitives used by this module to module globals.

    ``Process`` is imported directly; ``Queue``, ``Pool``, ``Event``, ``Value``
    and ``Array`` are taken from a freshly created ``Manager``. If
    multiprocessing cannot be imported, a ``RuntimeWarning`` is issued and the
    globals are left untouched.
    """
    global Process, Queue, Pool, Event, Value, Array
    try:
        from multiprocessing import Manager, Process
        # Prevent the server process created in the manager, which holds
        # Python objects and allows other processes to manipulate them using
        # proxies, from being interrupted by SIGINT (KeyboardInterrupt), so
        # that the communication channel between subprocesses and the main
        # process is still usable after ctrl+C is received in the main
        # process.
        old = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            m = Manager()
        finally:
            # Reset it back so the main process will receive a
            # KeyboardInterrupt exception on ctrl+c -- also when Manager()
            # itself failed, otherwise SIGINT would stay ignored.
            signal.signal(signal.SIGINT, old)
        Queue, Pool, Event, Value, Array = (
            m.Queue, m.Pool, m.Event, m.Value, m.Array
        )
    except ImportError:
        warn("multiprocessing module is not available, multiprocess plugin "
             "cannot be used", RuntimeWarning)
def __init__(self, swift_attribs=None):
    """Initialize the thread pool.

    Trying to implement the emews model.

    Creates the outgoing/incoming manager queues, starts the queue
    management thread, then starts one worker process running ``runner``
    on both queues.

    Kwargs:
        - swift_attribs : Takes a dict of swift attribs. For future use.
    """
    logger.debug("In __init__")
    manager = mp.Manager()
    self.mp_manager = manager
    self.outgoing_q = manager.Queue()
    self.incoming_q = manager.Queue()
    self.isAlive = True

    self._queue_management_thread = None
    self._start_queue_management_thread()
    logger.debug("Created management thread : %s", self._queue_management_thread)

    queues = (self.outgoing_q, self.incoming_q)
    self.worker = mp.Process(target=runner, args=queues)
    self.worker.start()
    logger.debug("Created worker : %s", self.worker)

    self.tasks = {}
def solve_with_pool():
    """
    Collect the verification codes with a process pool, save them and vote.

    ``get_verify_code`` is run for every number from 1 to 1000 on a pool of
    40 processes, writing into a shared manager dict. The collected mapping
    is printed, dumped as JSON to ``result_check_code.txt`` and passed to
    ``vote()``.
    """
    manager = multiprocessing.Manager()
    pool = multiprocessing.Pool(40)
    check_codes = manager.dict()

    try:
        # Fetch the verification code for each number from 1 to 1000.
        pool.map(partial(get_verify_code, check_codes=check_codes),
                 range(1, 1000 + 1))
    finally:
        # Release the 40 worker processes; they were previously never
        # closed or joined.
        pool.close()
        pool.join()

    # Show what was collected, then persist it as a plain dict.
    print(check_codes)
    check_codes = dict(check_codes)
    with open("result_check_code.txt", "w") as f:
        json.dump(check_codes, f)

    # Cast the votes with the collected codes.
    vote(check_codes)
def solve_without_pool():
    """
    Collect the verification codes with one process per number, then vote.

    A separate process running ``get_verify_code`` is started for every
    number from 1 to 1000; all of them write into a shared manager dict.

    :return: None
    """
    manager = multiprocessing.Manager()
    check_codes = manager.dict()

    # Launch one process for each number from 1 to 1000.
    jobs = []
    for number in range(1, 1000 + 1):
        proc = multiprocessing.Process(target=get_verify_code,
                                       args=(number, check_codes))
        jobs.append(proc)
        proc.start()

    # Wait for all of them before using the results.
    for proc in jobs:
        proc.join()

    print(check_codes)

    # Cast the votes with the collected codes.
    vote(check_codes)
def main():
"""
main process
"""
m = Manager()
q = m.Queue()
plist = []
pool = Pool(processes=20)
for proc in plist:
pool.apply_async(process, (q, proc))
pool.close()
pool.join()
count = 0
while True:
if q.empty():
print "empty"
break
else:
c = q.get()
print c
count += c
print count
def _main():
    """ Start multiple processes to truncate data out of measurements.

    Every item produced by ``cprobes()`` is submitted to the pool as a
    ``_truncate`` task together with a shared manager lock. A
    KeyboardInterrupt during submission or while waiting is swallowed.
    """
    wlock = Manager().Lock()
    pool = PoolLimit()
    probes = cprobes()
    try:
        while True:
            try:
                # Builtin next() instead of the Python 2-only .next() method,
                # so the loop works on both Python 2 and 3.
                cprobe = next(probes)
            except StopIteration:
                break
            pool.apply_async(_truncate, (cprobe, wlock))
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        pass
def __init__(self, num_processor, batch_size, phase,
             batch_idx_init=0, data_ids_init=train_ids, capacity=10):
    """Set up the shared state of the data loader.

    ``phase`` must be 'train' (ids come from ``data_ids_init``) or 'test'
    (ids come from ``test_ids``); anything else raises ``ValueError``. The
    id list, the current batch index and the load queue are manager lists.
    """
    self.num_processor = num_processor
    self.batch_size = batch_size
    self.data_load_capacity = capacity

    self.manager = Manager()
    self.batch_lock = Lock()
    self.mutex = Lock()
    # Both conditions share the same underlying mutex.
    self.cv_full = Condition(self.mutex)
    self.cv_empty = Condition(self.mutex)

    self.data_load_queue = self.manager.list()
    self.cur_batch = self.manager.list([batch_idx_init])
    self.processors = []

    if phase not in ('train', 'test'):
        raise ValueError('Could not set phase to %s' % phase)
    ids = data_ids_init if phase == 'train' else test_ids
    self.data_ids = self.manager.list(ids)
def test_answer_challenge_auth_failure(self):
class _FakeConnection(object):
def __init__(self):
self.count = 0
def recv_bytes(self, size):
self.count += 1
if self.count == 1:
return multiprocessing.connection.CHALLENGE
elif self.count == 2:
return b'something bogus'
return b''
def send_bytes(self, data):
pass
self.assertRaises(multiprocessing.AuthenticationError,
multiprocessing.connection.answer_challenge,
_FakeConnection(), b'abc')
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
def __init__(self, path, lock, in_path=None):
"""
Set up all values that are to be shared between processes.

:param path: filesystem path; when it names an existing file,
``loadData()`` is called
:param lock: stored as ``self.lock``
:param in_path: stored as ``self.in_path`` in the branch taken when
``path`` is not an existing file (default ``None``)
"""
self.lock = lock
self.path = path
# NOTE(review): indentation was lost in this listing, so it is unclear how
# many of the assignments after ``else:`` belong to that branch and how many
# run unconditionally -- confirm against the original file before editing.
if os.path.isfile(path):
self.loadData()
else:
self.in_path = in_path
# Manager-backed list.
self.clones = Manager().list()
# Counters and accumulators, all starting at zero; presumably
# multiprocessing.Value with "i" = integer and "d" = double -- TODO confirm.
self.counter = Value("i", 0)
self.nodes_total = Value("i", 0)
self.first_counter = Value("i", 0)
self.query_time_total = Value("d", 0)
self.projects_counter = Value("i", 0)
self.first_query_time_total = Value("d", 0)
def __init__(self):
    """
    Initialize the manager

    Sets up stderr logging with a ``PipeFormatter``, the bookkeeping lists,
    the (not yet started) processing thread, one process pool each for
    pipelines and steps, and a manager dict for pids.
    """
    self.logger = mp.log_to_stderr()
    self.logger.handlers[0].setFormatter(PipeFormatter())

    self.submitted = []
    self.actions = []
    self.process_thread = Thread(target=self.process)
    self.lock = Lock()

    # Pools are created in the order pipelines, steps; every worker handles
    # a single task before being replaced.
    self.pool = dict.fromkeys(('steps', 'pipelines'), '')
    for kind, size in (('pipelines', MAX_PIPELINES), ('steps', MAX_STEPS)):
        self.pool[kind] = mp.Pool(processes=size,
                                  initializer=init_worker,
                                  maxtasksperchild=1)

    self.manager = mp.Manager()
    self.pids = self.manager.dict()
    self.count = 0
def _get_manager(cluster_info, host, ppid):
    """Returns this executor's "singleton" instance of the multiprocessing.Manager, reconnecting per python-worker if needed.

    The first node whose 'host' and 'ppid' match is used to (re)connect
    ``TFSparkNode.mgr``. If no node matches, ``TFSparkNode.mgr`` is left as
    it was before the call.

    Args:
        :cluster_info: cluster node reservations
        :host: host IP
        :ppid: parent (executor JVM) PID

    Returns:
        TFManager instance for this executor/python-worker
    """
    for node in cluster_info:
        is_this_executor = node['host'] == host and node['ppid'] == ppid
        if not is_this_executor:
            continue
        TFSparkNode.mgr = TFManager.connect(node['addr'], node['authkey'])
        break

    state = str(TFSparkNode.mgr.get('state'))
    logging.info("Connected to TFSparkNode.mgr on {0}, ppid={1}, state={2}".format(host, ppid, state))
    return TFSparkNode.mgr
def setUp(self):
    """Create the job/result queues and start ``self.nWorkers`` shared-memory workers."""
    # JobQ holds tasks to be done, ResultQ holds results of completed tasks.
    queue_manager = multiprocessing.Manager()
    self.JobQ = queue_manager.Queue()
    self.ResultQ = queue_manager.Queue()

    # Launch the desired number of worker processes. No references to them
    # are kept: everything needed comes back through JobQ and ResultQ.
    # SHARED MEM: workers must be given access to shared memory at startup.
    for uid in range(self.nWorkers):
        worker = SharedMemWorker(
            uid, self.JobQ, self.ResultQ,
            Xsh=self.Xsh,
            Msh=self.Msh,
            returnVal=self.returnVal,
            sleepPerUnit=self.sleepPerUnit,
            verbose=self.verbose)
        worker.start()
def setUpWorkers(nWorker=1, verbose=0, nRepsForMinDuration=1, **kwargs):
    ''' Create queues and launch all workers.

    Extra keyword arguments are accepted and ignored.

    Returns
    -------
    JobQ
    ResultQ
    '''
    # JobQ holds tasks to be done, ResultQ holds results of completed tasks.
    queue_manager = multiprocessing.Manager()
    JobQ, ResultQ = queue_manager.Queue(), queue_manager.Queue()

    # Launch the desired number of worker processes. No references to them
    # are kept: everything needed comes back through JobQ and ResultQ.
    for uid in range(nWorker):
        Worker_IPCData_IPCModel(
            uid, JobQ, ResultQ,
            nReps=nRepsForMinDuration,
            verbose=verbose).start()
    return JobQ, ResultQ