def error(self, message, ensure_ascii=False):
"""Stop analyzer with an error message. Changing ensure_ascii can be helpful when stucking
with ascii <-> utf-8 issues. Additionally, the input as returned, too. Maybe helpful when dealing with errors.
:param message: Error message
:param ensure_ascii: Force ascii output. Default: False"""
analyzerInput = self.__input
if 'password' in analyzerInput.get('config', {}):
analyzerInput['config']['password'] = 'REMOVED'
if 'key' in analyzerInput.get('config', {}):
analyzerInput['config']['key'] = 'REMOVED'
if 'apikey' in analyzerInput.get('config', {}):
analyzerInput['config']['apikey'] = 'REMOVED'
if 'api_key' in analyzerInput.get('config', {}):
analyzerInput['config']['api_key'] = 'REMOVED'
json.dump({'success': False,
'input': analyzerInput,
'errorMessage': message},
self.fpoutput,
ensure_ascii=ensure_ascii)
# Force exit after error
sys.exit(1)
python类dump()的实例源码
def register(name):
# hit api to see if name is already registered
if check_name(name)['status'] == 'error':
print('{} already registered.'.format(name))
else:
# generate new keypair
(pub, priv) = rsa.newkeys(512)
if os.path.exists(KEY_LOCATION) == False:
os.mkdir(KEY_LOCATION)
# save to disk
with open('{}/.key'.format(KEY_LOCATION), 'wb') as f:
pickle.dump((pub, priv), f, pickle.HIGHEST_PROTOCOL)
r = requests.post('{}/names'.format(API_LOCATION), data = {'name' : name, 'n' : pub.n, 'e' : pub.e})
if r.json()['status'] == 'success':
print('Successfully registered new name: {}'.format(name))
else:
print('Error registering name: {}'.format(name))
def report(self, full_report, ensure_ascii=False):
"""Returns a json dict via stdout.
:param full_report: Analyzer results as dict.
:param ensure_ascii: Force ascii output. Default: False"""
summary = {}
try:
summary = self.summary(full_report)
except:
pass
report = {
'success': True,
'summary': summary,
'artifacts': self.artifacts(full_report),
'full': full_report
}
json.dump(report, self.fpoutput, ensure_ascii=ensure_ascii)
def write(self, path=None, fileobj=None, legacy=False, skip_unknown=True):
if [path, fileobj].count(None) != 1:
raise ValueError('Exactly one of path and fileobj is needed')
self.validate()
if legacy:
if self._legacy:
legacy_md = self._legacy
else:
legacy_md = self._to_legacy()
if path:
legacy_md.write(path, skip_unknown=skip_unknown)
else:
legacy_md.write_file(fileobj, skip_unknown=skip_unknown)
else:
if self._legacy:
d = self._from_legacy()
else:
d = self._data
if fileobj:
json.dump(d, fileobj, ensure_ascii=True, indent=2,
sort_keys=True)
else:
with codecs.open(path, 'w', 'utf-8') as f:
json.dump(d, f, ensure_ascii=True, indent=2,
sort_keys=True)
def save(self, pypi_version, current_time):
# Check to make sure that we own the directory
if not check_path_owner(os.path.dirname(self.statefile_path)):
return
# Now that we've ensured the directory is owned by this user, we'll go
# ahead and make sure that all our directories are created.
ensure_dir(os.path.dirname(self.statefile_path))
# Attempt to write out our version check file
with lockfile.LockFile(self.statefile_path):
if os.path.exists(self.statefile_path):
with open(self.statefile_path) as statefile:
state = json.load(statefile)
else:
state = {}
state[sys.prefix] = {
"last_check": current_time.strftime(SELFCHECK_DATE_FMT),
"pypi_version": pypi_version,
}
with open(self.statefile_path, "w") as statefile:
json.dump(state, statefile, sort_keys=True,
separators=(",", ":"))
data_preprocessing_video.py 文件源码
项目:AVSR-Deep-Speech
作者: pandeydivesh15
项目源码
文件源码
阅读 41
收藏 0
点赞 0
评论 0
def encode_and_store(batch_x, output_dir, file_name):
"""
Args:
1. batch_x: Batch of 32*32 images which will go inside our autoencoder.
2. output_dir: Dir path for storing all encoded features for given `batch_x`.
Features will be stored in the form of JSON file.
3. file_name: File name of JSON file.
"""
global AUTO_ENCODER
if AUTO_ENCODER is None:
load_AE()
norm_batch = np.zeros(batch_x.shape)
for i in range(len(batch_x)):
norm_batch[i] = (batch_x[i] - np.mean(batch_x[i])) / np.std(batch_x[i])
output_dict = {
'name' : file_name,
'encoded': AUTO_ENCODER.transform(norm_batch).tolist()}
with open(output_dir+file_name+'.json', 'w') as f:
json.dump(output_dict, f)
def write_log(target_uri, doi, pmid, found_rrids, head, body, text, h):
now = datetime.now().isoformat()[0:19].replace(':','').replace('-','')
frv = list(set(found_rrids.values()))
if len(frv) == 1 and frv[0] == 'Already Annotated':
head, body, text = None, None, None
log = {'target_uri':target_uri,
'group':h.group,
'doi':doi,
'pmid':pmid,
'found_rrids':found_rrids,
'count':len(found_rrids),
'head':head,
'body':body,
'text':text,
}
fname = 'logs/' + 'rrid-%s.json' % now
with open(fname, 'wt') as f:
json.dump(log, f, sort_keys=True, indent=4)
def export(metadata, start, end, container_image_pattern):
queries = []
metadata["start"] = start.isoformat() + "Z"
metadata["end"] = end.isoformat() + "Z"
metadata["services"] = []
ts = datetime.utcnow().strftime("%Y%m%d%H%M%S-")
path = os.path.join(metadata["metrics_export"], ts + metadata["measurement_name"])
if not os.path.isdir(path):
os.makedirs(path)
for app in APPS:
metadata["services"].append(dump_app(app, path, start, end, container_image_pattern))
with open(os.path.join(path, "metadata.json"), "w+") as f:
json.dump(metadata, f, cls=Encoder, sort_keys=True, indent=4)
f.flush()
def keystone_auth(auth_details):
try:
if auth_details['OS_AUTH_URL'].endswith('v3'):
k_client = k3_client
else:
k_client = k2_client
tenant_name = auth_details['OS_TENANT_NAME']
keystone = k_client.Client(username=auth_details['OS_USERNAME'],
password=auth_details['OS_PASSWORD'],
tenant_name=tenant_name,
auth_url=auth_details['OS_AUTH_URL'])
except Exception as e:
status_err(str(e))
try:
with open(TOKEN_FILE, 'w') as token_file:
json.dump(keystone.auth_ref, token_file)
except IOError:
# if we can't write the file we go on
pass
return keystone.auth_ref
def write(self, path=None, fileobj=None, legacy=False, skip_unknown=True):
if [path, fileobj].count(None) != 1:
raise ValueError('Exactly one of path and fileobj is needed')
self.validate()
if legacy:
if self._legacy:
legacy_md = self._legacy
else:
legacy_md = self._to_legacy()
if path:
legacy_md.write(path, skip_unknown=skip_unknown)
else:
legacy_md.write_file(fileobj, skip_unknown=skip_unknown)
else:
if self._legacy:
d = self._from_legacy()
else:
d = self._data
if fileobj:
json.dump(d, fileobj, ensure_ascii=True, indent=2,
sort_keys=True)
else:
with codecs.open(path, 'w', 'utf-8') as f:
json.dump(d, f, ensure_ascii=True, indent=2,
sort_keys=True)
def save(self, pypi_version, current_time):
# Check to make sure that we own the directory
if not check_path_owner(os.path.dirname(self.statefile_path)):
return
# Now that we've ensured the directory is owned by this user, we'll go
# ahead and make sure that all our directories are created.
ensure_dir(os.path.dirname(self.statefile_path))
# Attempt to write out our version check file
with lockfile.LockFile(self.statefile_path):
if os.path.exists(self.statefile_path):
with open(self.statefile_path) as statefile:
state = json.load(statefile)
else:
state = {}
state[sys.prefix] = {
"last_check": current_time.strftime(SELFCHECK_DATE_FMT),
"pypi_version": pypi_version,
}
with open(self.statefile_path, "w") as statefile:
json.dump(state, statefile, sort_keys=True,
separators=(",", ":"))
def crypto_spot(bot, trigger):
from_cur = trigger.group(1)
global last_prices
from_cur = from_cur.lower()
if from_cur not in main_coins:
bot.say("Invalid currency!")
api_result = requests.get(single_url.format(from_cur)).json()
if from_cur not in last_prices:
last_prices[from_cur] = 0
digits = False if from_cur.lower()=='xrp' else True
diffStr = getDiffString(float(api_result["last_price"]), last_prices[from_cur], digits)
last_prices[from_cur] = float(api_result["last_price"])
with open('~/.sopel/cur_py_cache', 'w') as outfile:
json.dump(last_prices, outfile)
bot.say("{0}: ${1:.{2}f}{3}".format(from_cur, float(api_result["last_price"]), 2 if digits else 4, diffStr))
def write_store(self,store):
"""
Commit store to file.
"""
inner_data = json.dumps(store).encode('utf-8')
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
enc_bytes = self._box.encrypt(inner_data,nonce)
enc_blob = bytes_to_hex_str(enc_bytes)
outer_data = {
'hash': self._hash,
'salt': bytes_to_hex_str(self._salt),
'iterations': self._iterations,
'enc_blob': enc_blob,
}
with open(self._path,'w',encoding='ascii') as fw:
json.dump(outer_data,fw)
def finish(self):
super().finish()
self.info(bold("writing templatebuiltins.js..."))
xrefs = self.env.domaindata["std"]["objects"]
templatebuiltins = {
"ttags": [
n for ((t, n), (k, a)) in xrefs.items()
if t == "templatetag" and k == "ref/templates/builtins"
],
"tfilters": [
n for ((t, n), (k, a)) in xrefs.items()
if t == "templatefilter" and k == "ref/templates/builtins"
],
}
outfilename = os.path.join(self.outdir, "templatebuiltins.js")
with open(outfilename, 'w') as fp:
fp.write('var django_template_builtins = ')
json.dump(templatebuiltins, fp)
fp.write(';\n')
def main(args, outs):
genomes = cr_matrix.GeneBCMatrices.load_genomes_from_h5(args.filtered_matrices)
chemistry = cr_matrix.GeneBCMatrices.load_chemistry_from_h5(args.filtered_matrices)
total_cells = cr_matrix.GeneBCMatrices.count_cells_from_h5(args.filtered_matrices)
summary = {'chemistry_description': chemistry, 'filtered_bcs_transcriptome_union': total_cells}
with open(outs.summary, 'w') as f:
json.dump(summary, f, indent=4, sort_keys=True)
sample_properties = cr_webshim.get_sample_properties(args.analysis_id, args.analysis_desc, genomes, version=martian.get_pipelines_version())
sample_data_paths = cr_webshim_data.SampleDataPaths(
summary_path=outs.summary,
analysis_path=args.analysis,
)
sample_data = cr_webshim.load_sample_data(sample_properties, sample_data_paths)
cr_webshim.build_web_summary_html(outs.web_summary, sample_properties, sample_data, PIPELINE_REANALYZE)
def join(args, outs, chunk_defs, chunk_outs):
matrix_attrs = cr_matrix.make_matrix_attrs_aggr(args.gem_group_index, "Unknown")
cr_matrix.concatenate_h5([chunk_out.raw_matrices_h5 for chunk_out in chunk_outs], outs.raw_matrices_h5, extra_attrs=matrix_attrs)
cr_matrix.concatenate_h5([chunk_out.filtered_matrices_h5 for chunk_out in chunk_outs], outs.filtered_matrices_h5, extra_attrs=matrix_attrs)
cr_matrix.concatenate_mex_dirs([chunk_out.raw_matrices_mex for chunk_out in chunk_outs], outs.raw_matrices_mex)
cr_matrix.concatenate_mex_dirs([chunk_out.filtered_matrices_mex for chunk_out in chunk_outs], outs.filtered_matrices_mex)
merged_molecules = [chunk_out.filtered_molecules for chunk_out in chunk_outs]
cr_mol_counter.MoleculeCounter.concatenate(outs.filtered_molecules, merged_molecules)
barcode_summaries = [chunk_out.barcode_summary_h5 for chunk_out in chunk_outs]
merge_barcode_summaries(barcode_summaries, outs.barcode_summary_h5)
# merge summaries
summary = merge_summaries(chunk_outs)
with open(outs.summary, 'w') as f:
json.dump(summary, f, indent=4, sort_keys=True)
def main(args, outs):
# Write read_chunk for consumption by Rust
with open("chunk_args.json", "w") as f:
json.dump(args.read_chunk, f)
output_path = martian.make_path("")
prefix = "fastq_chunk"
chunk_reads_args = ['chunk_reads', '--reads-per-fastq', str(args.reads_per_file), output_path, prefix, "--martian-args", "chunk_args.json"]
print "running chunk reads: [%s]" % str(chunk_reads_args)
subprocess.check_call(chunk_reads_args)
with open(os.path.join(output_path, "read_chunks.json")) as f:
chunk_results = json.load(f)
outs.out_chunks = []
# Write out a new chunk entry for each resulting chunk
for chunk in chunk_results:
print args.read_chunk
chunk_copy = args.read_chunk.copy()
print chunk_copy
chunk_copy['read_chunks'] = chunk
outs.out_chunks.append(chunk_copy)
def requests_with_cache(dir):
def decorator(func):
def wrapper(**kwargs):
cache_key = str(kwargs.get("param", "default.json"))
cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")
if os.path.isfile(cache_url):
with open(cache_url, 'r') as f:
print(cache_url)
return json.load(f)
with open(cache_url, 'w+') as f:
ret = func(**kwargs)
json.dump(ret, f)
return ret
return wrapper
return decorator
def main():
if len(sys.argv) == 1:
infile = sys.stdin
outfile = sys.stdout
elif len(sys.argv) == 2:
infile = open(sys.argv[1], 'rb')
outfile = sys.stdout
elif len(sys.argv) == 3:
infile = open(sys.argv[1], 'rb')
outfile = open(sys.argv[2], 'wb')
else:
raise SystemExit(sys.argv[0] + " [infile [outfile]]")
try:
obj = json.load(infile)
except ValueError, e:
raise SystemExit(e)
json.dump(obj, outfile, sort_keys=True, indent=4)
outfile.write('\n')
def output_json(svc_msg, service_types):
"""
Writes output results to a file
"""
global DATA2
outfname = 'device-scanner.json'
DATA2 = DATA2 + [{'serverid': svc_msg.getServerId(),
'devicename': svc_msg.getDeviceName(),
'devicedescription': svc_msg.getDeviceDescription(),
'hostname': svc_msg.getHostname(),
'portnumber': svc_msg.getPortNumber(),
'urlprefix': svc_msg.getUrlPrefix(),
'servicetypes': service_types,
}]
try:
with open(outfname, 'w') as outfile:
json.dump(DATA2, outfile)
except Exception:
print("You need to configure the webserver path" +
" if you want to output json")
7.05_winrate_change.py 文件源码
项目:fantasy-dota-heroes
作者: ThePianoDentist
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def main():
driver.get("https://www.dotabuff.com/heroes/winning?date=patch_7.04")
rows = driver.find_elements_by_xpath("//table/tbody/tr")
old_winrates = {}
for row in rows:
cells = row.find_elements_by_xpath("td")
hero = cells[1].text
winrate = float(cells[2].get_attribute("data-value"))
old_winrates[hero] = winrate
driver.get("https://www.dotabuff.com/heroes/winning?date=patch_7.05")
rows = driver.find_elements_by_xpath("//table/tbody/tr")
win_rate_diff = {}
for row in rows:
cells = row.find_elements_by_xpath("td")
hero = cells[1].text
winrate = float(cells[2].get_attribute("data-value"))
win_rate_diff[hero] = winrate - old_winrates[hero]
with open(os.environ.get('FDOTA') + '/fantasydota/junk/windiff_705', 'w') as f:
json.dump(win_rate_diff, f)
def add_manifest(name_module=None,version_modul=None,url=None):
if name_module == None :
return "please insert your name_module"
if version_modul == None :
version = "0.1.0"
if url == None:
url = ""
try:
filename = 'manifest.json'
with open(filename,'r') as data_file:
data_json = json.loads(data_file.read())
os.remove(filename)
new_data={'name_module':name_module,'version_module':version_modul,'url_module':url}
data_json['installed_module'].append(new_data)
with open(filename,'w') as data_file:
json.dump(data_json, data_file)
except Exception as e:
return e
def del_manifest(name_module=None):
try:
filename = 'manifest.json'
if name_module == None:
return "please insert your modul name"
with open(filename,'r') as data_file:
data_json = json.loads(data_file.read())
number_akhir = len(data_json['installed_module'])
number_awal = 0
for number_awal in xrange(number_awal,number_akhir):
jumlah = (number_awal+1)-1
if name_module == data_json['installed_module'][jumlah]['name_module']:
os.remove(filename)
del data_json['installed_module'][jumlah]
with open(filename,'w') as data_file:
json.dump(data_json, data_file)
else:
pass
return "modul has deleted"
except Exception as e:
pass
def create_requirement(name_module=None,version_module=None,url_endpoint=None,requirement=None,comment=None,url=None):
if comment is None:
comment = "my module name is {name}".format(name=name_module)
if requirement is None:
requirement = "openedoo_core"
if name_module==None:
return "please insert name module"
if version_module is None:
version_module = "0.1.0"
if url_endpoint is None:
url_endpoint = {'url_endpoint':''.format(url=name_module),'type':'function'}
else:
url_endpoint = {'url_endpoint':url_endpoint,'type':'end_point'}
data_json = {"name":name_module,
"version": version_module,
"requirement":requirement,
"pip_library":[],
"comment":comment,
"type":url_endpoint['type'],
"url":url,
"url_endpoint":url_endpoint['url_endpoint']}
filename = 'requirement.json'
with open('modules/{folder}/{filename}'.format(folder=name_module,filename=filename),'w') as data_file:
json.dump(data_json, data_file)
return "module has created"
def save(self):
global_step = self.sess.run(tf.train.get_global_step(self.graph))
if self.config['last_checkpoint'] == global_step:
if self.config['debug']:
print('Model has already been saved during the current global step.')
return
print('Saving to %s with global_step %d.' % (self.config['results_dir'], global_step))
self.saver.save(self.sess, os.path.join(self.config['results_dir'], 'checkpoint'), global_step)
self.config['last_checkpoint'] = global_step
# Also save the configuration
json_file = os.path.join(self.config['results_dir'], 'config.json')
with open(json_file, 'w') as f:
json.dump(self.config, f, cls=utilities.NumPyCompatibleJSONEncoder)
def save(self, config_file_path):
"""Save credentials to file.
Args:
config_file_path (path-like object): Path to file containing
credentials. The file os opened and closed by this method.
"""
with open(config_file_path, 'w') as f:
data = {}
if self.lastfm is not None:
data['lastfm'] = {'session_key': self.lastfm.session_key}
if self.spotify is not None:
data['spotify'] = {
'access_token': self.spotify.access_token,
'token_type': self.spotify.token_type,
'refresh_token': self.spotify.refresh_token,
'scope': self.spotify.scope
}
json.dump(data, f)
def add_text(self, tag, text_string, global_step=None):
"""Add text data to summary.
Args:
tag (string): Data identifier
text_string (string): String to save
global_step (int): Global step value to record
Examples::
writer.add_text('lstm', 'This is an lstm', 0)
writer.add_text('rnn', 'This is an rnn', 10)
"""
self.file_writer.add_summary(text(tag, text_string), global_step)
if tag not in self.text_tags:
self.text_tags.append(tag)
extensionDIR = self.file_writer.get_logdir() + '/plugins/tensorboard_text/'
if not os.path.exists(extensionDIR):
os.makedirs(extensionDIR)
with open(extensionDIR + 'tensors.json', 'w') as fp:
json.dump(self.text_tags, fp)
def set(self, url, content):
f = LockedFile(self._file, 'r+', 'r')
try:
f.open_and_lock()
if f.is_locked():
cache = _read_or_initialize_cache(f)
cache[url] = (content, _to_timestamp(datetime.datetime.now()))
# Remove stale cache.
for k, (_, timestamp) in list(cache.items()):
if _to_timestamp(datetime.datetime.now()) >= timestamp + self._max_age:
del cache[k]
f.file_handle().truncate(0)
f.file_handle().seek(0)
json.dump(cache, f.file_handle())
else:
logger.debug('Could not obtain a lock for the cache file.')
except Exception as e:
logger.warning(e, exc_info=True)
finally:
f.unlock_and_close()
def apply_settings(self, widget, data=None):
# ???????? ????, ???? ?? ??????
self.window.hide()
if self.white_bg_check.get_active():
self.data['properties']['white_bg'] = True
else:
self.data['properties']['white_bg'] = False
if self.auto_levels_check.get_active():
self.data['properties']['auto_levels'] = True
else:
self.data['properties']['auto_levels'] = False
self.data['properties']['resolution'] = int(self.resolution_cb.get_active_text())
config = open(self.path, 'wb')
json.dump(self.data, config, indent=3)
config.close()
gtk.main_quit()
# ??? ??????? ???????????? ?????????????? ?????????? ???????
def register(self, name, serializer):
"""Register ``serializer`` object under ``name``.
Raises :class:`AttributeError` if ``serializer`` in invalid.
.. note::
``name`` will be used as the file extension of the saved files.
:param name: Name to register ``serializer`` under
:type name: ``unicode`` or ``str``
:param serializer: object with ``load()`` and ``dump()``
methods
"""
# Basic validation
getattr(serializer, 'load')
getattr(serializer, 'dump')
self._serializers[name] = serializer