def get_microsegment_changers_with_attributes_callback(request):
params = parse_qs(urlparse(request.url).query)
    if (params['StartDate'][0] == '2016-01-01'
            and params['EndDate'][0] == '2016-01-31'
            and params['CustomerAttributes'][0] == 'Alias;Country'
            and params['CustomerAttributesDelimiter'][0] == ','):
resp_body = [
{'CustomerID': '231342', 'InitialMicrosegmentID': 4, 'FinalMicrosegmentID': 12,
'CustomerAttributes': 'BuddyZZ,UK'},
{'CustomerID': '231342', 'InitialMicrosegmentID': 3, 'FinalMicrosegmentID': 67,
'CustomerAttributes': 'Player99,US'}
]
return 200, HEADERS['json'], json.dumps(resp_body)
else:
return 404, HEADERS['text'], 'Not Found'
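A minimal sketch of how a callback with this (status, headers, body) signature could be registered, assuming the `responses` mocking library; the endpoint URL and the HEADERS mapping are illustrative assumptions, not part of the original source.

import json
import responses
from urllib.parse import urlparse, parse_qs

# Assumed header mapping; the snippet above references HEADERS without defining it.
HEADERS = {'json': {'Content-Type': 'application/json'},
           'text': {'Content-Type': 'text/plain'}}

@responses.activate
def test_changers_callback():
    # Hypothetical endpoint URL, for illustration only.
    responses.add_callback(
        responses.GET, 'https://api.example.com/GetMicrosegmentChangers',
        callback=get_microsegment_changers_with_attributes_callback)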
def __init__(self, URL, assignment_id='', worker_id='', participant_id=''):
logger.info("Creating bot with URL: %s." % URL)
self.URL = URL
parts = urlparse(URL)
query = parse_qs(parts.query)
    # Fall back to the URL's query string for any ID not passed explicitly.
    if not assignment_id:
        assignment_id = query.get('assignment_id', [''])[0]
    if not participant_id:
        participant_id = query.get('participant_id', [''])[0]
    if not worker_id:
        worker_id = query.get('worker_id', [''])[0]
    self.assignment_id = assignment_id
    self.participant_id = participant_id
    self.worker_id = worker_id
self.unique_id = worker_id + ':' + assignment_id
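The query.get(key, [''])[0] idiom above yields an empty-string default for absent parameters; a standalone sketch with a made-up URL:

from urllib.parse import urlparse, parse_qs

query = parse_qs(urlparse('http://localhost/ad?assignment_id=a1&worker_id=w9').query)
query.get('assignment_id', [''])[0]   # 'a1'
query.get('participant_id', [''])[0]  # '' -- missing keys fall back cleanly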
def parse_search_page(self, response):
# handle current page
for item in self.parse_tweets_block(response.body):
yield item
# get next page
tmp = self.reScrollCursor.search(response.body)
if tmp:
query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
scroll_cursor = tmp.group(1)
url = 'https://twitter.com/i/search/timeline?q=%s&' \
'include_available_features=1&include_entities=1&max_position=%s' % \
(urllib.quote_plus(query), scroll_cursor)
yield http.Request(url, callback=self.parse_more_page)
# TODO: # get refresh page
# tmp = self.reRefreshCursor.search(response.body)
# if tmp:
# query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
# refresh_cursor=tmp.group(1)
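The quote_plus round trip above matters because parse_qs decodes '+' back into spaces; a Python 3 sketch of the same idiom (the search URL is made up):

from urllib.parse import urlparse, parse_qs, quote_plus

q = parse_qs(urlparse('https://twitter.com/search?q=python+scrapy').query)['q'][0]
q              # 'python scrapy' -- '+' decoded to a space
quote_plus(q)  # 'python+scrapy' -- re-encoded for the next-page URL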
def _parse_get(self, all_injectable=False):
    params_dict_list = urlparse.parse_qs(urlparse.urlsplit(self.url).query)
    for param, value_list in params_dict_list.items():
        self.get_params[param] = value_list
        if self.tag in param:
            self.injs.append({
                'field': 'GET',
                'part': 'param',
                'param': param
            })
        for idx, value in enumerate(value_list):
            if self.tag in value or all_injectable:
                self.injs.append({
                    'field': 'GET',
                    'part': 'value',
                    'param': param,
                    'value': value,
                    'idx': idx
                })
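A self-contained sketch of the same marker scan, with the tag and URL as made-up assumptions (the original relies on instance state such as self.tag and self.injs):

from urllib.parse import urlsplit, parse_qs

tag = '*'  # hypothetical injection marker
url = 'http://host.example/page?q=*&lang=en'
injs = []
for param, values in parse_qs(urlsplit(url).query).items():
    if tag in param:
        injs.append({'field': 'GET', 'part': 'param', 'param': param})
    for idx, value in enumerate(values):
        if tag in value:
            injs.append({'field': 'GET', 'part': 'value',
                         'param': param, 'value': value, 'idx': idx})
# injs == [{'field': 'GET', 'part': 'value', 'param': 'q', 'value': '*', 'idx': 0}]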
def extract_video_id(url):
""" Extract the video id from a url, return video id as str. """
idregx = re.compile(r'[\w-]{11}$')
url = str(url)
if idregx.match(url):
return url # ID of video
if '://' not in url:
url = '//' + url
parsedurl = urlparse(url)
if parsedurl.netloc in ('youtube.com', 'www.youtube.com', 'm.youtube.com', 'gaming.youtube.com'):
query = parse_qs(parsedurl.query)
if 'v' in query and idregx.match(query['v'][0]):
return query['v'][0]
elif parsedurl.netloc in ('youtu.be', 'www.youtu.be'):
vidid = parsedurl.path.split('/')[-1] if parsedurl.path else ''
if idregx.match(vidid):
return vidid
err = "Need 11 character video id or the URL of the video. Got %s"
raise ValueError(err % url)
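Illustrative calls; the video ID is a well-known example, not taken from the original source:

extract_video_id('https://www.youtube.com/watch?v=dQw4w9WgXcQ')  # 'dQw4w9WgXcQ'
extract_video_id('youtu.be/dQw4w9WgXcQ')                         # 'dQw4w9WgXcQ'
extract_video_id('dQw4w9WgXcQ')                                  # already an ID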
def parseqs(data):
""" parse_qs, return unicode. """
if type(data) == uni:
return parse_qs(data)
elif pyver == 3:
data = data.decode("utf8")
data = parse_qs(data)
else:
data = parse_qs(data)
out = {}
for k, v in data.items():
k = k.decode("utf8")
out[k] = [x.decode("utf8") for x in v]
data = out
return data
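The decode step exists because parse_qs is bytes-in, bytes-out on Python 3; a quick sketch:

from urllib.parse import parse_qs

parse_qs(b'v=abc')                 # {b'v': [b'abc']} -- bytes keys and values
parse_qs(b'v=abc'.decode('utf8'))  # {'v': ['abc']} -- decode first for str output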
def extract_playlist_id(playlist_url):
# Normal playlists start with PL, Mixes start with RD + first video ID,
# Liked videos start with LL, Uploads start with UU,
# Favorites lists start with FL
idregx = re.compile(r'((?:RD|PL|LL|UU|FL)[-_0-9a-zA-Z]+)$')
playlist_id = None
if idregx.match(playlist_url):
        playlist_id = playlist_url  # already a playlist ID
if '://' not in playlist_url:
playlist_url = '//' + playlist_url
parsedurl = urlparse(playlist_url)
if parsedurl.netloc in ('youtube.com', 'www.youtube.com'):
query = parse_qs(parsedurl.query)
if 'list' in query and idregx.match(query['list'][0]):
playlist_id = query['list'][0]
return playlist_id
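Illustrative calls with a made-up playlist ID:

extract_playlist_id('https://www.youtube.com/playlist?list=PLabc123')  # 'PLabc123'
extract_playlist_id('PLabc123')                                        # already an ID
extract_playlist_id('https://example.com/?list=PLabc123')              # None: wrong host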
def do_POST(self):
# http://stackoverflow.com/questions/4233218/python-basehttprequesthandler-post-variables
ctype, pdict = cgi.parse_header(self.headers['content-type'])
if ctype == 'multipart/form-data':
postvars = cgi.parse_multipart(self.rfile, pdict)
elif ctype == 'application/x-www-form-urlencoded':
length = int(self.headers['content-length'])
postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)
else:
postvars = {}
# print(postvars)
    if 'Username' not in postvars or 'Password' not in postvars:
log('E', 'vali.', 'No credentials.')
self.exit_on_error('No credentials.')
return
if not validate_id(postvars['Username'][0], postvars['Password'][0]):
log('E', 'vali.', 'Wrong credentials.')
self.exit_on_error('Wrong credentials.')
return
# print(postvars)
try:
dispatch(postvars)
self.write_response({'Status': 'OK'})
except:
log('E', 'hand.', 'Handler throws an exception.')
        self.exit_on_error('Handler throws an exception.')
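keep_blank_values=1 above is what preserves empty form fields; compare (shown with the urllib.parse equivalent, since cgi.parse_qs is deprecated):

from urllib.parse import parse_qs

parse_qs('Username=alice&Password=')                       # {'Username': ['alice']}
parse_qs('Username=alice&Password=', keep_blank_values=1)  # adds 'Password': ['']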
def content():
parsed = urlparse.parse_qs(app.current_request.raw_body)
return { 'states': parsed.get('states', []) }
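Repeated keys collapse into a single list entry, which is why 'states' comes back as a list; a sketch with a made-up request body:

from urllib.parse import parse_qs

parse_qs('states=CA&states=NY').get('states', [])  # ['CA', 'NY']
parse_qs('').get('states', [])                     # [] -- empty body handled too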
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
"""Parse a query given as a string argument."""
warn("cgi.parse_qs is deprecated, use urlparse.parse_qs instead",
PendingDeprecationWarning, 2)
return urlparse.parse_qs(qs, keep_blank_values, strict_parsing)
def test_obtain_access_token(self, rmock):
rmock.post(requests_mock.ANY, text='{"access_token": "ANY_TOKEN"}')
cmock = Mock()
cmock.username = "ANY_USERNAME"
cmock.auth_host = "ANY_URL.example"
result = obtain_access_token(cmock, 'ANY_PASSWORD')
self.assertEqual('ANY_TOKEN', result)
received_post_data = parse_qs(rmock.request_history[0].text)
expected_post_data = {u'username': [u'ANY_USERNAME'],
u'password': [u'ANY_PASSWORD'],
u'client_id': [u'jumpauth'],
u'grant_type': [u'password']}
self.assertEqual(received_post_data, expected_post_data)
def parse(url):
try: url = client.replaceHTMLCodes(url)
except: pass
try: url = urlparse.parse_qs(urlparse.urlparse(url).query)['u'][0]
except: pass
try: url = urlparse.parse_qs(urlparse.urlparse(url).query)['q'][0]
except: pass
return url
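A sketch of what the first fallback unwraps, with a made-up redirector URL (the bare try/except blocks let each step fail silently when the parameter is absent):

from urllib.parse import urlparse, parse_qs

wrapped = 'http://redirect.example/out?u=https%3A%2F%2Ftarget.example%2Fpage'
parse_qs(urlparse(wrapped).query)['u'][0]  # 'https://target.example/page'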
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
try:
data = urlparse.parse_qs(url)
data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
show = data['url'].split('/')[4]
r = urlparse.urljoin(self.base_link, self.episode_link % (show, season, episode))
url = re.findall('(?://.+?|)(/.+)', r)[0]
url = client.replaceHTMLCodes(url)
url = url.encode('utf-8')
return url
except:
pass
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
try:
        if url is None: return
url = urlparse.parse_qs(url)
url = dict([(i, url[i][0]) if url[i] else (i, '') for i in url])
url['title'], url['premiered'], url['season'], url['episode'] = title, premiered, season, episode
url = urllib.urlencode(url)
return url
except:
return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
try:
data = urlparse.parse_qs(url)
data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
return urllib.urlencode({'imdb': imdb, 'title': title, 'year': data['year'], 'season': season, 'episode': episode})
except:
return
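These episode() methods share one idiom: scraper state travels as a URL-encoded string, unpacked with parse_qs into single values and re-packed with urlencode. A Python 3 sketch of the round trip, with made-up field values:

from urllib.parse import parse_qs, urlencode

state = 'url=%2Ftv%2Fshow&year=2016'
data = parse_qs(state)
data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
data['season'], data['episode'] = '1', '3'
urlencode(data)  # 'url=%2Ftv%2Fshow&year=2016&season=1&episode=3'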