def __init__(self, username=None, password=None, version=None, debug=None,
             requesttype=None, baseurl=None, site=None):
    """Store the supplied settings and install a cookie-aware URL opener.

    Only truthy arguments are copied onto the instance; an omitted (or
    falsy) argument leaves any existing attribute of that name untouched.

    Process-wide side effects:

    * ``ssl._create_default_https_context`` is replaced with the
      unverified factory, so default HTTPS contexts skip certificate
      verification.
    * The opener built here is installed as the global ``urllib.request``
      opener.
    """
    supplied = (
        ('username', username),
        ('password', password),
        ('version', version),
        ('debug', debug),
        ('requesttype', requesttype),
        ('baseurl', baseurl),
        ('site', site),
    )
    for attr_name, value in supplied:
        if value:
            setattr(self, attr_name, value)

    # This is the way to allow unverified SSL (applies to the whole process).
    ssl._create_default_https_context = ssl._create_unverified_context

    self.cj = http.cookiejar.CookieJar()
    # Fall back to "no wire debugging" when no debug flag is available on
    # the instance, instead of failing with AttributeError.
    debuglevel = 1 if getattr(self, 'debug', False) else 0
    opener = urllib.request.build_opener(
        urllib.request.HTTPHandler(debuglevel=debuglevel),
        urllib.request.HTTPSHandler(debuglevel=debuglevel),
        urllib.request.HTTPCookieProcessor(self.cj))
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    urllib.request.install_opener(opener)
python类_create_default_https_context()的实例源码
def __init__(self, username=None, password=None, debug=None,
             requesttype=None, baseurl=None):
    """Store the supplied settings and install a cookie-aware URL opener.

    Only truthy arguments are copied onto the instance; an omitted (or
    falsy) argument leaves any existing attribute of that name untouched.

    Process-wide side effects:

    * ``ssl._create_default_https_context`` is replaced with the
      unverified factory, so default HTTPS contexts skip certificate
      verification.
    * The opener built here is installed as the global ``urllib.request``
      opener.
    """
    if username:
        self.username = username
    if password:
        self.password = password
    if debug:
        self.debug = debug
    if requesttype:
        self.requesttype = requesttype
    if baseurl:
        self.baseurl = baseurl

    # This is the way to allow unverified SSL (applies to the whole process).
    ssl._create_default_https_context = ssl._create_unverified_context

    self.cj = http.cookiejar.CookieJar()
    # Do not fail with AttributeError when no debug flag was ever stored on
    # the instance; treat that case as "debugging off".
    wire_debug = 1 if getattr(self, 'debug', False) else 0
    handlers = (
        urllib.request.HTTPHandler(debuglevel=wire_debug),
        urllib.request.HTTPSHandler(debuglevel=wire_debug),
        urllib.request.HTTPCookieProcessor(self.cj),
    )
    opener = urllib.request.build_opener(*handlers)
    opener.addheaders = [('User-agent', 'Mozilla/5.0')]
    urllib.request.install_opener(opener)
def download_image(url: str = '', save_path: str = '',
                   unverified_ctx: bool = False) -> Union[None, str]:
    """Download an image and save it on the local machine.

    The file name is the part of ``url`` after its last ``/`` and the file
    is written to ``save_path + image_name`` (plain string concatenation,
    so ``save_path`` should end with a path separator when it names a
    directory).

    :param str url: URL to image. ``None`` or an empty string means
        "nothing to download".
    :param str save_path: Prefix prepended to the image name.
    :param bool unverified_ctx: When true, replace the process-wide default
        HTTPS context factory with the unverified one before downloading.
    :return: Image name, or ``None`` when no URL was given.
    :rtype: str or None
    """
    if unverified_ctx:
        ssl._create_default_https_context = ssl._create_unverified_context
    # The default value is '', so an emptiness check (not just ``is None``)
    # is required to avoid handing an empty URL to urlretrieve.
    if not url:
        return None
    image_name = url.rsplit('/', 1)[-1]
    request.urlretrieve(url, save_path + image_name)
    return image_name
