def find_modules(self, type_c, container=None):
    """Collect every module in this subtree whose type is exactly ``type_c``.

    Parameters:
        type_c: the type to look for; compared with ``type(module) == type_c``,
            so subclasses of ``type_c`` do not match.
        container: the module that holds ``self``; when falsy, ``self`` is
            used as its own container.

    Returns:
        A ``(nodes, containers)`` pair of dicts keyed ``0..n-1``, where
        ``containers[i]`` is the container of ``nodes[i]``.

    Raises:
        Exception: if a child returns dicts of different lengths.
    """
    container = container or self
    nodes = {}
    containers = {}
    if type(self) == type_c:
        # Both dicts use the same 0-based key so the pairs stay aligned and
        # a parent can read them back with range(len(...)).
        nodes[0] = self
        containers[0] = container
    # Recurse on nodes with 'modules'
    if self.modules is not None and isinstance(self.modules, dict):
        for i in range(len(self.modules)):
            child = self.modules[i]
            cur_nodes, cur_containers = child.find_modules(type_c, self)
            # This shouldn't happen
            if len(cur_nodes) != len(cur_containers):
                raise Exception('Internal error: incorrect return length')
            # Add the items from our child to our own result (i.e. return a
            # flattened table of the matching nodes).
            for j in range(len(cur_nodes)):
                next_index = len(nodes)
                nodes[next_index] = cur_nodes[j]
                containers[next_index] = cur_containers[j]
    return nodes, containers
# Example source code using Python's DictType
def append(self, argument, typehint = None):
    """Append data to the bundle as a binary blob.

    An OSCMessage is appended as-is. Anything else is first wrapped in a new
    OSCMessage created with the bundle's current address:
      - a dict may carry 'addr' (overrides the message address) and 'args'
        (appended as the message arguments);
      - any other value is appended to the new message directly.
    'typehint' is passed through to OSCMessage.append().
    """
    if isinstance(argument, OSCMessage):
        message = argument
    else:
        message = OSCMessage(self.address)
        if type(argument) != types.DictType:
            message.append(argument, typehint)
        else:
            if 'addr' in argument:
                message.setAddress(argument['addr'])
            if 'args' in argument:
                message.append(argument['args'], typehint)

    blob = OSCBlob(message.getBinary())
    self.message += blob
    # Every bundle element is tagged as a blob.
    self.typetags += 'b'
def _setTarget(self, address, prefix=None, filters=None):
    """Add (i.e. subscribe) a new OSCTarget, or update an existing one.

      - address ((host, port) tuple): IP-address & UDP-port
      - prefix (string): The OSC-address prefix prepended to the address of
        each OSCMessage sent to this OSCTarget (optional). A non-empty prefix
        is normalised to start with exactly one '/' and not end with '/'.
      - filters (string or dict): a filter string (parsed with
        parseFilterStr) or a dict with {addr:bool} entries (optional).

    Raises TypeError if 'filters' is neither a string nor a dict.
    """
    if address not in self.targets:
        # New target: empty prefix, no filters.
        self.targets[address] = ["", {}]

    if prefix is not None:
        if len(prefix):
            # make sure prefix starts with ONE '/', and does not end with '/'
            prefix = '/' + prefix.strip('/')
        self.targets[address][0] = prefix

    if filters is not None:
        if isinstance(filters, types.StringTypes):
            (_, filters) = parseFilterStr(filters)
        elif not isinstance(filters, dict):
            raise TypeError("'filters' argument must be a dict with {addr:bool} entries")
        self._updateFilters(self.targets[address][1], filters)
