check_tap.py 文件源码

python
阅读 28 收藏 0 点赞 0 评论 0

项目:singer-tools 作者: singer-io 项目源码 文件源码
def add(self, message):
        if isinstance(message, singer.RecordMessage):
            stream = self.ensure_stream(message.stream)
            if stream.latest_schema:
                validator_fn = extend_with_default(Draft4Validator)
                validator = validator_fn(
                    stream.latest_schema, format_checker=FormatChecker())
                validator.validate(copy.deepcopy(message.record))
            else:
                print('I saw a record for stream {} before the schema'.format(
                    message.stream))
                exit(1)
            stream.num_records += 1

        elif isinstance(message, singer.SchemaMessage):
            stream = self.ensure_stream(message.stream)
            stream.num_schemas += 1
            stream.latest_schema = message.schema

        elif isinstance(message, singer.StateMessage):
            self.latest_state = message.value
            self.num_states += 1
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号