def get_configuration(self, filename=None):
"""Check the json file for changes and load it if needed."""
if not filename:
filename = CONF.scheduler_json_config_location
if not filename:
return self.data
if self.last_checked:
now = self._get_time_now()
if now - self.last_checked < datetime.timedelta(minutes=5):
return self.data
last_modified = self._get_file_timestamp(filename)
if (not last_modified or not self.last_modified or
last_modified > self.last_modified):
self.data = self._load_file(self._get_file_handle(filename))
self.last_modified = last_modified
if not self.data:
self.data = {}
return self.data
python类load()的实例源码
scheduler_options.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
vendordata_json.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self, *args, **kwargs):
super(JsonFileVendorData, self).__init__(*args, **kwargs)
data = {}
fpath = CONF.vendordata_jsonfile_path
logprefix = "%s[%s]:" % (file_opt.name, fpath)
if fpath:
try:
with open(fpath, "r") as fp:
data = jsonutils.load(fp)
except IOError as e:
if e.errno == errno.ENOENT:
LOG.warning(_LW("%(logprefix)s file does not exist"),
{'logprefix': logprefix})
else:
LOG.warning(_LW("%(logprefix)s unexpected IOError when "
"reading"), {'logprefix': logprefix})
raise e
except ValueError:
LOG.warning(_LW("%(logprefix)s failed to load json"),
{'logprefix': logprefix})
raise
self._data = data
def run():
# REVISIT(ivc): current CNI implementation provided by this package is
# experimental and its primary purpose is to enable development of other
# components (e.g. functional tests, service/LBaaSv2 support)
cni_conf = utils.CNIConfig(jsonutils.load(sys.stdin))
args = ['--config-file', cni_conf.kuryr_conf]
try:
if cni_conf.debug:
args.append('-d')
except AttributeError:
pass
config.init(args)
config.setup_logging()
# Initialize o.vo registry.
k_objects.register_locally_defined_vifs()
os_vif.initialize()
if CONF.cni_daemon.daemon_enabled:
runner = cni_api.CNIDaemonizedRunner()
else:
runner = cni_api.CNIStandaloneRunner(K8sCNIPlugin())
LOG.info("Using '%s' ", runner.__class__.__name__)
def _timeout(signum, frame):
runner._write_dict(sys.stdout, {
'msg': 'timeout',
'code': k_const.CNI_TIMEOUT_CODE,
})
LOG.debug('timed out')
sys.exit(1)
signal.signal(signal.SIGALRM, _timeout)
signal.alarm(_CNI_TIMEOUT)
status = runner.run(os.environ, cni_conf, sys.stdout)
LOG.debug("Exiting with status %s", status)
if status:
sys.exit(status)
def test_playbook_persistence(self):
r_playbook = m.Playbook.query.first()
tmpfile = os.path.join(self.app.config['ARA_TMP_DIR'], 'ara.json')
with open(tmpfile, 'rb') as file:
data = jsonutils.load(file)
self.assertEqual(r_playbook.id, data['playbook']['id'])
def test_playbook_persistence(self):
r_playbook = m.Playbook.query.first()
tmpfile = os.path.join(self.app.config['ARA_TMP_DIR'], 'ara.json')
with open(tmpfile, 'rb') as file:
data = jsonutils.load(file)
self.assertEqual(r_playbook.id, data['playbook']['id'])
scheduler_options.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _load_file(self, handle):
"""Decode the JSON file. Broken out for testing."""
try:
return jsonutils.load(handle)
except ValueError:
LOG.exception(_LE("Could not decode scheduler options"))
return {}
policy_fixture.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def _prepare_policy(self):
policy = jsonutils.load(open(CONF.oslo_policy.policy_file))
# Convert all actions to require specified role
for action, rule in six.iteritems(policy):
policy[action] = 'role:%s' % self.role
self.policy_dir = self.useFixture(fixtures.TempDir())
self.policy_file = os.path.join(self.policy_dir.path,
'policy.json')
with open(self.policy_file, 'w') as f:
jsonutils.dump(policy, f)
def smart_config(conf):
# list existing compute nodes
json_conf = jsonutils.load(open(conf))
compute_nodes = json_conf['compute']
network_controllers = json_conf['network']
for c in compute_nodes:
print("[+]\tFound Compute {}".format(c))
for c in network_controllers:
print("[+]\tFound Network {}".format(c))
# create a role and user per compute node and per network node
for compute_node, param in compute_nodes.items():
rolename = param['role']
username = param['username']
password = param['password']
create_role(rolename)
create_user(username, password)
set_user_role(username, rolename)
print("[+] creating user '{}', role '{}'".format(
username, rolename))
# PERMISSIONS
set_role_permission(
rolename,
"/networking-vpp/nodes/{}/*".format(compute_node), "read")
set_role_permission(
rolename,
"/networking-vpp/state/{}/*".format(compute_node), "readwrite")
for network_controller, param in network_controllers.items():
rolename = param['role']
username = param['username']
password = param['password']
create_role(rolename)
create_user(username, password)
print("[+] creating user '{}', role '{}'".format(
username, rolename))
set_user_role(username, rolename)
# PERMISSION
set_role_permission(
rolename,
"/networking-vpp/nodes/*", "readwrite")
set_role_permission(
rolename,
"/networking-vpp/state/*", "read")
if click.confirm('Enable ETCD authentication ?'):
print("[*] Enabling ETCD authentication")
enable_authentication()