def parse(self, dict, header=TRUE):
    """parse(dict) -> string

    Read the template file and return its text with every %%key%% marker
    passed through Replacer(dict).replace. When 'header' is true the result
    is prefixed with a "Content-Type: text/html" header and a blank line.

    Raises TypeError if 'dict' is not a dictionary, and
    OpagMissingPrecondition if no template path is set."""
    if type(dict) != types.DictType:
        raise TypeError("Second argument must be a dictionary")
    if not self.template:
        raise OpagMissingPrecondition("template path is not set")

    # Reuse an already open template file by rewinding it; otherwise open it.
    if self.template_file:
        self.template_file.seek(0)
    else:
        self.template_file = open(self.template, "r")

    # Bound method used as the substitution callback.
    substitute = Replacer(dict).replace

    # The whole template is held in memory, so templates should stay small.
    contents = self.template_file.read()

    if header:
        prefix = "Content-Type: text/html\n\n"
    else:
        prefix = ""
    return prefix + re.sub(r"%%(\w+)%%", substitute, contents)
def _get_table_type(self, data):
    """Choose the table class for ``data``.

    An explicitly set ``self._table_type`` always wins. Otherwise the class
    is picked from the first row: ListTable for sequences (and for missing
    or empty data), DictTable for dicts, ObjectTable for anything else.
    """
    # A table type that was set explicitly takes precedence.
    if self._table_type is not None:
        return self._table_type

    # Nothing to inspect.
    if data is None or not len(data):
        return ListTable

    # Decide from the type of the first row.
    first_row = data[0]
    if isinstance(first_row, SequenceCollectionType):
        return ListTable
    if isinstance(first_row, types.DictType):
        return DictTable
    return ObjectTable
def get_event(self, block=1):
    """Return an Event instance.

    Queued events are returned first. Otherwise, when |block| is true, wait
    for pygame events until one completes an Event; when |block| is false,
    return None as soon as no pygame event is pending."""
    while True:
        if self.event_queue:
            return self.event_queue.pop(0)

        if block:
            raw_event = pygame.event.wait()
        else:
            raw_event = pygame.event.poll()
            if raw_event.type == NOEVENT:
                return None

        # Keys listed in modcolors are skipped entirely.
        if raw_event.key in modcolors:
            continue

        key_state, chars = self.tr_event(raw_event)
        self.cmd_buf += chars.encode('ascii', 'replace')
        self.k = key_state

        # A dict result means the sequence is not finished yet
        # (presumably a nested keymap -- confirm against tr_event).
        if isinstance(key_state, types.DictType):
            continue

        event = Event(key_state, self.cmd_buf, [])
        # Reset to the top-level keymap and clear the collected input.
        self.k = self.keymap
        self.cmd_buf = ''
        return event
def unwrap(value):
    """Replace DynamicProxy instances inside ``value`` by local objects.

    A proxy is looked up in ``pyro_daemon`` (when one is set); if the lookup
    raises KeyError the proxy itself is returned. Lists and dicts are
    updated in place, tuples are rebuilt, and any other value is returned
    unchanged.
    """
    kind = type(value)
    if kind is types.InstanceType and isinstance(value, DynamicProxy):
        if pyro_daemon:
            try:
                return pyro_daemon.getLocalObject(value.objectID)
            except KeyError:
                # Not a local object: keep the proxy.
                pass
        return value

    if kind is types.ListType:
        for index, item in enumerate(value):
            value[index] = unwrap(item)
        return value

    if kind is types.TupleType:
        # Tuples are immutable, so unwrap into a list and convert back.
        items = list(value)
        for index, item in enumerate(items):
            items[index] = unwrap(item)
        return tuple(items)

    if kind is types.DictType:
        for key, item in value.items():
            value[key] = unwrap(item)
    return value
def __init__(self, initfile):
    """Set up default state and load saved options from ``initfile``.

    ``initfile`` is the path of a marshalled options dictionary; a falsy
    value skips loading. A missing, unreadable or corrupt file leaves the
    option database empty. If the file holds something other than a dict, a
    message is written to stderr and the option database is reset to {}.
    """
    self.__initfile = initfile
    self.__colordb = None
    self.__optiondb = {}
    self.__views = []
    self.__red = 0
    self.__green = 0
    self.__blue = 0
    self.__canceled = 0
    # read the initialization file
    fp = None
    if initfile:
        try:
            try:
                # marshal data is binary, so the file must be opened in
                # binary mode (text mode corrupts it on some platforms).
                fp = open(initfile, 'rb')
                # NOTE(review): marshal is not safe on untrusted data;
                # initfile is assumed to be a file the user wrote earlier.
                self.__optiondb = marshal.load(fp)
                if not isinstance(self.__optiondb, dict):
                    sys.stderr.write(
                        'Problem reading options from file: %s\n'
                        % (initfile,))
                    self.__optiondb = {}
            except (IOError, EOFError, ValueError, TypeError):
                # Best effort: keep whatever option database we have.
                pass
        finally:
            if fp:
                fp.close()
