def __init__(self, host, username=None, password=None, verify_ssl=True, user_agent=None, cert=None):
    """Store the connection settings and derive the authentication mode.

    :param host: Host value; stored unchanged on the instance.
    :param username: Optional user name. When it is not None, ``auth_type``
        becomes ``'basic'``.
    :param password: Optional password, stored as given.
    :param verify_ssl: When falsy, urllib3's ``InsecureRequestWarning`` is
        disabled.
    :param user_agent: Optional user agent string; a falsy value selects
        ``'webinspectapi/' + version``.
    :param cert: Optional certificate value. When no user name is given and
        this is not None, ``auth_type`` becomes ``'certificate'``.
    """
    self.host = host
    self.username, self.password = username, password
    self.cert = cert
    self.verify_ssl = verify_ssl
    self.user_agent = user_agent if user_agent else 'webinspectapi/' + version

    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # A user name takes precedence over a certificate; with neither one the
    # instance is marked as unauthenticated.
    if username is not None:
        chosen_auth = 'basic'
    elif cert is not None:
        chosen_auth = 'certificate'
    else:
        chosen_auth = 'unauthenticated'
    self.auth_type = chosen_auth
# Example source code using Python's disable_warnings()
def get_e2e_configuration():
    """Build the client ``Configuration`` used by the end-to-end tests.

    If the default kube config file exists it is loaded into the
    configuration. Otherwise two candidate URLs on ``DEFAULT_E2E_HOST`` are
    probed in order and the first one that answers a GET is used, with SSL
    verification switched off.

    :return: The populated configuration object.
    :raises unittest.SkipTest: If no host could be determined.
    """
    config = Configuration()
    config.host = None

    kube_config_path = os.path.expanduser(
        kube_config.KUBE_CONFIG_DEFAULT_LOCATION)
    if os.path.exists(kube_config_path):
        kube_config.load_kube_config(client_configuration=config)
    else:
        print('Unable to load config from %s' %
              kube_config.KUBE_CONFIG_DEFAULT_LOCATION)
        candidates = ['https://%s:8443' % DEFAULT_E2E_HOST,
                      'http://%s:8080' % DEFAULT_E2E_HOST]
        for candidate in candidates:
            try:
                urllib3.PoolManager().request('GET', candidate)
            except urllib3.exceptions.HTTPError:
                # This candidate is not reachable; try the next one.
                continue
            config.host = candidate
            config.verify_ssl = False
            urllib3.disable_warnings()
            break

    if config.host is None:
        raise unittest.SkipTest('Unable to find a running Kubernetes instance')
    print('Running test against : %s' % config.host)
    config.assert_hostname = False
    return config
def do_get(self, url, top_level=False, top_level_path=""):
    """Issue a GET request against ``url`` and return the response body.

    :param url: URL to request.
    :param top_level: When true, the path of ``url`` is replaced by
        ``'/' + top_level_path`` before the request is made.
    :param top_level_path: Path used when ``top_level`` is true.
    :return: The ``data`` attribute of the urllib3 response.
    :raises ServiceError: If the response status is 400 or greater.
    :raises Exception: Any error raised while performing the request is
        logged and re-raised unchanged.
    """
    parts = list(urlparse.urlparse(url))
    # Index 2 of the parsed URL is the path component.
    if top_level:
        parts[2] = '/' + top_level_path

    # Collapse whatever MULTIPLE_SLASH matches in the path into a single '/'.
    parts[2] = MULTIPLE_SLASH.sub('/', parts[2])
    url = urlparse.urlunparse(parts)

    try:
        if self.disable_ssl_validation:
            urllib3.disable_warnings()
            http = urllib3.PoolManager(cert_reqs='CERT_NONE')
        else:
            http = urllib3.PoolManager()
        r = http.request('GET', url, headers=self.headers)
    except Exception:
        # Pass the values as separate logging arguments: a single tuple
        # cannot fill two '%s' placeholders and breaks the log call itself.
        LOG.error("Request on service '%s' with url '%s' failed",
                  self.name, url)
        # Bare raise keeps the original traceback intact.
        raise
    if r.status >= 400:
        raise ServiceError("Request on service '%s' with url '%s' failed"
                           " with code %d" % (self.name, url, r.status))
    return r.data
