python类create_default_context()的实例源码

ssl_servers.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def make_https_server(case, context=None, certfile=CERTFILE,
                      host=HOST, handler_class=None):
    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)
    server = HTTPSServerThread(context, host, handler_class)
    flag = threading.Event()
    server.start(flag)
    flag.wait()
    def cleanup():
        if support.verbose:
            sys.stdout.write('stopping HTTPS server\n')
        server.stop()
        if support.verbose:
            sys.stdout.write('joining HTTPS thread\n')
        server.join()
    case.addCleanup(cleanup)
    return server
urllib2.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
ssl_servers.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def make_https_server(case, context=None, certfile=CERTFILE,
                      host=HOST, handler_class=None):
    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)
    server = HTTPSServerThread(context, host, handler_class)
    flag = threading.Event()
    server.start(flag)
    flag.wait()
    def cleanup():
        if support.verbose:
            sys.stdout.write('stopping HTTPS server\n')
        server.stop()
        if support.verbose:
            sys.stdout.write('joining HTTPS thread\n')
        server.join()
    case.addCleanup(cleanup)
    return server
utils.py 文件源码 项目:kodi-plugin.video.ted-talks-chinese 作者: daineseh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def make_HTTPS_handler(params, **kwargs):
    opts_no_check_certificate = params.get('nocheckcertificate', False)
    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if opts_no_check_certificate:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)
    else:  # Python < 3.4
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (ssl.CERT_NONE
                               if opts_no_check_certificate
                               else ssl.CERT_REQUIRED)
        context.set_default_verify_paths()
        return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
urllib2.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
            cafile=None, capath=None, cadefault=False, context=None):
    global _opener
    if cafile or capath or cadefault:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif context:
        https_handler = HTTPSHandler(context=context)
        opener = build_opener(https_handler)
    elif _opener is None:
        _opener = opener = build_opener()
    else:
        opener = _opener
    return opener.open(url, data, timeout)
thingtalk.py 文件源码 项目:almond-nnparser 作者: Stanford-Mobisocial-IoT-Lab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def init_from_url(self, snapshot=-1, thingpedia_url=None):
        if thingpedia_url is None:
            thingpedia_url = os.getenv('THINGPEDIA_URL', 'https://thingpedia.stanford.edu/thingpedia')
        ssl_context = ssl.create_default_context()

        with urllib.request.urlopen(thingpedia_url + '/api/snapshot/' + str(snapshot) + '?meta=1', context=ssl_context) as res:
            self._process_devices(json.load(res)['data'])

        with urllib.request.urlopen(thingpedia_url + '/api/entities?snapshot=' + str(snapshot), context=ssl_context) as res:
            self._process_entities(json.load(res)['data'])
run_server.py 文件源码 项目:almond-nnparser 作者: Stanford-Mobisocial-IoT-Lab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run():
    np.random.seed(42)
    config = ServerConfig.load(('./server.conf',))

    if sys.version_info[2] >= 6:
        thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
    else:
        thread_pool = ThreadPoolExecutor(max_workers=32)
    app = Application(config, thread_pool)

    if config.ssl_key:
        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
        app.listen(config.port, ssl_options=ssl_ctx)
    else:
        app.listen(config.port)

    if config.user:
        os.setgid(grp.getgrnam(config.user)[2])
        os.setuid(pwd.getpwnam(config.user)[2])

    if sd:
        sd.notify('READY=1')

    tokenizer_service = TokenizerService()
    tokenizer_service.run()

    for language in config.languages:
        load_language(app, tokenizer_service, language, config.get_model_directory(language))

    sys.stdout.flush()
    tornado.ioloop.IOLoop.current().start()
connection.py 文件源码 项目:python-libjuju 作者: juju 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _get_ssl(self, cert=None):
        return ssl.create_default_context(
            purpose=ssl.Purpose.CLIENT_AUTH, cadata=cert)
http.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, output_spec):
        """
        Uses HTTP chunked transfer-encoding to stream a request body to a
        server. Unfortunately requests does not support hooking into this logic
        easily, so we use the lower-level httplib module.
        """
        super(HttpStreamPushAdapter, self).__init__(output_spec)
        self._closed = False

        parts = urlparse.urlparse(output_spec['url'])
        if parts.scheme == 'https':
            ssl_context = ssl.create_default_context()
            conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
        else:
            conn = httplib.HTTPConnection(parts.netloc)

        try:
            conn.putrequest(output_spec.get('method', 'POST').upper(),
                            parts.path, skip_accept_encoding=True)

            for header, value in output_spec.get('headers', {}).items():
                conn.putheader(header, value)

            conn.putheader('Transfer-Encoding', 'chunked')
            conn.endheaders()  # This actually flushes the headers to the server
        except Exception:
            print('HTTP connection to "%s" failed.' % output_spec['url'])
            conn.close()
            raise

        self.conn = conn