def __init__(self, initfile):
    """Set up default state and load saved options from ``initfile``.

    ``initfile`` is the path of a marshalled options dictionary; a falsy
    value skips loading. A missing, unreadable or corrupt file leaves the
    option database empty. If the file holds something other than a dict, a
    message is written to stderr and the option database is reset to {}.
    """
    self.__initfile = initfile
    self.__colordb = None
    self.__optiondb = {}
    self.__views = []
    self.__red = 0
    self.__green = 0
    self.__blue = 0
    self.__canceled = 0
    # read the initialization file
    fp = None
    if initfile:
        try:
            try:
                # marshal data is binary, so the file must be opened in
                # binary mode (text mode corrupts it on some platforms).
                fp = open(initfile, 'rb')
                # NOTE(review): marshal is not safe on untrusted data;
                # initfile is assumed to be a file the user wrote earlier.
                self.__optiondb = marshal.load(fp)
                if not isinstance(self.__optiondb, dict):
                    sys.stderr.write(
                        'Problem reading options from file: %s\n'
                        % (initfile,))
                    self.__optiondb = {}
            except (IOError, EOFError, ValueError, TypeError):
                # Best effort: keep whatever option database we have.
                pass
        finally:
            if fp:
                fp.close()
def unparse(self,dn,record):
    """
    Write one record for the entry named by dn.

    dn
          string-representation of distinguished name
    record
          Either a dictionary holding the LDAP entry {attrtype:record}
          or a list with a modify list like for LDAPObject.modify().

    Empty records are silently ignored. Raises ValueError if record is
    neither a dictionary nor a list; nothing is written in that case.
    """
    if not record:
        # Simply ignore empty records
        return
    # Pick the record type specific writer first, so an unsupported record
    # is rejected before any output has been produced.
    if isinstance(record, dict):
        write_record = self._unparseEntryRecord
    elif isinstance(record, list):
        write_record = self._unparseChangeRecord
    else:
        raise ValueError("Argument record must be dictionary or list")
    # Start with line containing the distinguished name
    self._unparseAttrTypeandValue('dn', dn)
    write_record(record)
    # Write empty line separating the records
    self._output_file.write(self._line_sep)
    # Count records written
    self.records_written = self.records_written + 1
    return # unparse()
def createSortDicDump(dic):
    """
    Create a sorted ASCII representation of a dictionary. The order is sorted
    according to the dictionary keys.

    dic:       Source dictionary (dictionary).

    Returns:   Sorted dictionary ASCII representation (string), in the form
               "{'key': 'value', ...}" with keys and values formatted via %s.

    Raises Exception if dic is not a dictionary.
    """
    if not isinstance(dic, dict):
        raise Exception("Object given is not a dictionary")
    entries = ["'%s': '%s'" % (key, dic[key]) for key in sorted(dic)]
    return "{" + ", ".join(entries) + "}"
def validate_request(request):
    """Check that ``request`` is a well-formed RPC request.

    Returns True when the request is valid, otherwise a Fault with code
    -32600. A missing 'params' entry is filled in with an empty list as a
    side effect.
    """
    if not isinstance(request, dict):
        return Fault(
            -32600, 'Request must be {}, not %s.' % type(request)
        )

    rpcid = request.get('id', None)
    if not get_version(request):
        return Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)

    request.setdefault('params', [])
    method = request.get('method', None)
    params = request.get('params')
    allowed_param_types = (types.ListType, types.DictType, types.TupleType)

    # The method must be a non-empty string and the params a list, dict or
    # tuple.
    well_formed = (method
                   and type(method) in types.StringTypes
                   and type(params) in allowed_param_types)
    if not well_formed:
        return Fault(
            -32600, 'Invalid request parameters or method.', rpcid=rpcid
        )
    return True
