def status_iter(iterable, callback, chunksize=1, reportsize=10):
    """Run callback over chunks of iterable, logging progress and an ETA every reportsize chunks."""
    itersize = len(iterable)
    starttime = time.time()
    for i, item in enumerate(util.chunks(iterable, chunksize), 1):
        callback(item)
        if i % reportsize == 0:
            done = i * chunksize
            nowtime = time.time()
            # Fraction of the work completed so far
            numblocks = itersize * 1.0 / (reportsize * chunksize)
            curblock = done / (reportsize * chunksize)
            position = curblock / numblocks
            # Elapsed time and a simple linear estimate of the time remaining
            duration = round(nowtime - starttime)
            durdelta = datetime.timedelta(seconds=duration)
            remaining = round((duration / position) - duration)
            remdelta = datetime.timedelta(seconds=remaining)
            lookuplog.info("Done %s/%s in %s; %s remaining", done, itersize, str(durdelta), str(remdelta))
    lookuplog.info("Finished")
Python chunks() example source code
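Every snippet on this page calls a project-local util.chunks() helper whose definition is not shown. As a point of reference, here is a minimal sketch of such a helper, assuming it only needs to split a sequence into consecutive fixed-size pieces; the name and signature are assumptions, not the quoted projects' actual code.

def chunks(seq, n):
    # Sketch only: yield successive pieces of seq with at most n items each.
    for i in range(0, len(seq), n):
        yield seq[i:i + n]

# e.g. list(chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]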
def train(self, X_train, X_val):
    train_true = filter(lambda x: x[2]==1, X_train)
    train_false = filter(lambda x: x[2]==0, X_train)
    val_true = filter(lambda x: x[2]==1, X_val)
    val_false = filter(lambda x: x[2]==0, X_val)

    n_train_true = len(train_true)
    n_val_true = len(val_true)

    make_epoch_helper = functools.partial(make_epoch, train_true=train_true, train_false=train_false, val_true=val_true, val_false=val_false)

    logging.info("Starting training...")
    epoch_iterator = ParallelBatchIterator(make_epoch_helper, range(P.N_EPOCHS), ordered=False, batch_size=1, multiprocess=False, n_producers=1)

    for epoch_values in epoch_iterator:
        self.pre_epoch()
        train_epoch_data, val_epoch_data = epoch_values

        train_epoch_data = util.chunks(train_epoch_data, P.BATCH_SIZE_TRAIN)
        val_epoch_data = util.chunks(val_epoch_data, P.BATCH_SIZE_VALIDATION)

        self.do_batches(self.train_fn, train_epoch_data, self.train_metrics)
        self.do_batches(self.val_fn, val_epoch_data, self.val_metrics)
        self.post_epoch()

        logging.info("Setting learning rate to {}".format(P.LEARNING_RATE * ((0.985)**self.epoch)))
        self.l_r.set_value(P.LEARNING_RATE * ((0.985)**self.epoch))
def _start_producers(self, result_queue):
    jobs = Queue()
    n_workers = self.n_producers
    batch_count = 0

    # Flag used for keeping values in queue in order
    last_queued_job = Value('i', -1)
    chunks = util.chunks(self.X, self.batch_size)

    # Add jobs to queue
    for job_index, X_batch in enumerate(chunks):
        batch_count += 1
        jobs.put((job_index, X_batch))

    # Add poison pills to queue (to signal workers to stop)
    for i in xrange(n_workers):
        jobs.put((-1, None))

    # Define producer function
    produce = partial(_produce_helper,
                      generator=self.generator,
                      jobs=jobs,
                      result_queue=result_queue,
                      last_queued_job=last_queued_job,
                      ordered=self.ordered)

    # Start worker processes or threads
    for i in xrange(n_workers):
        name = "ParallelBatchIterator worker {0}".format(i)

        if self.multiprocess:
            p = Process(target=produce, args=(i,), name=name)
        else:
            p = Thread(target=produce, args=(i,), name=name)

        # Make the process daemon, so the main process can die without these finishing
        #p.daemon = True
        p.start()

    return batch_count, jobs
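The _start_producers() method above queues one job per util.chunks() batch and starts the producer workers. The train() snippet earlier on this page shows how the surrounding ParallelBatchIterator is typically driven; the following usage sketch is based on that snippet (make_epoch_helper, P.N_EPOCHS and the keyword arguments are copied from it, not independently verified).

epoch_iterator = ParallelBatchIterator(make_epoch_helper, range(P.N_EPOCHS),
                                       ordered=False, batch_size=1,
                                       multiprocess=False, n_producers=1)
for epoch_values in epoch_iterator:
    # Each yielded value is whatever make_epoch_helper produced for one epoch index,
    # in the train() example a (train_epoch_data, val_epoch_data) pair.
    train_epoch_data, val_epoch_data = epoch_values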
def lookup():
    """ returns (done, remaining)"""
    songs = db.data.get_pending_songs()
    songcount = db.data.get_count_pending_songs()
    if not songs:
        return (0, 0)

    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # Start the lookup operations and mark each future with its song chunk
        i = 0
        future_to_song = {}
        for songchunk in util.chunks(songs, 10):
            future_to_song[executor.submit(query, songchunk, i)] = songchunk
            i = 1 - i

        for future in concurrent.futures.as_completed(future_to_song):
            songchunk = future_to_song[future]
            # For each set of songs, get them from the response;
            # for songs not in the response, add an empty response
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (songchunk, exc))
            else:
                gotsongs = set()
                waitings = set(songchunk)
                results = data["response"].get("songs", [])
                for s in results:
                    songid = s["id"]
                    gotsongs.add(songid)
                    response = {"response": {"songs": [s], "status": data["response"]["status"]}}
                    db.data.add_response_if_not_exists(echonest.SONG_PROFILE, songid, response)
                nosongs = waitings - gotsongs
                for s in list(nosongs):
                    db.data.add_response_if_not_exists(echonest.SONG_PROFILE, s, {})

    return (len(songs), songcount - len(songs))
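Since lookup() returns a (done, remaining) tuple, a caller could repeat it until nothing is pending. This is only a minimal driver sketch, assuming db.data.get_pending_songs() returns a limited batch per call and that the pending count shrinks as responses are stored.

while True:
    done, remaining = lookup()
    print("looked up %s songs, %s still pending" % (done, remaining))
    if remaining == 0:
        break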