def save_emblems_field(
        self, emblem_with_field_list, field_name, index=True):
    """Persist one field for a batch of emblems using a process pool.

    The input list is split into chunks and each chunk is handed to
    ``self._save_emblems_field(chunk, field_name)`` in a worker process.

    Args:
        emblem_with_field_list: Sequence of items whose element ``[1]`` is
            the field value (presumably ``(emblem, field)`` pairs --
            confirm against callers).
        field_name: Name of the field being saved; also used as the
            index key.
        index: When true, create indexes on ``name`` (unique), on
            ``field_name`` and, if the first item's field value is a
            dict, on every ``field_name.<key>`` sub-field.

    Returns:
        None.
    """
    total_len = len(emblem_with_field_list)
    self.logger.info('Saving field [%s], total=%d', field_name, total_len)

    if index:
        self.data_source.create_index(
            self.COLLECTION_EMBLEM, 'name', unique=True)
        self.data_source.create_index(
            self.COLLECTION_EMBLEM, field_name)
        # The first item is used as a sample to discover sub-field keys;
        # with an empty batch there is nothing to sample.
        if total_len:
            field = emblem_with_field_list[0][1]
            if isinstance(field, dict):
                for key in field:
                    self.data_source.create_index(
                        self.COLLECTION_EMBLEM, field_name + '.' + key)

    if not total_len:
        # Nothing to save: do not start worker processes for no work.
        return

    workers = multiprocessing.cpu_count() or 1
    # total_len // workers is 0 when there are fewer items than workers;
    # clamp to 1 so the chunker never receives a zero chunk size.
    chunk_size = max(1, total_len // workers)
    emblem_freq_chunks = MapReduceDriver.chunks(
        emblem_with_field_list, chunk_size)

    with multiprocessing.Pool(processes=workers) as pool:
        pool.starmap(
            self._save_emblems_field,
            zip(emblem_freq_chunks, repeat(field_name)))
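# Comment list  (web-page navigation label, not part of the code)
# Article table of contents  (web-page navigation label, not part of the code)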