def get(self):
    """Launch the Dataflow template job for a cron-initiated request.

    A request without a truthy ``X-Appengine-Cron`` header is rejected by
    returning ``'Blocked.'`` before any other work is done. Otherwise the
    ``PROJECT``, ``BUCKET`` and ``TEMPLATE_NAME`` environment variables are
    read, a Dataflow ``v1b3`` client is built with the application default
    credentials, a ``projects().templates().create`` request is executed,
    its response is logged, and ``'Done'`` is written to the HTTP response.

    Raises:
        KeyError: if one of the required environment variables is unset.
    """
    # Remove this guard to allow non-cron-initiated requests.
    if not self.request.headers.get('X-Appengine-Cron', False):
        return 'Blocked.'

    # These env vars are set in app.yaml.
    project_id = os.environ['PROJECT']
    bucket_name = os.environ['BUCKET']
    template_name = os.environ['TEMPLATE_NAME']

    # The job name is identical on every run, so launching a job while a
    # previous one is still running makes the new launch fail.
    job_name = project_id + '-twproc-template'

    dataflow = build(
        'dataflow', 'v1b3',
        credentials=GoogleCredentials.get_application_default())

    template_path = "gs://{bucket}/templates/{template}".format(
        bucket=bucket_name, template=template_name)
    temp_location = "gs://{bucket}/temp".format(bucket=bucket_name)
    launch_body = {
        "jobName": "{jobname}".format(jobname=job_name),
        "gcsPath": template_path,
        # The launch time (naive UTC) is handed to the template as a string.
        "parameters": {"timestamp": str(datetime.datetime.utcnow())},
        "environment": {
            "tempLocation": temp_location,
            "zone": "us-central1-f",
        },
    }

    launch_request = dataflow.projects().templates().create(
        projectId=project_id, body=launch_body)
    launch_response = launch_request.execute()
    logging.info(launch_response)
    self.response.write('Done')
评论列表
文章目录