def test_generate_conf_from_json():
    """Round-trip the 'global' conf through JSON and compare the data.

    The conf is serialised with ``to_json()``, rebuilt with
    ``API.generate_object()`` and then compared entry by entry:

    * the rebuilt object must have the same class name,
    * list values must have equal length and every original item must occur
      exactly once in the rebuilt list (order is not checked),
    * dict values are compared key by key through ``str()``,
    * any other value is compared through ``str()``.
    """
    conf = API.get_conf(name='global')
    # Not called ``json`` so that a module of that name is never shadowed.
    serialized = conf.to_json()
    conf2 = API.generate_object(serialized)
    assert conf2.__class__.__name__ == conf.__class__.__name__
    for key in conf.data.keys():
        v = conf.data[key]
        v2 = conf2.data[key]
        if isinstance(v, list):
            assert len(v) == len(v2)
            for s in v:
                assert v2.count(s) == 1
        elif isinstance(v, dict):
            # Dedicated loop variable: the outer ``key`` is left untouched.
            for sub_key in v.keys():
                assert str(v[sub_key]) == str(v2[sub_key])
        else:
            assert str(v) == str(v2)
# Example source code using Python's types.DictType
def unparse(self, dn, record):
    """
    Write a single record to the output file.

    dn
          string-representation of distinguished name
    record
          Either a dictionary holding the LDAP entry {attrtype:record}
          or a list with a modify list like for LDAPObject.modify().

    Empty records are ignored and nothing is written for them.

    Raises ValueError if record is neither a dictionary nor a list; note
    that the dn line has already been written at that point.
    """
    if not record:
        # Simply ignore empty records
        return
    # Start with line containing the distinguished name
    self._unparseAttrTypeandValue('dn', dn)
    # Dispatch to record type specific writers
    if isinstance(record, dict):
        self._unparseEntryRecord(record)
    elif isinstance(record, list):
        self._unparseChangeRecord(record)
    else:
        # Call form of raise: valid on both Python 2 and Python 3.
        raise ValueError("Argument record must be dictionary or list")
    # Write empty line separating the records
    self._output_file.write(self._line_sep)
    # Count records written
    self.records_written += 1
def append(self, argument, typehint=None):
    """Appends data to the message, updating the typetags based on
    the argument's type. If the argument is a blob (counted
    string) pass in 'b' as typehint.
    'argument' may also be a list or tuple, in which case its elements
    will get appended one-by-one, all using the provided typehint.
    A dict is appended as its (key, value) items.

    Raises TypeError if 'argument' is an OSCMessage.
    """
    if isinstance(argument, OSCMessage):
        raise TypeError("Can only append 'OSCMessage' to 'OSCBundle'")
    if isinstance(argument, dict):
        argument = argument.items()

    # Strings are scalar values, never containers. On Python 2 they have no
    # __iter__ so this guard changes nothing; on Python 3 they do, and
    # without the guard a string would recurse forever one character at a
    # time.
    if hasattr(argument, '__iter__') and not isinstance(argument, (str, bytes)):
        for arg in argument:
            self.append(arg, typehint)
        return

    if typehint == 'b':
        binary = OSCBlob(argument)
        tag = 'b'
    elif typehint == 't':
        binary = OSCTimeTag(argument)
        tag = 't'
    else:
        # OSCArgument returns the typetag together with the encoded data.
        tag, binary = OSCArgument(argument, typehint)

    self.typetags += tag
    self.message += binary
def _check_callback(self):
    """Validate the callback attributes against ``self.action``.

    For the "callback" action, ``self.callback`` must be callable and
    ``self.callback_args`` / ``self.callback_kwargs``, when not None, must
    be a tuple and a dict respectively.  For every other action all three
    attributes must be None.

    Raises OptionError on the first violation found; returns None otherwise.
    """
    if self.action == "callback":
        if not callable(self.callback):
            # The value is wrapped in a 1-tuple so that a tuple-valued
            # attribute is formatted as one %r argument instead of being
            # unpacked by the % operator (which would raise TypeError).
            raise OptionError(
                "callback not callable: %r" % (self.callback,), self)
        if (self.callback_args is not None and
                not isinstance(self.callback_args, tuple)):
            raise OptionError(
                "callback_args, if supplied, must be a tuple: not %r"
                % (self.callback_args,), self)
        if (self.callback_kwargs is not None and
                not isinstance(self.callback_kwargs, dict)):
            raise OptionError(
                "callback_kwargs, if supplied, must be a dict: not %r"
                % (self.callback_kwargs,), self)
    else:
        if self.callback is not None:
            raise OptionError(
                "callback supplied (%r) for non-callback option"
                % (self.callback,), self)
        if self.callback_args is not None:
            raise OptionError(
                "callback_args supplied for non-callback option", self)
        if self.callback_kwargs is not None:
            raise OptionError(
                "callback_kwargs supplied for non-callback option", self)