def client(self):
    """Return the class-wide Kubernetes client, creating it on first use.

    The client is cached on the class (``cls._client``). When it has to be
    created, user name/password credentials are used if both are set;
    otherwise the client is built from the service account.
    """
    cls = self.__class__
    if cls._client is not None:
        return cls._client

    if self.verify_ssl is False:
        # Disable the warnings in both the urllib3 reachable through
        # requests and the directly importable urllib3.
        import requests
        requests.packages.urllib3.disable_warnings()
        import urllib3
        urllib3.disable_warnings()

    if self.username and self.password:
        self.log.debug("Creating Kubernetes client from username and password")
        cls._client = KubernetesClient.from_username_password(
            self.host, self.username, self.password,
            verify_ssl=self.verify_ssl)
    else:
        self.log.debug("Creating Kubernetes client from Service Account")
        cls._client = KubernetesClient.from_service_account(
            self.host, verify_ssl=self.verify_ssl)
    return cls._client
def orion_init():
    """Prompt for any missing Orion credentials and return a SwisClient.

    The module-level ``orion_username`` and ``orion_password`` are filled in
    interactively when they are empty. The user name prompt falls back to
    the current login name when nothing is entered.
    """
    global orion_server
    global orion_username
    global orion_password

    if not orion_username:
        fallback_user = getuser()
        answer = raw_input('Orion username [' + fallback_user + ']: ')
        orion_username = answer if answer else fallback_user

    if not orion_password:
        orion_password = getpass('Orion password: ')

    # 'SolarWinds-Orion' is a special hostname in /etc/hosts, needed to make
    # SSL checking work:
    # https://github.com/solarwinds/orionsdk-python#ssl-certificate-verification
    # The call below disables the SubjectAltNameWarning.
    urllib3.disable_warnings()

    # TODO: Need a better/more resilient way of referencing server cert.
    swis = orionsdk.SwisClient(
        'SolarWinds-Orion', orion_username, orion_password,
        verify='server.pem')
    return swis
def urllib3_disable_warnings():
    """Disable urllib3 warnings through whichever urllib3 is reachable.

    If ``requests`` exposes a ``packages`` attribute, the urllib3 found
    there is used. Otherwise the standalone ``urllib3`` module is used when
    it can be imported and provides ``disable_warnings``.

    The import statement is always executed: it is a cheap lookup when the
    module is already loaded. Skipping it for an already-imported urllib3
    meant the warnings were never disabled in that case.

    :raises ImportError: If ``requests`` itself cannot be imported.
    """
    import requests

    # On latest Fedora, requests.packages is a symlink to the system urllib3.
    if hasattr(requests, 'packages'):
        requests.packages.urllib3.disable_warnings()  # pylint: disable=maybe-no-member
        return

    # With python-requests-2.4.3-1.el7.noarch there is no requests.packages,
    # so talk to urllib3 directly.
    try:
        import urllib3
    except ImportError:
        return

    # Older urllib3 releases may not provide disable_warnings at all.
    if hasattr(urllib3, 'disable_warnings'):
        urllib3.disable_warnings()
def send_push(file):
    """Send an alarm note and upload ``file`` as a Pushbullet file push.

    Any exception is caught and reported on stdout, so the function never
    raises to its caller.

    :param file: Path of the file to upload; it is opened in binary mode.
    """
    try:
        urllib3.disable_warnings()
        pb = Pushbullet(API_KEY)
        pb.push_note('Alarm', 'Motion detected')
        # Single-argument print(...) and "except ... as" behave the same on
        # Python 2 and Python 3, unlike the statement forms used before.
        print("push-uploading file..")
        with open(file, 'rb') as vid:
            file_data = pb.upload_file(vid, 'video.mkv')
        pb.push_file(**file_data)
    except Exception as error:
        print("Error in send_push: " + str(error))
