python类parse_qs()的实例源码

chrome_test_server_spawner.py 文件源码 项目:nojs 作者: chrisdickinson 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def do_GET(self):
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for param in params:
      logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
      self._KillTestServer(params)
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      logging.info('Encounter unknown request: %s.', action)
rrid.py 文件源码 项目:scibot 作者: SciCrunch 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def process_POST_request(request):
    dict_ = urlparse.parse_qs(request.text)
    def htmlify(thing):
        try:
            html = dict_[thing][0]
        except KeyError as e:
            html = ''
        return '<html>' + html + '</html>'
    uri = dict_['uri'][0]
    head = htmlify('head')
    body = htmlify('body')
    try:
        text = dict_['data'][0]
    except KeyError as e:
        text = ''

    headsoup = BeautifulSoup(head, 'lxml')
    bodysoup = BeautifulSoup(body, 'lxml')

    target_uri = getUri(uri, headsoup, bodysoup)
    doi = getDoi(headsoup, bodysoup)
    return target_uri, doi, head, body, text
tunemovie.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])

            query = urlparse.urljoin(self.base_link, self.search_link)
            query = query % urllib.quote_plus(data['tvshowtitle'])

            t = cleantitle.get(data['tvshowtitle'])

            r = client.request(query)

            r = client.parseDOM(r, 'div', attrs = {'class': 'thumb'})
            r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a', ret='title'), re.findall('(\d{4})', i)) for i in r]
            r = [(i[0][0], i[1][0], i[2][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0 and len(i[2]) > 0]

            url = [i[0] for i in r if t in cleantitle.get(i[1]) and ('Season %s' % season) in i[1]][0]
            url += '?episode=%01d' % int(episode)

            return url
        except:
            return
pubfilm.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 142 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])

            year = re.findall('(\d{4})', premiered)[0]
            season = '%01d' % int(season) ; episode = '%01d' % int(episode)
            tvshowtitle = '%s %s: Season %s' % (data['tvshowtitle'], year, season)

            url = cache.get(self.pidtv_tvcache, 120, tvshowtitle)

            if url == None: raise Exception()

            url += '?episode=%01d' % int(episode)
            url = url.encode('utf-8')
            return url
        except:
            return
bobby.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
            headers = eval(data['headers'])
            aliases = eval(data['aliases'])
            title = data['tvshowtitle'] if 'tvshowtitle' in data else data['title']
            title = cleantitle.getsearch(title)
            query = self.search_link % (urllib.quote_plus(title))
            query = urlparse.urljoin(self.base_link, query)
            r = client.request(query, headers=headers, timeout='30', mobile=True)
            match = re.compile('alias=(.+?)\'">(.+?)</a>').findall(r)
            r = [(i[0], re.findall('(.+?)\s+-\s+Season\s+(\d+)', i[1])) for i in match]
            r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if len(i[1]) > 0]
            r = [i[0] for i in r if self.matchAlias(i[1], aliases) and int(season) == int(i[2])][0]
            url = {'type': 'tvshow', 'id': r, 'episode': episode, 'season': season, 'headers': headers}
            url = urllib.urlencode(url)
            return url
        except:
            return
movies14.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])

            year = re.findall('(\d{4})', premiered)[0]
            if int(year) >= 2016: raise Exception()

            url = re.sub('[^A-Za-z0-9]', '-', data['tvshowtitle']).lower()
            url = self.tvsearch_link % (url, data['year'], '%01d' % int(season), '%01d' % int(episode))

            r = urlparse.urljoin(self.base_link, url)
            r = client.request(r, output='geturl')
            if not data['year'] in r: raise Exception()

            return url
        except:
            return
directdl.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def resolve(self, url):
        try:
            b = urlparse.urlparse(url).netloc
            b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

            if not b in base64.b64decode(self.b_link): return url

            u, p, h = url.split('|')
            r = urlparse.parse_qs(h)['Referer'][0]
            #u += '&app_id=Exodus'

            c = self.request(r, output='cookie', close=False)
            result = self.request(u, post=p, referer=r, cookie=c)

            url = result.split('url=')
            url = [urllib.unquote_plus(i.strip()) for i in url]
            url = [i for i in url if i.startswith('http')]
            url = url[-1]

            return url
        except:
            return
