import functools
import time
from collections import deque
from multiprocessing import Pool


def populate(filename, max_workers, max_threads, chunk_size, bucket=S3_BLOCKS_BUCKET):
    with open(filename, mode='r') as f:
        chunks = chunkify(f, chunksize=chunk_size)
        start = time.perf_counter()
        func = functools.partial(upload_blocks, bucket, chunk_size, max_threads)
        counter = 0
        samples = 20
        # Rolling windows over the most recent `samples` worker results.
        chunk_rates = deque(maxlen=samples)
        actual_rates = deque(maxlen=samples)
        overheads = deque(maxlen=samples)
        with Pool(processes=max_workers) as pool:
            results = pool.imap_unordered(func, chunks, chunksize=1)
            for count, rate in results:
                # Keep elapsed as a float; truncating to int would divide by
                # zero if the first result arrives within the first second.
                elapsed = time.perf_counter() - start
                counter += count
                chunk_rates.append(rate)
                # Average over the entries actually collected, not the fixed
                # window size, so early averages are not skewed low.
                avg_chunk_rate = int(sum(chunk_rates) / len(chunk_rates))
                # Ideal throughput: every worker sustaining the average chunk rate.
                perfect = avg_chunk_rate * max_workers
                actual = int(counter / elapsed)
                actual_rates.append(actual)
                avg_actual_rate = int(sum(actual_rates) / len(actual_rates))
                # Overhead: percentage by which actual throughput falls short of ideal.
                overhead = int(100 - ((actual / perfect) * 100))
                overheads.append(overhead)
                avg_overhead = int(sum(overheads) / len(overheads))
                report_progress(counter, avg_chunk_rate, avg_actual_rate, avg_overhead)
            end = time.perf_counter()
        complete = time.perf_counter()
        print('master scheduling time:%s complete_time:%s b/s: %s' % (
            end - start, complete - start, 1 / ((complete - start) / chunk_size)
        ))
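For reference, a minimal sketch of how populate might be invoked. The file name and tuning values below are assumptions chosen only to show how the knobs relate; chunkify, upload_blocks, report_progress, and S3_BLOCKS_BUCKET are defined elsewhere and are not part of this listing.

# Hypothetical driver; all concrete values here are illustrative assumptions.
if __name__ == '__main__':
    populate(
        filename='blocks.txt',   # assumed input file, one block reference per line
        max_workers=8,           # worker processes in the Pool
        max_threads=32,          # uploader threads per worker, passed through to upload_blocks
        chunk_size=1000,         # lines per chunk produced by chunkify
    )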