def send(self,u,m="GET",p=None,h=None,c=None):
    """Send a GET or POST request and return the response.

    :param u: Target URL.
    :param m: HTTP method name, compared case-insensitively. Only 'get' and
        'post' are handled.
    :param p: Payload. For GET it is merged into the URL through
        ``Request.ucheck.payload``; for POST it is sent as the request body.
    :param h: Optional header mapping. It is copied, so the caller's object
        is not modified.
    :param c: Optional cookie name; it is sent with an empty value.
    :return: The ``requests`` response, or None for any other method.
    """
    if p is None:
        p = {}
    # Work on a copy so the 'user-agent' entry does not leak into the
    # caller's mapping.
    h = {} if h is None else dict(h)
    if c is not None:
        c = {c: ''}
    if '-r' in sys.argv or '--ragent' in sys.argv:
        h['user-agent'] = self.ragent()
    else:
        h['user-agent'] = self.agent

    # Requests below are made with verify=False, so silence that warning.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    method = m.lower()
    if method not in ('get', 'post'):
        # Unsupported methods yield None, as before.
        return None

    options = dict(
        method=m.upper(), url=u, headers=h, cookies=c, timeout=self.time,
        allow_redirects=self.redir,
        proxies={'http': self.proxy, 'https': self.proxy}, verify=False)
    if method == 'get':
        if p:
            options['url'] = '{}'.format(Request.ucheck.payload(u, p))
    elif p:
        # The POST payload used to be dropped silently; send it as the body.
        options['data'] = p

    # The context manager closes the session once the response is received.
    with requests.Session() as session:
        return session.request(**options)
def remote_image_exist(registry, image, tag):
    """Tell whether ``tag`` is listed among the tags of ``image``.

    The tag list is fetched from ``IMAGE_TAGS_URL`` without certificate
    verification. Any response status other than OK yields False.
    """
    urllib3.disable_warnings()
    tags_url = IMAGE_TAGS_URL % {'registry': registry, 'image': image}
    reply = requests.get(url=tags_url, verify=False)
    if reply.status_code == http_client.OK:
        return tag in reply.json()['tags']
    return False
def get_remote_image_info(image, registry):
    """List ``[registry, image, tag]`` entries for every tag of ``image``.

    :return: One three-element list per tag, or an empty list when the
        registry reports the error code ``NAME_UNKNOWN``.
    :raises Exception: For any other error response; the decoded response
        body is passed as the exception argument.
    """
    urllib3.disable_warnings()
    tags_url = IMAGE_TAGS_URL % {'registry': registry, 'image': image}
    reply = requests.get(url=tags_url, verify=False)
    # The body is decoded before the status check, as error details are
    # read from it too.
    payload = reply.json()
    if reply.ok:
        return [[registry, image, tag] for tag in payload['tags']]
    if payload['errors'][0]['code'] != 'NAME_UNKNOWN':
        raise Exception(payload)
    return []
def get_image_digest(registry, image, tag):
    """Return the ``Docker-Content-Digest`` header of the image manifest.

    The manifest is requested from ``MANIFEST_URL`` with the v2 manifest
    media type in the ``Accept`` header and without certificate
    verification.
    """
    urllib3.disable_warnings()
    manifest_url = MANIFEST_URL % {
        'registry': registry,
        'image': image,
        'reference': tag,
    }
    reply = requests.get(
        url=manifest_url,
        headers={"Accept": "application/vnd.docker.distribution.manifest.v2+json"},
        verify=False,
    )
    return reply.headers['Docker-Content-Digest']
