def test_cell(nb_test_case, ws, code_cell, code_cell_count):
    """Send one code cell for execution over ``ws`` and compare the reply.

    The cell's source is printed, wrapped in a message built by
    ``ElyraClient.new_code_message`` and sent on ``ws``. The replies are then
    collected with ``ElyraClient.receive_target_messages`` and turned into an
    ``NBCodeCell`` that is compared against ``code_cell``.

    Args:
        nb_test_case: Passed through unchanged to
            ``NBCodeEntity.compare_results``.
        ws: Connection object; only ``ws.send`` is used directly here.
        code_cell: The cell to run. Must provide
            ``get_source_for_execution``, ``get_target_output_type_queue``
            and ``is_output_empty``.
        code_cell_count: Label printed in front of the cell in the log line.

    Returns:
        ``1`` when waiting for the reply times out; otherwise whatever
        ``NBCodeEntity.compare_results`` returns (presumably a failure
        count/flag -- confirm against that method).

    Raises:
        Exception: Any non-timeout error raised while receiving messages is
            logged and re-raised unchanged.
    """
    # Fetch the source once and reuse it for both the log line and the
    # request. NOTE(review): assumes get_source_for_execution() has no side
    # effects, so a single call is equivalent to the previous two.
    code_source = code_cell.get_source_for_execution()
    print("\n--- {}) {}\n{}".format(code_cell_count, code_cell, code_source))
    ws.send(ElyraClient.new_code_message(code_source))

    target_queue = code_cell.get_target_output_type_queue()
    if code_cell.is_output_empty():
        # If output empty, waiting for "execute_input" type message
        target_queue = deque(["execute_input"])

    try:
        json_output_message = ElyraClient.receive_target_messages(ws, target_queue)
    except websocket.WebSocketTimeoutException:
        # A timeout is reported as a single failure rather than aborting.
        print("Fail ===================================\nRequest timed out.")
        return 1
    except Exception as e:
        print("FATAL ===================================\n{}.".format(e))
        # Bare raise re-raises the active exception as-is.
        raise

    result_cell = NBCodeCell(message_json_list=json_output_message)
    return NBCodeEntity.compare_results(nb_test_case, code_cell, result_cell)
# NOTE: web-page residue, not code: "Comment list" / "Article table of contents".