Python NamedTemporaryFile() usage examples from real projects
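Before the project snippets below, here is a minimal sketch of the basic NamedTemporaryFile() patterns they all build on: writing to the file, flushing or seeking before reuse, and keeping the file on disk with delete=False. The payload and suffix used here are illustrative only.

import os
import tempfile

# Used as a context manager, the file is removed automatically on exit.
with tempfile.NamedTemporaryFile("w+b", suffix=".txt") as f:
    f.write(b"hello")
    f.flush()               # ensure the data is on disk before reusing f.name
    f.seek(0)
    assert f.read() == b"hello"

# With delete=False the file survives close(); the caller must remove it.
f = tempfile.NamedTemporaryFile(delete=False)
try:
    f.write(b"persisted")
    f.close()
    print(os.path.getsize(f.name))   # f.name remains a valid path after close()
finally:
    os.remove(f.name)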

Source file: test_io.py (project: htsget, author: jeromekelleher)
def test_no_bearer_token(self):
        ticket_url = "http://ticket.com"
        ticket = {"htsget": {"urls": []}}
        returned_response = MockedTicketResponse(json.dumps(ticket).encode())
        with mock.patch("requests.get", return_value=returned_response) as mocked_get:
            with tempfile.NamedTemporaryFile("wb+") as f:
                htsget.get(ticket_url, f)
                f.seek(0)
                self.assertEqual(f.read(), b"")
            # Because we have no URLs in the returned ticket, it should be called
            # only once.
            self.assertEqual(mocked_get.call_count, 1)
            # Note that we only get the arguments for the last call using this method.
            args, kwargs = mocked_get.call_args
            self.assertEqual(args[0], ticket_url)
            headers = {}
            self.assertEqual(kwargs["headers"], headers)
            self.assertEqual(kwargs["stream"], True)
Source file: test_io.py (project: htsget, author: jeromekelleher)
def test_ticket_char_by_char(self):
        # Tests the streaming code for the ticket response.
        ticket_url = "http://ticket.com"
        ticket = {"htsget": {"urls": []}, "padding": "X" * 10}
        returned_response = MockedTicketResponse(
                json.dumps(ticket).encode(), char_by_char=True)
        with mock.patch("requests.get", return_value=returned_response) as mocked_get:
            with tempfile.NamedTemporaryFile("wb+") as f:
                htsget.get(ticket_url, f)
                f.seek(0)
                self.assertEqual(f.read(), b"")
            # Because we have no URLs in the returned ticket, it should be called
            # only once.
            self.assertEqual(mocked_get.call_count, 1)
            # Note that we only get the arguments for the last call using this method.
            args, kwargs = mocked_get.call_args
            self.assertEqual(args[0], ticket_url)
            headers = {}
            self.assertEqual(kwargs["headers"], headers)
            self.assertEqual(kwargs["stream"], True)
Source file: recipe-580672.py (project: code, author: ActiveState)
def pyrun(src):
        """Run python code 'src' in a separate interpreter.
        Return subprocess exit code.
        """
        if PY3:
            src = bytes(src, 'ascii')
        with tempfile.NamedTemporaryFile(suffix='.py', delete=False) as f:
            f.write(src)
            f.flush()
            test_files.append(f.name)
            code = subprocess.call(
                [sys.executable, f.name],
                stdout=None, stderr=None,
                # creationflags=subprocess.CREATE_NEW_PROCESS_GROUP
            )
        return code
Source file: pickle_test.py (project: girder_worker, author: girder)
def test_inputs_from_file(self):
        """Run a task with base64 inputs in a file."""
        a = tempfile.NamedTemporaryFile()
        b = tempfile.NamedTemporaryFile()

        convert(
            'python',
            {'format': 'object', 'data': (0, 1)},
            {'format': 'pickle.base64', 'mode': 'local', 'path': a.name}
        )

        convert(
            'python',
            {'format': 'object', 'data': 2},
            {'format': 'pickle.base64', 'mode': 'local', 'path': b.name}
        )

        outputs = self.run_basic_task({
            'a': {'format': 'pickle.base64', 'mode': 'local', 'path': a.name},
            'b': {'format': 'pickle.base64', 'mode': 'local', 'path': b.name}
        })

        self.assertEqual(outputs.get('c'), (0, 1, 0, 1))
        self.assertEqual(outputs.get('d'), 4)
Source file: __init__.py (project: girder_worker, author: girder)
def _inline_fetch(spec, **kwargs):
    taskInput = kwargs.get('task_input', {})
    target = taskInput.get('target', 'memory')
    if target == 'filepath':
        # Ensure we have a trailing slash
        tmpDir = os.path.join(kwargs['_tempdir'], '')

        if 'filename' in taskInput:
            filename = taskInput['filename']
            path = os.path.join(tmpDir, filename)
            with open(path, 'wb') as out:
                out.write(spec['data'])
        else:
            with tempfile.NamedTemporaryFile(
                    'wb', prefix=tmpDir, delete=False) as out:
                out.write(spec['data'])
                path = out.name

        return path
    elif target == 'memory':
        return spec['data']
    else:
        raise Exception('Invalid fetch target: ' + target)
Source file: test_jsonschema.py (project: sphinxcontrib-jsonschema, author: tk0miya)
def test_instantiate(self):
        try:
            tmpdir = mkdtemp()
            tmpfile = NamedTemporaryFile('w+t', dir=tmpdir)

            data = {'type': 'string'}
            tmpfile.write(json.dumps(data))
            tmpfile.seek(0)

            # load from string
            schema = JSONSchema.loads(json.dumps(data))
            self.assertEqual(data, schema.attributes)

            # load from readable object
            schema = JSONSchema.load(tmpfile)
            self.assertEqual(data, schema.attributes)

            # load from file
            schema = JSONSchema.loadfromfile(tmpfile.name)
            self.assertEqual(data, schema.attributes)
        finally:
            tmpfile.close()
            rmtree(tmpdir)
Source file: test_pcs_converter.py (project: ConfigSpace, author: automl)
def test_read_write(self):
        # Some smoke tests checking whether a read, write, read round trip
        # alters the configuration space and makes it compare unequal
        this_file = os.path.abspath(__file__)
        this_directory = os.path.dirname(this_file)
        configuration_space_path = os.path.join(this_directory,
                                                "..", "test_searchspaces")
        configuration_space_path = os.path.abspath(configuration_space_path)
        configuration_space_path = os.path.join(configuration_space_path,
                                                "spear-params-mixed.pcs")
        with open(configuration_space_path) as fh:
            cs = pcs.read(fh)

        tf = tempfile.NamedTemporaryFile()
        name = tf.name
        tf.close()
        with open(name, 'w') as fh:
            pcs_string = pcs.write(cs)
            fh.write(pcs_string)
        with open(name, 'r') as fh:
            pcs_new = pcs.read(fh)

        self.assertEqual(pcs_new, cs, msg=(pcs_new, cs))
Source file: test_datafs.py (project: DataFS, author: ClimateImpactLab)
def update_and_hash(arch, contents):
    '''
    Save contents to archive ``arch`` and return the DataAPI's hash value
    '''

    f = tempfile.NamedTemporaryFile(delete=False)

    try:
        f.write(contents)
        f.close()

        apihash = arch.api.hash_file(f.name)['checksum']
        arch.update(f.name)

    finally:
        os.remove(f.name)

    return apihash
Source file: pdf.py (project: document_clipper, author: reclamador)
def image_to_pdf(self, img, pdf_path=None, **kwargs):
        """
        Convert image to pdf.
        :param img: image file opened by PIL
        :param pdf_path: path to save pdf
        :param kwargs: any parameter accepted by Image.save i.e. quality
        :return:
        """
        processor = ResizeToFit(width=self.max_size_in_pixels[0], height=self.max_size_in_pixels[1])
        img = processor.process(img)
        # Create a white canvas and paste the image
        final_img_width = min(img.size[0], self.max_size_in_pixels[0])
        final_img_height = min(img.size[1], self.max_size_in_pixels[1])
        tmp_image = Image.new("RGB", (final_img_width, final_img_height), "white")
        margin_left = 0
        margin_top = 0
        tmp_image.paste(img, (margin_left, margin_top,
                              final_img_width, final_img_height))

        # Save the image as .pdf file
        if not pdf_path:
            f = NamedTemporaryFile(delete=False)
            pdf_path = f.name
        tmp_image.save(pdf_path, "PDF", resolution=100.0, **kwargs)
        return pdf_path
Source file: compression.py (project: TACTIC-Handler, author: listyque)
def prepend(line, path):
    """
    Prepends *line* to the beginning of the file at the given *path*.

    If *line* doesn't end in a newline, one will be appended to the end of it.
    """
    if isinstance(line, str):
        line = line.encode('utf-8')
    if not line.endswith(b'\n'):
        line += b'\n'
    temp = tempfile.NamedTemporaryFile('wb')
    temp_name = temp.name # We really only need a random path-safe name
    temp.close()
    with open(temp_name, 'wb') as temp:
        temp.write(line)
        with open(path, 'rb') as r:
            temp.write(r.read())
    # Now replace the original with the modified version
    shutil.move(temp_name, path)
Source file: json_export.py (project: FRG-Crowdsourcing, author: 97amarnathk)
def _make_zip(self, project, ty):
        name = self._project_name_latin_encoded(project)
        json_task_generator = self._respond_json(ty, project.id)
        if json_task_generator is not None:
            datafile = tempfile.NamedTemporaryFile()
            try:
                datafile.write(json.dumps(json_task_generator))
                datafile.flush()
                zipped_datafile = tempfile.NamedTemporaryFile()
                try:
                    _zip = self._zip_factory(zipped_datafile.name)
                    _zip.write(datafile.name, secure_filename('%s_%s.json' % (name, ty)))
                    _zip.close()
                    container = "user_%d" % project.owner_id
                    _file = FileStorage(filename=self.download_name(project, ty), stream=zipped_datafile)
                    uploader.upload_file(_file, container=container)
                finally:
                    zipped_datafile.close()
            finally:
                datafile.close()
Source file: csv_export.py (project: FRG-Crowdsourcing, author: 97amarnathk)
def _make_zip(self, project, ty):
        name = self._project_name_latin_encoded(project)
        csv_task_generator = self._respond_csv(ty, project.id)
        if csv_task_generator is not None:
            # TODO: use temp file from csv generation directly
            datafile = tempfile.NamedTemporaryFile()
            try:
                for line in csv_task_generator:
                    datafile.write(str(line))
                datafile.flush()
                csv_task_generator.close()  # delete temp csv file
                zipped_datafile = tempfile.NamedTemporaryFile()
                try:
                    _zip = self._zip_factory(zipped_datafile.name)
                    _zip.write(
                        datafile.name, secure_filename('%s_%s.csv' % (name, ty)))
                    _zip.close()
                    container = "user_%d" % project.owner_id
                    _file = FileStorage(
                        filename=self.download_name(project, ty), stream=zipped_datafile)
                    uploader.upload_file(_file, container=container)
                finally:
                    zipped_datafile.close()
            finally:
                datafile.close()
Source file: test_util.py (project: FRG-Crowdsourcing, author: 97amarnathk)
def test_UnicodeWriter(self):
        """Test UnicodeWriter class works."""
        tmp = tempfile.NamedTemporaryFile()
        uw = util.UnicodeWriter(tmp)
        fake_csv = ['one, two, three, {"i": 1}']
        for row in csv.reader(fake_csv):
            # change it for a dict
            row[3] = dict(i=1)
            uw.writerow(row)
        tmp.seek(0)
        err_msg = "It should be the same CSV content"
        with open(tmp.name, 'rb') as f:
            reader = csv.reader(f)
            for row in reader:
                for item in row:
                    assert item in fake_csv[0], err_msg
Source file: __init__.py (project: binja_dynamics, author: nccgroup)
def set_arguments(arguments, _view):
    version = get_version(_view).host_version
    if 'gdb' in version:
        # Voltron doesn't like commands that aren't UTF-8, but for exploit work we're going to need
        # arbitrary byte support. A named temporary file that we can source commands from is the best
        # solution I've come up with so far, despite the fact that it's inelegant.
        with tempfile.NamedTemporaryFile() as tempf:
            tempf.write('set args ')
            tempf.write(arguments)
            tempf.write('\n')
            tempf.flush()
            binjatron.custom_request("command", _build_command_dict("source " + tempf.name))
    elif 'lldb' in version:
        with tempfile.NamedTemporaryFile() as tempf:
            tempf.write('settings set target.run-args ')
            tempf.write(arguments)
            tempf.write('\n')
            tempf.flush()
            binjatron.custom_request("command", _build_command_dict("command source " + tempf.name))
Source file: ansible_api.py (project: geekcloud, author: Mr-Linus)
def gen_sudo_script(role_list, sudo_list):
        # receive role_list = [role1, role2] sudo_list = [sudo1, sudo2]
        # return sudo_alias={'NETWORK': '/sbin/ifconfig, /ls'} sudo_user={'user1': ['NETWORK', 'SYSTEM']}
        sudo_alias = {}
        sudo_user = {}
        for sudo in sudo_list:
            sudo_alias[sudo.name] = sudo.commands

        for role in role_list:
            sudo_user[role.name] = ','.join(sudo_alias.keys())

        sudo_j2 = get_template('jperm/role_sudo.j2')
        sudo_content = sudo_j2.render(Context({"sudo_alias": sudo_alias, "sudo_user": sudo_user}))
        sudo_file = NamedTemporaryFile(delete=False)
        sudo_file.write(sudo_content)
        sudo_file.close()
        return sudo_file.name
Source file: test_media.py (project: instagram_private_api_extensions, author: ping)
def test_prepare_video2(self):
        video_content, size, duration, thumbnail_content = media.prepare_video(
            self.TEST_VIDEO_PATH, max_size=(480, 480), min_size=(0, 0))
        self.assertEqual(duration, self.TEST_VIDEO_DURATION, 'Duration changed.')
        self.assertLessEqual(size[0], 480, 'Invalid width.')
        self.assertLessEqual(size[1], 480, 'Invalid height.')
        self.assertEqual(
            1.0 * size[0] / size[1],
            1.0 * self.TEST_VIDEO_SIZE[0] / self.TEST_VIDEO_SIZE[1],
            'Aspect ratio changed.')
        self.assertGreater(len(video_content), 0, 'No video content returned.')
        self.assertGreater(len(thumbnail_content), 0, 'No thumbnail content returned.')

        # Save video, thumbnail content and verify attributes
        video_output = tempfile.NamedTemporaryFile(prefix='ipae_test_', suffix='.mp4', delete=False)
        video_output.write(video_content)
        video_output.close()
        vidclip_output = VideoFileClip(video_output.name)
        self.assertAlmostEqual(duration, vidclip_output.duration, places=1)
        self.assertEqual(size[0], vidclip_output.size[0])
        self.assertEqual(size[1], vidclip_output.size[1])

        im = Image.open(io.BytesIO(thumbnail_content))
        self.assertEqual(size[0], im.size[0])
        self.assertEqual(size[1], im.size[1])
Source file: test_media.py (project: instagram_private_api_extensions, author: ping)
def test_remote_video(self):
        video_url = 'https://raw.githubusercontent.com/johndyer/mediaelement-files/master/big_buck_bunny.mp4'
        video_content, size, duration, thumbnail_content = media.prepare_video(
            video_url, aspect_ratios=1.0, max_duration=10)
        self.assertEqual(duration, 10.0, 'Invalid duration.')
        self.assertEqual(size[0], size[1], 'Invalid width/length.')
        self.assertGreater(len(video_content), 0, 'No video content returned.')
        self.assertGreater(len(thumbnail_content), 0, 'No thumbnail content returned.')

        # Save video, thumbnail content and verify attributes
        video_output = tempfile.NamedTemporaryFile(prefix='ipae_test_', suffix='.mp4', delete=False)
        video_output.write(video_content)
        video_output.close()
        vidclip_output = VideoFileClip(video_output.name)
        self.assertAlmostEqual(duration, vidclip_output.duration, places=1)
        self.assertEqual(size[0], vidclip_output.size[0])
        self.assertEqual(size[1], vidclip_output.size[1])

        im = Image.open(io.BytesIO(thumbnail_content))
        self.assertEqual(size[0], im.size[0])
        self.assertEqual(size[1], im.size[1])
Source file: file_edit.py (project: roamer, author: abaldwin88)
def file_editor(content):
    with tempfile.NamedTemporaryFile(suffix=".roamer") as temp:
        if sys.version_info[0] == 3:
            content = content.encode('utf-8')
        temp.write(content)
        temp.flush()
        if EXTRA_EDITOR_COMMAND:
            exit_code = call([EDITOR, EXTRA_EDITOR_COMMAND, temp.name])
        else:
            exit_code = call(EDITOR.split() + [temp.name])
        if exit_code != 0:
            sys.exit()
        temp.seek(0)
        output = temp.read()
        if sys.version_info[0] == 3:
            output = output.decode('UTF-8')
        return output
Source file: utils.py (project: Gnome-Authenticator, author: bil-elmoussaoui)
def screenshot_area():
    """
        Screenshot an area of the screen using gnome-screenshot
        used to QR scan
    """
    ink_flag = call(['which', 'gnome-screenshot'], stdout=PIPE, stderr=PIPE)
    if ink_flag == 0:
        file_name = path.join(GLib.get_tmp_dir(), NamedTemporaryFile().name)
        p = Popen(["gnome-screenshot", "-a", "-f", file_name],
                  stdout=PIPE, stderr=PIPE)
        output, error = p.communicate()
        if error:
            error = error.decode("utf-8").split("\n")
            logging.error("\n".join([e for e in error]))
        if not path.isfile(file_name):
            logging.debug("The screenshot was not taken")
            return False
        return file_name
    else:
        logging.error(
            "Couldn't find gnome-screenshot, please install it first")
        return False
Source file: hookenv.py (project: charm-plumgrid-gateway, author: openstack)
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit"""
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        with tempfile.NamedTemporaryFile(delete=False) as settings_file:
            settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
        subprocess.check_call(
            relation_cmd_line + ["--file", settings_file.name])
        os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())