def SetCMSRecording(callID, state):
    """Send a PUT to ``/api/v1/calls/<callID>`` toggling the recording flag.

    :param callID: Call identifier appended to the request path.
    :param state: ``recording=true`` is sent when ``state == True``,
        otherwise ``recording=false``.

    The response body is printed, followed by a fixed confirmation line.
    Certificate verification is disabled process-wide as a side effect.
    """
    ssl._create_default_https_context = ssl._create_unverified_context
    conn = http.client.HTTPSConnection(Config.cmsFqdn)
    # Equality (not truthiness) is kept deliberately so callers passing
    # values such as "yes" or 2 still get the same payload as before.
    payload = "recording=true" if state == True else "recording=false"  # noqa: E712
    headers = {
        'authorization': Config.cmsGenericAuthRealm,
        'content-type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache",
        'postman-token': "b5f016ed-5e19-d311-563e-c6aa7fdaa591"
    }
    try:
        conn.request("PUT", "/api/v1/calls/" + callID, payload, headers)
        res = conn.getresponse()
        data = res.read()
    finally:
        # Always release the socket, including when the request fails.
        conn.close()
    print(data.decode("utf-8"))
    print("Recording Bit Set")
def index(request):
    """Render ``index.html`` with a freshly fetched login QR code.

    For GET requests: installs a cookie-aware global opener (best effort),
    obtains a UUID via ``getUUID()``, requests the QR code for it and
    passes the UUID and the raw response body to the template.

    NOTE(review): any non-GET request returns ``None``, exactly as before.
    """
    if request.method != "GET":
        return None

    try:
        ssl._create_default_https_context = ssl._create_unverified_context
        opener = wdf_urllib.build_opener(
            wdf_urllib.HTTPCookieProcessor(CookieJar()))
        wdf_urllib.install_opener(opener)
    except Exception:
        # Best effort only: continue with the default opener. Narrowed from
        # a bare ``except`` so KeyboardInterrupt/SystemExit still propagate.
        pass

    uuid = getUUID()
    url = 'https://login.weixin.qq.com/qrcode/' + uuid
    params = {
        't': 'webwx',
        '_': int(time.time()),
    }
    # Use a separate name so the incoming ``request`` is not shadowed.
    qr_request = getRequest(url=url, data=urlencode(params))
    response = wdf_urllib.urlopen(qr_request)
    try:
        qr_body = response.read()
    finally:
        response.close()
    context = {
        'uuid': uuid,
        'response': qr_body,
        'delyou': '',
    }
    return render_to_response('index.html', context)
def __init__(self, host, port=None, key_file=None, cert_file=None,
             timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             source_address=None, *, context=None,
             check_hostname=None):
    """Set up the connection and resolve its SSL context.

    When ``context`` is omitted one is obtained from
    ``ssl._create_default_https_context()``. When ``check_hostname`` is
    omitted it is taken from the context.

    Raises ValueError if hostname checking is requested while the
    context's ``verify_mode`` is ``ssl.CERT_NONE``.
    """
    super(HTTPSConnection, self).__init__(host, port, timeout,
                                          source_address)
    self.key_file, self.cert_file = key_file, cert_file

    ssl_context = context
    if ssl_context is None:
        ssl_context = ssl._create_default_https_context()
    verification_off = ssl_context.verify_mode == ssl.CERT_NONE

    hostname_checked = check_hostname
    if hostname_checked is None:
        hostname_checked = ssl_context.check_hostname
    if hostname_checked and verification_off:
        raise ValueError("check_hostname needs a SSL context with "
                         "either CERT_OPTIONAL or CERT_REQUIRED")

    # A client certificate is loaded when either file was supplied.
    if key_file or cert_file:
        ssl_context.load_cert_chain(cert_file, key_file)

    self._context = ssl_context
    self._check_hostname = hostname_checked
