python类mkstemp()的实例源码

Image.py 文件源码 项目:workflows.kyoyue 作者: wizyoung 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _dump(self, file=None, format=None, **options):
        import tempfile
        suffix = ''
        if format:
            suffix = '.'+format
        if not file:
            f, file = tempfile.mkstemp(suffix)
            os.close(f)

        self.load()
        if not format or format == "PPM":
            self.im.save_ppm(file)
        else:
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format, **options)
        return file
report.py 文件源码 项目:health-mosconi 作者: GNUHealth-Mosconi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def convert(cls, report, data):
        """Convert the report data to another mimetype if necessary.

        :param report: object providing ``template_extension`` and
            ``extension``; the output format is ``extension`` when set,
            otherwise ``template_extension``.
        :param data: rendered report content, written as-is to a temporary
            input file for the converter.
        :return: ``(output_format, data)`` unchanged when the output format
            is listed in ``MIMETYPES``; otherwise ``(oext, converted)`` where
            ``converted`` is what ``unoconv`` printed on stdout.
        :raises Exception: when ``unoconv`` exits with a non-zero status; the
            exception argument is the captured stderr output.
        """
        input_format = report.template_extension
        output_format = report.extension or report.template_extension

        if output_format in MIMETYPES:
            return output_format, data

        fd, path = tempfile.mkstemp(suffix=(os.extsep + input_format),
            prefix='trytond_')
        # Everything after mkstemp sits inside the try block so the temporary
        # file is removed even if writing the input data fails.
        try:
            oext = FORMAT2EXT.get(output_format, output_format)
            with os.fdopen(fd, 'wb+') as fp:
                fp.write(data)
            cmd = ['unoconv',
                '--connection=%s' % config.get('report', 'unoconv'),
                '-f', oext, '--stdout', path]
            # stderr must be piped, otherwise communicate() returns None for
            # it and the failure below would carry no diagnostic at all.
            proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            stdoutdata, stderrdata = proc.communicate()
            if proc.returncode != 0:
                raise Exception(stderrdata)
            return oext, stdoutdata
        finally:
            os.remove(path)
test_passive.py 文件源码 项目:no-YOU-talk-to-the-hand 作者: flashashen 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_basic_config():
    """Check the supervisor command generated from the test configuration.

    ``testcfg`` is dumped as YAML into a temporary file, loaded through
    ``ny.get_config``, and the supervisor configuration written by
    ``ny.write_supervisor_conf`` is parsed to verify the ``command`` of the
    ``program:testtunnel`` section.
    """
    fd, path = tempfile.mkstemp()
    try:
        # Close the handle before the config is read back so the YAML is
        # fully flushed to disk and the descriptor is not leaked.
        with os.fdopen(fd, 'w') as f:
            f.write(yaml.dump(testcfg))

        cfg = ny.get_config(path)
        ny.write_supervisor_conf()

        config = ConfigParser.ConfigParser()
        with open(cfg['supervisor.conf']) as conf_file:
            config.readfp(conf_file)

        command = config.get('program:testtunnel', 'command')
        print(command)
        assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in command
    finally:
        # Do not leave the temporary YAML file behind, pass or fail.
        os.remove(path)
