def test_init_without_cacert(self, disable_warning_mock):
    """Verify RISOperations construction when no CA certificate is passed.

    The client must keep the host and credentials it was given, leave
    ``cacert`` as ``None`` and request suppression of
    ``InsecureRequestWarning`` exactly once.

    :param disable_warning_mock: mock on which the warning-suppression
        call is asserted (presumably injected by a patch decorator that
        is not visible here).
    """
    client = ris.RISOperations(
        "x.x.x.x", "admin", "Admin", bios_password='foo')

    # Attribute name -> value the constructor is expected to store.
    expected = (
        ("host", "x.x.x.x"),
        ("login", "admin"),
        ("password", "Admin"),
    )
    for attribute, value in expected:
        self.assertEqual(getattr(client, attribute), value)

    self.assertIsNone(client.cacert)
    disable_warning_mock.assert_called_once_with(
        urllib3_exceptions.InsecureRequestWarning)
# Example source code using the Python class InsecureRequestWarning()
def test_init_without_cacert(self, disable_warning_mock):
    """Verify RIBCLOperations construction when no CA certificate is passed.

    The client must keep the host, credentials, timeout and port it was
    given, leave ``cacert`` as ``None`` and request suppression of
    ``InsecureRequestWarning`` exactly once.

    :param disable_warning_mock: mock on which the warning-suppression
        call is asserted (presumably injected by a patch decorator that
        is not visible here).
    """
    client = ribcl.RIBCLOperations(
        "x.x.x.x", "admin", "Admin", 60, 443)

    # Attribute name -> value the constructor is expected to store.
    expected = (
        ("host", "x.x.x.x"),
        ("login", "admin"),
        ("password", "Admin"),
        ("timeout", 60),
        ("port", 443),
    )
    for attribute, value in expected:
        self.assertEqual(getattr(client, attribute), value)

    self.assertIsNone(client.cacert)
    disable_warning_mock.assert_called_once_with(
        urllib3_exceptions.InsecureRequestWarning)
def _suppress_ssl_warning(self):
    """Silence urllib3's InsecureRequestWarning for this process.

    The appliance uses non-standard certificates, so HTTPS warnings are
    switched off. If the bundled urllib3 cannot be reached through
    ``requests.packages`` (AttributeError), a warning is logged and the
    method returns normally.
    """
    try:
        log = self.logger
        log.debug("Suppressing SSL Warnings.")
        bundled_urllib3 = requests.packages.urllib3
        bundled_urllib3.disable_warnings(InsecureRequestWarning)
    except AttributeError:
        self.logger.warning("load requests.packages.urllib3.disable_warnings() failed")
def _suppress_ssl_warning(self):
    """Turn off HTTPS certificate warnings raised through urllib3.

    Needed because the appliance presents non-standard certificates.
    An AttributeError while locating or calling ``disable_warnings`` is
    not propagated; it is reported through ``self.logger`` instead.
    """
    try:
        self.logger.debug("Suppressing SSL Warnings.")
        disable = requests.packages.urllib3.disable_warnings
        disable(InsecureRequestWarning)
    except AttributeError:
        self.logger.warning("load requests.packages.urllib3.disable_warnings() failed")
def __init__(
    self,
    filename=None,
    activity=None,
    name=None,
    description=None,
    private=False,
    trainer=False,
    commute=False,
    format='gpx',
    handle=None,
    apiKey=None,
):
    """Initialise the instance and silence insecure-HTTPS warnings.

    None of the arguments are read in this constructor: ``self.apiKey``
    is set to ``None`` and ``self.reset()`` is called.
    NOTE(review): whether ``reset()`` or later code consumes these
    arguments cannot be told from here -- confirm.
    """
    self.apiKey = None
    self.reset()

    # Certificates are not verified, so the resulting warning is
    # switched off.
    bundled_urllib3 = requests.packages.urllib3
    bundled_urllib3.disable_warnings(InsecureRequestWarning)
