job.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:python_mozetl 作者: mozilla 项目源码 文件源码
def extract(main_summary, new_profile, start_ds, period, slack, is_sampled):
    """
    Extract data from the main summary table taking into account the
    retention period and submission latency.

    :param main_summary: dataframe pointing to main_summary.v4
    :param new_profile:  dataframe pointing to new_profile_ping_parquet
    :param start_ds:     start date of the retention period
    :param period:       length of the retention period
    :param slack:        slack added to account for submission latency
    :return:             a dataframe containing the raw subset of data
    """
    start = arrow.get(start_ds, DS_NODASH)

    predicates = [
        (F.col("subsession_start_date") >= utils.format_date(start, DS)),
        (F.col("subsession_start_date") < utils.format_date(start, DS, period)),
        (F.col("submission_date_s3") >= utils.format_date(start, DS_NODASH)),
        (F.col("submission_date_s3") < utils.format_date(start, DS_NODASH,
                                                         period + slack)),
    ]

    if is_sampled:
        predicates.append((F.col("sample_id") == "57"))

    extract_ms = (
        main_summary
        .where(reduce(operator.__and__, predicates))
        .select(SOURCE_COLUMNS)
    )

    np = clean_new_profile(new_profile)
    extract_np = (
        np
        .where(reduce(operator.__and__, predicates))
        .select(SOURCE_COLUMNS)
    )

    coalesced_ms = coalesce_new_profile_attribution(extract_ms, np)

    return coalesced_ms.union(extract_np)
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号