filebased.py 文件源码 项目:CodingDojo 作者: ComputerSocietyUNB 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Store ``value`` under ``key`` by writing a temp file and moving it.

        The entry is first written to a ``mkstemp`` file inside ``self._dir``
        (pickled expiry followed by the zlib-compressed pickled value) and
        then moved over the final file name with ``file_move_safe``. If the
        move did not complete, the temporary file is removed.
        """
        self._createdir()  # The cache directory can be deleted at any time.
        target = self._key_to_file(key, version)
        self._cull()  # Make some room if necessary.
        handle, scratch = tempfile.mkstemp(dir=self._dir)
        moved = False
        try:
            with io.open(handle, 'wb') as out:
                expiry = self.get_backend_timeout(timeout)
                out.write(pickle.dumps(expiry, -1))
                payload = pickle.dumps(value)
                out.write(zlib.compress(payload, -1))
            file_move_safe(scratch, target, allow_overwrite=True)
            moved = True
        finally:
            if not moved:
                os.remove(scratch)
cache.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        """Write ``value`` to the cache file for ``key``.

        ``timeout=None`` uses ``self.default_timeout``; a timeout of ``0`` is
        stored as-is; any other timeout is stored as the integer
        ``time() + timeout``. The entry is written to a temporary file in
        ``self._path``, renamed to the final file name, and chmod-ed to
        ``self._mode``.

        Returns ``True`` on success and ``False`` when an ``IOError`` or
        ``OSError`` occurs while writing.
        """
        if timeout is None:
            expires = int(time() + self.default_timeout)
        elif timeout == 0:
            expires = timeout
        else:
            expires = int(time() + timeout)

        destination = self._get_filename(key)
        self._prune()
        try:
            handle, scratch = tempfile.mkstemp(
                suffix=self._fs_transaction_suffix, dir=self._path)
            with os.fdopen(handle, 'wb') as out:
                pickle.dump(expires, out, 1)
                pickle.dump(value, out, pickle.HIGHEST_PROTOCOL)
            rename(scratch, destination)
            os.chmod(destination, self._mode)
        except (IOError, OSError):
            return False
        return True
serving.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def generate_adhoc_ssl_context():
    """Build an SSL context from a freshly generated certificate/key pair.

    The pair returned by ``generate_adhoc_ssl_pair`` is dumped in PEM form
    into two ``mkstemp`` files, both registered with ``atexit`` for removal,
    and the result of ``load_ssl_context`` on those files is returned.
    """
    crypto = _get_openssl_crypto_module()
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()
    cert_fd, cert_path = tempfile.mkstemp()
    key_fd, key_path = tempfile.mkstemp()
    # Both files are removed at interpreter exit.
    atexit.register(os.remove, key_path)
    atexit.register(os.remove, cert_path)

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    os.write(cert_fd, cert_pem)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
    os.write(key_fd, key_pem)
    os.close(cert_fd)
    os.close(key_fd)
    return load_ssl_context(cert_path, key_path)
cert.py 文件源码 项目:CyberScan 作者: medbenali 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to a temporary file created with
    mkstemp() from the tempfile module. On success the file name is
    returned, None otherwise.

    Each element of 'anchor_list' must provide an ``output(fmt="PEM")``
    method returning the data to write. The caller is responsible for
    deleting the returned file. On failure the partially written temporary
    file is removed before None is returned.

    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL check.
    """
    try:
        fd, fname = tempfile.mkstemp()
    except Exception:
        return None

    try:
        # A buffered file object (instead of raw os.write) guarantees that
        # every certificate is written completely and that the descriptor
        # is closed even when one of the anchors fails to export.
        with os.fdopen(fd, "wb") as out:
            for anchor in anchor_list:
                out.write(anchor.output(fmt="PEM"))
    except Exception:
        # Keep the "None on failure" contract, but do not leave a
        # half-written file behind.
        try:
            os.remove(fname)
        except OSError:
            pass
        return None
    return fname
workflow.py 文件源码 项目:scriptcwl 作者: NLeSC 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def validate(self, inline=False):
        """Validate the workflow by saving it and loading it back.

        The workflow is saved to a temporary file (without triggering
        validation again, which would recurse), the file is loaded with
        ``load_cwl``, and the temporary file is removed afterwards whether or
        not loading succeeded. Any error raised while saving or loading
        propagates to the caller.

        :param inline: forwarded to ``save``; when true the steps are saved
            inline instead of being referenced by path.
        """
        handle, scratch_path = tempfile.mkstemp()
        os.close(handle)
        try:
            # validate=False prevents save() from calling back into here.
            self.save(scratch_path, inline=inline, validate=False,
                      relative=False, wd=False)
            _loader, _process, _metadata, _uri = load_cwl(scratch_path)
        finally:
            os.remove(scratch_path)
