main.py 文件源码

python
阅读 37 收藏 0 点赞 0 评论 0

项目:CerebralCortex-2.0-legacy 作者: MD2Korg 项目源码 文件源码
def migrate(folder_path: str, data_block_size):
    """
    Migrate data from old CerebralCortex structure to new CerebralCortex structure.

    Recursively walks ``folder_path`` for ``*.json`` schema files. For each one,
    the file's base name is split on ``+`` to derive the owner id, a
    deterministic stream id, the stream name and the algorithm name; the
    sibling ``.csv.bz2`` file (same path, different extension) is then read in
    blocks and each block is handed to ``persist_data``.

    File names are expected to have at least three ``+``-separated fields:
    ``<owner>+<stream>+<algorithm>[+<name part>...].json``.

    :param folder_path: root directory that is searched recursively for
        ``*.json`` files
    :param data_block_size: forwarded unchanged to ``bz2file_to_datapoints``
        (presumably the number of records per block -- confirm against that
        helper)
    :raises ValueError: if ``folder_path`` is empty
    """
    # Validate the input first so a bad call fails immediately, before the
    # CerebralCortex instance (created with load_spark=True) is constructed.
    if not folder_path:
        raise ValueError("Path to the data directory cannot be empty.")

    configuration_file = os.path.join(os.path.dirname(__file__), '../../cerebralcortex.yml')
    CC = CerebralCortex(configuration_file, master="local[*]", name="Data Migrator API", time_zone="US/Central", load_spark=True)

    for filename in glob.iglob(folder_path + '/**/*.json', recursive=True):
        print(str(datetime.datetime.now()) + " -- Started processing file " + filename)

        # os.path.basename handles whichever separator the platform's glob
        # returns, unlike splitting on a hard-coded "/".
        fields = os.path.basename(filename).split("+")
        owner_id = fields[0]
        # uuid3 is name-based, so the same owner/stream pair always maps to
        # the same stream id.
        stream_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, str(fields[0] + " " + fields[1])))

        # Everything after the algorithm field belongs to the stream name.
        # NOTE(review): replace() strips every ".json" occurrence, not only
        # the trailing extension; kept as-is so persisted names do not change.
        name = " ".join(fields[3:]).strip().replace(".json", "")
        name = fields[1] + " " + name

        pm_algo_name = fields[2]

        # Swap only the extension; a blanket str.replace would also rewrite
        # any ".json" that appears in a directory name along the path.
        data_filename = os.path.splitext(filename)[0] + ".csv.bz2"
        old_schema = read_file(filename)
        execution_context = get_execution_context(pm_algo_name, old_schema)
        data_descriptor = get_data_descriptor(old_schema)
        annotations = get_annotations()
        print(str(datetime.datetime.now()) + " -- Schema building is complete ")
        print(str(datetime.datetime.now()) + " -- Started unzipping file and adding records in Cassandra ")
        for data_block in bz2file_to_datapoints(data_filename, data_block_size):
            persist_data(execution_context, data_descriptor, annotations, stream_id, name, owner_id, data_block, CC)
        print(str(datetime.datetime.now()) + " -- Completed processing file " + filename)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号