def etl_job(sc, sqlContext, submission_date=None, save=True):
    """Build the combined shield-study ping table for one submission date.

    Fetches "shield-study" Firefox pings from the "telemetry" source, runs
    them through both the event and the state transforms, and unions the
    two results.

    Args:
        sc: Spark context handed to ``Dataset.records``.
        sqlContext: SQL context forwarded to both transform helpers.
        submission_date: Value used for the ``submissionDate`` filter and
            for the output partition name. When ``None``, yesterday's date
            (per ``date.today()``) formatted as ``%Y%m%d`` is used.
        save: When truthy, the result is written as parquet to the
            ``submission_date=<date>`` directory under the S3 prefix,
            overwriting anything already there.

    Returns:
        The union of the transformed event pings and transformed state
        pings (presumably a Spark DataFrame -- the transforms are defined
        elsewhere; confirm there).
    """
    output_root = 's3n://telemetry-parquet/harter/privacy_prefs_shield/v2'

    if submission_date is None:
        yesterday = date.today() - timedelta(1)
        submission_date = yesterday.strftime("%Y%m%d")

    # Narrow the telemetry source to the study pings for the chosen day.
    source = Dataset.from_source("telemetry")
    selection = source.where(
        docType="shield-study",
        submissionDate=submission_date,
        appName="Firefox",
    )
    pings = selection.records(sc)

    # Both transforms consume the same ping collection; event first, then
    # state, matching the order in which they are unioned.
    event_rows = transform_event_pings(sqlContext, pings)
    state_rows = transform_state_pings(sqlContext, pings)
    transformed_pings = event_rows.union(state_rows)

    if save:
        destination = output_root + '/submission_date={}'.format(submission_date)
        # repartition(1) collapses the output to a single partition before
        # the write.
        writer = transformed_pings.repartition(1).write.mode('overwrite')
        writer.parquet(destination)

    return transformed_pings
评论列表
文章目录