def add_many(self, name, timestamp_pairs, chunks_size=2000, *args, **kwargs):
    """
    Batch-insert timestamped records for a series.

    :param name: key of the redis sorted set that holds the series
    :param timestamp_pairs: iterable of ("timestamp", data) pairs
    :param chunks_size: number of records written per pipeline batch
    :param args:
    :param kwargs:
    :return:
    """
    # needs module-level imports: itertools, operator.itemgetter, and the project's helper module
    incr_key = self.incr_format.format(key=name)  # auto-increment id counter
    hash_key = self.hash_format.format(key=name)  # hash that maps id -> serialized data
    # drop records whose timestamps already exist in the series
    # TODO: there may be a more efficient way to do this filtering
    sorted_timestamps = sorted(timestamp_pairs, key=itemgetter(0))
    max_timestamp = sorted_timestamps[-1][0]  # max
    min_timestamp = sorted_timestamps[0][0]  # min
    filter_data = self.get_slice(name, start=min_timestamp, end=max_timestamp)
    if filter_data:
        timestamp_set = set(map(lambda x: x[0], filter_data))
        filter_results = itertools.filterfalse(lambda x: x[0] in timestamp_set, sorted_timestamps)
    else:
        filter_results = sorted_timestamps
    chunks_data = helper.chunks(filter_results, chunks_size)
    with self._pipe_acquire() as pipe:
        for chunks in chunks_data:
            # reserve one auto-increment id per record in this chunk;
            # GET returns None when the counter key does not exist yet, so ids start at 1
            start_id = int(self.client.get(incr_key) or 0) + 1
            end_id = int(self.client.incrby(incr_key, amount=len(chunks)))  # advance the counter by the chunk length
            ids_range = range(start_id, end_id + 1)  # exactly one id per record
            dumps_results = map(lambda x: (x[0], self.serializer.dumps(x[1])), chunks)
            mix_data = itertools.zip_longest(dumps_results, ids_range)  # [(("timestamp", data), id), ...]
            mix_data = list(mix_data)  # materialize the iterator so it can be consumed twice
            timestamp_ids = map(lambda seq: (seq[0][0], seq[1]), mix_data)  # [("timestamp", id), ...]
            ids_pairs = map(lambda seq: (seq[1], seq[0][1]), mix_data)  # [(id, data), ...]
            # zadd takes flat score/member pairs: timestamp1, id1, timestamp2, id2, ...
            timestamp_ids = itertools.chain.from_iterable(timestamp_ids)
            ids_values = {k: v for k, v in ids_pairs}
            pipe.multi()
            pipe.zadd(name, *timestamp_ids)
            pipe.hmset(hash_key, ids_values)
            pipe.execute()
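
# Usage sketch: a minimal illustration of the call shape. `RedisHashTimeSeries`
# and its constructor are assumptions made for this example and are not defined
# in the snippet above; substitute whatever class actually provides
# add_many() / get_slice().
import redis

client = redis.StrictRedis(host="localhost", port=6379)
series = RedisHashTimeSeries(client)  # hypothetical wrapper class

# (timestamp, data) pairs: each timestamp becomes a sorted-set score, and each
# data value is serialized and stored in the companion hash under its
# auto-increment id.
pairs = [(1546300800 + i, {"value": i}) for i in range(10000)]
series.add_many("demo:series", pairs, chunks_size=2000)

# read a range back by timestamp
print(series.get_slice("demo:series", start=1546300800, end=1546300900))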