python类Key()的实例源码

s3config.py 文件源码 项目:kingpin 作者: pinterest 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def put_config_string(self, keyname, data):
        """Put the config data into a keyname.

        Replaces '.' with '/' in the keyname so that this will happen:
        discovery.service.prod -> discovery/service/prod

        Returns the S3ResponseError instance on failure (callers are
        expected to check the return value); returns None on success.
        """
        # S3 has no real directories; '/' in key names acts as a
        # pseudo-directory separator.
        keyname = keyname.replace('.', '/')
        s3_bucket = self._get_bucket_conn()
        s3_key = s3_bucket.get_key(keyname)
        if s3_key is None:
            # Key does not exist yet; create a fresh one in this bucket.
            s3_key = Key(s3_bucket, keyname)
        try:
            s3_key.set_contents_from_string(data)
        except boto.exception.S3ResponseError as err:
            # 'as err' works on Python 2.6+ and Python 3, unlike the
            # legacy 'except E, err' comma form, which is a syntax
            # error on Python 3.
            return err
data.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def backup(id):
    """Dump the database identified by *id*, upload it to the user S3
    bucket, and return a permanent (non-expiring, unsigned) public URL.
    """
    dump_key = Key(user_s3_bucket())
    dump_key.key = '{}.dump'.format(id)
    dump_key.set_contents_from_filename(dump_database(id))
    return dump_key.generate_url(expires_in=0, query_auth=False)
data.py 文件源码 项目:Dallinger 作者: Dallinger 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def register(id, url=None):
    """Register a UUID key in the global S3 bucket.

    Stores *url* (or the placeholder 'missing' when *url* is falsy) under
    the registration key for *id* and returns a permanent public URL.
    """
    entry = Key(registration_s3_bucket())
    entry.key = registration_key(id)
    entry.set_contents_from_string(url if url else 'missing')
    return entry.generate_url(expires_in=0, query_auth=False)
test_transcripts.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def assert_uploaded_transcript_on_s3(self, connection):
        """
        Assert that the sjson transcript stored on S3 matches the fixture data.
        """
        bucket = connection.get_bucket(CONFIG_DATA['aws_video_transcripts_bucket'])
        transcript_key = Key(bucket)
        transcript_key.key = '{directory}{uuid}.sjson'.format(
            directory=CONFIG_DATA['aws_video_transcripts_prefix'], uuid=self.uuid_hex
        )
        uploaded = json.loads(transcript_key.get_contents_as_string())
        self.assertEqual(uploaded, TRANSCRIPT_SJSON_DATA)
transcripts.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def upload_sjson_to_s3(config, sjson_data):
    """
    Serialize *sjson_data* as JSON and store it, publicly readable, in the
    configured transcripts bucket under a random UUID-based key.

    Returns the S3 key name of the uploaded object.
    """
    bucket = boto.connect_s3().get_bucket(config['aws_video_transcripts_bucket'])
    transcript_key = Key(bucket)
    transcript_key.content_type = 'application/json'
    transcript_key.key = '{directory}{uuid}.sjson'.format(
        directory=config['aws_video_transcripts_prefix'],
        uuid=uuid.uuid4().hex
    )
    transcript_key.set_contents_from_string(json.dumps(sjson_data))
    transcript_key.set_acl('public-read')
    return transcript_key.key
veda_deliver.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _BOTO_SINGLEPART(self):
        """
        Upload single part (under threshold in node_config)
        node_config MULTI_UPLOAD_BARRIER

        Returns True on success, False if the S3 connection fails.
        """
        try:
            conn = boto.connect_s3()
        except S3ResponseError:
            ErrorObject.print_error(
                message='Deliverable Fail: s3 Connection Error\n \
                Check node_config DELIVERY_ENDPOINT'
            )
            return False
        delv_bucket = conn.get_bucket(
            self.auth_dict['edx_s3_endpoint_bucket']
        )
        # Build the local file path once: it is needed both for the key
        # name (basename) and as the upload source, and the original
        # computed the same join twice.
        encoded_file_path = os.path.join(
            self.node_work_directory,
            self.encoded_file
        )
        upload_key = Key(delv_bucket)
        upload_key.key = os.path.basename(encoded_file_path)
        # Force browsers to download the deliverable rather than render it.
        headers = {"Content-Disposition": "attachment"}
        upload_key.set_contents_from_filename(
            encoded_file_path,
            headers=headers,
            replace=True
        )
        upload_key.set_acl('public-read')
        return True
test_file_discovery.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def upload_video_with_metadata(self, **metadata):
        """
        Sets the metadata on an S3 video key and uploads the video file.

        Keyword overrides are merged on top of the S3_METADATA defaults;
        entries whose value is None are skipped (not set on the key).
        """
        # Upload the video file to ingest bucket
        connection = S3Connection()
        self.ingest_bucket = connection.get_bucket(CONFIG_DATA['edx_s3_ingest_bucket'])

        key_name = os.path.join(CONFIG_DATA['edx_s3_ingest_prefix'], self.file_name)
        self.video_key = Key(self.ingest_bucket, key_name)
        # .items() iterates identically on Python 2 and 3, unlike the
        # Python-2-only .iteritems() the original used.
        for metadata_name, value in dict(S3_METADATA, **metadata).items():
            if value is not None:
                self.video_key.set_metadata(metadata_name, value)

        self.video_key.set_contents_from_filename(self.video_file_path)