def create_api():
    """Build a ``Twitter`` client authenticated from the app configuration.

    The four OAuth values are read through ``app.get_config`` and
    certificate verification is disabled process-wide before the client
    is created.
    """
    from twitter import OAuth, Twitter
    from app import APP_INSTANCE as app

    config_keys = (
        'api.twitter.access_token',
        'api.twitter.access_secret',
        'api.twitter.consumer_key',
        'api.twitter.consumer_secret',
    )
    credentials = [app.get_config(key) for key in config_keys]

    # temporary fix for certificate error
    ssl._create_default_https_context = ssl._create_unverified_context

    # Initiate the connection to Twitter API
    return Twitter(auth=OAuth(*credentials))
def __init__(self, host, port=None, key_file=None, cert_file=None,
             timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             source_address=None, *, context=None,
             check_hostname=None):
    """Initialise the connection and decide which SSL context it uses.

    ``context`` defaults to ``ssl._create_default_https_context()`` and
    ``check_hostname`` defaults to the context's own setting.

    Raises ValueError when hostname checking is enabled but the context
    does not verify certificates (``verify_mode`` is ``ssl.CERT_NONE``).
    """
    super(HTTPSConnection, self).__init__(host, port, timeout,
                                          source_address)
    self.key_file = key_file
    self.cert_file = cert_file

    chosen_context = (ssl._create_default_https_context()
                      if context is None else context)
    verifies_certs = chosen_context.verify_mode != ssl.CERT_NONE
    wants_hostname_check = (chosen_context.check_hostname
                            if check_hostname is None else check_hostname)

    if wants_hostname_check and not verifies_certs:
        raise ValueError("check_hostname needs a SSL context with "
                         "either CERT_OPTIONAL or CERT_REQUIRED")
    if key_file or cert_file:
        chosen_context.load_cert_chain(cert_file, key_file)

    self._context = chosen_context
    self._check_hostname = wants_hostname_check
def cache_download(url, filename, caller='', ssl_enabled=True):
    '''Download a file into ``cache/<caller>_<filename>``.

    Returns 1 when the file is already cached or was downloaded, -1 on
    ``urllib.error.HTTPError`` and -2 on ``urllib.error.URLError``.
    With ``ssl_enabled`` false, certificate verification is switched off
    process-wide before downloading.
    '''
    if caller == '':
        # NOTE(review): the result of caller_get() is discarded here, so
        # ``caller`` stays '' -- confirm whether it was meant to be assigned.
        caller_get()

    cached_path = 'cache/{}_{}'.format(caller, filename)
    if os.path.isfile(cached_path):
        return 1

    try:
        if not ssl_enabled:
            ssl._create_default_https_context = ssl._create_unverified_context
        urllib.request.urlretrieve(url, cached_path)
    except urllib.error.HTTPError:
        return -1
    except urllib.error.URLError:
        return -2
    return 1
def main():
    """Log in through the QR-code flow and then queue the BT tasks.

    Creates the module-level ``mySession`` (a ``requests.Session`` with a
    browser User-agent), disables certificate verification process-wide,
    runs the login helpers in order, starts the ``keepLogin`` background
    thread and finally calls ``add_many_bt()``.

    Returns early (after printing a message) when ``getInfos()`` is falsy.
    """
    global mySession
    ssl._create_default_https_context = ssl._create_unverified_context
    headers = {'User-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2663.0 Safari/537.36'}
    mySession = requests.Session()
    mySession.headers.update(headers)
    if not getInfos():
        print(u'??????')
        return
    getQrcode()
    waitLogin()
    login()
    print('??????')
    # The thread object used to be created and immediately discarded, so
    # keepLogin never ran. Start it, as a daemon so that the process still
    # exits once main() returns (the previous exit behaviour).
    keeper = threading.Thread(target=keepLogin)
    keeper.daemon = True
    keeper.start()
    getTasksign()
    getUserinfo()
    add_many_bt()
