python类copyfileobj()的实例源码

saveimages.py 文件源码 项目:endorsementdb.com 作者: endorsementdb 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Upload each listed account's large profile image to S3.

        For every username in ``options['usernames']`` the account's large
        image is downloaded and stored in the ``endorsementdb.com`` bucket
        under ``images/endorsers/<endorser pk>.png``, then made public.
        """
        # Local import: only this block needs it.
        import tempfile

        s3_connection = boto.connect_s3()
        bucket = s3_connection.get_bucket('endorsementdb.com')

        for username in options['usernames']:
            account = Account.objects.get_from_username(username)
            endorser = account.endorser

            url = account.get_large_image()
            # Formatted string prints the same text on Python 2 and 3.
            print('%s %s' % (url, endorser.name))

            response = requests.get(url, stream=True)
            try:
                # A private temp file replaces the fixed /tmp path, so
                # concurrent runs cannot clobber each other's image.
                with tempfile.NamedTemporaryFile(suffix='.png') as out_file:
                    shutil.copyfileobj(response.raw, out_file)
                    # Flush so the upload, which re-reads the file by name,
                    # sees every byte.
                    out_file.flush()

                    key = bucket.new_key(
                        'images/endorsers/%d.png' % endorser.pk)
                    key.set_contents_from_filename(out_file.name)
                    key.make_public()
            finally:
                # Release the streamed connection even if the upload fails.
                response.close()
rpm.py 文件源码 项目:crypto-detector 作者: Wind-River 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def copyfileobj(src, dst, length=None):
    """Copy exactly ``length`` bytes from file object src to file object dst.

    When ``length`` is None the whole remaining content of ``src`` is copied.
    Raises IOError if ``src`` runs out of data before ``length`` bytes
    have been read.
    """
    if length is None:
        shutil.copyfileobj(src, dst)
        return
    if length == 0:
        return

    chunk_size = 16 * 1024

    def _transfer(count):
        # Move one piece of exactly ``count`` bytes; a short read means EOF.
        data = src.read(count)
        if len(data) < count:
            raise IOError("end of file reached")
        dst.write(data)

    full_chunks, tail = divmod(length, chunk_size)
    for _ in range(full_chunks):
        _transfer(chunk_size)
    if tail != 0:
        _transfer(tail)
    return
rpm.py 文件源码 项目:crypto-detector 作者: Wind-River 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def makefile(self, cpioinfo, cpiogetpath):
        """Make a file called cpiogetpath.

        A member with a single link is extracted directly. For a member with
        several links, a hard link to the first recorded name is created if
        its inode was seen before; otherwise the data is extracted from the
        member returned by ``self._datamember``. The member name is always
        recorded under its inode in ``self.inodes``.
        """
        extractinfo = None
        if cpioinfo.nlink == 1:
            extractinfo = cpioinfo
        elif cpioinfo.ino in self.inodes:
            # actual file exists, create link
            # FIXME handle platforms that don't support hardlinks
            os.link(os.path.join(cpioinfo._link_path,
                                 self.inodes[cpioinfo.ino][0]), cpiogetpath)
        else:
            extractinfo = self._datamember(cpioinfo)

        self.inodes.setdefault(cpioinfo.ino, []).append(cpioinfo.name)

        if extractinfo:
            source = self.extractfile(extractinfo)
            # Close both handles even if the copy fails part-way.
            try:
                with open(cpiogetpath, "wb") as cpioget:
                    copyfileobj(source, cpioget)
            finally:
                source.close()
worker.py 文件源码 项目:embeddeddata 作者: toolforge 项目源码 文件源码 阅读 71 收藏 0 点赞 0 评论 0
def overwrite(filepage, msg, res, path):
    """Upload a truncated copy of the local file over an existing file page.

    The file at ``path`` is copied to a temporary file, cut off at byte
    offset ``res[0]['pos']`` and uploaded with a comment built from
    ``MESSAGE_PREFIX`` and ``msg``. Nothing is uploaded when the page has no
    file history.
    """
    # Clear the cached revisions before asking for the history
    # (presumably to force a fresh lookup -- confirm against pywikibot).
    filepage._file_revisions.clear()

    history = filepage.get_file_history()
    if not history:
        pywikibot.warning("Page doesn't exist, skipping upload.")
        return

    with tempfile.NamedTemporaryFile() as tmp:
        with open(path, 'rb') as old:
            shutil.copyfileobj(old, tmp)
        tmp.truncate(res[0]['pos'])

        def _upload():
            # Evaluated on every attempt made by retry_apierror.
            return filepage.upload(tmp.name,
                                   comment=MESSAGE_PREFIX+msg,
                                   ignore_warnings=True)

        retry_apierror(_upload)
vision.py 文件源码 项目:pybot 作者: raelga 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def see(url):
    """Download the image at ``url`` and return ``process``'s result for it.

    The image is streamed into a private temporary file, which is passed to
    ``process`` by path and always removed afterwards. Returns None when the
    server does not answer with HTTP 200.
    """
    import requests
    import shutil
    import tempfile

    image = requests.get(url, stream=True)
    try:
        if image.status_code != 200:
            return None

        # mkstemp gives a unique file outside the working directory,
        # replacing the random-number filename used before.
        fd, path = tempfile.mkstemp()
        try:
            with os.fdopen(fd, 'wb') as tmpfile:
                image.raw.decode_content = True
                shutil.copyfileobj(image.raw, tmpfile)
            return process(path)
        finally:
            # Remove the scratch file even when process() raises.
            os.remove(path)
    finally:
        image.close()
filerecorder.py 文件源码 项目:burro 作者: yconst 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def record_frame(self, image_buffer, angle, throttle):
        '''
        Record a single image buffer, with frame index, angle and throttle values
        as its filename

        Recording is skipped, and ``self.is_recording`` set to False, when
        the negated throttle is below ``config.recording.throttle_threshold``
        or the absolute angle is below ``config.recording.steering_threshold``.
        '''
        # throttle is inversed, i.e. forward is negative, backwards positive
        # we are only interested in forward values of throttle
        # angle is counter-clockwise, i.e. left is positive
        # TODO: make a proper value mapping here, and then transform
        if (throttle * -1.0 < config.recording.throttle_threshold or
                abs(angle) < config.recording.steering_threshold):
            self.is_recording = False
            return
        self.is_recording = True
        file_angle = int(angle * 10)
        file_throttle = int(throttle * 1000)
        filepath = self.create_img_filepath(
            self.instance_path,
            self.frame_count,
            file_angle,
            file_throttle)
        # Binary mode: image data must be written unmodified. Text mode
        # rejects bytes on Python 3 and rewrites newlines on Windows.
        # NOTE(review): assumes image_buffer yields bytes -- confirm.
        with open(filepath, 'wb') as fd:
            image_buffer.seek(0)
            shutil.copyfileobj(image_buffer, fd, -1)
        self.frame_count += 1
corpus.py 文件源码 项目:Natural-Language-Processing-Python-and-NLTK 作者: PacktPublishing 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def remove_line(fname, line):
    '''Remove line from file by creating a temporary file containing all lines
    from original file except those matching the given line, then copying the
    temporary file back into the original file, overwriting its contents.

    A line matches when its whitespace-stripped text equals ``line``. The
    file is held under ``lockfile.FileLock`` for the whole operation.
    '''
    with lockfile.FileLock(fname):
        # 'r+' is the valid read/update mode ('rw+' is rejected by Python 3).
        # The temp file is opened in text mode so it accepts the same str
        # lines that are read from the original file.
        with tempfile.TemporaryFile('w+') as tmp, open(fname, 'r+') as fp:
            # write all lines from orig file, except if matches given line
            for l in fp:
                if l.strip() != line:
                    tmp.write(l)

            # reset file pointers so entire files are copied
            fp.seek(0)
            tmp.seek(0)
            # copy tmp into fp, then truncate to remove trailing line(s)
            shutil.copyfileobj(tmp, fp)
            fp.truncate()
test_fs.py 文件源码 项目:cloudstrype 作者: btimby 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_fs_replicas(self):
        """Upload with chunk_size=3 and replicas=2, clear the stored data of
        mock client 2, then check the file still downloads unchanged."""
        clients = MockClients(self.user)
        with mock.patch('main.models.User.get_clients', clients.get_clients):
            fs = get_fs(self.user, chunk_size=3, replicas=2)

            with BytesIO(TEST_FILE) as source:
                uploaded = fs.upload('/foo', source)

            # Wipe one client's storage (presumably simulating a lost
            # replica -- confirm against MockClients).
            clients.clients[2].data.clear()

            self.assertEqual('/foo', uploaded.path)

            with BytesIO() as sink:
                with fs.download('/foo') as remote:
                    shutil.copyfileobj(remote, sink)
                    self.assertEqual(TEST_FILE, sink.getvalue())

            # A path that was never uploaded must be reported as missing.
            with self.assertRaises(FileNotFoundError):
                fs.download('/barfoo')

            fs.delete('/foo')
test_fs.py 文件源码 项目:cloudstrype 作者: btimby 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_fs_replicas(self):
        """Check that a file uploaded with two replicas can still be read
        back after the data held by mock client 2 has been cleared."""
        fake_clients = MockClients(self.user)
        patch_target = 'main.models.User.get_clients'
        with mock.patch(patch_target, fake_clients.get_clients):
            fs = get_fs(self.user, chunk_size=3, replicas=2)

            with BytesIO(TEST_FILE) as payload:
                stored = fs.upload('/foo', payload)

            # Drop everything the third mock client holds.
            fake_clients.clients[2].data.clear()

            self.assertEqual('/foo', stored.path)

            with BytesIO() as received:
                with fs.download('/foo') as stream:
                    shutil.copyfileobj(stream, received)
                    self.assertEqual(TEST_FILE, received.getvalue())

            # Downloading an unknown path raises FileNotFoundError.
            with self.assertRaises(FileNotFoundError):
                fs.download('/barfoo')

            fs.delete('/foo')
test_fs.py 文件源码 项目:cloudstrype 作者: btimby 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_fs_replicas(self):
        """Round-trip TEST_FILE through a filesystem created with
        replicas=2 after emptying the data of mock client index 2."""
        mocked = MockClients(self.user)
        with mock.patch('main.models.User.get_clients', mocked.get_clients):
            fs = get_fs(self.user, chunk_size=3, replicas=2)

            with BytesIO(TEST_FILE) as upload_stream:
                result = fs.upload('/foo', upload_stream)

            # Empty the data store of the client at index 2.
            mocked.clients[2].data.clear()

            self.assertEqual('/foo', result.path)

            with BytesIO() as buffer:
                with fs.download('/foo') as download_stream:
                    shutil.copyfileobj(download_stream, buffer)
                    self.assertEqual(TEST_FILE, buffer.getvalue())

            # '/barfoo' was never uploaded, so download must fail.
            with self.assertRaises(FileNotFoundError):
                fs.download('/barfoo')

            fs.delete('/foo')
buildserver.py 文件源码 项目:optimalvibes 作者: littlemika 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def build(self):
        """Send the file at ``self.srcPath`` to the client as an attachment.

        Raises HTTPError with 404 if the path does not exist and with 401 if
        it is a directory. Otherwise writes a 200 response with download
        headers, streams the file to ``self.handler.wfile`` and then calls
        the parent class's ``build``.
        """
        src_path = self.srcPath
        if not os.path.exists(src_path):
            raise HTTPError('No such file', 404)
        if os.path.isdir(src_path):
            raise HTTPError('Is a directory: %s' % src_path, 401)

        handler = self.handler
        handler.send_response(200)
        handler.send_header('Content-Type', 'application/octet-stream')
        file_name = os.path.basename(src_path)
        handler.send_header('Content-Disposition', 'attachment; filename=%s' % file_name)
        byte_count = os.stat(src_path).st_size
        handler.send_header('Content-Length', str(byte_count))
        handler.end_headers()

        with open(src_path, 'rb') as payload:
            shutil.copyfileobj(payload, handler.wfile)

        super(DownloadBuilder, self).build()
utils.py 文件源码 项目:AutomatorX 作者: xiaoyaojjian 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def http_download(url, target_path):
    """Download file to local
    Args:
        - url(string): url request path
        - target_path(string): download destination

    Returns:
        target_path, once the response body has been written to it.

    Raises:
        IOError: the server answered with a status other than 200
        urllib2.URLError: the request failed without an HTTP status code
    """
    try:
        resp = urllib2.urlopen(url)
    except urllib2.URLError as e:
        # Errors that carry an HTTP status are handled as a response so the
        # status check below reports them; anything else is re-raised.
        if not hasattr(e, 'code'):
            raise
        resp = e

    try:
        if resp.code != 200:
            raise IOError("Request url(%s) expect 200 but got %d" %(url, resp.code))

        with open(target_path, 'wb') as f:
            shutil.copyfileobj(resp, f)
    finally:
        # Release the connection on both success and failure.
        resp.close()
    return target_path
layman.py 文件源码 项目:DevOps 作者: YoLoveLife 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def download_url(module, url, dest):
    '''
    Download ``url`` with ``fetch_url`` and save the body to ``dest``.

    :param module: object whose ``params`` dict is passed on to fetch_url
    :param url: the URL to download
    :param dest: the absolute path of where to save the downloaded content to;
        it must be writable and not a directory

    :raises ModuleError: if the status is not 200 or the file cannot be written
    '''

    # Hack to add params in the form that fetch_url expects
    module.params['http_agent'] = USERAGENT
    response, info = fetch_url(module, url)
    if info['status'] != 200:
        raise ModuleError("Failed to get %s: %s" % (url, info['msg']))

    try:
        # Binary mode: the payload is copied unmodified, and text mode
        # would reject a bytes-producing response on Python 3.
        with open(dest, 'wb') as f:
            shutil.copyfileobj(response, f)
    except IOError as e:
        raise ModuleError("Failed to write: %s" % str(e))
    finally:
        response.close()
compile.py 文件源码 项目:Real-Time-LaTeX 作者: stevenengler 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def fix_synctex(self, project_directory, compiled_path_relative_to_project_path, filename):
        """Rewrite absolute project paths in a .synctex file and gzip it.

        Looks for ``<project>/<compiled path>/<filename>.synctex``. If it
        exists, every occurrence of the absolute project directory is
        replaced with the project path relative to the compiled directory,
        the result is stored as ``<filename>.synctex.gz`` and the
        uncompressed files are removed. Does nothing if the file is missing.
        """
        compiled_directory = project_directory+'/'+compiled_path_relative_to_project_path
        old_synctex = compiled_directory+'/'+filename+'.synctex'
        new_synctex = old_synctex+'.new'

        if not os.path.isfile(old_synctex):
            return

        # Both values are loop-invariant, so compute them once.
        absolute_project_path = os.path.abspath(project_directory)
        project_path_relative_to_compiled_path = os.path.relpath(project_directory, compiled_directory)

        # Context managers close both files even if a read or write fails.
        with open(old_synctex, 'r') as f1, open(new_synctex, 'w') as f2:
            for line in f1:
                f2.write(line.replace(absolute_project_path, project_path_relative_to_compiled_path))

        with open(new_synctex, 'rb') as f_in, gzip.open(old_synctex+'.gz', 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

        # Delete the uncompressed files only once the .gz exists, so a
        # failure above never loses the original.
        os.remove(old_synctex)
        os.remove(new_synctex)
nifti.py 文件源码 项目:MDT 作者: cbclab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def unzip_nifti(input_filename, output_filename):
    """Unzips the given nifti file.

    This will create the output directories if they do not yet exist.

    Args:
        input_filename (str): the nifti file we would like to unzip. Should have the extension ``.gz``.
        output_filename (str): the location for the output file. Should have the extension ``.nii``.

    Raises:
        ValueError: if the extensions of either the input or output filename are not correct.
    """
    # Validate first so a rejected call leaves no stray directories behind.
    if not input_filename.rstrip().endswith('.gz') or not output_filename.rstrip().endswith('.nii'):
        raise ValueError('The input filename should have extension ".gz" and the '
                         'output filename should have extension ".nii".')

    # dirname is '' for a bare filename (current directory), and
    # os.makedirs('') would raise, so only create a non-empty path.
    output_dir = os.path.dirname(output_filename)
    if output_dir and not os.path.isdir(output_dir):
        os.makedirs(output_dir)

    with gzip.open(input_filename, 'rb') as f_in, open(output_filename, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
fidelity_visa.py 文件源码 项目:bank_wrangler 作者: tmerr 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fetch(config, fileobj):
    """
    Fetch transactions for the Visa card specified in the config and write
    them to ``fileobj``.

    The output is an account-name line built from the last element of
    ``config``, then the balance line, then the downloaded CSV content.
    According to the original notes, the download logs in to fidelity.com
    and transfers credentials to Elan Financial Services' site
    fidelityrewards.com, where transactions for the past 17-18 months are
    available in CSV format; that work is done inside ``_download``.
    """
    *_, lastfour = config
    header = f'Fidelity Visa {lastfour.value}'
    # The header is written before the download starts.
    fileobj.write(header + '\n')
    with tempfile.TemporaryDirectory() as workdir:
        downloaded = _download(config, workdir)
        csv_path, balance = downloaded
        fileobj.write(balance + '\n')
        with open(csv_path, 'r') as transactions:
            shutil.copyfileobj(transactions, fileobj)
tarfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def copyfileobj(src, dst, length=None):
    """Copy length bytes from fileobj src to fileobj dst.

    If length is None, copy the entire content. An IOError is raised when
    src ends before length bytes were read.
    """
    if length is None:
        shutil.copyfileobj(src, dst)
        return
    if length == 0:
        return

    chunk = 16 * 1024
    whole, rest = divmod(length, chunk)

    def _sizes():
        # Yield the size of every read: full chunks first, then the tail.
        for _ in range(whole):
            yield chunk
        if rest != 0:
            yield rest

    for size in _sizes():
        piece = src.read(size)
        if len(piece) < size:
            raise IOError("end of file reached")
        dst.write(piece)
    return
tarfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def addfile(self, tarinfo, fileobj=None):
        """Append the TarInfo object `tarinfo' to the archive.

           When `fileobj' is given, tarinfo.size bytes are read from it and
           written after the header, padded with NUL bytes up to a multiple
           of BLOCKSIZE. TarInfo objects can be created with gettarinfo().
           On Windows platforms, `fileobj' should always be opened with mode
           'rb' to avoid irritation about the file size.
        """
        self._check("aw")

        # Work on a copy so the caller's object is not the one stored.
        member = copy.copy(tarinfo)

        header = member.tobuf(self.format, self.encoding, self.errors)
        self.fileobj.write(header)
        self.offset += len(header)

        # If there's data to follow, append it.
        if fileobj is not None:
            copyfileobj(fileobj, self.fileobj, member.size)
            full_blocks, leftover = divmod(member.size, BLOCKSIZE)
            if leftover > 0:
                # Pad the final partial block with NULs.
                padding = NUL * (BLOCKSIZE - leftover)
                self.fileobj.write(padding)
                full_blocks += 1
            self.offset += full_blocks * BLOCKSIZE

        self.members.append(member)
views.py 文件源码 项目:two1-deep-learning 作者: 21dotco 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def fetch_files(data, filepath_dict):
    '''
        Fetch the files given by urls in data['style'] and data['content']
        and save them to the corresponding file paths given in filepath_dict.

        The entry keyed by settings.OUTPUT_SUFFIX is skipped. Raises
        FileNotFoundError when a URL does not answer with HTTP 200.
    '''
    logger.info('Fetching remote files')
    for key, filepath in filepath_dict.items():
        if key == settings.OUTPUT_SUFFIX:
            continue

        file_url = data[key]
        logger.info('Fetching remote {} file: {}'.format(key, file_url))
        response = requests.get(file_url, stream=True)
        try:
            if response.status_code != 200:
                # Report the real status; not every failure is a 404.
                raise FileNotFoundError(
                    'Received {} when fetching {}'.format(
                        response.status_code, file_url))

            with open(filepath, 'wb') as outfile:
                response.raw.decode_content = True
                shutil.copyfileobj(response.raw, outfile)
        finally:
            # Release the streamed connection on success and on failure.
            response.close()
auto_dataset.py 文件源码 项目:aetros-cli 作者: aetros 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def download_image(url, path):
    """Download ``url`` to ``path`` unless ``path`` already exists.

    Returns True when the file already exists or was downloaded completely,
    and False on a non-200 response or any error, which is printed. The
    data is written to ``path + '.part'`` and renamed only after the
    download finishes, so an interrupted transfer never leaves a partial
    file at ``path`` for a later call to mistake for a finished one.
    """
    if os.path.exists(path):
        return True

    headers = {
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.101 Safari/537.36'
    }
    partial_path = path + '.part'
    try:
        r = requests.get(url, stream=True, timeout=9, headers=headers)
        try:
            if r.status_code != 200:
                print(("Could not download image %s, response %d" % (url, r.status_code)))
                return False

            with open(partial_path, 'wb') as f:
                r.raw.decode_content = True
                shutil.copyfileobj(r.raw, f)
        finally:
            r.close()

        os.rename(partial_path, path)
        return True
    except Exception as e:
        # Best-effort download: report the problem and signal failure.
        if hasattr(e, 'message'):
            print(("Could not download image %s due to %s" % (url, e.message)))
        else:
            print(("Could not download image %s due to %s" % (url, repr(e))))

    # Discard whatever was written before the failure.
    if os.path.exists(partial_path):
        try:
            os.remove(partial_path)
        except OSError:
            pass

    return False


问题


面经


文章

微信
公众号

扫码关注公众号