python类Key()的实例源码

compress_css_js_files.py 文件源码 项目:django-webpacker 作者: MicroPyramid 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def upload_to_s3(css_file):
    bucket_name = settings.AWS_BUCKET_NAME
    conn = S3Connection(settings.AWS_ACCESS_KEY_ID, settings.AWS_SECRET_ACCESS_KEY)

    folder = 'webpack_bundles/'
    bucket = conn.get_bucket(bucket_name=bucket_name)

    filename = css_file.split('/')[-1]
    file_obj = open(css_file, 'r')
    content = file_obj.read()

    key = folder + filename
    bucket = conn.get_bucket(bucket_name=bucket_name)
    mime = mimetypes.guess_type(filename)[0]
    k = Key(bucket)
    k.key = key  # folder + filename
    k.set_metadata("Content-Type", mime)
    k.set_contents_from_string(content)
    public_read = True
    if public_read:
        k.set_acl("public-read")
aws_s3_file_upload.py 文件源码 项目:iot-detroit-jan2017 作者: mvartani76 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def upload_file_to_s3(file_file_path, bucket_name):
    """Uploads files to Amazon's S3 service.

    Arguments:
    file_file_path: Path to file to upload on local machine.
    bucket_name: Name of the S3 bucket where file should be uploaded.
    key_name: Name of the key for the file on S3 (usually the
            timestamp).
    """
    bucket = s3_connection.get_bucket(bucket_name)

    # Create a new key using file_file_path as the key
    key = Key(bucket)
    key.key = file_file_path 
    key.set_contents_from_filename(file_file_path)
    return key
ead_to_collection.py 文件源码 项目:mets2man 作者: thegetty 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def to_s3(ident, doc_type):
    os.environ['http_proxy'] = 'http://dumont.getty.edu:80'
    os.environ['https_proxy'] = 'https://dumont.getty.edu:80'

    # Connect to s3 and get bucket
    rw = boto.connect_s3(aws_access_key_id=aws_access,
        aws_secret_access_key=aws_secret)
    b = rw.get_bucket(aws_bucket)

    print('{}{}/{}.json'.format(iiif_prezi_base, ident, doc_type))
    k = Key(b, '{}{}/{}.json'.format(iiif_prezi_base, ident, doc_type))
    if doc_type == 'collection':
        print('{}/{}/collection.json'.format(collection_dir, ident))
        k.set_contents_from_filename('{}/{}/collection.json'.format(collection_dir, ident))
    elif doc_type == 'manifest':
        print('{}/{}/manifest.json'.format(manifest_dir, ident))
        k.set_contents_from_filename('{}/{}/manifest.json'.format(manifest_dir, ident))
    c.execute('INSERT OR REPLACE INTO {}_prezi_docs VALUES (?, ?)'.format(project), (ident, 1))
    conn.commit()
    print('{} complete and added to DB'.format(ident))
ZendeskPDFMaker.py 文件源码 项目:zendesk-utils 作者: trailbehind 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def post_pdfs_to_s3(self):
    conn = boto.s3.connect_to_region('us-east-1',
                                     aws_access_key_id=S3_ACCESS_KEY_FOR_MANUAL,
                                     aws_secret_access_key=S3_SECRET_KEY_FOR_MANUAL,
                                     calling_format=OrdinaryCallingFormat())
    bucket_name = S3_BUCKET_FOR_MANUAL
    bucket_dir = S3_DIRECTORY_FOR_MANUAL
    bucket = conn.get_bucket(bucket_name, validate=False)
    source_dir = os.path.join(ZENDESK_UTIL_DIR, 'gen/pdf/')
    print "posting pdfs from %s" % source_dir
    section_dict = {}
    for fn in os.listdir(source_dir):
      with open(source_dir + fn, 'r') as pdf_file:
        chunks = fn.split('-')
        category = chunks[0]
        filename = '-'.join(chunks[1:len(chunks)])
        if not category in section_dict:
          section_dict[category] = ''
        section_dict[category] += '<tr><td style="padding-right:10px;padding-bottom:5px"><a href=http://{}/{}/{}/{}>{}</a></td><td>http://{}/{}/{}/{}</td></tr>'.format(bucket_name, bucket_dir, category, filename, filename, bucket_name, bucket_dir, category, filename)
        k = Key(bucket)
        k.key = '/' + bucket_dir + '/' + category + '/' + filename
        print("POSTING PDF to S3: " + k.key)
        k.set_contents_from_file(pdf_file,cb=self.percent_cb, num_cb=1)
    self.post_inventory_html(section_dict, bucket, bucket_name)
