python类NamedTemporaryFile()的实例源码

utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def import_key(keyid):
    key = keyid.strip()
    if (key.startswith('-----BEGIN PGP PUBLIC KEY BLOCK-----') and
            key.endswith('-----END PGP PUBLIC KEY BLOCK-----')):
        juju_log("PGP key found (looks like ASCII Armor format)", level=DEBUG)
        juju_log("Importing ASCII Armor PGP key", level=DEBUG)
        with tempfile.NamedTemporaryFile() as keyfile:
            with open(keyfile.name, 'w') as fd:
                fd.write(key)
                fd.write("\n")

            cmd = ['apt-key', 'add', keyfile.name]
            try:
                subprocess.check_call(cmd)
            except subprocess.CalledProcessError:
                error_out("Error importing PGP key '%s'" % key)
    else:
        juju_log("PGP key found (looks like Radix64 format)", level=DEBUG)
        juju_log("Importing PGP key from keyserver", level=DEBUG)
        cmd = ['apt-key', 'adv', '--keyserver',
               'hkp://keyserver.ubuntu.com:80', '--recv-keys', key]
        try:
            subprocess.check_call(cmd)
        except subprocess.CalledProcessError:
            error_out("Error importing PGP key '%s'" % key)
components.py 文件源码 项目:easydo-ui 作者: easydo-cn 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def dot2graph(self, dot, format='svg'):
        # windows ???????????????????
        # ?????? NamedTemporaryFile ?????
        with NamedTemporaryFile(delete=False) as dotfile:
            dotfile.write(dot)
        outfile = NamedTemporaryFile(delete=False)
        os.system('dot -Efontname=sans -Nfontname=sans %s -o%s -T%s' % (
            dotfile.name, outfile.name, format))
        result = outfile.read()
        outfile.close()
        os.unlink(dotfile.name)
        os.unlink(outfile.name)
        return result
test_rstviewer.py 文件源码 项目:rstviewer 作者: arne-cl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_rs3topng():
    """rs3 file is converted to PNG"""
    png_str = rstviewer.rs3topng(RS3_FILEPATH)

    temp = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp.close()

    rstviewer.rs3topng(RS3_FILEPATH, temp.name)
    with open(temp.name, 'r') as png_file:
        assert png_str == png_file.read()
        os.unlink(temp.name)

    # generated images might not be 100% identical, probably
    # because of the font used
    with open(EXPECTED_PNG1, 'r') as expected_png_file:
        ident1 = png_str == expected_png_file.read()

    with open(EXPECTED_PNG2, 'r') as expected_png_file:
        ident2 = png_str == expected_png_file.read()

    assert ident1 or ident2
test_rstviewer.py 文件源码 项目:rstviewer 作者: arne-cl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_cli_rs3topng():
    """conversion to PNG on the commandline"""
    temp_png = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
    temp_png.close()

    # calling `rstviewer -f png input.rs3 output.png` will end the program
    # with sys.exit(0), so we'll have to catch this here.
    with pytest.raises(SystemExit) as serr:
        cli(['-f', 'png', RS3_FILEPATH, temp_png.name])
        out, err = pytest.capsys.readouterr()
        assert err == 0

    with open(temp_png.name, 'r') as png_file:
        png_str = png_file.read()
        os.unlink(temp_png.name)

    # generated images might not be 100% identical, probably
    # because of the font used
    with open(EXPECTED_PNG1, 'r') as expected_png_file:
        ident1 = png_str == expected_png_file.read()

    with open(EXPECTED_PNG2, 'r') as expected_png_file:
        ident2 = png_str == expected_png_file.read()

    assert ident1 or ident2
test_run_no_updates_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def setUp(self):
        tempFile = tempfile.NamedTemporaryFile()
        self.fileServerDir = tempFile.name
        tempFile.close()
        os.mkdir(self.fileServerDir)
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            keysFile.write(json.dumps(KEYS, sort_keys=True))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
test_run_update_available.py 文件源码 项目:pyupdater-wx-demo 作者: wettenhj 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def setUp(self):
        tempFile = tempfile.NamedTemporaryFile()
        self.fileServerDir = tempFile.name
        tempFile.close()
        os.mkdir(self.fileServerDir)
        os.environ['PYUPDATER_FILESERVER_DIR'] = self.fileServerDir
        privateKey = ed25519.SigningKey(PRIVATE_KEY.encode('utf-8'),
                                        encoding='base64')
        signature = privateKey.sign(six.b(json.dumps(VERSIONS, sort_keys=True)),
                                    encoding='base64').decode()
        VERSIONS['signature'] = signature
        keysFilePath = os.path.join(self.fileServerDir, 'keys.gz')
        with gzip.open(keysFilePath, 'wb') as keysFile:
            keysFile.write(json.dumps(KEYS, sort_keys=True))
        versionsFilePath = os.path.join(self.fileServerDir, 'versions.gz')
        with gzip.open(versionsFilePath, 'wb') as versionsFile:
            versionsFile.write(json.dumps(VERSIONS, sort_keys=True))
        os.environ['WXUPDATEDEMO_TESTING'] = 'True'
        from wxupdatedemo.config import CLIENT_CONFIG
        self.clientConfig = CLIENT_CONFIG
        self.clientConfig.PUBLIC_KEY = PUBLIC_KEY
        self.clientConfig.APP_NAME = APP_NAME
