def checkenv(self):
    """Check that ``requests`` is importable and silence its SSL warnings.

    Raises:
        Exception: if ``requests.packages.urllib3`` cannot be imported.
    """
    try:
        import requests.packages.urllib3
    except ImportError:
        raise Exception('requests package not installed. Run pip install -r requirements.txt and try again.')

    # Disable warnings about SSL connections. Each warning class is imported
    # on its own; a class that cannot be imported is simply skipped.
    try:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
    except ImportError:
        pass
    else:
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    try:
        from requests.packages.urllib3.exceptions import InsecurePlatformWarning
    except ImportError:
        pass
    else:
        requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
# Example source code for the Python class InsecureRequestWarning()
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
    """Set up the HTTP session and the API base URLs.

    Args:
        base_url: Server root URL; a trailing ``/`` is appended when missing.
        project: Project path, split into a collection and an optional
            project by ``self.get_collection_and_project``.
        user: User name used for authentication.
        password: Password used for authentication.
        verify: Stored as ``self._verify``; when falsy, urllib3's
            ``InsecureRequestWarning`` is silenced.
        timeout: Stored as ``self.timeout``.
        auth_type: Callable taking ``(user, password)`` and returning the
            auth object for the session. When ``None``, the plain
            ``(user, password)`` tuple is used instead, which requests
            treats as HTTP Basic authentication.
    """
    if not base_url.endswith('/'):
        base_url += '/'

    collection, project = self.get_collection_and_project(project)

    # Remove part after / in project-name, like DefaultCollection/MyProject => DefaultCollection
    # API response only in Project, without subproject
    self._url = base_url + '%s/_apis/' % collection
    if project:
        self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
    else:
        self._url_prj = self._url

    self.http_session = requests.Session()
    # Calling the default ``auth_type=None`` would raise TypeError, so fall
    # back to a credentials tuple in that case.
    if auth_type is None:
        auth = (user, password)
    else:
        auth = auth_type(user, password)
    self.http_session.auth = auth

    self.timeout = timeout
    self._verify = verify
    if not self._verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def get_room_id(room_name, token):
    """Look up the id of the room titled ``room_name``.

    Returns the id of the last listed room whose title matches, or an empty
    string when the request does not answer with HTTP 200 or no room matches.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {
        'Authorization': 'Bearer ' + token,
        'Content-Type': 'application/json',
    }
    response = requests.get(
        'https://api.ciscospark.com/v1/rooms',
        verify=False, headers=request_headers, proxies=proxies
    )
    if response.status_code != 200:
        return ""

    matching_ids = [
        room['id']
        for room in json.loads(response.text)['items']
        if room['title'] == room_name
    ]
    # No early exit on the first hit: the last matching room wins.
    return matching_ids[-1] if matching_ids else ""
def post_message(message_text, room_id, token, markdown=False):
    """Post ``message_text`` to the room ``room_id`` and return the response.

    The text is sent under the ``markdown`` key when ``markdown`` is truthy,
    otherwise under the ``text`` key.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {
        'Authorization': 'Bearer ' + token,
        'Content-Type': 'application/json',
    }
    content_key = 'markdown' if markdown else 'text'
    payload = json.dumps({'roomId': room_id, content_key: message_text})
    return requests.post(
        'https://api.ciscospark.com/v1/messages',
        verify=False, headers=request_headers, data=payload, proxies=proxies
    )
def test_connection(self):
    """Test the connection to the OpenNMS REST API.

    Sends a GET request to ``<source_url>/info`` using the source's user and
    password, with certificate verification turned off.

    Returns:
        The HTTP status code of the response, or -1 if the request failed.
    """
    # config
    config_rest_url = self.__source.source_url
    config_rest_user = self.__source.source_user
    config_rest_pw = self.__source.source_password

    # The request below is unverified on purpose; silence the warning.
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_url = config_rest_url + "/info"
    try:
        response = requests.get(request_url, auth=(config_rest_user, config_rest_pw),
                                verify=False)
    except requests.exceptions.RequestException:
        # Only request failures map to -1; KeyboardInterrupt and SystemExit
        # are no longer swallowed.
        return -1
    return response.status_code
