def test_basic_usage(self):
    """Start a ProcessTask and check that it reports a thread-status dict.

    A ``ProcessTask`` is created with one end of a status-monitor pipe and
    one end of a result pipe, then started. The test passes when the first
    message received on the other end of the status-monitor pipe is a
    ``dict``.
    """
    # Upper bound (seconds) on the wait for the first status message, so a
    # child that never reports makes the test fail instead of hanging.
    # NOTE(review): 10 s is a guess at a generous limit -- tune as needed.
    status_timeout = 10

    pipp, pipc = Pipe()
    pips, pipr = Pipe()
    try:
        self.print_bar()
        print('Test Task Interface')
        ret_process = ProcessTask(id='test-1', target=testfun, args=(5,),
                                  status_monitor_pipe=pipc,
                                  result_pipe=pips,
                                  result_hook_function=result_callback)
        ret_process.start()

        print('Test get threads status')
        # poll() returns as soon as a message is available, so it replaces
        # the former fixed one-second sleep followed by a blocking recv().
        self.assertTrue(
            pipp.poll(status_timeout),
            'no thread status received within %s seconds' % status_timeout)
        threads_status = pipp.recv()
        self.assertIsInstance(threads_status, dict)

        # NOTE(review): nothing is read from ``pipr`` here, so the result
        # pipe and the result hook are not verified by this test.
        # NOTE(review): ``ret_process`` is left running when the test ends;
        # confirm whether ProcessTask offers a join/terminate to call here.
        self.print_end_bar()
    finally:
        # Release the pipe ends held by the test itself.
        pipp.close()
        pipr.close()
评论列表
文章目录