def use_cloud(token):
    """POST the audio frames of ``output.wav`` to the vop.baidu.com server API.

    All frames of the WAV file are read into memory and sent as the request
    body with content type ``audio/pcm; rate=8000``. Response data is passed
    to ``dump_res`` as it arrives; nothing is returned.

    Args:
        token: access token appended to the request URL as ``token=``.
    """
    # Release the wave reader even if reading the frames fails.
    fp = wave.open('output.wav', 'rb')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        fp.close()

    # Declare the real payload size instead of assuming 2 bytes per frame, so
    # the length stays correct for any sample width or channel count.
    f_len = len(audio_data)

    cuid = "xxxxxxxxxx"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len,
    ]
    print(srv_url)

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)  # must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()  # pycurl.perform() has no return val
    finally:
        # Always free the handle, including when perform() raises.
        c.close()
# Example source code for the Python class "URL" (python类URL的实例源码)
def use_cloud(token):
    """POST the audio frames of ``output.wav`` to the vop.baidu.com server API.

    The frames are read fully into memory and sent as the request body with
    content type ``audio/pcm; rate=8000``. Response data is handed to
    ``dump_res``; the function itself returns nothing.

    Args:
        token: access token appended to the request URL as ``token=``.
    """
    fp = wave.open('output.wav', 'r')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        # The reader is no longer needed once the frames are in memory.
        fp.close()

    # Use the actual byte count; the frame count times two is only right for
    # 16-bit mono input.
    f_len = len(audio_data)

    cuid = "123456"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len,
    ]

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)  # must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()
    finally:
        # Free the curl handle whether or not the transfer succeeded.
        c.close()
def _get_url(self, url):
if self.API_TOKEN == None:
logging.error('none token') # 3 For ERROR level
return
try:
c = pycurl.Curl()
c.setopt(pycurl.CAINFO, certifi.where())
c.setopt(pycurl.URL, url)
b = StringIO.StringIO()
c.setopt(pycurl.WRITEFUNCTION, b.write)
c.setopt(pycurl.USERAGENT, "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322)")
c.setopt(pycurl.HTTPHEADER, ['Authorization: JWT %s' % self.API_TOKEN.encode()])
c.setopt(pycurl.CUSTOMREQUEST, "GET")
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.perform()
result = b.getvalue()
logging.debug('result')
except Exception as e:
logging.error(e.message)
logging.error('go error')
pass
return result
def use_cloud(token):
    """POST the audio frames of ``01.wav`` to the vop.baidu.com server API.

    Prints the sample width, frame rate and channel count of the file, then
    sends all frames as the request body with content type
    ``audio/pcm; rate=8000``. Response data is handed to ``dump_res``.

    Args:
        token: access token appended to the request URL as ``token=``.
    """
    fp = wave.open(u'01.wav', 'rb')
    try:
        nf = fp.getnframes()
        # The first label says "sampwidth", so report the sample width (the
        # frame count was being printed here by mistake).
        print('sampwidth: %s' % fp.getsampwidth())
        print('framerate: %s' % fp.getframerate())
        print('channels: %s' % fp.getnchannels())
        audio_data = fp.readframes(nf)
    finally:
        fp.close()

    # Real payload size; frames * 2 only holds for 16-bit mono audio.
    f_len = len(audio_data)

    cuid = "10:2A:B3:58:28:88"  # my redmi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + token
    http_header = [
        'Content-Type:audio/pcm; rate=8000',
        'Content-length:%d' % f_len,
    ]

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url))
        c.setopt(c.HTTPHEADER, http_header)
        c.setopt(c.CONNECTTIMEOUT, 80)
        c.setopt(c.TIMEOUT, 80)
        c.setopt(c.WRITEFUNCTION, dump_res)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()  # pycurl.perform() has no return val
    finally:
        # Free the handle even when the transfer fails.
        c.close()
