def __init__(self, log_client, shard_id, consumer_name, processor, cursor_position, cursor_start_time,
max_workers=2):
self.log_client = log_client
self.shard_id = shard_id
self.consumer_name = consumer_name
self.cursor_position = cursor_position
self.cursor_start_time = cursor_start_time
self.processor = processor
self.checkpoint_tracker = ConsumerCheckpointTracker(self.log_client, self.consumer_name,
self.shard_id)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.consumer_status = ConsumerStatus.INITIALIZING
self.current_task_exist = False
self.task_future = None
self.fetch_data_future = None
self.next_fetch_cursor = ''
self.shutdown = False
self.last_fetch_log_group = None
self.last_log_error_time = 0
self.last_fetch_time = 0
self.last_fetch_count = 0
self.logger = logging.getLogger(__name__)
评论列表
文章目录