def add(self, message):
    """Fold a single parsed message into the running tallies.

    Dispatches on the message type:

    * ``singer.RecordMessage`` -- the record is validated against the
      latest schema stored for its stream, then the stream's
      ``num_records`` counter is incremented.
    * ``singer.SchemaMessage`` -- the stream's ``num_schemas`` counter is
      incremented and its ``latest_schema`` is replaced by the message's
      schema.
    * ``singer.StateMessage`` -- ``self.latest_state`` is replaced by the
      message's value and ``self.num_states`` is incremented.

    Messages of any other type are ignored.

    Args:
        message: The message to account for; compared against the three
            ``singer`` message classes listed above.

    Raises:
        SystemExit: With status 1 (after printing a diagnostic) when a
            record arrives for a stream that has no schema stored yet.
        Exception: Whatever ``validator.validate`` raises for a record
            that does not satisfy the schema is propagated unchanged
            (presumably ``jsonschema.ValidationError`` -- confirm).
    """
    if isinstance(message, singer.RecordMessage):
        stream = self.ensure_stream(message.stream)

        # NOTE(review): this is a truthiness test, so an empty schema
        # ({}) is treated the same as "no schema seen" -- confirm that
        # this is intended before tightening it to an `is None` check.
        if not stream.latest_schema:
            print('I saw a record for stream {} before the schema'.format(
                message.stream))
            # Raise SystemExit directly instead of calling the bare
            # `exit()` helper: that helper is installed by the `site`
            # module for interactive sessions and is not guaranteed to
            # exist (e.g. under `python -S`).
            raise SystemExit(1)

        validator_fn = extend_with_default(Draft4Validator)
        validator = validator_fn(
            stream.latest_schema, format_checker=FormatChecker())
        # Validate a deep copy so the caller's record object is never
        # touched (presumably the default-extending validator fills in
        # defaults in place -- confirm against extend_with_default).
        validator.validate(copy.deepcopy(message.record))
        stream.num_records += 1

    elif isinstance(message, singer.SchemaMessage):
        stream = self.ensure_stream(message.stream)
        stream.num_schemas += 1
        stream.latest_schema = message.schema

    elif isinstance(message, singer.StateMessage):
        self.latest_state = message.value
        self.num_states += 1
评论列表
文章目录