def __cmp__(self, other):
    """Three-way compare this object's attribute dict with ``other``.

    ``other`` may be a Values instance (its ``__dict__`` is used) or a
    dict (used directly).  Anything else yields -1.
    """
    if isinstance(other, Values):
        rhs = other.__dict__
    elif isinstance(other, dict):
        rhs = other
    else:
        return -1
    return cmp(self.__dict__, rhs)
def schedule(scheduler, runbook, target, config, dbc, logger):
    ''' Setup schedule for new runbooks and targets

    Builds a CronTrigger from the optional 'schedule' entry of
    target['runbooks'][runbook] and registers ``monitor`` with it through
    scheduler.add_job(); the add_job() result is returned.

    The 'schedule' entry may be:
      * a dict, whose entries override the default (every minute) fields;
      * a string of whitespace-separated fields in the order
        minute, hour, day, month, day_of_week ('second' is fixed to 0).
        Fewer than five fields raises IndexError; extra fields are ignored.
    Any other type leaves the default schedule in place.
    '''
    # Default schedule (every minute)
    task_schedule = {
        'second': 0,
        'minute': '*',
        'hour': '*',
        'day': '*',
        'month': '*',
        'day_of_week': '*',
    }
    runbook_conf = target['runbooks'][runbook]
    # If schedule is present override default
    if 'schedule' in runbook_conf:
        spec = runbook_conf['schedule']
        # (str, unicode) on Python 2, (str, str) on Python 3, so text
        # schedules are recognised whichever string type they arrive as.
        text_types = (type(''), type(u''))
        if isinstance(spec, dict):
            task_schedule.update(spec)
        elif isinstance(spec, text_types):
            # split() without an argument collapses runs of whitespace;
            # split(" ") would produce empty fields and shift the rest.
            breakdown = spec.split()
            task_schedule = {
                'second': 0,
                'minute': breakdown[0],
                'hour': breakdown[1],
                'day': breakdown[2],
                'month': breakdown[3],
                'day_of_week': breakdown[4],
            }
    cron = CronTrigger(
        second=task_schedule['second'],
        minute=task_schedule['minute'],
        hour=task_schedule['hour'],
        day=task_schedule['day'],
        month=task_schedule['month'],
        day_of_week=task_schedule['day_of_week'],
    )
    return scheduler.add_job(
        monitor,
        trigger=cron,
        args=[runbook, target, config, dbc, logger]
    )
def validate_request(request):
    """Check the basic structure of an RPC request.

    Returns True when the request is acceptable.  Otherwise a Fault with
    code -32600 is returned (not raised) when:

    * ``request`` is not a dict,
    * ``get_version(request)`` yields a false value,
    * 'method' is missing, empty or not a string, or 'params' is not a
      list, dict or tuple.

    Side effect: a missing 'params' entry is set to an empty list on the
    request itself.
    """
    if not isinstance(request, dict):
        return Fault(-32600, 'Request must be {}, not %s.' % type(request))
    rpcid = request.get('id', None)
    version = get_version(request)
    if not version:
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')
    # (str, unicode) on Python 2, (str, str) on Python 3.
    string_types = (type(''), type(u''))
    # isinstance() also accepts subclasses, which exact type() membership
    # tests reject.
    if (not method
            or not isinstance(method, string_types)
            or not isinstance(params, (list, dict, tuple))):
        return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid)
    return True