def QueryToGAP(req):
    """Render the query of ``req`` as GAP source code.

    The result starts with 'GAPQUERY:=rec();;' followed by one
    'GAPQUERY.("<key>"):="<value>";;' assignment per query entry and ends
    with a newline. Keys are cut to 1000 characters, and only the first
    element of each query value is used. A request without a usable dict in
    ``req.query`` yields just the empty record.

    Assumes StringToGAP returns a string -- confirm against its definition.
    """
    try:
        qu = req.query
    except Exception:
        # Best effort: a request without a readable query is an empty query.
        qu = {}
    if not isinstance(qu, dict):
        qu = {}
    parts = ['GAPQUERY:=rec();;']
    for k in qu:
        parts.append('GAPQUERY.("')
        # Slicing leaves keys of up to 1000 characters untouched.
        parts.append(StringToGAP(k[:1000]))
        parts.append('"):="')
        parts.append(StringToGAP(str(qu[k][0])))
        parts.append('";;')
    parts.append('\n')
    return ''.join(parts)
def _escape_json(json):
    """Escapes all string fields of JSON data.

    Operates recursively: dicts and lists are copied with their values
    escaped (dict keys are left as they are), strings are passed through
    cgi.escape, and numbers, booleans and None are returned unchanged.

    Raises RuntimeError for any other type."""
    # JSON true/false/null and numbers need no escaping.
    if json is None or isinstance(json, (bool, int, float)):
        return json
    if isinstance(json, dict):
        result = {}
        for field, value in json.items():
            result[field] = _escape_json(value)
        return result
    if isinstance(json, list):
        return [_escape_json(item) for item in json]
    if isinstance(json, types.StringTypes):
        return cgi.escape(json)
    raise RuntimeError("Unsupported type: %s" % str(type(json)))
def getrefs(i, depth):
"""Get the i'th object in memory, return objects that reference it"""
import sys, gc, types
o = sys.getobjects(i)[-1]
for d in range(depth):
for ref in gc.get_referrers(o):
if type(ref) in (types.ListType, types.DictType,
types.InstanceType):
if type(ref) is types.DictType and ref.has_key('copyright'):
continue
o = ref
break
else:
print "Max depth ", d
return o
return o
def append(self, argument, typehint = None):
    """Append data to the bundle as a binary blob.

    An OSCMessage is appended as-is. Anything else is first wrapped in a new
    OSCMessage created with the bundle's current address:
      - a dict may carry 'addr' (overrides the message address) and 'args'
        (appended as the message arguments);
      - any other value is appended to the new message directly.
    'typehint' is passed through to OSCMessage.append().
    """
    if isinstance(argument, OSCMessage):
        message = argument
    else:
        message = OSCMessage(self.address)
        if type(argument) != types.DictType:
            message.append(argument, typehint)
        else:
            if 'addr' in argument:
                message.setAddress(argument['addr'])
            if 'args' in argument:
                message.append(argument['args'], typehint)

    blob = OSCBlob(message.getBinary())
    self.message += blob
    # Every bundle element is tagged as a blob.
    self.typetags += 'b'
def _setTarget(self, address, prefix=None, filters=None):
    """Add (i.e. subscribe) a new OSCTarget, or update an existing one.

      - address ((host, port) tuple): IP-address & UDP-port
      - prefix (string): The OSC-address prefix prepended to the address of
        each OSCMessage sent to this OSCTarget (optional). A non-empty prefix
        is normalised to start with exactly one '/' and not end with '/'.
      - filters (string or dict): a filter string (parsed with
        parseFilterStr) or a dict with {addr:bool} entries (optional).

    Raises TypeError if 'filters' is neither a string nor a dict.
    """
    if address not in self.targets:
        # New target: empty prefix, no filters.
        self.targets[address] = ["", {}]

    if prefix is not None:
        if len(prefix):
            # make sure prefix starts with ONE '/', and does not end with '/'
            prefix = '/' + prefix.strip('/')
        self.targets[address][0] = prefix

    if filters is not None:
        if isinstance(filters, types.StringTypes):
            (_, filters) = parseFilterStr(filters)
        elif not isinstance(filters, dict):
            raise TypeError("'filters' argument must be a dict with {addr:bool} entries")
        self._updateFilters(self.targets[address][1], filters)