metadata.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _atomic_write(filename):
    path = os.path.dirname(filename)
    try:
        file = tempfile.NamedTemporaryFile(delete=False, dir=path, mode="w+")
        yield file
        file.flush()
        os.fsync(file.fileno())
        os.rename(file.name, filename)
    finally:
        try:
            os.remove(file.name)
        except OSError as e:
            if e.errno == 2:
                pass
            else:
                raise e
app_test.py 文件源码 项目:service-fabric-cli 作者: Azure 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def upload_to_fileshare_test(self): #pylint: disable=no-self-use
        """Upload copies files to non-native store correctly with no
        progress"""
        import shutil
        import tempfile
        temp_file = tempfile.NamedTemporaryFile(dir=tempfile.mkdtemp())
        temp_src_dir = os.path.dirname(temp_file.name)
        temp_dst_dir = tempfile.mkdtemp()
        shutil_mock = MagicMock()
        shutil_mock.copyfile.return_value = None
        with patch('sfctl.custom_app.shutil', new=shutil_mock):
            sf_c.upload_to_fileshare(temp_src_dir, temp_dst_dir, False)
            shutil_mock.copyfile.assert_called_once()
        temp_file.close()
        shutil.rmtree(os.path.dirname(temp_file.name))
        shutil.rmtree(temp_dst_dir)
mapprint.py 文件源码 项目:geo-pyprint 作者: ioda-net 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def _create_pdf_pdftk(self):
        with NamedTemporaryFile(
            mode='wb+',
            prefix='geo-pyprint_',
            delete=True
        ) as map_image_file:
            self._map_image.save(map_image_file, 'PNG')
            map_image_file.flush()

            # TODO: use the configuration to select the template
            # TODO: use the configuration to select the name of the key in the template
            pdfjinja = PdfJinja('pdfjinja-template.pdf')
            pdfout = pdfjinja(dict(map=map_image_file.name))

            with NamedTemporaryFile(
                mode='wb+',
                prefix='geo-pyprint_',
                suffix='.pdf',
                delete=False
            ) as output_file:
                pdfout.write(output_file)
                output_file.flush()

                return output_file.name
windows.py 文件源码 项目:NeoVintageous 作者: NeoVintageous 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def filter_region(view, txt, command):
    try:
        contents = tempfile.NamedTemporaryFile(suffix='.txt', delete=False)
        contents.write(txt.encode('utf-8'))
        contents.close()

        script = tempfile.NamedTemporaryFile(suffix='.bat', delete=False)
        script.write(('@echo off\ntype %s | %s' % (contents.name, command)).encode('utf-8'))
        script.close()

        p = subprocess.Popen([script.name],
                             stdout=PIPE,
                             stderr=PIPE,
                             startupinfo=get_startup_info())

        out, err = p.communicate()
        return (out or err).decode(get_oem_cp()).replace('\r\n', '\n')[:-1].strip()
    finally:
        os.remove(script.name)
        os.remove(contents.name)
test_machine.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_scp(event_loop):
    async with base.CleanModel() as model:
        await model.add_machine()
        await asyncio.wait_for(
            model.block_until(lambda: model.machines),
            timeout=240)
        machine = model.machines['0']
        await asyncio.wait_for(
            model.block_until(lambda: (machine.status == 'running' and
                                       machine.agent_status == 'started')),
            timeout=480)

        with NamedTemporaryFile() as f:
            f.write(b'testcontents')
            f.flush()
            await machine.scp_to(f.name, 'testfile')

        with NamedTemporaryFile() as f:
            await machine.scp_from('testfile', f.name)
            assert f.read() == b'testcontents'
