def migrate(folder_path: str, data_block_size) -> None:
    """
    Migrate data from the old CerebralCortex structure to the new CerebralCortex structure.

    Every ``*.json`` schema file found recursively under ``folder_path`` is paired
    with the ``.csv.bz2`` data file sharing its path and base name. The schema file's
    base name is expected to look like
    ``<owner_id>+<stream prefix>+<algorithm name>[+<name part>...].json``; owner,
    stream id and stream name are derived from it, the data file is read in blocks
    via ``bz2file_to_datapoints`` and each block is stored with ``persist_data``.
    Files whose base name has fewer than three ``+``-separated fields are skipped.

    :param folder_path: root directory containing the old-format ``.json`` schema files
    :param data_block_size: block size forwarded unchanged to ``bz2file_to_datapoints``
    :raises ValueError: if ``folder_path`` is empty or ``None``
    """
    # Validate before building CerebralCortex: it is created with load_spark=True,
    # which presumably starts Spark -- no reason to pay for that on invalid input.
    if not folder_path:
        raise ValueError("Path to the data directory cannot be empty.")

    configuration_file = os.path.join(os.path.dirname(__file__), '../../cerebralcortex.yml')
    cc = CerebralCortex(configuration_file, master="local[*]", name="Data Migrator API",
                        time_zone="US/Central", load_spark=True)

    for filename in glob.iglob(os.path.join(folder_path, '**', '*.json'), recursive=True):
        _log_progress("Started processing file " + filename)

        parsed = _parse_schema_filename(filename)
        if parsed is None:
            _log_progress("Skipping file with unexpected name format " + filename)
            continue
        owner_id, stream_id, name, pm_algo_name = parsed

        # Swap only the extension; str.replace(".json", ...) would also rewrite
        # any ".json" that happens to appear in a directory name.
        data_filename = os.path.splitext(filename)[0] + ".csv.bz2"

        old_schema = read_file(filename)
        execution_context = get_execution_context(pm_algo_name, old_schema)
        data_descriptor = get_data_descriptor(old_schema)
        annotations = get_annotations()
        _log_progress("Schema building is complete ")

        _log_progress("Started unzipping file and adding records in Cassandra ")
        for data_block in bz2file_to_datapoints(data_filename, data_block_size):
            persist_data(execution_context, data_descriptor, annotations, stream_id,
                         name, owner_id, data_block, cc)
        _log_progress("Completed processing file " + filename)


def _parse_schema_filename(filename: str):
    """
    Derive stream identity fields from an old-format schema file path.

    :param filename: path to a ``.json`` schema file
    :return: ``(owner_id, stream_id, name, pm_algo_name)``, or ``None`` when the
        base name (extension removed) has fewer than three ``+``-separated fields
    """
    # Drop the extension first so ".json" never leaks into the last field.
    stem = os.path.splitext(os.path.basename(filename))[0]
    parts = stem.split("+")
    if len(parts) < 3:
        return None

    owner_id, prefix, pm_algo_name = parts[0], parts[1], parts[2]
    # uuid3 is name-based (deterministic), so re-running the migration maps the
    # same owner/prefix pair to the same stream id.
    stream_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, owner_id + " " + prefix))

    tail = " ".join(parts[3:]).strip()
    name = prefix + " " + tail if tail else prefix
    return owner_id, stream_id, name, pm_algo_name


def _log_progress(message: str) -> None:
    """Print ``message`` prefixed with the current local timestamp."""
    print(str(datetime.datetime.now()) + " -- " + message)
评论列表
文章目录