def __init__(self, host, login, password, bios_password=None,
             cacert=None):
    """Store the connection settings and start with no message registries.

    When no ``cacert`` is given, urllib3's ``InsecureRequestWarning`` is
    disabled.
    """
    self.host = host
    self.login = login
    self.password = password
    self.bios_password = bios_password
    self.cacert = cacert

    # Message registry support
    self.message_registries = {}

    if cacert is not None:
        return

    # By default, requests logs following message if verify=False
    #   InsecureRequestWarning: Unverified HTTPS request is
    #   being made. Adding certificate verification is strongly advised.
    # Just disable the warning if user intentionally did this.
    urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
def __init__(self, host, login, password, timeout=60, port=443,
             cacert=None):
    """Constructor for RIBCLOperations.

    Stores the connection settings on the instance. When no ``cacert`` is
    given, urllib3's ``InsecureRequestWarning`` is disabled.
    """
    self.host, self.port = host, port
    self.login, self.password = login, password
    self.timeout = timeout
    self.cacert = cacert

    if cacert is not None:
        return

    # By default, requests logs following message if verify=False
    #   InsecureRequestWarning: Unverified HTTPS request is
    #   being made. Adding certificate verification is strongly advised.
    # Just disable the warning if user intentionally did this.
    urllib3.disable_warnings(urllib3_exceptions.InsecureRequestWarning)
def setUpClass(cls):
    """Create the remote, register a random test user and clean the DB.

    The initial DB clean up is done in case something went wrong the last
    time: if a test failed really badly, the DB might be in a bad state
    despite attempts to clean up during tearDown().
    """
    cls.rmt = BossRemote('test.cfg', API_VER)

    # Turn off SSL cert verification.  This is necessary for interacting with
    # developer instances of the Boss.
    for service_name in ('project_service', 'metadata_service', 'volume_service'):
        getattr(cls.rmt, service_name).session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    cls.admin = 'admin'
    cls.user_mgr = 'user-manager'
    cls.rsrc_mgr = 'resource-manager'

    cls.user = 'role_test_user{}'.format(random.randint(0, 9999))
    email = 'jd{}@me.com'.format(random.randint(0, 9999))
    cls.rmt.add_user(cls.user, 'John', 'Doe', email, 'password')
    cls.cleanup_db()
def setUpClass(cls):
    """Create the remote and the random user details used by the tests.

    All Python warnings are ignored, and SSL certificate verification is
    switched off for the project, metadata and volume services.
    """
    warnings.filterwarnings('ignore')
    cls.rmt = BossRemote('test.cfg', API_VER)

    # Turn off SSL cert verification.  This is necessary for interacting with
    # developer instances of the Boss.
    for service_name in ('project_service', 'metadata_service', 'volume_service'):
        getattr(cls.rmt, service_name).session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    cls.user = 'user_test_user{}'.format(random.randint(0, 9999))
    cls.first_name, cls.last_name = 'john', 'doe'
    cls.email = 'jd{}@me.com'.format(random.randint(0, 9999))
    cls.password = 'password'