def get_title_from_webpage(url):
    """ Fetch <title> of a html site for title element

    Newlines in the title become spaces, carriage returns are dropped and
    surrounding whitespace is stripped.

    :url: str (http url)
    :returns: the cleaned title; "No title" on AttributeError, MemoryError,
        ssl.CertificateError or IOError; False on ValueError
    """
    # Certificate verification is disabled process-wide on purpose here.
    ssl._create_default_https_context = ssl._create_unverified_context
    try:
        h = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/601.3.9 (KHTML, like Gecko) Version/9.0.2 Safari/601.3.9'}
        page_request = urllib2.Request(url, headers=h)
        page = urllib2.urlopen(page_request)
        try:
            soup = BeautifulSoup(page, "html.parser")
        finally:
            # Release the connection once parsing is done (or has failed).
            page.close()
        # A missing <title> (or empty title string) raises AttributeError
        # here and is reported as "No title" below.
        return soup.title.string.replace('\n', ' ').replace('\r', '').strip()
    except (AttributeError, MemoryError, ssl.CertificateError, IOError):
        return "No title"
    except ValueError:
        return False
def __run(self,req):
    """Send ``req`` and return the response body.

    Uses ``self.__opener`` when one is set, otherwise ``urllib2.urlopen``
    with an unverified SSL context. The response code is stored in
    ``self.__code``. Any failure is re-raised as a plain ``Exception``
    carrying the original exception's ``message``.
    """
    try:
        ssl._create_default_https_context = ssl._create_unverified_context
        if self.__opener is None:
            unverified = ssl._create_unverified_context()
            response = urllib2.urlopen(req, context=unverified)
        else:
            response = self.__opener.open(req)
        self.__code = response.code
        return response.read()
    except Exception as e:
        raise Exception(e.message)
def __run(self,req):
    """Send ``req`` and return the response body, or None on any error.

    Uses ``self.__opener`` when one is set, otherwise ``urllib2.urlopen``
    with an unverified SSL context. The response code is stored in
    ``self.__code``. Exceptions are printed and swallowed.
    """
    try:
        ssl._create_default_https_context = ssl._create_unverified_context
        if self.__opener is None:
            unverified = ssl._create_unverified_context()
            response = urllib2.urlopen(req, context=unverified)
        else:
            response = self.__opener.open(req)
        self.__code = response.code
        return response.read()
    except Exception as e:
        print(e)
        return None
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             source_address=None, context=None):
    """Initialise the base connection and pick the SSL context.

    When ``context`` is None one is created through
    ``ssl._create_default_https_context()``. If a key or certificate file
    is given, the certificate chain is loaded into that context.
    """
    HTTPConnection.__init__(self, host, port, strict, timeout,
                            source_address)
    self.key_file, self.cert_file = key_file, cert_file

    ssl_context = context
    if ssl_context is None:
        ssl_context = ssl._create_default_https_context()
    if key_file or cert_file:
        ssl_context.load_cert_chain(cert_file, key_file)
    self._context = ssl_context
def __init__(self, options, handle=None):
    """Keep ``handle`` and a deep copy of ``options`` on the instance.

    On Python 2.7.9 or newer, certificate verification is also disabled
    process-wide.
    """
    self.handle = handle
    self.opts = deepcopy(options)

    # This is ugly but we don't want any fetches to fail - we expect
    # to encounter unverified SSL certs!
    has_default_context_hook = sys.version_info >= (2, 7, 9)
    if has_default_context_hook:
        ssl._create_default_https_context = ssl._create_unverified_context
