python类open()的实例源码

state.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __save(self):
        if self.__asynchronous == 0:
            state = {
                "version" : _BobState.CUR_VERSION,
                "byNameDirs" : self.__byNameDirs,
                "results" : self.__results,
                "inputs" : self.__inputs,
                "jenkins" : self.__jenkins,
                "dirStates" : self.__dirStates,
                "buildState" : self.__buildState,
            }
            tmpFile = self.__path+".new"
            try:
                with open(tmpFile, "wb") as f:
                    pickle.dump(state, f)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(tmpFile, self.__path)
            except OSError as e:
                raise ParseError("Error saving workspace state: " + str(e))
            self.__dirty = False
        else:
            self.__dirty = True
service.py 文件源码 项目:rci 作者: seecloud 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, root, **config):
        self.root = root
        self.config = config

        store = self.config["dbm_path"]
        os.makedirs(store, exist_ok=True)

        self.http_path = config.get("http_path", "monitor")
        self.oauth = github.OAuth(**root.config.secrets[self.config["name"]])
        self.tokens = dbm.open(os.path.join(store, "tokens.db"), "cs")
        self.sessions = dbm.open(os.path.join(store, "sessions.db"), "cs")
        self.acl = config["access"]
        self.logs_path = config["logs_path"]
        os.makedirs(self.logs_path, exist_ok=True)
        self.connections = []
        self._ws_job_listeners = collections.defaultdict(list)
        self._jobws_cb_map = collections.defaultdict(list)
        self._ws_user_map = {}
        self._user_events = collections.defaultdict(list)
x509.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      f = open(fn, "wb")
      f.write(self.get_DER())
      f.close()
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception, e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      os.unlink(fn)
    return ret
x509.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      f = open(fn, "wb")
      f.write(self.get_DER())
      f.close()
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception, e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      os.unlink(fn)
    return ret
x509.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      f = open(fn, "wb")
      f.write(self.get_DER())
      f.close()
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception, e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      os.unlink(fn)
    return ret
x509.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      f = open(fn, "wb")
      f.write(self.get_DER())
      f.close()
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception, e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      os.unlink(fn)
    return ret
x509.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      f = open(fn, "wb")
      f.write(self.get_DER())
      f.close()
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception, e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      os.unlink(fn)
    return ret
x509.py 文件源码 项目:RPKI-toolkit 作者: pavel-odintsov 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def dumpasn1(self):
    """
    Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
    Use a temporary file rather than popen4() because dumpasn1 uses
    seek() when decoding ASN.1 content nested in OCTET STRING values.
    """

    ret = None
    fn = "dumpasn1.%d.tmp" % os.getpid()
    try:
      f = open(fn, "wb")
      f.write(self.get_DER())
      f.close()
      p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
      ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
    except Exception, e:
      ret = "[Could not run dumpasn1: %s]" % e
    finally:
      os.unlink(fn)
    return ret
basedb.py 文件源码 项目:sslxray 作者: portcullislabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def keys(self):
        """Return a list of usernames in the database.

        @rtype: list
        @return: The usernames in the database.
        """
        if self.db == None:
            raise AssertionError("DB not open")

        self.lock.acquire()
        try:
            usernames = self.db.keys()
        finally:
            self.lock.release()
        usernames = [u for u in usernames if not u.startswith("--Reserved--")]
        return usernames
subunit_trace.py 文件源码 项目:stestr 作者: mtreinish 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def find_test_run_time_diff(test_id, run_time):
    times_db_path = os.path.join(os.path.join(os.getcwd(), '.testrepository'),
                                 'times.dbm')
    if os.path.isfile(times_db_path):
        try:
            test_times = dbm.open(times_db_path)
        except Exception:
            return False
        try:
            avg_runtime = float(test_times.get(str(test_id), False))
        except Exception:
            try:
                avg_runtime = float(test_times[str(test_id)])
            except Exception:
                avg_runtime = False

        if avg_runtime and avg_runtime > 0:
            run_time = float(run_time.rstrip('s'))
            perc_diff = ((run_time - avg_runtime) / avg_runtime) * 100
            return perc_diff
    return False
anydbm.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
state.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __getBIdCache(self):
        if self.__buildIdCache is None:
            self.__buildIdCache = dbm.open(".bob-buildids.dbm", 'c')
            #self.__buildIdCache = {}
        return self.__buildIdCache
pathspec.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def load(cls, cacheName, cacheKey):
        try:
            db = dbm.open(cacheName, "r")
            persistedCacheKey = db.get(b'vsn')
            if cacheKey == persistedCacheKey:
                return cls(db, db[b'root'])
            else:
                db.close()
        except OSError:
            pass
        except dbm.error:
            pass
        return None
pathspec.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def create(cls, cacheName, cacheKey, root):
        try:
            db = dbm.open(cacheName, 'n')
            try:
                db[b'root'] = rootKey = PkgGraphNode.__convertPackageToGraph(db, root)
                db[b'vsn'] = cacheKey
            finally:
                db.close()
            return cls(dbm.open(cacheName, 'r'), rootKey)
        except dbm.error as e:
            raise BobError("Cannot save internal state: " + str(e))
