def update_status():
'''
    -> True : Version is latest; no update needed
-> False : Check for update failed
-> str : Latest version to update to
'''
try:
r = requests.get(constants.UPDATES, timeout=5)
except (requests.ConnectionError, requests.ConnectTimeout,
requests.ReadTimeout):
return False
else:
latestVersion = r.json()['latest_version']
if constants.VERSION == latestVersion:
return True
else:
return latestVersion
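# ----------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original project): a caller
# can branch on the three possible return values documented above.
def print_update_status():
    status = update_status()
    if status is True:
        print('Already running the latest version')
    elif status is False:
        print('Could not check for updates')
    else:
        print('New version available: %s' % status)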
def get_title_by_url(url, timeout=5, pattern='<title>(.*?)</title>'):
"""return {url:title}, if title do not find we return{url:None}"""
try:
raw_http = requests.get(url, timeout=timeout)
raw_http.encoding = raw_http.apparent_encoding
    except (requests.ConnectionError, requests.ConnectTimeout):
logger_util.log_debug('Connect failed to %s ' % url)
return
title = re.findall(pattern, raw_http.text)
if not title:
        logger_util.log_debug('This page does not have a title: %s' % url)
return {url: None}
else:
return {url: title[0]}
# ----------------------------------------------------------------------
def test_raises_error_with_any_other_requests_exception(self, mock):
exceptions = [
requests.exceptions.HTTPError,
requests.exceptions.ConnectionError,
requests.exceptions.ProxyError,
requests.exceptions.SSLError,
requests.exceptions.Timeout,
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
requests.exceptions.URLRequired,
requests.exceptions.TooManyRedirects,
requests.exceptions.MissingSchema,
requests.exceptions.InvalidSchema,
requests.exceptions.InvalidURL,
requests.exceptions.InvalidHeader,
requests.exceptions.ChunkedEncodingError,
requests.exceptions.ContentDecodingError,
requests.exceptions.StreamConsumedError,
requests.exceptions.RetryError,
requests.exceptions.UnrewindableBodyError,
]
mock.side_effect = exceptions
for _ in exceptions:
with self.assertRaises(HTTPError):
request('GET', 'foo')
def crawl(self, job: Job):
try:
worker = Worker(job)
worker.start()
worker.join()
except ValueError as e:
print("Couldn't parse url: ", job.url, e)
pass
except (requests.ConnectionError, requests.ConnectTimeout, requests.exceptions.SSLError) as e:
print("Couldn't parse url: ", job.url, e.strerror)
pass
else:
while not linkparser.links.empty():
job = linkparser.links.get()
if job.priority < self._depth:
self.crawl(job)
linkparser.links.task_done()
def crawl(self, job: Job):
try:
r = requests.get(job.url)
link_parser = LinkParser(job)
link_parser.parse(r.text)
links = link_parser.get_links()
except ValueError as e:
print("Couldn't parse url: ", job.url, e)
pass
except (requests.ConnectionError, requests.ConnectTimeout, requests.exceptions.SSLError) as e:
print("Couldn't parse url: ", job.url, e.strerror)
pass
else:
while not links.empty():
job = links.get()
self.print_status(job, links)
if job.priority < self._depth:
self.crawl(job)
links.task_done()
def __call__(self):
try:
req = requests.get(self.url, headers=self.header,
timeout=10, proxies=self.proxies)
    except (requests.ConnectTimeout, requests.ReadTimeout) as e:
        print(f"Request to {self.url} timed out")
self.status = False
return {"status": self.status, 'html': ''}
try:
        encoding = chardet.detect(req.content)['encoding']
        html = req.content.decode(encoding, errors='replace')
except Exception as e:
print(e)
print("?????,??????......")
self.status = False
return {"status": self.status, 'html': ''}
return {"status": self.status, 'html': html}
def test_on_exception(self, catch_signal):
with patch('requests.Session.request', side_effect=ConnectTimeout):
with catch_signal(on_exception) as handler:
send_mail(fail_silently=True, **SEND_KWARGS)
assert handler.called
kwargs = handler.call_args[1]
assert kwargs['sender'] == EmailBackend
assert kwargs['signal'] == on_exception
assert isinstance(kwargs['exception'], ConnectTimeout)
assert len(kwargs['raw_messages']) == 1
def request(self, api_name=None, pk=None, method='get', use_auth=True,
data=None, params=None, content_type='application/json'):
if api_name in self.api_url_mapping:
path = self.api_url_mapping.get(api_name)
if pk and '%s' in path:
path = path % pk
else:
path = '/'
url = self.endpoint.rstrip('/') + path
self.req = req = Request(url, method=method, data=data,
params=params, content_type=content_type,
app_name=self.app_name)
if use_auth:
if not self._auth:
raise RequestError('Authentication required')
else:
self._auth.sign_request(req)
try:
result = req.request()
if result.status_code > 500:
logging.warning('Server internal error')
except (requests.ConnectionError, requests.ConnectTimeout):
result = FakeResponse()
logging.warning('Connect endpoint: {} error'.format(self.endpoint))
return self.parse_result(result)
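# ----------------------------------------------------------------------
# FakeResponse is referenced above but not defined in this snippet. A
# minimal stand-in might look like the sketch below; the attribute set is
# an assumption based on how `result` is used by the surrounding code.
class FakeResponse(object):
    def __init__(self, status_code=502, text='', json_data=None):
        # Pretend the endpoint answered with a gateway error so that
        # parse_result() can treat it like any other failed response.
        self.status_code = status_code
        self.text = text
        self._json_data = json_data or {}

    def json(self):
        return self._json_data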
def main(url, timeout=30, redirect_unknown=True, debug=False):
"""Actual monitoring execution"""
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
if debug: # pragma: no cover
logger.setLevel(logging.DEBUG)
logger.info('debug logging enabled')
# Check if URL is valid
logger.debug('perform URL validation check')
if not valid_http_url(url):
nagios.plugin_exit(nagios.Codes.UNKNOWN, 'provided URL is not valid')
# Send a HEAD request
logger.debug('send HEAD request')
try:
response = requests.head(url, timeout=timeout)
except requests.ConnectTimeout:
nagios.plugin_exit(nagios.Codes.CRITICAL, 'connection timeout')
except requests.ReadTimeout:
        nagios.plugin_exit(nagios.Codes.CRITICAL,
                           'no response received before timeout')
except requests.ConnectionError:
nagios.plugin_exit(nagios.Codes.UNKNOWN, 'connection error')
else:
logger.debug('response received')
if response.status_code == requests.codes.ok:
# Response is OK
nagios.plugin_exit(nagios.Codes.OK,
'status code is %d' % response.status_code)
elif redirect_unknown and response.status_code == requests.codes.found:
# Redirect considered as UNKNOWN
nagios.plugin_exit(nagios.Codes.UNKNOWN,
'redirection with code %d' %
response.status_code)
else:
# Other code, considered not working
nagios.plugin_exit(nagios.Codes.CRITICAL,
'status code is %d' % response.status_code)
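# ----------------------------------------------------------------------
# valid_http_url() is referenced above but not shown. A minimal sketch of
# such a check (an assumption, not the original implementation) could use
# urllib.parse:
from urllib.parse import urlparse

def valid_http_url(url):
    parsed = urlparse(url)
    return parsed.scheme in ('http', 'https') and bool(parsed.netloc)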
def __request_data(self):
if self.__is_cache_valid():
return True
# requesting data
try:
# setting 5 sec timeouts for connect and read
r = requests.get(self.data_url, verify=False, allow_redirects=True, timeout=(5, 5))
except requests.ConnectionError as e:
print("Unable to connect to ", self.data_url, " error is ", e, file=sys.stderr)
return False
except requests.ConnectTimeout as e:
print("Timed out connection to ", self.data_url, " error is ", e, file=sys.stderr)
return False
except requests.ReadTimeout as e:
print("Timed out while reading data from ", self.data_url, " error is ", e, file=sys.stderr)
return False
if r.status_code == 200:
# got HTTP/200 for request - storing it in cache
try:
            with open(self.temp_file_name, mode="w") as cache_file:
                cache_file.write(json.dumps(r.json()))
except IOError as e:
print("IO error while trying to store cache into file ", self.temp_file_name, " error is ",
e, file=sys.stderr)
return False
return True
else:
return False
def isActiveLink(link):
try:
request = requests.get(link)
return request.status_code
except (requests.ConnectionError, requests.ConnectTimeout):
return False
def downloadFile(url):
tries = 0
while tries < 3:
try:
r = requests.get(BASE_URL + url)
r.encoding = "utf-8"
fileContent = r.text
return fileContent.strip(), len(fileContent)
except (requests.ConnectionError, requests.ConnectTimeout) as e:
tries += 1
if tries >= 3:
raise e
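# ----------------------------------------------------------------------
# Alternative to the hand-rolled retry loop above (illustrative sketch, not
# from the original project): requests can retry at the transport level by
# mounting urllib3's Retry helper on a Session.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_retrying_session(retries=3, backoff=1):
    session = requests.Session()
    retry = Retry(total=retries, backoff_factor=backoff,
                  status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session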
def _download(self, request, spider):
def _retry():
if self.retry_on_download_timeout:
self.logger.debug('Read timed out, retry request {}'.format(request))
self.crawl(request, spider)
try:
self._process_request(request, spider)
if request is None:
return
method = request.method.upper()
resp = None
kw_params = {
'timeout': self.download_timeout,
'cookies': request.cookies,
'headers': request.headers,
'proxies': {
'http': request.proxy,
'https': request.proxy
}
}
self.logger.debug('[{}]<{} {}>'.format(spider.name, method, request.url))
if method == 'GET':
resp = requests.get(request.url, **kw_params)
elif method == 'POST':
resp = requests.post(request.url, request.data, **kw_params)
self._responses_queue.put((Response(resp.url, resp.status_code, resp.content, request,
resp.cookies), spider))
except (requests.ReadTimeout, requests.ConnectTimeout, requests.ConnectionError):
_retry()
except Exception as err:
self.logger.error(err, exc_info=True)
def search_by_baidu(self, url, index):
# print 'baidu', GetNowTime()
if url is None:
url = r'https://www.baidu.com/s?&wd=site%3A' + self.domain
try:
baidu_rsp = requests.get(url=url, headers=headers, timeout=5)
    except (requests.ConnectTimeout, requests.ConnectionError):
        # connection to Baidu timed out or failed
return url, index
def is_https1(self):
url = self.PREFIX_URL2 + self.raw_domain
try:
requests.get(url, timeout=4)
    except (requests.ConnectionError, requests.ConnectTimeout):
return False
def is_https2(self):
url = self.PREFIX_URL1 + self.raw_domain
try:
requests.get(url, timeout=4)
    except (requests.ConnectionError, requests.ConnectTimeout):
return False
def get_html(url):
for i in range(3):
try:
head=headers
head['Referer']='https://www.bing.com'
rsp = requests.get(url, headers=head, timeout=5)
        except (requests.ConnectionError, requests.ConnectTimeout):
            # Bing request failed; sleep 5 seconds before retrying
time.sleep(5)
continue
if rsp.status_code == 200:
root = etree.HTML(rsp.text)
return root
else:
time.sleep(5)
def acme_enabled(url):
"if given url can be hit and it looks like the acme hidden dir exists, return True."
url = 'http://' + url + "/.well-known/acme-challenge/" # ll: http://lax.elifesciences.org/.well-known/acme-challenge
try:
resp = requests.head(url, allow_redirects=False)
if 'crm.elifesciences' in url:
return resp.status_code == 404 # apache behaves differently to nginx
return resp.status_code == 403 # forbidden rather than not found.
except (requests.ConnectionError, requests.ConnectTimeout):
# couldn't connect for whatever reason
return False
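# ----------------------------------------------------------------------
# Usage sketch (illustrative): acme_enabled() only needs a bare hostname;
# the scheme and well-known path are added inside the function.
if acme_enabled('lax.elifesciences.org'):
    print('ACME challenge directory is reachable')
else:
    print('ACME challenge directory is not exposed (or the host is down)')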
def _download(self,url='http://zt.bdinfo.net/speedtest/wo3G.rar',filepath="/work/test/tempdown"):
'''
Download file from website to local file
@param:url,download url
@param:filepath:the file location u want to save file
@return:None
'''
if not url:
return
print('downloading:%s' % url)
#r=requests.get(url,stream=True,timeout=5)
filename=os.path.join(filepath,'.'+urlparse.urlsplit(url).path)
chunk_size=1024*1024
print filename
if not os.path.exists(os.path.dirname(filename)):
os.makedirs(os.path.dirname(filename))
try:
r=requests.get(url,stream=True,timeout=5)
with open(filename,'wb') as f:
for data in r.iter_content(chunk_size=chunk_size):
#data=temp.read(1024*1024)
f.write(data)
self._add_downloaded(url)
    except (requests.ConnectTimeout, requests.ReadTimeout):
print("Download %s timeout,this will redownload later.\n" % (url))
if self._redis_enable:
self._r.lpush(self._download_list,url)
def _get_response(self,url,method='get',headers={},files=[],data=None,cookies=None,cert=None,timeout=30,**kwargs):
method=method.upper()
#self._s.headers.update(headers)
pre=requests.Request(method=method,url=url,data=data,files=files)
prepped=self._s.prepare_request(pre)
try:
with self._s.send(prepped,stream=True,cert=cert,timeout=timeout) as resp:
#self._header.parse_header(dict(resp.headers))
#self._s.headers.update(self._header.get_default_header())
#content_type=resp.headers.get('content-type')
#encoding=self._get_content_encoding(content_type)
#regx=re.compile('.*(text\/html|text\/xml).*')
if resp.status_code==requests.codes.OK:
#Don't handle redirect url for now
'''
with open('temp.txt','wb') as f:
f.write(resp.content)
'''
self._handler.handle_data(resp)
elif resp.status_code!=requests.codes.OK:
print("Connected url %s \t %d" % (url,resp.status_code))
else:
#If the response is not html or xml ,save the url to redis
pass
    except (requests.ConnectTimeout, requests.ReadTimeout):
print('Connect %s timeout .\n' % (url))
if self._redis_enable:
self._r.lpush(self._start,url)
else:
task.url_task.append(url)
def request(method, url, *args, sign=True, **kwargs):
custom_ca_certs_file = get_custom_ca_certs_file()
if custom_ca_certs_file is not None and 'verify' not in kwargs:
kwargs['verify'] = custom_ca_certs_file
try:
if sign:
response = Request(url, method, *args, **kwargs).send()
else:
response = requests.request(
method, url, *args, timeout=30, **kwargs)
except HTTPError as error:
raise error
except (requests.exceptions.MissingSchema,
requests.exceptions.InvalidSchema,
requests.exceptions.URLRequired,
requests.exceptions.InvalidURL):
raise HTTPError('You have provided an invalid server URL.')
except requests.ConnectTimeout:
raise HTTPError('Connection timed out. Try again later.')
except requests.ConnectionError:
raise HTTPError('Server is not available. Try again later.')
except requests.RequestException:
raise HTTPError(UNKNOWN_ERROR)
if response.status_code == 401:
        raise HTTPError('Unauthorized. Did you set your credentials?')
elif not response.ok:
raise HTTPError(format_server_error(response))
return response
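# ----------------------------------------------------------------------
# Usage sketch (illustrative): the wrapper above folds the many requests
# exceptions into a single HTTPError with a readable message, so callers
# only handle one exception type. The URL below is a placeholder.
def fetch_server_status():
    try:
        response = request('GET', 'https://example.com/api/status', sign=False)
    except HTTPError as error:
        print('Request failed: %s' % error)
        return None
    return response.json()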
def test_raises_error_when_server_is_unavailable(self, mock):
exceptions = [requests.ConnectionError, requests.ConnectTimeout]
mock.side_effect = exceptions
for _ in exceptions:
with self.assertRaises(HTTPError):
request('GET', 'foo')
async def execute(self, method, *args):
payload = dumps(args, methodname=method, allow_none=True)
body = gzip.compress(payload.encode('utf8'))
try:
res = await self.loop.run_in_executor(None, self.__request, body)
data, _ = loads(res.text, use_datetime=True)
if isinstance(data, (tuple, list)) and len(data) > 0 and len(data[0]) > 0:
if isinstance(data[0][0], dict) and 'faultCode' in data[0][0]:
raise DedimaniaFault(faultCode=data[0][0]['faultCode'], faultString=data[0][0]['faultString'])
self.retries = 0
return data[0]
raise DedimaniaTransportException('Invalid response from dedimania!')
except (ConnectionError, ReadTimeout, ConnectionRefusedError) as e:
raise DedimaniaTransportException(e) from e
except ConnectTimeout as e:
raise DedimaniaTransportException(e) from e
except DedimaniaTransportException:
# Try to setup new session.
self.retries += 1
if self.retries > 5:
            raise DedimaniaTransportException('Dedimania didn\'t give the right answer after a few retries!')
self.client = requests.session()
try:
await self.authenticate()
return await self.execute(method, *args)
except Exception as e:
logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
handle_exception(e, __name__, 'execute')
raise DedimaniaTransportException('Could not retrieve data from dedimania!')
except DedimaniaFault as e:
if 'Bad SessionId' in e.faultString or ('SessionId' in e.faultString and 'not found' in e.faultString):
try:
self.retries += 1
if self.retries > 5:
raise DedimaniaTransportException('Max retries reached for reauthenticating with dedimania!')
await self.authenticate()
return await self.execute(method, *args)
except:
return
logger.error('XML-RPC Fault retrieved from Dedimania: {}'.format(str(e)))
handle_exception(e, __name__, 'execute', extra_data={
'dedimania_retries': self.retries,
})
raise DedimaniaTransportException('Could not retrieve data from dedimania!')
def get_urls(self, get_proxie_or_not=False):
"""
:type get_proxie_or_not: bool
    :param get_proxie_or_not: whether to fetch pages through a proxy IP
    :return: the list of URLs scraped from the start page
"""
list_url = []
try:
if get_proxie_or_not:
p = Proxies()
p.get_ip_and_port()
self.session.proxies = {
"http": p.ip_and_port,
"https": p.ip_and_port
}
response = self.session.get(self.start_url, timeout=30)
if response.status_code == 200:
html = response.content
else:
            # fall back to selenium + PhantomJS to render the page
            # set custom request headers for PhantomJS
desired_capabilities = DesiredCapabilities.PHANTOMJS.copy()
headers = self.headers
for key, value in headers.iteritems():
desired_capabilities['phantomjs.page.customHeaders.{}'.format(key)] = value
driver = webdriver.PhantomJS(
desired_capabilities=desired_capabilities
)
driver.get(self.start_url)
html = driver.page_source
driver.quit()
soup = BeautifulSoup(html, 'lxml')
        # fill in the find() arguments below with a BeautifulSoup selector
        # appropriate to the target page structure
urls = soup.find()
assert urls is not None
repeat_num = 0
for url in urls:
if url['href'] not in list_url:
list_url.append(url['href'])
else:
repeat_num += 1
print "??%d??????????" % repeat_num
except requests.ConnectTimeout:
print "url????????????????"
if list_url:
return list_url
else:
print "??url????????"
raise ValueError
def get_htmls(self, urls, get_proxie_or_not=False):
"""
:type urls: list
:type get_proxie_or_not: bool
    :param urls: list of URLs to fetch
    :param get_proxie_or_not: whether to fetch pages through a proxy IP
    :return: list of parsed html (BeautifulSoup) objects
"""
list_html = []
for url in urls:
try:
if get_proxie_or_not:
p = Proxies()
p.get_ip_and_port()
self.session.proxies = {
"http": p.ip_and_port,
"https": p.ip_and_port
}
response = self.session.get(url, timeout=30)
if response.status_code == 200:
html = response.content
else:
                # fall back to selenium + PhantomJS to render the page
                # set custom request headers for PhantomJS
desired_capabilities = DesiredCapabilities.PHANTOMJS.copy()
headers = self.headers
for key, value in headers.iteritems():
desired_capabilities['phantomjs.page.customHeaders.{}'.format(key)] = value
driver = webdriver.PhantomJS(
desired_capabilities=desired_capabilities
)
                driver.get(url)
html = driver.page_source
driver.quit()
assert html is not None
list_html.append(BeautifulSoup(html, 'lxml'))
except requests.ConnectTimeout:
print "url????"
if list_html:
return list_html
else:
print "??html???????????"
raise ValueError
def checkin(cred, code):
'''
-> 0: Successful check-in
-> 1: No internet connection
-> 2: Invalid credentials
-> 3: Not connected to SunwayEdu Wi-Fi
-> 4: Invalid code
-> 5: Wrong class
-> 6: Already checked-in
'''
# Start a session
session = requests.Session()
# Login to iZone
payload = {
'form_action': 'submitted',
'student_uid': cred[0],
'password': cred[1],
}
try:
r = session.post(constants.LOGIN, data=payload)
except requests.ConnectionError:
return 1
if not r.history:
return 2
# Check for SunwayEdu Wi-Fi
try:
r = requests.get(constants.WIFI, timeout=2)
except requests.ConnectTimeout:
return 3
except requests.ConnectionError:
return 1
# Check-in with code
try:
r = session.post(constants.CHECKIN, data={'checkin_code': code},
timeout=2)
except (requests.ReadTimeout, requests.ConnectionError):
return 1
if 'Checkin code not valid.' in r.text or \
'The specified URL cannot be found.' in r.text:
return 4
if 'You cannot check in to a class you are not a part of.' in r.text:
return 5
if 'You have already checked in' in r.text:
return 6
return 0
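# ----------------------------------------------------------------------
# Usage sketch (illustrative): map the numeric return codes documented in
# checkin() to human-readable messages. The credentials and code below are
# placeholders.
CHECKIN_MESSAGES = {
    0: 'Checked in successfully',
    1: 'No internet connection',
    2: 'Invalid credentials',
    3: 'Not connected to SunwayEdu Wi-Fi',
    4: 'Invalid code',
    5: 'Wrong class',
    6: 'Already checked in',
}

def checkin_and_report(credentials, code):
    result = checkin(credentials, code)
    print(CHECKIN_MESSAGES.get(result, 'Unknown result: %s' % result))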