dramafire.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            if not url:
                return

            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            aliases = source_utils.aliases_to_array(eval(data['aliases']))
            year = data['year']

            episode = tvmaze.tvMaze().episodeAbsoluteNumber(tvdb, int(season), int(episode))

            url = self.__search([localtvshowtitle] + aliases, year, episode)
            if not url and tvshowtitle != localtvshowtitle:
                url = self.__search([tvshowtitle] + aliases, year, episode)
            return url
        except:
            return
meinkino.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            if not url:
                return

            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
            data.update({'year': re.findall('(\d{4})', premiered)[0]})
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            aliases = source_utils.aliases_to_array(eval(data['aliases']))

            url = self.__search([localtvshowtitle] + aliases, 'tv', data['year'], season, episode)
            if not url and tvshowtitle != localtvshowtitle: url = self.__search([tvshowtitle] + aliases, 'tv', data['year'], season, episode)
            return url
        except:
            return
kinox.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def resolve(self, url):
        try:
            url = urlparse.urljoin(self.base_link, url)

            r = client.request(url, referer=self.base_link)
            r = json.loads(r)['Stream']
            r = [(dom_parser.parse_dom(r, 'a', req='href'), dom_parser.parse_dom(r, 'iframe', req='src'))]
            r = [i[0][0].attrs['href'] if i[0] else i[1][0].attrs['src'] for i in r if i[0] or i[1]][0]

            if not r.startswith('http'):
                r = urlparse.parse_qs(r)
                r = [r[i][0] if r[i] and r[i][0].startswith('http') else (i, '') for i in r][0]

            return r
        except:
            return
filmpalast.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            if not url:
                return

            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
            title = data['localtvshowtitle']
            title += ' S%02dE%02d' % (int(season), int(episode))
            aliases = source_utils.aliases_to_array(eval(data['aliases']))
            aliases = [i + ' S%02dE%02d' % (int(season), int(episode)) for i in aliases]

            url = self.__search([title] + aliases)
            if not url and data['tvshowtitle'] != data['localtvshowtitle']:
                title = data['tvshowtitle']
                title += ' S%02dE%02d' % (int(season), int(episode))
                url = self.__search([title] + aliases)
            return url
        except:
            return
tata.py 文件源码 项目:plugin.video.exodus 作者: lastship 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            if not url:
                return

            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            aliases = source_utils.aliases_to_array(eval(data['aliases']))

            year = re.findall('(\d{4})', premiered)
            year = year[0] if year else data['year']

            url = self.__search([localtvshowtitle] + aliases, year, season, episode)
            if not url and tvshowtitle != localtvshowtitle:
                url = self.__search([tvshowtitle] + aliases, year, season, episode)
            return url
        except:
            return
main.py 文件源码 项目:quartz-browser 作者: ksharindam 项目源码 文件源码 阅读 94 收藏 0 点赞 0 评论 0
def downloadVideo(self):
        url = unicode(self.tabWidget.currentWidget().url().toString())
        # For youtube videos
        if validYoutubeUrl(url):
            vid_id = parse_qs(urlparse(url).query)['v'][0]
            url = 'https://m.youtube.com/watch?v=' + vid_id
            yt = YouTube(url)    # Use PyTube module for restricted videos
            videos = yt.get_videos()
            dialog = youtube_dialog.YoutubeDialog(videos, self)
            if dialog.exec_() == 1 :
                index = abs(dialog.buttonGroup.checkedId())-2
                vid = videos[index]
                reply = networkmanager.get( QNetworkRequest(QUrl.fromUserInput(vid.url)) )
                self.handleUnsupportedContent(reply, vid.filename + '.' + vid.extension)
            return
        # For embeded HTML5 videos
        request = QNetworkRequest(self.video_URL)
        request.setRawHeader('Referer', self.video_page_url)
        reply = networkmanager.get(request)
        self.handleUnsupportedContent(reply)
protocol_alt.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_socket.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_loop.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_spy.py 文件源码 项目:android3dblendermouse 作者: sketchpunk 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
url.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def query_params(self, value=None):
        """
        Return or set a dictionary of query params

        :param dict value: new dictionary of values
        """
        if value is not None:
            return URL._mutate(self, query=unicode_urlencode(value, doseq=True))
        query = '' if self._tuple.query is None else self._tuple.query

        # In Python 2.6, urlparse needs a bytestring so we encode and then
        # decode the result.
        if not six.PY3:
            result = parse_qs(to_utf8(query), True)
            return dict_to_unicode(result)

        return parse_qs(query, True)
