def test_skip_publish_pii_message(self, pii_schema, payload, producer_instance):
with reconfigure(
encryption_type='AES_MODE_CBC-1',
skip_messages_with_pii=True
), producer_instance as producer, mock.patch.object(
data_pipeline._kafka_producer,
'logger'
) as mock_logger:
pii_message = CreateMessage(
schema_id=pii_schema.schema_id,
payload=payload
)
messages = self._publish_message(pii_message, producer)
assert len(messages) == 0
assert len(multiprocessing.active_children()) == 0
call_args = (
"Skipping a PII message - uuid hex: {}, schema_id: {}, "
"timestamp: {}, type: {}"
).format(
pii_message.uuid_hex,
pii_message.schema_id,
pii_message.timestamp,
pii_message.message_type.name
)
assert mock_logger.info.call_args_list[0] == mock.call(call_args)
评论列表
文章目录