def __init__(self):
    """Set up the logger, an empty token and a preconfigured HTTP session."""
    self._logger = logger
    self._token = None

    # Headers attached to every request made through the session.
    default_headers = {
        'User-Agent': 'com.zhihu.android/Futureve/5.1.1 Mozilla/5.0 (Linux; Android 4.4.4; 2014811 Build/KTU84P) '
                      'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/33.0.0.0 Mobile Safari/537.36 '
                      'Google-HTTP-Java-Client/1.22.0 (gzip)',
        'x-api-version': '3.0.66',
        'x-app-version': '5.1.1',
        'x-app-za': 'OS=Android&Release=4.4.4&Model=2014811&VersionName=5.1.1&VersionCode=533&Width=720'
                    '&Height=1280&Installer=360&WebView=33.0.0.0DeviceType=AndroidPhoneBrand=Xiaomi',
        'x-app-build': 'release',
        'x-network-type': 'WiFi',
        'x-suger': 'SU1FST04Njc2MjIwMjQ1Njc4MDU7QU5EUk9JRF9JRD1jOTY1NGVkMzcwMWRjYjU1O01BQz05Yzo5OTphMDpiZjo3YzpjMQ',
        'x-udid': 'AGCCEL7IpwtLBdi3V7e7dsEXtuW9g1-pets=',
        'Connection': 'keep-alive',
    }

    session = requests.session()
    session.headers.update(default_headers)
    # Certificate verification is switched off for this session ...
    session.verify = False
    self._session = session

    # ... so the matching urllib3 warning is silenced as well.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# isi_data_insights_config.py -- file source
# Project: isilon_data_insights_connector
# Author: Isilon
# Project source
# File source
# Views: 25
# Favorites: 0
# Likes: 0
# Comments: 0
def _build_cluster_configs(cluster_list):
    """Create one ``ClusterConfig`` per entry of ``cluster_list``.

    SDK configuration results are cached in the module-level
    ``g_cluster_configs`` mapping, so each cluster is configured at most
    once. If configuring a cluster raises ``RuntimeError`` the error is
    written to stderr and the process exits with status 1.

    :param cluster_list: Iterable of cluster identifiers.
    :return: List of ``ClusterConfig`` objects in input order.
    """
    cluster_configs = []
    for cluster in cluster_list:
        username, password, verify_ssl = _get_cluster_auth_data(cluster)

        if cluster in g_cluster_configs:
            cluster_name, isi_sdk, api_client, version = \
                g_cluster_configs[cluster]
        else:
            if verify_ssl is False:
                urllib3.disable_warnings()
            try:
                isi_sdk, api_client, version = \
                    isi_sdk_utils.configure(
                        cluster, username, password, verify_ssl)
            except RuntimeError as exc:
                # sys.stderr.write and single-argument print(...) work on
                # both Python 2 and 3, unlike the print statement.
                sys.stderr.write(
                    "Failed to configure SDK for "
                    "cluster %s. Exception raised: %s\n"
                    % (cluster, str(exc)))
                sys.exit(1)
            print("Configured %s as version %d cluster, using SDK %s."
                  % (cluster, int(version), isi_sdk.__name__))
            cluster_name = \
                _query_cluster_name(cluster, isi_sdk, api_client)
            g_cluster_configs[cluster] = \
                cluster_name, isi_sdk, api_client, version

        cluster_config = \
            ClusterConfig(
                cluster, cluster_name, version, isi_sdk, api_client)
        cluster_configs.append(cluster_config)
    return cluster_configs
