Example source code for Python's Manager() class
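The snippets below are collected from real open-source projects. As a baseline before the project code, here is a minimal, self-contained sketch (not taken from any project on this page) showing the three Manager() proxies that recur throughout: dict(), list(), and Queue(). Mutations made through a proxy are forwarded to the manager's server process, so they are visible to every process holding that proxy.

import multiprocessing

def worker(shared_dict, shared_list, shared_queue):
    # Each mutation is forwarded to the manager's server process.
    shared_dict["pid"] = multiprocessing.current_process().pid
    shared_list.append(1)
    shared_queue.put("done")

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        d = manager.dict()
        lst = manager.list()
        q = manager.Queue()
        p = multiprocessing.Process(target=worker, args=(d, lst, q))
        p.start()
        p.join()
        # Copy the proxies into plain objects before the manager shuts down.
        print(dict(d), list(lst), q.get())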

qpipe.py (project: qpipe, author: dankinder)
def results(self):
        """Start the flow, block until completion, and return the results.
        """
        if self._started_operating:
            raise Exception("You cannot start a pipe flow that has already been run")
        result_pipe = self._result_pipe()

        if is_backend(Backend.MULTIPROCESSING):
            result_pipe._results = multiprocessing.Manager().list()
        else:
            result_pipe._results = []

        self.execute()

        if is_backend(Backend.MULTIPROCESSING):
            return list(result_pipe._results)
        else:
            return result_pipe._results
benchmarking.py (project: NordVPN-NetworkManager, author: Chadsr)
def get_best_servers(server_list, ping_attempts, valid_protocols):
    manager = multiprocessing.Manager()
    best_servers = manager.dict()

    num_servers = len(server_list)
    num_processes = get_num_processes(num_servers)

    pool = multiprocessing.Pool(num_processes, maxtasksperchild=1)
    pool.map(partial(compare_server, best_servers=best_servers, ping_attempts=ping_attempts, valid_protocols=valid_protocols), server_list)
    pool.close()

    return best_servers
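The returned best_servers is a manager DictProxy rather than a plain dict; callers that want an ordinary dictionary can copy it once the call returns. A hedged usage sketch (the argument values are illustrative, not taken from the project):

best = dict(get_best_servers(server_list, ping_attempts=3,
                             valid_protocols=['tcp', 'udp']))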
mpQueue.py (project: Concurrency-With-Python, author: elliotforbes)
def main():
  m = multiprocessing.Manager()
  sharedQueue = m.Queue()
  sharedQueue.put(2)
  sharedQueue.put(3)
  sharedQueue.put(4)

  process1 = multiprocessing.Process(target=myTask, args=(sharedQueue,))
  process1.start()

  process2 = multiprocessing.Process(target=myTask, args=(sharedQueue,))
  process2.start()

  process3 = multiprocessing.Process(target=myTask, args=(sharedQueue,))
  process3.start()

  process2.join()
  process1.join()
  process3.join()
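myTask is defined elsewhere in that project; below is a minimal stand-in consistent with how it is called above (only the single-queue-argument signature comes from the code; the body is an assumption):

import multiprocessing
import queue

def myTask(sharedQueue):
    # Drain whatever items happen to be available; the manager's Queue
    # proxy raises queue.Empty just like a regular queue.Queue.
    try:
        while True:
            item = sharedQueue.get(block=False)
            print("%s got %s" % (multiprocessing.current_process().name, item))
    except queue.Empty:
        pass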
service.py (project: kuryr-kubernetes, author: openstack)
def __init__(self):
        super(CNIDaemonServiceManager, self).__init__()
        # TODO(dulek): Use cotyledon.oslo_config_glue to support conf reload.

        # TODO(vikasc): Should be done using dynamically loadable OVO types
        #               plugin.
        objects.register_locally_defined_vifs()

        os_vif.initialize()
        clients.setup_kubernetes_client()

        self.manager = multiprocessing.Manager()
        registry = self.manager.dict()  # For Watcher->Server communication.
        self.add(CNIDaemonWatcherService, workers=1, args=(registry,))
        self.add(CNIDaemonServerService, workers=1, args=(registry,))
        self.register_hooks(on_terminate=self.terminate)
multiprocess.py (project: hostapd-mana, author: adde88)
def _import_mp():
    global Process, Queue, Pool, Event, Value, Array
    try:
        from multiprocessing import Manager, Process
        # Prevent the manager's server process (which holds Python
        # objects and lets other processes manipulate them through
        # proxies) from being interrupted on SIGINT (KeyboardInterrupt),
        # so that the communication channel between subprocesses and the
        # main process stays usable after Ctrl+C reaches the main process.
        old=signal.signal(signal.SIGINT, signal.SIG_IGN)
        m = Manager()
        #reset it back so main process will receive a KeyboardInterrupt
        #exception on ctrl+c
        signal.signal(signal.SIGINT, old)
        Queue, Pool, Event, Value, Array = (
                m.Queue, m.Pool, m.Event, m.Value, m.Array
        )
    except ImportError:
        warn("multiprocessing module is not available, multiprocess plugin "
             "cannot be used", RuntimeWarning)
pipeline.py (project: DeepFramework, author: issey173)
def __init__(self, core_classes_map):
        """Creates a Pipeline object.

        Args:
            core_classes_map (list[dict]): Each element in the list corresponds to a Core. The element must be a
                dictionary whose Pipeline.KEY_CLASS key holds the class to instantiate (the Core
                subclass). Constructor arguments can be supplied under the key Pipeline.KEY_KWARGS.
        """

        self.input_pipe, self.output_pipe = self._construct_pipes(core_classes_map)
        # Instantiate the core classes, connecting them with the created pipes
        self.cores = [core_class[self.KEY_CLASS](**core_class[self.KEY_KWARGS]) for core_class in core_classes_map]
        self.started = False
        self.results_manager = Manager()
        self.results = self.results_manager.dict()
        self.results_producer = PipeConsumer(self.output_pipe, self.results)
process_usage.py (project: base_function, author: Rockyzsu)
def process_pool():
    p=Pool(10)
    start=time.time()
    #q1=Queue.Queue()
    manager=Manager()
    q=manager.Queue()
    print "main start ",start
    for i in xrange(10):
        p.apply_async(sub_pool,args=(q,))
    p.close()
    p.join()
    end=time.time()

    print "process done at ",end
    #print q
    print q.get()
    '''
    while q1.empty() ==False:
        d= q1.get(True)
        print d
    '''
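sub_pool is not shown in this excerpt; a minimal stand-in consistent with the call site (the name comes from the code above, the body is an assumption):

import os
import time

def sub_pool(q):
    # Simulate some work in the pool worker, then report back through
    # the manager queue.
    time.sleep(1)
    q.put("done in pid %d" % os.getpid())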
main.py (project: UrbanSearch, author: urbansearchTUD)
def create_ic_relations_to_db(num_workers, to_db=False):
    """
    Creates intercity relations and stores them in the database if desired.
    If storing is desired, a connection to the database must be possible.
    Blocks until the producers and workers are done.

    :param num_workers: The number of workers to use for computing the
    relation scores. This is a read-only operation.
    :param to_db: Defaults to false. If true, the relations are stored.
    """
    if to_db and not db_utils.connected_to_db():
        LOGGER.error('No database connection!')
        return

    w_factory = workers.Workers()
    man = Manager()
    queue = man.Queue()

    producers = w_factory.run_compute_ic_rels_workers(num_workers, queue,
                                                      join=False)
    consumers = w_factory.run_store_ic_rels_worker(queue, join=False,
                                                   to_db=to_db)

    # Join all workers when done
    _join_ic_rel_workers(w_factory, producers, consumers)
download_samples.py (project: gym-malware, author: endgameinc)
def use_virustotal(args):
    """
    Use Virustotal to download the environment malware
    """
    m = multiprocessing.Manager()
    download_queue = m.JoinableQueue(args.nconcurrent)

    archive_procs = [
        multiprocessing.Process(
            target=download_worker_function,
            args=(download_queue, args.vtapikey))
        for i in range(args.nconcurrent)
    ]
    for w in archive_procs:
        w.start()

    for row in get_sample_hashes():
        download_queue.put(row["sha256"])

    # Send one "STOP" sentinel per worker (the workers were created
    # with args.nconcurrent above).
    for i in range(args.nconcurrent):
        download_queue.put("STOP")

    download_queue.join()
    for w in archive_procs:
        w.join()
eval.py (project: arc-swift, author: qipeng)
def bootstrap(diffs, B):
        m = multiprocessing.Manager()
        q = m.Queue()
        pool = multiprocessing.Pool()
        rs = pool.map_async(bs_one, [(diffs, q) for _ in xrange(B)])
        pool.close() # No more work
        while not rs.ready():

            log.info('Waiting for %d bootstrap samples to finish...' % (B - q.qsize()))
            time.sleep(1)

        assert(q.qsize() == B), "qsize=%d, B=%d" % (q.qsize(), B)
        count = [0] * len(diffs[0])
        for i in xrange(B):
            qres = q.get()
            for j in xrange(len(diffs[0])):
                count[j] += qres[j]
        assert(q.empty())

        return [(c + 1.0) / (B + 1.0) for c in count]    # smoothed p-value
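bs_one is defined elsewhere in that project; the sketch below is only a plausible stand-in matching the calling convention above: it receives one (diffs, q) tuple and pushes one per-column indicator list onto q. The statistical details are an assumption, not the project's code:

import random

def bs_one(args):
    # One bootstrap replicate: resample the rows of diffs with
    # replacement, then record, for each column, whether the resampled
    # mean difference is <= 0.
    diffs, q = args
    n = len(diffs)
    sample = [diffs[random.randrange(n)] for _ in range(n)]
    ncols = len(diffs[0])
    means = [sum(row[j] for row in sample) / float(n) for j in range(ncols)]
    q.put([1 if m <= 0 else 0 for m in means])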
gpu.py (project: wepy, author: ADicksonLab)
def __init__(self, n_walkers, n_workers=None, gpu_indices=None):

        if gpu_indices is not None:
            self.gpu_indices = gpu_indices
            self.n_workers = len(gpu_indices)
        else:
            assert n_workers, "If gpu_indices are not given the n_workers must be given"
            self.n_workers = n_workers
            self.gpu_indices = range(n_workers)

        # make a Queue for free workers, when one is being used it is
        # popped off and locked
        self.free_workers = mulproc.Queue()
        # the semaphore provides the locks on the workers
        self.lock = mulproc.Semaphore(self.n_workers)
        # initialize a list to put results in
        self.results_list = mulproc.Manager().list()
        for i in range(n_walkers):
            self.results_list.append(None)

        # add the free worker indices (not device/gpu indices) to the
        # free workers queue
        for i in range(self.n_workers):
            self.free_workers.put(i)
sequence.py (project: shellbot, author: bernard357)
def __init__(self, bot=None, machines=None, **kwargs):
        """
        Implements a sequence of multiple machines

        :param machines: the sequence of machines to be run
        :type machines: list of Machine

        """
        self.bot = bot
        self.machines = machines

        self.lock = Lock()

        # prevent the Manager() process from being interrupted
        handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

        self.mutables = Manager().dict()

        # restore current handler for the rest of the program
        signal.signal(signal.SIGINT, handler)

        self.on_init(**kwargs)
context.py (project: shellbot, author: bernard357)
def __init__(self, settings=None, filter=None):
        """
        Stores settings across multiple independent processing units

        :param settings: the set of variables managed in this context
        :type settings: dict

        :param filter: a function to interpret values on check()
        :type filter: callable

        """

        # prevent the Manager() process from being interrupted
        handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

        self.lock = Lock()
        self.values = Manager().dict()

        # restore current handler for the rest of the program
        signal.signal(signal.SIGINT, handler)

        self.filter = filter if filter else self._filter

        if settings:
            self.apply(settings)
pytest_workload.py (project: taf, author: taf3)
def __init__(self, env, workers):
        """Initialize WorkloadInterrupted object instance.

        Args:
            env(testlib.common3.Environment): TAF environment instance

        """
        self.env = env

        # Filter environment device for workload
        # get device with hw.stress_tool_attributes
        self.devices = [dev for dev in self.env.id_map.values()
                        if hasattr(dev, 'hw') and hasattr(dev.hw, 'stress_tool_attributes')]
        manager = Manager()
        self.workload_results = {}
        for dev in self.devices:
            self.workload_results[dev.id] = manager.list([])  # pylint: disable=no-member
        self.pool = ThreadPool(len(self.devices))
        self.workers = get_workers(workers)
        if not self.workers:
            self.workers = {'time': WORKLOAD_TIME}
        else:
            if not int(self.workers.get('time', 0)):
                self.workers['time'] = WORKLOAD_TIME
test_multiprocessing.py (project: oil, author: oilshell)
def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')

test_multiprocessing.py (project: python2-tracer, author: extremecoders-re): code identical to the oil snippet above
consensus_iAssembler.py (project: LoReAn, author: lfaino)
def assembly(overlap_length, percent_identity, threads, wd, verbose):
    """
    """
    manage = Manager()
    queue = manage.Queue()
    pool = Pool(processes=int(threads), maxtasksperchild=10)

    new_commands = []
    for root, dirs, file in os.walk(wd):
        for fasta_file in file:
            complete_data = (fasta_file, percent_identity, overlap_length, wd, verbose, queue)
            new_commands.append(complete_data)
    results = pool.map_async(iAssembler, new_commands)
    with progressbar.ProgressBar(max_value=len(new_commands)) as bar:
        while not results.ready():
            size = queue.qsize()
            bar.update(size)
            time.sleep(1)
multiprocess.py (project: sslstrip-hsts-openwrt, author: adde88): code identical to the hostapd-mana snippet above
swift_t.py (project: parsl, author: Parsl)
def __init__ (self, swift_attribs=None):
        ''' Initialize the thread pool
        Trying to implement the emews model.

        Kwargs:
            - swift_attribs : Takes a dict of swift attribs. For future use.

        '''

        logger.debug("In __init__")
        self.mp_manager = mp.Manager()
        self.outgoing_q = self.mp_manager.Queue()
        self.incoming_q = self.mp_manager.Queue()
        self.isAlive   = True

        self._queue_management_thread = None
        self._start_queue_management_thread()
        logger.debug("Created management thread : %s", self._queue_management_thread)

        self.worker  = mp.Process(target=runner, args = (self.outgoing_q, self.incoming_q))
        self.worker.start()
        logger.debug("Created worker : %s", self.worker)
        self.tasks   = {}
solve.py (project: qlcoder, author: L1nwatch)
def solve_with_pool():
    """
    ?????
    """
    manager = multiprocessing.Manager()
    pool = multiprocessing.Pool(40)
    check_codes = manager.dict()

    # Fetch the verification codes for entries 1 through 1000 in parallel
    pool.map(partial(get_verify_code, check_codes=check_codes), [i for i in range(1, 1000 + 1)])

    # Save the results
    print(check_codes)
    check_codes = dict(check_codes)
    with open("result_check_code.txt", "w") as f:
        json.dump(check_codes, f)

    # Vote using the collected codes
    vote(check_codes)
solve.py (project: qlcoder, author: L1nwatch)
def solve_without_pool():
    """
    ??????
    :return:
    """
    manager = multiprocessing.Manager()
    check_codes = manager.dict()

    # Spawn one process per entry from 1 to 1000 to fetch verification codes
    jobs = list()
    for i in range(1, 1000 + 1):
        p = multiprocessing.Process(target=get_verify_code, args=(i, check_codes))
        jobs.append(p)
        p.start()

    for process in jobs:
        process.join()

    print(check_codes)

    # Vote using the collected codes
    vote(check_codes)
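Both variants rely on get_verify_code(i, check_codes), which is not shown. The real function presumably performs an HTTP request; the stand-in below only honors the (i, check_codes) contract so the sketch executes:

def get_verify_code(i, check_codes):
    # Stand-in for the real fetch: derive a dummy code and record it
    # in the shared dict under its index.
    check_codes[i] = "code-%04d" % i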
LocalPoolQueue.py (project: PythonSkillTree, author: w4n9H)
def main():
    """
    main process
    """
    m = Manager()
    q = m.Queue()
    plist = []  # task arguments; empty here, so the pool has nothing to run until this is filled
    pool = Pool(processes=20)
    for proc in plist:
        pool.apply_async(process, (q, proc))
    pool.close()
    pool.join()
    count = 0
    while True:
        if q.empty():
            print "empty"
            break
        else:
            c = q.get()
            print c
            count += c
    print count
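process is not shown in this excerpt; a minimal stand-in matching the (q, proc) call above (the body is an assumption):

def process(q, proc):
    # Report one unit of completed work for task `proc` through the
    # shared queue; main() sums these counts.
    q.put(1)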
truncatedata.py (project: navigator, author: naviga-tor)
def _main():
    """ Start multiple processes to truncate data out of measurements. """
    wlock = Manager().Lock()
    pool = PoolLimit()
    probes = cprobes()
    try:
        while True:
            try:
                cprobe = probes.next()
            except StopIteration:
                break
            pool.apply_async(_truncate, (cprobe, wlock))
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        pass
data_loader.py (project: dbnet_tensorflow, author: yuanluya)
def __init__(self, num_processor, batch_size, phase,
                 batch_idx_init = 0, data_ids_init = train_ids, capacity = 10):
        self.num_processor = num_processor
        self.batch_size = batch_size
        self.data_load_capacity = capacity
        self.manager = Manager()
        self.batch_lock = Lock()
        self.mutex = Lock()
        self.cv_full = Condition(self.mutex)
        self.cv_empty = Condition(self.mutex)
        self.data_load_queue = self.manager.list()
        self.cur_batch = self.manager.list([batch_idx_init])
        self.processors = []
        if phase == 'train':
            self.data_ids = self.manager.list(data_ids_init)
        elif phase == 'test':
            self.data_ids = self.manager.list(test_ids)
        else:
            raise ValueError('Could not set phase to %s' % phase)
_test_multiprocessing.py (project: ouroboros, author: pybee): code identical to the oil snippet above
shared_data.py (project: ccdetection, author: tommiu)
def __init__(self, path, lock, in_path=None):
        """
        Set up all the values that are shared between processes.
        """
        self.lock = lock
        self.path = path

        if os.path.isfile(path):
            self.loadData()

        else:
            self.in_path = in_path
            self.clones = Manager().list()
            self.counter = Value("i", 0)
            self.nodes_total = Value("i", 0)            
            self.first_counter = Value("i", 0)
            self.query_time_total = Value("d", 0)
            self.projects_counter = Value("i", 0)
            self.first_query_time_total = Value("d", 0)
pipelines_manager.py (project: pypers, author: frankosan)
def __init__(self):
        """
        Initialize the manager
        """
        self.logger = mp.log_to_stderr()
        self.logger.handlers[0].setFormatter(PipeFormatter())

        self.submitted = []
        self.actions = []
        self.process_thread = Thread(target=self.process)
        self.lock = Lock()
        self.pool = {'steps': '', 'pipelines': ''}

        self.pool['pipelines'] = mp.Pool(processes=MAX_PIPELINES,
                                         initializer=init_worker,
                                         maxtasksperchild=1)

        self.pool['steps'] = mp.Pool(processes=MAX_STEPS,
                                     initializer=init_worker,
                                     maxtasksperchild=1)

        self.manager = mp.Manager()
        self.pids = self.manager.dict()
        self.count = 0
TFSparkNode.py (project: TensorFlowOnSpark, author: yahoo)
def _get_manager(cluster_info, host, ppid):
  """Returns this executor's "singleton" instance of the multiprocessing.Manager, reconnecting per python-worker if needed.

  Args:
    :cluster_info: cluster node reservations
    :host: host IP
    :ppid: parent (executor JVM) PID

  Returns:
    TFManager instance for this executor/python-worker
  """
  for node in cluster_info:
    if node['host'] == host and node['ppid'] == ppid:
      addr = node['addr']
      authkey = node['authkey']
      TFSparkNode.mgr = TFManager.connect(addr,authkey)
      break
  logging.info("Connected to TFSparkNode.mgr on {0}, ppid={1}, state={2}".format(host, ppid, str(TFSparkNode.mgr.get('state'))))
  return TFSparkNode.mgr
TestKMeans_Shared.py (project: bnpy, author: bnpy)
def setUp(self):
        # Create a JobQ (to hold tasks to be done)
        # and a ResultsQ (to hold results of completed tasks)
        manager = multiprocessing.Manager()
        self.JobQ = manager.Queue()
        self.ResultQ = manager.Queue()

        # Launch desired number of worker processes
        # We don't need to store references to these processes,
        # We can get everything we need from JobQ and ResultsQ
        # SHARED MEM: we need to give workers access to shared memory at
        # startup
        for uid in range(self.nWorkers):
            SharedMemWorker(
                uid, self.JobQ, self.ResultQ,
                Xsh=self.Xsh,
                Msh=self.Msh,
                returnVal=self.returnVal,
                sleepPerUnit=self.sleepPerUnit,
                verbose=self.verbose).start()
LocalStepUtil_ParallelIPC.py (project: bnpy, author: bnpy)
def setUpWorkers(nWorker=1, verbose=0, nRepsForMinDuration=1, **kwargs):
    ''' Create queues and launch all workers.

    Returns
    -------
    JobQ
    ResultQ
    '''
    # Create a JobQ (to hold tasks to be done)
    # and a ResultsQ (to hold results of completed tasks)
    manager = multiprocessing.Manager()
    JobQ = manager.Queue()
    ResultQ = manager.Queue()

    # Launch desired number of worker processes
    # We don't need to store references to these processes,
    # We can get everything we need from JobQ and ResultsQ
    for uid in range(nWorker):
        workerProcess = Worker_IPCData_IPCModel(
            uid, JobQ, ResultQ,
            nReps=nRepsForMinDuration,
            verbose=verbose)
        workerProcess.start()
    return JobQ, ResultQ

