def main(start_date, end_date, bucket, prefix, input_bucket, input_prefix,
         version=1):
    """Run the sync-bookmark job once per day over an inclusive date range.

    For every day from ``start_date`` through ``end_date`` (both included),
    calls ``extract`` -> ``transform`` -> ``load`` using one shared
    SparkSession, and always stops that session when done.

    Args:
        start_date: First day to process, formatted ``YYYYMMDD``.
        end_date: Last day to process, same format. If falsy (e.g. ``None``
            or ``""``), only ``start_date`` is processed.
        bucket: Bucket name forwarded to ``load`` (presumably the output
            location -- confirm against ``load``).
        prefix: Key prefix forwarded to ``load``.
        input_bucket: S3 bucket used to build the ``s3://`` input path.
        input_prefix: Key prefix appended to ``input_bucket`` in that path.
        version: Value forwarded to ``load``. Defaults to 1, the value that
            was previously hard-coded.

    Raises:
        ValueError: If ``end_date`` is earlier than ``start_date``.
            Previously this silently processed zero days.
    """
    # Use the Airflow date convention (no-dash date, e.g. 20240131),
    # expressed as an arrow format string.
    ds_format = "YYYYMMDD"
    start = arrow.get(start_date, ds_format)
    end = arrow.get(end_date, ds_format) if end_date else start
    if end < start:
        # Fail fast, before starting Spark, instead of reporting success
        # after processing nothing.
        raise ValueError(
            "end_date {} is earlier than start_date {}".format(
                end_date, start_date))

    input_path = "s3://{}/{}".format(input_bucket, input_prefix)

    spark = (SparkSession
             .builder
             .appName("sync_bookmark")
             .getOrCreate())
    try:
        # Arrow.range includes both endpoints, so the end day is processed.
        for date in arrow.Arrow.range('day', start, end):
            current_date = date.format(ds_format)
            logger.info("Processing sync bookmark validation for %s",
                        current_date)
            extract(spark, input_path, current_date)
            transform(spark)
            load(spark, bucket, prefix, version, current_date)
    finally:
        # Stop the session even if a day fails, so Spark resources are
        # released rather than left running.
        spark.stop()
评论列表
文章目录