def _check_value_recursively(key, val, haystack):
"""
Check **resursively** if there is a given key with a given value in the
given dictionary.
..warning:
This is geared at JSON dictionaries, so some corner cases are ignored,
we assume all iterables are either arrays or dicts
:param key: the key to look for
:param val: value to look for
:param haystack: the dictionary
"""
if isinstance(haystack, list):
return any([_check_value_recursively(key, val, l) for l in haystack])
elif isinstance(haystack, dict):
if not key in haystack:
return any([_check_value_recursively(key, val, d) for k, d in haystack.items()
if isinstance(d, list) or isinstance(d, dict)])
else:
return haystack[key] == val
else:
return False
python类dict()的实例源码
def check_assertions(self, checklist, continue_on_error=False):
"""Check a set of assertions
@param all boolean if False, stop at first failure
@return: False if any assertion fails.
"""
assert isinstance(checklist, dict) and 'checks' in checklist
retval = None
retlist = []
for assertion in checklist['checks']:
retval = self.check_assertion(**assertion)
check = copy.deepcopy(assertion)
check['result']=retval.success
check['errormsg']=retval.errormsg
retlist.append(check)
if not retval.success and not continue_on_error:
return retlist
return retlist
def _check_value_recursively(key, val, haystack):
"""
Check if there is key _key_ with value _val_ in the given dictionary.
..warning:
This is geared at JSON dictionaries, so some corner cases are ignored,
we assume all iterables are either arrays or dicts
"""
if isinstance(haystack, list):
return any([_check_value_recursively(key, val, l) for l in haystack])
elif isinstance(haystack, dict):
if not key in haystack:
return any([_check_value_recursively(key, val, d) for k, d in haystack.items()
if isinstance(d, list) or isinstance(d, dict)])
else:
return haystack[key] == val
else:
return False
def check_assertions(self, checklist, all=False):
"""Check a set of assertions
@param all boolean if False, stop at first failure
@return: False if any assertion fails.
"""
assert isinstance(checklist, dict) and 'checks' in checklist
retval = None
retlist = []
for assertion in checklist['checks']:
retval = self.check_assertion(**assertion)
retlist.append(retval)
if not retval.success and not all:
print "Error message:", retval[3]
return retlist
return retlist
def openscope(self, customlocals=None):
'''Opens a new (embedded) scope.
Args:
customlocals (dict): By default, the locals of the embedding scope
are visible in the new one. When this is not the desired
behaviour a dictionary of customized locals can be passed,
and those locals will become the only visible ones.
'''
self._locals_stack.append(self._locals)
self._globalrefs_stack.append(self._globalrefs)
if customlocals is not None:
self._locals = customlocals.copy()
elif self._locals is not None:
self._locals = self._locals.copy()
else:
self._locals = {}
self._globalrefs = set()
self._scope = self._globals.copy()
self._scope.update(self._locals)
def _check_value_recursively(key, val, haystack):
"""
Check **resursively** if there is a given key with a given value in the
given dictionary.
..warning:
This is geared at JSON dictionaries, so some corner cases are ignored,
we assume all iterables are either arrays or dicts
:param key: the key to look for
:param val: value to look for
:param haystack: the dictionary
"""
if isinstance(haystack, list):
return any([_check_value_recursively(key, val, l) for l in haystack])
elif isinstance(haystack, dict):
if not key in haystack:
return any([_check_value_recursively(key, val, d) for k, d in haystack.items()
if isinstance(d, list) or isinstance(d, dict)])
else:
return haystack[key] == val
else:
return False
def check_assertions(self, checklist, continue_on_error=False):
"""Check a set of assertions
@param all boolean if False, stop at first failure
@return: False if any assertion fails.
"""
assert isinstance(checklist, dict) and 'checks' in checklist
retval = None
retlist = []
for assertion in checklist['checks']:
retval = self.check_assertion(**assertion)
check = copy.deepcopy(assertion)
check['result']=retval.success
check['errormsg']=retval.errormsg
retlist.append(check)
if not retval.success and not continue_on_error:
return retlist
return retlist
def __init__(self, dict=None, preserve=1):
"""Create an empty dictionary, or update from 'dict'."""
self.data = {}
self.preserve=preserve
if dict:
self.update(dict)
def update(self, dict):
"""Copy (key,value) pairs from 'dict'."""
for k,v in dict.items():
self[k] = v
def __init__(self, dict=None, **kwargs):
self._order = []
self.data = {}
if dict is not None:
if hasattr(dict,'keys'):
self.update(dict)
else:
for k,v in dict: # sequence
self[k] = v
if len(kwargs):
self.update(kwargs)
def dict(*a, **k):
import warnings
import __builtin__
warnings.warn('twisted.python.util.dict is deprecated. Use __builtin__.dict instead')
return __builtin__.dict(*a, **k)
def flazy(f, *a, **k):
sortedtuple, state = fcompose(__builtin__.sorted, __builtin__.tuple), {}
def lazy(*ap, **kp):
A, K = a+ap, sortedtuple(k.items() + kp.items())
return state[(A,K)] if (A,K) in state else state.setdefault((A,K), f(*A, **__builtin__.dict(k.items()+kp.items())))
return lazy
def reconstructor(cls, n):
if isinstance(n, types.FunctionType):
return lambda f: f
if isinstance(n, types.MethodType):
return lambda f: types.MethodType(f, n.im_self, n.im_class)
if isinstance(n, (staticmethod,classmethod)):
return lambda f: type(n)(f)
if isinstance(n, types.InstanceType):
return lambda f: types.InstanceType(type(n), dict(f.__dict__))
if isinstance(n, (types.TypeType,types.ClassType)):
return lambda f: type(n)(n.__name__, n.__bases__, dict(f.__dict__))
raise NotImplementedError, type(func)
def __init__(self, command, **kwds):
"""Creates a new instance that monitors subprocess.Popen(/command/), the created process starts in a paused state.
Keyword options:
env<dict> = os.environ -- environment to execute program with
cwd<str> = os.getcwd() -- directory to execute program in
shell<bool> = True -- whether to treat program as an argument to a shell, or a path to an executable
newlines<bool> = True -- allow python to tamper with i/o to convert newlines
show<bool> = False -- if within a windowed environment, open up a console for the process.
paused<bool> = False -- if enabled, then don't start the process until .start() is called
timeout<float> = -1 -- if positive, then raise a Queue.Empty exception at the specified interval.
"""
# default properties
self.__updater = None
self.__threads = weakref.WeakSet()
self.__kwds = kwds
self.commandline = command
import Queue
self.eventWorking = threading.Event()
self.__taskQueue = Queue.Queue()
self.__exceptionQueue = Queue.Queue()
self.stdout = kwds.pop('stdout')
self.stderr = kwds.pop('stderr')
# start the process
not kwds.get('paused',False) and self.start(command)
def start(self, command=None, **options):
"""Start the specified ``command`` with the requested **options"""
if self.running:
raise OSError("Process {:d} is still running.".format(self.id))
if self.updater or len(self.threads):
raise OSError("Process {:d} management threads are still running.".format(self.id))
kwds = dict(self.__kwds)
kwds.update(options)
command = command or self.commandline
env = kwds.get('env', os.environ)
cwd = kwds.get('cwd', os.getcwd())
newlines = kwds.get('newlines', True)
shell = kwds.get('shell', False)
stdout,stderr = options.pop('stdout',self.stdout),options.pop('stderr',self.stderr)
self.program = process.subprocess(command, cwd, env, newlines, joined=(stderr is None) or stdout == stderr, shell=shell, show=kwds.get('show', False))
self.commandline = command
self.eventWorking.clear()
# monitor program's i/o
self.__start_monitoring(stdout, stderr)
self.__start_updater(timeout=kwds.get('timeout',-1))
# start monitoring
self.eventWorking.set()
return self
def _get_restricted_builtins(cls):
bidict = dict(cls._RESTRICTED_BUILTINS)
major = sys.version_info[0]
if major == 2:
bidict['True'] = True
bidict['False'] = False
return bidict
def process_file(self, infile, outfile=None):
'''Processes input file and writes result to output file.
Args:
infile (str): Name of the file to read and process. If its value is
'-', input is read from stdin.
outfile (str, optional): Name of the file to write the result to.
If its value is '-', result is written to stdout. If not
present, result will be returned as string.
env (dict, optional): Additional definitions for the evaluator.
Returns:
str: Result of processed input, if no outfile was specified.
'''
infile = STDIN if infile == '-' else infile
output = self._preprocessor.process_file(infile)
if outfile is None:
return output
else:
if outfile == '-':
outfile = sys.stdout
else:
outfile = _open_output_file(outfile, self._create_parent_folder)
outfile.write(output)
if outfile != sys.stdout:
outfile.close()
def process_text(self, txt):
'''Processes a string.
Args:
txt (str): String to process.
env (dict, optional): Additional definitions for the evaluator.
Returns:
str: Processed content.
'''
return self._preprocessor.process_text(txt)
def __init__(self, dict=None, preserve=1):
"""Create an empty dictionary, or update from 'dict'."""
self.data = {}
self.preserve=preserve
if dict:
self.update(dict)
def update(self, dict):
"""Copy (key,value) pairs from 'dict'."""
for k,v in dict.items():
self[k] = v
def __init__(self, dict=None, **kwargs):
self._order = []
self.data = {}
if dict is not None:
if hasattr(dict,'keys'):
self.update(dict)
else:
for k,v in dict: # sequence
self[k] = v
if len(kwargs):
self.update(kwargs)
def dict(*a, **k):
import warnings
import __builtin__
warnings.warn('twisted.python.util.dict is deprecated. Use __builtin__.dict instead')
return __builtin__.dict(*a, **k)