# Bit of a hack to support SOCKS because of the loading order of
# modules. sfscan will call this to update the socket reference
# to the SOCKS one.
def main():
    """Serve a small Flask app that lists VMs and resets one on request.

    The VM list is fetched once via ``get_vm_list()`` at startup; ``/``
    renders one link per VM name and ``/reset/<vm_name>`` calls
    ``reset_vm`` for it. The server listens on 0.0.0.0:5000.
    """
    # ignore invalid cert on ESX box
    import ssl
    ssl._create_default_https_context = ssl._create_unverified_context

    vm_list = get_vm_list()
    app = Flask(__name__)

    @app.route("/")
    def index():
        page_head = '''<!DOCTYPE html>
<html>
<head>
<title>VulnLab</title>
</head>
<body>
<h1>Reset</h1>
'''
        page_tail = ''' </body>
</html>
'''
        links = [
            ' <a href="/reset/' + name + '">' + name + '</a><br>\n'
            for name, _uuid in sorted(vm_list.items())
        ]
        return page_head + ''.join(links) + page_tail, 200

    @app.route("/reset/<string:vm_name>")
    def reset(vm_name):
        reset_vm(vm_list, vm_name)
        return 'OK\n', 200

    # start the web server
    app.run(host="0.0.0.0", port=5000)
def disable_certificate_check():
    """Make the ssl module's default HTTPS context factory the unverified one.

    This is a process-wide change.
    """
    setattr(ssl, '_create_default_https_context',
            ssl._create_unverified_context)
def __init__(self, options, handle=None):
    """Store ``handle`` and an independent (deep) copy of ``options``.

    Interpreters at version 2.7.9 or later additionally get certificate
    verification switched off for the whole process.
    """
    self.handle = handle
    self.opts = deepcopy(options)

    # This is ugly but we don't want any fetches to fail - we expect
    # to encounter unverified SSL certs!
    if not sys.version_info < (2, 7, 9):
        ssl._create_default_https_context = ssl._create_unverified_context
# Bit of a hack to support SOCKS because of the loading order of
# modules. sfscan will call this to update the socket reference
# to the SOCKS one.
def getVmwServiceContent(vcip, vcuser, vcpwd):
    """Connect to ``vcip`` on port 443 and return ``RetrieveContent()``.

    Certificate verification is disabled process-wide before connecting.
    """
    ssl._create_default_https_context = ssl._create_unverified_context
    service_instance = Connect(host=vcip, port=443, user=vcuser, pwd=vcpwd)
    return service_instance.RetrieveContent()
def enable_ssl(cls):
    """Point the default HTTPS context factory at ``ssl.create_default_context``."""
    setattr(ssl, '_create_default_https_context', ssl.create_default_context)
def disable_ssl(cls):
    """Point the default HTTPS context factory at the unverified variant."""
    setattr(ssl, '_create_default_https_context',
            ssl._create_unverified_context)
def serversess(self, f_engine_address, f_engine_username,
               f_engine_password, f_engine_namespace='DOMAIN'):
    """
    Method to setup the session with the Virtualization Engine

    f_engine_address: The Virtualization Engine's address (IP/DNS Name)
    f_engine_username: Username to authenticate
    f_engine_password: User's password
    f_engine_namespace: Namespace to use for this session. Default: DOMAIN

    The session is stored in ``self.server_session``. A password that is
    falsy but not None (for example an empty string) creates no session.
    HttpError, RequestError and JobError are re-raised as DlpxException.
    """
    # if use_https:
    #     if hasattr(ssl, '_create_unverified_context'):
    #         ssl._create_default_https_context = \
    #             ssl._create_unverified_context
    try:
        # Both original branches pass the password through unchanged
        # (a real password, or None), so they share one constructor call.
        if f_engine_password or f_engine_password is None:
            self.server_session = DelphixEngine(f_engine_address,
                                                f_engine_username,
                                                f_engine_password,
                                                f_engine_namespace)
    except (HttpError, RequestError, JobError) as err:
        raise DlpxException('ERROR: An error occurred while authenticating'
                            ' to {}:\n {}\n'.format(f_engine_address, err))
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             source_address=None, context=None):
    """Run the base initialiser, then resolve the SSL context to use.

    A missing ``context`` is replaced by the result of
    ``ssl._create_default_https_context()``; a supplied key or certificate
    file is loaded into whichever context is used.
    """
    HTTPConnection.__init__(self, host, port, strict, timeout,
                            source_address)
    self.key_file = key_file
    self.cert_file = cert_file

    effective_context = (ssl._create_default_https_context()
                         if context is None else context)
    needs_client_cert = key_file or cert_file
    if needs_client_cert:
        effective_context.load_cert_chain(cert_file, key_file)
    self._context = effective_context