def main(ipa_bases, ipa_all, dia_defs, sort_order):
    """Build the full segment set and hand it to write_ipa_all.

    The base segments are read with read_ipa_bases(ipa_bases); every
    diacritic and every combination from parse_dia_defs(dia_defs) is applied
    to each base segment, and each non-None result is added to the set.
    """
    base_segments = read_ipa_bases(ipa_bases)
    assert isinstance(base_segments, ListType)
    diacritics, combinations = parse_dia_defs(dia_defs)
    assert isinstance(diacritics, DictType)
    assert isinstance(combinations, ListType)
    all_segments = set(base_segments)

    def expand(modifier):
        # Apply one modifier to every base segment, keeping the results
        # that are not None.
        for base in base_segments:
            derived = modifier.apply(base)
            if derived is not None:
                all_segments.add(derived)

    for diacritic in diacritics.values():
        assert isinstance(diacritic, Diacritic)
        expand(diacritic)
    for combination in combinations:
        assert isinstance(combination, Combination)
        expand(combination)
    write_ipa_all(ipa_bases, ipa_all, all_segments, sort_order)
def _checkNpcNames(self, name):
    """Check ``name`` against the names stored in NPCList.NPC_LIST.

    Only dict entries that contain the HumanDNA.HumanDNA.setName key are
    considered, and the comparison is only made when ``self.independent`` is
    set or ``self.main.isNPCEditor`` is false. Names are compared after
    TextEncoder upper-casing; ``name`` is encoded and stripped first.

    Returns OL.NCGeneric on the first match, otherwise None.
    """
    def match(npcName):
        encoded = TextEncoder().encodeWtext(name).strip()
        return TextEncoder.upper(npcName) == TextEncoder.upper(encoded)

    name_key = HumanDNA.HumanDNA.setName
    for data in NPCList.NPC_LIST.values():
        if isinstance(data, dict) and name_key in data:
            npcName = data[name_key]
            if (self.independent or not self.main.isNPCEditor) and match(npcName):
                self.notify.info('name matches NPC name "%s"' % npcName)
                return OL.NCGeneric
    # NOTE(review): a trailing match(npcName) call whose result was
    # discarded used to follow the return; it was dropped as dead code.
    return None
def copy(self):
    """Return a copy of the instance.

    A new Structure or Trajectory is created (depending on ``is_struct`` /
    ``is_traj``) and every attribute named in ``attr_lst`` is copied over.
    """
    # NOTE(review): if neither flag is set, obj is never bound and the loop
    # below fails on its first attribute -- confirm one flag is always set.
    if self.is_struct:
        obj = Structure(set_all_auto=False)
    elif self.is_traj:
        obj = Trajectory(set_all_auto=False)

    for attr_name in self.attr_lst:
        value = getattr(self, attr_name)
        if value is None:
            duplicate = None
        elif not hasattr(value, 'copy') or isinstance(value, types.DictType):
            # dict.copy() is shallow, so dicts (and anything without a
            # copy() method) go through deepcopy instead.
            duplicate = copy.deepcopy(value)
        else:
            duplicate = value.copy()
        setattr(obj, attr_name, duplicate)
    return obj
def __init__(self, deviceID, command, routine=False, force=False, source=None, send_event=True, notify=False, speech=False):
    """Store the command settings and send the command immediately.

    A command that is not None and not a dict is treated as "simple" and
    wrapped as ``{command: ''}``. If the resulting command dict has a truthy
    'isSpeech' entry, the speech flag is forced on. The return value of
    sendCommand(self) is kept in ``self._result``.
    """
    from core import sendCommand

    self._deviceID = deviceID
    self._command = command
    self._routine = routine
    self._force = force
    self._notify = notify
    self._source = source
    self._speech = speech
    self._send_event = send_event

    # "Simple" means a bare command value rather than a command dict.
    self._simple = command is not None and not isinstance(
        command, types.DictType)
    if self._simple:
        self._command = {command: ''}

    if self._command is not None and self._command.get('isSpeech'):
        self._speech = True

    self._result = sendCommand(self)