ZendeskPDFMaker.py 文件源码 项目:zendesk-utils 作者: trailbehind 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post_inventory_html(self, section_dict, bucket, bucket_name):
    manual_urls = '<h1>{}</h1>'.format("Manual PDFs")

    for category in section_dict:
      if URL_LIST_CATEGORIES == None  or category in URL_LIST_CATEGORIES:
        manual_urls += '<h2>{}</h2>'.format(category)
        manual_urls += '<table>'
        manual_urls += section_dict[category]
        manual_urls += '</table>'
    date = time.strftime('%l:%M%p %Z on %b %d, %Y')
    manual_urls += '<h3 style="color:gray"><em>Last Updated: {}</em></h3>'.format(date)

    with open(os.path.join(ZENDESK_UTIL_DIR, 'gen/url_list.html'), 'w') as url_file:
      url_file.write(manual_urls)
    with open(os.path.join(ZENDESK_UTIL_DIR, 'gen/url_list.html'), 'r') as url_file:
      k = Key(bucket)
      k.key = '/' + S3_DIRECTORY_FOR_MANUAL + '/url_list.html'
      k.set_contents_from_file(url_file, cb=self.percent_cb, num_cb=1)

    print("POSTED inventory html to S3 at: " + bucket_name + k.key)
veda_deliver.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _CLEANUP(self):
        """
        check for workflow simplification
        """
        if self.auth_dict['veda_deliverable_bucket'] == \
                self.auth_dict['edx_s3_endpoint_bucket']:
            return
        try:
            conn = boto.connect_s3()
        except S3ResponseError:
            return
        del_bucket = conn.get_bucket(
            self.auth_dict['veda_deliverable_bucket']
        )
        k = Key(del_bucket)
        k.key = self.encoded_file
        k.delete()
connection.py 文件源码 项目:cuny-bdif 作者: aristotle-tek 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_request(self, method, bucket='', key='', headers=None, data='',
                     query_args=None, sender=None, override_num_retries=None,
                     retry_handler=None):
        if isinstance(bucket, self.bucket_class):
            bucket = bucket.name
        if isinstance(key, Key):
            key = key.name
        path = self.calling_format.build_path_base(bucket, key)
        boto.log.debug('path=%s' % path)
        auth_path = self.calling_format.build_auth_path(bucket, key)
        boto.log.debug('auth_path=%s' % auth_path)
        host = self.calling_format.build_host(self.server_name(), bucket)
        if query_args:
            path += '?' + query_args
            boto.log.debug('path=%s' % path)
            auth_path += '?' + query_args
            boto.log.debug('auth_path=%s' % auth_path)
        return super(S3Connection, self).make_request(
            method, path, headers,
            data, host, auth_path, sender,
            override_num_retries=override_num_retries,
            retry_handler=retry_handler
        )
connection.py 文件源码 项目:baiji 作者: bodylabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def encrypt_at_rest(self, key):
        '''
        This method takes a key on s3 and encrypts it.
        Note that calling this method on a local file is an error
        and that calling it on an s3 key that is already encrypted,
        while allowed, is a no-op.
        '''
        k = path.parse(key)
        if k.scheme != 's3':
            raise InvalidSchemeException("URI Scheme %s is not implemented" % k.scheme)
        remote_object = self._lookup(k.netloc, k.path)
        if remote_object is None:
            raise KeyNotFound("Error encrypting %s: Key doesn't exist" % (key, ))
        if not bool(remote_object.encrypted):
            bucket = self._bucket(k.netloc)
            src = k.path
            if src.startswith(path.sep):
                src = src[len(path.sep):] # NB: copy_key is failing with absolute src keys...
            bucket.copy_key(src, k.netloc, src, preserve_acl=True, metadata=None, encrypt_key=True)