def _get_key_and_headers(keys, rows):
    """Work out the column keys and their header labels.

    When ``keys`` is None they are derived from the first row:
    a dict row gives its sorted keys, a row whose type is in ``listtype``
    gives its positional indexes, any other row gives a single '' key, and
    no rows at all gives no keys.

    Each key may be a plain value (used both as key and as header) or a
    ``listtype`` pair whose first item is the key and second the header.

    Returns a ``(keys, column_headers)`` tuple of two parallel lists;
    headers are converted with ``str()``.
    """
    if keys is None:
        if len(rows) == 0:
            keys = []
        else:
            r0 = rows[0]
            if isinstance(r0, dict):
                # sorted() works whether keys() returns a list or a view.
                keys = sorted(r0)
            elif type(r0) in listtype:
                keys = list(range(len(r0)))
            else:
                keys = ['']
    _keys = []
    column_headers = []
    for k in keys:
        if type(k) not in listtype:
            # Plain key: it doubles as its own header.
            k = [k, k]
        _keys.append(k[0])
        column_headers.append(str(k[1]))
    return _keys, column_headers
def __call__(self, query_result):
    """Index the rows of ``query_result`` by their key field.

    The first row decides how the key is read: for a list, tuple or dict
    row it is ``row[self._key_index]``, otherwise it is the attribute named
    by ``self._key_index``.  An empty or false ``query_result`` gives ``{}``.

    Returns a dict mapping each key to its row.
    """
    if not query_result:
        return {}
    indexed = {}
    if isinstance(query_result[0], (list, tuple, dict)):
        for row in query_result:
            indexed[row[self._key_index]] = row
    else:
        for row in query_result:
            indexed[getattr(row, self._key_index)] = row
    return indexed
def __init__(self, msg):
    """
    :param msg: the message as ``unicode`` or ``str``, or a dict of such
                messages keyed by the language code that
                ``locale.getlocale(locale.LC_ALL)`` reports.
    :raises TypeError: if the resolved message is neither ``unicode``
                       nor ``str``.

    The parent constructor always receives the UTF-8 encoded byte string,
    while ``self.msg`` always holds the ``unicode`` form.
    """
    if isinstance(msg, dict):
        # Only the language code is used; the encoding part is discarded.
        language_code, _encoding = locale.getlocale(locale.LC_ALL)
        msg = msg[language_code]
    if not isinstance(msg, (unicode, str)):
        raise TypeError
    parent_init = super(GirlFriendException, self).__init__
    if isinstance(msg, unicode):
        parent_init(msg.encode("utf-8"))
        self.msg = msg
    else:
        parent_init(msg)
        self.msg = msg.decode("utf-8")
def append(self, row):
    """Append ``row`` to the table data after validating it.

    Raises InvalidTypeException if ``row`` is not a dict, and
    InvalidSizeException if its length differs from ``self.column_num``.
    """
    if isinstance(row, dict):
        if len(row) == self.column_num:
            self._data.append(row)
            return
        raise InvalidSizeException(
            u"??????{0}???????{1}??????".format(len(row), self.column_num)
        )
    raise InvalidTypeException(u"DictTable????????????")
def _execute(self, context, template_args):
    """Run the executable for ``context`` with the resolved arguments.

    The arguments come from ``self._get_runtime_args`` and decide the call
    shape: None calls ``executable(context)``, a SequenceCollectionType is
    expanded positionally, a dict is expanded as keyword arguments.

    Returns the executable's result, or None when the arguments are of
    none of these kinds (the executable is not called in that case).
    """
    args = self._get_runtime_args(context, template_args)
    # The executable is looked up after the arguments have been resolved.
    executable = self._get_executable(context)
    if args is None:
        return executable(context)
    if isinstance(args, SequenceCollectionType):
        return executable(context, *args)
    if isinstance(args, dict):
        return executable(context, **args)
    return None
