def test_get_most_recent_batch_time(self):
filename = '%s.json' % str(uuid.uuid4())
file_path = os.path.join(self.test_resources_path, filename)
json_offset_specs = JSONOffsetSpecs(
path=self.test_resources_path,
filename=filename
)
app_name = "mon_metrics_kafka"
topic_1 = str(uuid.uuid4())
partition_1 = 0
until_offset_1 = random.randint(0, sys.maxsize)
from_offset_1 = random.randint(0, sys.maxsize)
my_batch_time = self.get_dummy_batch_time()
json_offset_specs.add(topic=topic_1, partition=partition_1,
app_name=app_name,
from_offset=from_offset_1,
until_offset=until_offset_1,
batch_time_info=my_batch_time)
most_recent_batch_time = (
json_offset_specs.get_most_recent_batch_time_from_offsets(
app_name, topic_1))
self.assertEqual(most_recent_batch_time, my_batch_time)
os.remove(file_path)
test_json_kafka_offsets.py 文件源码
python
阅读 24
收藏 0
点赞 0
评论 0
评论列表
文章目录