def test_serialize_and_deserialize_2():
transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
# test serialize
updated = SampleAggregate.Updated(entity_id='b089a0a6-e0b3-480d-9382-c47f99103b3d', entity_version=10, attr1='val1',
attr2='val2', metadata={'command_id': 123})
updated_stored_event = transcoder.serialize(updated)
assert updated_stored_event.event_type == 'overridden_event_type'
assert updated_stored_event.event_version == 1
assert updated_stored_event.event_data == '{"attr1":"val1","attr2":"val2"}'
assert updated_stored_event.aggregate_id == 'b089a0a6-e0b3-480d-9382-c47f99103b3d'
assert updated_stored_event.aggregate_version == 10
assert updated_stored_event.aggregate_type == 'SampleAggregate'
assert updated_stored_event.module_name == 'djangoevents.tests.test_unifiedtranscoder'
assert updated_stored_event.class_name == 'SampleAggregate.Updated'
assert updated_stored_event.metadata == '{"command_id":123}'
# test deserialize
updated_copy = transcoder.deserialize(updated_stored_event)
assert 'metadata' not in updated_copy.__dict__
updated.__dict__.pop('metadata') # metadata is not included in deserialization
assert updated.__dict__ == updated_copy.__dict__
test_unifiedtranscoder.py 文件源码
python
阅读 17
收藏 0
点赞 0
评论 0
评论列表
文章目录