def refresh(self):
    """Rebuild ``self.news`` from every feed in ``self.links``.

    Each entry becomes a ``News`` built from the base64-encoded feed title
    (with ' VK feed' removed), the base64-encoded entry link and the
    entry's publication time as integer epoch seconds. The loop sleeps
    one second after each feed.
    """
    # Local import keeps this fix self-contained within the method.
    import calendar

    self.news = []
    for link in tqdm(self.links, desc="Getting news"):
        if hasattr(ssl, '_create_unverified_context'):
            ssl._create_default_https_context = ssl._create_unverified_context
        feed = feedparser.parse(link)
        # feedparser reports ``published_parsed`` in UTC, so it must be
        # converted with calendar.timegm; time.mktime would interpret it as
        # local time and skew the timestamp by the local UTC offset.
        self.news += [News(binascii.b2a_base64(feed['feed']['title'].replace(' VK feed', '').encode()).decode(),
                           binascii.b2a_base64(entry['link'].encode()).decode(),
                           int(calendar.timegm(entry['published_parsed']))) for entry in feed['entries']]
        time.sleep(1)
lookup_service_helper.py 文件源码
项目:vsphere-automation-sdk-python
作者: vmware
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def connect(self):
    """Create the SOAP client if needed and fetch the service registration.

    On first use a ``Client`` is built from ``self.wsdl_url`` and
    ``self.soap_url``. A 'LookupServiceInstance' managed object reference
    is then created and used to retrieve the service content, whose
    ``serviceRegistration`` is stored on the instance.
    """
    if self.client is None:
        # Suds library doesn't support passing unverified context to disable
        # server certificate verification. Thus disable checking globally in
        # order to skip verification. This is not recommended in production
        # code. see https://www.python.org/dev/peps/pep-0476/
        if self.skip_verification:
            import ssl
            # Legacy Python that doesn't verify HTTPS certificates by
            # default has no such attribute; nothing to do in that case.
            unverified_factory = getattr(
                ssl, '_create_unverified_context', None)
            if unverified_factory is not None:
                ssl._create_default_https_context = unverified_factory

        self.client = Client(url=self.wsdl_url, location=self.soap_url)
        assert self.client is not None
        self.client.set_options(service='LsService', port='LsPort')

    reference = self.client.factory.create('ns0:ManagedObjectReference')
    self.managedObjectReference = reference
    self.managedObjectReference._type = 'LookupServiceInstance'
    self.managedObjectReference.value = 'ServiceInstance'

    service_content = self.client.service.RetrieveServiceContent(
        self.managedObjectReference)
    self.serviceRegistration = service_content.serviceRegistration
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             source_address=None, context=None):
    """Initialise the underlying HTTPConnection and store the SSL context.

    Without an explicit ``context`` the one produced by
    ``ssl._create_default_https_context()`` is used. When either
    ``key_file`` or ``cert_file`` is given, the chain is loaded into it.
    """
    HTTPConnection.__init__(self, host, port, strict, timeout,
                            source_address)
    self.key_file = key_file
    self.cert_file = cert_file

    if context is not None:
        tls_context = context
    else:
        tls_context = ssl._create_default_https_context()
    if key_file or cert_file:
        tls_context.load_cert_chain(cert_file, key_file)
    self._context = tls_context
def _bigsuds_version_tuple(raw_version):
    """Turn a dotted version string into a tuple of its leading integers."""
    numbers = []
    for piece in str(raw_version).split('.'):
        digits = ''
        for char in piece:
            if not char.isdigit():
                break
            digits += char
        if not digits:
            break
        numbers.append(int(digits))
    return tuple(numbers)


