def requests_with_cache(dir):
    # Decorator factory: cache each call's JSON-serializable result in `dir`,
    # keyed by the "param" keyword argument.
    def decorator(func):
        def wrapper(**kwargs):
            cache_key = str(kwargs.get("param", "default.json"))
            cache_url = dir + "/" + cache_key.replace("/", "-").replace("_", "-")
            # Cache hit: return the stored JSON instead of calling func.
            if os.path.isfile(cache_url):
                with open(cache_url, 'r') as f:
                    print(cache_url)
                    return json.load(f)
            # Cache miss: call func, store the result, then return it.
            with open(cache_url, 'w+') as f:
                ret = func(**kwargs)
                json.dump(ret, f)
                return ret
        return wrapper
    return decorator
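# A minimal usage sketch for the decorator above (fetch_user and the "cache"
# directory are illustrative, and the directory is assumed to exist already):
@requests_with_cache("cache")
def fetch_user(param):
    # A real implementation would issue an HTTP request here; a literal dict
    # keeps the sketch self-contained.
    return {"user": param}

result = fetch_user(param="alice")   # first call runs fetch_user and writes cache/alice
result = fetch_user(param="alice")   # second call is answered from the cache file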
def main():
    # Pretty-print JSON read from a file or stdin to a file or stdout.
    if len(sys.argv) == 1:
        infile = sys.stdin
        outfile = sys.stdout
    elif len(sys.argv) == 2:
        infile = open(sys.argv[1], 'r')
        outfile = sys.stdout
    elif len(sys.argv) == 3:
        infile = open(sys.argv[1], 'r')
        outfile = open(sys.argv[2], 'w')
    else:
        raise SystemExit(sys.argv[0] + " [infile [outfile]]")
    try:
        obj = json.load(infile)
    except ValueError as e:
        raise SystemExit(e)
    json.dump(obj, outfile, sort_keys=True, indent=4)
    outfile.write('\n')
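# A minimal entry-point sketch for the pretty-printer above; the module name
# in the usage comment is illustrative:
#   python pretty_json.py [infile [outfile]]
if __name__ == '__main__':
    main()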
def import_policy(cb, parser, args):
    p = cb.create(Policy)
    p.policy = json.load(open(args.policyfile, "r"))
    p.description = args.description
    p.name = args.name
    p.priorityLevel = args.prioritylevel
    p.version = 2
    try:
        p.save()
    except ServerError as se:
        print("Could not add policy: {0}".format(str(se)))
    except Exception as e:
        print("Could not add policy: {0}".format(str(e)))
    else:
        print("Added policy. New policy ID is {0}".format(p.id))
def create_graph():
    logfile = 'result/log'
    xs = []
    ys = []
    ls = []
    with open(logfile, 'r') as f:
        data = json.load(f)
    print(data)
    for d in data:
        xs.append(d["iteration"])
        ys.append(d["main/accuracy"])
        ls.append(d["main/loss"])
    plt.clf()
    plt.cla()
    plt.hlines(1, 0, np.max(xs), colors='r', linestyles="dashed")  # dashed reference line at y = 1
    plt.title(r"loss/accuracy")
    plt.plot(xs, ys, label="accuracy")
    plt.plot(xs, ls, label="loss")
    plt.legend()
    plt.savefig("result/log.png")
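# Example shape of the result/log entries create_graph expects (key names come
# from the loop above, in the style of a Chainer LogReport; values are
# illustrative):
example_log = [
    {"iteration": 100, "main/accuracy": 0.62, "main/loss": 1.20},
    {"iteration": 200, "main/accuracy": 0.71, "main/loss": 0.95},
]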
def associate_lambda(group_config, lambda_config):
    """
    Associate the Lambda described in the `lambda_config` with the
    Greengrass Group described by the `group_config`.

    :param group_config: `gg_group_setup.GroupConfigFile` to store the group
    :param lambda_config: the configuration describing the Lambda to
        associate with the Greengrass Group
    :return:
    """
    with open(lambda_config, "r") as f:
        cfg = json.load(f)
    config = GroupConfigFile(config_file=group_config)
    lambdas = config['lambda_functions']
    lambdas[cfg['func_name']] = {
        'arn': cfg['lambda_arn'],
        'arn_qualifier': cfg['lambda_alias']
    }
    config['lambda_functions'] = lambdas
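# Example of the lambda_config JSON this function reads (the key names come
# from the lookups above; the ARN and alias values are illustrative):
example_lambda_cfg = {
    "func_name": "MyGreengrassLambda",
    "lambda_arn": "arn:aws:lambda:us-west-2:123456789012:function:MyGreengrassLambda",
    "lambda_alias": "PROD"
}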
def load_cache(self, file):
    import json
    try:
        with open(file, 'r') as f:
            self.cache = json.load(f)
    except (OSError, ValueError):
        # Fail silently if the cache file is missing or not valid JSON.
        pass
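# A minimal usage sketch (CacheHolder is a hypothetical carrier class;
# load_cache above is written as a method, so it is bound to a class here):
class CacheHolder:
    cache = {}
    load_cache = load_cache

holder = CacheHolder()
holder.load_cache('cache.json')   # holder.cache stays empty if the file is missing or invalid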
def editPipeline(args, config):
    pipelineDbUtils = PipelineDbUtils(config)
    request = json.loads(pipelineDbUtils.getJobInfo(select=["request"], where={"job_id": args.jobId})[0].request)
    _, tmp = mkstemp()
    with open(tmp, 'w') as f:
        f.write("{data}".format(data=json.dumps(request, indent=4)))
    if "EDITOR" in os.environ.keys():
        editor = os.environ["EDITOR"]
    else:
        editor = "/usr/bin/nano"
    if subprocess.call([editor, tmp]) == 0:
        with open(tmp, 'r') as f:
            request = json.load(f)
        pipelineDbUtils.updateJob(args.jobId, keyName="job_id", setValues={"request": json.dumps(request)})
    else:
        print("ERROR: there was a problem editing the request")
        exit(-1)
def load_messages():
    filename = 'guideline_content.json'
    file_path = ('%s/guideline/%s' % (
        os.path.dirname(os.path.realpath(__file__)),
        filename
    ))
    with open(file_path) as json_data:
        data = json.load(json_data)
    return data["body"]["messages"]
def load_guidelines(guideline_key):
    filename = 'guideline_%s.json' % guideline_key
    file_path = ('%s/guideline/%s' % (
        os.path.dirname(os.path.realpath(__file__)),
        filename
    ))
    with open(file_path) as json_data:
        data = json.load(json_data)
    return data
def _load_ready_file(self):
    if self._ready is not None:
        return
    if os.path.exists(self._ready_file):
        with open(self._ready_file) as fp:
            self._ready = set(json.load(fp))
    else:
        self._ready = set()
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
    mm_map = {}
    if os.path.isfile(mm_file):
        with open(mm_file, 'r') as f:
            mm_map = json.load(f)
    return mm_map
def _git_yaml_load(projects_yaml):
    """
    Load the specified yaml into a dictionary.
    """
    if not projects_yaml:
        return None
    # safe_load avoids constructing arbitrary objects from untrusted YAML.
    return yaml.safe_load(projects_yaml)
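# A small sketch of what _git_yaml_load returns for a simple projects YAML
# string (the repository entry is illustrative):
sample_yaml = """
repositories:
  - name: nova
    branch: master
"""
print(_git_yaml_load(sample_yaml))
# -> {'repositories': [{'name': 'nova', 'branch': 'master'}]}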
def get_id_set(lang_codes):
    feature_database = np.load("family_features.npz")
    lang_codes = [get_language_code(l, feature_database) for l in lang_codes]
    all_languages = list(feature_database["langs"])
    feature_names = ["ID_" + l.upper() for l in all_languages]
    values = np.zeros((len(lang_codes), len(feature_names)))
    for i, lang_code in enumerate(lang_codes):
        feature_index = get_language_index(lang_code, feature_database)
        values[i, feature_index] = 1.0
    return feature_names, values
def generate(location):
    # cli wizard for creating a new contract from a template
    if directory_has_smart_contract(location):
        example_payload = json.load(open(glob.glob(os.path.join(location, '*.json'))[0]))
        print(example_payload)
        for k, v in example_payload.items():
            value = input(k + ':')
            if value != '':
                example_payload[k] = value
        print(example_payload)
        code_path = glob.glob(os.path.join(location, '*.tsol'))
        tsol.compile(open(code_path[0]), example_payload)
        print('Code compiles with new payload.')
        selection = ''
        while True:
            selection = input('(G)enerate Solidity contract or (E)xport implementation:')
            if selection.lower() == 'g':
                output_name = input('Name your contract file without an extension:')
                code = tsol.generate_code(open(code_path[0]).read(), example_payload)
                open(os.path.join(location, '{}.sol'.format(output_name)), 'w').write(code)
                break
            if selection.lower() == 'e':
                output_name = input('Name your implementation file without an extension:')
                json.dump(example_payload, open(os.path.join(location, '{}.json'.format(output_name)), 'w'))
                break
    else:
        print('Provided directory does not contain a *.tsol and *.json or does not compile.')
def __init__(self, path=None, filename=None):
    self.kafka_offset_spec_file = os.path.join(
        (path or "/tmp/"), (filename or 'kafka_offset_specs.json'))
    self._kafka_offsets = {}
    if os.path.exists(self.kafka_offset_spec_file):
        try:
            f = open(self.kafka_offset_spec_file)
            kafka_offset_dict = json.load(f)
            for key, value in kafka_offset_dict.items():
                log.info("Found offset %s: %s", key, value)
                self._kafka_offsets[key] = OffsetSpec(
                    app_name=value.get('app_name'),
                    topic=value.get('topic'),
                    partition=value.get('partition'),
                    from_offset=value.get('from_offset'),
                    until_offset=value.get('until_offset'),
                    batch_time=value.get('batch_time'),
                    last_updated=value.get('last_updated'),
                    revision=value.get('revision')
                )
        except Exception:
            log.info('Invalid or corrupt offsets file found at %s,'
                     ' starting over' % self.kafka_offset_spec_file)
    else:
        log.info('No kafka offsets found at startup')
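# Example shape of kafka_offset_specs.json as read above (field names come
# from the value.get() calls; the key format and values are illustrative):
example_offsets = {
    "mon_metrics_kafka_metrics_0": {
        "app_name": "mon_metrics_kafka",
        "topic": "metrics",
        "partition": 0,
        "from_offset": 100,
        "until_offset": 200,
        "batch_time": "2017-01-01 00:00:00",
        "last_updated": "2017-01-01 00:05:00",
        "revision": 1
    }
}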
def load_offset_file_as_json(self, file_path):
    with open(file_path, 'r') as f:
        json_file = json.load(f)
    return json_file
def register(self, name, serializer):
    """Register ``serializer`` object under ``name``.

    Raises :class:`AttributeError` if ``serializer`` is invalid.

    .. note::
        ``name`` will be used as the file extension of the saved files.

    :param name: Name to register ``serializer`` under
    :type name: ``unicode`` or ``str``
    :param serializer: object with ``load()`` and ``dump()`` methods
    """
    # Basic validation
    getattr(serializer, 'load')
    getattr(serializer, 'dump')
    self._serializers[name] = serializer
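# A minimal usage sketch for register() (SerializerManager and JSONSerializer
# are illustrative; any object with load() and dump() methods passes the
# validation above, and register() is bound to a small carrier class here):
class JSONSerializer:
    @classmethod
    def load(cls, file_obj):
        return json.load(file_obj)

    @classmethod
    def dump(cls, obj, file_obj):
        return json.dump(obj, file_obj)

class SerializerManager:
    def __init__(self):
        self._serializers = {}
    register = register

manager = SerializerManager()
manager.register('json', JSONSerializer)   # files saved via this serializer get a .json extension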
def load(cls, file_obj):
    """Load serialized object from open JSON file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from JSON file
    :rtype: object
    """
    return json.load(file_obj)
def load(cls, file_obj):
    """Load serialized object from open pickle file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object
    """
    return cPickle.load(file_obj)
def load(cls, file_obj):
    """Load serialized object from open pickle file.

    .. versionadded:: 1.8

    :param file_obj: file handle
    :type file_obj: ``file`` object
    :returns: object loaded from pickle file
    :rtype: object
    """
    return pickle.load(file_obj)