def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
    """Parse a query string into a dict mapping keys to lists of values.

    Behaves like cgi.parse_qs but decodes with the supplied *unquote*
    callable. Both '&' and ';' separate pairs; '+' decodes to a space.
    Blank values are dropped unless *keep_blank_values* is true; a field
    without '=' raises (re-raised unpack error) when *strict_parsing*.
    """
    result = {}
    pairs = [piece for chunk in qs.split("&") for piece in chunk.split(";")]
    for pair in pairs:
        try:
            key, value = pair.split("=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if not value and not keep_blank_values:
            continue
        key = unquote(key.replace("+", " "))
        value = unquote(value.replace("+", " "))
        result.setdefault(key, []).append(value)
    return result
# Example source code for Python's split() method (scraped page heading)
def lineReceived(self, line):
    """Handle one line of an HTTP response while parsing headers.

    The first line is the status line ("HTTP/1.1 200 OK"); subsequent
    non-empty lines are headers; an empty line ends the header section.
    """
    if self.firstLine:
        # Status line: version, status code, optional reason phrase.
        self.firstLine = 0
        l = line.split(None, 2)
        version = l[0]
        status = l[1]
        try:
            message = l[2]
        except IndexError:
            # sometimes there is no message
            message = ""
        self.handleStatus(version, status, message)
        return
    if line:
        # Header line "Key: value"; split only on the first colon so
        # values containing ':' survive intact.
        key, val = line.split(':', 1)
        val = val.lstrip()
        self.handleHeader(key, val)
        if key.lower() == 'content-length':
            self.length = int(val)
    else:
        # Blank line terminates the headers: buffer the body and switch
        # the protocol to raw mode.
        self.__buffer = StringIO()
        self.handleEndHeaders()
        self.setRawMode()
def _authorize(self):
# Authorization, (mostly) per the RFC
try:
authh = self.getHeader("Authorization")
if not authh:
self.user = self.password = ''
return
bas, upw = authh.split()
if bas.lower() != "basic":
raise ValueError
upw = base64.decodestring(upw)
self.user, self.password = upw.split(':', 1)
except (binascii.Error, ValueError):
self.user = self.password = ""
except:
log.err()
self.user = self.password = ""
def getMostValueableStockList(self):
    """Fetch and parse the paged "most valuable stocks" listing.

    Walks result pages until a short page or the server-reported
    PageCount is exhausted, returning MostValueableCompanyInfo records.
    (Original docstring was mojibake; purpose inferred from the code —
    NOTE(review): confirm against the remote API.)
    """
    page = 1
    cList = []
    while True:
        res = getHtmlFromUrl(mostValueableStockUrl % page)
        if res:page += 1
        # getJsonObj(None) returns None, so a failed fetch falls through
        # to the final `else: break` below.
        companyListObj = getJsonObj(res)
        if companyListObj:
            list = companyListObj['Results']  # NOTE(review): shadows builtin `list`
            if list and len(list):
                for item in list:
                    # Each item is a comma-separated record; field meanings
                    # inferred from usage — TODO confirm:
                    # [1]=code, [2]=name, [3]=profit growth, [4]=org count,
                    # [5]=ROE, [6]=market cap.
                    stockInfo = item.split(',')
                    jzcsyl = str(float(stockInfo[5].split('(')[0]) * 100) + '%'
                    fhlrzzl = str(float(stockInfo[3].split('(')[0]) * 100) + '%'
                    orgCount = stockInfo[4].split('(')[0]
                    # Market cap converted to units of 100 million.
                    sz = str(int(float(stockInfo[6])/10000/10000))
                    cinfo = MostValueableCompanyInfo(stockInfo[1],stockInfo[2],jzcsyl,fhlrzzl,orgCount,sz)
                    cList.append(cinfo)
                # A short page means this was the last page.
                if len(list) < pageSize:break
                # Stop once the reported page count has been consumed.
                if int(companyListObj['PageCount']) < page:break
            else:break
        else:break
    return cList
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
    """Split a query string into {key: [values]} using a pluggable unquote.

    Works like cgi.parse_qs: pairs are separated by '&' or ';', '+' maps
    to space, and blank values are skipped unless *keep_blank_values*.
    """
    decoded = {}
    for chunk in qs.split("&"):
        for field in chunk.split(";"):
            try:
                name, raw = field.split("=", 1)
            except ValueError:
                if not strict_parsing:
                    continue
                raise
            if raw or keep_blank_values:
                name = unquote(name.replace("+", " "))
                raw = unquote(raw.replace("+", " "))
                bucket = decoded.get(name)
                if bucket is None:
                    decoded[name] = [raw]
                else:
                    bucket.append(raw)
    return decoded
def lineReceived(self, line):
    """Process one received line of an HTTP response header section.

    Dispatches the status line, individual headers, and the blank
    end-of-headers line to the appropriate handlers.
    """
    if self.firstLine:
        # First line is the status line: "VERSION STATUS [MESSAGE]".
        self.firstLine = 0
        l = line.split(None, 2)
        version = l[0]
        status = l[1]
        try:
            message = l[2]
        except IndexError:
            # sometimes there is no message
            message = ""
        self.handleStatus(version, status, message)
        return
    if line:
        # "Key: value" header; split on the first colon only.
        key, val = line.split(':', 1)
        val = val.lstrip()
        self.handleHeader(key, val)
        if key.lower() == 'content-length':
            self.length = int(val)
    else:
        # Empty line ends headers; prepare a body buffer and go raw.
        self.__buffer = StringIO()
        self.handleEndHeaders()
        self.setRawMode()
def _authorize(self):
# Authorization, (mostly) per the RFC
try:
authh = self.getHeader("Authorization")
if not authh:
self.user = self.password = ''
return
bas, upw = authh.split()
if bas.lower() != "basic":
raise ValueError
upw = base64.decodestring(upw)
self.user, self.password = upw.split(':', 1)
except (binascii.Error, ValueError):
self.user = self.password = ""
except:
log.err()
self.user = self.password = ""
def parse_time(date, time):
    """Build a datetime from a human-readable date string and "HH:MM" time.

    The two mojibake literals below presumably meant "today" and
    "yesterday" in the original encoding; otherwise *date* is parsed as
    "DAY MONTH [YEAR]" against the module-level ``months`` table.
    """
    now = datetime.datetime.now(timezone)
    if date == '???????':
        # Literal preserved byte-for-byte; presumably "today".
        parsed_date = now.date()
    elif date == '?????':
        # Presumably "yesterday".
        parsed_date = (now - datetime.timedelta(days=1)).date()
    else:
        parts = date.split(' ')
        day_num = int(parts[0])
        year_num = int(parts[2]) if len(parts) == 3 else now.year
        month_val = parts[1].lower()
        for idx, month_name in enumerate(months):
            if month_name.startswith(month_val):
                month_val = idx + 1
                break
        parsed_date = datetime.date(year=year_num, day=day_num, month=month_val)
    hour_minute = [int(piece) for piece in time.split(':')]
    parsed_time = datetime.time(hour=hour_minute[0], minute=hour_minute[1])
    return datetime.datetime.combine(date=parsed_date, time=parsed_time)
# consume_rank_feature.py — file source
# Project: DataMiningCompetitionFirstPrize
# Author: lzddzh
# (scraped page metadata: views 30, favorites 0, likes 0, comments 0)
def extract_consume_per_person(file_name, consume_dict):
    """Compute average POS consumption per active day for each person.

    Each input line is '$'-separated: the first field is the person id,
    the rest are comma-separated transaction records. Record fields used:
    [0] category, [3] timestamp ("DATE TIME"), [4] amount.
    Results are written into *consume_dict* as id -> amount/active-days.
    """
    # Fix: the original leaked the file handle via open(...).readlines().
    with open(file_name) as fh:
        lines = fh.readlines()
    for line in lines:
        temps = line.strip("\r\n").split("$")
        id = temps[0]  # NOTE(review): shadows builtin `id`; kept for compatibility
        totol_amount = 0
        active_date_set = set()
        for i in range(1, len(temps)):
            records = temps[i].split(",")
            cate = records[0].strip("\"")
            amount = float(records[4].strip("\""))
            time = records[3].strip("\"")
            date = time.split(" ")[0]
            active_date_set.add(date)
            # Mojibake category literal preserved byte-for-byte.
            if cate == "POS??":
                totol_amount += amount
        # NOTE(review): raises ZeroDivisionError for a line with no records.
        consume_dict[id] = float(totol_amount) / len(active_date_set)
# consume_rank_feature.py — file source
# Project: DataMiningCompetitionFirstPrize
# Author: lzddzh
# (scraped page metadata: views 25, favorites 0, likes 0, comments 0)
def extract_rank_feature(file_name, final_rank, score_dict, if_train):
    """Write per-student rank features as JSON-ish lines to a fixed path.

    Reads ids from *file_name* (train lines are "id,..."; test lines are
    bare ids) and emits rank_in_faculty / rank_score_consume, with -999
    as the missing-value sentinel.

    Fixes: Python-2-only `print id` and `dict.has_key` replaced with
    forms valid on both 2 and 3; file handles now closed via `with`.
    """
    if if_train:
        out_path = "../original_data/rank_feature_train.txt"
    else:
        out_path = "../original_data/rank_feature_test.txt"
    with open(out_path, 'w') as w:
        with open(file_name) as fh:
            lines = fh.readlines()
        for line in lines:
            if if_train:
                id = line.strip().split(",")[0]
            else:
                id = line.strip()
            print(id)
            w.write("{")
            w.write('"stuId": ' + id + ", ")
            if id in score_dict and id in final_rank:
                w.write('"rank_in_faculty":' + str(final_rank[id]) + "," + '"rank_score_consume":' + str(
                    final_rank[id] * score_dict[id]) + "} \n")
            else:
                w.write(
                    '"rank_in_faculty":' + str(final_rank.get(id, -999)) + "," + '"rank_score_consume":' + str(
                        -999) + "} \n")
def extract_stations(self, page):
    """Extract Station records from station(...) snippets embedded in *page*.

    Each snippet's comma-separated payload is parsed positionally:
    name, network, distance, azimuth, channels..., snr. A leading
    'name' token marks a header row and is skipped.

    Fix: the loop variable was named `str`, shadowing the builtin.
    """
    pattern = re.compile(r'station\(([^)]+)\)')
    stations = []
    for snippet in pattern.findall(page):
        # Strip quotes and collapse accidental double commas before splitting.
        toks = snippet.replace("'", '').replace(',,', ',').split(',')
        if toks[0] == 'name':
            continue  # header row, not a station
        st = Station(station=toks[0],
                     network=toks[1],
                     dist=float(toks[2]),
                     azimuth=float(toks[3]),
                     channels=toks[4:-1],
                     snr=float(toks[-1]))
        stations.append(st)
    return stations
def readEvent(EventPath):
    """Parse the '*.origin' file in *EventPath* into a key->value dict.

    Each line has the form key=value; quotes are stripped from keys and
    trailing newlines from values. Errors are logged and an (possibly
    partial) dict is returned.
    """
    Origin = {}
    logger.info('\033[31m Parsing EventFile \033[0m \n')
    try:
        for i in os.listdir(EventPath):
            if fnmatch.fnmatch(i, '*.origin'):
                evfile = os.path.join(EventPath, i)
                # Fix: the file handle was never closed.
                with open(evfile, 'r') as fobj:
                    for line in fobj:
                        # Idiomatic form of the original str.split(line, '=').
                        parts = line.split('=')
                        key = (parts[0].replace('\'', '')).strip()
                        value = parts[1].replace('\n', '').strip()
                        Origin[key] = value
    except Exception:
        # Narrowed from a bare `except:`; still best-effort by design.
        logger.info('\033[31m File Error Exception \033[0m \n')
    return Origin
def readConfig(EventPath):
    """Parse the '*.config' file in *EventPath* into a key->value dict.

    Lines starting with '#' or of length <= 1 are skipped; remaining
    lines are key=value pairs. Errors are logged and a (possibly
    partial) dict is returned.
    """
    Config = {}
    logger.info('\033[31m Parsing ConfigFile \033[0m \n')
    try:
        for i in os.listdir(EventPath):
            if fnmatch.fnmatch(i, '*.config'):
                evfile = os.path.join(EventPath, i)
                # Fix: the file handle was never closed.
                with open(evfile, 'r') as fobj:
                    for line in fobj:
                        if line[0] != '#' and len(line) > 1:
                            # Idiomatic form of str.split(line, '=').
                            parts = line.split('=')
                            key = (parts[0].replace('\'', '')).strip()
                            value = parts[1].replace('\n', '').strip()
                            Config[key] = value
    except Exception:
        # Narrowed from a bare `except:`; still best-effort by design.
        logger.info('\033[31m File Error Exception \033[0m \n')
    return Config
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
    """
    Like C{cgi.parse_qs}, but with support for parsing byte strings on Python 3.
    @type qs: C{bytes}
    """
    result = {}
    for chunk in qs.split(b"&"):
        for field in chunk.split(b";"):
            try:
                key, value = field.split(b"=", 1)
            except ValueError:
                if strict_parsing:
                    raise
                continue
            if not (value or keep_blank_values):
                continue
            key = unquote(key.replace(b"+", b" "))
            value = unquote(value.replace(b"+", b" "))
            result.setdefault(key, []).append(value)
    return result
def fromChunk(data):
    """
    Decode one HTTP chunked-transfer chunk from a byte string.
    @type data: C{bytes}
    @return: tuple of (result, remaining) - both C{bytes}.
    @raise ValueError: If the given data is not a correctly formatted chunked
        byte string.
    """
    prefix, rest = data.split(b'\r\n', 1)
    size = int(prefix, 16)
    if size < 0:
        raise ValueError("Chunk length must be >= 0, not %d" % (size,))
    body = rest[:size]
    terminator = rest[size:size + 2]
    if terminator != b'\r\n':
        raise ValueError("chunk must end with CRLF")
    return body, rest[size + 2:]
def parseContentRange(header):
    """
    Parse a content-range header into (start, end, realLength).

    realLength might be None if real length is not known ('*').
    @raise ValueError: if the range type is not 'bytes' or the header is
        malformed.
    """
    kind, other = header.strip().split()
    if kind.lower() != "bytes":
        # Bug fix: the original never interpolated the type into the
        # message, so the error always showed a literal '%r'.
        raise ValueError("a range of type %r is not supported" % (kind,))
    startend, realLength = other.split("/")
    start, end = map(int, startend.split("-"))
    if realLength == "*":
        realLength = None
    else:
        realLength = int(realLength)
    return (start, end, realLength)
def parseCookies(self):
    """
    Parse Cookie request headers into C{self.received_cookies}.

    This method is not intended for users. Malformed fragments without
    an '=' are silently skipped.
    """
    headers = self.requestHeaders.getRawHeaders(b"cookie")
    if headers is None:
        return
    for header in headers:
        if not header:
            continue
        for fragment in header.split(b';'):
            fragment = fragment.lstrip()
            try:
                name, value = fragment.split(b'=', 1)
            except ValueError:
                pass
            else:
                self.received_cookies[name] = value
def getRequestHostname(self):
    """
    Get the hostname that the user passed in to the request.

    This will either use the Host: header (if it is available) or the
    host we are listening on if the header is unavailable.
    @returns: the requested hostname
    @rtype: C{bytes}
    """
    host = self.getHeader(b'host')
    if not host:
        return networkString(self.getHost().host)
    hostname, _, _ = host.partition(b':')
    return hostname
def _authorize(self):
# Authorization, (mostly) per the RFC
try:
authh = self.getHeader(b"Authorization")
if not authh:
self.user = self.password = ''
return
bas, upw = authh.split()
if bas.lower() != b"basic":
raise ValueError()
upw = base64.decodestring(upw)
self.user, self.password = upw.split(b':', 1)
except (binascii.Error, ValueError):
self.user = self.password = ""
except:
log.err()
self.user = self.password = ""
def _srtToAtoms(self, srtText):
    """Parse SRT subtitle text into a list of subtitle atoms.

    Each atom is a dict with 'start'/'end' times (milliseconds, via
    self._srtTc2ms) and cleaned 'text' (via self._srtClearText).
    Blocks are separated by blank lines; a leading numeric index line
    is optional and skipped when present.
    """
    subAtoms = []
    # Normalize line endings, then split into subtitle blocks.
    srtText = srtText.replace('\r\n', '\n').split('\n\n')
    line = 0
    for idx in range(len(srtText)):
        line += 1
        st = srtText[idx].split('\n')
        #printDBG("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
        #printDBG(st)
        #printDBG("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
        if len(st)>=2:
            try:
                try:
                    # If the first line is a numeric index, the timing
                    # line is the next one (i = 1).
                    tmp = int(st[0].strip())
                    i = 1
                except Exception:
                    # No numeric index: an empty first line still shifts
                    # the timing line to index 1, otherwise it is line 0.
                    if '' == st[0]: i = 1
                    else: i = 0
                # Need a timing line plus at least one text line.
                if len(st)<(i+2): continue
                # Timing line: "HH:MM:SS,mmm --> HH:MM:SS,mmm".
                split = st[i].split(' --> ')
                subAtoms.append( { 'start':self._srtTc2ms(split[0].strip()), 'end':self._srtTc2ms(split[1].strip()), 'text':self._srtClearText('\n'.join(j for j in st[i+1:len(st)])) } )
            except Exception:
                # Malformed block: log which block failed and keep going.
                printExc("Line number [%d]" % line)
    return subAtoms
def test_use_cache():
    """With cached files this should come back in under a second"""
    # First (uncached) run primes the cache files.
    subprocess_handler([NETMIKO_GREP] + ['interface', 'all'])
    # Second run reads the cache and reports its runtime.
    cached_cmd = [NETMIKO_GREP] + ['--use-cache', '--display-runtime',
                                   'interface', 'all']
    (output, std_err) = subprocess_handler(cached_cmd)
    runtime = re.search(r"Total time: (0:.*)", output).group(1)
    _, _, seconds = runtime.split(":")
    assert float(seconds) <= 1
    assert 'pynet_rtr1.txt:interface FastEthernet0' in output
def test_display_failed():
    """Verify failed devices are showing"""
    (output, std_err) = subprocess_handler([NETMIKO_GREP] + ['interface', 'all'])
    assert "Failed devices" in output
    tail = output.split("Failed devices:")[1]
    devices = [entry.strip() for entry in tail.strip().split("\n")]
    assert len(devices) == 2
    assert "bad_device" in devices
    assert "bad_port" in devices
def fromChunk(data):
    """Convert chunk to string.

    @returns: tuple (result, remaining), may raise ValueError.
    """
    prefix, rest = data.split('\r\n', 1)
    length = int(prefix, 16)
    if length < 0:
        raise ValueError("Chunk length must be >= 0, not %d" % (length,))
    if rest[length:length + 2] != '\r\n':
        # Fix: `raise ValueError, "..."` is Python-2-only syntax; the call
        # form is valid on both Python 2 and 3. Also `!=` replaces the
        # awkward `not ... ==`.
        raise ValueError("chunk must end with CRLF")
    return rest[:length], rest[length + 2:]
def parseContentRange(header):
    """Parse a content-range header into (start, end, realLength).

    realLength might be None if real length is not known ('*').
    Raises ValueError for non-'bytes' range types or malformed input.
    """
    kind, other = header.strip().split()
    if kind.lower() != "bytes":
        # Two fixes: `raise ValueError, msg` is Python-2-only syntax, and
        # the range type was never interpolated into the message.
        raise ValueError("a range of type %r is not supported" % (kind,))
    startend, realLength = other.split("/")
    start, end = map(int, startend.split("-"))
    if realLength == "*":
        realLength = None
    else:
        realLength = int(realLength)
    return (start, end, realLength)
def setLastModified(self, when):
    """Set the Last-Modified time for the response to this request.

    If called more than once, attempts to set an earlier Last-Modified
    are ignored; only a later value replaces the stored one. For an
    If-Modified-Since conditional request whose condition is satisfied,
    the response code is set to NOT_MODIFIED and CACHED is returned to
    signal that no body should be written; otherwise a false value is
    returned.

    @param when: last modification time of the resource, in seconds
        since the epoch.
    @type when: number
    """
    # time.time() may be a float, but the HTTP-date strings are
    # only good for whole seconds.
    # Fix: long() is Python-2-only; int() is equivalent there (py2 ints
    # auto-promote to long) and works on Python 3.
    when = int(math.ceil(when))
    if (not self.lastModified) or (self.lastModified < when):
        self.lastModified = when
    modified_since = self.getHeader('if-modified-since')
    if modified_since:
        # Strip any ";length=..." suffix before parsing the date.
        modified_since = stringToDatetime(modified_since.split(';', 1)[0])
        if modified_since >= when:
            self.setResponseCode(NOT_MODIFIED)
            return CACHED
    return None
def setETag(self, etag):
    """Set an entity tag for the outgoing response.

    That's "entity tag" as in the HTTP/1.1 ETag header, "used for
    comparing two or more entities from the same requested resource."
    For an If-None-Match conditional request whose tag matches (or when
    the request carries '*'), the response code is set to NOT_MODIFIED
    (HEAD/GET) or PRECONDITION_FAILED (other methods) and CACHED is
    returned to indicate that no body should be written; otherwise a
    false value is returned.

    @param etag: The entity tag for the resource being returned.
    @type etag: string
    """
    if etag:
        self.etag = etag
    tags = self.getHeader("if-none-match")
    if not tags:
        return None
    tags = tags.split()
    if etag not in tags and '*' not in tags:
        return None
    if self.method in ("HEAD", "GET"):
        self.setResponseCode(NOT_MODIFIED)
    else:
        self.setResponseCode(PRECONDITION_FAILED)
    return CACHED
def getRequestHostname(self):
    """Get the hostname that the user passed in to the request.

    Uses the Host: header when present, otherwise falls back to a
    reverse DNS lookup of the address we are listening on. Any port
    suffix is stripped.
    """
    host = self.getHeader('host')
    if not host:
        host = socket.gethostbyaddr(self.getHost()[1])[0]
    return host.split(':')[0]
def headerReceived(self, line):
    """Do pre-processing (for content-length) and store this header away.

    Header names are lowercased before storage; when the request has
    accumulated more than maxHeaders headers the connection is rejected
    with a 400 and closed.
    """
    name, value = line.split(':', 1)
    name = name.lower()
    value = value.strip()
    if name == 'content-length':
        self.length = int(value)
    reqHeaders = self.requests[-1].received_headers
    reqHeaders[name] = value
    if len(reqHeaders) > self.maxHeaders:
        self.transport.write("HTTP/1.1 400 Bad Request\r\n\r\n")
        self.transport.loseConnection()
def checkPersistence(self, request, version):
    """Check if the channel should close or not.

    Returns 1 to keep the connection alive, 0 to close it. HTTP/1.0
    persistent-connection support is deliberately disabled: without a
    guaranteed content-length header we cannot safely pipeline, whereas
    HTTP/1.1 can fall back to chunked encoding.
    """
    connection = request.getHeader('connection')
    if connection:
        tokens = [token.lower() for token in connection.split(' ')]
    else:
        tokens = []
    if version != "HTTP/1.1":
        # Covers HTTP/1.0 and anything unknown: never persist.
        return 0
    if 'close' in tokens:
        request.setHeader('connection', 'close')
        return 0
    return 1
def getJsonObj(obj):
    """Extract the JSON payload from a 'var xxx={...}' response string.

    Returns None for empty input or anything that looks like HTML.
    The payload starts right after the first '={'; the opening brace is
    re-attached before decoding with simplejson.
    """
    if not obj:
        return None
    if hasHTML(obj):
        return None
    payload = "{" + obj.split('={')[1]
    return simplejson.loads(payload)