def trainPool(population, envNum, species, queue, env):
    before = time.time()
    results = []
    jobs = Queue()
    lock = multiprocessing.Lock()
    # Enumerate every genome in every species and queue it as one job.
    s = 0
    for specie in species:
        g = 0
        for genome in specie.genomes:
            genome.generateNetwork()
            jobs.put((s, g, genome))
            g += 1
        s += 1
    # Each worker receives the shared job queue via the pool initializer.
    mPool = multiprocessing.Pool(processes=envNum,
                                 initializer=poolInitializer,
                                 initargs=(jobs, lock))
    results = mPool.map(jobTrainer, [env] * envNum)
    mPool.close()
    mPool.join()
    after = time.time()
    killFCEUX()
    print("next generation")
    queue.put(results)

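This snippet leans on two helpers defined elsewhere in the project: poolInitializer stores the shared job queue in each worker process, and jobTrainer drains it. A minimal sketch of what they could look like (the module-level globals and the genome.evaluate call are illustrative assumptions, not the project's actual code):

import queue  # for the queue.Empty exception raised by get_nowait()

_jobs = None
_lock = None

def poolInitializer(jobs, lock):
    # Runs once per worker: keep references to the shared queue and lock.
    global _jobs, _lock
    _jobs, _lock = jobs, lock

def jobTrainer(env):
    # Drain jobs until the queue is empty, collecting one result per genome.
    results = []
    while True:
        try:
            s, g, genome = _jobs.get_nowait()
        except queue.Empty:
            return results
        fitness = genome.evaluate(env)  # hypothetical evaluation call
        results.append((s, g, fitness))
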
def __init__(self, controller, style):
    # Shared objects to help event handling.
    self.events = Queue()
    self.lock = Lock()

    self.view = MainWindow(controller)
    self.screen = raw_display.Screen()
    self.screen.set_terminal_properties(256)
    self.loop = MainLoop(widget=self,
                         palette=style,
                         screen=self.screen,
                         unhandled_input=Tui.exit_handler,
                         pop_ups=True)
    self.pipe = self.loop.watch_pipe(self.update_ui)
    self.loop.set_alarm_in(0.1, Tui.update_timer, self.view.logo.timer)
    super(Tui, self).__init__(self.view)
    connect_signal(self.view.issues_table, 'refresh', lambda source: self.loop.draw_screen())
    connect_signal(self.view.stat_table, 'refresh', lambda source: self.loop.draw_screen())

def __init__(self, topics, config, consumer_factory):
    self.config = config
    self.termination_flag = None
    self.partitioner = Partitioner(
        config,
        topics,
        self.acquire,
        self.release,
    )
    self.consumers = None
    self.consumers_lock = Lock()
    self.consumer_procs = {}
    self.consumer_factory = consumer_factory
    self.log = logging.getLogger(self.__class__.__name__)
    self.pre_rebalance_callback = config.pre_rebalance_callback
    self.post_rebalance_callback = config.post_rebalance_callback

def array(shape, dtype=_np.float64, autolock=False):
    """Factory method for shared memory arrays supporting all numpy dtypes."""
    assert _NP_AVAILABLE, (
        "To use the shared array object, numpy must be available!")
    if not isinstance(dtype, _np.dtype):
        dtype = _np.dtype(dtype)
    # Not bothering to translate the numpy dtypes to ctype types directly,
    # because they're only partially supported. Instead, create a byte ctypes
    # array of the right size and use a view of the appropriate datatype.
    # itemsize (not alignment) is the true per-element byte count; alignment
    # would under-allocate for types such as complex128.
    shared_arr = _multiprocessing.Array(
        'b', int(_np.prod(shape) * dtype.itemsize), lock=autolock)
    # With a lock, Array returns a synchronized wrapper; numpy needs the raw
    # ctypes buffer underneath it.
    buf = shared_arr.get_obj() if autolock else shared_arr
    with _warnings.catch_warnings():
        # For more information on why this is necessary, see
        # https://www.reddit.com/r/Python/comments/j3qjb/parformatlabpool_replacement
        _warnings.simplefilter('ignore', RuntimeWarning)
        data = _np.ctypeslib.as_array(buf).view(dtype).reshape(shape)
    return data

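A quick usage sketch, assuming the factory above is in scope and processes are started by fork (spawned children would re-import the module and build their own arrays instead of sharing one):

import multiprocessing

arr = array((4, 4))  # float64 zeros backed by shared memory

def fill(row):
    arr[row, :] = row  # writes land in the shared buffer

if __name__ == '__main__':
    procs = [multiprocessing.Process(target=fill, args=(i,)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(arr)  # each row filled by a different child process
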
def connect(self):
    '''
    Opens a serial connection to the Paradox Alarm Panel.
    To do: Add a loop to attempt a connection several times before giving up.
    '''
    self._lock = Lock()  # Does this do anything?
    try:
        self._pipe = serial.Serial(self._port, self._speed, timeout=1)
        self._pipe.flushInput()  # Gets rid of /X0 after being disconnected for long?
    except SerialException:
        if self._port is None:
            _LOGGER.error('Port not configured yet.')
        else:
            self.reconnect()
    else:
        # Connection should now be open.
        self._shutdown = False
        _LOGGER.info('Connected to Paradox on port: %s, speed: %s',
                     self._port, self._speed)

def refactor(self, items, write=False, doctests_only=False,
             num_processes=1):
    if num_processes == 1:
        return super(MultiprocessRefactoringTool, self).refactor(
            items, write, doctests_only)
    try:
        import multiprocessing
    except ImportError:
        raise MultiprocessingUnsupported
    if self.queue is not None:
        raise RuntimeError("already doing multiple processes")
    self.queue = multiprocessing.JoinableQueue()
    self.output_lock = multiprocessing.Lock()
    processes = [multiprocessing.Process(target=self._child)
                 for i in range(num_processes)]
    try:
        for p in processes:
            p.start()
        super(MultiprocessRefactoringTool, self).refactor(items, write,
                                                          doctests_only)
    finally:
        # Wait for all queued work, then send one None sentinel per worker.
        self.queue.join()
        for i in range(num_processes):
            self.queue.put(None)
        for p in processes:
            if p.is_alive():
                p.join()
        self.queue = None

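The worker side of this pattern is a loop that takes tasks off the JoinableQueue until it sees the None sentinel; in lib2to3 the companion _child method looks roughly like this:

def _child(self):
    task = self.queue.get()
    while task is not None:
        args, kwargs = task
        try:
            # Refactor one file using the single-process base implementation.
            super(MultiprocessRefactoringTool, self).refactor_file(*args, **kwargs)
        finally:
            self.queue.task_done()  # lets queue.join() in refactor() return
        task = self.queue.get()
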
def __init__(self, maxsize=0):
    '''initialize the queue'''
    self.mutex = multiprocessing.Lock()
    self.not_empty = multiprocessing.Condition(self.mutex)
    self.not_full = multiprocessing.Condition(self.mutex)
    self.maxsize = maxsize
    self._tags = {}      # list of refid's for each tag
    self._queue = {}     # the actual queue data
    self._refcount = {}  # how many tags refer to a given refid in the queue
    self.id_generator = id_generator()

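Because not_empty and not_full wrap the same mutex, producers and consumers can never mutate the tag tables concurrently. A simplified sketch of how put and get would use the two conditions (the tag/refcount bookkeeping here is an illustration, not the class's actual methods):

def put(self, tag, item):
    with self.not_full:
        while self.maxsize and len(self._queue) >= self.maxsize:
            self.not_full.wait()           # block until a consumer frees a slot
        refid = next(self.id_generator)
        self._queue[refid] = item
        self._tags.setdefault(tag, []).append(refid)
        self._refcount[refid] = 1
        self.not_empty.notify()            # wake one blocked consumer

def get(self, tag):
    with self.not_empty:
        while not self._tags.get(tag):
            self.not_empty.wait()          # block until an item with this tag arrives
        refid = self._tags[tag].pop(0)
        item = self._queue[refid]
        self._refcount[refid] -= 1
        if not self._refcount[refid]:
            del self._queue[refid], self._refcount[refid]
        self.not_full.notify()             # wake one blocked producer
        return item
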
def __init__(self, num_processes=1, tasks_per_requests=1, bitmap_size=(64 << 10)):
    self.to_update_queue = multiprocessing.Queue()
    self.to_master_queue = multiprocessing.Queue()
    self.to_master_from_mapserver_queue = multiprocessing.Queue()
    self.to_master_from_slave_queue = multiprocessing.Queue()
    self.to_mapserver_queue = multiprocessing.Queue()

    self.to_slave_queues = []
    for i in range(num_processes):
        self.to_slave_queues.append(multiprocessing.Queue())

    # Paired locks per slave; each B lock starts out held, so a slave that
    # tries to take it blocks until the master releases it.
    self.slave_locks_A = []
    self.slave_locks_B = []
    for i in range(num_processes):
        self.slave_locks_A.append(multiprocessing.Lock())
        self.slave_locks_B.append(multiprocessing.Lock())
        self.slave_locks_B[i].acquire()

    # Semaphore takes an int; cpu_count()/2 would be a float on Python 3.
    self.reload_semaphore = multiprocessing.Semaphore(multiprocessing.cpu_count() // 2)
    self.num_processes = num_processes
    self.tasks_per_requests = tasks_per_requests

    self.stage_abortion_notifier = multiprocessing.Value('b', False)
    self.slave_termination = multiprocessing.Value('b', False, lock=False)
    self.sampling_failed_notifier = multiprocessing.Value('b', False)
    self.effector_mode = multiprocessing.Value('b', False)

    self.files = ["/dev/shm/kafl_fuzzer_master_", "/dev/shm/kafl_fuzzer_mapserver_", "/dev/shm/kafl_fuzzer_bitmap_"]
    self.sizes = [(65 << 10), (65 << 10), bitmap_size]
    self.tmp_shm = [{}, {}, {}]

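A Lock acquired at construction time like the B locks above acts as a one-shot signal: whoever blocks on it is woken when the other side releases it, and multiprocessing.Lock explicitly allows release from a different process. A generic, runnable sketch of that idea (not kAFL's actual handshake protocol):

import multiprocessing

def slave(ready_lock):
    ready_lock.acquire()  # blocks until the master releases the lock
    print("signalled, starting work")

if __name__ == '__main__':
    ready = multiprocessing.Lock()
    ready.acquire()       # start in the held state, as in the constructor above
    p = multiprocessing.Process(target=slave, args=(ready,))
    p.start()
    # ... master prepares shared state here ...
    ready.release()       # wake the slave
    p.join()
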
def __init__(self, blocks=None):
    self.blocks_lock = Lock()
    self.unconfirmed_transactions_lock = Lock()
    # list() must be called; without the parentheses this stored the unbound
    # method instead of a manager-backed shared list.
    self.unconfirmed_transactions = Manager().list()
    if blocks is None:
        genesis_block = self.get_genesis_block()
        self.add_block(genesis_block)
    else:
        for block in blocks:
            self.add_block(block)

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')


def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

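The two workers are equivalent; the with form just guarantees the release on any exit path. A small driver running both against one shared lock (sys.stderr stands in for any stream the processes share):

import multiprocessing
import sys

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(target=worker_with, args=(lock, sys.stderr))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, sys.stderr))
    w.start()
    nw.start()
    w.join()
    nw.join()
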
def __init__(self):
    super(ActivePool, self).__init__()
    self.mgr = multiprocessing.Manager()
    self.active = self.mgr.list()
    self.lock = multiprocessing.Lock()

def __init__(self, database):
    self._handle = open(database, 'rb')
    self._size = os.fstat(self._handle.fileno()).st_size
    if not hasattr(os, 'pread'):
        # Without an atomic positioned read, seek+read must be serialized.
        self._lock = Lock()

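When os.pread is available, each read passes its offset explicitly and needs no lock; the fallback must hold the lock across the seek/read pair so a concurrent reader cannot move the shared file offset mid-read. A sketch of the matching read method (the name _read is an assumption):

def _read(self, buffer_size, offset):
    if hasattr(os, 'pread'):
        # Atomic positioned read: no shared file-offset state to protect.
        return os.pread(self._handle.fileno(), buffer_size, offset)
    with self._lock:
        # seek + read is two steps; the lock makes the pair atomic.
        self._handle.seek(offset)
        return self._handle.read(buffer_size)
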
def __init__(self, opt, data_loader=None, cands=None, shared=None, **kwargs):
    # super() call initiates stream in self.data by calling _load()
    super().__init__(opt, data_loader, cands, shared, **kwargs)
    self.cycle = kwargs['cycle'] if 'cycle' in kwargs else True
    if shared:
        # auxiliary instances hold pointer to main datastream (in self.data)
        self.reset_data = shared['reset']
        # Share datafile and data_loader for computing num_exs and num_eps
        self.datafile = shared['datafile']
        self.data_loader = shared['data_loader']
        if 'lock' in shared:
            self.lock = shared['lock']
    else:
        # main instance holds the stream and shares pointer to it
        self.data_loader = data_loader
        self.datafile = opt['datafile']
        self.reset_data = None
        self.is_reset = True
        if opt.get('numthreads', 1) > 1:
            print('WARNING: multithreaded streaming will process every '
                  'example numthreads times.')
        self.lock = Lock()
    self.entry_idx = 0
    self.next_episode = None
    self.num_eps = None
    self.num_exs = None

def __init__(self):
    self.lock = multiprocessing.Lock()
    self.readers_condition = multiprocessing.Condition(self.lock)
    self.writer_condition = multiprocessing.Condition(self.lock)
    self.readers = multiprocessing.RawValue(ctypes.c_uint, 0)
    self.writer = multiprocessing.RawValue(ctypes.c_bool, False)

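This is the state for a classic readers-writer lock: both conditions share the one mutex, readers counts active readers, and writer flags an active writer. A sketch of the four methods such a class typically exposes (assumed here, since only the constructor is shown):

def acquire_read(self):
    with self.lock:
        while self.writer.value:
            self.readers_condition.wait()    # wait for the writer to finish
        self.readers.value += 1

def release_read(self):
    with self.lock:
        self.readers.value -= 1
        if self.readers.value == 0:
            self.writer_condition.notify()   # last reader out lets a writer in

def acquire_write(self):
    with self.lock:
        while self.writer.value or self.readers.value > 0:
            self.writer_condition.wait()
        self.writer.value = True

def release_write(self):
    with self.lock:
        self.writer.value = False
        self.readers_condition.notify_all()  # wake all waiting readers
        self.writer_condition.notify()       # and one waiting writer
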
def __init__(self, player, bot):
    self._song = None
    self._player = player
    self._bot = bot
    self._lock = Lock()

def __init__(self, out, cacheline=100, flushnow=0):
    self.out = out
    self.cacheline = cacheline
    self.flushnow = flushnow
    self.lock = multiprocessing.Lock()
    self.filter = []
    self.__cache = []

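The attributes suggest a lock-guarded, line-buffered writer: lines pile up in the private cache and are written to out in one locked burst every cacheline entries, or immediately when flushnow is set. A sketch of write/flush methods consistent with that reading (assumptions; the class's real methods are not shown):

def write(self, line):
    if any(f in line for f in self.filter):
        return  # drop lines matching a registered filter
    self.__cache.append(line)
    if self.flushnow or len(self.__cache) >= self.cacheline:
        self.flush()

def flush(self):
    with self.lock:  # one process at a time on the shared output
        self.out.writelines(self.__cache)
        self.out.flush()
    self.__cache = []
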
def __init__(self, ndata, nprocs):
    self._ndata = mp.RawValue(ctypes.c_int, ndata)
    self._start = mp.RawValue(ctypes.c_int, 0)
    self._lock = mp.Lock()
    # Hand out a fair share per claim; for tiny inputs one process takes all.
    min_chunk = ndata // nprocs
    min_chunk = ndata if min_chunk <= 2 else min_chunk
    self._chunk = min_chunk

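RawValue carries no lock of its own, so every claim has to go through self._lock. Workers would grab half-open [start, end) slices through a method like this (the name next_chunk is an assumption):

def next_chunk(self):
    # Atomically claim the next slice of the data, or None when exhausted.
    with self._lock:
        start = self._start.value
        if start >= self._ndata.value:
            return None
        end = min(start + self._chunk, self._ndata.value)
        self._start.value = end
    return start, end
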
def __init__(self, dstconf, transport_class, transport_kwargs):
    super(PupyAsyncStream, self).__init__()
    self.active = True
    # buffers for streams
    self.buf_in = Buffer()
    self.buf_out = Buffer()
    self.buf_tmp = Buffer()
    self.cookie = ''.join(random.SystemRandom().choice("abcdef0123456789") for _ in range(32))
    self.buf_in.cookie = self.cookie
    self.buf_out.cookie = self.cookie
    self.buf_tmp.cookie = self.cookie
    # buffers for transport
    self.upstream = Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
    self.downstream = Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
    self.upstream_lock = multiprocessing.Lock()
    self.downstream_lock = multiprocessing.Lock()
    self.transport = transport_class(self, **transport_kwargs)
    self.max_pull_interval = 2
    self.pull_interval = 0
    self.pull_event = multiprocessing.Event()
    self.MAX_IO_CHUNK = 32000 * 100  # ~3 MB, because it is an async transport
    self.client_side = self.transport.client
    if self.client_side:
        self.poller_thread = multiprocessing.Process(target=self.poller_loop)
        self.poller_thread.daemon = True
        self.poller_thread.start()
    self.on_connect()

def __init__(self, transport_class, transport_kwargs):
    self.bufin = Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
    self.bufout = Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
    self.upstream = Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
    self.downstream = Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
    self.transport = transport_class(self, **transport_kwargs)
    self.lockin = multiprocessing.Lock()
    self.lockout = multiprocessing.Lock()