def put_config_string(self, keyname, data):
""" Put the config data into a keyname
will replace . with / in the keyname so that this will happen:
discovery.service.prod -> discovery/service/prod
"""
keyname = keyname.replace('.', '/')
s3_bucket = self._get_bucket_conn()
s3_key = s3_bucket.get_key(keyname)
if s3_key is None:
s3_key = Key(s3_bucket, keyname)
try:
s3_key.set_contents_from_string(data)
    except boto.exception.S3ResponseError as err:
        return err
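# Hedged usage sketch for put_config_string; `config_store` stands in for an
# instance of the class above (its constructor is not shown in this source):
def store_discovery_config(config_store):
    # dots become slashes: discovery.service.prod -> discovery/service/prod
    err = config_store.put_config_string('discovery.service.prod', '{"port": 8080}')
    return err  # S3ResponseError instance on failure, None on success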
def backup(id):
"""Backup the database to S3."""
k = Key(user_s3_bucket())
k.key = '{}.dump'.format(id)
filename = dump_database(id)
k.set_contents_from_filename(filename)
url = k.generate_url(expires_in=0, query_auth=False)
return url
def register(id, url=None):
"""Register a UUID key in the global S3 bucket."""
k = Key(registration_s3_bucket())
k.key = registration_key(id)
k.set_contents_from_string(url or 'missing')
reg_url = k.generate_url(expires_in=0, query_auth=False)
return reg_url
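# Usage sketch for backup()/register(); the id is a made-up example. Because
# generate_url(expires_in=0, query_auth=False) returns a plain unsigned URL
# with no expiry, the uploaded keys must be publicly readable:
def backup_and_register(id='experiment-42'):
    dump_url = backup(id)
    return register(id, dump_url)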
def assert_uploaded_transcript_on_s3(self, connection):
"""
    Verify that the sjson data was uploaded to S3
"""
key = Key(connection.get_bucket(CONFIG_DATA['aws_video_transcripts_bucket']))
key.key = '{directory}{uuid}.sjson'.format(
directory=CONFIG_DATA['aws_video_transcripts_prefix'], uuid=self.uuid_hex
)
sjson_transcript = json.loads(key.get_contents_as_string())
self.assertEqual(sjson_transcript, TRANSCRIPT_SJSON_DATA)
def upload_sjson_to_s3(config, sjson_data):
"""
Upload sjson data to s3.
"""
s3_conn = boto.connect_s3()
bucket = s3_conn.get_bucket(config['aws_video_transcripts_bucket'])
k = Key(bucket)
k.content_type = 'application/json'
k.key = '{directory}{uuid}.sjson'.format(
directory=config['aws_video_transcripts_prefix'],
uuid=uuid.uuid4().hex
)
k.set_contents_from_string(json.dumps(sjson_data))
k.set_acl('public-read')
return k.key
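# Minimal caller sketch for upload_sjson_to_s3; the bucket and prefix values
# are placeholders rather than real settings:
def upload_demo_transcript():
    demo_config = {
        'aws_video_transcripts_bucket': 'my-transcripts-bucket',
        'aws_video_transcripts_prefix': 'transcripts/',
    }
    return upload_sjson_to_s3(demo_config, {'start': [], 'end': [], 'text': []})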
def _BOTO_SINGLEPART(self):
"""
Upload single part (under threshold in node_config)
node_config MULTI_UPLOAD_BARRIER
"""
try:
conn = boto.connect_s3()
except S3ResponseError:
ErrorObject.print_error(
            message=('Deliverable Fail: s3 Connection Error\n'
                     'Check node_config DELIVERY_ENDPOINT')
)
return False
delv_bucket = conn.get_bucket(
self.auth_dict['edx_s3_endpoint_bucket']
)
upload_key = Key(delv_bucket)
upload_key.key = os.path.basename(os.path.join(
self.node_work_directory,
self.encoded_file
))
headers = {"Content-Disposition": "attachment"}
upload_key.set_contents_from_filename(
os.path.join(
self.node_work_directory,
self.encoded_file
),
headers=headers,
replace=True
)
upload_key.set_acl('public-read')
return True
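# A sketch of the dispatch the docstring implies, assuming the caller picks
# single- vs multi-part upload by comparing file size against node_config's
# MULTI_UPLOAD_BARRIER; the multipart method name below is a guess, not
# confirmed by this source:
#
#   filepath = os.path.join(self.node_work_directory, self.encoded_file)
#   if os.stat(filepath).st_size < MULTI_UPLOAD_BARRIER:
#       delivered = self._BOTO_SINGLEPART()
#   else:
#       delivered = self._BOTO_MULTIPART()  # hypothetical counterpart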
def upload_video_with_metadata(self, **metadata):
"""
    Upload the video file to the S3 ingest bucket with the given metadata set on its key.
"""
# Upload the video file to ingest bucket
connection = S3Connection()
self.ingest_bucket = connection.get_bucket(CONFIG_DATA['edx_s3_ingest_bucket'])
key_name = os.path.join(CONFIG_DATA['edx_s3_ingest_prefix'], self.file_name)
self.video_key = Key(self.ingest_bucket, key_name)
    for metadata_name, value in dict(S3_METADATA, **metadata).items():
if value is not None:
self.video_key.set_metadata(metadata_name, value)
self.video_key.set_contents_from_filename(self.video_file_path)
def get_s3_key():
c = boto.connect_s3()
b = c.get_bucket(BUCKET_NAME)
return Key(b)
def test_unicode_name(self):
k = Key()
k.name = u'Österreich'
print(repr(k))
def test_file_error(self):
key = Key()
class CustomException(Exception): pass
key.get_contents_to_file = mock.Mock(
side_effect=CustomException('File blew up!'))
# Ensure our exception gets raised instead of a file or IO error
with self.assertRaises(CustomException):
key.get_contents_to_filename('foo.txt')
def test_bucket_get_key_no_validate(self, mock_gki, mock_gak):
self.set_http_response(status_code=200)
bucket = self.service_connection.get_bucket('mybucket')
key = bucket.get_key('mykey', validate=False)
self.assertEqual(len(mock_gki.mock_calls), 0)
self.assertTrue(isinstance(key, Key))
self.assertEqual(key.name, 'mykey')
with self.assertRaises(BotoClientError):
bucket.get_key(
'mykey',
version_id='something',
validate=False
)
def test_delete_mix(self):
result = self.bucket.delete_keys(["king",
("mice", None),
Key(name="regular"),
Key(),
Prefix(name="folder/"),
DeleteMarker(name="deleted"),
{"bad":"type"}])
self.assertEqual(len(result.deleted), 4)
self.assertEqual(len(result.errors), 3)
def test_delete_unknown_version(self):
no_ver = Key(name="no")
no_ver.version_id = "version"
result = self.bucket.delete_keys([no_ver])
self.assertEqual(len(result.deleted), 0)
self.assertEqual(len(result.errors), 1)
def test_delete_kanji(self):
result = self.bucket.delete_keys([u"??", Key(name=u"???")])
self.assertEqual(len(result.deleted), 2)
self.assertEqual(len(result.errors), 0)
def push_to_S3(filename, jsonToUpload):
k = Key(bucket)
k.key = filename
k.set_contents_from_string(jsonToUpload)
def _fetch(self, data):
bucket = self._obtain_s3_connection(
self.access_key_id,
self.secret_access_key,
).get_bucket(
self.bucket_name,
)
contents = Key(bucket, self.key).get_contents_as_string()
return json.loads(contents)
def download_file(file_key, file_path, bucket_name):
# items_log.info("Downloading file: %s" % file_key)
k = Key(get_bucket(get_connection(), bucket_name))
k.key = file_key
k.get_contents_to_filename(file_path)
# items_log.info("Downloading complete!")
def upload_file(file_key, file_path, bucket_name):
# items_log.info("Uploading file: %s" % file_key)
k = Key(get_bucket(get_connection(), bucket_name))
k.key = file_key
k.set_contents_from_filename(file_path)
# items_log.info("Upload complete!")
def valid_segments(self):
kfile = Key(self.bucket, '/common-crawl/parse-output/valid_segments.txt')
return [i.strip() for i in kfile.read().splitlines()]
def get_index(self, prefix):
"""
:param prefix: str
Prefix to S3 bucket
    :return: Lines of the uncompressed WARC path index
    :rtype: list
"""
crawl = self.select_crawl(prefix)
botokey = Key(self.bucket, crawl + 'warc.paths.gz')
return [i.strip() for i in GzipFile(fileobj=BytesIO(botokey.read()))]
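# Usage sketch: with `indexer` an instance of the class above whose bucket
# attribute points at the public commoncrawl S3 bucket, each returned line
# is the path of one gzipped WARC file (the crawl prefix is illustrative):
#
#   warc_paths = indexer.get_index('2014-15')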