def __init__(self, target):
# Target comes as protocol://target:port/path
self.target = target
proto, host, path = target.split(':')
host = host[2:]
self.path = '/' + path.split('/', 1)[1]
if proto.lower() == 'https':
#Create unverified (insecure) context
try:
uv_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
self.session = HTTPSConnection(host,context=uv_context)
except AttributeError:
#This does not exist on python < 2.7.11
self.session = HTTPSConnection(host)
else:
self.session = HTTPConnection(host)
self.lastresult = None
python类HTTPSConnection()的实例源码
def get_status(self,gps_lat,gps_lon,weather):
if Globals.AirConnected:
try:
self.connection = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
self.headers = Globals.xapikey
self.connection.request('GET', '/status/v2/point?latitude='+gps_lat+'&longitude='+gps_lon+'&weather='+weather, '', self.headers)
result = self.connection.getresponse().read()
self.status_json = json.loads(result)
Globals.strPrint (self.thisGlobals,self.status_json)
return (self.status_json['status'] == "success")
except Exception,e:
Globals.strPrint (self.thisGlobals,"Airmap: No Connection or slow connection ->Request Timeout...")
#Globals.strPrint(self.thisGlobals,str(e))
return False
else:
Globals.strPrint (self.thisGlobals,"Not Connected")
return False
def get_SecureToken(self):
"""Retrieve security token and refresh
:param: None
:returns: Token if successful otherwise False
:todo: Remove hardcoded token and add token from https endpoint based on CID
"""
try:
connectAuth0 = httplib.HTTPSConnection(Globals.keyAddr, Globals.httpsPort, timeout=Globals.timeOut)
headers = Globals.xapikey
connectAuth0.request('POST', '/delegation', json.dumps({"refresh_token":"ezKrfuSeSD8DA7w2Dq7gqsL10sYuKdVEXA6BIIJLEAJQh","grant_type":"urn:ietf:params:oauth:grant-type:jwt-bearer","client_id":"2iV1XSfdLJNOfZiTZ9JGdrNHtcNzYstt","api_type":"app"}), headers)
result = connectAuth0.getresponse().read()
parsed_json = json.loads(result)
Globals.myToken = parsed_json['id_token']
return Globals.myToken
except:
print "OAuth2 Error..."
traceback.print_exc()
return False
def end_Flight(self, flightID):
try:
connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
headers = Globals.xapikey
headers['Authorization'] = "Bearer {}".format(Globals.myToken)
connectFlight.request('POST', '/flight/v2/{}/end'.format(flightID), '', headers)
result = connectFlight.getresponse().read()
parsed_json = json.loads(result)
parsed_status = parsed_json['status']
if parsed_status != "success":
return False
else:
return True
except:
print "End Flight Error..."
traceback.print_exc()
def delete_Flight(self, flightID):
try:
connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
headers = Globals.xapikey
headers['Authorization'] = "Bearer {}".format(Globals.myToken)
connectFlight.request('POST', '/flight/v2/{}/delete'.format(flightID), '', headers)
result = connectFlight.getresponse().read()
parsed_json = json.loads(result)
parsed_status = parsed_json['status']
if parsed_status != "success":
return False
else:
return True
except:
print "End Flight Error..."
traceback.print_exc()
def start_comm(self, flightID):
try:
connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
headers = Globals.xapikey
headers['Authorization'] = "Bearer {}".format(Globals.myToken)
connectFlight.request('POST', '/flight/v2/{}/start-comm'.format(flightID), '', headers)
result = connectFlight.getresponse().read()
parsed_json = json.loads(result)
print parsed_json
#parsed_status = parsed_json['data']['key']['data']
parsed_status = parsed_json['data']['key']
print "H:" + parsed_status
#thisKey = (''.join(str(hex(i)[2:].zfill(2)) for i in parsed_status)).decode('hex')
thisKey = parsed_status.decode('base64')
return thisKey
except:
print "Could Not Start Comms..."
traceback.print_exc()
def recover_Pilot(self):
try:
connectFlight = httplib.HTTPSConnection(Globals.httpsAddr, Globals.httpsPort, timeout=Globals.timeOut)
headers = Globals.xapikey
headers['Authorization'] = "Bearer {}".format(Globals.myToken)
connectFlight.request('GET', '/pilot/v2/profile', "", headers)
result = connectFlight.getresponse().read()
try:
parsed_json = json.loads(result)
parsed_status = parsed_json['status']
print parsed_status
Globals.pilot_id = parsed_json['data']['id']
Globals.pilotIDValid = True
except:
Globals.strPrint (self.thisGlobals,"Pilot Recover ID not found...Retry!")
Globals.strPrint (self.thisGlobals,result)
return False
if parsed_status != "success":
return False
except:
print "Create Flight Error..."
traceback.print_exc()
return Globals.pilot_id
def _http_request(self, url, timeout=40):
try:
if not url: url = '/'
conn_fuc = httplib.HTTPSConnection if self.schema == 'https' else httplib.HTTPConnection
conn = conn_fuc(self.host, timeout=timeout)
conn.request(method='GET', url=url,
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.111 Safari/537.36 BBScan/1.0'}
)
resp = conn.getresponse()
resp_headers = dict(resp.getheaders())
status = resp.status
if resp_headers.get('content-type', '').find('text') >= 0 or resp_headers.get('content-type', '').find('html') >= 0 or \
int(resp_headers.get('content-length', '0')) <= 1048576:
html_doc = self._decode_response_text(resp.read())
else:
html_doc = ''
conn.close()
return status, resp_headers, html_doc
except Exception, e:
#logging.error('[Exception in InfoDisScanner._http_request] %s' % e)
return -1, {}, ''
def api_request(command):
ret = {}
conn = httplib.HTTPSConnection(target_url)
try:
conn.request("GET", "/api/" + command + "/")
except Exception as e:
logMsg("ERROR api_request(): " + str(e))
else:
response = conn.getresponse()
if response.status != 200:
logMsg("ERROR api_request(): " + str(response.status) + " " + response.reason)
else:
try:
ret = json.load(response)
except ValueError as e:
logMsg("ERROR api_request(): " + str(e))
conn.close()
return ret
def info_request(command):
logger.debug("Making api request to " + REQUEST_HOST + REQUEST_PATH + command + '/')
ret = None
conn = None
try:
conn = httplib.HTTPSConnection(REQUEST_HOST, timeout=HTTPCON_TIMEOUT)
conn.request('GET', REQUEST_PATH + command + '/')
# Pre-process response
resp = conn.getresponse()
data = resp.read()
# Utilizar a classe OrderedDict para preservar a ordem dos elementos
response_json = json.loads(data, object_pairs_hook=OrderedDict)
logger.debug("info_request status: " + str(resp.status) + " " + str(resp.reason))
except Exception as e:
logger.error("Failed api request: " + str(e))
else:
ret = response_json
finally:
if conn:
conn.close()
return ret
def https_open(self, req):
return self.do_open(httplib.HTTPSConnection, req)
def make_connection(self, host):
if self._connection and host == self._connection[0]:
return self._connection[1]
# create a HTTPS connection object from a host descriptor
# host may be a string, or a (host, x509-dict) tuple
try:
HTTPS = httplib.HTTPSConnection
except AttributeError:
raise NotImplementedError(
"your version of httplib doesn't support HTTPS"
)
else:
chost, self._extra_headers, x509 = self.get_host_info(host)
self._connection = host, HTTPS(chost, None, **(x509 or {}))
return self._connection[1]
##
# Standard server proxy. This class establishes a virtual connection
# to an XML-RPC server.
# <p>
# This class is available as ServerProxy and Server. New code should
# use ServerProxy, to avoid confusion.
#
# @def ServerProxy(uri, **options)
# @param uri The connection point on the server.
# @keyparam transport A transport factory, compatible with the
# standard transport class.
# @keyparam encoding The default encoding used for 8-bit strings
# (default is UTF-8).
# @keyparam verbose Use a true value to enable debugging output.
# (printed to standard output).
# @see Transport
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, timeout=None, proxy_info=None,
ca_certs=None, disable_ssl_certificate_validation=False):
httplib.HTTPSConnection.__init__(self, host, port=port,
key_file=key_file,
cert_file=cert_file, strict=strict)
self.timeout = timeout
self.proxy_info = proxy_info
if ca_certs is None:
ca_certs = CA_CERTS
self.ca_certs = ca_certs
self.disable_ssl_certificate_validation = \
disable_ssl_certificate_validation
# The following two methods were adapted from https_wrapper.py, released
# with the Google Appengine SDK at
# http://googleappengine.googlecode.com/svn-history/r136/trunk/python/google/appengine/tools/https_wrapper.py
# under the following license:
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
def __init__(self, host, port=None, key_file=None, cert_file=None,
strict=None, timeout=None, proxy_info=None, ca_certs=None,
disable_ssl_certificate_validation=False):
httplib.HTTPSConnection.__init__(self, host, port=port,
key_file=key_file,
cert_file=cert_file, strict=strict,
timeout=timeout)
self._fetch = _new_fixed_fetch(
not disable_ssl_certificate_validation)
# Update the connection classes to use the Googel App Engine specific ones.
def _create_connection(scheme, netloc):
if scheme == 'https':
conn = httplib.HTTPSConnection(netloc)
else:
conn = httplib.HTTPConnection(netloc)
conn.connect()
return conn
def __init__(self, output_spec):
"""
Uses HTTP chunked transfer-encoding to stream a request body to a
server. Unfortunately requests does not support hooking into this logic
easily, so we use the lower-level httplib module.
"""
super(HttpStreamPushAdapter, self).__init__(output_spec)
self._closed = False
parts = urlparse.urlparse(output_spec['url'])
if parts.scheme == 'https':
ssl_context = ssl.create_default_context()
conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
else:
conn = httplib.HTTPConnection(parts.netloc)
try:
conn.putrequest(output_spec.get('method', 'POST').upper(),
parts.path, skip_accept_encoding=True)
for header, value in output_spec.get('headers', {}).items():
conn.putheader(header, value)
conn.putheader('Transfer-Encoding', 'chunked')
conn.endheaders() # This actually flushes the headers to the server
except Exception:
print('HTTP connection to "%s" failed.' % output_spec['url'])
conn.close()
raise
self.conn = conn