def capture_new_data_pipeline_messages(topic):
"""contextmanager that moves to the tail of the given topic, and waits to
receive new messages, returning a function that can be called zero or more
times which will retrieve decoded data pipeline messages from the topic.
Returns:
Callable[[int], List[Message]]: Function that takes a single
optional argument, count, and returns up to count decoded data pipeline
messages. This function does not block, and will return however many
messages are available immediately. Default count is 100.
"""
with capture_new_messages(topic) as get_kafka_messages:
def get_data_pipeline_messages(count=100):
kafka_messages = get_kafka_messages(count)
return [
create_from_offset_and_message(kafka_message)
for kafka_message in kafka_messages
]
yield get_data_pipeline_messages
评论列表
文章目录