def http_request(path, query='', json_body=None, method='get', config=None):
    """Call ``https://<host>/api/<path>`` and return the parsed JSON reply.

    :param path: API path, appended after ``/api/``.
    :param query: Query string or dict of query parameters. A dict is
        URL-encoded. Empty values add nothing to the URL.
    :param json_body: Optional request body, passed as ``data``. When given
        together with the default 'get' method, the method becomes 'post'.
    :param method: HTTP method name.
    :param config: Configuration mapping; read through
        ``read_home_config()`` when None. The keys ``host`` and
        ``ssl_verify`` are read, plus ``auth_user``/``auth_pw`` for basic
        authentication when ``auth_user`` is present.
    :return: Result of ``parse_json`` on the UTF-8 decoded response body.
    """
    config = read_home_config() if config is None else config

    # Best effort: a missing or old urllib3 must not prevent the request.
    try:
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    except Exception:
        pass

    # Only extend the path for a non-empty query; the default '' used to
    # leave a dangling '?' (or '&') on the URL.
    if query:
        if isinstance(query, dict):
            query = urlencode(query)
        separator = '&' if '?' in path else '?'
        path += separator + query

    url = 'https://' + config['host'] + '/api/' + path
    auth = None
    if 'auth_user' in config:
        auth = HTTPBasicAuth(config['auth_user'], config['auth_pw'])
    if json_body is not None and method == 'get':
        method = 'post'
    response = requests.request(
        method, url, data=json_body,
        auth=auth, verify=config['ssl_verify'],
        headers={'Accept': 'application/json'}
    )
    if response.status_code >= 400:
        raise_response_exception('Failed request ' + path, response)
    return parse_json(response.content.decode('utf-8'))
def __init__(self, host, api_key, verify_ssl=True, timeout=30, user_agent=None, cert=None, debug=False):
    """
    Set up a ThreadFix API instance.
    :param host: URL of the ThreadFix server (e.g., http://localhost:8080/threadfix/).
    :param api_key: API key generated on the ThreadFix API Key page.
    :param verify_ssl: Whether API requests verify the host's SSL certificate; defaults to true.
    :param timeout: HTTP timeout in seconds; defaults to 30.
    :param user_agent: HTTP user agent string; defaults to "threadfix_api/[version]".
    :param cert: Optional client-side certificate, either a single file (private key and
    certificate together) or a tuple with the paths of both files.
    :param debug: Print requests and responses; useful for debugging.
    """
    self.host, self.api_key = host, api_key
    self.verify_ssl = verify_ssl
    self.timeout = timeout
    self.user_agent = user_agent if user_agent else 'threadfix_api/' + version
    self.cert = cert
    # When set, request and response information is printed.
    self.debug = debug

    # With verification disabled the SSL warning messages are silenced too.
    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Team
def __init__(self, host, token, verify_ssl=True):
    """Store the API base URL, token and SSL verification flag.

    A ``host`` containing 'github.com' maps to 'https://api.github.com';
    any other host gets '/api/v3' appended.
    """
    targets_public_github = 'github.com' in host
    self.host = 'https://api.github.com' if targets_public_github else host + '/api/v3'
    self.token = token
    self.verify_ssl = verify_ssl
    if not self.verify_ssl:
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def disableInsecureRequestWarning():
    """Disable urllib3's InsecureRequestWarning; report failures via ERROR.

    The urllib3 reachable as ``requests.packages.urllib3`` is preferred; if
    that attribute lookup fails the standalone ``urllib3`` is imported.
    """
    try:
        try:
            urllib3_module = requests.packages.urllib3
        except AttributeError:
            import urllib3 as urllib3_module
        insecure_warning = urllib3_module.exceptions.InsecureRequestWarning
        urllib3_module.disable_warnings(insecure_warning)
    except Exception as e:
        ERROR('???? InsecureRequestWarning ????%s', e)
def getFAinfo(flash_array, apitoken):
    """Fetch the result of ``get(action='monitor')`` from a FlashArray.

    The array's cookie is invalidated before the result is returned.
    """
    urllib3.disable_warnings()
    array = purestorage.FlashArray(flash_array, api_token=apitoken)
    monitor_info = array.get(action='monitor')
    array.invalidate_cookie()
    return monitor_info