connection.py 文件源码 项目:learneveryword 作者: karan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def make_request(self, method, bucket='', key='', headers=None, data='',
                     query_args=None, sender=None, override_num_retries=None,
                     retry_handler=None):
        if isinstance(bucket, self.bucket_class):
            bucket = bucket.name
        if isinstance(key, Key):
            key = key.name
        path = self.calling_format.build_path_base(bucket, key)
        boto.log.debug('path=%s' % path)
        auth_path = self.calling_format.build_auth_path(bucket, key)
        boto.log.debug('auth_path=%s' % auth_path)
        host = self.calling_format.build_host(self.server_name(), bucket)
        if query_args:
            path += '?' + query_args
            boto.log.debug('path=%s' % path)
            auth_path += '?' + query_args
            boto.log.debug('auth_path=%s' % auth_path)
        return super(S3Connection, self).make_request(
            method, path, headers,
            data, host, auth_path, sender,
            override_num_retries=override_num_retries,
            retry_handler=retry_handler
        )
common.py 文件源码 项目:PyDataLondon29-EmbarrassinglyParallelDAWithAWSLambda 作者: SignalMedia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def readline(self):
            """Split the contents of the Key by '\n' characters."""
            if self.lines:
                retval = self.lines[0]
                self.lines = self.lines[1:]
                return retval
            if self.finished_read:
                if self.buffer:
                    retval, self.buffer = self.buffer, ""
                    return retval
                else:
                    raise StopIteration

            if self.encoding:
                self.buffer = "{}{}".format(
                    self.buffer, self.read(8192).decode(self.encoding))
            else:
                self.buffer = "{}{}".format(self.buffer, self.read(8192))

            split_buffer = self.buffer.split("\n")
            self.lines.extend(split_buffer[:-1])
            self.buffer = split_buffer[-1]

            return self.readline()
csv_loader_for_redshift.py 文件源码 项目:CSV_Loader_For_Redshift 作者: alexbuz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def main(transfer_file, bucket_name, s3_key_name=None, use_rr=False,
         make_public=True):
    global bucket
    # open the wikipedia file
    if not s3_key_name:
        s3_key_name = os.path.basename(transfer_file)
    conn = boto.connect_s3(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY)
    bucket = conn.get_bucket(bucket_name)

    file_handle = open(transfer_file, 'rb')

    k = Key(bucket)
    k.key = s3_key_name

    k.set_contents_from_file(file_handle, cb=progress, num_cb=20, reduced_redundancy=use_rr )
    if make_public:
        k.make_public()


    return '/'.join((bucket_name, s3_key_name))
backup_s3.py 文件源码 项目:trustcode-addons 作者: Trust-Code 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send_for_amazon_s3(self, file_to_send, name_to_store, database):
        try:
            if self.aws_access_key and self.aws_secret_key:
                access_key = self.aws_access_key
                secret_key = self.aws_secret_key

                conexao = S3Connection(access_key, secret_key)
                bucket_name = '%s_bkp_pelican' % database
                bucket = conexao.create_bucket(bucket_name)

                k = Key(bucket)
                k.key = name_to_store
                k.set_contents_from_filename(file_to_send)
                return k.key
            else:
                _logger.error(
                    u'Configurações do Amazon S3 não setadas, \
                    pulando armazenamento de backup')
        except Exception:
            _logger.error('Erro ao enviar dados para S3', exc_info=True)
connection.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 66 收藏 0 点赞 0 评论 0
def make_request(self, method, bucket='', key='', headers=None, data='',
            query_args=None, sender=None, override_num_retries=None):
        if isinstance(bucket, self.bucket_class):
            bucket = bucket.name
        if isinstance(key, Key):
            key = key.name
        path = self.calling_format.build_path_base(bucket, key)
        boto.log.debug('path=%s' % path)
        auth_path = self.calling_format.build_auth_path(bucket, key)
        boto.log.debug('auth_path=%s' % auth_path)
        host = self.calling_format.build_host(self.server_name(), bucket)
        if query_args:
            path += '?' + query_args
            boto.log.debug('path=%s' % path)
            auth_path += '?' + query_args
            boto.log.debug('auth_path=%s' % auth_path)
        return AWSAuthConnection.make_request(self, method, path, headers,
                data, host, auth_path, sender,
                override_num_retries=override_num_retries)
bucket.py 文件源码 项目:Chromium_DepotTools 作者: p07r0457 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_website_configuration(self, headers=None):
        """
        Returns the current status of website configuration on the bucket.

        :rtype: dict
        :returns: A dictionary containing a Python representation
            of the XML response from S3. The overall structure is:

        * WebsiteConfiguration

          * IndexDocument

            * Suffix : suffix that is appended to request that
              is for a "directory" on the website endpoint
            * ErrorDocument

              * Key : name of object to serve when an error occurs
        """
        return self.get_website_configuration_with_xml(headers)[0]
