producer_test.py 文件源码

python
阅读 30 收藏 0 点赞 0 评论 0

项目:data_pipeline 作者: Yelp 项目源码 文件源码
def test_publish_pii_payload_data_message(
        self, pii_schema, example_payload_data, producer_instance
    ):
        with reconfigure(
            encryption_type='AES_MODE_CBC-1',
            skip_messages_with_pii=False
        ), producer_instance as producer:
            pii_message = CreateMessage(
                schema_id=pii_schema.schema_id,
                payload_data=example_payload_data
            )
            self._publish_and_assert_pii_message(pii_message, producer)
        assert len(multiprocessing.active_children()) == 0
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号