def _execute(self, context, sub_args):
    """Run the executable once per entry of ``sub_args`` and collect results.

    Each entry decides the call shape: None calls ``executable(context)``,
    a SequenceCollectionType is expanded positionally, a dict is expanded
    as keyword arguments; any other entry records None without a call.

    Failures are logged through ``context.logger.exception`` and then
    handled according to ``self._error_action``:

    * "stop": ``self._error_break`` is set and the exception is re-raised;
    * "continue": ``self._error_handler`` (if not None) is called with the
      context and the ``sys.exc_info()`` triple, and
      ``self._error_default_value`` is recorded as that entry's result;
    * any other value: the exception is dropped and nothing is recorded.

    Returns ``self._sub_join(context, results)`` when ``_sub_join`` is not
    None, otherwise the list of results.  Returns None as soon as
    ``self._error_break`` is found set at the start of an iteration.
    """
    executable = self._get_executable(context)
    results = []
    for args in sub_args:
        if self._error_break:
            # A "stop" failure has been flagged: give up on remaining work.
            return
        try:
            result = None
            if args is None:
                result = executable(context)
            elif isinstance(args, SequenceCollectionType):
                result = executable(context, *args)
            elif isinstance(args, dict):
                result = executable(context, **args)
        except Exception:
            context.logger.exception(
                u"????'{}'???????????????'{}'".format(
                    self.name, self._error_action))
            if self._error_action == "stop":
                self._error_break = True
                # Bare raise keeps the original traceback; ``raise e``
                # restarts it from this line on Python 2.
                raise
            elif self._error_action == "continue":
                if self._error_handler is not None:
                    exc_type, exc_value, tb = sys.exc_info()
                    self._error_handler(context, exc_type, exc_value, tb)
                # The failed entry still occupies a slot in the results.
                results.append(self._error_default_value)
        else:
            results.append(result)
    if self._sub_join is not None:
        return self._sub_join(context, results)
    return results
def _check_callback(self):
    """Validate the callback attributes against ``self.action``.

    For the "callback" action, ``self.callback`` must be callable and
    ``self.callback_args`` / ``self.callback_kwargs``, when not None, must
    be a tuple and a dict respectively.  For every other action all three
    attributes must be None.

    Raises OptionError on the first violation found; returns None otherwise.
    """
    if self.action == "callback":
        if not callable(self.callback):
            # The value is wrapped in a 1-tuple so that a tuple-valued
            # attribute is formatted as one %r argument instead of being
            # unpacked by the % operator (which would raise TypeError).
            raise OptionError(
                "callback not callable: %r" % (self.callback,), self)
        if (self.callback_args is not None and
                not isinstance(self.callback_args, tuple)):
            raise OptionError(
                "callback_args, if supplied, must be a tuple: not %r"
                % (self.callback_args,), self)
        if (self.callback_kwargs is not None and
                not isinstance(self.callback_kwargs, dict)):
            raise OptionError(
                "callback_kwargs, if supplied, must be a dict: not %r"
                % (self.callback_kwargs,), self)
    else:
        if self.callback is not None:
            raise OptionError(
                "callback supplied (%r) for non-callback option"
                % (self.callback,), self)
        if self.callback_args is not None:
            raise OptionError(
                "callback_args supplied for non-callback option", self)
        if self.callback_kwargs is not None:
            raise OptionError(
                "callback_kwargs supplied for non-callback option", self)
def __cmp__(self, other):
    """Three-way compare this object's attribute dict with ``other``.

    ``other`` may be a Values instance (its ``__dict__`` is used) or a
    dict (used directly).  Anything else yields -1.
    """
    if isinstance(other, Values):
        rhs = other.__dict__
    elif isinstance(other, dict):
        rhs = other
    else:
        return -1
    return cmp(self.__dict__, rhs)
def getSource(self):
    """Render this object as an ``Instance(<klass>, <state>)`` source string.

    The state is treated as a dict when ``self.stateIsDict`` is set, or when
    ``self.state`` is a Ref whose ``obj`` is a dict.  A dict state is
    formatted with ``dictToKW``; if that raises NonFormattableDict, or if
    there is no dict state, ``prettify`` is used instead.
    """
    #XXX make state be foo=bar instead of a dict.
    template = "Instance(%r, %s)"
    if self.stateIsDict:
        state_dict = self.state
    elif isinstance(self.state, Ref) and isinstance(self.state.obj, dict):
        state_dict = self.state.obj
    else:
        state_dict = None
    if state_dict is None:
        return template % (self.klass, prettify(self.state))
    try:
        return template % (self.klass, dictToKW(state_dict))
    except NonFormattableDict:
        return template % (self.klass, prettify(state_dict))