def voice_to_text(self, voice_file, callback):
    """POST the audio frames of a WAV file to the vop.baidu.com server API.

    Args:
        voice_file: WAV file to open with ``wave.open``.
        callback: callable installed as the curl write function; it receives
            the response data.

    The token for the request URL comes from ``self.get_token()``. Nothing is
    returned; the response is delivered only through ``callback``.
    """
    fp = wave.open(voice_file, 'rb')
    try:
        audio_data = fp.readframes(fp.getnframes())
    finally:
        # Do not keep the file open during the network transfer.
        fp.close()

    # Actual byte count of the payload rather than an assumed 2 bytes/frame.
    f_len = len(audio_data)

    cuid = "xxxxxxxxxx"  # my xiaomi phone MAC
    srv_url = 'http://vop.baidu.com/server_api' + '?cuid=' + cuid + '&token=' + self.get_token()
    http_header = [
        'Content-Type: audio/pcm; rate=8000',
        'Content-Length: %d' % f_len,
    ]

    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, str(srv_url))  # curl doesn't support unicode
        c.setopt(c.HTTPHEADER, http_header)  # must be list, not dict
        c.setopt(c.POST, 1)
        c.setopt(c.CONNECTTIMEOUT, 30)
        c.setopt(c.TIMEOUT, 30)
        c.setopt(c.WRITEFUNCTION, callback)
        c.setopt(c.POSTFIELDS, audio_data)
        c.setopt(c.POSTFIELDSIZE, f_len)
        c.perform()  # pycurl.perform() has no return val
    finally:
        c.close()
def getXML(self,obj):
    """Build a ``<request>`` DOM element describing this request.

    Args:
        obj: DOM document used as the factory for elements and text nodes.

    Returns:
        The ``request`` element, carrying a ``method`` attribute and a ``URL``
        child, plus ``PostData`` for POST requests and ``Cookie`` when a
        Cookie header is present.
    """
    def text_element(tag, text):
        # One element wrapping a single text node.
        node = obj.createElement(tag)
        node.appendChild(obj.createTextNode(text))
        return node

    request_node = obj.createElement("request")
    request_node.setAttribute("method", self.method)
    request_node.appendChild(text_element("URL", self.completeUrl))

    if self.method == "POST":
        request_node.appendChild(text_element("PostData", self.postdata))

    if "Cookie" in self._headers:
        request_node.appendChild(text_element("Cookie", self._headers["Cookie"]))

    return request_node
def __init__(self, uploader):
    """Prepare a curl handle that uploads from ``uploader`` using PATCH.

    Args:
        uploader: object supplying ``url``, ``offset``, ``request_length``,
            ``headers_as_list`` and ``get_file_stream()``.

    The handle reads its body from the uploader's file stream, positioned at
    ``uploader.offset``, writes the response body to ``self.output`` and
    routes response header lines to ``self._prepare_response_header``.
    """
    curl_handle = pycurl.Curl()
    self.handle = curl_handle
    self.response_headers = {}
    self.output = six.StringIO()
    self.status_code = None

    configure = curl_handle.setopt
    configure(pycurl.CAINFO, certifi.where())
    configure(pycurl.URL, uploader.url)
    configure(pycurl.HEADERFUNCTION, self._prepare_response_header)
    configure(pycurl.UPLOAD, 1)
    configure(pycurl.CUSTOMREQUEST, 'PATCH')

    # Start reading the stream at the current upload offset.
    stream = uploader.get_file_stream()
    self.file = stream
    stream.seek(uploader.offset)

    configure(pycurl.READFUNCTION, stream.read)
    configure(pycurl.WRITEFUNCTION, self.output.write)
    configure(pycurl.INFILESIZE, uploader.request_length)

    own_headers = [
        "upload-offset: {}".format(uploader.offset),
        "Content-Type: application/offset+octet-stream",
    ]
    configure(pycurl.HTTPHEADER, own_headers + uploader.headers_as_list)
