python类split()的实例源码

http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
    """like cgi.parse_qs, only with custom unquote function"""
    d = {}
    items = [s2 for s1 in qs.split("&") for s2 in s1.split(";")]
    for item in items:
        try:
            k, v = item.split("=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if v or keep_blank_values:
            k = unquote(k.replace("+", " "))
            v = unquote(v.replace("+", " "))
            if k in d:
                d[k].append(v)
            else:
                d[k] = [v]
    return d
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def lineReceived(self, line):
        if self.firstLine:
            self.firstLine = 0
            l = line.split(None, 2)
            version = l[0]
            status = l[1]
            try:
                message = l[2]
            except IndexError:
                # sometimes there is no message
                message = ""
            self.handleStatus(version, status, message)
            return
        if line:
            key, val = line.split(':', 1)
            val = val.lstrip()
            self.handleHeader(key, val)
            if key.lower() == 'content-length':
                self.length = int(val)
        else:
            self.__buffer = StringIO()
            self.handleEndHeaders()
            self.setRawMode()
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _authorize(self):
        # Authorization, (mostly) per the RFC
        try:
            authh = self.getHeader("Authorization")
            if not authh:
                self.user = self.password = ''
                return
            bas, upw = authh.split()
            if bas.lower() != "basic":
                raise ValueError
            upw = base64.decodestring(upw)
            self.user, self.password = upw.split(':', 1)
        except (binascii.Error, ValueError):
            self.user = self.password = ""
        except:
            log.err()
            self.user = self.password = ""
stockInfo.py 文件源码 项目:stockUtils 作者: caiyue 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getMostValueableStockList(self):
        '''????????'''
        page = 1
        cList = []
        while True:
            res = getHtmlFromUrl(mostValueableStockUrl % page)
            if res:page += 1
            companyListObj = getJsonObj(res)
            if companyListObj:
                list =  companyListObj['Results']
                if list and len(list):
                    for item in list:
                        stockInfo = item.split(',')
                        jzcsyl = str(float(stockInfo[5].split('(')[0]) * 100) + '%'
                        fhlrzzl = str(float(stockInfo[3].split('(')[0]) * 100) + '%'
                        orgCount = stockInfo[4].split('(')[0]
                        sz = str(int(float(stockInfo[6])/10000/10000))
                        cinfo = MostValueableCompanyInfo(stockInfo[1],stockInfo[2],jzcsyl,fhlrzzl,orgCount,sz)
                        cList.append(cinfo)
                    if len(list) < pageSize:break
                    #???????????????????????
                    if int(companyListObj['PageCount']) < page:break
                else:break
            else:break
        return cList
http.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote):
    """like cgi.parse_qs, only with custom unquote function"""
    d = {}
    items = [s2 for s1 in qs.split("&") for s2 in s1.split(";")]
    for item in items:
        try:
            k, v = item.split("=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if v or keep_blank_values:
            k = unquote(k.replace("+", " "))
            v = unquote(v.replace("+", " "))
            if k in d:
                d[k].append(v)
            else:
                d[k] = [v]
    return d
http.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def lineReceived(self, line):
        if self.firstLine:
            self.firstLine = 0
            l = line.split(None, 2)
            version = l[0]
            status = l[1]
            try:
                message = l[2]
            except IndexError:
                # sometimes there is no message
                message = ""
            self.handleStatus(version, status, message)
            return
        if line:
            key, val = line.split(':', 1)
            val = val.lstrip()
            self.handleHeader(key, val)
            if key.lower() == 'content-length':
                self.length = int(val)
        else:
            self.__buffer = StringIO()
            self.handleEndHeaders()
            self.setRawMode()
http.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _authorize(self):
        # Authorization, (mostly) per the RFC
        try:
            authh = self.getHeader("Authorization")
            if not authh:
                self.user = self.password = ''
                return
            bas, upw = authh.split()
            if bas.lower() != "basic":
                raise ValueError
            upw = base64.decodestring(upw)
            self.user, self.password = upw.split(':', 1)
        except (binascii.Error, ValueError):
            self.user = self.password = ""
        except:
            log.err()
            self.user = self.password = ""
CianParser.py 文件源码 项目:RealEstateTelegramBot 作者: PeterZhizhin 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def parse_time(date, time):
    current_time = datetime.datetime.now(timezone)
    if date == '???????':
        date = current_time.date()
    elif date == '?????':
        date = (current_time - datetime.timedelta(days=1)).date()
    else:
        date = date.split(' ')
        day = int(date[0])
        if len(date) == 3:
            year = int(date[2])
        else:
            year = current_time.year
        month = date[1].lower()
        for i, name in enumerate(months):
            if name.startswith(month):
                month = i + 1
                break
        date = datetime.date(year=year, day=day, month=month)

    time = [int(i) for i in time.split(':')]
    time = datetime.time(hour=time[0], minute=time[1])

    return datetime.datetime.combine(date=date, time=time)
consume_rank_feature.py 文件源码 项目:DataMiningCompetitionFirstPrize 作者: lzddzh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def extract_consume_per_person(file_name, consume_dict):
    lines = open(file_name).readlines()
    for line in lines:
        temps = line.strip("\r\n").split("$")
        id = temps[0]
        totol_amount = 0
        active_date_set = set()

        for i in range(1, len(temps)):
            records = temps[i].split(",")
            cate = records[0].strip("\"")
            amount = float(records[4].strip("\""))
            time = records[3].strip("\"")
            date = time.split(" ")[0]
            active_date_set.add(date)
            if cate == "POS??":
                totol_amount += amount
        consume_dict[id] = float(totol_amount) / len(active_date_set)
consume_rank_feature.py 文件源码 项目:DataMiningCompetitionFirstPrize 作者: lzddzh 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def extract_rank_feature(file_name, final_rank, score_dict, if_train):
    if if_train:
        w = open("../original_data/rank_feature_train.txt", 'w')
    else:
        w = open("../original_data/rank_feature_test.txt", 'w')

    lines = open(file_name).readlines()
    for line in lines:
        if if_train:
            id = line.strip().split(",")[0]
        else:
            id = line.strip()
            print id
        w.write("{")
        w.write('"stuId": ' + id + ", ")

        if score_dict.has_key(id) and final_rank.has_key(id):
            w.write('"rank_in_faculty":' + str(final_rank[id]) + "," + '"rank_score_consume":' + str(
                final_rank[id] * score_dict[id]) + "} \n")
        else:
            w.write(
                '"rank_in_faculty":' + str(final_rank.get(id, -999)) + "," + '"rank_score_consume":' + str(
                    -999) + "} \n")

    w.close()
wilber.py 文件源码 项目:bat 作者: braunfuss 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def extract_stations(self, page):
        r = re.compile(r'station\(([^)]+)\)')
        stations = []
        for str in r.findall(page):
            toks = str.replace("'",'').replace(',,',',').split(',')
            if toks[0] == 'name': continue
            st = Station( station=toks[0],
                          network=toks[1],
                          dist=float(toks[2]), 
                          azimuth=float(toks[3]), 
                          channels=toks[4:-1],
                          snr=float(toks[-1]) )

            stations.append(st)

        return stations
atool.py 文件源码 项目:bat 作者: braunfuss 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def readEvent(EventPath):

    Origin = {}
    logger.info('\033[31m Parsing EventFile \033[0m \n')
    try:
        for i in os.listdir(EventPath):
            if fnmatch.fnmatch(i, '*.origin'):
                evfile = os.path.join(EventPath,i)
        fobj = open(evfile, 'r')

        for line in fobj:
            line = str.split(line, '=')
            key  = (line[0].replace('\'', '')).strip()
            value = line[1].replace('\n','').strip()

            Origin[key] = value
    except:
        logger.info('\033[31m File Error Exception \033[0m \n') 

    return Origin
atool.py 文件源码 项目:bat 作者: braunfuss 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def readConfig(EventPath):

    Config = {}
    logger.info('\033[31m Parsing ConfigFile \033[0m \n')
    try:
        for i in os.listdir(EventPath):
            if fnmatch.fnmatch(i, '*.config'):
                evfile = os.path.join(EventPath,i)
        fobj = open(evfile, 'r')

        for line in fobj:
            if line[0] != '#' and len(line) > 1:
                line = str.split(line, '=')
                key  = (line[0].replace('\'', '')).strip()
                value = line[1].replace('\n','').strip()

                Config[key] = value

    except:
        logger.info('\033[31m File Error Exception \033[0m \n')

    return Config
http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
    """
    Like C{cgi.parse_qs}, but with support for parsing byte strings on Python 3.

    @type qs: C{bytes}
    """
    d = {}
    items = [s2 for s1 in qs.split(b"&") for s2 in s1.split(b";")]
    for item in items:
        try:
            k, v = item.split(b"=", 1)
        except ValueError:
            if strict_parsing:
                raise
            continue
        if v or keep_blank_values:
            k = unquote(k.replace(b"+", b" "))
            v = unquote(v.replace(b"+", b" "))
            if k in d:
                d[k].append(v)
            else:
                d[k] = [v]
    return d
http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def fromChunk(data):
    """
    Convert chunk to string.

    @type data: C{bytes}

    @return: tuple of (result, remaining) - both C{bytes}.

    @raise ValueError: If the given data is not a correctly formatted chunked
        byte string.
    """
    prefix, rest = data.split(b'\r\n', 1)
    length = int(prefix, 16)
    if length < 0:
        raise ValueError("Chunk length must be >= 0, not %d" % (length,))
    if rest[length:length + 2] != b'\r\n':
        raise ValueError("chunk must end with CRLF")
    return rest[:length], rest[length + 2:]
http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def parseContentRange(header):
    """
    Parse a content-range header into (start, end, realLength).

    realLength might be None if real length is not known ('*').
    """
    kind, other = header.strip().split()
    if kind.lower() != "bytes":
        raise ValueError("a range of type %r is not supported")
    startend, realLength = other.split("/")
    start, end = map(int, startend.split("-"))
    if realLength == "*":
        realLength = None
    else:
        realLength = int(realLength)
    return (start, end, realLength)
http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def parseCookies(self):
        """
        Parse cookie headers.

        This method is not intended for users.
        """
        cookieheaders = self.requestHeaders.getRawHeaders(b"cookie")

        if cookieheaders is None:
            return

        for cookietxt in cookieheaders:
            if cookietxt:
                for cook in cookietxt.split(b';'):
                    cook = cook.lstrip()
                    try:
                        k, v = cook.split(b'=', 1)
                        self.received_cookies[k] = v
                    except ValueError:
                        pass
http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getRequestHostname(self):
        """
        Get the hostname that the user passed in to the request.

        This will either use the Host: header (if it is available) or the
        host we are listening on if the header is unavailable.

        @returns: the requested hostname
        @rtype: C{bytes}
        """
        # XXX This method probably has no unit tests.  I changed it a ton and
        # nothing failed.
        host = self.getHeader(b'host')
        if host:
            return host.split(b':', 1)[0]
        return networkString(self.getHost().host)
http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def _authorize(self):
        # Authorization, (mostly) per the RFC
        try:
            authh = self.getHeader(b"Authorization")
            if not authh:
                self.user = self.password = ''
                return
            bas, upw = authh.split()
            if bas.lower() != b"basic":
                raise ValueError()
            upw = base64.decodestring(upw)
            self.user, self.password = upw.split(b':', 1)
        except (binascii.Error, ValueError):
            self.user = self.password = ""
        except:
            log.err()
            self.user = self.password = ""
iptvsubtitles.py 文件源码 项目:crossplatform_iptvplayer 作者: j00zek 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _srtToAtoms(self, srtText):
        subAtoms = []
        srtText = srtText.replace('\r\n', '\n').split('\n\n')

        line = 0
        for idx in range(len(srtText)):
            line += 1
            st = srtText[idx].split('\n')
            #printDBG("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
            #printDBG(st)
            #printDBG("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++")
            if len(st)>=2:
                try:
                    try:
                        tmp = int(st[0].strip())
                        i = 1
                    except Exception:
                        if '' == st[0]: i = 1
                        else: i = 0
                    if len(st)<(i+2): continue
                    split = st[i].split(' --> ')
                    subAtoms.append( { 'start':self._srtTc2ms(split[0].strip()), 'end':self._srtTc2ms(split[1].strip()), 'text':self._srtClearText('\n'.join(j for j in st[i+1:len(st)])) } )
                except Exception:
                    printExc("Line number [%d]" % line)
        return subAtoms
test_netmiko_grep.py 文件源码 项目:netmiko_tools 作者: ktbyers 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_use_cache():
    """With cached files this should come back in under a second"""
    # Generate cached files
    cmd_list = [NETMIKO_GREP] + ['interface', 'all']
    subprocess_handler(cmd_list)
    cmd_list = [NETMIKO_GREP] + ['--use-cache', '--display-runtime', 'interface', 'all']
    (output, std_err) = subprocess_handler(cmd_list)
    match = re.search(r"Total time: (0:.*)", output)
    time = match.group(1)
    _, _, seconds = time.split(":")
    seconds = float(seconds)
    assert seconds <= 1
    assert 'pynet_rtr1.txt:interface FastEthernet0' in output
test_netmiko_grep.py 文件源码 项目:netmiko_tools 作者: ktbyers 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_display_failed():
    """Verify failed devices are showing"""
    cmd_list = [NETMIKO_GREP] + ['interface', 'all']
    (output, std_err) = subprocess_handler(cmd_list)
    assert "Failed devices" in output
    failed_devices = output.split("Failed devices:")[1]
    failed_devices = failed_devices.strip().split("\n")
    failed_devices = [x.strip() for x in failed_devices]
    assert len(failed_devices) == 2
    assert "bad_device" in failed_devices
    assert "bad_port" in failed_devices
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def fromChunk(data):
    """Convert chunk to string.

    @returns: tuple (result, remaining), may raise ValueError.
    """
    prefix, rest = data.split('\r\n', 1)
    length = int(prefix, 16)
    if length < 0:
        raise ValueError("Chunk length must be >= 0, not %d" % (length,))
    if not rest[length:length + 2] == '\r\n':
        raise ValueError, "chunk must end with CRLF"
    return rest[:length], rest[length + 2:]
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def parseContentRange(header):
    """Parse a content-range header into (start, end, realLength).

    realLength might be None if real length is not known ('*').
    """
    kind, other = header.strip().split()
    if kind.lower() != "bytes":
        raise ValueError, "a range of type %r is not supported"
    startend, realLength = other.split("/")
    start, end = map(int, startend.split("-"))
    if realLength == "*":
        realLength = None
    else:
        realLength = int(realLength)
    return (start, end, realLength)
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def setLastModified(self, when):
        """Set the X{Last-Modified} time for the response to this request.

        If I am called more than once, I ignore attempts to set
        Last-Modified earlier, only replacing the Last-Modified time
        if it is to a later value.

        If I am a conditional request, I may modify my response code
        to L{NOT_MODIFIED} if appropriate for the time given.

        @param when: The last time the resource being returned was
            modified, in seconds since the epoch.
        @type when: number
        @return: If I am a X{If-Modified-Since} conditional request and
            the time given is not newer than the condition, I return
            L{http.CACHED<CACHED>} to indicate that you should write no
            body.  Otherwise, I return a false value.
        """
        # time.time() may be a float, but the HTTP-date strings are
        # only good for whole seconds.
        when = long(math.ceil(when))
        if (not self.lastModified) or (self.lastModified < when):
            self.lastModified = when

        modified_since = self.getHeader('if-modified-since')
        if modified_since:
            modified_since = stringToDatetime(modified_since.split(';', 1)[0])
            if modified_since >= when:
                self.setResponseCode(NOT_MODIFIED)
                return CACHED
        return None
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setETag(self, etag):
        """Set an X{entity tag} for the outgoing response.

        That's \"entity tag\" as in the HTTP/1.1 X{ETag} header, \"used
        for comparing two or more entities from the same requested
        resource.\"

        If I am a conditional request, I may modify my response code
        to L{NOT_MODIFIED} or L{PRECONDITION_FAILED}, if appropriate
        for the tag given.

        @param etag: The entity tag for the resource being returned.
        @type etag: string
        @return: If I am a X{If-None-Match} conditional request and
            the tag matches one in the request, I return
            L{http.CACHED<CACHED>} to indicate that you should write
            no body.  Otherwise, I return a false value.
        """
        if etag:
            self.etag = etag

        tags = self.getHeader("if-none-match")
        if tags:
            tags = tags.split()
            if (etag in tags) or ('*' in tags):
                self.setResponseCode(((self.method in ("HEAD", "GET"))
                                      and NOT_MODIFIED)
                                     or PRECONDITION_FAILED)
                return CACHED
        return None
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def getRequestHostname(self):
        """Get the hostname that the user passed in to the request.

        This will either use the Host: header (if it is available) or the
        host we are listening on if the header is unavailable.
        """
        return (self.getHeader('host') or
                socket.gethostbyaddr(self.getHost()[1])[0]
                ).split(':')[0]
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def headerReceived(self, line):
        """Do pre-processing (for content-length) and store this header away.
        """
        header, data = line.split(':', 1)
        header = header.lower()
        data = data.strip()
        if header == 'content-length':
            self.length = int(data)
        reqHeaders = self.requests[-1].received_headers
        reqHeaders[header] = data
        if len(reqHeaders) > self.maxHeaders:
            self.transport.write("HTTP/1.1 400 Bad Request\r\n\r\n")
            self.transport.loseConnection()
http.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def checkPersistence(self, request, version):
        """Check if the channel should close or not."""
        connection = request.getHeader('connection')
        if connection:
            tokens = map(str.lower, connection.split(' '))
        else:
            tokens = []

        # HTTP 1.0 persistent connection support is currently disabled,
        # since we need a way to disable pipelining. HTTP 1.0 can't do
        # pipelining since we can't know in advance if we'll have a
        # content-length header, if we don't have the header we need to close the
        # connection. In HTTP 1.1 this is not an issue since we use chunked
        # encoding if content-length is not available.

        #if version == "HTTP/1.0":
        #    if 'keep-alive' in tokens:
        #        request.setHeader('connection', 'Keep-Alive')
        #        return 1
        #    else:
        #        return 0
        if version == "HTTP/1.1":
            if 'close' in tokens:
                request.setHeader('connection', 'close')
                return 0
            else:
                return 1
        else:
            return 0
stockInfo.py 文件源码 项目:stockUtils 作者: caiyue 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def getJsonObj(obj):
    if not obj:return None
    if hasHTML(obj):return None
    #"var moJuuzHq="{"Results":["2,300672,???,?","2,300676,????,?","1,603612,????,?","1,603707,????,?","2,002888,????,?","2,300678,????,?","2,002889,????,?","1,603860,????,?","2,300685,????,?","2,300687,????,?","1,603880,????,?","2,300689,????,?","1,603602,????,?","2,300688,????,?","1,603721,????,?","2,300691,????,?","1,601326,????,?","1,603776,???,?","2,002892,???,?","1,603129,????,?","1,603557,????,?"],"AllCount":"21","PageCount":"1","AtPage":"1","PageSize":"40","ErrMsg":"","UpdateTime":"2017/8/19 13:37:03","TimeOut":"3ms"}"
    # newobj = obj.split('=')[1]  #//???????= ??
    # return  simplejson.loads(newobj)
    newobj = "{" + obj.split('={')[1]
    return simplejson.loads(newobj)


问题


面经


文章

微信
公众号

扫码关注公众号