__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self, url, headers={}):
        self._url = url

        """
        Uses HTTP chunked transfer-encoding to stream a request body to a
        server. Unfortunately requests does not support hooking into this logic
        easily, so we use the lower-level httplib module.
        """
        self._closed = False

        parts = urlparse.urlparse(self._url)
        if parts.scheme == 'https':
            ssl_context = ssl.create_default_context()
            conn = httplib.HTTPSConnection(parts.netloc, context=ssl_context)
        else:
            conn = httplib.HTTPConnection(parts.netloc)

        try:
            url = parts.path
            if parts.query is not None:
                url = '%s?%s' % (url, parts.query)
            conn.putrequest('POST',
                            url, skip_accept_encoding=True)

            for header, value in headers.items():
                conn.putheader(header, value)

            conn.putheader('Transfer-Encoding', 'chunked')

            conn.endheaders()  # This actually flushes the headers to the server
        except Exception:
            sys.stderr.write('HTTP connection to "%s" failed.\n' % self._url)
            conn.close()
            raise

        self.conn = conn
utils.py 文件源码 项目:socketshark 作者: closeio 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def http_post(shark, url, data):
    log = shark.log.bind(url=url)
    opts = shark.config['HTTP']
    if opts.get('ssl_cafile'):
        ssl_context = ssl.create_default_context(cafile=opts['ssl_cafile'])
    else:
        ssl_context = None
    conn = aiohttp.TCPConnector(ssl_context=ssl_context)
    async with aiohttp.ClientSession(connector=conn) as session:
        wait = opts['wait']
        for n in range(opts['tries']):
            if n > 0:
                await asyncio.sleep(wait)
            try:
                log.debug('http request', data=data)
                async with session.post(url, json=data,
                                        timeout=opts['timeout']) as resp:
                    if resp.status == 429:  # Too many requests.
                        wait = _get_rate_limit_wait(log, resp, opts)
                        continue
                    else:
                        wait = opts['wait']
                    resp.raise_for_status()
                    data = await resp.json()
                    log.debug('http response', data=data)
                    return data
            except aiohttp.ClientError:
                log.exception('unhandled exception in http_post')
            except asyncio.TimeoutError:
                log.exception('timeout in http_post')
        return {'status': 'error', 'error': c.ERR_SERVICE_UNAVAILABLE}
__init__.py 文件源码 项目:socketshark 作者: closeio 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_ssl_context(self):
        ssl_settings = self.config.get('WS_SSL')
        if ssl_settings:
            ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
            ssl_context.load_cert_chain(certfile=ssl_settings['cert'],
                                        keyfile=ssl_settings['key'])
            return ssl_context
iostream_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_check_hostname(self):
        # Test that server_hostname parameter to start_tls is being used.
        # The check_hostname functionality is only available in python 2.7 and
        # up and in python 3.4 and up.
        server_future = self.server_start_tls(_server_ssl_options())
        client_future = self.client_start_tls(
            ssl.create_default_context(),
            server_hostname=b'127.0.0.1')
        with ExpectLog(gen_log, "SSL Error"):
            with self.assertRaises(ssl.SSLError):
                # The client fails to connect with an SSL error.
                yield client_future
        with self.assertRaises(Exception):
            # The server fails to connect, but the exact error is unspecified.
            yield server_future
iostream_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_check_hostname(self):
        # Test that server_hostname parameter to start_tls is being used.
        # The check_hostname functionality is only available in python 2.7 and
        # up and in python 3.4 and up.
        server_future = self.server_start_tls(_server_ssl_options())
        client_future = self.client_start_tls(
            ssl.create_default_context(),
            server_hostname=b'127.0.0.1')
        with ExpectLog(gen_log, "SSL Error"):
            with self.assertRaises(ssl.SSLError):
                # The client fails to connect with an SSL error.
                yield client_future
        with self.assertRaises(Exception):
            # The server fails to connect, but the exact error is unspecified.
            yield server_future
iostream_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_check_hostname(self):
        # Test that server_hostname parameter to start_tls is being used.
        # The check_hostname functionality is only available in python 2.7 and
        # up and in python 3.4 and up.
        server_future = self.server_start_tls(_server_ssl_options())
        client_future = self.client_start_tls(
            ssl.create_default_context(),
            server_hostname=b'127.0.0.1')
        with ExpectLog(gen_log, "SSL Error"):
            with self.assertRaises(ssl.SSLError):
                # The client fails to connect with an SSL error.
                yield client_future
        with self.assertRaises(Exception):
            # The server fails to connect, but the exact error is unspecified.
            yield server_future
tenant.py 文件源码 项目:python-keylime 作者: mit-ll 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_tls_context(self):
        ca_cert = config.get('tenant', 'ca_cert')
        my_cert = config.get('tenant', 'my_cert')
        my_priv_key = config.get('tenant', 'private_key')
        my_key_pw = config.get('tenant','private_key_pw')

        tls_dir = config.get('tenant','tls_dir')

        if tls_dir == 'default':
            ca_cert = 'cacert.crt'
            my_cert = 'client-cert.crt'
            my_priv_key = 'client-private.pem'
            tls_dir = 'cv_ca'

        # this is relative path, convert to absolute in WORK_DIR
        if tls_dir[0]!='/':
            tls_dir = os.path.abspath('%s/%s'%(common.WORK_DIR,tls_dir))

        if my_key_pw=='default':
            logger.warning("CAUTION: using default password for private key, please set private_key_pw to a strong password")

        logger.info("Setting up client TLS in %s"%(tls_dir))

        ca_path = "%s/%s"%(tls_dir,ca_cert)
        my_cert = "%s/%s"%(tls_dir,my_cert)
        my_priv_key = "%s/%s"%(tls_dir,my_priv_key)

        context = ssl.create_default_context()
        context.load_verify_locations(cafile=ca_path)   
        context.load_cert_chain(certfile=my_cert,keyfile=my_priv_key,password=my_key_pw)
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = config.getboolean('general','tls_check_hostnames')
        return context