def prettify(obj):
    """Return a source-like text rendering of ``obj``.

    Objects with a ``getSource`` method render themselves.  Otherwise the
    exact type decides: types listed in ``_SIMPLE_BUILTINS`` use ``repr()``;
    dicts, lists and tuples are rendered recursively with one item per line.
    Each such line starts with a NUL character ('\\0') right after the
    newline; what consumes that marker is not visible in this function.

    Raises TypeError for any other type (including subclasses of the
    container types, since the exact type is checked).
    """
    if hasattr(obj, 'getSource'):
        return obj.getSource()

    #basic type
    t = type(obj)
    if t in _SIMPLE_BUILTINS:
        return repr(obj)

    if t is dict:
        out = ['{']
        for k, v in obj.items():
            out.append('\n\0%s: %s,' % (prettify(k), prettify(v)))
        out.append('\n\0}' if obj else '}')
        return ''.join(out)

    if t is list or t is tuple:
        opener, closer = ('[', ']') if t is list else ('(', ')')
        out = [opener]
        for x in obj:
            out.append('\n\0%s,' % prettify(x))
        # An empty container closes on the same line as it opened.
        out.append('\n\0' + closer if obj else closer)
        return ''.join(out)

    raise TypeError("Unsupported type %s when trying to prettify %s." % (t, obj))
def list_modules(self):
    """Fold the ``list_modules()`` results of the child modules into self.

    Every entry of ``self.modules`` (accessed by index 0..len-1) is asked
    for its own ``list_modules()``; each non-empty result is merged into
    this object and this object is returned.

    NOTE(review): the result is ``self`` itself and non-dict results are
    merged with ``self.update(...)`` - confirm this is the intended
    contract rather than building a separate list.
    """
    def tinsert(to, _from):
        # A dict is treated as a 0-based integer-indexed table and is
        # flattened recursively; anything else goes to ``to.update``.
        if type(_from) != DictType:
            to.update(_from)
            return
        for idx in xrange(len(_from)):
            tinsert(to, _from[idx])

    modules = self
    if not self.modules:
        return modules
    for idx in xrange(len(self.modules)):
        found = self.modules[idx].list_modules()
        if found:
            tinsert(modules, found)
    return modules
def validate_request(request):
    """Check the basic structure of an RPC request.

    Returns True when the request is acceptable.  Otherwise a Fault with
    code -32600 is returned (not raised) when:

    * ``request`` is not a dict,
    * ``get_version(request)`` yields a false value,
    * 'method' is missing, empty or not a string, or 'params' is not a
      list, dict or tuple.

    Side effect: a missing 'params' entry is set to an empty list on the
    request itself.
    """
    if not isinstance(request, dict):
        return Fault(-32600, 'Request must be {}, not %s.' % type(request))
    rpcid = request.get('id', None)
    version = get_version(request)
    if not version:
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')
    # (str, unicode) on Python 2, (str, str) on Python 3.
    string_types = (type(''), type(u''))
    # isinstance() also accepts subclasses, which exact type() membership
    # tests reject.
    if (not method
            or not isinstance(method, string_types)
            or not isinstance(params, (list, dict, tuple))):
        return Fault(-32600, 'Invalid request parameters or method.', rpcid=rpcid)
    return True
def merge(x, y):
    """Return a new dict holding ``x`` overlaid with ``y``, merging nested dicts.

    Keys present only in one input are taken as they are.  For a key present
    in both, ``y``'s value wins, unless both values are dicts, in which case
    they are merged recursively by the same rule.

    Neither input is modified.  The copy is shallow: values that are not
    merged are shared with the inputs.  Keys may be of any hashable type.
    """
    # store a copy of x, but overwrite with y's values where applicable
    # (copy + update rather than dict(x, **y), which needs string keys)
    merged = dict(x)
    merged.update(y)
    # where y[key] replaced a dictionary from x, put back the x[key]
    # entries that y[key] does not override
    for key, x_value in x.items():
        # recurse only when both sides are dictionaries; a non-dict y[key]
        # simply replaces x[key]
        if key in y and isinstance(x_value, dict) and isinstance(y[key], dict):
            merged[key] = merge(x_value, y[key])
    return merged