def main1():
    """Set up the shared HTTP globals, run ``try_1`` and return the first match.

    The module-level ``http``, ``headers`` and ``url`` are (re)assigned.
    Every ``<span>...</span>`` body is then extracted from the module-level
    ``content``; the first two matches are printed and the first returned.
    """
    global http, headers, url

    # Silence urllib3 warnings before the pool manager is created.
    urllib3.disable_warnings()
    http = urllib3.PoolManager()
    headers = {
        'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36',
        'Content-type':'text/html;charset=utf-8',
    }
    url='https://www.qiushibaike.com'

    print ('1')
    try_1()
    print ('2')

    # NOTE(review): `content` is not assigned in this function; presumably
    # try_1() stores it as a module global -- confirm.
    span_pattern = re.compile('<span>(.*?)</span>',re.S)
    matches = span_pattern.findall(content)
    print ('??IP?'+matches[0],'\n?????'+matches[1])
    return matches[0]
def main1():
    """Set up the shared HTTP globals, run ``try_1`` and return the first match.

    The module-level ``http``, ``headers`` and ``url`` are (re)assigned.
    Every ``<span>...</span>`` body is then extracted from the module-level
    ``content``; the first two matches are printed and the first returned.
    """
    global http, headers, url

    # Silence urllib3 warnings before the pool manager is created.
    urllib3.disable_warnings()
    http = urllib3.PoolManager()
    headers = {
        'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36',
        'Content-type':'text/html;charset=utf-8',
    }
    url='https://www.qiushibaike.com'

    print ('1')
    try_1()
    print ('2')

    # NOTE(review): `content` is not assigned in this function; presumably
    # try_1() stores it as a module global -- confirm.
    span_pattern = re.compile('<span>(.*?)</span>',re.S)
    matches = span_pattern.findall(content)
    print ('??IP?'+matches[0],'\n?????'+matches[1])
    return matches[0]
def main():
    """Set up the shared HTTP globals, run ``try_1`` and return the first match.

    The module-level ``http``, ``headers`` and ``url`` are (re)assigned.
    Every ``<span>...</span>`` body is then extracted from the module-level
    ``content``; the first match is printed and returned.
    """
    global http, headers, url

    print ('start')
    # Silence urllib3 warnings before the pool manager is created.
    urllib3.disable_warnings()
    http = urllib3.PoolManager()
    headers = {
        'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36',
        'Content-type':'text/html;charset=utf-8',
    }
    url='https://www.qiushibaike.com'
    print ('url is ',url)
    try_1()

    # NOTE(review): `content` is not assigned in this function; presumably
    # try_1() stores it as a module global -- confirm.
    span_pattern = re.compile('<span>(.*?)</span>',re.S)
    matches = span_pattern.findall(content)
    print ('????'+matches[0])
    return matches[0]
def main():
    """Set up the shared HTTP globals, run ``try_1`` and return the first match.

    The module-level ``http``, ``headers`` and ``url`` are (re)assigned.
    Every ``<span>...</span>`` body is then extracted from the module-level
    ``content``; the first match is printed and returned.
    """
    global http, headers, url

    print ('start')
    # Silence urllib3 warnings before the pool manager is created.
    urllib3.disable_warnings()
    http = urllib3.PoolManager()
    headers = {
        'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36',
        'Content-type':'text/html;charset=utf-8',
    }
    url='https://www.qiushibaike.com'
    print ('url is ',url)
    try_1()

    # NOTE(review): `content` is not assigned in this function; presumably
    # try_1() stores it as a module global -- confirm.
    span_pattern = re.compile('<span>(.*?)</span>',re.S)
    matches = span_pattern.findall(content)
    print ('????'+matches[0])
    return matches[0]
def __init__(self, api_key):
    """Disable urllib3 warnings and remember ``api_key`` on the instance."""
    urllib3.disable_warnings()
    self.api_key = api_key
