def oauth_request(self, url, method='GET', args={}, headers={}):
base_args = {
'oauth_consumer_key': self.oauth_consumer['key'],
'oauth_signature_method': 'HMAC-SHA1',
'oauth_timestamp': oauth_timestamp(),
'oauth_nonce': oauth_nonce(),
'oauth_version': '1.0'
}
args = args.copy()
headers = headers.copy()
headers['User-Agent'] = headers.get('User-Agent', 'fanfou-py')
if url.startswith('/'):
if ':' in url:
path, _ = url.split(':')
if args.get('id'):
url = path + oauth_escape(args['id'], via='quote_plus', safe='')
else:
url = path[:-1]
url = self.base_api_url % url
if method == 'POST':
headers['Content-Type'] = headers.get('Content-Type', self.form_urlencoded)
(headers['Content-Type'] == self.form_urlencoded) and base_args.update(args)
else:
base_args.update(args)
url = url + '?' + oauth_query(args, via='quote_plus', safe='')
self.oauth_token and base_args.update({'oauth_token': self.oauth_token['key']})
base_args['oauth_signature'] = self.oauth_signature(url, method, base_args)
headers.update(self.oauth_header(base_args))
req = request.Request(url, headers=headers)
if headers.get('Content-Type') == self.form_urlencoded:
data = oauth_query(args, via='quote_plus', safe='').encode()
elif 'form-data' in headers.get('Content-Type', ''): # multipart/form-data
data = args['form-data']
else:
data = None
resp = request.urlopen(req, data=data)
resp.json = lambda: json.loads(resp.read().decode() or '""')
return resp
评论列表
文章目录