def write():
    if os.path.exists(filename):  # os.remove() raises if the file is missing
        os.remove(filename)
    cap = cv2.VideoCapture(0)
    db = shelve.open(filename)
    imgs = []
    data = range(100)
    for i in range(100):
        ret, frame = cap.read()
        if ret:
            # a raw frame is ~29 MB; JPEG-encoding shrinks it to ~1.9 MB
            # cv2.imencode returns (retval, buffer); store just the encoded bytes
            jpg = cv2.imencode('.jpg', frame)[1].tobytes()
            imgs.append(jpg)
            print('frame[{}] {}'.format(i, frame.shape))
            time.sleep(0.03)
    db['imgs'] = imgs
    db['data'] = data
    cap.release()
    db.close()
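This snippet omits its imports and the filename global; a plausible preamble, with the shelf path being only a guess:

import os
import time
import shelve

import cv2
import numpy as np

filename = 'frames.shelve'  # hypothetical; any writable path works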
Python shelve.open() usage examples
def open(self):
    if self.opened:
        return
    self.lock = open(SETTINGS.CACHE_PATH + '.lock', 'ab')
    try:
        fcntl.flock(self.lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
        mode = 'c'
    except IOError:
        logger.warning("Cache locked, using read-only")
        mode = 'r'
        self.lock.close()
        self.lock = None
    try:
        self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
    except Exception as e:
        if mode != 'c':
            raise
        logger.warning("Dropping corrupted cache on %s", e)
        self.lock.truncate(0)
        self.storage = shelve.open(SETTINGS.CACHE_PATH, mode)
    self.opened = True
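The matching close() is not shown; a minimal sketch under the same attribute names (self.lock, self.storage, self.opened):

def close(self):
    if not self.opened:
        return
    self.storage.close()
    if self.lock is not None:
        # releasing the exclusive flock lets the next process open read-write
        fcntl.flock(self.lock, fcntl.LOCK_UN)
        self.lock.close()
        self.lock = None
    self.opened = False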
def __init__(self, path_entry):
    # Loading shelve causes a recursive import loop when it imports
    # dbm, and we know we are not going to load the module being
    # imported, so when we seem to be recursing just ignore the
    # request so another finder will be used.
    if ShelveFinder._maybe_recursing:
        raise ImportError
    try:
        # Test the path_entry to see if it is a valid shelf
        try:
            ShelveFinder._maybe_recursing = True
            with shelve.open(path_entry, 'r'):
                pass
        finally:
            ShelveFinder._maybe_recursing = False
    except Exception as e:
        print('shelf could not import from {}: {}'.format(path_entry, e))
        raise
    else:
        print('shelf added to import path:', path_entry)
        self.path_entry = path_entry
    return
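For this finder to fire, a shelf must exist on sys.path and be registered as a path hook; a usage sketch, where the shelf path and the layout (module source strings keyed by module name) are assumptions:

with shelve.open('/tmp/shelf_imports', 'c') as db:  # hypothetical shelf path
    db['greeter'] = "print('hello from a shelf-backed module')"
sys.path_hooks.append(ShelveFinder)
sys.path.insert(0, '/tmp/shelf_imports')
import greeter  # triggers the finder above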
def new_profile(profile_name, address, port, username, password, activate):
    """
    Create a network proxy configuration profile.
    """
    store = shelve.open(db_file)
    try:
        store[str(profile_name)] = {
            'address': address,
            'port': port,
            'username': username,
            'password': password
        }
        click.echo("Profile '{}' successfully created".format(profile_name))
    finally:
        store.close()
    if activate:
        activate_profile(profile_name)
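Each profile is stored as a plain dict, so it can be inspected directly; a quick read-back sketch, assuming db_file points at the same shelf ('work' is a hypothetical profile name):

store = shelve.open(db_file)
try:
    print(store['work'])
    # -> {'address': ..., 'port': ..., 'username': ..., 'password': ...}
finally:
    store.close()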
def activate_profile(profile_name):
    store = shelve.open(db_file)
    try:
        if profile_name == "":
            util.write(None)
            try:
                del store['active']
            finally:
                click.echo("No proxy mode activated")
        else:
            if profile_name != "active":  # `is not` compared identity, not equality
                util.write(store[str(profile_name)])
                store['active'] = str(profile_name)
                click.echo("Profile '{}' successfully activated".format(profile_name))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
def delete_profile(profile_name):
    """
    Delete the specified profile.
    """
    store = shelve.open(db_file)
    try:
        if profile_name != "active":  # `is not` compared identity, not equality
            del store[str(profile_name)]
            try:
                if str(store["active"]) == profile_name:
                    del store["active"]
            except KeyError:
                pass
            click.echo("Profile '{}' successfully deleted".format(str(profile_name)))
    except KeyError:
        click.echo("No such profile '{}'".format(str(profile_name)))
    finally:
        store.close()
def _get_data(url):
    """Helper function to get data over http or from a local file"""
    if url.startswith('http://'):
        # Try Python 2, use Python 3 on exception
        try:
            resp = urllib.urlopen(url)
            encoding = resp.headers.dict.get('content-encoding', 'plain')
        except AttributeError:
            resp = urllib.request.urlopen(url)
            encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            pass
        elif encoding == 'gzip':
            data = StringIO(data)
            data = gzip.GzipFile(fileobj=data).read()
        else:
            raise RuntimeError('unknown encoding')
    else:
        with open(url, 'r') as fid:
            data = fid.read()
    return data
def get_data(url, gallery_dir):
    """Persistent dictionary usage to retrieve the search indexes"""
    # shelve keys need to be str in python 2
    if sys.version_info[0] == 2 and isinstance(url, unicode):
        url = url.encode('utf-8')
    cached_file = os.path.join(gallery_dir, 'searchindex')
    search_index = shelve.open(cached_file)
    if url in search_index:
        data = search_index[url]
    else:
        data = _get_data(url)
        search_index[url] = data
    search_index.close()
    return data
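A usage sketch; the URL and directory are only examples, and the directory must already exist, since the 'searchindex' shelf is created inside it:

data = get_data('http://sphinx-doc.org/searchindex.js', 'doc/_build')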
def save_game():
    """ Open a new empty shelve (possibly overwriting an old one) to write the game data """
    with shelve.open('savegames/savegame', 'n') as savefile:
        gv.cursor.deactivate()
        savefile['map'] = gv.game_map
        savefile['objects'] = gv.game_objects
        savefile['log'] = gv.game_log
        savefile['gamestate'] = gv.gamestate
        savefile['dlevel'] = gv.dungeon_level
        # Store the index of special objects, so they can later be restored from the gv.game_objects array
        savefile['p_index'] = gv.game_objects.index(gv.player)
        savefile['c_index'] = gv.game_objects.index(gv.cursor)
        savefile['sd_index'] = gv.game_objects.index(gv.stairs_down)
        savefile['su_index'] = gv.game_objects.index(gv.stairs_up)
    # the with-statement already closes the shelf; no explicit close() is needed
def load_game():
    """ Load an existing savegame """
    with shelve.open('savegames/savegame', 'r') as savefile:
        gv.game_map = savefile['map']
        gv.game_objects = savefile['objects']
        gv.game_log = savefile['log']
        gv.gamestate = savefile['gamestate']
        gv.dungeon_level = savefile['dlevel']
        # Restore special objects
        gv.player = gv.game_objects[savefile['p_index']]
        gv.cursor = gv.game_objects[savefile['c_index']]
        gv.stairs_down = gv.game_objects[savefile['sd_index']]
        gv.stairs_up = gv.game_objects[savefile['su_index']]
    msgbox('Welcome back, stranger, to level {0} of {1}!'.format(gv.dungeon_level, settings.DUNGEONNAME),
           width=35, text_color=colors.red)
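Note the two flags: 'n' always creates a fresh, empty shelf, while 'r' opens an existing one read-only and raises if it is missing. A defensive-load sketch for the missing-savegame case:

import dbm

try:
    load_game()
except dbm.error:
    print('no savegame found')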
def extract_all(data):
    resp = dict(data)
    # text = get_sanitized(data)
    text = data['text']
    # request feature extraction.
    text_hash = hashlib.sha256(text.encode('ascii', 'ignore')).hexdigest()
    print 'text_hash', text_hash
    cache_db = shelve.open(path.join(CACHE_DIR, 'feature'))
    if text_hash not in cache_db:  # has_key() is deprecated, even on Python 2
        print 'new call'
        call = alchemyapi.combined('text', text)
        cache_db[text_hash] = call
    else:
        print 'cached call'
        call = cache_db[text_hash]
    cache_db.close()
    # filter results.
    whitelist = ['concepts', 'entities', 'keywords', 'taxonomy']
    for key in whitelist:
        if key not in call:
            resp[key] = []
            continue
        resp[key] = call[key]
    return resp
def proj_create(args, config, _extra_args):
    """Creates a new em-managed project."""
    tmpl_repo = config['project']['template_repo']
    try:
        pygit2.clone_repository(tmpl_repo, args.dest)
        # delete the template's history
        shutil.rmtree(osp.join(args.dest, '.git'), ignore_errors=True)
        pygit2.init_repository(args.dest)
    except ValueError:
        pass  # already in a repo
    for em_dir in ['experiments', 'data']:
        dpath = osp.join(args.dest, em_dir)
        if not osp.isdir(dpath):
            os.mkdir(dpath)
    with shelve.open(osp.join(args.dest, '.em')) as emdb:
        emdb['__em__'] = {}
def resume(args, config, prog_args):
    """Resume a stopped experiment."""
    name = args.name
    repo = pygit2.Repository('.')
    with shelve.open('.em') as emdb:
        if name not in emdb:
            return _die(E_NO_EXP.format(name))
        info = emdb[name]
        if 'pid' in info or info.get('status') == 'running':
            return _die(E_IS_RUNNING.format(name))
    try:
        repo.lookup_branch(name)
    except pygit2.GitError:
        return _die(E_NO_EXP.format(name))
    prog_args.append('--resume')
    if args.epoch:
        prog_args.append(args.epoch)
    return _run_job(name, config, args.gpu, prog_args, args.background)
def list_experiments(args, _config, _extra_args):
    """List experiments."""
    import subprocess
    if args.filter:
        filter_key, filter_value = args.filter.split('=')

        def _filt(stats):
            return filter_key in stats and stats[filter_key] == filter_value
    with shelve.open('.em') as emdb:
        if args.filter:
            names = {name for name, info in sorted(emdb.items()) if _filt(info)}
        else:
            names = emdb.keys()
        names -= {EM_KEY}
    if not names:
        return
    subprocess.run(
        ['column'], input='\n'.join(sorted(names)) + '\n', encoding='utf8')
def show(args, _config, _extra_args):
    """Show details about an experiment."""
    import pickle
    import pprint
    name = args.name
    with shelve.open('.em') as emdb:
        if name not in emdb or name == EM_KEY:
            return _die(E_NO_EXP.format(name))
        for info_name, info_val in sorted(emdb[name].items()):
            if isinstance(info_val, datetime.date):
                info_val = info_val.ctime()
            print(f'{info_name}: {info_val}')
    if not args.opts:
        return
    opts_path = _expath(name, 'run', 'opts.pkl')
    with open(opts_path, 'rb') as f_opts:
        print('\noptions:')
        opts = pickle.load(f_opts)
    cols = shutil.get_terminal_size((80, 20)).columns
    pprint.pprint(vars(opts), indent=2, compact=True, width=cols)
def get_pdf(pdf_link):
    # check whether the value already exists in permanent storage:
    pdf_name = pdf_link.rsplit('/', 1)[-1]  # set filename to the last element of the link
    if not check_db(pdf_name) and not check_db(pdf_link):
        # print 'Downloading: {}'.format(pdf_link)
        try:
            opener = urllib2.build_opener()
            opener.addheaders = [('User-agent', USER_AGENT)]
            r = opener.open(pdf_link)
            path = tmp_dir + pdf_name
            with open(path, "wb") as code:  # binary mode, since PDFs are not text
                code.write(r.read())
            # log successful download:
            log_download('DOWNLOADED: {}'.format(pdf_link))
        except Exception as e:
            log_download('FAILURE: {} | {}'.format(pdf_link, e))
    else:
        log_download('File already downloaded: {}'.format(pdf_name))
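check_db() is not shown; a plausible shelve-backed version, with db_path a hypothetical module global naming the shelf file:

from contextlib import closing
import shelve

def check_db(key):
    with closing(shelve.open(db_path)) as db:
        return key in db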
def _get_data(url):
    """Helper function to get data over http(s) or from a local file"""
    if urllib_parse.urlparse(url).scheme in ('http', 'https'):
        resp = urllib_request.urlopen(url)
        encoding = resp.headers.get('content-encoding', 'plain')
        data = resp.read()
        if encoding == 'plain':
            data = data.decode('utf-8')
        elif encoding == 'gzip':
            data = BytesIO(data)
            data = gzip.GzipFile(fileobj=data).read().decode('utf-8')
        else:
            raise RuntimeError('unknown encoding')
    else:
        with codecs.open(url, mode='r', encoding='utf-8') as fid:
            data = fid.read()
    return data
def fill_tf_idf_shelve(self):
    tf_idf_shelve = shelve.open(self.tf_idf_shelve_file_name, writeback=True)
    if TF not in tf_idf_shelve:
        tf_idf_shelve[TF] = {}
    if DF not in tf_idf_shelve:
        tf_idf_shelve[DF] = {}
    if D not in tf_idf_shelve:
        tf_idf_shelve[D] = 0
    if TF_IDF not in tf_idf_shelve:
        tf_idf_shelve[TF_IDF] = {}
    if CENTROID not in tf_idf_shelve:
        tf_idf_shelve[CENTROID] = {}
    for action, trigger_txt in self.trigger_dict.iteritems():
        if action not in tf_idf_shelve[TF]:
            trigger = self.tokenize_text(trigger_txt)
            tf_idf_shelve[TF][action] = Counter(trigger)
            for word in unique(trigger):
                if word not in tf_idf_shelve[DF]:
                    tf_idf_shelve[DF][word] = 0
                tf_idf_shelve[DF][word] += 1
    tf_idf_shelve[D] = len(tf_idf_shelve[TF])
    tf_idf_shelve.close()
    self.compute_tf_idf()
    self.compute_centroids()
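writeback=True matters here: the nested dicts under TF and DF are mutated in place, and without writeback those mutations would only touch a throwaway copy. A minimal illustration ('demo_shelf' is an arbitrary path):

db = shelve.open('demo_shelf')  # writeback defaults to False
db['d'] = {}
db['d']['k'] = 1                # mutates a temporary unpickled copy; not persisted
print(db['d'])                  # {}
db.close()

db = shelve.open('demo_shelf', writeback=True)
db['d']['k'] = 1                # cached and written back on sync()/close()
db.close()                      # reopening now shows {'k': 1}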
def add_list_of_words_in_w2v_model(self, unknown_words):
    huge_w2v_model_file = open(self.w2v_huge_model_path, "r")
    current_w2v_model_file = open(self.w2v_model_path, "a")
    line = huge_w2v_model_file.readline()
    unknown_words_left = len(unknown_words)
    while line and unknown_words_left:
        word = line.split()[0]
        if word in unknown_words:
            current_w2v_model_file.write(line)
            unknown_words = unknown_words - set([word])
            unknown_words_left -= 1
        line = huge_w2v_model_file.readline()
    for word in list(unknown_words):
        random_position = random(self.w2v_model.vector_size) * 2 - 1
        # a trailing newline keeps each word on its own line in the model file
        current_w2v_model_file.write(" ".join([word] + [str(x) for x in random_position]) + "\n")
    print "warning: random positions introduced for new words ... in the future this should be solved"
    current_w2v_model_file.close()
    huge_w2v_model_file.close()
def list_files(self, path):
    storage = self.args.storage
    shelve_name = os.path.join(os.path.dirname(__file__), storage)
    db = shelve.open(shelve_name)
    status = []
    for key, value in db.items():
        value['hash'] = key
        status.append(value)
    status.sort(lambda a, b: cmp(a['filename'], b['filename']))
    stream = StringIO.StringIO()
    writer = csv.writer(stream)
    # the header needs a CAMERA column to match the six fields written per row
    writer.writerow(('FILENAME', 'GIVEN_NAME', 'CAMERA', 'CREATED_ON', 'DISCOVERED_ON', 'UPLOADED_ON'))
    for params in status:
        writer.writerow((params['filename'], params['given_name'], params['camera'],
                         params['timestamp'], params['discovered_on'], params['uploaded_on']))
    return stream.getvalue()
def read():
    db = shelve.open(filename)
    imgs = db['imgs']
    data = db['data']
    for i in range(len(imgs)):
        d = data[i]
        print(i, d)
        img = imgs[i]
        img = np.frombuffer(img, np.uint8)  # np.fromstring is deprecated for binary data
        frame = cv2.imdecode(img, 1)
        print('frame[{}] {}'.format(i, frame.shape))
        cv2.imshow('camera', frame)
        cv2.waitKey(300)
    print('bye ...')
    cv2.destroyAllWindows()
    db.close()
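A driver sketch for the write/read pair above, assuming the module-level filename from the preamble:

if __name__ == '__main__':
    write()
    read()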
def get_latest_episode(self, url, media=False):
    storage_path = join(self.file_system.path, 'feedcache')
    LOGGER.debug("storage_path: %s" % storage_path)
    storage = shelve.open(storage_path)
    ttl = 60 * 60
    link = ""
    try:
        fc = cache.Cache(storage, timeToLiveSeconds=ttl)
        parsed_data = fc.fetch(url)
        print "parsed_data.feed.title:", parsed_data.feed.title
        for entry in parsed_data.entries:
            pprint(entry)
            if media:
                media_content = entry.media_content
                if media_content:
                    link = entry.media_content[0]['url']
            else:
                link = entry.link
            if link:
                break
    finally:
        storage.close()
    return link
def dump(self, result_storage):
    # The 'tainted' attribute is automatically set to 'True'
    # if the dataset required for an answer test is missing
    # (see can_run_ds() and can_run_sim()).
    # This logic check prevents creating a shelve with empty answers.
    storage_is_tainted = result_storage.get('tainted', False)
    if self.answer_name is None or storage_is_tainted:
        return
    # Store data using shelve
    ds = shelve.open(self.answer_name, protocol=-1)
    for ds_name in result_storage:
        answer_name = "%s" % ds_name
        if answer_name in ds:
            mylog.info("Overwriting %s", answer_name)
        ds[answer_name] = result_storage[ds_name]
    ds.close()
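protocol=-1 selects the highest pickle protocol available, which is more compact for large answers; reading the shelf back needs no protocol argument. A read-back sketch, where 'answers' stands in for self.answer_name:

with shelve.open('answers') as ds:
    stored = dict(ds)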
def val_dump(rels, db):
    """
    Make a ``Valuation`` from a list of relation metadata bundles and dump to
    a persistent database.

    :param rels: bundle of metadata needed for constructing a concept
    :type rels: list of dict
    :param db: name of file to which data is written.
        The suffix '.db' will be automatically appended.
    :type db: str
    """
    concepts = process_bundle(rels).values()
    valuation = make_valuation(concepts, read=True)
    db_out = shelve.open(db, 'n')
    db_out.update(valuation)
    db_out.close()
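A minimal read-back counterpart, sketched here rather than taken from the original source; it returns the stored entries as a plain dict and leaves rebuilding a full Valuation to the caller:

def val_load(db):
    """Counterpart to val_dump(); returns the stored mapping as a plain dict."""
    db_in = shelve.open(db, 'r')
    try:
        return dict(db_in)
    finally:
        db_in.close()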
def label_indivs(valuation, lexicon=False):
    """
    Assign individual constants to the individuals in the domain of a ``Valuation``.

    Given a valuation with an entry of the form ``{'rel': {'a': True}}``,
    add a new entry ``{'a': 'a'}``.

    :type valuation: Valuation
    :rtype: Valuation
    """
    # collect all the individuals into a domain
    domain = valuation.domain
    # convert the domain into a sorted list of alphabetic terms
    # use the same string as a label
    pairs = [(e, e) for e in domain]
    if lexicon:
        lex = make_lex(domain)
        with open("chat_pnames.cfg", 'w') as outfile:
            outfile.writelines(lex)
    # read the pairs into the valuation
    valuation.update(pairs)
    return valuation
def auth(self):
    connect_database = shelve.open("../database/database")
    database = connect_database.get("data")
    connect_database.close()
    verify = UserVerify(database)
    conn = self.request
    # the original prompts were mojibake ("??????"); plain English stand-ins are used here
    conn.send(b"username: ")
    username = conn.recv(1024)
    conn.send(b"password: ")
    password = conn.recv(1024)
    login = None
    if username and password:
        try:
            login = verify.login(user=username, password=password)
        except SystemExit as e:
            print(e)
            conn.close()
    if not login:
        return False
    else:
        return True