def __save(self):
if self.__asynchronous == 0:
state = {
"version" : _BobState.CUR_VERSION,
"byNameDirs" : self.__byNameDirs,
"results" : self.__results,
"inputs" : self.__inputs,
"jenkins" : self.__jenkins,
"dirStates" : self.__dirStates,
"buildState" : self.__buildState,
}
tmpFile = self.__path+".new"
try:
with open(tmpFile, "wb") as f:
pickle.dump(state, f)
f.flush()
os.fsync(f.fileno())
os.replace(tmpFile, self.__path)
except OSError as e:
raise ParseError("Error saving workspace state: " + str(e))
self.__dirty = False
else:
self.__dirty = True
python类open()的实例源码
def __init__(self, root, **config):
self.root = root
self.config = config
store = self.config["dbm_path"]
os.makedirs(store, exist_ok=True)
self.http_path = config.get("http_path", "monitor")
self.oauth = github.OAuth(**root.config.secrets[self.config["name"]])
self.tokens = dbm.open(os.path.join(store, "tokens.db"), "cs")
self.sessions = dbm.open(os.path.join(store, "sessions.db"), "cs")
self.acl = config["access"]
self.logs_path = config["logs_path"]
os.makedirs(self.logs_path, exist_ok=True)
self.connections = []
self._ws_job_listeners = collections.defaultdict(list)
self._jobws_cb_map = collections.defaultdict(list)
self._ws_user_map = {}
self._user_events = collections.defaultdict(list)
def dumpasn1(self):
"""
Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
Use a temporary file rather than popen4() because dumpasn1 uses
seek() when decoding ASN.1 content nested in OCTET STRING values.
"""
ret = None
fn = "dumpasn1.%d.tmp" % os.getpid()
try:
f = open(fn, "wb")
f.write(self.get_DER())
f.close()
p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
except Exception, e:
ret = "[Could not run dumpasn1: %s]" % e
finally:
os.unlink(fn)
return ret
def dumpasn1(self):
"""
Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
Use a temporary file rather than popen4() because dumpasn1 uses
seek() when decoding ASN.1 content nested in OCTET STRING values.
"""
ret = None
fn = "dumpasn1.%d.tmp" % os.getpid()
try:
f = open(fn, "wb")
f.write(self.get_DER())
f.close()
p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
except Exception, e:
ret = "[Could not run dumpasn1: %s]" % e
finally:
os.unlink(fn)
return ret
def dumpasn1(self):
"""
Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
Use a temporary file rather than popen4() because dumpasn1 uses
seek() when decoding ASN.1 content nested in OCTET STRING values.
"""
ret = None
fn = "dumpasn1.%d.tmp" % os.getpid()
try:
f = open(fn, "wb")
f.write(self.get_DER())
f.close()
p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
except Exception, e:
ret = "[Could not run dumpasn1: %s]" % e
finally:
os.unlink(fn)
return ret
def dumpasn1(self):
"""
Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
Use a temporary file rather than popen4() because dumpasn1 uses
seek() when decoding ASN.1 content nested in OCTET STRING values.
"""
ret = None
fn = "dumpasn1.%d.tmp" % os.getpid()
try:
f = open(fn, "wb")
f.write(self.get_DER())
f.close()
p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
except Exception, e:
ret = "[Could not run dumpasn1: %s]" % e
finally:
os.unlink(fn)
return ret
def dumpasn1(self):
"""
Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
Use a temporary file rather than popen4() because dumpasn1 uses
seek() when decoding ASN.1 content nested in OCTET STRING values.
"""
ret = None
fn = "dumpasn1.%d.tmp" % os.getpid()
try:
f = open(fn, "wb")
f.write(self.get_DER())
f.close()
p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
except Exception, e:
ret = "[Could not run dumpasn1: %s]" % e
finally:
os.unlink(fn)
return ret
def dumpasn1(self):
"""
Pretty print an ASN.1 DER object using cryptlib dumpasn1 tool.
Use a temporary file rather than popen4() because dumpasn1 uses
seek() when decoding ASN.1 content nested in OCTET STRING values.
"""
ret = None
fn = "dumpasn1.%d.tmp" % os.getpid()
try:
f = open(fn, "wb")
f.write(self.get_DER())
f.close()
p = subprocess.Popen(("dumpasn1", "-a", fn), stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
ret = "\n".join(x for x in p.communicate()[0].splitlines() if x.startswith(" "))
except Exception, e:
ret = "[Could not run dumpasn1: %s]" % e
finally:
os.unlink(fn)
return ret
def keys(self):
"""Return a list of usernames in the database.
@rtype: list
@return: The usernames in the database.
"""
if self.db == None:
raise AssertionError("DB not open")
self.lock.acquire()
try:
usernames = self.db.keys()
finally:
self.lock.release()
usernames = [u for u in usernames if not u.startswith("--Reserved--")]
return usernames
def find_test_run_time_diff(test_id, run_time):
times_db_path = os.path.join(os.path.join(os.getcwd(), '.testrepository'),
'times.dbm')
if os.path.isfile(times_db_path):
try:
test_times = dbm.open(times_db_path)
except Exception:
return False
try:
avg_runtime = float(test_times.get(str(test_id), False))
except Exception:
try:
avg_runtime = float(test_times[str(test_id)])
except Exception:
avg_runtime = False
if avg_runtime and avg_runtime > 0:
run_time = float(run_time.rstrip('s'))
perc_diff = ((run_time - avg_runtime) / avg_runtime) * 100
return perc_diff
return False
def open(file, flag='r', mode=0666):
"""Open or create database at path given by *file*.
Optional argument *flag* can be 'r' (default) for read-only access, 'w'
for read-write access of an existing database, 'c' for read-write access
to a new or existing database, and 'n' for read-write access to a new
database.
Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
only if it doesn't exist; and 'n' always creates a new database.
"""
# guess the type of an existing database
from whichdb import whichdb
result=whichdb(file)
if result is None:
# db doesn't exist
if 'c' in flag or 'n' in flag:
# file doesn't exist and the new
# flag was used so use default type
mod = _defaultmod
else:
raise error, "need 'c' or 'n' flag to open new db"
elif result == "":
# db type cannot be determined
raise error, "db type could not be determined"
else:
mod = __import__(result)
return mod.open(file, flag, mode)
def __getBIdCache(self):
if self.__buildIdCache is None:
self.__buildIdCache = dbm.open(".bob-buildids.dbm", 'c')
#self.__buildIdCache = {}
return self.__buildIdCache
def load(cls, cacheName, cacheKey):
try:
db = dbm.open(cacheName, "r")
persistedCacheKey = db.get(b'vsn')
if cacheKey == persistedCacheKey:
return cls(db, db[b'root'])
else:
db.close()
except OSError:
pass
except dbm.error:
pass
return None
def create(cls, cacheName, cacheKey, root):
try:
db = dbm.open(cacheName, 'n')
try:
db[b'root'] = rootKey = PkgGraphNode.__convertPackageToGraph(db, root)
db[b'vsn'] = cacheKey
finally:
db.close()
return cls(dbm.open(cacheName, 'r'), rootKey)
except dbm.error as e:
raise BobError("Cannot save internal state: " + str(e))
def __enter__(self):
try:
self.__db = dbm.open(".bob-adb", 'c')
except dbm.error as e:
raise BobError("Cannot open cache: " + str(e))
return self
def open(file, flag='r', mode=0666):
"""Open or create database at path given by *file*.
Optional argument *flag* can be 'r' (default) for read-only access, 'w'
for read-write access of an existing database, 'c' for read-write access
to a new or existing database, and 'n' for read-write access to a new
database.
Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
only if it doesn't exist; and 'n' always creates a new database.
"""
# guess the type of an existing database
from whichdb import whichdb
result=whichdb(file)
if result is None:
# db doesn't exist
if 'c' in flag or 'n' in flag:
# file doesn't exist and the new
# flag was used so use default type
mod = _defaultmod
else:
raise error, "need 'c' or 'n' flag to open new db"
elif result == "":
# db type cannot be determined
raise error, "db type could not be determined"
else:
mod = __import__(result)
return mod.open(file, flag, mode)
def __init__(self, service_name, storage_name, **kwargs):
path = os.path.join(kwargs["path"], service_name)
os.makedirs(path, exist_ok=True)
self.db = dbm.open(os.path.join(path, storage_name + ".db"), "cs")
def __init__(self, root, **kwargs):
self.root = root
self.cfg = kwargs
self.http_path = kwargs.get("http_path", "github")
self.oauth = github.OAuth(**root.config.secrets[self.cfg["name"]])
store = kwargs["data-path"]
os.makedirs(store, exist_ok=True)
self.users = dbm.open(os.path.join(store, "users.db"), "cs")
self.orgs = dbm.open(os.path.join(store, "orgs.db"), "cs")
session_store = dbm.open(os.path.join(store, "sessions.db"), "cs")
self.ss = SessionStore(session_store, kwargs.get("cookie_name", "ghs"))
def cb_job_started(self, job):
path = os.path.join(self.config["logs_path"], job.event.id,
job.config["name"])
os.makedirs(path)
path = os.path.join(path, "console.txt")
outfile = open(path, "wb")
job.console_callbacks.add(lambda s,d: outfile.write(d))
def setUp(self):
self.db = shelfdb.open('test_data/db')
self.assertIsInstance(self.db, shelfdb.shelf.DB)
def test_shelf_open_close(self):
self.db.shelf('user')
try:
dbm.open('test_data/db/user')
except dbm.gnu.error as e:
self.assertEqual(e.errno, 11)
self.db.close()
self.assertIsInstance(self.db._shelf['user']._shelf.dict,
shelve._ClosedDict)
def setUp(self):
self.db = shelfdb.open('test_data/db')
self.user_list = [
{'name': 'Jan'},
{'name': 'Feb'},
{'name': 'Mar'},
]
for user in self.user_list:
user['_id'] = self.db.shelf('user').insert(user)
# Check if _id is a valid uuid1
id_ = uuid.UUID(user['_id'], version=1)
self.assertEqual(user['_id'].replace('-', ''), id_.hex)
def OnSeveResult(self,event):
fn = open('catalog.txt','w+')
fn2 = open('extract.txt',"w+")
leftRule = self.lf.GetText()
rightRule = self.rt.GetText()
fn.writelines(leftRule)
fn2.writelines(rightRule)
fn.close()
fn2.close()
demo_local_disk_file_persistence.py 文件源码
项目:SmallReptileTraining
作者: yanbober
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def save(self, data):
with open('NormalFilePersistence.txt', 'w') as open_file:
open_file.write(data)
demo_local_disk_file_persistence.py 文件源码
项目:SmallReptileTraining
作者: yanbober
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def load(self):
with open('NormalFilePersistence.txt', 'r') as open_file:
return open_file.read()
demo_local_disk_file_persistence.py 文件源码
项目:SmallReptileTraining
作者: yanbober
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def save(self, key, value):
try:
dbm_file = dbm.open('DBMPersistence', 'c')
dbm_file[key] = str(value)
finally:
dbm_file.close()
demo_local_disk_file_persistence.py 文件源码
项目:SmallReptileTraining
作者: yanbober
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def load(self, key):
try:
dbm_file = dbm.open('DBMPersistence', 'r')
if key in dbm_file:
result = dbm_file[key]
else:
result = None
finally:
dbm_file.close()
return result
demo_local_disk_file_persistence.py 文件源码
项目:SmallReptileTraining
作者: yanbober
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def save(self, obj):
with open('PicklePersistence', 'wb') as pickle_file:
pickle.dump(obj, pickle_file)
demo_local_disk_file_persistence.py 文件源码
项目:SmallReptileTraining
作者: yanbober
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def save(self, key, obj):
try:
shelve_file = shelve.open('ShelvePersistence')
shelve_file[key] = obj
finally:
shelve_file.close()
demo_local_disk_file_persistence.py 文件源码
项目:SmallReptileTraining
作者: yanbober
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def load(self, key):
try:
shelve_file = shelve.open('ShelvePersistence')
if key in shelve_file:
result = shelve_file[key]
else:
result = None
finally:
shelve_file.close()
return result