archive.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __enter__(self):
        try:
            self.__db = dbm.open(".bob-adb", 'c')
        except dbm.error as e:
            raise BobError("Cannot open cache: " + str(e))
        return self
anydbm.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def open(file, flag='r', mode=0666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.
    """

    # guess the type of an existing database
    from whichdb import whichdb
    result=whichdb(file)
    if result is None:
        # db doesn't exist
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new
            # flag was used so use default type
            mod = _defaultmod
        else:
            raise error, "need 'c' or 'n' flag to open new db"
    elif result == "":
        # db type cannot be determined
        raise error, "db type could not be determined"
    else:
        mod = __import__(result)
    return mod.open(file, flag, mode)
storage.py 文件源码 项目:rci 作者: seecloud 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, service_name, storage_name, **kwargs):
        path = os.path.join(kwargs["path"], service_name)
        os.makedirs(path, exist_ok=True)
        self.db = dbm.open(os.path.join(path, storage_name + ".db"), "cs")
service.py 文件源码 项目:rci 作者: seecloud 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, root, **kwargs):
        self.root = root
        self.cfg = kwargs
        self.http_path = kwargs.get("http_path", "github")
        self.oauth = github.OAuth(**root.config.secrets[self.cfg["name"]])

        store = kwargs["data-path"]
        os.makedirs(store, exist_ok=True)
        self.users = dbm.open(os.path.join(store, "users.db"), "cs")
        self.orgs = dbm.open(os.path.join(store, "orgs.db"), "cs")
        session_store = dbm.open(os.path.join(store, "sessions.db"), "cs")
        self.ss = SessionStore(session_store, kwargs.get("cookie_name", "ghs"))
service.py 文件源码 项目:rci 作者: seecloud 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cb_job_started(self, job):
        path = os.path.join(self.config["logs_path"], job.event.id,
                            job.config["name"])
        os.makedirs(path)
        path = os.path.join(path, "console.txt")
        outfile = open(path, "wb")
        job.console_callbacks.add(lambda s,d: outfile.write(d))
test_shelf.py 文件源码 项目:shelfdb 作者: nitipit 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setUp(self):
        self.db = shelfdb.open('test_data/db')
        self.assertIsInstance(self.db, shelfdb.shelf.DB)
test_shelf.py 文件源码 项目:shelfdb 作者: nitipit 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_shelf_open_close(self):
        self.db.shelf('user')
        try:
            dbm.open('test_data/db/user')
        except dbm.gnu.error as e:
            self.assertEqual(e.errno, 11)
        self.db.close()
        self.assertIsInstance(self.db._shelf['user']._shelf.dict,
            shelve._ClosedDict)
test_shelf.py 文件源码 项目:shelfdb 作者: nitipit 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        self.db = shelfdb.open('test_data/db')
        self.user_list = [
            {'name': 'Jan'},
            {'name': 'Feb'},
            {'name': 'Mar'},
        ]
        for user in self.user_list:
            user['_id'] = self.db.shelf('user').insert(user)
            # Check if _id is a valid uuid1
            id_ = uuid.UUID(user['_id'], version=1)
            self.assertEqual(user['_id'].replace('-', ''), id_.hex)
url?????v1.0.py 文件源码 项目:Url 作者: beiruan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def OnSeveResult(self,event):
        fn = open('catalog.txt','w+')
        fn2 = open('extract.txt',"w+")
        leftRule = self.lf.GetText()
        rightRule = self.rt.GetText()
        fn.writelines(leftRule)
        fn2.writelines(rightRule)
        fn.close() 
        fn2.close()
demo_local_disk_file_persistence.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def save(self, data):
        with open('NormalFilePersistence.txt', 'w') as open_file:
            open_file.write(data)
demo_local_disk_file_persistence.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load(self):
        with open('NormalFilePersistence.txt', 'r') as open_file:
            return open_file.read()
demo_local_disk_file_persistence.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def save(self, key, value):
        try:
            dbm_file = dbm.open('DBMPersistence', 'c')
            dbm_file[key] = str(value)
        finally:
            dbm_file.close()
demo_local_disk_file_persistence.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def load(self, key):
        try:
            dbm_file = dbm.open('DBMPersistence', 'r')
            if key in dbm_file:
                result = dbm_file[key]
            else:
                result = None
        finally:
            dbm_file.close()
        return result
demo_local_disk_file_persistence.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def save(self, obj):
        with open('PicklePersistence', 'wb') as pickle_file:
            pickle.dump(obj, pickle_file)
demo_local_disk_file_persistence.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def save(self, key, obj):
        try:
            shelve_file = shelve.open('ShelvePersistence')
            shelve_file[key] = obj
        finally:
            shelve_file.close()
demo_local_disk_file_persistence.py 文件源码 项目:SmallReptileTraining 作者: yanbober 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load(self, key):
        try:
            shelve_file = shelve.open('ShelvePersistence')
            if key in shelve_file:
                result = shelve_file[key]
            else:
                result = None
        finally:
            shelve_file.close()
        return result


问题


面经


文章

微信
公众号

扫码关注公众号