def getHTMLCookies(url):
    """Fetch ``url`` through the fixed proxy and return the response cookies.

    Returns ``None`` (after logging the exception) when the request fails
    or the response carries an error status.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    proxy_address = "http://36.111.205.166:80"
    proxies = {"http": proxy_address, "https": proxy_address}
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
    }
    try:
        response = requests.get(url, proxies=proxies, headers=headers, verify=False, timeout=20)
        response.raise_for_status()
        response.encoding = 'utf-8'
        return response.cookies
    except Exception:
        logger.exception("Download HTML Text failed")
    return None
def getHTMLText(url, cookies):
    """Fetch ``url`` through the fixed proxy and return the body as text.

    The request carries ``cookies`` and the response is decoded as UTF-8.
    Returns ``None`` (after logging the exception) when the request fails
    or the response carries an error status.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    proxy_address = "http://36.111.205.166:80"
    proxies = {"http": proxy_address, "https": proxy_address}
    headers = {
        'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36',
    }
    try:
        response = requests.get(url, proxies=proxies, headers=headers, cookies=cookies, verify=False, timeout=20)
        response.raise_for_status()
        response.encoding = 'utf-8'
        return response.text
    except Exception:
        logger.exception("Download HTML Text failed")
    return None
def getStations():
    """Download the 12306 station list and map station names to codes.

    Returns a dict whose keys are the names and whose values are the
    upper-case codes found as ``name|CODE`` pairs in the downloaded script.
    Raises whatever ``raise_for_status`` raises on an error response.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    stations_url = 'https://kyfw.12306.cn/otn/resources/js/framework/station_name.js?station_version=1.8971'
    response = requests.get(stations_url, verify=False)
    response.raise_for_status()

    name_to_code = {}
    # Each match has the form "<name>|<CODE>", i.e. exactly one separator.
    for entry in re.findall(r'[\u4e00-\u9fa5]+\|[A-Z]+', response.text):
        fields = str(entry).split('|')
        name_to_code[fields[0]] = fields[1]
    return name_to_code
def runnessus(self, url, api_akey, api_skey, policy, targets, scan_name, insecure):
    """Launch a Nessus scan, wait via ``self.scanstatus`` and return the report.

    The scan is created with the given policy, targets and name, launched
    through a POST action, and its export in ``nessus`` format is returned
    as a string. When ``insecure`` is truthy, ``InsecureRequestWarning`` is
    disabled first.
    """
    if insecure:
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    scan = ness6rest.Scanner(url=url, api_akey=api_akey, api_skey=api_skey,
                             insecure=insecure)
    scan.policy_set(name=policy)
    logging.debug("TARGETS: {t}".format(t=targets))
    scan.scan_add(targets=targets, name=scan_name)

    launch_action = "scans/" + str(scan.scan_id) + "/launch"
    scan.action(action=launch_action, method="POST")
    scan.scan_uuid = scan.res["scan_uuid"]

    print("{i} SCAN NAME: {n}".format(i=ctinfo, n=scan.scan_name))
    print("{i} SCAN UUID: {n}".format(i=ctinfo, n=scan.scan_uuid))

    self.scanstatus(scan.tag_id, scan.scan_uuid, url, api_akey, api_skey,
                    insecure)

    # The download runs with stdout suppressed.
    with suppress_stdout():
        report = str(scan.download_scan(export_format="nessus"))
    return report
def cli():
    """command-line interface"""
    arguments = docopt(__doc__)
    date = arguments['<date>']
    from_station = stations.get(arguments['<from>'])
    to_station = stations.get(arguments['<to>'])

    # URL
    query_template = 'https://kyfw.12306.cn/otn/lcxxcx/query?purpose_codes=ADULT&queryDate={}&from_station={}&to_station={}'
    url = query_template.format(date, from_station, to_station)

    # The query is sent without certificate verification; hide the warning.
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    response = requests.get(url, verify=False)
    print(response)

    rows = response.json()['data']['datas']
    TrainCollection(rows).pretty_print()
def __init__(self, apiKey, server, username=None, password=None):
    """Create a client for ``server`` using an API key or user credentials.

    Without an API key, the server root is fetched once (unverified) and the
    XSRF token is taken from the response cookies.

    Args:
        apiKey: API key; when falsy, ``username`` and ``password`` are used.
        server: Server URL starting with ``http://`` or ``https://``; a
            trailing ``/`` is appended when missing.
        username: User name, required when no API key is given.
        password: Password, required when no API key is given.

    Raises:
        ValueError: if the server is missing or not an http(s) URL, or if
            neither an API key nor both credentials are provided.
    """
    import warnings

    if not ((apiKey or (username and password)) and server):
        raise ValueError("You must provide server argument and key or user & password")
    if not server.startswith(('https://', 'http://')):
        raise ValueError("Server must be a url (e.g. 'https://<server>' or 'http://<server>')")
    if not server.endswith('/'):
        server += '/'

    self.server = server
    self.session = Session()
    if not apiKey:
        self.xsrf = True
        # Suppress the warning instead of catching it as an exception:
        # catching it left ``r`` unbound and failed on the next line.
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', InsecureRequestWarning)
            r = self.session.get(server, verify=False)
        self.token = r.cookies[DemistoClient.XSRF_COOKIE_KEY]
        self.username = username
        self.password = password
    else:
        self.xsrf = False
        self.apiKey = apiKey
def req(self, method, path, data):
    """Send a JSON request to ``self.server + path`` and return the response.

    The XSRF token header is added when ``self.xsrf`` is set, otherwise the
    API key is sent as the authorization header. The request is made
    without certificate verification.

    Args:
        method: HTTP method name passed to the session.
        path: Path appended to ``self.server``.
        data: Object sent as the JSON body.

    Raises:
        RuntimeError: if ``self.session`` is not set.
    """
    import warnings

    h = {"Accept": "application/json",
         "Content-type": "application/json"}
    if self.xsrf:
        h[DemistoClient.XSRF_TOKEN_KEY] = self.token
    else:
        h[DemistoClient.AUTHORIZATION] = self.apiKey

    if not self.session:
        raise RuntimeError("Session not initialized!")

    # Suppress the warning rather than catching it as an exception, so the
    # response is always bound when we return.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore', InsecureRequestWarning)
        return self.session.request(method, self.server + path, headers=h, verify=False, json=data)
def __init__(self, http_config):
    """Initialise the adapter from ``http_config``.

    ``http_config`` must be an ``HTTPConfig``; its ``max_retries`` is passed
    to the parent constructor. When ``verify_ssl_cert`` is falsy,
    ``InsecureRequestWarning`` is disabled.
    """
    assert isinstance(http_config, HTTPConfig), "not an instance of HTTPConfig"
    self.http_config = http_config
    super(CustomHTTPAdapter, self).__init__(max_retries=http_config.max_retries)
    self._logger = logging.getLogger(__name__)

    if http_config.verify_ssl_cert:
        return
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# pylint: disable=R0913
def __init__(self):
    """Start with empty connection settings and quiet ``requests`` output.

    SSL verification defaults to off; the ``requests`` logger is raised to
    CRITICAL and ``InsecureRequestWarning`` is disabled.
    """
    # Connection target and credentials, not yet known.
    self.__host = None
    self.__user = None
    self.__pwd = None

    # Request defaults.
    self.__SSL_VERIFY = False
    self.__POST_HEADERS = None
    self.__APIVER = None

    # TODO: If log level is DEBUG avoid this code
    requests_logger = logging.getLogger('requests')
    requests_logger.setLevel(logging.CRITICAL)
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def create_unverified_session(session, suppress_warning=True):
    """
    Turn off server certificate verification on ``session`` and return it.
    This is not recommended in production code.

    With ``suppress_warning`` set, the unverified HTTPS request warning is
    disabled as well.
    """
    session.verify = False
    if not suppress_warning:
        return session

    # Suppress unverified https request warnings
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    return session
def __init__(self, hostname, username, password, verify=True, prefix=""):
    """
    Constructor, creating the class. It requires specifying a
    hostname, username and password to access the API. After
    storing these settings, ``self.__connect()`` is called and, when no
    prefix is given, ``self.validate_api_support()`` as well.

    :param hostname: Foreman host
    :type hostname: str
    :param username: API username
    :type username: str
    :param password: corresponding password
    :type password: str
    :param verify: force SSL verification
    :type verify: bool
    :param prefix: API prefix (e.g. /katello)
    :type prefix: str
    """
    # disable SSL warning outputs
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # set connection information
    self.HOSTNAME = self.validate_hostname(hostname)
    self.USERNAME, self.PASSWORD = username, password
    self.VERIFY = verify
    self.URL = "https://{0}{1}/api/v2".format(self.HOSTNAME, prefix)

    # start session and check API version if Foreman API
    self.__connect()
    if prefix != "":
        return
    self.validate_api_support()
def init(self, out_stream, url, proxy=None, g_stop_event=None, maxbitrate=0):
    """Prepare the downloader state and the shared requests session.

    ``url`` may carry extra client headers after a ``|`` separator; these
    are parsed as a query string into the module-level ``clientHeader``.

    Args:
        out_stream: Stored as ``self.out_stream``.
        url: Source URL, optionally followed by ``|`` and encoded headers.
        proxy: Proxy setting; an empty value is treated as no proxy.
        g_stop_event: Optional event, cleared here and stored on the instance.
        maxbitrate: Stored as ``self.maxbitrate``.

    Returns:
        True on success, False if any step raised (the traceback is printed).
    """
    global clientHeader, gproxy, session, use_proxy
    try:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
        session = requests.Session()
        session.cookies = cookieJar
        self.init_done = False
        self.init_url = url
        clientHeader = None
        self.proxy = proxy
        use_proxy = False
        # An empty proxy value means "no proxy". Test for None explicitly:
        # ``x and len(x) == 0`` can never be true.
        if self.proxy is not None and len(self.proxy) == 0:
            self.proxy = None
        gproxy = self.proxy
        self.out_stream = out_stream
        if g_stop_event:
            g_stop_event.clear()
        self.g_stopEvent = g_stop_event
        self.maxbitrate = maxbitrate
        if '|' in url:
            sp = url.split('|')
            url = sp[0]
            clientHeader = sp[1]
            log(clientHeader)
            clientHeader = urlparse.parse_qsl(clientHeader)
            log('header received now url and headers are %s | %s' % (url, clientHeader))
        self.url = url
        return True
    except Exception:
        # Report the failure to the caller; KeyboardInterrupt and SystemExit
        # are deliberately not swallowed.
        traceback.print_exc()
    return False
def __init__(self, base_url, username, password, verify, organization):
    """
    Open a session and check the credentials against the organizations API.

    :param base_url: Url to connect to
    :type base_url: str
    :param username: Username for querying
    :type username: str
    :param password: Password for querying
    :type password: str
    :param verify: Assigned to the session's ``verify`` setting; when
        falsy, InsecureRequestWarning is also disabled
    :type verify: bool
    :param organization: Organization to use
    :type organization: str
    :raises AuthenticationError: if the organizations reply has no
        ``results`` key
    """
    self.base_url = base_url
    self.organization = organization
    self.post_headers = {'Content-Type': 'application/json'}

    http_session = requests.Session()
    http_session.auth = (username, password)
    http_session.verify = verify
    self.session = http_session

    if not verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # Check if we can authenticate
    organizations_url = '%s/katello/api/v2/organizations' % self.base_url
    reply = self.session.get(organizations_url).json()
    if 'results' not in reply:
        raise AuthenticationError('Authentication failed')
def validate_cert(self, validate_cert: bool):
    """Set certificate validation on the underlying caller.

    Raises:
        ValueError: if ``validate_cert`` is not a ``bool``.
    """
    if not isinstance(validate_cert, bool):
        raise ValueError(f"{type(validate_cert)} is not a valid validate_cert argument type")

    # Already known to be a bool here, so it is stored as-is.
    self._caller.validate_cert = validate_cert
    if validate_cert:
        return

    # Warnings in urllib3 can only be disabled, not reenabled
    urllib3.disable_warnings(InsecureRequestWarning)
def __init__(self):
    """Select the backend and start with all credentials unset.

    Also disables urllib3's ``InsecureRequestWarning``.
    """
    self._backend = PY_ACTIVE_SYNC
    # Every credential starts as None; 'domain' and 'device_id' could be
    # optional.
    credential_names = ('server', 'user', 'password', 'domain', 'device_id')
    self._creds = dict.fromkeys(credential_names)
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def create_room(room_name, token):
    """Create a room titled ``room_name`` and print the HTTP response.

    Args:
        room_name: Title of the room to create.
        token: Bearer token sent in the Authorization header.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    body = json.dumps({'title': room_name})
    resp = requests.post('https://api.ciscospark.com/v1/rooms',
                         verify=False, headers=headers, data=body)
    # Call form of print: valid on Python 3 and prints the same on Python 2.
    print(resp)