def remove_optional_keys(self):
    """
    Removes values for keys that are not required from object's data.

    Does nothing when OPTIONAL_KEYS_ALLOWED is set. A key is kept when it is
    among the required data defaults, is listed in USER_PROVIDED_KEYS, or
    starts with '#'.

    :raises: **InvalidRequest** - in case object's data is not a dictionary.
    """
    if self.OPTIONAL_KEYS_ALLOWED:
        return
    if not isinstance(self.data, dict):
        raise InvalidRequest('Data object is not a dictionary: %s.' % str(self.data))
    # Look the required keys up once instead of once per data key.
    required = self.get_required_data_defaults()
    # Collect first and delete afterwards, so the dict is not modified
    # while it is being iterated.
    removed_keys = [
        key for key in self.data
        if key not in required
        and key not in self.USER_PROVIDED_KEYS
        and not key.startswith('#')
    ]
    for key in removed_keys:
        del self.data[key]
def py_to_uge(self, key, value):
    """Convert a Python value to its UGE representation for ``key``.

    - A value equal to, and of the same type as, an entry of
      UGE_PYTHON_OBJECT_MAP is replaced by the mapped UGE value, passed
      through the UGE_CASE_SENSITIVE_KEYS converter for ``key`` if any.
    - A list is joined with the delimiter from LIST_KEY_MAP (or the default
      list delimiter); its items must already be strings.
    - A dict becomes '<key><DICT_VALUE_DELIMITER><value>' tokens joined with
      the delimiter from DICT_KEY_MAP (or the default dict delimiter).
    - Anything else is returned unchanged.
    """
    for (uge_value, py_value) in self.UGE_PYTHON_OBJECT_MAP.items():
        # The type check keeps e.g. 1 from being mistaken for True.
        if value == py_value and type(value) == type(py_value):
            if key in self.UGE_CASE_SENSITIVE_KEYS:
                return self.UGE_CASE_SENSITIVE_KEYS[key](uge_value)
            return uge_value
    if isinstance(value, list):
        delimiter = self.LIST_KEY_MAP.get(key, self.DEFAULT_LIST_DELIMITER)
        return delimiter.join(value)
    if isinstance(value, dict):
        delimiter = self.DICT_KEY_MAP.get(key, self.DEFAULT_DICT_DELIMITER)
        dict_tokens = [
            '%s%s%s' % (item_key, self.DICT_VALUE_DELIMITER, item_value)
            for (item_key, item_value) in value.items()
        ]
        return delimiter.join(dict_tokens)
    return value
def test_generate_queue_from_json():
    """A queue rebuilt from its own JSON must match the original."""
    original = API.get_queue(QUEUE_NAME)
    serialized = original.to_json()
    rebuilt = API.generate_object(serialized)
    assert rebuilt.__class__.__name__ == original.__class__.__name__
    for field in original.data.keys():
        expected = original.data[field]
        actual = rebuilt.data[field]
        if type(expected) == types.ListType:
            # Same length, and every original item present exactly once.
            assert len(expected) == len(actual)
            for item in expected:
                assert actual.count(item) == 1
        elif type(expected) == types.DictType:
            for sub_key in expected.keys():
                assert str(expected[sub_key]) == str(actual[sub_key])
        else:
            assert str(expected) == str(actual)
def test_generate_sconf_from_json():
    """An sconf object rebuilt from its own JSON must match the original."""
    original = API.get_sconf()
    serialized = original.to_json()
    rebuilt = API.generate_object(serialized)
    assert rebuilt.__class__.__name__ == original.__class__.__name__
    for field in original.data.keys():
        expected = original.data[field]
        actual = rebuilt.data[field]
        if type(expected) == types.ListType:
            # Same length, and every original item present exactly once.
            assert len(expected) == len(actual)
            for item in expected:
                assert actual.count(item) == 1
        elif type(expected) == types.DictType:
            for sub_key in expected.keys():
                assert str(expected[sub_key]) == str(actual[sub_key])
        else:
            assert str(expected) == str(actual)
