redis.py 文件源码

python
阅读 38 收藏 0 点赞 0 评论 0

项目:quantbube 作者: nooperpudd 项目源码 文件源码
def add_many(self, name, timestamp_pairs, chunks_size=2000, *args, **kwargs):
        """
        Store many ``(timestamp, data)`` pairs in the series ``name``.

        Pairs whose timestamp is already returned by ``self.get_slice`` for
        the batch's [min, max] timestamp range are skipped. The remaining
        pairs are written in chunks of at most ``chunks_size``. Each chunk:

        * reserves a contiguous block of ids with one ``INCRBY`` on the
          counter key built from ``self.incr_format``;
        * in a single MULTI/EXEC on the acquired pipeline, adds the
          ``timestamp``/``id`` pairs to the sorted set ``name`` and the
          ``id -> self.serializer.dumps(data)`` mapping to the hash key built
          from ``self.hash_format``.

        :param name: series name; also used directly as the sorted-set key.
        :param timestamp_pairs: iterable of ``(timestamp, data)`` tuples.
        :param chunks_size: maximum number of pairs written per transaction.
        :param args: unused; accepted for interface compatibility.
        :param kwargs: unused; accepted for interface compatibility.
        :return: None
        """
        incr_key = self.incr_format.format(key=name)
        hash_key = self.hash_format.format(key=name)

        sorted_pairs = sorted(timestamp_pairs, key=itemgetter(0))
        if not sorted_pairs:
            # Nothing to store; also avoids IndexError on the min/max lookup.
            return

        min_timestamp = sorted_pairs[0][0]
        max_timestamp = sorted_pairs[-1][0]

        # Drop pairs whose timestamp already exists in the series.
        existing = self.get_slice(name, start=min_timestamp, end=max_timestamp)
        if existing:
            existing_timestamps = {item[0] for item in existing}
            pending = [pair for pair in sorted_pairs
                       if pair[0] not in existing_timestamps]
        else:
            pending = sorted_pairs

        with self._pipe_acquire() as pipe:
            # Slicing a list guarantees every chunk is sized and non-empty.
            for offset in range(0, len(pending), chunks_size):
                chunk = pending[offset:offset + chunks_size]
                count = len(chunk)

                # INCRBY atomically reserves `count` ids and returns the last
                # one, so this chunk owns (end_id - count, end_id]. The former
                # GET-then-INCRBY was racy, reused the counter's current value,
                # and on a missing key produced one id too few, so the last
                # pair was silently paired with id None by zip_longest.
                end_id = int(self.client.incrby(incr_key, amount=count))
                ids = range(end_id - count + 1, end_id + 1)

                timestamp_ids = []  # flattened [timestamp, id, timestamp, id, ...]
                ids_values = {}     # {id: serialized data}
                for key_id, (timestamp, data) in zip(ids, chunk):
                    timestamp_ids.extend((timestamp, key_id))
                    ids_values[key_id] = self.serializer.dumps(data)

                pipe.multi()
                pipe.zadd(name, *timestamp_ids)
                pipe.hmset(hash_key, ids_values)
                pipe.execute()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号