def list_rooms(token):
    """Request the room list and return the raw response object."""
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {
        'Authorization': 'Bearer ' + token,
        'Content-Type': 'application/json',
    }
    return requests.get(
        'https://api.ciscospark.com/v1/rooms',
        verify=False, headers=request_headers
    )
def list_messages(room_id, token):
    """Request the messages of room ``room_id`` and return the body text."""
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    request_headers = {
        'Authorization': 'Bearer ' + token,
        'Content-Type': 'application/json',
    }
    messages_url = 'https://api.ciscospark.com/v1/messages?roomId={}'.format(room_id)
    response = requests.get(messages_url, verify=False, headers=request_headers)
    return response.text
def cleanup_room(room_id, token):
    """Delete every message listed for ``room_id``, printing each status code.

    Args:
        room_id: Id of the room whose messages are deleted.
        token: Bearer token sent in the Authorization header.
    """
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    messages = json.loads(list_messages(room_id, token))
    headers = {'Authorization': 'Bearer ' + token,
               'Content-Type': 'application/json'}
    for message in messages['items']:
        resp = requests.delete('https://api.ciscospark.com/v1/messages/{}'.format(message['id']),
                               verify=False, headers=headers)
        # Call form of print: valid on Python 3 and prints the same on Python 2.
        print(resp.status_code)