def attach_disk(bkpid, diskid, snapid):
    """Attach a disk snapshot to a VM with a POST to the REST API.

    An XML ``<disk>`` element that references ``snapid`` and is marked
    active is posted to ``/v3/vms/<bkpid>/disks/`` below the
    module-level ``url``. The module-level ``user`` and ``password``
    are used for authentication, and certificate verification is
    disabled. The response is not inspected and nothing is returned.

    :param bkpid: identifier of the VM the disk is attached to.
    :param diskid: identifier placed in the ``<disk>`` element.
    :param snapid: identifier placed in the ``<snapshot>`` element.
    """
    disk_xml = "".join([
        "<disk id=\"", diskid, "\">",
        "<snapshot id=\"", snapid, "\"/>",
        " <active>true</active></disk>",
    ])
    disks_url = url + "/v3/vms/" + bkpid + "/disks/"
    xml_headers = {'Content-Type': 'application/xml', 'Accept': 'application/xml'}

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    attach_response = requests.post(
        disks_url,
        data=disk_xml,
        headers=xml_headers,
        verify=False,
        auth=(user, password),
    )
# Function to deactivate virtual disk
def detach_disk(bkpid, diskid):
    """Detach a disk from a VM with a DELETE to the REST API.

    The request goes to ``/vms/<bkpid>/diskattachments/<diskid>`` below
    the module-level ``url``. The module-level ``user`` and ``password``
    are used for authentication, and certificate verification is
    disabled. The response is discarded.

    :param bkpid: identifier of the VM that owns the attachment.
    :param diskid: identifier of the disk attachment to remove.
    """
    attachment_url = url + "/vms/" + bkpid + "/diskattachments/" + diskid
    credentials = (user, password)

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    requests.delete(attachment_url, verify=False, auth=credentials)
# Function to get the device name of a virtual disk
def _import_file(self, filename):
    """Upload a configuration file to the device through its XML API.

    The file is sent as a multipart ``file`` field to
    ``https://<self.hostname>/api/`` with ``type=import`` and
    ``category=configuration``. Certificate verification is disabled.

    :param filename: path of the local file to upload.
    :returns: the base name of the uploaded file, or ``False`` when the
        API response carries ``status="error"``.
    :raises requests.exceptions.HTTPError: if the HTTP request itself
        fails (via ``raise_for_status``).
    """
    # Use the configured API key, or ask the device for one.
    key = self.api_key if self.api_key else self.device.keygen()

    params = {
        'type': 'import',
        'category': 'configuration',
        'key': key
    }
    path = os.path.basename(filename)
    url = 'https://{0}/api/'.format(self.hostname)

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    # The encoder streams from the file while the request is being sent,
    # so the handle must stay open until post() returns. The context
    # manager then closes it (the handle used to be leaked).
    with open(filename, 'rb') as upload:
        mef = requests_toolbelt.MultipartEncoder(
            fields={
                'file': (path, upload, 'application/octet-stream')
            }
        )
        request = requests.post(
            url,
            verify=False,
            params=params,
            headers={'Content-Type': mef.content_type},
            data=mef
        )

    # if something goes wrong just raise an exception
    request.raise_for_status()

    response = xml.etree.ElementTree.fromstring(request.content)
    if response.attrib['status'] == 'error':
        return False
    return path
def ignore_warnings():
    """Disable the urllib3 warnings about insecure or limited TLS setups.

    Each of the four warning categories is passed, in turn, to
    ``requests.packages.urllib3.disable_warnings``.
    """
    silenced_categories = (
        InsecureRequestWarning,
        SubjectAltNameWarning,
        InsecurePlatformWarning,
        SNIMissingWarning,
    )
    for category in silenced_categories:
        requests.packages.urllib3.disable_warnings(category)