interface.py 文件源码 项目:acacia_main 作者: AcaciaTrading 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_s3_key():
    """Return a fresh Key bound to the module-level BUCKET_NAME bucket."""
    bucket = boto.connect_s3().get_bucket(BUCKET_NAME)
    return Key(bucket)
test_key.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_unicode_name(self):
        """A Key with a non-ASCII name must repr() without raising."""
        key = Key()
        key.name = u'Österreich'
        print(repr(key))
test_key.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_file_error(self):
        """Errors from the underlying fetch must propagate unchanged."""
        class CustomException(Exception):
            pass

        key = Key()
        key.get_contents_to_file = mock.Mock(
            side_effect=CustomException('File blew up!'))

        # The original exception should surface, not a file/IO error.
        with self.assertRaises(CustomException):
            key.get_contents_to_filename('foo.txt')
test_bucket.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_bucket_get_key_no_validate(self, mock_gki, mock_gak):
        """get_key(validate=False) builds a Key locally without a remote lookup."""
        self.set_http_response(status_code=200)
        bucket = self.service_connection.get_bucket('mybucket')
        key = bucket.get_key('mykey', validate=False)

        # No remote key lookup should have been issued.
        self.assertEqual(len(mock_gki.mock_calls), 0)
        self.assertTrue(isinstance(key, Key))
        self.assertEqual(key.name, 'mykey')

        # Requesting a specific version without validation is a client error.
        with self.assertRaises(BotoClientError):
            bucket.get_key('mykey', version_id='something', validate=False)
test_multidelete.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_delete_mix(self):
        """
        delete_keys() should accept a heterogeneous list — plain strings,
        (name, version) tuples, and Key/Prefix/DeleteMarker objects — and
        report unusable entries as errors rather than raising.
        """
        result = self.bucket.delete_keys(["king",
                                          ("mice", None),
                                          Key(name="regular"),
                                          Key(),
                                          Prefix(name="folder/"),
                                          DeleteMarker(name="deleted"),
                                          {"bad":"type"}])
        # 4 deleted / 3 errors — presumably the named entries succeed while
        # the nameless Key(), the Prefix, and the dict fail; confirm against
        # the boto multidelete implementation.
        self.assertEqual(len(result.deleted), 4)
        self.assertEqual(len(result.errors), 3)
test_multidelete.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_delete_unknown_version(self):
        """Deleting a key with a nonexistent version_id reports an error, not a delete."""
        versioned_key = Key(name="no")
        versioned_key.version_id = "version"
        result = self.bucket.delete_keys([versioned_key])
        self.assertEqual(len(result.deleted), 0)
        self.assertEqual(len(result.errors), 1)
test_multidelete.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_delete_kanji(self):
        # NOTE(review): the u"??" / u"???" literals look like mojibake — the
        # original non-ASCII (kanji) key names were likely lost in a lossy
        # copy of this file. Confirm against the upstream boto test suite
        # before relying on these exact strings.
        result = self.bucket.delete_keys([u"??", Key(name=u"???")])
        self.assertEqual(len(result.deleted), 2)
        self.assertEqual(len(result.errors), 0)
aws_module.py 文件源码 项目:hoot 作者: CatalystOfNostalgia 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def push_to_S3(filename, jsonToUpload):
    """Store *jsonToUpload* under *filename* in the module-level bucket."""
    upload_key = Key(bucket)
    upload_key.key = filename
    upload_key.set_contents_from_string(jsonToUpload)
s3_data_provider.py 文件源码 项目:beans 作者: Yelp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _fetch(self, data):
        """Download the configured S3 key and return its JSON-decoded contents."""
        connection = self._obtain_s3_connection(
            self.access_key_id,
            self.secret_access_key,
        )
        bucket = connection.get_bucket(self.bucket_name)
        raw = Key(bucket, self.key).get_contents_as_string()
        return json.loads(raw)
s3.py 文件源码 项目:epilepsy_diary 作者: bfortuner 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def download_file(file_key, file_path, bucket_name):
    """Fetch *file_key* from *bucket_name* into the local file *file_path*."""
    remote = Key(get_bucket(get_connection(), bucket_name))
    remote.key = file_key
    remote.get_contents_to_filename(file_path)
s3.py 文件源码 项目:epilepsy_diary 作者: bfortuner 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def upload_file(file_key, file_path, bucket_name):
    """Upload the local file *file_path* to *bucket_name* as *file_key*."""
    remote = Key(get_bucket(get_connection(), bucket_name))
    remote.key = file_key
    remote.set_contents_from_filename(file_path)
aws.py 文件源码 项目:CommonCrawlJob 作者: qadium-memex 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def valid_segments(self):
    """Return the valid crawl segment ids, one per line of the S3 manifest."""
    manifest = Key(self.bucket, '/common-crawl/parse-output/valid_segments.txt')
    return [line.strip() for line in manifest.read().splitlines()]
aws.py 文件源码 项目:CommonCrawlJob 作者: qadium-memex 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_index(self, prefix):
    """
    :param prefix: str
        Prefix to S3 bucket

    :return: Uncompressed warc index
    :rtype: str
    """
    crawl = self.select_crawl(prefix)
    paths_key = Key(self.bucket, crawl + 'warc.paths.gz')
    # Decompress the gzipped path manifest in memory, one line per entry.
    compressed = BytesIO(paths_key.read())
    return [line.strip() for line in GzipFile(fileobj=compressed)]


问题


面经


文章

微信
公众号

扫码关注公众号