def bigip_api(bigip, user, password, validate_certs, port=443):
    """Create a ``bigsuds.BIGIP`` client using what the installed bigsuds supports.

    * bigsuds >= 1.0.4: ``verify`` and ``port`` are passed.
    * bigsuds 1.0.3.x: only ``verify`` is passed.
    * older: neither is passed.

    If the constructor rejects the arguments with TypeError, the client is
    built without them; in that case, when ``validate_certs`` is false,
    certificate verification is disabled process-wide first.
    """
    # Compare versions numerically: as plain strings '1.0.10' sorts below
    # '1.0.4' and would silently drop the verify/port arguments.
    installed = _bigsuds_version_tuple(bigsuds.__version__)
    try:
        if installed >= (1, 0, 4):
            api = bigsuds.BIGIP(hostname=bigip, username=user, password=password, verify=validate_certs, port=port)
        elif installed >= (1, 0, 3):
            api = bigsuds.BIGIP(hostname=bigip, username=user, password=password, verify=validate_certs)
        else:
            api = bigsuds.BIGIP(hostname=bigip, username=user, password=password)
    except TypeError:
        # bigsuds < 1.0.3, no verify param
        if not validate_certs:
            import ssl
            if hasattr(ssl, 'SSLContext'):
                # Really, you should never do this. It disables certificate
                # verification *globally*. But since older bigip libraries
                # don't give us a way to toggle verification we need to
                # disable it at the global level.
                # From https://www.python.org/dev/peps/pep-0476/#id29
                ssl._create_default_https_context = ssl._create_unverified_context
        # Note: verified we have SSLContext when we parsed params
        api = bigsuds.BIGIP(hostname=bigip, username=user, password=password)
    return api
# Fully Qualified name (with the partition)
def get_staff_info(self, openner, url='https://kaoqin.bangongyi.com/attend/index/record?_=1498544871927'):
    """
    POST a month and a fixed staff id to ``url`` and return the reply text.

    The month is the current one, except on the first day of a month,
    when the previous day's month is used instead.

    :param openner: http openner used to send the request
    :param url: endpoint to post to
    :return: the response body after ``self.__ungzip`` and UTF-8 decoding
    """
    logger.info('?? cookiee ????????')
    ssl._create_default_https_context = ssl._create_unverified_context

    now = datetime.now()
    reference_day = now - timedelta(days=1) if now.day == 1 else now
    formated_month = reference_day.strftime('%Y-%m')

    form_fields = {"date": formated_month, "staffid": "6590415"}
    encoded_form = parse.urlencode(form_fields).encode()
    response = openner.open(url, encoded_form)

    ungzip_response = self.__ungzip(response.read()).decode('utf-8')
    logger.debug(ungzip_response)
    logger.info('????????')
    return ungzip_response
def get_staff_cookie(self, openner,
                     url='https://kaoqin.bangongyi.com/attend/index/index?corpid=wx7a3ce8cf2cdfb04c&t=3'):
    """
    Open ``url`` with ``openner`` and discard the response.

    Certificate verification is disabled process-wide first.
    NOTE(review): presumably the visit exists so the opener picks up
    session cookies -- confirm against how ``openner`` is built.

    :param openner: http openner used to send the request
    :param url: address to visit
    :return: None
    """
    logger.info('???????????? cookie ??')
    setattr(ssl, '_create_default_https_context',
            ssl._create_unverified_context)
    openner.open(url)
    logger.info('?? cookie ????')
    return None
def get_server_time(self, openner,
                    url='https://kaoqin.bangongyi.com/attend/check/get-time?v=1506595896876&_=1506595896882'):
    """
    Fetch ``url`` with ``openner`` and return the decoded response text.

    The body is passed through ``self.__ungzip`` and decoded as UTF-8.
    Certificate verification is disabled process-wide first.

    :param openner: http openner used to send the request
    :param url: address to query
    :return: the decoded response body
    """
    logger.info('?? session ????????')
    ssl._create_default_https_context = ssl._create_unverified_context
    raw_body = openner.open(url).read()
    ungzip_response = self.__ungzip(raw_body).decode('utf-8')
    logger.info('session ????????? %s ' % ungzip_response)
    return ungzip_response