def get_page_data(url, head=None, curl=None):
    """Fetch ``url`` with an HTTP GET and return the response body.

    Args:
        url: address to request (curl doesn't support unicode).
        head: optional list of header lines (must be a list, not a dict).
        curl: optional existing ``pycurl.Curl`` handle to reuse. When
            omitted, a handle is created for this call and closed afterwards.

    Returns:
        The response body accumulated from the transfer.
    """
    # Only close a handle this function created; a caller-supplied handle
    # stays open for reuse.
    owns_handle = not curl
    if owns_handle:
        curl = pycurl.Curl()
    stream_buffer = StringIO()
    try:
        curl.setopt(pycurl.URL, url)
        if head:
            curl.setopt(pycurl.HTTPHEADER, head)
        curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
        curl.setopt(pycurl.CUSTOMREQUEST, "GET")
        curl.setopt(pycurl.CONNECTTIMEOUT, 30)
        curl.setopt(pycurl.TIMEOUT, 30)
        # NOTE(review): certificate and host-name verification are switched
        # off, so HTTPS responses are not authenticated. Left unchanged
        # because callers may depend on it; confirm whether it is still needed.
        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        curl.perform()
        return stream_buffer.getvalue()
    finally:
        stream_buffer.close()
        if owns_handle:
            curl.close()
def post_page_data(url, data=None, head=None, curl=None):
    """Send ``data`` to ``url`` with an HTTP POST and return the response body.

    Args:
        url: address to request (curl doesn't support unicode).
        data: request body; ``None`` is sent as an empty body.
        head: optional list of header lines (must be a list, not a dict).
        curl: optional existing ``pycurl.Curl`` handle to reuse. When
            omitted, a handle is created for this call and closed afterwards.

    Returns:
        The response body accumulated from the transfer.
    """
    # Only close a handle this function created.
    owns_handle = not curl
    if owns_handle:
        curl = pycurl.Curl()
    stream_buffer = StringIO()
    try:
        curl.setopt(pycurl.URL, url)
        if head:
            curl.setopt(pycurl.HTTPHEADER, head)
        # The default data=None must not reach POSTFIELDS as-is; substitute
        # an empty body so the default call works.
        curl.setopt(pycurl.POSTFIELDS, '' if data is None else data)
        curl.setopt(pycurl.WRITEFUNCTION, stream_buffer.write)
        curl.setopt(pycurl.CUSTOMREQUEST, "POST")
        curl.perform()
        return stream_buffer.getvalue()
    finally:
        stream_buffer.close()
        if owns_handle:
            curl.close()
def getlat4city():
    """Query super-ping.com for the average ping latency to ``WAN_IP``.

    The request targets the node named by the module-level ``CITY`` and the
    address in ``WAN_IP``. The value is taken from the text between
    ``ping-avg'>`` and the following ``</div>`` in the response.

    Returns:
        The latency as a float, or ``-1.0`` when the response is ``-``,
        ``super-ping.com``, or does not contain the expected markers.
    """
    avglat = -1
    sp_url = "http://www.super-ping.com/ping.php?node=" + CITY + "&ping=" + WAN_IP
    sp_refer_url = "http://www.super-ping.com/?ping=" + WAN_IP + "&locale=en"
    sp_http_headers = ['Referer: ' + sp_refer_url, 'X-Requested-With: XMLHttpRequest']

    crl = pyc.Curl()
    sio = StringIO()
    try:
        crl.setopt(pyc.URL, sp_url)
        crl.setopt(pyc.HTTPHEADER, sp_http_headers)
        crl.setopt(pyc.WRITEFUNCTION, sio.write)
        crl.perform()
    finally:
        # Release the handle even if the transfer raises.
        crl.close()

    lat_http_result = sio.getvalue()
    # Process the http result only if it is html.
    if lat_http_result.strip() not in ("-", "super-ping.com"):
        fstring = "ping-avg'>"
        lstring = "</div>"
        # find() instead of index(): a page without the markers falls back to
        # the -1 sentinel instead of raising ValueError.
        start = lat_http_result.find(fstring)
        if start != -1:
            start += len(fstring)
            end = lat_http_result.find(lstring, start)
            if end != -1:
                avglat = lat_http_result[start:end]
    return float(avglat)
