async def run(self):
    """Consume the sink's first input stream and mirror its progress on tqdm bars.

    One progress bar is created per tracked axis: at most ``self.num`` bars,
    capped at the number of axes in the stream descriptor. The coroutine then
    repeatedly awaits data from ``self.stream.queue``, adds the number of
    received points to ``self.w_id`` and advances every bar to the position
    implied by ``self.stream.points_taken``. It returns once the stream
    reports ``done()`` and ``self.w_id`` equals ``self.stream.num_points()``.

    This has to be a coroutine (``async def``) because it awaits the queue.

    Side effects: assigns ``self.stream``, ``self.num``, ``self.bars`` and
    ``self.w_id``.
    """
    self.stream = self.sink.input_streams[0]
    descriptor = self.stream.descriptor
    num_axes = len(descriptor.axes)
    totals = [descriptor.num_points_through_axis(axis) for axis in range(num_axes)]
    # max(1, ...) guarantees the divisions and modulo below never see a zero.
    chunk_sizes = [
        max(1, descriptor.num_points_through_axis(axis + 1))
        for axis in range(num_axes)
    ]
    self.num = min(self.num, num_axes)

    def make_bar(index):
        # Build a fresh bar for one axis, using the notebook widget if requested.
        total = totals[index] / chunk_sizes[index]
        if self.notebook:
            return tqdm_notebook(total=total)
        return tqdm(total=total)

    self.bars = [make_bar(i) for i in range(self.num)]
    self.w_id = 0

    while not (self.stream.done() and self.w_id == self.stream.num_points()):
        # Wait for at least one message, then drain whatever else is already
        # queued. Only the number of points matters, so the sizes are summed
        # instead of concatenating the payloads.
        received = np.array(await self.stream.queue.get()).size
        while self.stream.queue.qsize() > 0:
            received += np.array(self.stream.queue.get_nowait()).size
        self.w_id += received

        num_data = self.stream.points_taken
        for i in range(self.num):
            if num_data == 0:
                # No points counted at this level: replace the bar with a new one.
                if self.notebook:
                    # NOTE(review): `sp(close=True)` is kept from the original;
                    # confirm it matches the installed tqdm notebook API.
                    self.bars[i].sp(close=True)
                else:
                    self.bars[i].close()
                self.bars[i] = make_bar(i)
            pos = int(10 * num_data / chunk_sizes[i]) / 10.0  # One decimal is good enough
            if pos > self.bars[i].n:
                self.bars[i].update(pos - self.bars[i].n)
            # Carry only the remainder within this chunk down to the next axis.
            num_data = num_data % chunk_sizes[i]
# Comment list
# Table of contents