def test_config_load(self):
Config.load()
config = InlineClass({
'debug': True,
'fallball_service_url': 'PUT_HERE_FALLBALL_SERVICE_URI',
'fallball_service_authorization_token': 'PUT_HERE_FALLBALL_SERVICE_AUTHORIZATION_TOKEN',
'oauth_key': 'PUT_HERE_OAUTH_KEY',
'oauth_secret': 'PUT_HERE_OAUTH_SECRET',
'diskspace_resource': 'DISKSPACE',
'devices_resource': 'DEVICES',
})
for (key, value) in viewitems(config.__dict__):
self.assertTrue(hasattr(Config, key))
Config.conf_file = 'tests/fake_config_invalid.json'
with self.assertRaises(RuntimeError):
Config.load()
Config.conf_file = 'not_exists'
with self.assertRaises(IOError):
Config.load()
python类viewitems()的实例源码
def __iter__(self):
while self.index < self.total_iterations:
idx = self.index % self.nbatches
self.index += 1
dict = {}
for k, x in viewitems(self.data_arrays):
if k == 'inp_txt' or k == 'teacher_tgt':
dict[k] = np.squeeze(x[:, idx:(idx + 1), :, :])
else:
dict[k] = np.squeeze(x[:, idx:(idx + 1), :])
yield dict
def _merge_config(d, u):
for k, v in viewitems(u):
if isinstance(v, collections.Mapping):
r = _merge_config(d.get(k, {}), v)
d[k] = r
else:
d[k] = u[k]
return d
def __init__(self, options, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.opts = {}
self.cli_args = {}
self.cmds = {}
self.spinner = None
self.register = {}
user_agent = BaseCommand.get_user_agent()
for (k, v) in options.items():
if v is not None:
cli_arg = re.search('\<([\w\-]+)\>', k)
cli_opt = re.search('^\-\-([a-zA-Z0-9]+[\w\-]+)', k)
if cli_arg is not None:
self.cli_args[cli_arg.group(1)] = v
elif cli_opt is not None:
self.opts[cli_opt.group(1)] = v
else:
self.cmds[k] = v
safe_log_opts = {k: v for k, v in viewitems(self.opts) if k != 'pass' }
logging.debug('Args: {0}'.format(self.cli_args))
logging.debug('Opts: {0}'.format(safe_log_opts))
logging.debug('User-Agent: {0}'.format(user_agent))
host = self.get_config('host', 'ENVMGR_HOST')
user = self.get_config('user', 'ENVMGR_USER')
pwrd = self.get_password('pass', 'ENVMGR_PASS')
headers = {'User-Agent':user_agent}
envmgr.config(host, user, b64encode(pwrd.encode('ascii')), default_headers=headers)
def get(self):
res = dict()
for (name, child) in viewitems(self.children):
res[name] = child.get()
return res
def from_json(cls, session, json):
id = json.get('id')
fields = json.get('fields')
unique_fields = frozenset(json.get('unique_by', []))
def _build_field(name, value):
unique = name in unique_fields
return Field.from_schema(value, unique=unique)
fields = {f[0]: _build_field(f[0], f[1]) for f in viewitems(fields)}
return cls(session, id, fields)
def create(cls, session, id, fields):
json = {
'fields': {f[0]: f[1].to_schema() for f in viewitems(fields)}
}
unique_by = [f[0] for f in viewitems(fields) if f[1].unique]
if unique_by:
json['unique_by'] = unique_by
url = session.build_url('datasets', id)
response = session.put(url, json=json)
response.raise_for_status()
return cls.from_json(session, response.json())
def _build_data_json(self, data):
def _fields_json(entry):
fields = self.fields
return {f[0]: fields[f[0]].to_json(f[1])
for f in viewitems(entry)}
json = {
'data': [_fields_json(entry) for entry in data]
}
return json
def yaml_load(filepath, **updates):
"""Loads a yaml file producing the corresponding Python dict (`safe_load`). Then:
1. normalizes non-absolute sqlite path values relative to `filepath`, if any
2. updates the dict values with `updqtes`
and returns the yaml dict.
:param updates: arguments which will updates the yaml dict before it is returned
"""
with open(filepath, 'r') as stream:
ret = yaml.safe_load(stream)
# convert sqlite into absolute paths, if any
configfilepath = abspath(dirname(filepath))
# convert relative sqlite path to absolute, assuming they are relative to the config:
sqlite_prefix = 'sqlite:///'
# we cannot modify a dict while in iteration, thus create a new dict of possibly
# modified sqlite paths and use later dict.update
newdict = {}
for k, v in viewitems(ret):
try:
if v.startswith(sqlite_prefix) and ":memory:" not in v:
dbpath = v[len(sqlite_prefix):]
if not isabs(dbpath):
newdict[k] = sqlite_prefix + abspath(normpath(join(configfilepath, dbpath)))
except AttributeError:
pass
newdict.update(updates)
ret.update(newdict)
return ret
def proxies(self):
""" Returns all the proxies registered in the server """
proxies = APIConsumer.get("/proxies").json()
proxies_dict = {}
for name, values in viewitems(proxies):
# Lets create a Proxy object to hold all its data
proxy = Proxy(**values)
# Add the new proxy to the toxiproxy proxies collection
proxies_dict.update({name: proxy})
return proxies_dict