dirbuster_threaded.py 文件源码 项目:Python4Pentesters 作者: tatanus 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_dir(url, depth, max_depth, dir_list):
    if (depth > max_depth):
        return
    depth = depth +1

    for d in dir_list:
        new_url = url + "/" + d
        try:
            # make a SSL handler that ignores SSL CERT issues
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE

            response = urllib2.urlopen(new_url, context=ctx)
            if response and response.getcode() == 200:
                print "[+] FOUND %s" % (new_url)
                thread.start_new_thread(test_dir, (new_url, depth, max_depth, dir_list, ))
        except urllib2.HTTPError, e:
            if e.code == 401:
                print "[!] Authorization Required %s " % (new_url)
            elif e.code == 403:
                print "[!] Forbidden %s " % (new_url)
            elif e.code == 404:
                print "[-] Not Found %s " % (new_url)
            elif e.code == 503:
                print "[!] Service Unavailable %s " % (new_url)
            else:
                print "[?] Unknwon"
dirbuster.py 文件源码 项目:Python4Pentesters 作者: tatanus 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_dir(url, depth, max_depth, dir_list):
    if (depth > max_depth):
        return
    depth = depth +1

    found = list()
    for d in dir_list:
        new_url = url + "/" + d
        try:
            # make a SSL handler that ignores SSL CERT issues
            ctx = ssl.create_default_context()
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE

            response = urllib2.urlopen(new_url, context=ctx)
            if response and response.getcode() == 200:
                print "[+] FOUND %s" % (new_url)
                found.append(new_url)
        except urllib2.HTTPError, e:
            if e.code == 401:
                print "[!] Authorization Required %s " % (new_url)
            elif e.code == 403:
                print "[!] Forbidden %s " % (new_url)
            elif e.code == 404:
                print "[-] Not Found %s " % (new_url)
            elif e.code == 503:
                print "[!] Service Unavailable %s " % (new_url)
            else:
                print "[?] Unknwon"

    # loop over each identified valid directory and recurse into it
    for new_url in found:
        test_dir(new_url, depth, max_depth, dir_list)
asyncio_echo_client_ssl.py 文件源码 项目:pymotw3 作者: reingart 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def echo_client(server_address, messages):

    log = logging.getLogger('echo_client')

    # The certificate is created with pymotw.com as the hostname,
    # which will not match when the example code runs
    # elsewhere, so disable hostname verification.
    ssl_context = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH,
    )
    ssl_context.check_hostname = False
    ssl_context.load_verify_locations('pymotw.crt')

    log.debug('connecting to {} port {}'.format(*server_address))
    reader, writer = await asyncio.open_connection(
        *server_address, ssl=ssl_context)

    # This could be writer.writelines() except that
    # would make it harder to show each part of the message
    # being sent.
    for msg in messages:
        writer.write(msg)
        log.debug('sending {!r}'.format(msg))
    # SSL does not support EOF, so send a null byte to indicate
    # the end of the message.
    writer.write(b'\x00')
    await writer.drain()

    log.debug('waiting for response')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('received {!r}'.format(data))
        else:
            log.debug('closing')
            writer.close()
            return
test_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _test_create_ssl_connection(self, httpd, create_connection,
                                    check_sockname=True):
        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
        self._basetest_create_ssl_connection(conn_fut, check_sockname)

        # ssl.Purpose was introduced in Python 3.4
        if hasattr(ssl, 'Purpose'):
            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
                                          cafile=None, capath=None,
                                          cadata=None):
                """
                A ssl.create_default_context() replacement that doesn't enable
                cert validation.
                """
                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
                return test_utils.dummy_ssl_context()

            # With ssl=True, ssl.create_default_context() should be called
            with mock.patch('ssl.create_default_context',
                            side_effect=_dummy_ssl_create_context) as m:
                conn_fut = create_connection(ssl=True)
                self._basetest_create_ssl_connection(conn_fut, check_sockname)
                self.assertEqual(m.call_count, 1)

        # With the real ssl.create_default_context(), certificate
        # validation will fail
        with self.assertRaises(ssl.SSLError) as cm:
            conn_fut = create_connection(ssl=True)
            # Ignore the "SSL handshake failed" log in debug mode
            with test_utils.disable_logger():
                self._basetest_create_ssl_connection(conn_fut, check_sockname)

        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')


问题


面经


文章

微信
公众号

扫码关注公众号