connection.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_request(self, method, bucket='', key='', headers=None, data='',
            query_args=None, sender=None, override_num_retries=None):
        if isinstance(bucket, self.bucket_class):
            bucket = bucket.name
        if isinstance(key, Key):
            key = key.name
        path = self.calling_format.build_path_base(bucket, key)
        boto.log.debug('path=%s' % path)
        auth_path = self.calling_format.build_auth_path(bucket, key)
        boto.log.debug('auth_path=%s' % auth_path)
        host = self.calling_format.build_host(self.server_name(), bucket)
        if query_args:
            path += '?' + query_args
            boto.log.debug('path=%s' % path)
            auth_path += '?' + query_args
            boto.log.debug('auth_path=%s' % auth_path)
        return AWSAuthConnection.make_request(self, method, path, headers,
                data, host, auth_path, sender,
                override_num_retries=override_num_retries)
bucket.py 文件源码 项目:node-gn 作者: Shouqun 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def get_website_configuration(self, headers=None):
        """
        Returns the current status of website configuration on the bucket.

        :rtype: dict
        :returns: A dictionary containing a Python representation
            of the XML response from S3. The overall structure is:

        * WebsiteConfiguration

          * IndexDocument

            * Suffix : suffix that is appended to request that
              is for a "directory" on the website endpoint
            * ErrorDocument

              * Key : name of object to serve when an error occurs
        """
        return self.get_website_configuration_with_xml(headers)[0]
user.py 文件源码 项目:mltshp 作者: MLTSHP 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set_profile_image(self, file_path, file_name, content_type):
        """
        Takes a local path, name and content-type, which are parameters passed in by
        nginx upload module.  Converts to RGB, resizes to thumbnail and uploads to S3.
        Returns False if some conditions aren't met, such as error making thumbnail
        or content type is one we don't support.
        """
        valid_content_types = ('image/gif', 'image/jpeg', 'image/jpg', 'image/png',)
        if content_type not in valid_content_types:
            return False

        destination =  cStringIO.StringIO()
        if not transform_to_square_thumbnail(file_path, 100*2, destination):
            return False

        bucket = S3Bucket()
        k = Key(bucket)
        k.key = "account/%s/profile.jpg" % (self.id)
        k.set_metadata('Content-Type', 'image/jpeg')
        k.set_metadata('Cache-Control', 'max-age=86400')
        k.set_contents_from_string(destination.getvalue())
        k.set_acl('public-read')
        self.profile_image = 1
        self.save()
        return True
transport.py 文件源码 项目:django-s3 作者: RaydelMiranda 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def upload(self, resource):
        """
        Upload a resource.

        :param resource: An instance of `django_s3.resource.Resource`
        """
        try:

            folder_name = url_pattern.match(resource.name).groupdict()['folder_name']

            key_holder = Key(self.__bucket)
            key_holder.key = "{}/{}/{}".format(settings.S3_CATEGORY_MAP[resource.category_code],
                                               folder_name,
                                               resource.name)
            key_holder.set_contents_from_filename(os.path.join(django_s3_settings.S3_UPLOAD_DIR_PATH, resource.name))
            key_holder.make_public()
        except Exception as err:
            Transport.logger.error(_("Error uploading file: {}. Error: {}".format(resource.name, err)))
            # Right now we don't know what exceptions are expected here, we propagate the error
            # up. If we found some exception then we'll add the proper handler.
            raise
misc.py 文件源码 项目:smart-cam 作者: smart-cam 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def upload_to_s3(bucket_name, key_name, video_file):
    cfg = Config()
    # connect to the bucket
    conn = boto.connect_s3(cfg.get("aws", "access_key_id"), cfg.get("aws", "secret_access_key"))

    ret_val = False

    try:
        print("# S3: Uploading to Bucket: {0} / Video|Key: {1}".format(bucket_name, video_file))
        bucket = conn.get_bucket(bucket_name)
        k = Key(bucket)
        if key_name:
            k.key = key_name
        else:
            k.key = os.path.basename(video_file)
        k.set_contents_from_filename(video_file)
        ret_val = True
    except boto.exception.S3ResponseError as err:
        print(err)

    return ret_val