test_unit.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_scp(event_loop):
    async with base.CleanModel() as model:
        app = await model.deploy('ubuntu')

        await asyncio.wait_for(
            model.block_until(lambda: app.units),
            timeout=60)
        unit = app.units[0]
        await asyncio.wait_for(
            model.block_until(lambda: unit.machine),
            timeout=60)
        machine = unit.machine
        await asyncio.wait_for(
            model.block_until(lambda: (machine.status == 'running' and
                                       machine.agent_status == 'started')),
            timeout=480)

        with NamedTemporaryFile() as f:
            f.write(b'testcontents')
            f.flush()
            await unit.scp_to(f.name, 'testfile')

        with NamedTemporaryFile() as f:
            await unit.scp_from('testfile', f.name)
            assert f.read() == b'testcontents'
model.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def add_local_charm_dir(self, charm_dir, series):
        """Upload a local charm to the model.

        This will automatically generate an archive from
        the charm dir.

        :param charm_dir: Path to the charm directory
        :param series: Charm series

        """
        fh = tempfile.NamedTemporaryFile()
        CharmArchiveGenerator(charm_dir).make_archive(fh.name)
        with fh:
            func = partial(
                self.add_local_charm, fh, series, os.stat(fh.name).st_size)
            charm_url = await self._connector.loop.run_in_executor(None, func)

        log.debug('Uploaded local charm: %s -> %s', charm_dir, charm_url)
        return charm_url
test_data_structure.py 文件源码 项目:flask-basic-roles 作者: ownaginatious 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_load_config_file(self):

        with NamedTemporaryFile(mode='w+t') as f:

            f.write("user_a:password_a:role_a,role_b\n")
            f.write("user_b:password_b:role_b,role_c\n")
            f.write("user_c:password_c:role_c,role_c\n")
            f.flush()
            self.auth.load_from_file(f.name)

        # Assert user equality.
        self.assertEqual(self.auth.users,
                         {
                             'user_a': 'password_a',
                             'user_b': 'password_b',
                             'user_c': 'password_c'
                         })

        # Assert role equality.
        self.assertEqual(self.auth.roles,
                         {
                             'user_a': set(('role_a', 'role_b')),
                             'user_b': set(('role_b', 'role_c')),
                             'user_c': set(('role_c', 'role_c'))
                         })
test_data_structure.py 文件源码 项目:flask-basic-roles 作者: ownaginatious 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_save_config_file(self):

        self.auth.add_user('user_a', 'password_a')
        self.auth.add_roles('user_a', ('role_a', 'role_b'))

        self.auth.add_user('user_b', 'password_b')
        self.auth.add_roles('user_b', ('role_b', 'role_c'))

        self.auth.add_user('user_c', 'password_c')
        self.auth.add_roles('user_c', ('role_c', 'role_c'))

        with NamedTemporaryFile(mode='w+t') as f:

            self.auth.save_to_file(f.name)

            expected = [
                "user_a:password_a:role_a,role_b",
                "user_b:password_b:role_b,role_c",
                "user_c:password_c:role_c"
            ]

            for a, e in zip(f.readlines(), expected):
                self.assertEqual(a.strip(), e)
utils.py 文件源码 项目:seq2seq 作者: google 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_temp_parallel_data(sources, targets):
  """
  Creates a temporary TFRecords file.

  Args:
    source: List of source sentences
    target: List of target sentences

  Returns:
    A tuple (sources_file, targets_file).
  """
  file_source = tempfile.NamedTemporaryFile()
  file_target = tempfile.NamedTemporaryFile()
  file_source.write("\n".join(sources).encode("utf-8"))
  file_source.flush()
  file_target.write("\n".join(targets).encode("utf-8"))
  file_target.flush()
  return file_source, file_target
utils.py 文件源码 项目:seq2seq 作者: google 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def create_temp_tfrecords(sources, targets):
  """
  Creates a temporary TFRecords file.

  Args:
    source: List of source sentences
    target: List of target sentences

  Returns:
    A tuple (sources_file, targets_file).
  """

  output_file = tempfile.NamedTemporaryFile()
  writer = tf.python_io.TFRecordWriter(output_file.name)
  for source, target in zip(sources, targets):
    ex = tf.train.Example()
    #pylint: disable=E1101
    ex.features.feature["source"].bytes_list.value.extend(
        [source.encode("utf-8")])
    ex.features.feature["target"].bytes_list.value.extend(
        [target.encode("utf-8")])
    writer.write(ex.SerializeToString())
  writer.close()

  return output_file