def issue_request(self, method, params, endpoint=None):
    """Send one JSON-RPC call to the SolidFire API and return its result.

    Every call is recorded in ``self.request_history`` as a
    ``(method, start_time, duration, status)`` tuple, where status is
    ``'ok'`` or ``'failed'``.

    :param method: API method name, sent as the payload's ``'method'``.
    :param params: parameters for the call; ``None`` is sent as ``{}``.
    :param endpoint: optional endpoint to use instead of
        ``self.endpoint_dict``. It is indexed with ``'url'``, ``'login'``
        and ``'password'``, so it is assumed to be a dict with the same
        layout as ``self.endpoint_dict``.
    :returns: the ``'result'`` member of the decoded JSON response.
    :raises sfexceptions.SolidFireRequestException: if the response
        contains an ``'error'`` member.
    """
    if params is None:
        params = {}

    # NOTE(jdg): We allow passing in a new endpoint to issue_api_req
    # to enable some of the multi-cluster features like replication etc
    # A caller-supplied endpoint was previously ignored, which left
    # endpoint_dict unbound and raised UnboundLocalError further down.
    endpoint_dict = self.endpoint_dict if endpoint is None else endpoint

    payload = {'method': method, 'params': params}
    url = '%s/json-rpc/%s/' % (endpoint_dict['url'], self.endpoint_version)
    LOG.debug('Issue SolidFire API call: %s' % json.dumps(payload))

    start_time = time.time()
    # Certificates are not verified, so silence the resulting warning
    # for the duration of this request only.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
        req = requests.post(url,
                            data=json.dumps(payload),
                            auth=(endpoint_dict['login'],
                                  endpoint_dict['password']),
                            verify=False,
                            timeout=30)
        # FIXME(jdg): Failure cases like wrong password
        # missing something that cause req.json to puke
        response = req.json()
        req.close()
    end_time = time.time()
    duration = end_time - start_time

    LOG.debug('Raw response data from SolidFire API: %s' % response)

    # TODO(jdg): Add check/retry catch for things where it's appropriate
    if 'error' in response:
        # Format the message; the old code built a (format, response)
        # tuple by mistake instead of a string.
        msg = 'API response: %s' % (response,)
        self.request_history.append(
            (method, start_time, duration, 'failed'))
        LOG.error('Error in API request: %s' % response['error'])
        raise sfexceptions.SolidFireRequestException(msg)

    self.request_history.append(
        (method, start_time, duration, 'ok'))
    return response['result']
def send_request(self, method, params, version='1.0', endpoint=None):
    """Send one JSON-RPC call to the SolidFire API and return its result.

    :param method: API method name, sent as the payload's ``'method'``.
    :param params: parameters for the call; ``None`` is sent as ``{}``.
    :param version: API version segment placed in the request URL.
    :param endpoint: optional endpoint to use instead of
        ``self.endpoint_dict``. It is indexed with ``'url'``, ``'login'``
        and ``'password'``, so it is assumed to be a dict with the same
        layout as ``self.endpoint_dict``.
    :returns: the ``'result'`` member of the decoded JSON response.
    :raises SolidFireRequestException: if the response contains an
        ``'error'`` member.
    """
    if params is None:
        params = {}

    # NOTE(jdg): We allow passing in a new endpoint to issue_api_req
    # to enable some of the multi-cluster features like replication etc
    # A caller-supplied endpoint was previously ignored, which left
    # endpoint_dict unbound and raised UnboundLocalError further down.
    endpoint_dict = self.endpoint_dict if endpoint is None else endpoint

    payload = {'method': method, 'params': params}
    url = '%s/json-rpc/%s/' % (endpoint_dict['url'], version)
    LOG.debug('Issue SolidFire API call: %s' % json.dumps(payload))

    # Certificates are not verified, so silence the resulting warning
    # for the duration of this request only.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
        req = requests.post(url,
                            data=json.dumps(payload),
                            auth=(endpoint_dict['login'],
                                  endpoint_dict['password']),
                            verify=False,
                            timeout=30)
        response = req.json()
        req.close()

    # TODO(jdg): Fix the above, failure cases like wrong password
    # missing something that cause req.json to puke
    LOG.debug('Raw response data from SolidFire API: %s' % response)

    # TODO(jdg): Add check/retry catch for things where it's appropriate
    if 'error' in response:
        # Format the message; the old code built a (format, response)
        # tuple by mistake instead of a string.
        msg = 'API response: %s' % (response,)
        raise SolidFireRequestException(msg)

    return response['result']