def Curl(url,headers):
    """Fetch ``url`` with pycurl, retrying until an acceptable page arrives.

    Args:
        url: address to request.
        headers: list of header lines passed to ``HTTPHEADER``.

    Returns:
        The response body of the first fetch that does not contain the
        marker string.

    When the body contains the marker string, a message is printed and the
    function sleeps 600 seconds before retrying. Any exception is printed and
    the fetch is retried immediately; the loop has no retry limit.
    """
    while 1:
        try:
            c = pycurl.Curl()
            try:
                c.setopt(pycurl.REFERER, 'http://weixin.sogou.com/')
                c.setopt(pycurl.FOLLOWLOCATION, True)
                c.setopt(pycurl.MAXREDIRS, 5)
                c.setopt(pycurl.CONNECTTIMEOUT, 60)
                c.setopt(pycurl.TIMEOUT, 120)
                c.setopt(pycurl.ENCODING, 'gzip,deflate')
                c.fp = StringIO.StringIO()
                c.setopt(pycurl.URL, url)
                c.setopt(pycurl.HTTPHEADER, headers)
                c.setopt(c.WRITEFUNCTION, c.fp.write)
                c.perform()
                html = c.fp.getvalue()
            finally:
                # A new handle is created on every attempt, so free each one;
                # otherwise handles pile up while the loop keeps retrying.
                c.close()
            if '??????' in html:
                print(u'??????,??10??')
                time.sleep(600)
            else:
                return html
        except Exception as e:
            print('%s curl(url) %s' % (url, e))
            continue
# NOTE: the original comment on this line was lost to character mis-encoding.
def getHtml(url,headers):
    """POST to ``url`` with pycurl and return the response body.

    Args:
        url: address to request.
        headers: list of header lines passed to ``HTTPHEADER``.

    Returns:
        The response body collected during the transfer.

    NOTE(review): the request body comes from a name ``data`` that is not a
    parameter of this function, so it must exist at module level when the
    function is called. It looks like it was meant to be an argument; confirm
    against the callers.
    """
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.FOLLOWLOCATION, True)  # follow redirects
        c.setopt(pycurl.MAXREDIRS, 5)  # at most 5 redirects
        c.setopt(pycurl.CONNECTTIMEOUT, 60)
        c.setopt(pycurl.TIMEOUT, 120)
        c.setopt(pycurl.ENCODING, 'gzip,deflate')
        c.fp = StringIO.StringIO()  # buffer that receives the response body
        c.setopt(pycurl.HTTPHEADER, headers)
        c.setopt(pycurl.POST, 1)
        c.setopt(pycurl.POSTFIELDS, data)
        c.setopt(c.WRITEFUNCTION, c.fp.write)
        c.perform()
        html = c.fp.getvalue()
    finally:
        # Free the handle whether or not the transfer succeeded.
        c.close()
    return html
def curl(url, debug=False, **kwargs):
    """Fetch ``url`` with pycurl, retrying until a transfer succeeds.

    Args:
        url: address to request; also sent as the referer.
        debug: when true, re-raise the first transfer error instead of
            retrying.
        **kwargs: extra curl options, keyed by the name of a ``pycurl``
            module attribute (e.g. ``CONNECTTIMEOUT=10``).

    Returns:
        The response body of the first successful transfer.

    Raises:
        KeyError: if a keyword is not the name of a ``pycurl`` attribute.
    """
    # Resolve option names once, up front: an unknown name can never succeed
    # on retry, so fail fast instead of looping forever.
    pycurl_names = vars(pycurl)
    extra_options = [(pycurl_names[k], v) for k, v in kwargs.items()]

    while 1:
        c = None
        try:
            s = StringIO.StringIO()
            c = pycurl.Curl()
            c.setopt(pycurl.URL, url)
            c.setopt(pycurl.REFERER, url)
            c.setopt(pycurl.FOLLOWLOCATION, True)
            c.setopt(pycurl.TIMEOUT, 60)
            c.setopt(pycurl.ENCODING, 'gzip')
            c.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36')
            c.setopt(pycurl.NOSIGNAL, True)
            c.setopt(pycurl.WRITEFUNCTION, s.write)
            for option, value in extra_options:
                c.setopt(option, value)
            c.perform()
            return s.getvalue()
        except Exception:
            # Exception, not a bare except: Ctrl-C and SystemExit must be able
            # to break out of the retry loop.
            if debug:
                raise
        finally:
            # Free the handle on success and on every failed attempt.
            if c is not None:
                c.close()