utils.py 文件源码 项目:seq2seq 作者: google 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def create_temporary_vocab_file(words, counts=None):
  """
  Creates a temporary vocabulary file.

  Args:
    words: List of words in the vocabulary

  Returns:
    A temporary file object with one word per line
  """
  vocab_file = tempfile.NamedTemporaryFile()
  if counts is None:
    for token in words:
      vocab_file.write((token + "\n").encode("utf-8"))
  else:
    for token, count in zip(words, counts):
      vocab_file.write("{}\t{}\n".format(token, count).encode("utf-8"))
  vocab_file.flush()
  return vocab_file
test_qrcode.py 文件源码 项目:segno 作者: heuer 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_save_svgz_filename():
    import gzip
    qr = segno.make_qr('test')
    f = tempfile.NamedTemporaryFile('wb', suffix='.svgz', delete=False)
    f.close()
    qr.save(f.name)
    f = open(f.name, mode='rb')
    expected = b'\x1f\x8b\x08'  # gzip magic number
    val = f.read(len(expected))
    f.close()
    f = gzip.open(f.name)
    try:
        content = f.read(6)
    finally:
        f.close()
    os.unlink(f.name)
    assert expected == val
    assert b'<?xml ' == content
test_svg.py 文件源码 项目:segno 作者: heuer 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_write_unicode_filename():
    qr = segno.make_qr('test')
    f = tempfile.NamedTemporaryFile('wt', suffix='.svg', delete=False)
    f.close()
    title = 'mürrische Mädchen'
    desc = '?'
    qr.save(f.name, title=title, desc=desc)
    f = open(f.name, mode='rb')
    root = _parse_xml(f)
    f.seek(0)
    val = f.read(6)
    f.close()
    os.unlink(f.name)
    assert b'<?xml ' == val
    assert title == _get_title(root).text
    assert desc == _get_desc(root).text
test_tool.py 文件源码 项目:oscars2016 作者: 0x0ece 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_infile_outfile(self):
        with tempfile.NamedTemporaryFile() as infile:
            infile.write(self.data.encode())
            infile.flush()
            # outfile will get overwritten by tool, so the delete
            # may not work on some platforms. Do it manually.
            outfile = tempfile.NamedTemporaryFile()
            try:
                self.assertEqual(
                    self.runTool(args=[infile.name, outfile.name]),
                    ''.encode())
                with open(outfile.name, 'rb') as f:
                    self.assertEqual(f.read(), self.expect.encode())
            finally:
                outfile.close()
                if os.path.exists(outfile.name):
                    os.unlink(outfile.name)
test_utils_dirhasher.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testRewriteFile(self):
        """Changing the file content should change the hash sum"""

        with NamedTemporaryFile() as index:
            with TemporaryDirectory() as tmp:
                with open(os.path.join(tmp, "foo"), 'wb') as f:
                    f.write(b'abc')
                sum1 = hashDirectory(tmp, index.name)

                with open(index.name, "rb") as f:
                    assert f.read(4) == b'BOB1'

                with open(os.path.join(tmp, "foo"), 'wb') as f:
                    f.write(b'qwer')
                sum2 = hashDirectory(tmp, index.name)

                with open(index.name, "rb") as f:
                    assert f.read(4) == b'BOB1'

                assert sum1 != sum2
RemoteGraphicsView.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, *args, **kwds):
        ## Create shared memory for rendered image
        #pg.dbg(namespace={'r': self})
        if sys.platform.startswith('win'):
            self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
            self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
        else:
            self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
            self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
            fd = self.shmFile.fileno()
            self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
        atexit.register(self.close)

        GraphicsView.__init__(self, *args, **kwds)
        self.scene().changed.connect(self.update)
        self.img = None
        self.renderTimer = QtCore.QTimer()
        self.renderTimer.timeout.connect(self.renderView)
        self.renderTimer.start(16)
test_svg.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def test_plotscene():
    tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name
    print("using %s as a temporary file" % tempfilename)
    pg.setConfigOption('foreground', (0,0,0))
    w = pg.GraphicsWindow()
    w.show()        
    p1 = w.addPlot()
    p2 = w.addPlot()
    p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
    p1.setXRange(0,5)
    p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
    app.processEvents()
    app.processEvents()

    ex = pg.exporters.SVGExporter(w.scene())
    ex.export(fileName=tempfilename)
    # clean up after the test is done
    os.unlink(tempfilename)