def live_dangerously(self):
    """
    Turn off SSL certificate checking for this object.

    Suppresses urllib3's InsecureRequestWarning and sets ``self.verify``
    to ``False``. This should *only* be used while developing against a
    local Socrata instance.
    """
    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning

    bundled_urllib3 = requests.packages.urllib3
    bundled_urllib3.disable_warnings(InsecureRequestWarning)
    self.verify = False
def __init__(self, **kwargs):
    """Set up Direct Line access and open a conversation.

    Recognised keyword arguments:

    * ``directline_host`` -- defaults to
      ``https://directline.botframework.com``.
    * ``direct_line_token_or_secret`` -- placed in the ``Authorization``
      header.

    All keyword arguments are also forwarded to the parent constructor.
    Insecure-HTTPS warnings are disabled, then ``start_conversation()``
    is called and its ``conversationId`` and ``token`` are stored.
    """
    super(Microsoft, self).__init__(**kwargs)

    import requests
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    self.directline_host = kwargs.get(
        'directline_host', 'https://directline.botframework.com')

    # NOTE: Direct Line client credentials are different from your bot's
    # credentials
    self.direct_line_token_or_secret = kwargs.get(
        'direct_line_token_or_secret')

    self.headers = {
        'Authorization': 'BotConnector {}'.format(
            self.direct_line_token_or_secret),
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'charset': 'utf-8'
    }

    conversation = self.start_conversation()
    self.conversation_id = conversation.get('conversationId')
    self.conversation_token = conversation.get('token')
def setUpClass(cls):
    """Prepare the remote, a group and a dedicated user for the tests.

    Creates a ``BossRemote`` from ``test.cfg``, disables SSL certificate
    verification on its services, creates one group that is expected to
    exist, adds a fresh user and logs in as that user. All Python
    warnings are ignored for the test run.
    """
    warnings.filterwarnings('ignore')

    cls.rmt = BossRemote('test.cfg', API_VER)

    # Turn off SSL cert verification.  This is necessary for interacting with
    # developer instances of the Boss.
    for service_name in ('project_service', 'metadata_service', 'volume_service'):
        getattr(cls.rmt, service_name).session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    cls.create_grp_name = 'int_test_group{}'.format(random.randint(0, 9999))
    cls.existing_grp_name = 'int_test_group_exists{}'.format(random.randint(0, 9999))
    cls.rmt.create_group(cls.existing_grp_name)

    cls.user_name = 'bossadmin'

    # Create a new user because the user tests run under, will
    # automatically be a member and maintainer of any groups created
    # during testing.
    cls.created_user = 'group_test_johndoeski{}'.format(random.randint(0, 9999))
    password = 'myPassW0rd'
    email = 'jdoe{}@rime.com'.format(random.randint(0, 9999))
    cls.rmt.add_user(cls.created_user, 'John', 'Doeski', email, password)

    access_token = cls.get_access_token(cls.created_user, password)
    cls.login_user(access_token)
def setUpClass(cls):
    """Create the remote and the resources the permission tests use.

    Creates a ``BossRemote`` from ``test.cfg``, disables SSL certificate
    verification on its services, then builds a collection, coordinate
    frame, experiment and channel and creates them on the server along
    with a test group.
    """
    cls.rmt = BossRemote('test.cfg', API_VER)

    # Turn off SSL cert verification.  This is necessary for interacting with
    # developer instances of the Boss.
    for service_name in ('project_service', 'metadata_service', 'volume_service'):
        getattr(cls.rmt, service_name).session_send_opts = {'verify': False}
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    collection_name = 'collection_perm_test-{}'.format(random.randint(0, 9999))
    cls.coll = CollectionResource(collection_name, 'bar')

    frame_name = 'PermissionTestFrame{}'.format(random.randint(0, 9999))
    cls.coord = CoordinateFrameResource(
        frame_name, 'Test coordinate frame.', 0, 10, -5, 5, 3, 6,
        1, 1, 1, 'nanometers', 0, 'nanoseconds')

    cls.exp = ExperimentResource(
        'perm_test_exp', cls.coll.name, cls.coord.name, 'my experiment', 1,
        'isotropic', 1)

    cls.chan = ChannelResource(
        'perm_test_ch', cls.coll.name, cls.exp.name, 'image', 'test channel',
        0, 'uint8', 0)

    cls.grp_name = 'int_perm_test_group{}'.format(random.randint(0, 9999))

    # Create the resources in the same order as they were built above.
    for resource in (cls.coll, cls.coord, cls.exp, cls.chan):
        cls.rmt.create_project(resource)

    cls.rmt.create_group(cls.grp_name)