def test_generate_stree_from_json():
    """An stree object rebuilt from its own JSON must match the original."""
    original = API.get_stree()
    serialized = original.to_json()
    rebuilt = API.generate_object(serialized)
    assert rebuilt.__class__.__name__ == original.__class__.__name__
    assert len(original.data) == len(rebuilt.data)
    for index in range(len(original.data)):
        node = original.data[index]
        rebuilt_node = rebuilt.data[index]
        # Every entry on both sides is expected to be a dict.
        assert type(node) == types.DictType
        assert type(rebuilt_node) == types.DictType
        for field in node.keys():
            expected = node[field]
            actual = rebuilt_node[field]
            if type(expected) == types.ListType:
                # Same length, and every original item present exactly once.
                assert len(expected) == len(actual)
                for item in expected:
                    assert actual.count(item) == 1
            else:
                assert str(expected) == str(actual)
def test_generate_user_from_json():
    """A user rebuilt from its own JSON must match the original."""
    original = API.get_user(USER_NAME)
    serialized = original.to_json()
    rebuilt = API.generate_object(serialized)
    assert rebuilt.__class__.__name__ == original.__class__.__name__
    for field in original.data.keys():
        expected = original.data[field]
        actual = rebuilt.data[field]
        if type(expected) == types.ListType:
            # Same length, and every original item present exactly once.
            assert len(expected) == len(actual)
            for item in expected:
                assert actual.count(item) == 1
        elif type(expected) == types.DictType:
            for sub_key in expected.keys():
                assert str(expected[sub_key]) == str(actual[sub_key])
        else:
            assert str(expected) == str(actual)
def test_generate_prj_from_json():
    """A project rebuilt from its own JSON must match the original."""
    original = API.get_prj(PROJECT_NAME)
    serialized = original.to_json()
    rebuilt = API.generate_object(serialized)
    assert rebuilt.__class__.__name__ == original.__class__.__name__
    for field in original.data.keys():
        expected = original.data[field]
        actual = rebuilt.data[field]
        if type(expected) == types.ListType:
            # Same length, and every original item present exactly once.
            assert len(expected) == len(actual)
            for item in expected:
                assert actual.count(item) == 1
        elif type(expected) == types.DictType:
            for sub_key in expected.keys():
                assert str(expected[sub_key]) == str(actual[sub_key])
        else:
            assert str(expected) == str(actual)
def test_generate_jc_from_json():
    """A jc object rebuilt from its own JSON must match the original."""
    original = API.get_jc(JC_NAME)
    serialized = original.to_json()
    rebuilt = API.generate_object(serialized)
    assert rebuilt.__class__.__name__ == original.__class__.__name__
    for field in original.data.keys():
        expected = original.data[field]
        actual = rebuilt.data[field]
        if type(expected) == types.ListType:
            # Same length, and every original item present exactly once.
            assert len(expected) == len(actual)
            for item in expected:
                assert actual.count(item) == 1
        elif type(expected) == types.DictType:
            for sub_key in expected.keys():
                assert str(expected[sub_key]) == str(actual[sub_key])
        else:
            assert str(expected) == str(actual)
def test_generate_rqs_from_json():
    """An rqs object rebuilt from its own JSON must match the original."""
    original = API.get_rqs(RQS_NAME)
    serialized = original.to_json()
    rebuilt = API.generate_object(serialized)
    assert rebuilt.__class__.__name__ == original.__class__.__name__
    for field in original.data.keys():
        expected = original.data[field]
        actual = rebuilt.data[field]
        if type(expected) == types.ListType:
            # Same length, and every original item present exactly once.
            assert len(expected) == len(actual)
            for item in expected:
                assert actual.count(item) == 1
        elif type(expected) == types.DictType:
            for sub_key in expected.keys():
                assert str(expected[sub_key]) == str(actual[sub_key])
        else:
            assert str(expected) == str(actual)