def export_tables(database, tables, directory):
    app_id = app_identity.get_application_id()
    # Determine what GCS bucket to write to based on the environment and database.
    if app_id == 'None':
        bucket_name = app_identity.get_default_gcs_bucket_name()
    elif database == 'rdr':
        bucket_name = '%s-rdr-export' % app_id
    elif database in ['cdm', 'voc']:
        bucket_name = '%s-cdm' % app_id
    else:
        raise BadRequest("Invalid database: %s" % database)
    for table_name in tables:
        if not _TABLE_PATTERN.match(table_name):
            raise BadRequest("Invalid table name: %s" % table_name)
    for table_name in tables:
        deferred.defer(TableExporter._export_csv, bucket_name, database, directory, table_name)
    return {'destination': 'gs://%s/%s' % (bucket_name, directory)}
Python example source code for get_default_gcs_bucket_name()
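All of the snippets on this page call app_identity.get_default_gcs_bucket_name() to look up the application's default Google Cloud Storage bucket (normally named <app-id>.appspot.com). As a minimal sketch of the bare call, with a hypothetical helper name and filename argument:

from google.appengine.api import app_identity

def default_gcs_path(filename):
    # Default bucket provisioned for the App Engine app, e.g. "<app-id>.appspot.com".
    bucket = app_identity.get_default_gcs_bucket_name()
    # Build a /bucket/object path as used by the cloudstorage client library.
    return '/%s/%s' % (bucket, filename)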
def __init__(self):
    self.bucket_name = os.environ.get(
        'BUCKET_NAME', app_identity.get_default_gcs_bucket_name())
def run(self, *args, **kwargs):
    params = {
        'entity_kind': 'todo.models.user.User',
        'output_writer': {
            'bucket_name': app_identity.get_default_gcs_bucket_name(),
            'content_type': 'text/plain',
        },
    }
    yield mapreduce_pipeline.MapperPipeline(
        'export',
        'todo.pipelines.ExportPipeline.map',
        'mapreduce.input_readers.DatastoreInputReader',
        'mapreduce.output_writers.GoogleCloudStorageConsistentOutputWriter',
        params=params)
def file_upload(current):
    """Request an upload ticket for commencing file upload."""
    upload_request = location.FileUploadRequest.from_json(
        current.request.body.getvalue())
    return location.FileUploadResponse.from_keywords(
        url=blobstore.create_upload_url(
            utils.route_api("/control/file_upload_receive",
                            upload_request=upload_request.to_json(),
                            client_id=current.client_id),
            gs_bucket_name=app_identity.get_default_gcs_bucket_name())
    ).to_primitive()
def upload(current, type, flow_id, part=0):
    collection_id = utils.new_collection_id()
    result = location.BlobUploadSpecs.from_keywords(
        url=blobstore.create_upload_url(
            utils.route_api("/control/upload_receive",
                            type=type,
                            flow_id=flow_id,
                            collection_id=collection_id,
                            part=part, client_id=current.client_id),
            gs_bucket_name=app_identity.get_default_gcs_bucket_name())
    ).to_primitive()
    return result
def _create_google_cloud_storage(self, config):
    """
    Create GoogleCloudStorage instance
    :param config: The config
    :type config: dict
    :return: GoogleCloudStorage instance
    :rtype: GoogleCloudStorage
    """
    from google.appengine.api import app_identity

    bucket = app_identity.get_default_gcs_bucket_name()
    if 'bucket' in config:
        bucket = config['bucket']

    storage_path = os.path.join(os.sep, self._storage_path)
    if 'directory' in config:
        directory = config['directory']
        # Check if absolute or relative path
        if not directory.startswith(os.sep):
            storage_path = os.path.join(storage_path, directory)
        else:
            storage_path = directory

    files_path = self._files_path
    if 'files_path' in config:
        files_path = config['files_path']

    options = {}
    if 'prefix' in config:
        options['prefix'] = config['prefix']

    from edmunds.storage.drivers.googlecloudstorage import GoogleCloudStorage
    return GoogleCloudStorage(self._app, bucket, storage_path, files_path, **options)
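For illustration only, a config dict with hypothetical values showing the keys that _create_google_cloud_storage above looks for ('bucket', 'directory', 'files_path', 'prefix'); when 'bucket' is omitted, the default GCS bucket is used:

config = {
    'bucket': 'my-app-bucket',   # optional; default GCS bucket is used when omitted
    'directory': 'storage',      # relative path, so it is joined under the storage path
    'files_path': 'files',       # optional override of the instance's files path
    'prefix': 'cache_',          # optional prefix passed through as a driver option
}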
def recalculate_metrics():
    in_progress = MetricsVersionDao().get_version_in_progress()
    if in_progress:
        logging.info("=========== Metrics pipeline already running ============")
        return '{"metrics-pipeline-status": "running"}'
    else:
        bucket_name = app_identity.get_default_gcs_bucket_name()
        logging.info("=========== Starting metrics export ============")
        MetricsExport.start_export_tasks(bucket_name,
                                         int(config.getSetting(config.METRICS_SHARDS, 1)))
        return '{"metrics-pipeline-status": "started"}'
def gcscacher(f=None, bucketname=None, cachekey=None, expiresec=None):
    if not f:
        # Called with keyword arguments only (no function yet); return a partial
        # carrying all options so it can be applied to the function next.
        return functools.partial(
            gcscacher, bucketname=bucketname, cachekey=cachekey, expiresec=expiresec)

    def getvalue(*args, **kwargs):
        key = cachekey if cachekey else make_flash(f, args, kwargs)
        logdebug("Enter gcscacher.getvalue: %s" % key)

        bucket = bucketname if bucketname else os.environ.get(
            'BUCKET_NAME',
            app_identity.get_default_gcs_bucket_name())
        lpicklepath = "/%s/gcscache/%s.pickle" % (bucket, key)
        logdebug("picklepath: %s" % lpicklepath)

        lsaved = None
        try:
            # 1: Load the saved cache entry (expiry metadata and content), if any.
            with gcs.open(lpicklepath) as picklefile:
                lsaved = pickle.load(picklefile)
        except gcs.NotFoundError:
            pass

        lexpireat = lsaved.get("expireat") if lsaved else None
        lcontent = None
        lcacheIsValid = False
        if lsaved and not (lexpireat and lexpireat < get_utcnow_unixtimestampusec()):
            lcontent = lsaved.get("content")
            lcacheIsValid = True

        if not lcacheIsValid:
            logdebug("GCS Cache miss")
            lcontent = f(*args, **kwargs)
            logdebug("write content back to gcs")
            ltosave = {
                "expireat": get_utcnow_unixtimestampusec() + (expiresec * 1000000) if expiresec else None,
                "content": lcontent
            }
            with gcs.open(lpicklepath, "w") as picklefilewrite:
                cloudpickle.dump(ltosave, picklefilewrite)
        else:
            logdebug("GCS Cache hit")

        logdebug("Leave gcscacher.getvalue: %s" % key)
        return lcontent
    return getvalue
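A hedged usage sketch for the decorator above (the decorated function, its body, and the expiry value are hypothetical; gcscacher and its helpers are assumed to be importable): calling the decorator with keyword arguments only takes the functools.partial branch, and the result is pickled under /<default-bucket>/gcscache/ in the app's default GCS bucket.

@gcscacher(expiresec=600)  # cache the result in GCS for ten minutes
def expensive_report():
    # Placeholder for a slow computation whose result is worth caching.
    return build_report()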
def run(self,
        job_name,
        mapper_spec,
        reducer_spec,
        input_reader_spec,
        output_writer_spec=None,
        mapper_params=None,
        reducer_params=None,
        shards=None,
        combiner_spec=None):
    # Fall back to the app's default GCS bucket when the caller did not
    # provide one in mapper_params.
    if mapper_params.get("bucket_name") is None:
        try:
            mapper_params["bucket_name"] = (
                app_identity.get_default_gcs_bucket_name())
        except Exception as e:
            raise errors.Error("Unable to get the GCS default bucket name. "
                               "Check to see that GCS is properly activated. "
                               + str(e))
    if mapper_params["bucket_name"] is None:
        raise errors.Error("There is no GCS default bucket name. "
                           "Check to see that GCS is properly activated.")

    map_pipeline = yield MapPipeline(job_name,
                                     mapper_spec,
                                     input_reader_spec,
                                     params=mapper_params,
                                     shards=shards)
    shuffler_pipeline = yield ShufflePipeline(
        job_name, mapper_params, map_pipeline)
    reducer_pipeline = yield ReducePipeline(
        job_name,
        reducer_spec,
        output_writer_spec,
        reducer_params,
        mapper_params["bucket_name"],
        shuffler_pipeline,
        combiner_spec=combiner_spec)
    with pipeline.After(reducer_pipeline):
        all_temp_files = yield pipeline_common.Extend(
            map_pipeline, shuffler_pipeline)
        yield CleanupPipeline(all_temp_files)

    yield _ReturnPipeline(map_pipeline.result_status,
                          reducer_pipeline.result_status,
                          reducer_pipeline)
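A possible invocation of the pipeline whose run() method appears above, assuming it is a Pipeline subclass named MapreducePipeline as in the App Engine mapreduce library; the spec strings reuse names from the earlier MapperPipeline snippet, the reducer spec and parameter values are assumptions rather than taken from the source, and leaving bucket_name out of mapper_params makes run() fall back to the default GCS bucket.

pipeline = MapreducePipeline(
    'export',
    'todo.pipelines.ExportPipeline.map',
    'todo.pipelines.ExportPipeline.reduce',  # hypothetical reducer spec
    'mapreduce.input_readers.DatastoreInputReader',
    'mapreduce.output_writers.GoogleCloudStorageConsistentOutputWriter',
    mapper_params={'entity_kind': 'todo.models.user.User'},  # no bucket_name: default bucket is used
    reducer_params={'output_writer': {'content_type': 'text/plain'}},
    shards=8)
pipeline.start()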