misc.py 文件源码 项目:smart-cam 作者: smart-cam 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def upload_to_s3(bucket_name, key_name, video_file):
    cfg = Config()
    # connect to the bucket
    conn = boto.connect_s3(cfg.get("aws", "access_key_id"),
                            cfg.get("aws", "secret_access_key"))

    ret_val = False

    try:
        print("# S3: Uploading to Bucket: {0} / Video|Key: {1}".format(bucket_name, video_file))
        bucket = conn.get_bucket(bucket_name)
        k = Key(bucket)
        if key_name:
            k.key = key_name
        else:
            k.key = os.path.basename(video_file)
        k.set_contents_from_filename(video_file)
        ret_val = True
    except boto.exception.S3ResponseError as err:
        print(err)

    return ret_val
misc.py 文件源码 项目:smart-cam 作者: smart-cam 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def download_from_s3(bucket_name, key_name, local_out_dir='/tmp'):
    cfg = Config()
    # connect to the bucket
    conn = boto.connect_s3(cfg.get("aws", "access_key_id"),
                            cfg.get("aws", "secret_access_key"))

    ret_val = (False, None)

    try:
        print("# S3: Fetching Bucket: {0} / Key: {1}".format(bucket_name, key_name))
        bucket = conn.get_bucket(bucket_name)
        key = bucket.get_key(key_name)
        if key:
            local_file = os.path.join(local_out_dir, os.path.basename(key_name))
            print '# S3: Saving contents to Local File - {0}'.format(local_file)
            key.get_contents_to_filename(local_file, response_headers={
                                                'response-content-type': 'video/avi'
                                            })
            ret_val = (True, os.path.abspath(local_file))
    except boto.exception.S3ResponseError as err:
        print(err)

    return ret_val
connection.py 文件源码 项目:alfred-ec2 作者: SoMuchToGrok 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def make_request(self, method, bucket='', key='', headers=None, data='',
                     query_args=None, sender=None, override_num_retries=None,
                     retry_handler=None):
        if isinstance(bucket, self.bucket_class):
            bucket = bucket.name
        if isinstance(key, Key):
            key = key.name
        path = self.calling_format.build_path_base(bucket, key)
        boto.log.debug('path=%s' % path)
        auth_path = self.calling_format.build_auth_path(bucket, key)
        boto.log.debug('auth_path=%s' % auth_path)
        host = self.calling_format.build_host(self.server_name(), bucket)
        if query_args:
            path += '?' + query_args
            boto.log.debug('path=%s' % path)
            auth_path += '?' + query_args
            boto.log.debug('auth_path=%s' % auth_path)
        return super(S3Connection, self).make_request(
            method, path, headers,
            data, host, auth_path, sender,
            override_num_retries=override_num_retries,
            retry_handler=retry_handler
        )
aws_s3_utility.py 文件源码 项目:hierarchical_rl 作者: wulfebw 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def upload_file(self, filename_to_save_as, file_path):
        """
        :description: uploads a single file to an s3 bucket
        """
        # what is this?
        def percent_cb(complete, total):
            sys.stdout.write('.')
            sys.stdout.flush()

        # select the bucket, where input_s3_bucket takes the form 'bsdsdata'
        bucket = self.conn.get_bucket(self.s3_bucket)

        # send the file to the s3 bucket
        key = Key(bucket)
        key.key = filename_to_save_as
        key.set_contents_from_filename(file_path, cb=percent_cb, num_cb=50)
motion_surveillance.py 文件源码 项目:iot-detroit-jan2017 作者: mvartani76 项目源码 文件源码 阅读 75 收藏 0 点赞 0 评论 0
def upload_image_to_s3(image_file_path, bucket_name):

    """Uploads images to Amazon's S3 service.

    Arguments:

    image_file_path:    Path to image to upload on local machine.
    bucket_name:        Name of the S3 bucket where file should be uploaded.
    key_name:       Name of the key for the file on S3 (usually the
                timestamp).

    """
    print("Entered s3 upload...")
    print(bucket_name)
    bucket = s3_connection.get_bucket(bucket_name)

    # Create a new key using image_file_path as the key
    key = Key(bucket)
    key.key = image_file_path 
    key.set_contents_from_filename(image_file_path)

    return key