test_facebook.py 文件源码 项目:PythonStudyCode 作者: TongTongX 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_auth_url(self):
        perms = ['email', 'birthday']
        redirect_url = 'https://localhost/facebook/callback/'

        expected_url = 'https://www.facebook.com/dialog/oauth?' + urlencode(
            dict(client_id=self.app_id,
                 redirect_uri=redirect_url,
                 scope=','.join(perms)))
        actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)

        # Since the order of the query string parameters might be
        # different in each URL, we cannot just compare them to each
        # other.
        expected_url_result = urlparse(expected_url)
        actual_url_result = urlparse(actual_url)
        expected_query = parse_qs(expected_url_result.query)
        actual_query = parse_qs(actual_url_result.query)

        self.assertEqual(actual_url_result.scheme, expected_url_result.scheme)
        self.assertEqual(actual_url_result.netloc, expected_url_result.netloc)
        self.assertEqual(actual_url_result.path, expected_url_result.path)
        self.assertEqual(actual_url_result.params, expected_url_result.params)
        self.assertEqual(actual_query, expected_query)
server.py 文件源码 项目:OpenBAC 作者: PrometheanInfoSec 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parse(self, path):
        parsed = parse_qs(path[2:])
        if "au" in parsed.keys():
            for i in parsed:
                parsed[i] = " ".join(parsed[i])
            print parsed
            return self.process(parsed)
        if "mk" in parsed.keys():
            for i in parsed:
                parsed[i] = " ".join(parsed[i])
            print parsed
            return self.gen(parsed)
        if "up" in parsed.keys() and "passwd" in parsed.keys():
            for i in parsed:
                parsed[i] = " ".join(parsed[i])
            print parsed
            return self.unpack(parsed)
        return "Not able to parse input"
protocol_alt.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
protocol_socket.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_loop.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
protocol_spy.py 文件源码 项目:microperi 作者: c0d3st0rm 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
test_customers.py 文件源码 项目:optimove 作者: nicolasramy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_customers_by_action_callback(request):
    params = parse_qs(urlparse(request.url).query)
    if params['RecipientGroupID'][0] == '1' and params['ActionID'][0] == '2' and params['Date'][0] == '2015-06-24':
        if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
            if params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
                resp_body = [
                    {'CustomerID': '231342', 'CustomerAttribute': 'BuddyZZ,UK'},
                    {'CustomerID': '943157', 'CustomerAttribute': 'Pax65,DE'}
                ]

            else:
                return 404, HEADERS['text'], 'Not Found'

        else:
            resp_body = [
                {'CustomerID': '231342'},
                {'CustomerID': '943157'}
            ]

        return 200, HEADERS['json'], json.dumps(resp_body)

    else:
        return 404, HEADERS['text'], 'Not Found'
test_customers.py 文件源码 项目:optimove 作者: nicolasramy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_customer_actions_by_target_group_callback(request):
    params = parse_qs(urlparse(request.url).query)
    if params['TargetGroupID'][0] == '2' and params['Date'][0] == '2015-12-24':
        if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
            if params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
                resp_body = [
                    {'CustomerID': 'A1342', 'ActionID': 49, 'ChannelID': 6, 'CustomerAttribute': 'BuddyZZ,UK'},
                    {'CustomerID': 'G4650', 'ActionID': 49, 'ChannelID': 6, 'CustomerAttribute': 'Mighty6,ES'}
                ]

            else:
                return 404, HEADERS['text'], 'Not Found'

        else:
            resp_body = [
                {'CustomerID': 'A1342', 'ActionID': 49, 'ChannelID': 6},
                {'CustomerID': 'G4650', 'ActionID': 49, 'ChannelID': 6}
            ]

        return 200, HEADERS['json'], json.dumps(resp_body)

    else:
        return 404, HEADERS['text'], 'Not Found'