workflow.py 文件源码 项目:scriptcwl 作者: NLeSC 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _pack(self, fname, encoding):
        """Write the packed form of the workflow to ``fname``.

        The workflow is first saved to a temporary file and loaded with
        ``load_cwl``; the temporary file is then removed. The loaded pieces
        are passed to ``print_pack`` and the result is written to ``fname``
        using ``encoding``.

        The original documentation notes that a packed workflow includes all
        tools and subworkflows and cannot be loaded and used in scriptcwl.
        """
        handle, scratch_path = tempfile.mkstemp()
        os.close(handle)
        try:
            self.save(scratch_path, validate=False, wd=False, inline=False,
                      relative=False, pack=False)
            loader, process, meta, uri = load_cwl(scratch_path)
        finally:
            # The temporary file is no longer needed once it has been loaded.
            os.remove(scratch_path)

        with codecs.open(fname, 'wb', encoding=encoding) as out:
            packed = print_pack(loader, process, uri, meta)
            out.write(packed)
filebased.py 文件源码 项目:NarshaTech 作者: KimJangHyeon 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None):
        """Store ``value`` under ``key`` by writing a temp file and moving it.

        The entry is first written to a ``mkstemp`` file inside ``self._dir``
        (pickled expiry followed by the zlib-compressed pickled value, both
        pickled with ``pickle.HIGHEST_PROTOCOL``) and then moved over the
        final file name with ``file_move_safe``. If the move did not
        complete, the temporary file is removed.
        """
        self._createdir()  # The cache directory can be deleted at any time.
        target = self._key_to_file(key, version)
        self._cull()  # Make some room if necessary.
        handle, scratch = tempfile.mkstemp(dir=self._dir)
        moved = False
        try:
            with io.open(handle, 'wb') as out:
                expiry = self.get_backend_timeout(timeout)
                out.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL))
                payload = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
                out.write(zlib.compress(payload))
            file_move_safe(scratch, target, allow_overwrite=True)
            moved = True
        finally:
            if not moved:
                os.remove(scratch)
