def extract(main_summary, new_profile, start_ds, period, slack, is_sampled):
    """
    Pull the raw rows for one retention period from main_summary and from
    the new-profile pings, allowing for late submissions.

    A row is kept when its ``subsession_start_date`` falls in
    ``[start, start + period)`` and its ``submission_date_s3`` falls in
    ``[start, start + period + slack)``. The offsets are computed by
    ``utils.format_date`` (presumably in days -- confirm against utils).

    :param main_summary: dataframe pointing to main_summary.v4
    :param new_profile: dataframe pointing to new_profile_ping_parquet
    :param start_ds: start date of the retention period, parsed with the
        DS_NODASH format
    :param period: length of the retention period
    :param slack: slack added to account for submission latency
    :param is_sampled: if truthy, additionally keep only rows whose
        ``sample_id`` equals "57" (applied to both sources)
    :return: a dataframe containing the raw subset of data: the windowed
        main_summary rows (after new-profile attribution coalescing)
        unioned with the windowed new-profile rows
    """
    begin = arrow.get(start_ds, DS_NODASH)

    # Subsession start is bounded by the period itself, while the
    # submission date may trail the period by `slack` extra units.
    conditions = [
        F.col("subsession_start_date") >= utils.format_date(begin, DS),
        F.col("subsession_start_date") < utils.format_date(begin, DS, period),
        F.col("submission_date_s3") >= utils.format_date(begin, DS_NODASH),
        F.col("submission_date_s3")
        < utils.format_date(begin, DS_NODASH, period + slack),
    ]
    if is_sampled:
        conditions.append(F.col("sample_id") == "57")

    # AND every condition into a single Column expression; the same filter
    # is applied to both input sources.
    window_filter = reduce(operator.__and__, conditions)

    main_rows = main_summary.where(window_filter).select(SOURCE_COLUMNS)

    cleaned_profiles = clean_new_profile(new_profile)
    profile_rows = cleaned_profiles.where(window_filter).select(SOURCE_COLUMNS)

    # The full cleaned new-profile frame (not the windowed subset) is used
    # for attribution. Presumably this lets profiles created before the
    # window still contribute attribution; verify intent.
    attributed_main_rows = coalesce_new_profile_attribution(
        main_rows, cleaned_profiles
    )

    # NOTE(review): Spark's DataFrame.union matches columns by position, so
    # this relies on coalesce_new_profile_attribution preserving the
    # SOURCE_COLUMNS order -- confirm.
    return attributed_main_rows.union(profile_rows)
评论列表
文章目录