def request(self, endpoint, post=None):
buffer = BytesIO()
ch = pycurl.Curl()
ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
ch.setopt(pycurl.USERAGENT, self.userAgent)
ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
ch.setopt(pycurl.FOLLOWLOCATION, True)
ch.setopt(pycurl.HEADER, True)
ch.setopt(pycurl.VERBOSE, False)
ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
if post is not None:
ch.setopt(pycurl.POST, True)
ch.setopt(pycurl.POSTFIELDS, post)
if self.proxy:
ch.setopt(pycurl.PROXY, self.proxyHost)
if self.proxyAuth:
ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)
ch.perform()
resp = buffer.getvalue()
header_len = ch.getinfo(pycurl.HEADER_SIZE)
header = resp[0: header_len]
body = resp[header_len:]
ch.close()
if self.debug:
print("REQUEST: " + endpoint)
if post is not None:
if not isinstance(post, list):
print("DATA: " + str(post))
print("RESPONSE: " + body)
return [header, json_decode(body)]
python类HEADER的实例源码
def setup_curl_for_post(c, p, data_buf, headers=None, share=None):
setup_curl_basic(c, p, data_buf, headers, share)
httpheader = p.get('httpheader', ['Accept: application/json', "Content-type: application/json"])
if httpheader:
# c.setopt(pycurl.HEADER, p.get('header', 1))
c.setopt(pycurl.HTTPHEADER, httpheader)
post301 = getattr(pycurl, 'POST301', None)
if post301 is not None:
# Added in libcurl 7.17.1.
c.setopt(post301, True)
c.setopt(pycurl.POST, 1)
postfields = p.get('postfields')
if postfields:
postfields = json.dumps(postfields, indent=2, ensure_ascii=False)
c.setopt(pycurl.POSTFIELDS, postfields)
return c
def request(self, endpoint, headers=None, post=None, first=True):
buffer = BytesIO()
ch = pycurl.Curl()
ch.setopt(pycurl.URL, endpoint)
ch.setopt(pycurl.USERAGENT, self.userAgent)
ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
ch.setopt(pycurl.FOLLOWLOCATION, True)
ch.setopt(pycurl.HEADER, True)
if headers:
ch.setopt(pycurl.HTTPHEADER, headers)
ch.setopt(pycurl.VERBOSE, self.debug)
ch.setopt(pycurl.SSL_VERIFYPEER, False)
ch.setopt(pycurl.SSL_VERIFYHOST, False)
ch.setopt(pycurl.COOKIEFILE, self.settingsPath + self.username + '-cookies.dat')
ch.setopt(pycurl.COOKIEJAR, self.settingsPath + self.username + '-cookies.dat')
if post:
import urllib
ch.setopt(pycurl.POST, len(post))
ch.setopt(pycurl.POSTFIELDS, urllib.urlencode(post))
ch.perform()
resp = buffer.getvalue()
header_len = ch.getinfo(pycurl.HEADER_SIZE)
header = resp[0: header_len]
body = resp[header_len:]
ch.close()
if self.debug:
import urllib
print("REQUEST: " + endpoint)
if post is not None:
if not isinstance(post, list):
print('DATA: ' + urllib.unquote_plus(json.dumps(post)))
print("RESPONSE: " + body + "\n")
return [header, json_decode(body)]
def __init__(self):
self.curl = pycurl.Curl()
self.curl.setopt(pycurl.SSL_VERIFYHOST, False)
self.curl.setopt(pycurl.SSL_VERIFYPEER, False)
# ?????header
self.curl.setopt(pycurl.HEADER, False)
def handle_request(self):
curl_handle = pycurl.Curl()
# set default options.
curl_handle.setopt(pycurl.URL, self.request_url)
curl_handle.setopt(pycurl.REFERER, self.request_url)
curl_handle.setopt(pycurl.USERAGENT, self.useragent)
curl_handle.setopt(pycurl.TIMEOUT, self.curlopts['TIMEOUT'])
curl_handle.setopt(pycurl.CONNECTTIMEOUT, self.curlopts['CONNECTTIMEOUT'])
curl_handle.setopt(pycurl.HEADER, True)
#curl_handle.setopt(pycurl.VERBOSE, 1)
curl_handle.setopt(pycurl.FOLLOWLOCATION, 1)
curl_handle.setopt(pycurl.MAXREDIRS, 5)
if(self.request_headers and len(self.request_headers) > 0):
tmplist = list()
for(key, value) in self.request_headers.items():
tmplist.append(key + ':' + value)
curl_handle.setopt(pycurl.HTTPHEADER, tmplist)
#??????POST
curl_handle.setopt(pycurl.HTTPPROXYTUNNEL, 1)
curl_handle.setopt(pycurl.POSTFIELDS, self.request_body)
response = StringIO.StringIO()
curl_handle.setopt(pycurl.WRITEFUNCTION, response.write)
try:
curl_handle.perform()
except pycurl.error as error:
raise ChannelException(error, 5)
self.response_code = curl_handle.getinfo(curl_handle.HTTP_CODE)
header_size = curl_handle.getinfo(curl_handle.HEADER_SIZE)
resp_str = response.getvalue()
self.response_headers = resp_str[0 : header_size]
self.response_body = resp_str[header_size : ]
response.close()
curl_handle.close()
def getPage (self, url, requestHeader = []) :
resultFormate = StringIO.StringIO()
fakeIp = self.fakeIp()
requestHeader.append('CLIENT-IP:' + fakeIp)
requestHeader.append('X-FORWARDED-FOR:' + fakeIp)
try:
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url.strip())
curl.setopt(pycurl.ENCODING, 'gzip,deflate')
curl.setopt(pycurl.HEADER, 1)
curl.setopt(pycurl.TIMEOUT, 120)
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
curl.setopt(pycurl.SSL_VERIFYHOST, 0)
curl.setopt(pycurl.HTTPHEADER, requestHeader)
curl.setopt(pycurl.WRITEFUNCTION, resultFormate.write)
curl.perform()
headerSize = curl.getinfo(pycurl.HEADER_SIZE)
curl.close()
header = resultFormate.getvalue()[0 : headerSize].split('\r\n')
body = resultFormate.getvalue()[headerSize : ]
except Exception, e:
header = ''
body = ''
return header, body
def setup_curl_for_get(c, p, data_buf, headers=None, share=None):
setup_curl_basic(c, p, data_buf, headers, share)
httpheader = p.get('httpheader')
if httpheader:
# c.setopt(pycurl.HEADER, p.get('header', 1))
c.setopt(c.HTTPHEADER, httpheader)
return c
def get_new_release_albums(self):
self.logger.info("Get albums of the new release from Spotify.")
service_host = "https://api.spotify.com"
"""
Get client_id and client_secret value from local.
"""
config = Config()
client_id = config.spotify_client_id
client_secret = config.spotify_client_secret
access_token = Spotify(client_id, client_secret).get_token()
headers = ["Authorization: Bearer " + access_token]
offset = 0
new_release = []
while True:
params = {
"offset": offset,
"limit": 50
}
buf = BytesIO()
client = pycurl.Curl()
client.setopt(pycurl.HEADER, False)
client.setopt(pycurl.HTTPHEADER, headers)
client.setopt(pycurl.URL, service_host +
"/v1/browse/new-releases" + "?" + urlencode(params))
client.setopt(pycurl.WRITEFUNCTION, buf.write)
client.perform()
client.close()
body = json.loads(buf.getvalue().decode("utf-8"))
buf.close()
if body["albums"]["total"] > 0:
for data in body["albums"]["items"]:
album = Album(data["name"], None)
images = []
for image_data in data["images"]:
images.append(Image(image_data["url"],
image_data["width"],
image_data["height"]))
album.images = images
album.provider = self._provider
album.provider_res_id = data["id"]
artist_id = data["artists"][0]["id"] \
if len(data["artists"]) > 0 else ""
new_release.append((artist_id, album))
if body["albums"]["total"] \
> body["albums"]["limit"] * (offset + 1):
offset = offset + 1
else:
break
return new_release
def version_update(self):
if not obplayer.Config.setting('sync_url'):
return
obplayer.Log.log('sending player version to server: ' + obplayer.Config.version, 'sync')
postfields = {}
postfields['id'] = obplayer.Config.setting('sync_device_id')
postfields['pw'] = obplayer.Config.setting('sync_device_password')
postfields['version'] = obplayer.Config.version
postfields['longitude'] = obplayer.Config.setting('location_longitude')
postfields['latitude'] = obplayer.Config.setting('location_latitude')
curl = pycurl.Curl()
enc_postfields = urllib.urlencode(postfields)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=version')
curl.setopt(pycurl.HEADER, False)
curl.setopt(pycurl.POST, True)
curl.setopt(pycurl.POSTFIELDS, enc_postfields)
curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
curl.setopt(pycurl.LOW_SPEED_TIME, 60)
curl.setopt(pycurl.NOPROGRESS, 0)
curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)
class CurlResponse:
def __init__(self):
self.buffer = u''
def __call__(self, data):
self.buffer += data.decode('utf-8')
curl_response = CurlResponse()
curl.setopt(pycurl.WRITEFUNCTION, curl_response)
try:
curl.perform()
except:
obplayer.Log.log("exception in VersionUpdate thread", 'error')
obplayer.Log.log(traceback.format_exc(), 'error')
curl.close()
if curl_response.buffer:
version = json.loads(curl_response.buffer)
obplayer.Log.log("server version reported as " + str(version), 'sync')
if not self.check_min_version(version):
obplayer.Log.log("minimum server version " + str(MIN_SERVER_VERSION) + " is required. Please update server software before continuing", 'error')
else:
obplayer.Log.log("server did not report a version number", 'warning')
def now_playing_update_thread(self, playlist_id, playlist_end, media_id, media_end, show_name):
if not obplayer.Config.setting('sync_url'):
return
postfields = {}
postfields['id'] = obplayer.Config.setting('sync_device_id')
postfields['pw'] = obplayer.Config.setting('sync_device_password')
postfields['playlist_id'] = playlist_id
postfields['media_id'] = media_id
postfields['show_name'] = show_name
if playlist_end != '':
postfields['playlist_end'] = int(round(playlist_end))
else:
postfields['playlist_end'] = ''
if media_end != '':
postfields['media_end'] = int(round(media_end))
else:
postfields['media_end'] = ''
curl = pycurl.Curl()
enc_postfields = urllib.urlencode(postfields)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
curl.setopt(pycurl.URL, obplayer.Config.setting('sync_url') + '?action=now_playing')
curl.setopt(pycurl.HEADER, False)
curl.setopt(pycurl.POST, True)
curl.setopt(pycurl.POSTFIELDS, enc_postfields)
#curl.setopt(pycurl.FOLLOWLOCATION, 1)
curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
curl.setopt(pycurl.LOW_SPEED_TIME, 60)
curl.setopt(pycurl.NOPROGRESS, 0)
curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)
try:
curl.perform()
except:
obplayer.Log.log("exception in NowPlayingUpdate thread", 'error')
obplayer.Log.log(traceback.format_exc(), 'error')
curl.close()
#
# Request sync data from web application.
# This is used by sync (with request_type='schedule') and sync_priority_broadcasts (with request_type='emerg').
# Function outputs XML response from server.
#
def sync_request(self, request_type='', data=False):
sync_url = obplayer.Config.setting('sync_url')
if not sync_url:
obplayer.Log.log("sync url is blank, skipping sync request", 'sync')
return ''
curl = pycurl.Curl()
postfields = {}
postfields['id'] = obplayer.Config.setting('sync_device_id')
postfields['pw'] = obplayer.Config.setting('sync_device_password')
postfields['hbuffer'] = obplayer.Config.setting('sync_buffer')
if data:
postfields['data'] = data
enc_postfields = urllib.urlencode(postfields)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.USERAGENT, 'OpenBroadcaster Player')
curl.setopt(pycurl.URL, sync_url + '?action=' + request_type)
curl.setopt(pycurl.HEADER, False)
curl.setopt(pycurl.POST, True)
curl.setopt(pycurl.POSTFIELDS, enc_postfields)
# some options so that it'll abort the transfer if the speed is too low (i.e., network problem)
# low speed abort set to 0.01Kbytes/s for 60 seconds).
curl.setopt(pycurl.LOW_SPEED_LIMIT, 10)
curl.setopt(pycurl.LOW_SPEED_TIME, 60)
curl.setopt(pycurl.NOPROGRESS, 0)
curl.setopt(pycurl.PROGRESSFUNCTION, self.curl_progress)
class CurlResponse:
def __init__(self):
self.buffer = u''
def __call__(self, data):
self.buffer += data.decode('utf-8')
curl_response = CurlResponse()
curl.setopt(pycurl.WRITEFUNCTION, curl_response)
try:
curl.perform()
#except pycurl.error as error:
# (errno, errstr) = error
# obplayer.Log.log('network error: ' + errstr, 'error')
except:
obplayer.Log.log("exception in sync " + request_type + " thread", 'error')
obplayer.Log.log(traceback.format_exc(), 'error')
curl.close()
return curl_response.buffer
#
# Fetch media from web application. Saves under media directory.
# media_id : id of the media we want
# filename : filename to save under.
#