test_blob_task.py 文件源码

python
阅读 19 收藏 0 点赞 0 评论 0

项目:reflector-cluster 作者: lbryio 项目源码 文件源码
def test_process_blob(self):
        """Run ``process_blob`` for ``BLOB_HASH`` and verify both ends of the transfer.

        The test registers ``self._on_finish_blob`` as a reactor
        'before shutdown' trigger, invokes ``process_blob`` with the prism
        blob client factory, then puts 'stop' on ``self.server_queue``.
        Afterwards it checks the client-side flags, that the blob file is no
        longer in ``self.client_storage``, and that the first result read
        from ``self.client_queue`` carries the expected blob content.
        """
        # start client
        from twisted.internet import reactor

        reactor.addSystemEventTrigger('before', 'shutdown', self._on_finish_blob)
        host_infos = ('localhost', 5566, 0)
        try:
            process_blob(
                BLOB_HASH,
                self.client_db_dir,
                build_prism_blob_client_factory,
                'fake',
                host_infos=host_infos,
                setup_d=self._setup_client_blobs
            )
        except SystemExit:
            # NOTE(review): presumably process_blob exits the process when it
            # finishes; the exit is swallowed so the assertions below can run.
            pass

        # tell server process to stop
        self.server_queue.put('stop')

        # check client variables
        self.assertEqual(1, self.blob_exists)
        self.assertEqual(1, self.blob_has_been_forwarded)

        # file should be removed from client, because it was sent to server
        client_blob_path = get_blob_path(BLOB_HASH, self.client_storage)
        self.assertFalse(os.path.isfile(client_blob_path))

        # check the values we expect to have received from the server
        first_result = self.client_queue.get()[0]
        self.assertEqual(BLOB_CONTENT, first_result['blob_content'])
        self.assertEqual(1, first_result['blob_exists'])
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号