python类mkstemp()的实例源码

mysql.py 文件源码 项目:deployfish 作者: caltechads 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def empty_db(self):
        cmd = [
            "mysqldump",
            "-u%(user)s" % self.db_config,
            "-h%(host)s" % self.db_config,
            "--add_drop-table",
            "--no-data",
            "%(name)s" % self.db_config,
        ]

        tmphandle, tmppath = tempfile.mkstemp(text=True)
        tmpfile = os.fdopen(tmphandle, "w")

        sql_data = subprocess.check_output(cmd, stderr=None).split('\n')
        tmpfile.write("SET FOREIGN_KEY_CHECKS = 0;\n")
        tmpfile.write("use %(name)s;\n" % self.db_config)
        for line in sql_data:
            if line.startswith("DROP"):
                tmpfile.write(line + '\n')
        tmpfile.close()
        self._run_mysql_cmd("source %s" % tmppath)
        os.remove(tmppath)
Image.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _dump(self, file=None, format=None):
        import tempfile
        suffix = ''
        if format:
            suffix = '.'+format
        if not file:
            f, file = tempfile.mkstemp(suffix)
            os.close(f)

        self.load()
        if not format or format == "PPM":
            self.im.save_ppm(file)
        else:
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format)
        return file
_temp.py 文件源码 项目:senf 作者: quodlibet 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def mkstemp(suffix=None, prefix=None, dir=None, text=False):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
        text (bool): if the file should be opened in text mode
    Returns:
        Tuple[`int`, `fsnative`]:
            A tuple containing the file descriptor and the file path
    Raises:
        EnvironmentError

    Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative`
    path.
    """

    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)

    return tempfile.mkstemp(suffix, prefix, dir, text)
_temp.py 文件源码 项目:senf 作者: quodlibet 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def mkdtemp(suffix=None, prefix=None, dir=None):
    """
    Args:
        suffix (`pathlike` or `None`): suffix or `None` to use the default
        prefix (`pathlike` or `None`): prefix or `None` to use the default
        dir (`pathlike` or `None`): temp dir or `None` to use the default
    Returns:
        `fsnative`: A path to a directory
    Raises:
        EnvironmentError

    Like :func:`python3:tempfile.mkstemp` but always returns a `fsnative` path.
    """

    suffix = fsnative() if suffix is None else path2fsn(suffix)
    prefix = gettempprefix() if prefix is None else path2fsn(prefix)
    dir = gettempdir() if dir is None else path2fsn(dir)

    return tempfile.mkdtemp(suffix, prefix, dir)
ImageGrab.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def grab(bbox=None):
    if sys.platform == "darwin":
        f, file = tempfile.mkstemp('.png')
        os.close(f)
        subprocess.call(['screencapture', '-x', file])
        im = Image.open(file)
        im.load()
        os.unlink(file)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origo in lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _openDownloadFile(self, buildId, suffix):
        (tmpFd, tmpName) = mkstemp()
        url = self._makeUrl(buildId, suffix)
        try:
            os.close(tmpFd)
            env = { k:v for (k,v) in os.environ.items() if k in self.__whiteList }
            env["BOB_LOCAL_ARTIFACT"] = tmpName
            env["BOB_REMOTE_ARTIFACT"] = url
            ret = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd],
                stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
                cwd="/tmp", env=env)
            if ret == 0:
                ret = tmpName
                tmpName = None
                return CustomDownloader(ret)
            else:
                raise ArtifactDownloadError("failed (exit {})".format(ret))
        finally:
            if tmpName is not None: os.unlink(tmpName)
ImageGrab.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def grab(bbox=None):
    if sys.platform == "darwin":
        fh, filepath = tempfile.mkstemp('.png')
        os.close(fh)
        subprocess.call(['screencapture', '-x', filepath])
        im = Image.open(filepath)
        im.load()
        os.unlink(filepath)
    else:
        size, data = grabber()
        im = Image.frombytes(
            "RGB", size, data,
            # RGB, 32-bit line padding, origin lower left corner
            "raw", "BGR", (size[0]*3 + 3) & -4, -1
            )
    if bbox:
        im = im.crop(bbox)
    return im
Image.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _dump(self, file=None, format=None):
        import tempfile
        suffix = ''
        if format:
            suffix = '.'+format
        if not file:
            f, file = tempfile.mkstemp(suffix)
            os.close(f)

        self.load()
        if not format or format == "PPM":
            self.im.save_ppm(file)
        else:
            if not file.endswith(format):
                file = file + "." + format
            self.save(file, format)
        return file
template.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _compile_module_file(template, text, filename, outputpath, module_writer):
    source, lexer = _compile(template, text, filename,
                             generate_magic_comment=True)

    if isinstance(source, compat.text_type):
        source = source.encode(lexer.encoding or 'ascii')

    if module_writer:
        module_writer(source, outputpath)
    else:
        # make tempfiles in the same location as the ultimate
        # location.   this ensures they're on the same filesystem,
        # avoiding synchronization issues.
        (dest, name) = tempfile.mkstemp(dir=os.path.dirname(outputpath))

        os.write(dest, source)
        os.close(dest)
        shutil.move(name, outputpath)
cache.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
test_local_server.py 文件源码 项目:htsget 作者: jeromekelleher 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_transfer_with_cli(self):
        test_instances = [
            TestUrlInstance(url="/data1", data=b"data1"),
            TestUrlInstance(url="/data2", data=b"data2")
        ]
        self.httpd.test_instances = test_instances
        try:
            fd, filename = tempfile.mkstemp()
            os.close(fd)
            cmd = [TestRequestHandler.ticket_url, "-O", filename]
            parser = cli.get_htsget_parser()
            args = parser.parse_args(cmd)
            with mock.patch("sys.exit") as mocked_exit:
                cli.run(args)
                mocked_exit.assert_called_once_with(0)
            all_data = b"".join(test_instance.data for test_instance in test_instances)
            with open(filename, "rb") as f:
                self.assertEqual(f.read(), all_data)
        finally:
            os.unlink(filename)
photo.py 文件源码 项目:HugoPhotoSwipe 作者: GjjvdBurg 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def create_thumb_js(self, mode=None, pth=None):
        """ Create the thumbnail using SmartCrop.js """
        if pth is None:
            raise ValueError("path can't be None")

        # save a copy of the image with the correct orientation in a temporary 
        # file
        _, tmpfname = tempfile.mkstemp(suffix='.'+settings.output_format)
        self.original_image.save(tmpfname, quality=95)

        # Load smartcrop and set options
        nwidth, nheight = self.resize_dims(mode)
        logging.info("[%s] SmartCrop.js new dimensions: %ix%i" % (self.name, 
            nwidth, nheight))
        command = [settings.smartcrop_js_path, '--width', str(nwidth), 
                '--height', str(nheight), tmpfname, pth]
        logging.info("[%s] SmartCrop.js running crop command." % self.name)
        check_output(command)

        # remove the temporary file
        os.remove(tmpfname)

        return pth
twitter.py 文件源码 项目:v2ex-tornado-2 作者: coderyy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def Set(self,key,data):
    path = self._GetPath(key)
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
      os.makedirs(directory)
    if not os.path.isdir(directory):
      raise _FileCacheError('%s exists but is not a directory' % directory)
    temp_fd, temp_path = tempfile.mkstemp()
    temp_fp = os.fdopen(temp_fd, 'w')
    temp_fp.write(data)
    temp_fp.close()
    if not path.startswith(self._root_directory):
      raise _FileCacheError('%s does not appear to live under %s' %
                            (path, self._root_directory))
    if os.path.exists(path):
      os.remove(path)
    os.rename(temp_path, path)
CommandlineTests.py 文件源码 项目:piperine 作者: DNA-and-Natural-Algorithms-Group 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def setUp(self):
        # CRN used in this test
        self.crn_rxn = 'A + B -> A + D\n'
        # Establish random temporary filenames
        self.tdir = mkdtemp(prefix='piperine_test')
        fid, self.basename = mkstemp(dir=self.tdir)
        os.close(fid)
        endings = ['.crn', '.fixed', '.pil', '.mfe', '{}_strands.txt', '{}.seqs']
        self.filenames = [ self.basename + suf for suf in endings ]
        self.crn, self.fixed, self.pil, self.mfe, self.strands, self.seqs = self.filenames
        fid, self.fixedscore = mkstemp(suffix='_fixed_score.csv', dir=self.tdir)
        os.close(fid)
        fid, self.reportfile = mkstemp(dir=self.tdir)
        os.close(fid)
        # Write CRN to basename.crn
        with open(self.crn, 'w') as f:
            f.write(self.crn_rxn)
        # Modules and module strings for import tests
        proc = subprocess.Popen(['piperine-design {} -n 3 -D -q'.format(self.crn)], stdout=subprocess.PIPE, shell=True)
        (out, err) = proc.communicate()
        self.ef = energetics.energyfuncs(targetdG=7.7)
        self.trans = translation
msvc.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = self.generator.bld.exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret

########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token
ifort.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = super(self.__class__, self).exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_pkg_dir(self, pkgname, pkgver, subdir):
        pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
        if not os.path.isdir(pkgdir):
            os.makedirs(pkgdir)

        target = os.path.join(pkgdir, subdir)

        if os.path.exists(target):
            return target

        (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
        try:
            os.close(fd)
            self.download_to_file(pkgname, pkgver, subdir, tmp)
            if subdir == REQUIRES:
                os.rename(tmp, target)
            else:
                self.extract_tar(subdir, pkgdir, tmp)
        finally:
            try:
                os.remove(tmp)
            except OSError:
                pass

        return target
ifort.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = super(self.__class__, self).exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_pkg_dir(self, pkgname, pkgver, subdir):
        pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
        if not os.path.isdir(pkgdir):
            os.makedirs(pkgdir)

        target = os.path.join(pkgdir, subdir)

        if os.path.exists(target):
            return target

        (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
        try:
            os.close(fd)
            self.download_to_file(pkgname, pkgver, subdir, tmp)
            if subdir == REQUIRES:
                os.rename(tmp, target)
            else:
                self.extract_tar(subdir, pkgdir, tmp)
        finally:
            try:
                os.remove(tmp)
            except OSError:
                pass

        return target
msvc.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def exec_response_command(self, cmd, **kw):
    # not public yet
    try:
        tmp = None
        if sys.platform.startswith('win') and isinstance(cmd, list) and len(' '.join(cmd)) >= 8192:
            program = cmd[0] #unquoted program name, otherwise exec_command will fail
            cmd = [self.quote_response_command(x) for x in cmd]
            (fd, tmp) = tempfile.mkstemp()
            os.write(fd, '\r\n'.join(i.replace('\\', '\\\\') for i in cmd[1:]).encode())
            os.close(fd)
            cmd = [program, '@' + tmp]
        # no return here, that's on purpose
        ret = self.generator.bld.exec_command(cmd, **kw)
    finally:
        if tmp:
            try:
                os.remove(tmp)
            except OSError:
                pass # anti-virus and indexers can keep the files open -_-
    return ret

########## stupid evil command modification: concatenate the tokens /Fx, /doc, and /x: with the next token
distnet.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def get_pkg_dir(self, pkgname, pkgver, subdir):
        pkgdir = os.path.join(get_distnet_cache(), pkgname, pkgver)
        if not os.path.isdir(pkgdir):
            os.makedirs(pkgdir)

        target = os.path.join(pkgdir, subdir)

        if os.path.exists(target):
            return target

        (fd, tmp) = tempfile.mkstemp(dir=pkgdir)
        try:
            os.close(fd)
            self.download_to_file(pkgname, pkgver, subdir, tmp)
            if subdir == REQUIRES:
                os.rename(tmp, target)
            else:
                self.extract_tar(subdir, pkgdir, tmp)
        finally:
            try:
                os.remove(tmp)
            except OSError:
                pass

        return target
io.py 文件源码 项目:bob.bio.base 作者: bioidiap 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def open_compressed(filename, open_flag='r', compression_type='bz2'):
  """Opens a compressed HDF5File with the given opening flags.
  For the 'r' flag, the given compressed file will be extracted to a local space.
  For 'w', an empty HDF5File is created.
  In any case, the opened HDF5File is returned, which needs to be closed using the close_compressed() function.
  """
  # create temporary HDF5 file name
  hdf5_file_name = tempfile.mkstemp('.hdf5', 'bob_')[1]

  if open_flag == 'r':
    # extract the HDF5 file from the given file name into a temporary file name
    tar = tarfile.open(filename, mode="r:" + compression_type)
    memory_file = tar.extractfile(tar.next())
    real_file = open(hdf5_file_name, 'wb')
    real_file.write(memory_file.read())
    del memory_file
    real_file.close()
    tar.close()

  return bob.io.base.HDF5File(hdf5_file_name, open_flag)
cache.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def set(self, key, value, timeout=None):
        if timeout is None:
            timeout = int(time() + self.default_timeout)
        elif timeout != 0:
            timeout = int(time() + timeout)
        filename = self._get_filename(key)
        self._prune()
        try:
            fd, tmp = tempfile.mkstemp(suffix=self._fs_transaction_suffix,
                                       dir=self._path)
            with os.fdopen(fd, 'wb') as f:
                pickle.dump(timeout, f, 1)
                pickle.dump(value, f, pickle.HIGHEST_PROTOCOL)
            rename(tmp, filename)
            os.chmod(filename, self._mode)
        except (IOError, OSError):
            return False
        else:
            return True
serving.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def generate_adhoc_ssl_context():
    """Generates an adhoc SSL context for the development server."""
    crypto = _get_openssl_crypto_module()
    import tempfile
    import atexit

    cert, pkey = generate_adhoc_ssl_pair()
    cert_handle, cert_file = tempfile.mkstemp()
    pkey_handle, pkey_file = tempfile.mkstemp()
    atexit.register(os.remove, pkey_file)
    atexit.register(os.remove, cert_file)

    os.write(cert_handle, crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    os.write(pkey_handle, crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
    os.close(cert_handle)
    os.close(pkey_handle)
    ctx = load_ssl_context(cert_file, pkey_file)
    return ctx
sqlitelockfile.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, path, threaded=True, timeout=None):
        """
        >>> lock = SQLiteLockFile('somefile')
        >>> lock = SQLiteLockFile('somefile', threaded=False)
        """
        LockBase.__init__(self, path, threaded, timeout)
        self.lock_file = unicode(self.lock_file)
        self.unique_name = unicode(self.unique_name)

        if SQLiteLockFile.testdb is None:
            import tempfile
            _fd, testdb = tempfile.mkstemp()
            os.close(_fd)
            os.unlink(testdb)
            del _fd, tempfile
            SQLiteLockFile.testdb = testdb

        import sqlite3
        self.connection = sqlite3.connect(SQLiteLockFile.testdb)

        c = self.connection.cursor()
        try:
            c.execute("create table locks"
                      "("
                      "   lock_file varchar(32),"
                      "   unique_name varchar(32)"
                      ")")
        except sqlite3.OperationalError:
            pass
        else:
            self.connection.commit()
            import atexit
            atexit.register(os.unlink, SQLiteLockFile.testdb)
tpm_random.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_tpm_rand_block(size=4096):
    global warned
    randpath = None
    try:
        #make a temp file for the output 
        randfd,randpath = tempfile.mkstemp()
        command = "getrandom -size %d -out %s" % (size,randpath)
        tpm_exec.run(command)

        # read in the quote
        f = open(randpath,"rb")
        rand = f.read()
        f.close()
        os.close(randfd)
    except Exception as e:
        if not warned:
            logger.warn("TPM randomness not available: %s"%e)
            warned=True
        return []
    finally:
        if randpath is not None:
            os.remove(randpath)
    return rand
tpm_nvram.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def write_key_nvram(key):
    if common.STUB_TPM:
        storage = open("tpm_nvram","wb")
        storage.write(key)
        storage.close()
        return

    owner_pw = tpm_initialize.get_tpm_metadata('owner_pw')
    keyFile = None
    try:
        # write out quote
        keyfd,keypath = tempfile.mkstemp()
        keyFile = open(keypath,"wb")
        keyFile.write(key)
        keyFile.close()
        os.close(keyfd)
        tpm_exec.run("nv_definespace -pwdo %s -in 1 -sz %d -pwdd %s -per 40004"%(owner_pw,common.BOOTSTRAP_KEY_SIZE,owner_pw))
        tpm_exec.run("nv_writevalue -pwdd %s -in 1 -if %s"%(owner_pw,keyFile.name))
    finally:
        if keyFile is not None:
            os.remove(keyFile.name)
    return
tpm_initialize.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_ownerpw(owner_pw,reentry=False):
    tmppath = None
    try:
        #make a temp file for the output 
        _,tmppath = tempfile.mkstemp()
        (output,code) = tpm_exec.run("getpubek -pwdo %s -ok %s"%(owner_pw,tmppath),raiseOnError=False) 
        if code!=tpm_exec.EXIT_SUCESS:
            if len(output)>0 and output[0].startswith("Error Authentication failed (Incorrect Password) from TPM_OwnerReadPubek"):
                return False
            elif len(output)>0 and output[0].startswith("Error Defend lock running from TPM_OwnerReadPubek"):
                if reentry:
                    logger.error("Unable to unlock TPM")
                    return False
                # tpm got locked. lets try to unlock it
                logger.error("TPM is locked from too many invalid owner password attempts, attempting to unlock with password: %s"%owner_pw)
                # i have no idea why, but runnig this twice seems to actually work
                tpm_exec.run("resetlockvalue -pwdo %s"%owner_pw,raiseOnError=False) 
                tpm_exec.run("resetlockvalue -pwdo %s"%owner_pw,raiseOnError=False) 
                return test_ownerpw(owner_pw,True)
            else:
                raise Exception("test ownerpw, getpubek failed with code "+str(code)+": "+str(output))
    finally:
        if tmppath is not None:
            os.remove(tmppath)
    return True
tpm_initialize.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def get_pub_ek(): # assumes that owner_pw is correct at this point
    owner_pw = get_tpm_metadata('owner_pw')
    tmppath = None
    try:
        #make a temp file for the output 
        tmpfd,tmppath = tempfile.mkstemp()
        (output,code) = tpm_exec.run("getpubek -pwdo %s -ok %s"%(owner_pw,tmppath),raiseOnError=False) # generates pubek.pem
        if code!=tpm_exec.EXIT_SUCESS:
            raise Exception("getpubek failed with code "+str(code)+": "+str(output))

        # read in the output
        f = open(tmppath,"rb")
        ek = f.read()
        f.close()
        os.close(tmpfd)
    finally:
        if tmppath is not None:
            os.remove(tmppath)

    set_tpm_metadata('ek',ek)
fileutils.py 文件源码 项目:iotronic 作者: openstack 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def write_to_tempfile(content, path=None, suffix='', prefix='tmp'):
    """Create temporary file or use existing file.

    This util is needed for creating temporary file with
    specified content, suffix and prefix. If path is not None,
    it will be used for writing content. If the path doesn't
    exist it'll be created.

    :param content: content for temporary file.
    :param path: same as parameter 'dir' for mkstemp
    :param suffix: same as parameter 'suffix' for mkstemp
    :param prefix: same as parameter 'prefix' for mkstemp

    For example: it can be used in database tests for creating
    configuration files.
    """
    if path:
        ensure_tree(path)

    (fd, path) = tempfile.mkstemp(suffix=suffix, dir=path, prefix=prefix)
    try:
        os.write(fd, content)
    finally:
        os.close(fd)
    return path


问题


面经


文章

微信
公众号

扫码关注公众号