test_svg.py 文件源码 项目:NeoAnalysis 作者: neoanalysis 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def test_plotscene():
    tempfilename = tempfile.NamedTemporaryFile(suffix='.svg').name
    print("using %s as a temporary file" % tempfilename)
    pg.setConfigOption('foreground', (0,0,0))
    w = pg.GraphicsWindow()
    w.show()        
    p1 = w.addPlot()
    p2 = w.addPlot()
    p1.plot([1,3,2,3,1,6,9,8,4,2,3,5,3], pen={'color':'k'})
    p1.setXRange(0,5)
    p2.plot([1,5,2,3,4,6,1,2,4,2,3,5,3], pen={'color':'k', 'cosmetic':False, 'width': 0.3})
    app.processEvents()
    app.processEvents()

    ex = pg.exporters.SVGExporter(w.scene())
    ex.export(fileName=tempfilename)
    # clean up after the test is done
    os.unlink(tempfilename)
test_config.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_css(Chart):
    """Test css file option"""
    css = "{{ id }}text { fill: #bedead; }\n"
    with NamedTemporaryFile('w') as f:
        f.write(css)
        f.flush()

        config = Config()
        config.css.append('file://' + f.name)

        chart = Chart(config)
        chart.add('/', [10, 1, 5])
        svg = chart.render().decode('utf-8')
        assert '#bedead' in svg

        chart = Chart(css=(_ellipsis, 'file://' + f.name))
        chart.add('/', [10, 1, 5])
        svg = chart.render().decode('utf-8')
        assert '#bedead' in svg
test_util.py 文件源码 项目:runcommands 作者: wylee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def copy(contents, config=None, destination_dir=False, **kwargs):
    if config is None:
        config = Config(xyz='123')

    with NamedTemporaryFile('w', delete=False) as tp:
        tp.write(contents)

    source = tp.name

    if destination_dir:
        with TemporaryDirectory() as destination:
            path = copy_file(config, source, destination, **kwargs)
            yield source, destination, path
            os.remove(source)
    else:
        destination = source + '.copy'
        path = copy_file(config, source, destination, **kwargs)
        yield source, destination, path
        os.remove(source)
        os.remove(path)
editor.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def edit(filename=None, contents=None):
    editor = get_editor()
    args = get_editor_args(os.path.basename(os.path.realpath(editor)))
    args = [editor] + args.split(' ')

    if filename is None:
        tmp = tempfile.NamedTemporaryFile()
        filename = tmp.name

    if contents is not None:
        with open(filename, mode='wb') as f:
            f.write(contents)

    args += [filename]

    proc = subprocess.Popen(args, close_fds=True)
    proc.communicate()

    with open(filename, mode='rb') as f:
        return f.read()
test_io.py 文件源码 项目:htsget 作者: jeromekelleher 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_simple_case(self):
        ticket_url = "http://ticket.com"
        data_url = "http://data.url.com"
        headers = {"a": "a", "xyz": "ghj"}
        ticket = {"htsget": {
            "urls": [{"url": data_url, "headers": headers}]}}
        data = b"0" * 1024
        returned_response = MockedResponse(json.dumps(ticket).encode(), data)
        with mock.patch("requests.get", return_value=returned_response) as mocked_get:
            with tempfile.NamedTemporaryFile("wb+") as f:
                htsget.get(ticket_url, f)
                f.seek(0)
                self.assertEqual(f.read(), data)
            self.assertEqual(mocked_get.call_count, 2)
            # Note that we only get the arguments for the last call using this method.
            args, kwargs = mocked_get.call_args
            self.assertEqual(args[0], data_url)
            self.assertEqual(kwargs["headers"], headers)
            self.assertEqual(kwargs["stream"], True)
test_io.py 文件源码 项目:htsget 作者: jeromekelleher 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def test_bearer_token(self):
        ticket_url = "http://ticket.com"
        ticket = {"htsget": {"urls": []}}
        bearer_token = "x" * 1024
        returned_response = MockedTicketResponse(json.dumps(ticket).encode())
        with mock.patch("requests.get", return_value=returned_response) as mocked_get:
            with tempfile.NamedTemporaryFile("wb+") as f:
                htsget.get(ticket_url, f, bearer_token=bearer_token)
                f.seek(0)
                self.assertEqual(f.read(), b"")
            # Because we have no URLs in the returned ticked, it should be called
            # only once.
            self.assertEqual(mocked_get.call_count, 1)
            # Note that we only get the arguments for the last call using this method.
            args, kwargs = mocked_get.call_args
            self.assertEqual(args[0], ticket_url)
            headers = {"Authorization": "Bearer {}".format(bearer_token)}
            self.assertEqual(kwargs["headers"], headers)
            self.assertEqual(kwargs["stream"], True)


问题


面经


文章

微信
公众号

扫码关注公众号