def __init__(self, service_information, proxies=None):
    """Store the service description and reset the session state.

    :param service_information: object describing the service; its
        ``skip_ssl_verifications`` attribute decides whether the
        "Unverified HTTPS request" warning is filtered out.
    :param proxies: proxy mapping to use; when ``None`` a mapping with
        empty ``http`` and ``https`` entries is stored instead.
    """
    self.service_information = service_information

    if proxies is None:
        proxies = dict(http='', https='')
    self.proxies = proxies

    self.authorization_code_context = None
    self.refresh_token = None
    self._session = None

    if service_information.skip_ssl_verifications:
        import warnings
        from requests.packages.urllib3.exceptions import InsecureRequestWarning

        warnings.filterwarnings(
            action='ignore',
            message='Unverified HTTPS request is being made.*',
            category=InsecureRequestWarning,
        )
def main():
    """Entry point: silence insecure-HTTPS warnings and run the CLI.

    The warnings are expected because self-signed certificates are used.
    The CLI receives a new ``RemoteAppRestContext`` as its ``obj``.
    """
    disable_warnings(InsecureRequestWarning)

    context = RemoteAppRestContext()
    cli(obj=context)
def __init__(self, log_level, hostname,
             username, password, verify=True, prefix=""):
    """
    Constructor. Stores the connection details for the API and then
    establishes a connection.

    :param log_level: log level
    :type log_level: logging
    :param hostname: Foreman host
    :type hostname: str
    :param username: API username
    :type username: str
    :param password: corresponding password
    :type password: str
    :param verify: force SSL verification
    :type verify: bool
    :param prefix: API prefix (e.g. /katello)
    :type prefix: str
    """
    # Logging first, so later steps log at the requested level.
    self.LOGGER.setLevel(log_level)

    # Keep SSL warnings out of the output.
    bundled_urllib3 = requests.packages.urllib3
    bundled_urllib3.disable_warnings(InsecureRequestWarning)

    # Connection information.
    self.HOSTNAME = self.validate_hostname(hostname)
    self.USERNAME, self.PASSWORD, self.VERIFY = username, password, verify
    self.URL = "https://{0}{1}/api/v2".format(self.HOSTNAME, prefix)

    # Start the session; the API version is only checked for the plain
    # Foreman API, i.e. when no prefix was given.
    self.__connect()
    if prefix == "":
        self.validate_api_support()
def __init__(self):
    """Create the instance and switch off insecure-certificate warnings."""
    bundled_urllib3 = requests.packages.urllib3
    bundled_urllib3.disable_warnings(InsecureRequestWarning)
def perform_query(self, method, path, data={}, headers=None):
    """Send one request through the session and return the decoded JSON.

    :param method: HTTP method passed to ``self.session.request``.
    :param path: path appended to ``self.url``.
    :param data: object serialised with ``json.dumps`` as the request
        body. The shared default dict is never mutated here.
    :param headers: request headers; ``self.default_headers`` is used
        when ``None``.
    :returns: the response body decoded with ``resp.json()``.
    :raises requests.exceptions.HTTPError: if the response status
        indicates an error (via ``raise_for_status``).
    """
    if headers is None:
        headers = self.default_headers

    # Verification may be disabled through self.verify, so silence the
    # insecure-request warning for the duration of this call only.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", exceptions.InsecureRequestWarning)
        resp = self.session.request(method, self.url + path,
                                    data=json.dumps(data),
                                    verify=self.verify, headers=headers)

    # The old handler used Python 2-only ``except X, e`` syntax and only
    # re-raised the same error, so the HTTPError now propagates directly.
    resp.raise_for_status()
    return resp.json()