def _batches(self):
    """
    Partition the data into consecutive data sets of the specified batch size.
    :return: batched data
    :rtype: DataFrame iterator
    """
    t1 = partition_all(self.batch_size, self.data[text_1])
    t2 = partition_all(self.batch_size, self.data[text_2])
    if self._labeled:
        l = partition_all(self.batch_size, self.data[label].cat.codes)
        batches = zip(t1, t2, l)
    else:
        batches = zip(t1, t2)
    for batch in batches:
        if self._labeled:
            columns = [text_1, text_2, label]
        else:
            columns = [text_1, text_2]
        yield DataFrame(dict(zip(columns, batch)), columns=columns)
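A stand-alone sketch of the same batching pattern, independent of the surrounding class; the column names and the tiny frame are hypothetical, and only the unlabeled branch is shown:

from pandas import DataFrame
from toolz import partition_all

# Hypothetical text-pair frame; text_1/text_2 mirror the module-level column
# constants referenced above.
text_1, text_2 = "text_1", "text_2"
data = DataFrame({text_1: ["a", "b", "c", "d", "e"], text_2: ["v", "w", "x", "y", "z"]})

batch_size = 2
t1 = partition_all(batch_size, data[text_1])
t2 = partition_all(batch_size, data[text_2])
for batch in zip(t1, t2):
    columns = [text_1, text_2]
    # Each yielded frame holds at most batch_size rows; the last one is shorter.
    print(DataFrame(dict(zip(columns, batch)), columns=columns))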
def scrape_blockchain(mongo):
    s = Steem()
    # see how far behind we are
    missing = list(range(last_block_num(mongo), s.last_irreversible_block_num))
    # if we are far behind blockchain head
    # split work in chunks of 100
    if len(missing) > 100:
        for batch in partition_all(100, missing):
            results = s.get_blocks(batch)
            insert_blocks(mongo, results)
    # otherwise continue as normal
    blockchain = Blockchain(mode="irreversible")
    hist = blockchain.stream_from(start_block=last_block_num(mongo), full_blocks=True)
    insert_blocks(mongo, hist)
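A stand-alone illustration of the chunking step above, using a made-up backlog of block numbers rather than a live Steem node:

from toolz import partition_all

# A hypothetical backlog of 250 missing blocks is split into groups of 100;
# the final group is simply shorter, no padding is added.
missing = list(range(1000, 1250))
for batch in partition_all(100, missing):
    print(batch[0], batch[-1], len(batch))   # 1000 1099 100 / 1100 1199 100 / 1200 1249 50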
def main(in_dir, out_dir, n_process=int(multiprocessing.cpu_count() * .75), n_thread=4, batch_size=10000):
    # Create the output directory, if it doesn't exist
    if not path.exists(out_dir):
        makedirs(out_dir)
    # Get total number of input files for tracking progress
    total_files = len(list(iter_dir(in_dir)))
    # For each input file
    for i, file in enumerate(iter_dir(in_dir)):
        # Print progress
        print('Tagging file %s of %s' % (i + 1, total_files))
        # If multiprocessing
        if n_process >= 2:
            # Split up the text in the input file
            texts = partition_all(100000, iter_lines(file))
            # Parallelize the job
            parallelize(save_parses, enumerate(texts),
                        n_process, [out_dir, n_thread, batch_size],
                        backend='multiprocessing')
        # If not multiprocessing
        else:
            save_parses(0, iter_lines(file), out_dir, n_thread, batch_size)
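A small sketch of the splitting step in main(), using a generated stream of lines instead of a real input file:

from toolz import partition_all

# A hypothetical stream of 250,000 lines is chunked into tuples of up to
# 100,000, and enumerate() gives each chunk an index that a worker could use
# to name its output shard.
lines = ("line %d" % i for i in range(250000))
for i, chunk in enumerate(partition_all(100000, lines)):
    print(i, len(chunk))   # 0 100000 / 1 100000 / 2 50000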
def sync_from_file(file_path, skip_lines, chunk_size=250, is_initial_sync=False):
    with open(file_path) as f:
        # each line in the file represents one block
        # we can skip the blocks we already have
        remaining = drop(skip_lines, f)
        for batch in partition_all(chunk_size, remaining):
            process_blocks(map(json.loads, batch), is_initial_sync)
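A stand-alone sketch of the skip-then-chunk pattern used above, with an in-memory "file" of ten one-line JSON blocks instead of a real block dump:

from io import StringIO
from toolz import drop, partition_all

# Hypothetical dump: one JSON object per line.
f = StringIO("\n".join('{"block_num": %d}' % n for n in range(10)) + "\n")
remaining = drop(4, f)                       # pretend the first 4 blocks are already stored
for batch in partition_all(3, remaining):    # remaining 6 lines arrive in chunks of 3
    print([line.strip() for line in batch])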
def compute_date_range_chunks(sessions, start_date, end_date, chunksize):
    """Compute the start and end dates to run a pipeline for.

    Parameters
    ----------
    sessions : DatetimeIndex
        The available dates.
    start_date : pd.Timestamp
        The first date in the pipeline.
    end_date : pd.Timestamp
        The last date in the pipeline.
    chunksize : int or None
        The size of the chunks to run. Setting this to None returns one chunk.

    Returns
    -------
    ranges : iterable[(np.datetime64, np.datetime64)]
        A sequence of start and end dates to run the pipeline for.
    """
    if start_date not in sessions:
        raise KeyError("Start date %s is not found in calendar." %
                       (start_date.strftime("%Y-%m-%d"),))
    if end_date not in sessions:
        raise KeyError("End date %s is not found in calendar." %
                       (end_date.strftime("%Y-%m-%d"),))
    if end_date < start_date:
        raise ValueError("End date %s cannot precede start date %s." %
                         (end_date.strftime("%Y-%m-%d"),
                          start_date.strftime("%Y-%m-%d")))
    if chunksize is None:
        return [(start_date, end_date)]
    start_ix, end_ix = sessions.slice_locs(start_date, end_date)
    return (
        (r[0], r[-1]) for r in partition_all(
            chunksize, sessions[start_ix:end_ix]
        )
    )
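A hedged usage example, assuming the function above and its toolz import are in scope; the calendar here is just the business days of Q1 2017, not a real trading calendar:

import pandas as pd

sessions = pd.date_range("2017-01-02", "2017-03-31", freq="B")
ranges = compute_date_range_chunks(
    sessions,
    start_date=pd.Timestamp("2017-01-02"),
    end_date=pd.Timestamp("2017-03-31"),
    chunksize=20,
)
# Each pair spans at most 20 sessions; the last pair covers the leftover days.
for chunk_start, chunk_end in ranges:
    print(chunk_start.date(), "->", chunk_end.date())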
def optimize(model, sampler, train, valid):
    """
    Optimize the model. TODO: implement early-stopping
    :param model: model to optimize
    :param sampler: mini-batch sampler
    :param train: train user-item matrix
    :param valid: validation user-item matrix
    :return: None
    """
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    if model.feature_projection is not None:
        # initialize item embeddings with the feature projection
        sess.run(tf.assign(model.item_embeddings, model.feature_projection))
    while True:
        # create evaluator on validation set
        validation_recall = RecallEvaluator(train, valid)
        # compute recall on the validation set
        valid_recalls = []
        # sample some users to calculate validation recall
        valid_users = list(set(valid.nonzero()[0]))[:300]
        for user_chunk in toolz.partition_all(300, valid_users):
            scores = sess.run(model.item_scores, {model.score_user_ids: user_chunk})
            valid_recalls.extend([validation_recall.eval(user, user_scores)
                                  for user, user_scores in zip(user_chunk, scores)])
        print("\nRecall on (sampled) validation set: {}".format(numpy.mean(valid_recalls)))
        # TODO: early stopping based on validation recall
        # train model
        losses = []
        # run n mini-batches
        for _ in tqdm(range(EVALUATION_EVERY_N_BATCHES), desc="Optimizing..."):
            user_pos, neg = sampler.next_batch()
            _, loss = sess.run((model.optimize, model.loss),
                               {model.user_positive_items_pairs: user_pos,
                                model.negative_samples: neg})
            losses.append(loss)
        print("\nTraining loss {}".format(numpy.mean(losses)))
def optimize(model, sampler, train, valid):
    """
    Optimize the model. TODO: implement early-stopping
    :param model: model to optimize
    :param sampler: mini-batch sampler
    :param train: train user-item matrix
    :param valid: validation user-item matrix
    :return: None
    """
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    if model.feature_projection is not None:
        # initialize item embeddings with the feature projection
        sess.run(tf.assign(model.item_embeddings, model.feature_projection))
    # sample some users to calculate validation recall
    valid_users = numpy.random.choice(list(set(valid.nonzero()[0])), size=1000, replace=False)
    while True:
        # create evaluator on validation set
        validation_recall = RecallEvaluator(model, train, valid)
        # compute recall on the validation set
        valid_recalls = []
        # compute recall in chunks to utilize the speedup provided by TensorFlow
        for user_chunk in toolz.partition_all(100, valid_users):
            valid_recalls.extend([validation_recall.eval(sess, user_chunk)])
        print("\nRecall on (sampled) validation set: {}".format(numpy.mean(valid_recalls)))
        # TODO: early stopping based on validation recall
        # train model
        losses = []
        # run n mini-batches
        for _ in tqdm(range(EVALUATION_EVERY_N_BATCHES), desc="Optimizing..."):
            user_pos, neg = sampler.next_batch()
            _, loss = sess.run((model.optimize, model.loss),
                               {model.user_positive_items_pairs: user_pos,
                                model.negative_samples: neg})
            losses.append(loss)
        print("\nTraining loss {}".format(numpy.mean(losses)))