# Send Alert to Phone Number Using Twilio
connection.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_request(self, method, bucket='', key='', headers=None, data='',
            query_args=None, sender=None, override_num_retries=None):
        if isinstance(bucket, self.bucket_class):
            bucket = bucket.name
        if isinstance(key, Key):
            key = key.name
        path = self.calling_format.build_path_base(bucket, key)
        boto.log.debug('path=%s' % path)
        auth_path = self.calling_format.build_auth_path(bucket, key)
        boto.log.debug('auth_path=%s' % auth_path)
        host = self.calling_format.build_host(self.server_name(), bucket)
        if query_args:
            path += '?' + query_args
            boto.log.debug('path=%s' % path)
            auth_path += '?' + query_args
            boto.log.debug('auth_path=%s' % auth_path)
        return AWSAuthConnection.make_request(self, method, path, headers,
                data, host, auth_path, sender,
                override_num_retries=override_num_retries)
bucket.py 文件源码 项目:depot_tools 作者: webrtc-uwp 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_website_configuration(self, headers=None):
        """
        Returns the current status of website configuration on the bucket.

        :rtype: dict
        :returns: A dictionary containing a Python representation
            of the XML response from S3. The overall structure is:

        * WebsiteConfiguration

          * IndexDocument

            * Suffix : suffix that is appended to request that
              is for a "directory" on the website endpoint
            * ErrorDocument

              * Key : name of object to serve when an error occurs
        """
        return self.get_website_configuration_with_xml(headers)[0]
s3file.py 文件源码 项目:python-s3-cache 作者: vincetse 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def close(self):
        """On closing the file, copy it back to s3 if it was opened for
        writing/appending.

        :rtype: int
        :return: the number of bytes written
        """
        self.log("closing local cache file(" + self.tmppath + ")")
        self.file.close()
        bytes_written = 0
        if 'w' in self.mode or 'a' in self.mode:
            self.log("writing updated cache file contents to S3")
            k = Key(self.mgr.bucket, self.path)
            try:
                bytes_written = k.set_contents_from_filename(self.tmppath)
            except AttributeError as err:
                self.log(str(err))
                raise
        if not self.mgr.caching:
            # remove the local copy if caching is turned off
            self.remove_cached()
        return bytes_written
employee.py 文件源码 项目:love 作者: Yelp 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _get_employee_info_from_s3():
    logging.info('Reading employees file from S3...')
    key = Key(
        connect_s3(
            aws_access_key_id=get_secret('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=get_secret('AWS_SECRET_ACCESS_KEY'),
        ).get_bucket(config.S3_BUCKET),
        'employees.json',
    )
    employee_dicts = json.loads(key.get_contents_as_string())
    logging.info('Done.')
    return employee_dicts
mets_to_iiif.py 文件源码 项目:mets2man 作者: thegetty 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_jp2(img_name, img_id):
    image = Image.open('{}.tiff'.format(img_name))
    if image.mode == 'RGB':
        kdu_com = kdu_command_rgb.format(img_name, img_name)
    else:
        kdu_com = kdu_command.format(img_name, img_name)
    kdu_com = kdu_com.replace('{', '\{')
    kdu_com = kdu_com.replace('}', '\}')
    res = subprocess.getoutput(kdu_com)
    if res.startswith('Kakadu Error'):
        # Probably not uncompressed tiff
        print('probably not uncompressed tiff')
        print(res)
        subprocess.getoutput('mv {}.tiff {}-2.tiff'.format(img_name, img_name))
        subprocess.getoutput(convert_command.format(img_name, img_name))
        kdu_com2 = kdu_command.format(img_name, img_name)
        kdu_com2 = kdu_com2.replace('{', '\{')
        kdu_com2 = kdu_com2.replace('}', '\}')
        res = subprocess.getoutput(kdu_com2)
        print('new response')
        print(res)
        if res.startswith('Kakadu Error') or res.startswith('Kakadu Core Error'):
            print('Still broken :(')
            raise ValueError(img_name)

    k = Key(b, '{}{}.jp2'.format(image_base, img_id))
    k.set_contents_from_filename('{}.jp2'.format(img_name))
mets_to_iiif.py 文件源码 项目:mets2man 作者: thegetty 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def to_s3(ident):
    print('{}{}/manifest.json'.format(manifest_base, ident))
    k = Key(b, '{}{}/manifest.json'.format(manifest_base, ident))
    print('{}/{}/manifest.json'.format(manifest_dir, ident))
    k.set_contents_from_filename('{}/{}/manifest.json'.format(manifest_dir, ident))


问题


面经


文章

微信
公众号

扫码关注公众号