def __init__(self, host_port, num_tries=5, delay_in_sec=5):
    """Store the target URL and retry settings, then open the connection.

    :param host_port: Target address. A value that does not start with
        'http://' or 'https://' gets 'https://' prepended.
    :param num_tries: Stored as ``self.num_tries``.
    :param delay_in_sec: Stored as ``self.delay_in_sec``.
    """
    urllib3.disable_warnings()
    has_scheme = host_port.startswith(('http://', 'https://'))
    self.url = host_port if has_scheme else 'https://' + host_port
    self.num_tries, self.delay_in_sec = num_tries, delay_in_sec
    self.headers = None
    self.conn = None
    self.init_connection()
def __init__(self):
    """
    Create the logger and an HTTP session that accepts JSON replies.
    """
    self.log = logging.getLogger('Icinga2API.client')
    session = Session()
    session.headers.update({'Accept': 'application/json'})
    self.connection = session
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def report_usage_and_get_warnings(calico_version, hostname, cluster_guid, cluster_size, cluster_type):
    """Send the cluster's usage details to the projectcalico.org usage server.

    The values are sent as fields of a GET request; the response status and
    body are written to the log. Any exception is logged and swallowed, so
    the function never raises and returns nothing.

    :param calico_version: the calico version
    :param hostname: the agent's hostname
    :param cluster_guid: the unique cluster identifier
    :param cluster_size: the number of felix instances
    :param cluster_type: the type of cluster
    """
    _log.info("report_usage_and_get_warnings calico_version=%s, hostname=%s, guid=%s, size=%s, cluster_type=%s", calico_version, hostname, cluster_guid, cluster_size, cluster_type)
    try:
        urllib3.disable_warnings()

        # Retry with backoff in case of dropped or lost connections, see
        # http://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry
        # This is not what guards against a thundering herd; the jitter
        # takes care of that.
        retry_policy = urllib3.util.retry.Retry(connect=5, read=5, redirect=5, backoff_factor=1.0)

        # Send the usage report to projectcalico.org.
        response = urllib3.PoolManager().request(
            'GET',
            'https://usage.projectcalico.org/UsageCheck/calicoVersionCheck',
            fields={
                'version': calico_version,
                'hostname': hostname,
                'guid': cluster_guid,
                'size': cluster_size,
                'cluster_type': cluster_type,
            },
            retries=retry_policy,
        )
        body = response.data.decode('utf-8')
        _log.info("usage_report status=%s, reply=%s", response.status, body)
    except Exception:
        _log.exception("Exception in usage_report")
def report_usage_and_get_warnings(calico_version, hostname, cluster_guid, cluster_size, cluster_type):
    """Send the cluster's usage details to the projectcalico.org usage server.

    The values are sent as fields of a GET request; the response status and
    body are written to the log. Any exception is logged and swallowed, so
    the function never raises and returns nothing.

    :param calico_version: the calico version
    :param hostname: the agent's hostname
    :param cluster_guid: the unique cluster identifier
    :param cluster_size: the number of felix instances
    :param cluster_type: the type of cluster
    """
    _log.info("report_usage_and_get_warnings calico_version=%s, hostname=%s, guid=%s, size=%s, cluster_type=%s", calico_version, hostname, cluster_guid, cluster_size, cluster_type)
    try:
        urllib3.disable_warnings()

        # Retry with backoff in case of dropped or lost connections, see
        # http://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#module-urllib3.util.retry
        # This is not what guards against a thundering herd; the jitter
        # takes care of that.
        retry_policy = urllib3.util.retry.Retry(connect=5, read=5, redirect=5, backoff_factor=1.0)

        # Send the usage report to projectcalico.org.
        response = urllib3.PoolManager().request(
            'GET',
            'https://usage.projectcalico.org/UsageCheck/calicoVersionCheck',
            fields={
                'version': calico_version,
                'hostname': hostname,
                'guid': cluster_guid,
                'size': cluster_size,
                'cluster_type': cluster_type,
            },
            retries=retry_policy,
        )
        body = response.data.decode('utf-8')
        _log.info("usage_report status=%s, reply=%s", response.status, body)
    except Exception:
        _log.exception("Exception in usage_report")