def send_message(self, message):
    """Send ``message`` as an SMS through the SMS Eagle HTTP interface.

    The target, URL, user and password are read with ``self.get_parameter``.
    Failures are logged through ``self._logger``; nothing is returned.

    Args:
        - message: message to send
    """
    # set parameters
    target = self.get_parameter("target")
    url = self.get_parameter("url")
    url_parameters = {
        "login": self.get_parameter("user"),
        "pass": self.get_parameter("password"),
        "to": target,
        "message": message
    }

    # send HTTP GET request to SMS Eagle
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    try:
        response = requests.get(url, params=url_parameters)
    except requests.exceptions.RequestException:
        # Only request failures are reported as connection problems;
        # KeyboardInterrupt and SystemExit are no longer swallowed.
        self._logger.error("Could not connect to SMS Eagle")
        return

    # check response
    if response.status_code != 200:
        self._logger.error("Could not send SMS to %s: %s", target, message)
    else:
        self._logger.info("Send SMS to %s: %s", target, message)
def test_init_without_cacert(self, disable_warning_mock):
    """A connector built without ``cacert`` disables InsecureRequestWarning."""
    connector = v1.RestConnectorBase(
        "x.x.x.x", "admin", "Admin", bios_password='foo')

    # The positional arguments end up on the matching attributes.
    self.assertEqual(connector.host, "x.x.x.x")
    self.assertEqual(connector.login, "admin")
    self.assertEqual(connector.password, "Admin")
    self.assertIsNone(connector.cacert)

    expected_warning = urllib3_exceptions.InsecureRequestWarning
    disable_warning_mock.assert_called_once_with(expected_warning)