def scan(self, time):
scan_time,mz,scan_name,st,scan_mode = min(self.scans(), key=lambda si: abs(time - si[0]))
self.crl.setopt(pycurl.HTTPGET, True)
self.crl.setopt(pycurl.URL, str(scan_name + '.txt'))
response = cStringIO.StringIO()
self.crl.setopt(pycurl.WRITEFUNCTION, response.write)
for i in range(5):
#print 'scan %d' % i
self.crl.perform()
if response.getvalue():
break
scan = response.getvalue().splitlines()
# how to get charge for an mzURL scan?
return mzScan([tuple(float(v) for v in s.split()) for s in scan],
scan_time, mode=scan_mode, mz=mz)
python类HTTPGET的实例源码
def xic(self, start_time, stop_time, start_mz, stop_mz, filter=None):
xic_url = str(self.data_file + ('/ric/%s-%s/%s-%s.txt' % (start_time, stop_time,
start_mz, stop_mz)))
self.crl.setopt(pycurl.HTTPGET, True)
self.crl.setopt(pycurl.URL, xic_url)
response = cStringIO.StringIO()
self.crl.setopt(pycurl.WRITEFUNCTION, response.write)
for i in range(5):
#print 'xic %d' % i
self.crl.perform()
if response.getvalue():
break
scan = response.getvalue().splitlines()
return [tuple(float(v) for v in s.split()) for s in scan]
def _init_method(self):
if self.m_request.m_method == "GET":
self.m_handle.setopt(pycurl.HTTPGET, 1)
elif self.m_request.m_method == "PUT":
self.m_handle.setopt(pycurl.PUT, 1)
elif self.m_request.m_method == "POST":
if self.m_request.m_data:
l_data = self.m_request.m_data
self.m_handle.setopt(pycurl.POSTFIELDS, l_data)
else:
self.m_handle.setopt(pycurl.CUSTOMREQUEST, "POST")
elif self.m_request.m_method == "HEAD":
self.m_handle.setopt(pycurl.NOBODY, 1)
elif self.m_request.m_method == "DELETE":
self.m_handle.setopt(pycurl.CUSTOMREQUEST, "DELETE")
def curl_get(self, url, refUrl=None):
buf = cStringIO.StringIO()
curl = pycurl.Curl()
curl.setopt(curl.URL, url)
curl.setopt(curl.WRITEFUNCTION, buf.write)
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
#curl.setopt(pycurl.SSL_VERIFYHOST, 0)
#curl.setopt(pycurl.HEADERFUNCTION, self.headerCookie)
curl.setopt(pycurl.VERBOSE, 0)
curl.setopt(pycurl.USERAGENT, 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:46.0) Gecko/20100101 Firefox/46.0')
#curl.setopt(pycurl.HTTPGET,1)
#curl.setopt(pycurl.COOKIE, Cookie)
#curl.setopt(pycurl.POSTFIELDS, 'j_username={ngnms_user}&j_password={ngnms_password}'.format(**self.ngnms_login))
curl.setopt(pycurl.COOKIEJAR, '/htdocs/logs/py_cookie.txt')
curl.setopt(pycurl.COOKIEFILE, '/htdocs/logs/py_cookie.txt')
if refUrl:
curl.setopt(pycurl.REFERER, refUrl)
#curl.setopt(c.CONNECTTIMEOUT, 5)
#curl.setopt(c.TIMEOUT, 8)
curl.perform()
backinfo = ''
if curl.getinfo(pycurl.RESPONSE_CODE) == 200:
backinfo = buf.getvalue()
curl.close()
return backinfo
def get(self, url="", params=None):
"Ship a GET request for a specified URL, capture the response."
if params:
url += "?" + urllib_parse.urlencode(params)
self.set_option(pycurl.HTTPGET, 1)
return self.__request(url)
def request(self, method, url, headers, post_data=None):
s = util.StringIO.StringIO()
rheaders = util.StringIO.StringIO()
curl = pycurl.Curl()
proxy = self._get_proxy(url)
if proxy:
if proxy.hostname:
curl.setopt(pycurl.PROXY, proxy.hostname)
if proxy.port:
curl.setopt(pycurl.PROXYPORT, proxy.port)
if proxy.username or proxy.password:
curl.setopt(
pycurl.PROXYUSERPWD,
"%s:%s" % (proxy.username, proxy.password))
if method == 'get':
curl.setopt(pycurl.HTTPGET, 1)
elif method == 'post':
curl.setopt(pycurl.POST, 1)
curl.setopt(pycurl.POSTFIELDS, post_data)
else:
curl.setopt(pycurl.CUSTOMREQUEST, method.upper())
# pycurl doesn't like unicode URLs
curl.setopt(pycurl.URL, util.utf8(url))
curl.setopt(pycurl.WRITEFUNCTION, s.write)
curl.setopt(pycurl.HEADERFUNCTION, rheaders.write)
curl.setopt(pycurl.NOSIGNAL, 1)
curl.setopt(pycurl.CONNECTTIMEOUT, 30)
curl.setopt(pycurl.TIMEOUT, 80)
curl.setopt(pycurl.HTTPHEADER, ['%s: %s' % (k, v)
for k, v in headers.items()])
if self._verify_ssl_certs:
curl.setopt(pycurl.CAINFO, os.path.join(
os.path.dirname(__file__), 'data/ca-certificates.crt'))
else:
curl.setopt(pycurl.SSL_VERIFYHOST, False)
try:
curl.perform()
except pycurl.error as e:
self._handle_request_error(e)
rbody = s.getvalue()
rcode = curl.getinfo(pycurl.RESPONSE_CODE)
return rbody, rcode, self.parse_headers(rheaders.getvalue())
def scans(self):
'''Gets the scan_info and caches it, to save extra trips to the server'''
if not self._scans:
self.crl.setopt(pycurl.HTTPGET, True)
self.crl.setopt(pycurl.URL, str(self.data_file + '/scan_info'))
# the scan_name is the URL of that scan, which is just the time
scan_url = self.data_file + '/scans/%s'
response = cStringIO.StringIO()
self.crl.setopt(pycurl.WRITEFUNCTION, response.write)
#self.crl.perform()
for i in range(5):
#print 'scans %d' % i
self.crl.perform()
if response.getvalue():
break
scans = response.getvalue().splitlines()
self._scans = []
for scan_line in scans:
time,mz,scan_type,scan_mode = scan_line.split()
self._scans.append((float(time),
float(mz),
scan_url % time,
scan_type.upper(),
scan_mode.lower()))
self._scans.sort()
return self._scans