def scan_info(self, start_time, stop_time=0, start_mz=0, stop_mz=99999):
    """Return the scans that fall inside a time window (and, for MS/MS, an mz window).

    Each result is a tuple ``(time, mz, scan_name, scan_type, scan_mode)``
    taken from ``self.scans()``; scan_name is the URL of the scan (goes to
    the HTML version).

    Every 'MS1' scan inside the time range is kept. Any other scan is kept
    only when its mz also lies within ``start_mz``..``stop_mz``. A
    ``stop_time`` of 0 selects the single time point ``start_time``.

    Example:
    >>> scan_info = my_peakfile.scan_info(30.0, 35.0, 435.82, 436.00)
    """
    last_time = start_time if stop_time == 0 else stop_time

    selected = []
    for t, mz, sn, st, sm in self.scans():
        if not (start_time <= t <= last_time):
            continue
        if st == 'MS1' or start_mz <= mz <= stop_mz:
            selected.append((t, mz, sn, st, sm))
    return selected
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
    """Download the ``/ric/`` text resource for a time/mz window and parse it.

    The URL is ``self.data_file`` followed by
    ``/ric/<start_time>-<stop_time>/<start_mz>-<stop_mz>.txt``. The request
    is issued on ``self.crl`` up to five times, stopping as soon as any
    response data has been received.

    Returns:
        A list with one tuple of floats per response line, built from the
        whitespace-separated fields of that line. ``filter`` is accepted but
        not used here.
    """
    window_path = '/ric/%s-%s/%s-%s.txt' % (start_time, stop_time,
                                            start_mz, stop_mz)
    xic_url = str(self.data_file + window_path)

    self.crl.setopt(pycurl.HTTPGET, True)
    self.crl.setopt(pycurl.URL, xic_url)
    body = cStringIO.StringIO()
    self.crl.setopt(pycurl.WRITEFUNCTION, body.write)

    # Retry while nothing has been received, five attempts at most.
    attempts = 0
    while attempts < 5:
        self.crl.perform()
        attempts += 1
        if body.getvalue():
            break

    rows = []
    for line in body.getvalue().splitlines():
        rows.append(tuple(float(field) for field in line.split()))
    return rows
def fetch(self, url, body=None, headers=None):
    """Request ``url`` through ``self.urlopen`` and wrap the response.

    Args:
        url: address to fetch; it must pass ``_allowedURL``.
        body: optional request data handed to ``urllib2.Request``.
        headers: optional dict of request headers. A ``User-Agent`` is added
            when the dict does not already contain one.

    Returns:
        Whatever ``self._makeResponse`` builds from the response. An
        ``urllib2.HTTPError`` is converted the same way instead of being
        raised.

    Raises:
        ValueError: if ``url`` is rejected by ``_allowedURL``.
    """
    if not _allowedURL(url):
        raise ValueError('Bad URL scheme: %r' % (url,))

    # Copy so that adding the default User-Agent does not modify the
    # caller's dict.
    headers = {} if headers is None else dict(headers)
    headers.setdefault(
        'User-Agent',
        "%s Python-urllib/%s" % (USER_AGENT, urllib2.__version__,))

    req = urllib2.Request(url, data=body, headers=headers)
    try:
        f = self.urlopen(req)
        try:
            return self._makeResponse(f)
        finally:
            f.close()
    except urllib2.HTTPError as why:
        # The error object is passed to _makeResponse just like a normal
        # response, so HTTP error statuses reach the caller as responses.
        try:
            return self._makeResponse(why)
        finally:
            why.close()
def test_gzip(url):
    """Fetch ``url`` with gzip encoding enabled and report how long it took.

    Response data is delivered to the ``callback`` of a fresh ``Test``
    instance.

    Args:
        url: address to request.

    Returns:
        curl's ``TOTAL_TIME`` for the transfer multiplied by 1000.
    """
    t = Test()
    c = pycurl.Curl()
    try:
        c.setopt(pycurl.WRITEFUNCTION, t.callback)
        c.setopt(pycurl.ENCODING, 'gzip')
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.USERAGENT, "User-Agent':'EMAO_OPS_MONITOR) Gecko/20091201 Firefox/3.5.6)")
        c.perform()
        # Read the timing before the handle is closed.
        total_time = c.getinfo(c.TOTAL_TIME)
    finally:
        c.close()
    return total_time * 1000