def _send_data_part(data, connection):
if isinstance(data, types.StringTypes):
connection.send(data)
return
# Check to see if data is a file-like object that has a read method.
elif hasattr(data, 'read'):
# Read the file and send it a chunk at a time.
while 1:
binarydata = data.read(100000)
if binarydata == '': break
connection.send(binarydata)
return
else:
# The data object was not a file.
# Try to convert to a string and send the data.
connection.send(str(data))
return
python类StringTypes()的实例源码
def _setTarget(self, address, prefix=None, filters=None):
"""Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget.
- address ((host, port) tuple): IP-address & UDP-port
- prefix (string): The OSC-address prefix prepended to the address of each OSCMessage
sent to this OSCTarget (optional)
"""
if address not in self.targets.keys():
self.targets[address] = ["",{}]
if prefix != None:
if len(prefix):
# make sure prefix starts with ONE '/', and does not end with '/'
prefix = '/' + prefix.strip('/')
self.targets[address][0] = prefix
if filters != None:
if type(filters) in types.StringTypes:
(_, filters) = parseFilterStr(filters)
elif type(filters) != types.DictType:
raise TypeError("'filters' argument must be a dict with {addr:bool} entries")
self._updateFilters(self.targets[address][1], filters)
def setOSCTarget(self, address, prefix=None, filters=None):
"""Add (i.e. subscribe) a new OSCTarget, or change the prefix for an existing OSCTarget.
the 'address' argument can be a ((host, port) tuple) : The target server address & UDP-port
or a 'host' (string) : The host will be looked-up
- prefix (string): The OSC-address prefix prepended to the address of each OSCMessage
sent to this OSCTarget (optional)
"""
if type(address) in types.StringTypes:
address = self._searchHostAddr(address)
elif (type(address) == types.TupleType):
(host, port) = address[:2]
try:
host = socket.gethostbyname(host)
except:
pass
address = (host, port)
else:
raise TypeError("'address' argument must be a (host, port) tuple or a 'host' string")
self._setTarget(address, prefix, filters)
def delOSCTarget(self, address, prefix=None):
"""Delete the specified OSCTarget from the Client's dict.
the 'address' argument can be a ((host, port) tuple), or a hostname.
If the 'prefix' argument is given, the Target is only deleted if the address and prefix match.
"""
if type(address) in types.StringTypes:
address = self._searchHostAddr(address)
if type(address) == types.TupleType:
(host, port) = address[:2]
try:
host = socket.gethostbyname(host)
except socket.error:
pass
address = (host, port)
self._delTarget(address, prefix)
def getOSCTarget(self, address):
"""Returns the OSCTarget matching the given address as a ((host, port), [prefix, filters]) tuple.
'address' can be a (host, port) tuple, or a 'host' (string), in which case the first matching OSCTarget is returned
Returns (None, ['',{}]) if address not found.
"""
if type(address) in types.StringTypes:
address = self._searchHostAddr(address)
if (type(address) == types.TupleType):
(host, port) = address[:2]
try:
host = socket.gethostbyname(host)
except socket.error:
pass
address = (host, port)
if (address in self.targets.keys()):
try:
(host, _, _) = socket.gethostbyaddr(host)
except socket.error:
pass
return ((host, port), self.targets[address])
return (None, ['',{}])
def _build_selector(self, selector, cmd):
if not isinstance(selector, StringTypes):
self._selector = selector(self)._selector
return self
_selector = selector
component = self
# ???
for name, _component in self.components.items():
if _selector.startswith(name):
_selector = _component.selector + _selector[len(name):]
component = _component(self)
break
if not self._selector:
if cmd:
self._selector = "$(NODE).%s('%s')" % (cmd, _selector)
else:
self._selector = "$('%s')" % _selector
else:
self._selector += ".%s('%s')" % (cmd, _selector)
# ???
return component
def getValueStrings( val, blnUgly=True ):
#Used by joinWithComma function to join list items for SQL queries.
#Expects to receive 'valid' types, as this was designed specifically for joining object attributes and nonvalid attributes were pulled.
#If the default blnUgly is set to false, then the nonvalid types are ignored and the output will be pretty, but the SQL Insert statement will
#probably be wrong.
tplStrings = (types.StringType, types.StringTypes )
tplNums = ( types.FloatType, types.IntType, types.LongType, types.BooleanType )
if isinstance( val, tplNums ):
return '#num#'+ str( val ) + '#num#'
elif isinstance( val, tplStrings ):
strDblQuote = '"'
return strDblQuote + val + strDblQuote
else:
if blnUgly == True:
return "Error: nonconvertable value passed - value type: %s" % type(val )
else:
return None
def scopes_to_string(scopes):
"""Converts scope value to a string.
If scopes is a string then it is simply passed through. If scopes is an
iterable then a string is returned that is all the individual scopes
concatenated with spaces.
Args:
scopes: string or iterable of strings, the scopes.
Returns:
The scopes formatted as a single string.
"""
if isinstance(scopes, types.StringTypes):
return scopes
else:
return ' '.join(scopes)
def is_ip4(ip):
if type(ip) not in types.StringTypes:
return False
ip = ip.split('.')
for s in ip:
if not s.isdigit():
return False
i = int(s)
if i < 0 or i > 255:
return False
return len(ip) == 4
def parse_lock_data(data_str):
"""
Parse string generated by lock_data()
"""
node_id, ip, process_id = (data_str.split('-') + ([None] * 3))[:3]
if type(process_id) in types.StringTypes and process_id.isdigit():
process_id = int(process_id)
else:
process_id = None
return {
'node_id': node_id,
'ip': ip,
'process_id': process_id,
}
def __call__(self, context):
if isinstance(self._object, types.StringTypes):
self._object = context[self._object]
# get dialect object
if isinstance(self._dialect, types.StringTypes):
dialect = csv.get_dialect(self._dialect)
if self._path.startswith("memory:"):
buffer_ = StringIO.StringIO()
self._write_object(buffer_, dialect)
buffer_.seek(0)
context[self._path[len("memory:"):]] = buffer_
else:
with open(self._path, "w") as f:
self._write_object(f, dialect)
return self._path
def validate_config(self, config):
"""????????????????"""
if self._validated:
return
for section in config.prefix("smtp_"):
smtp_config_items = config[section]
for rule in SMTPManager.config_rules:
item_value = smtp_config_items.get(rule.name)
rule.validate(item_value)
if item_value is None:
smtp_config_items[rule.name] = rule.default
if rule.name == "port" and isinstance(
item_value, types.StringTypes):
smtp_config_items["port"] = int(item_value)
smtp_config = SMTPConfig(**smtp_config_items)
self._all_smtp_config[section] = smtp_config
self._validated = True
def __init__(self, attachment_file, mime_type, attachment_filename=None):
"""
:param attachment_file ?????????????file????StringIO???
????????????????????
:param mime_type ???mime type???application/octet-stream
:param attachment_filename ?????????
"""
if attachment_filename is None:
if isinstance(attachment_file, types.StringTypes):
self._attachment_filename = os.path.split(
attachment_file)[1]
elif isinstance(attachment_file, types.FileType):
self._attachment_filename = os.path.split(
attachment_file.name)[1]
else:
raise InvalidArgumentException(
u"????attachement_filename?????????")
def _build_query(self, engine, session, query_items):
query = session.query(*query_items)
if self._query is not None:
if isinstance(self._query, types.StringTypes):
query = query.filter(text(self._query))
else:
query = query.filter(self._query)
if self._order_by is not None:
query = query.order_by(self._order_by)
if self._group_by is not None:
if isinstance(self._group_by, types.StringTypes):
self._group_by = self.automap(engine, self._group_by)
query = query.group_by(self._group_by)
if self._params is not None:
query = query.params(**self._params)
return query
def __call__(self, context):
global _engine_manager
engine_container = _engine_manager.engine(self._engine_name)
session = engine_container.session()
if isinstance(self._sql, types.StringTypes) and \
_SELECT_STATEMENT_REGEX.search(self._sql):
return self._execute_select_statement(session, context)
else:
# ?????
try:
if isinstance(self._sql, types.StringTypes):
session.execute(self._sql, self._params)
# ????
elif isinstance(self._sql, SequenceCollectionType):
if isinstance(self._params, SequenceCollectionType):
for idx, sql in enumerate(self._sql):
session.execute(sql, self._params[idx])
else:
for sql in self._sql:
session.execute(sql)
session.commit()
finally:
session.close()
def _handle_titles(self, titles, auto_title_name):
"""?Title????????????title
???Title????????????:
(Title("id", u"??"), Title("name", u"??"), Title("grade", u"??"))
???????????:
("id", u"??", "name", u"??", "grade", u"??")
"""
first_ele = titles[0]
if isinstance(first_ele, Title):
return titles
elif isinstance(first_ele, types.StringTypes):
if auto_title_name:
return [Title("field_%d" % idx, arg)
for idx, arg in enumerate(titles)]
else:
return [Title(*arg) for arg in zip(titles[::2], titles[1::2])]
else:
raise InvalidTypeException(u"title???????????Title??")
def __init__(self, url, fileOrName,
method='GET', postdata=None, headers=None,
agent="Twisted client", supportPartial=0):
self.requestedPartial = 0
if isinstance(fileOrName, types.StringTypes):
self.fileName = fileOrName
self.file = None
if supportPartial and os.path.exists(self.fileName):
fileLength = os.path.getsize(self.fileName)
if fileLength:
self.requestedPartial = fileLength
if headers == None:
headers = {}
headers["range"] = "bytes=%d-" % fileLength
else:
self.file = fileOrName
HTTPClientFactory.__init__(self, url, method=method, postdata=postdata, headers=headers, agent=agent)
self.deferred = defer.Deferred()
self.waiting = 1
def _send_data_part(data, connection):
if isinstance(data, types.StringTypes):
connection.send(data)
return
# Check to see if data is a file-like object that has a read method.
elif hasattr(data, 'read'):
# Read the file and send it a chunk at a time.
while 1:
binarydata = data.read(100000)
if binarydata == '': break
connection.send(binarydata)
return
else:
# The data object was not a file.
# Try to convert to a string and send the data.
connection.send(str(data))
return
def scopes_to_string(scopes):
"""Converts scope value to a string.
If scopes is a string then it is simply passed through. If scopes is an
iterable then a string is returned that is all the individual scopes
concatenated with spaces.
Args:
scopes: string or iterable of strings, the scopes.
Returns:
The scopes formatted as a single string.
"""
if isinstance(scopes, types.StringTypes):
return scopes
else:
return ' '.join(scopes)
def _render_without_enums(self, rendered_by_wraper):
""""""
_ksn = self._replace_table_without_enum.values()
_ksn = tuple(_ksn)
for i in self._replace_table_without_enum:
_target = self._replace_table_without_enum[i].value
if not isinstance(_target, types.StringTypes):
_target = str(_target)
_tag = self._replace_table_without_enum[i]
if self._wraper_flag[_tag]:
rendered_by_wraper = rendered_by_wraper.replace(i, self._wrap(_target))
else:
rendered_by_wraper = rendered_by_wraper.replace(i, _target)
return rendered_by_wraper
#----------------------------------------------------------------------
def fmtstr(self, *fmt):
str = ''
encoding = 'utf8'#??utf8??
for i in fmt:
if not type(i) in [types.UnicodeType, types.StringTypes, types.StringType]:
s= repr(i)
else:
s = i
if type(s) == type(u''):
str += s.encode(encoding)
else:
str += s
str += '.'
#str += '/n'
#print 'fmtstr:'+str
return str
def expect_exact(self, pattern_list, timeout = -1, searchwindowsize = -1):
"""This is similar to expect(), but uses plain string matching instead
of compiled regular expressions in 'pattern_list'. The 'pattern_list'
may be a string; a list or other sequence of strings; or TIMEOUT and
EOF.
This call might be faster than expect() for two reasons: string
searching is faster than RE matching and it is possible to limit the
search to just the end of the input buffer.
This method is also useful when you don't want to have to worry about
escaping regular expression characters that you want to match."""
if type(pattern_list) in types.StringTypes or pattern_list in (TIMEOUT, EOF):
pattern_list = [pattern_list]
return self.expect_loop(searcher_string(pattern_list), timeout, searchwindowsize)
def __init__(self, depends_on, encoder=ENC_BITS_DEFAULT, fuzzable=True, name=None):
'''
:param depends_on: (name of) field we depend on
:type encoder: :class:`~kitty.model.low_level.encoder.BitsEncoder`
:param encoder: encoder for the field
:param fuzzable: is container fuzzable
:param name: (unique) name of the container
'''
self._rendered_field = None
self.dependency_type = Calculated.VALUE_BASED
super(Calculated, self).__init__(value=self.__class__._default_value_, encoder=encoder, fuzzable=fuzzable, name=name)
if isinstance(depends_on, types.StringTypes):
self._field_name = depends_on
self._field = None
elif isinstance(depends_on, BaseField):
self._field_name = None
self._field = depends_on
else:
raise KittyException('depends_on parameter (%s) is neither a string nor a valid field' % depends_on)
def __init__(self, depends_on, func, encoder=ENC_STR_DEFAULT, fuzzable=False, name=None):
'''
:param depends_on: (name of) field we depend on
:param func: function for processing of the dependant data. func(str)->str
:type encoder: :class:`~kitty.model.low_level.encoder.StrEncoder`
:param encoder: encoder for the field (default: ENC_STR_DEFAULT)
:param fuzzable: is container fuzzable
:param name: (unique) name of the container
'''
try:
res = func('')
kassert.is_of_types(res, types.StringTypes)
self._func = func
except:
raise KittyException('func should be func(str)->str')
super(CalculatedStr, self).__init__(depends_on=depends_on, encoder=encoder, fuzzable=fuzzable, name=name)
def __init__(self, value, num_bits=1, fuzzable=True, name=None):
'''
:param value: value to mutate (str)
:param num_bits: number of consequtive bits to flip (invert)
:param fuzzable: is field fuzzable (default: True)
:param name: name of the object (default: None)
:raises: ``KittyException`` if num_bits is bigger than the value length in bits
:raises: ``KittyException`` if num_bits is not positive
'''
kassert.is_of_types(value, types.StringTypes)
if len(value) * 8 < num_bits:
raise KittyException('len of value in bits(%d) < num_bits(%d)' % (len(value) * 8, num_bits))
if num_bits <= 0:
raise KittyException('num_bits(%d) <= 0' % (num_bits))
super(BitFlip, self).__init__(value=Bits(bytes=value), encoder=ENC_BITS_DEFAULT, fuzzable=fuzzable, name=name)
self._data_len = len(value) * 8
self._num_bits = num_bits
self._num_mutations = self._data_len - (num_bits - 1)
def type_check(tags):
"""Perform a type check on a list of tags"""
if type(tags) in (types.ListType, types.TupleType):
single = False
elif tags == None:
tags = []
single = False
else:
tags = [tags]
single = True
if len([t for t in tags if type(t) not in types.StringTypes]) == 0:
valid = True
else:
valid = False
return tags, single, valid
def __init__(self, collection_names, collection_labels=None, file_names=None, field_names=None):
# load the collections
if isinstance(collection_names,types.StringTypes):
collection_names = [ collection_names ]
if collection_labels is None:
collection_labels = [ None ] * len(collection_names)
self._collections = [ Collection(name,label,file_names,field_names) \
for name,label in zip(collection_names,collection_labels) ]
# find the set of common galaxies and the set of common properties
if len(self._collections) > 1:
self._ids = sorted(reduce(lambda x,y: x&y, [ c.galaxy_ids() for c in self._collections ]))
self._props = reduce(lambda x,y: x&y, [ c.property_names() for c in self._collections ])
else:
self._ids = sorted(self._collections[0].galaxy_ids())
self._props = self._collections[0].property_names()
# print the number of common galaxies
print "Loaded a set of {} collections with {} common galaxies and {} common properties" \
.format(len(self._collections), len(self._ids), len(self._props))
## This function returns a two-dimensional array with the values of the specified property for all common galaxies
# in all collections of the set. The index on the first axis iterates over the collections, the index on the last
# axis iterates over the galaxies, in order of increasing galaxy id.
def __init__(self, collection_names, collection_labels=None, file_names=None, field_names=None):
# load the collections
if isinstance(collection_names,types.StringTypes):
collection_names = [ collection_names ]
if collection_labels is None:
collection_labels = [ None ] * len(collection_names)
self._collections = [ Collection(name,label,file_names,field_names) \
for name,label in zip(collection_names,collection_labels) ]
# find the set of common galaxies and the set of common properties
if len(self._collections) > 1:
self._ids = sorted(reduce(lambda x,y: x&y, [ c.galaxy_ids() for c in self._collections ]))
self._props = reduce(lambda x,y: x&y, [ c.property_names() for c in self._collections ])
else:
self._ids = sorted(self._collections[0].galaxy_ids())
self._props = self._collections[0].property_names()
# print the number of common galaxies
print "Loaded a set of {} collections with {} common galaxies and {} common properties" \
.format(len(self._collections), len(self._ids), len(self._props))
## This function returns a two-dimensional array with the values of the specified property for all common galaxies
# in all collections of the set. The index on the first axis iterates over the collections, the index on the last
# axis iterates over the galaxies, in order of increasing galaxy id.
def __init__(self, url, fileOrName,
method='GET', postdata=None, headers=None,
agent="Twisted client", supportPartial=0):
self.requestedPartial = 0
if isinstance(fileOrName, types.StringTypes):
self.fileName = fileOrName
self.file = None
if supportPartial and os.path.exists(self.fileName):
fileLength = os.path.getsize(self.fileName)
if fileLength:
self.requestedPartial = fileLength
if headers == None:
headers = {}
headers["range"] = "bytes=%d-" % fileLength
else:
self.file = fileOrName
HTTPClientFactory.__init__(self, url, method=method, postdata=postdata, headers=headers, agent=agent)
self.deferred = defer.Deferred()
self.waiting = 1
def p_prop(self, p):
'''prop : prop_name operator prop_value
'''
if p[1].value.upper() in self.INT_TYPE_PROPNAMES:
if p[2].value == '~=':
self._error('"%s"???????"~="???'%(p[1].value), p[2], p[2].lexpos)
if not isinstance(p[3].value, types.IntType):
try:
p[3].value = int(p[3].value)
except ValueError:
self._error('"%s"??????"%s"??????int??'%(p[1].value, type(p[3].value)), p[3], p[3].lexpos)
if p[1].value.upper() == 'MAXDEPTH':
if p[3].value <= 0:
self._error("MaxDepth?????>0", p[3], p[3].lexpos)
elif p[2].value == '~=':
if not isinstance(p[3].value, types.StringTypes):
self._error('???"~="?????"%s"?????'%(type(p[3].value)), p[2], p[2].lexpos)
p[0] = UIObjectProperty(p[1], p[2], p[3])