test_customers.py 文件源码 项目:optimove 作者: nicolasramy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_customer_one_time_actions_by_date_callback(request):
    params = parse_qs(urlparse(request.url).query)
    if params['Date'][0] == '2015-06-24':
        if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
            if params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
                resp_body = [
                    {'CustomerID': '8D871', 'ActionID': 19, 'ChannelID': 3, 'CustomerAttribute': 'Yo999,UA'},
                    {'CustomerID': '8U76T', 'ActionID': 19, 'ChannelID': 3, 'CustomerAttribute': 'Neto2,TR'}
                ]

            else:
                return 404, HEADERS['text'], 'Not Found'

        else:
            resp_body = [
                {'CustomerID': '8D871', 'ActionID': 19, 'ChannelID': 3},
                {'CustomerID': '8U76T', 'ActionID': 19, 'ChannelID': 3}
            ]

        return 200, HEADERS['json'], json.dumps(resp_body)

    else:
        return 404, HEADERS['text'], 'Not Found'
test_customers.py 文件源码 项目:optimove 作者: nicolasramy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_target_group_changers_callback(request):
    params = parse_qs(urlparse(request.url).query)
    if params['StartDate'][0] == '2015-09-01' and params['EndDate'][0] == '2015-09-30':
        if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
            if params['CustomerAttributes'][0] == 'Alias;Country' and params['CustomerAttributesDelimiter'][0] == ',':
                resp_body = [
                    {'CustomerID': '231342', 'InitialTargetGroupID': 4, 'FinalTargetGroupID': 12,
                     'CustomerAttribute': 'BuddyZZ,UK'},
                    {'CustomerID': '931342', 'InitialTargetGroupID': -1, 'FinalTargetGroupID': 8,
                     'CustomerAttribute': 'Pax65,DE'}
                ]

            else:
                return 404, HEADERS['text'], 'Not Found'

        else:
            resp_body = [
                {'CustomerID': '231342', 'InitialTargetGroupID': 4, 'FinalTargetGroupID': 12},
                {'CustomerID': '931342', 'InitialTargetGroupID': -1, 'FinalTargetGroupID': 8}
            ]

        return 200, HEADERS['json'], json.dumps(resp_body)

    else:
        return 404, HEADERS['text'], 'Not Found'
test_customers.py 文件源码 项目:optimove 作者: nicolasramy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_customer_send_details_by_campaign_callback(request):
    params = parse_qs(urlparse(request.url).query)
    if params['CampaignID'][0] == '65874':
        if 'IncludeTemplateIDs' in params:
            if params['IncludeTemplateIDs'][0] == 'True':
                resp_body = [
                    {'CustomerID': '231342', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 10:30:00',
                     'SendID': 'HG65D', 'TemplateID': 12},
                    {'CustomerID': '917251', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 11:45:00',
                     'SendID': 'HG65E', 'TemplateID': 7}
                ]
                return 200, HEADERS['json'], json.dumps(resp_body)

        resp_body = [
            {'CustomerID': '231342', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 10:30:00', 'SendID': 'HG65D'},
            {'CustomerID': '917251', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 11:45:00', 'SendID': 'HG65E'}
        ]
        return 200, HEADERS['json'], json.dumps(resp_body)

    else:
        return 404, HEADERS['text'], 'Not Found'
test_actions.py 文件源码 项目:optimove 作者: nicolasramy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_executed_campaign_details_callback(request):
    params = parse_qs(urlparse(request.url).query)

    if params['Date'][0] == '2015-06-19':
        resp_body = [
            {'CampaignID': 221, 'TargetGroupID': 15, 'CampaignType': 'Test/Control', 'Duration': 7,
             'LeadTime': 3, 'Notes': '', 'IsMultiChannel': 'false', 'IsRecurrence': 'false',
             'Status': 'Successful', 'Error': ''},
            {'CampaignID': 81, 'TargetGroupID': 40, 'CampaignType': 'Test/Control', 'Duration': 10,
             'LeadTime': 0, 'Notes': '', 'IsMultiChannel': 'true', 'IsRecurrence': 'true',
             'Status': 'Failed', 'Error': 'ESP unavailable'}
        ]
        return 200, HEADERS['json'], json.dumps(resp_body)

    else:
        return 404, HEADERS['text'], 'Not Found'


问题


面经


文章

微信
公众号

扫码关注公众号