def put (url, data, headers={}):
"""Make a PUT request to the url, using data in the message body,
with the additional headers, if any"""
reply = -1 # default, non-http response
curl = pycurl.Curl()
curl.setopt(pycurl.URL, url)
if len(headers) > 0:
curl.setopt(pycurl.HTTPHEADER, [k+': '+v for k,v in headers.items()])
curl.setopt(pycurl.PUT, 1)
curl.setopt(pycurl.INFILESIZE, len(data))
databuffer = StringIO(data)
curl.setopt(pycurl.READFUNCTION, databuffer.read)
try:
curl.perform()
reply = curl.getinfo(pycurl.HTTP_CODE)
except Exception:
pass
curl.close()
return reply
python类URL的实例源码
def request(self, endpoint, post=None):
buffer = BytesIO()
ch = pycurl.Curl()
ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
ch.setopt(pycurl.USERAGENT, self.userAgent)
ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
ch.setopt(pycurl.FOLLOWLOCATION, True)
ch.setopt(pycurl.HEADER, True)
ch.setopt(pycurl.VERBOSE, False)
ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
if post is not None:
ch.setopt(pycurl.POST, True)
ch.setopt(pycurl.POSTFIELDS, post)
if self.proxy:
ch.setopt(pycurl.PROXY, self.proxyHost)
if self.proxyAuth:
ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)
ch.perform()
resp = buffer.getvalue()
header_len = ch.getinfo(pycurl.HEADER_SIZE)
header = resp[0: header_len]
body = resp[header_len:]
ch.close()
if self.debug:
print("REQUEST: " + endpoint)
if post is not None:
if not isinstance(post, list):
print("DATA: " + str(post))
print("RESPONSE: " + body)
return [header, json_decode(body)]
def CurlPOST(url, data, cookie):
c = pycurl.Curl()
b = StringIO.StringIO()
c.setopt(pycurl.URL, url)
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.HTTPHEADER,['Content-Type: application/json'])
# c.setopt(pycurl.TIMEOUT, 10)
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.COOKIEFILE, cookie)
c.setopt(pycurl.COOKIEJAR, cookie)
c.setopt(pycurl.POSTFIELDS, data)
c.perform()
html = b.getvalue()
b.close()
c.close()
return html
def getXML(self,obj):
r=obj.createElement("request")
r.setAttribute("method",self.method)
url=obj.createElement("URL")
url.appendChild(obj.createTextNode(self.completeUrl))
r.appendChild(url)
if self.method=="POST":
pd=obj.createElement("PostData")
pd.appendChild(obj.createTextNode(self.postdata))
r.appendChild(pd)
if "Cookie" in self.__headers:
ck=obj.createElement("Cookie")
ck.appendChild(obj.createTextNode(self.__headers["Cookie"]))
r.appendChild(ck)
return r
def file(self, path, output, args={}, progress_callback=lambda *x: None):
self.logger.debug('??????????')
self.web_cache[path] = dict(args)
url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
if len(args) > 0:
url += '?' + urllib.parse.urlencode(args)
self.logger.debug('HTTP ?????{}'.format(url))
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.COOKIE, self.web_cookie)
self.curl.setopt(pycurl.NOBODY, False)
self.curl.setopt(pycurl.NOPROGRESS, False)
self.curl.setopt(pycurl.WRITEDATA, output)
self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
self.curl.setopt(pycurl.XFERINFOFUNCTION, progress_callback)
self.curl.perform()
status = self.curl.getinfo(pycurl.RESPONSE_CODE)
if status != 200:
raise ServerError(status)
def file_size(self, path, args={}):
self.logger.debug('????????????')
self.web_cache[path] = dict(args)
url = urllib.parse.urljoin(self.file_url, urllib.parse.quote(path))
if len(args) > 0:
url += '?' + urllib.parse.urlencode(args)
self.logger.debug('HTTP ?????{}'.format(url))
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.COOKIE, self.web_cookie)
self.curl.setopt(pycurl.NOBODY, True)
self.curl.setopt(pycurl.NOPROGRESS, True)
self.curl.setopt(pycurl.WRITEDATA, io.BytesIO())
self.curl.setopt(pycurl.HEADERFUNCTION, lambda *x: None)
self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
self.curl.perform()
status = self.curl.getinfo(pycurl.RESPONSE_CODE)
if status != 200:
raise ServerError(status)
return self.curl.getinfo(pycurl.CONTENT_LENGTH_DOWNLOAD)
def web_redirect(self, path, args={}):
self.logger.debug('????????????')
self.web_cache[path] = dict(args)
url = urllib.parse.urljoin(self.web_url, urllib.parse.quote(path))
if len(args) > 0:
url += '?' + urllib.parse.urlencode(args)
self.logger.debug('HTTP ?????{}'.format(url))
headers = io.BytesIO()
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.COOKIE, self.web_cookie)
self.curl.setopt(pycurl.NOBODY, False)
self.curl.setopt(pycurl.NOPROGRESS, True)
self.curl.setopt(pycurl.WRITEDATA, NoneIO())
self.curl.setopt(pycurl.HEADERFUNCTION, headers.write)
self.curl.setopt(pycurl.XFERINFOFUNCTION, lambda *x: None)
self.curl.perform()
status = self.curl.getinfo(pycurl.RESPONSE_CODE)
if status != 302:
raise ServerError(status)
for header_line in headers.getvalue().split(b'\r\n'):
if header_line.startswith(b'Location:'):
return header_line.split(b':', maxsplit=1)[1].strip().decode()
return None
def connect_https(source, job, conn_timeout, curlopts=None, curlinfos=None):
if curlopts is None:
curlopts = {}
if ':' in job['dip']:
ipString = '[' + job['dip'] + ']'
else:
ipString = job['dip']
if pycurl.URL not in curlopts:
if 'domain' in job:
url = "http://" + job['domain'] + ":" + str(job['dp']) + "/"
else:
url = "http://" + ipString + ":" + str(job['dp']) + "/"
else:
curlopts[pycurl.URL] = url
if pycurl.SSL_VERIFYHOST not in curlopts:
curlopts[pycurl.SSL_VERIFYHOST] = 0
if pycurl.SSL_VERIFYPEER not in curlopts:
curlopts[pycurl.SSL_VERIFYPEER] = 0
return connect_http(source, job, conn_timeout, curlopts, curlinfos)
def getXML(self,obj):
r=obj.createElement("request")
r.setAttribute("method",self.method)
url=obj.createElement("URL")
url.appendChild(obj.createTextNode(self.completeUrl))
r.appendChild(url)
if self.method=="POST":
pd=obj.createElement("PostData")
pd.appendChild(obj.createTextNode(self.postdata))
r.appendChild(pd)
if "Cookie" in self._headers:
ck=obj.createElement("Cookie")
ck.appendChild(obj.createTextNode(self._headers["Cookie"]))
r.appendChild(ck)
return r
def fetch(self, url, body=None, headers=None):
if not _allowedURL(url):
raise ValueError('Bad URL scheme: %r' % (url,))
if headers is None:
headers = {}
headers.setdefault(
'User-Agent',
"%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,))
req = urllib2.Request(url, data=body, headers=headers)
try:
f = self.urlopen(req)
try:
return self._makeResponse(f)
finally:
f.close()
except urllib2.HTTPError, why:
try:
return self._makeResponse(why)
finally:
why.close()
def fetch_many_async(urls, callback=None, errback=None, **kwargs):
"""
Retrieve a list of URLs asynchronously.
@param callback: Optionally, a function that will be fired one time for
each successful URL, and will be passed its content and the URL itself.
@param errback: Optionally, a function that will be fired one time for each
failing URL, and will be passed the failure and the URL itself.
@return: A C{DeferredList} whose callback chain will be fired as soon as
all downloads have terminated. If an error occurs, the errback chain
of the C{DeferredList} will be fired immediatly.
"""
results = []
for url in urls:
result = fetch_async(url, **kwargs)
if callback:
result.addCallback(callback, url)
if errback:
result.addErrback(errback, url)
results.append(result)
return DeferredList(results, fireOnOneErrback=True, consumeErrors=True)
def fetch_to_files(urls, directory, logger=None, **kwargs):
"""
Retrieve a list of URLs and save their content as files in a directory.
@param urls: The list URLs to fetch.
@param directory: The directory to save the files to, the name of the file
will equal the last fragment of the URL.
@param logger: Optional function to be used to log errors for failed URLs.
"""
def write(data, url):
filename = url_to_filename(url, directory=directory)
fd = open(filename, "wb")
fd.write(data)
fd.close()
def log_error(failure, url):
if logger:
logger("Couldn't fetch file from %s (%s)" % (
url, str(failure.value)))
return failure
return fetch_many_async(urls, callback=write, errback=log_error, **kwargs)
def test_post(self):
curl = CurlStub(b"result")
result = fetch("http://example.com", post=True, curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.POST: True,
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_post_data(self):
curl = CurlStub(b"result")
result = fetch("http://example.com", post=True, data="data", curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options[pycurl.READFUNCTION](), b"data")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.POST: True,
pycurl.POSTFIELDSIZE: 4,
pycurl.READFUNCTION: Any(),
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_cainfo(self):
curl = CurlStub(b"result")
result = fetch("https://example.com", cainfo="cainfo", curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"https://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.CAINFO: b"cainfo",
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_headers(self):
curl = CurlStub(b"result")
result = fetch("http://example.com",
headers={"a": "1", "b": "2"}, curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.HTTPHEADER: ["a: 1", "b: 2"],
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_timeouts(self):
curl = CurlStub(b"result")
result = fetch("http://example.com", connect_timeout=5,
total_timeout=30, curl=curl)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 5,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 30,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def test_pycurl_insecure(self):
curl = CurlStub(b"result")
result = fetch("http://example.com/get-ca-cert", curl=curl,
insecure=True)
self.assertEqual(result, b"result")
self.assertEqual(curl.options,
{pycurl.URL: b"http://example.com/get-ca-cert",
pycurl.FOLLOWLOCATION: 1,
pycurl.MAXREDIRS: 5,
pycurl.CONNECTTIMEOUT: 30,
pycurl.LOW_SPEED_LIMIT: 1,
pycurl.LOW_SPEED_TIME: 600,
pycurl.NOSIGNAL: 1,
pycurl.WRITEFUNCTION: Any(),
pycurl.SSL_VERIFYPEER: False,
pycurl.DNS_CACHE_TIMEOUT: 0,
pycurl.ENCODING: b"gzip,deflate"})
def get_user_init(self):
'''
Description:
(re)initialize the nonce of current session and fetch the session ID
Return:
{
"success": true,
"jsessionid": "3185591CD191F18D1551440AE1BEF86A-n1.frontend3",
"nonce": "GTPSLZUcDyjEBqeL"
}
'''
uri = "user/init"
api_url = self.url + uri
c = pycurl.Curl()
output_init = BytesIO()
c.setopt(c.URL, api_url)
### Create the cookie File
c.setopt(pycurl.COOKIEJAR, 'cookie.txt')
c.setopt(c.WRITEFUNCTION, output_init.write)
c.perform()
return json.loads(output_init.getvalue())
def get_user_login(self, token):
'''
Description:
login
'''
self.token = token
uri = "user/login?username="+self.login+"&token="+self.token
api_url = self.url + uri
c = pycurl.Curl()
output = BytesIO()
c.setopt(c.URL, api_url)
### Read the cookie File
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(c.WRITEFUNCTION, output.write)
c.perform()
return json.loads(output.getvalue())
def get_all_virtual_endpoints(self):
'''
Description:
get all virtual endpoints
'''
uri = "virtualEndpoints"
api_url = self.url + uri
c = pycurl.Curl()
output_init = BytesIO()
c.setopt(c.URL, api_url)
### Create the cookie File
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(c.WRITEFUNCTION, output_init.write)
c.perform()
return json.loads(output_init.getvalue())
def create_virtual_endpoints(self, data, category):
'''
Description:
create a virtual endpoints
category:
SENSOR
METER
GAUGE
ONOFF
LEVEL_CONTROL
'''
self.data = data
self.category = category
uri = "virtualEndpoints/?category=" + self.category
api_url = self.url + uri
c = pycurl.Curl()
c.setopt(pycurl.URL, api_url)
c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8'])
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, self.data)
c.setopt(pycurl.VERBOSE, 1)
c.perform()
def get_virtual_endpoints_config(self, uuid):
'''
Description:
get virtual endpoints config
'''
self.uuid = uuid
uri = "virtualEndpoints/"+self.uuid+"/config"
api_url = self.url + uri
c = pycurl.Curl()
output_init = BytesIO()
c.setopt(c.URL, api_url)
### Create the cookie File
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(c.WRITEFUNCTION, output_init.write)
c.perform()
return json.loads(output_init.getvalue())
def create_rooms(self, data):
'''
Description:
create a room
'''
self.data = data
uri = "rooms/"
api_url = self.url + uri
c = pycurl.Curl()
c.setopt(pycurl.URL, api_url)
c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8'])
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, self.data)
c.setopt(pycurl.VERBOSE, 1)
c.perform()
def put_attributes_config(self, data, uuid):
'''
Description:
modify an attribute
'''
self.data = data
self.uuid = uuid
uri = "attributes/" + self.uuid + "/config"
api_url = self.url + uri
c = pycurl.Curl()
c.setopt(pycurl.URL, api_url)
c.setopt(pycurl.HTTPHEADER, ['Accept: application/json','Content-Type: application/json','charset=UTF-8'])
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, self.data)
c.setopt(pycurl.VERBOSE, 1)
c.perform()
def save_and_synchronize(self, wait="false", timeout=30):
'''
Description:
synchronize Zipato with the Server
'''
self.wait = wait
self.timeout = timeout
uri = "box/saveAndSynchronize?wait=" + self.wait + "&timeout=" + str(self.timeout)
api_url = self.url + uri
c = pycurl.Curl()
output_init = BytesIO()
c.setopt(c.URL, api_url)
### Create the cookie File
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(c.WRITEFUNCTION, output_init.write)
c.perform()
c.close()
return json.loads(output_init.getvalue())
def synchronize(self, ifneeded="false", wait="false", timeout=30):
'''
Description:
synchronize Zipato with the Server
'''
self.ifneeded = ifneeded
self.wait = wait
self.timeout = timeout
uri = "box/synchronize?ifNeeded=" + self.ifneeded + "wait=" + self.wait + "&timeout=" + str(self.timeout)
api_url = self.url + uri
c = pycurl.Curl()
output_init = BytesIO()
c.setopt(c.URL, api_url)
### Create the cookie File
c.setopt(pycurl.COOKIEFILE, 'cookie.txt')
c.setopt(c.WRITEFUNCTION, output_init.write)
c.perform()
c.close()
return json.loads(output_init.getvalue())
def postXmlSSL(self, xml, url, second=30, cert=True, post=True):
"""????"""
self.curl.setopt(pycurl.URL, url)
self.curl.setopt(pycurl.TIMEOUT, second)
# ????
# ?????cert ? key ??????.pem??
# ?????PEM?????
if cert:
self.curl.setopt(pycurl.SSLKEYTYPE, "PEM")
self.curl.setopt(pycurl.SSLKEY, WxPayConf_pub.SSLKEY_PATH)
self.curl.setopt(pycurl.SSLCERTTYPE, "PEM")
self.curl.setopt(pycurl.SSLCERT, WxPayConf_pub.SSLCERT_PATH)
# post????
if post:
self.curl.setopt(pycurl.POST, True)
self.curl.setopt(pycurl.POSTFIELDS, xml)
buff = StringIO()
self.curl.setopt(pycurl.WRITEFUNCTION, buff.write)
self.curl.perform()
return buff.getvalue()
def use_cloud(token):
fp = wave.open('output.wav','r')
nf = fp.getnframes()
f_len = nf * 2
audio_data = fp.readframes(nf)
cuid = "123456" #my xiaomi phone MAC
srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
http_header = [
'Content-Type: audio/pcm; rate=8000',
'Content-Length: %d' % f_len
]
print srv_url
c = pycurl.Curl()
c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
c.setopt(c.HTTPHEADER, http_header) #must be list, not dict
c.setopt(c.POST, 1)
c.setopt(c.CONNECTTIMEOUT, 30)
c.setopt(c.TIMEOUT, 30)
c.setopt(c.WRITEFUNCTION, dump_res)
c.setopt(c.POSTFIELDS, audio_data)
c.setopt(c.POSTFIELDSIZE, f_len)
c.perform()
def use_cloud(token):
fp = wave.open('output.wav','r')
nf = fp.getnframes()
f_len = nf * 2
audio_data = fp.readframes(nf)
cuid = "123456" #my xiaomi phone MAC
srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
http_header = [
'Content-Type: audio/pcm; rate=8000',
'Content-Length: %d' % f_len
]
print srv_url
c = pycurl.Curl()
c.setopt(pycurl.URL, str(srv_url)) #curl doesn't support unicode
c.setopt(c.HTTPHEADER, http_header) #must be list, not dict
c.setopt(c.POST, 1)
c.setopt(c.CONNECTTIMEOUT, 30)
c.setopt(c.TIMEOUT, 30)
c.setopt(c.WRITEFUNCTION, dump_res)
c.setopt(c.POSTFIELDS, audio_data)
c.setopt(c.POSTFIELDSIZE, f_len)
c.perform()