def get_remote_version():
    """Return the 'version' entry of the 'info' section of the remote config.

    The remote config is downloaded from the URL stored under
    ('info', 'setup') in the local configuration.
    """
    reader = FileReader()
    # Some urllib3 versions may not provide disable_warnings.
    if hasattr(urllib3, 'disable_warnings'):
        urllib3.disable_warnings()
    setup_url = reader.get_config().get('info', 'setup')
    response = urllib3.PoolManager().request('GET', setup_url)
    remote_config = reader.get_config_raw(response.data)
    return remote_config.get('info', 'version')
# zabbix-monitor-nginx-stub-status.py -- file source
# Project: LinuxBashShellScriptForOps
# Author: DingGuodong
# Project source
# File source
# Views: 33
# Favorites: 0
# Likes: 0
# Comments: 0
def http_request(self):
    """GET ``self.url`` and return the UTF-8 encoded body on HTTP 200.

    Any other status code yields an empty string.
    """
    import requests
    import urllib3

    urllib3.disable_warnings()
    response = requests.request(
        "GET", self.url, headers={'cache-control': "no-cache"})
    if response.status_code != 200:
        return ""
    return response.text.encode('utf-8')
def get(self, host, params=()):
    # type: (object, object) -> object
    """Request every URL derived from ``host`` and return counters and results.

    :param host: Full URL of the target; its scheme and network location
        are extracted with ``urlparse``.
    :param params: Passed to the private params parser.
    :return: Dict with the keys ``'count'`` (``self.counter``) and
        ``'result'`` (``self.result``).
    """
    self.__is_server_online(host)
    self.__disable_verbose()
    self.__parse_params(params)
    # From here on `host` holds only the network location (host[:port]).
    scheme, host = urlparse(host).scheme, urlparse(host).netloc
    self.DEFAULT_HTTP_PROTOCOL = scheme + "://"
    self.urls = self.__get_urls(host)
    response = {}
    self.HEADER['user-agent'] = self.reader.get_random_user_agent()
    log.info("user-agent : " + self.HEADER['user-agent'])
    log.info('Thread num : ' + str(self.threads))
    try:
        httplib.HTTPConnection.debuglevel = self.debug
        # Guarded because some urllib3 versions may lack disable_warnings.
        if hasattr(urllib3, 'disable_warnings'):
            urllib3.disable_warnings()
        # Build a blocking pool of at most 10 connections; the port comes
        # from the netloc when present, else the scheme default (80 / 443).
        if scheme == "http":
            self.http = urllib3.HTTPConnectionPool(host.split(':')[0],
                                                   port=80 if len(host.split(':')) == 1 else int(
                                                       host.split(':')[1]), block=True, maxsize=10)
        elif scheme == "https":
            self.http = urllib3.HTTPSConnectionPool(host.split(':')[0],
                                                    port=443 if len(host.split(':')) == 1 else int(
                                                        host.split(':')[1]), block=True, maxsize=10)
        else:
            log.critical("not support http protocl, Exit now ")
            sys.exit(1);
        # Queue one work request per URL on a thread pool, then wait for
        # all of them to finish.
        pool = threadpool.ThreadPool(self.threads)
        requests = threadpool.makeRequests(self.request, self.urls)
        for req in requests:
            pool.putRequest(req)
        # NOTE(review): the original indentation was lost; the sleep is
        # assumed to run once after queuing, not once per request -- confirm.
        time.sleep(1)
        pool.wait()
    except exceptions.AttributeError as e:
        log.critical(e.message)
    except KeyboardInterrupt:
        log.warning('Session canceled')
        sys.exit()
    # NOTE(review): if the AttributeError handler ran before `pool` was
    # assigned, the `pool.workers` lookup below fails on an unbound name.
    self.counter['total'] = self.urls.__len__()
    self.counter['pools'] = pool.workers.__len__()
    response['count'] = self.counter
    response['result'] = self.result
    return response