cert.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def create_temporary_ca_file(anchor_list):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to a temporary file created with
    mkstemp() from the tempfile module. On success the file name is
    returned, None otherwise.

    Each element of 'anchor_list' must provide an ``output(fmt="PEM")``
    method returning the data to write. The caller is responsible for
    deleting the returned file. On failure the partially written temporary
    file is removed before None is returned.

    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL check.
    """
    try:
        fd, fname = tempfile.mkstemp()
    except Exception:
        return None

    try:
        # A buffered file object (instead of raw os.write) guarantees that
        # every certificate is written completely and that the descriptor
        # is closed even when one of the anchors fails to export.
        with os.fdopen(fd, "wb") as out:
            for anchor in anchor_list:
                out.write(anchor.output(fmt="PEM"))
    except Exception:
        # Keep the "None on failure" contract, but do not leave a
        # half-written file behind.
        try:
            os.remove(fname)
        except OSError:
            pass
        return None
    return fname
util.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _findLib_gcc(name):
        expr = r'[^\(\)\s]*lib%s\.[^\(\)\s]*' % re.escape(name)
        fdout, ccout = tempfile.mkstemp()
        os.close(fdout)
        cmd = 'if type gcc >/dev/null 2>&1; then CC=gcc; elif type cc >/dev/null 2>&1; then CC=cc;else exit 10; fi;' \
              'LANG=C LC_ALL=C $CC -Wl,-t -o ' + ccout + ' 2>&1 -l' + name
        try:
            f = os.popen(cmd)
            try:
                trace = f.read()
            finally:
                rv = f.close()
        finally:
            try:
                os.unlink(ccout)
            except OSError, e:
                if e.errno != errno.ENOENT:
                    raise
        if rv == 10:
            raise OSError, 'gcc or cc command not found'
        res = re.search(expr, trace)
        if not res:
            return None
        return res.group(0)
utils.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def mktemp(self, *args, **kwds):
        """Create a temp file that is cleaned up at the end of the test.

        Arguments are forwarded to ``tempfile.mkstemp``; the descriptor is
        closed immediately and only the path is returned. Paths are collected
        in ``self._mktemp_queue``; the first call registers a single cleanup
        callback (via ``self.addCleanup``) that removes every queued path
        that still exists and then empties the queue.
        """
        self.require_writeable_filesystem()
        handle, path = tempfile.mkstemp(*args, **kwds)
        os.close(handle)

        pending = self._mktemp_queue
        if pending is None:
            pending = self._mktemp_queue = []

            def cleaner():
                for leftover in pending:
                    if os.path.exists(leftover):
                        os.remove(leftover)
                del pending[:]

            # Registered once; later calls only append to the same list.
            self.addCleanup(cleaner)
        pending.append(path)
        return path
cache.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        """Write ``value`` to the cache file for ``key``.

        The timeout is passed through ``self._normalize_timeout`` and stored
        ahead of the value. The entry is written to a temporary file in
        ``self._path``, renamed to the final file name, and chmod-ed to
        ``self._mode``.

        Returns ``True`` on success and ``False`` when an ``IOError`` or
        ``OSError`` occurs while writing.
        """
        expires = self._normalize_timeout(timeout)
        destination = self._get_filename(key)
        self._prune()
        try:
            handle, scratch = tempfile.mkstemp(
                suffix=self._fs_transaction_suffix, dir=self._path)
            with os.fdopen(handle, 'wb') as out:
                pickle.dump(expires, out, 1)
                pickle.dump(value, out, pickle.HIGHEST_PROTOCOL)
            rename(scratch, destination)
            os.chmod(destination, self._mode)
        except (IOError, OSError):
            return False
        return True
serving.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def generate_adhoc_ssl_context():
    """Build an SSL context from a freshly generated certificate/key pair.

    The pair returned by ``generate_adhoc_ssl_pair`` is dumped in PEM form
    into two ``mkstemp`` files, both registered with ``atexit`` for removal,
    and the result of ``load_ssl_context`` on those files is returned.
    """
    crypto = _get_openssl_crypto_module()
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()
    cert_fd, cert_path = tempfile.mkstemp()
    key_fd, key_path = tempfile.mkstemp()
    # Both files are removed at interpreter exit.
    atexit.register(os.remove, key_path)
    atexit.register(os.remove, cert_path)

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    os.write(cert_fd, cert_pem)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
    os.write(key_fd, key_pem)
    os.close(cert_fd)
    os.close(key_fd)
    return load_ssl_context(cert_path, key_path)
cache.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        """Write ``value`` to the cache file for ``key``.

        The timeout is passed through ``self._normalize_timeout`` and stored
        ahead of the value. The entry is written to a temporary file in
        ``self._path``, renamed to the final file name, and chmod-ed to
        ``self._mode``.

        Returns ``True`` on success and ``False`` when an ``IOError`` or
        ``OSError`` occurs while writing.
        """
        expires = self._normalize_timeout(timeout)
        destination = self._get_filename(key)
        self._prune()
        try:
            handle, scratch = tempfile.mkstemp(
                suffix=self._fs_transaction_suffix, dir=self._path)
            with os.fdopen(handle, 'wb') as out:
                pickle.dump(expires, out, 1)
                pickle.dump(value, out, pickle.HIGHEST_PROTOCOL)
            rename(scratch, destination)
            os.chmod(destination, self._mode)
        except (IOError, OSError):
            return False
        return True
serving.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def generate_adhoc_ssl_context():
    """Build an SSL context from a freshly generated certificate/key pair.

    The pair returned by ``generate_adhoc_ssl_pair`` is dumped in PEM form
    into two ``mkstemp`` files, both registered with ``atexit`` for removal,
    and the result of ``load_ssl_context`` on those files is returned.
    """
    crypto = _get_openssl_crypto_module()
    import atexit
    import tempfile

    cert, pkey = generate_adhoc_ssl_pair()
    cert_fd, cert_path = tempfile.mkstemp()
    key_fd, key_path = tempfile.mkstemp()
    # Both files are removed at interpreter exit.
    atexit.register(os.remove, key_path)
    atexit.register(os.remove, cert_path)

    cert_pem = crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
    os.write(cert_fd, cert_pem)
    key_pem = crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
    os.write(key_fd, key_pem)
    os.close(cert_fd)
    os.close(key_fd)
    return load_ssl_context(cert_path, key_path)
test_new_ffi_1.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_ffi_buffer_with_file(self):
        """Round-trip an ``ffi.buffer`` through a real file object.

        Writes the first 1000 ints of a 1005-element array to a temporary
        file through ``ffi.buffer``, checks the raw bytes against an
        ``array.array('i')`` of the same values, then reads them back into a
        second buffer with ``readinto``. The test is skipped when
        ``ffi.buffer`` raises ``NotImplementedError``.
        """
        import array
        import os
        import tempfile
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        # try/finally so the file is closed and removed even when the test
        # is skipped or one of the assertions fails.
        try:
            a = ffi.new("int[]", list(range(1005)))
            try:
                ffi.buffer(a, 512)
            except NotImplementedError as e:
                py.test.skip(str(e))
            f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
            f.seek(0)
            expected = array.array('i', range(1000))
            # array.tostring() was removed in Python 3.9; prefer tobytes()
            # and fall back to tostring() where tobytes() does not exist.
            to_bytes = getattr(expected, 'tobytes', None) or expected.tostring
            assert f.read() == to_bytes()
            f.seek(0)
            b = ffi.new("int[]", 1005)
            f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
            assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        finally:
            f.close()
            os.unlink(filename)
backend_tests.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_ffi_buffer_with_file(self):
        """Round-trip an ``ffi.buffer`` through a real file object.

        Using an ``FFI`` built on ``self.Backend()``, writes the first 1000
        ints of a 1005-element array to a temporary file through
        ``ffi.buffer``, checks the raw bytes against an ``array.array('i')``
        of the same values, then reads them back into a second buffer with
        ``readinto``. The test is skipped when ``ffi.buffer`` raises
        ``NotImplementedError``.
        """
        ffi = FFI(backend=self.Backend())
        import array
        import os
        import tempfile
        fd, filename = tempfile.mkstemp()
        f = os.fdopen(fd, 'r+b')
        # try/finally so the file is closed and removed even when the test
        # is skipped or one of the assertions fails.
        try:
            a = ffi.new("int[]", list(range(1005)))
            try:
                ffi.buffer(a, 512)
            except NotImplementedError as e:
                py.test.skip(str(e))
            f.write(ffi.buffer(a, 1000 * ffi.sizeof("int")))
            f.seek(0)
            expected = array.array('i', range(1000))
            # array.tostring() was removed in Python 3.9; prefer tobytes()
            # and fall back to tostring() where tobytes() does not exist.
            to_bytes = getattr(expected, 'tobytes', None) or expected.tostring
            assert f.read() == to_bytes()
            f.seek(0)
            b = ffi.new("int[]", 1005)
            f.readinto(ffi.buffer(b, 1000 * ffi.sizeof("int")))
            assert list(a)[:1000] + [0] * (len(a)-1000) == list(b)
        finally:
            f.close()
            os.unlink(filename)
service.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, executable_path, port=0, service_args=None, log_path=None):
        """
        Creates a new instance of the Service

        :Args:
         - executable_path : Path to PhantomJS binary
         - port : Port the service is running on
         - service_args : A List of other command line options to pass to PhantomJS
           (copied, so the caller's list is not modified)
         - log_path: Path for PhantomJS service to log to
           (defaults to "ghostdriver.log" when not given)

        Unless the service arguments already contain a ``--cookies-file=``
        option, a temporary cookies file is created and its path is stored in
        ``self._cookie_temp_file``; otherwise that attribute is ``None``.
        """
        # Local import: only needed to close the descriptor from mkstemp().
        import os

        self.service_args = [] if service_args is None else service_args[:]
        if not log_path:
            log_path = "ghostdriver.log"
        if not self._args_contain("--cookies-file="):
            # mkstemp() returns an already-open descriptor; only the path is
            # used here, so close it right away instead of leaking it.
            fd, self._cookie_temp_file = tempfile.mkstemp()
            os.close(fd)
            self.service_args.append("--cookies-file=" + self._cookie_temp_file)
        else:
            self._cookie_temp_file = None

        service.Service.__init__(self, executable_path, port=port, log_file=open(log_path, 'w'))


问题


面经


文章

微信
公众号

扫码关注公众号