def test_process_blob(self):
    """Run ``process_blob`` against the test server and verify both sides.

    The client is started with the prism blob client factory and pointed at
    ``localhost:5566``. After it finishes, the test checks the flags recorded
    on ``self``, checks that the blob file is gone from the client storage,
    and compares the results the server put on ``self.client_queue``.

    NOTE(review): ``self.blob_exists`` and ``self.blob_has_been_forwarded``
    are presumably populated by ``self._on_finish_blob`` -- confirm against
    that helper, which is not visible here.
    """
    client_factory_class = build_prism_blob_client_factory

    # Start the client. The reactor import is kept local to the test, as in
    # the original code.
    from twisted.internet import reactor
    reactor.addSystemEventTrigger('before', 'shutdown', self._on_finish_blob)
    try:
        process_blob(
            BLOB_HASH,
            self.client_db_dir,
            client_factory_class,
            'fake',
            host_infos=('localhost', 5566, 0),
            setup_d=self._setup_client_blobs,
        )
    except SystemExit:
        # SystemExit from process_blob is deliberately tolerated so the
        # assertions below still run.
        pass
    finally:
        # Tell the server process to stop even if process_blob failed with
        # an unexpected exception; otherwise the server is never told to
        # shut down.
        self.server_queue.put('stop')

    # Check the client-side variables.
    self.assertEqual(1, self.blob_exists)
    self.assertEqual(1, self.blob_has_been_forwarded)

    # The file should be removed from the client, because it was sent to
    # the server.
    self.assertFalse(
        os.path.isfile(get_blob_path(BLOB_HASH, self.client_storage)))

    # Check the variables we expect to have received from the server.
    server_results = self.client_queue.get()
    self.assertEqual(BLOB_CONTENT, server_results[0]['blob_content'])
    self.assertEqual(1, server_results[0]['blob_exists'])
评论列表
文章目录