python类Context()的实例源码

ssl.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, privateKeyFileName, certificateFileName,
                 sslmethod=SSL.SSLv23_METHOD, _contextFactory=SSL.Context):
        """
        @param privateKeyFileName: Name of a file containing a private key
        @param certificateFileName: Name of a file containing a certificate
        @param sslmethod: The SSL method to use
        """
        self.privateKeyFileName = privateKeyFileName
        self.certificateFileName = certificateFileName
        self.sslmethod = sslmethod
        self._contextFactory = _contextFactory

        # Create a context object right now.  This is to force validation of
        # the given parameters so that errors are detected earlier rather
        # than later.
        self.cacheContext()
recipe-496786.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, server_address, HandlerClass, logRequests=True):
        """Secure XML-RPC server.

        It it very similar to SimpleXMLRPCServer but it uses HTTPS for transporting XML data.
        """
        self.logRequests = logRequests

        SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
        SocketServer.BaseServer.__init__(self, server_address, HandlerClass)
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.use_privatekey_file (KEYFILE)
        ctx.use_certificate_file(CERTFILE)
        self.socket = SSL.Connection(ctx, socket.socket(self.address_family,
                                                        self.socket_type))
        self.server_bind()
        self.server_activate()
test_https.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test03_ssl_verification_of_peer_fails(self):
        ctx = SSL.Context(SSL.TLSv1_METHOD)

        def verify_callback(conn, x509, errnum, errdepth, preverify_ok): 
            log.debug('SSL peer certificate verification failed for %r',
                      x509.get_subject())
            return preverify_ok 

        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ctx.set_verify_depth(9)

        # Set bad location - unit test dir has no CA certs to verify with
        ctx.load_verify_locations(None, Constants.UNITTEST_DIR)

        conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                               ssl_context=ctx)
        conn.connect()        
        self.assertRaises(SSL.Error, conn.request, 'GET', '/')
test_https.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test04_ssl_verification_with_subj_alt_name(self):
        ctx = SSL.Context(SSL.TLSv1_METHOD)

        verification = ServerSSLCertVerification(hostname='localhost')
        verify_callback = verification.get_verify_server_cert_func()

        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ctx.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        ctx.load_verify_locations(None, Constants.CACERT_DIR)

        conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                               ssl_context=ctx)
        conn.connect()
        conn.request('GET', '/')
        resp = conn.getresponse()
        print('Response = %s' % resp.read())
test_https.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def test04_ssl_verification_with_subj_common_name(self):
        ctx = SSL.Context(SSL.TLSv1_METHOD)

        # Explicitly set verification of peer hostname using peer certificate
        # subject common name
        verification = ServerSSLCertVerification(hostname='localhost',
                                                 subj_alt_name_match=False)

        verify_callback = verification.get_verify_server_cert_func()

        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ctx.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        ctx.load_verify_locations(None, Constants.CACERT_DIR)

        conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                               ssl_context=ctx)
        conn.connect()
        conn.request('GET', '/')
        resp = conn.getresponse()
        print('Response = %s' % resp.read())
https.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def connect(self):
        """Create SSL socket and connect to peer
        """
        if getattr(self, 'ssl_context', None):
            if not isinstance(self.ssl_context, SSL.Context):
                raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                                'ssl_context" attribute; got %r instead' %
                                self.ssl_context)
            ssl_context = self.ssl_context
        else:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)

        sock = socket.create_connection((self.host, self.port), self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above      
        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        self.sock = SSLSocket(ssl_context, sock)

        # Go to client mode.
        self.sock.set_connect_state()
LuckyThirteen_vulnerability_tester_plugin.py 文件源码 项目:midip-sslyze 作者: soukupa5 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _test_dtls_ciphersuite(self, server_connectivity_info, dtls_version, cipher, port):
        """This function is used by threads to it investigates with support the cipher suite on server, when DTLS protocol(s) is/are tested. Returns instance of class AcceptCipher or RejectCipher.

            Args:
            server_connectivity_info (ServerConnectivityInfo): contains information for connection on server
            dtls_version (str): contains SSL/TLS protocol version, which is used to connect
            cipher (str): contains OpenSSL shortcut for identification cipher suite
            port (int): contains port number for connecting comunication.
        """
        cnx = SSL.Context(dtls_version)
        cnx.set_cipher_list(cipher)
        conn = SSL.Connection(cnx, socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
        try:
            conn.connect((server_connectivity_info.ip_address, port))
            conn.do_handshake()
        except SSL.Error as e:
            error_msg = ((e[0])[0])[2]
            cipher_result = RejectCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher], error_msg)
        else:
            cipher_result = AcceptCipher(TLS_OPENSSL_TO_RFC_NAMES_MAPPING[cipher])
        finally:
            conn.shutdown()
            conn.close()
        return cipher_result
LuckyThirteen_vulnerability_tester_plugin.py 文件源码 项目:midip-sslyze 作者: soukupa5 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_dtls_protocol_support(self, server_connectivity_info, dtls_version, port):
        """Tests if DTLS protocols are supported by server. Returns true if server supports protocol otherwise returns false.

            Args:
            server_connectivity_info (ServerConnectivityInfo): contains information for connection on server
            dtls_protocol (str): contains version of DTLS protocol, which is supposed to be tested
            port (int): contains port number for connecting comunication.
        """
        cnx = SSL.Context(dtls_version)
        cnx.set_cipher_list('ALL:COMPLEMENTOFALL')
        conn = SSL.Connection(cnx,socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
        try:
            conn.connect((server_connectivity_info.ip_address, port))
            conn.do_handshake()
        except SSL.SysCallError as ex:
            if ex[0] == 111:
                raise ValueError('LuckyThirteenVulnerabilityTesterPlugin: It is entered wrong port for DTLS connection.')
            else:
                support = False
        else:
            support = True
        finally:
            conn.shutdown()
            conn.close()
        return support
sslold.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ssl(sock, keyfile=None, certfile=None):
    context = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        context.use_certificate_file(certfile)
    if keyfile is not None:
        context.use_privatekey_file(keyfile)
    context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
    timeout = sock.gettimeout()
    try:
        sock = sock._sock
    except AttributeError:
        pass
    connection = SSL.Connection(context, sock)
    ssl_sock = SSLObject(connection)
    ssl_sock.settimeout(timeout)

    try:
        sock.getpeername()
    except Exception:
        # no, no connection yet
        pass
    else:
        # yes, do the handshake
        ssl_sock.do_handshake()
    return ssl_sock
sslold.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def ssl(sock, keyfile=None, certfile=None):
    context = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        context.use_certificate_file(certfile)
    if keyfile is not None:
        context.use_privatekey_file(keyfile)
    context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
    timeout = sock.gettimeout()
    try:
        sock = sock._sock
    except AttributeError:
        pass
    connection = SSL.Connection(context, sock)
    ssl_sock = SSLObject(connection)
    ssl_sock.settimeout(timeout)

    try:
        sock.getpeername()
    except Exception:
        # no, no connection yet
        pass
    else:
        # yes, do the handshake
        ssl_sock.do_handshake()
    return ssl_sock
client.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def main():
    """
    Connect to an SNI-enabled server and request a specific hostname, specified
    by argv[1], of it.
    """
    if len(argv) < 2:
        print 'Usage: %s <hostname>' % (argv[0],)
        return 1

    client = socket()

    print 'Connecting...',
    stdout.flush()
    client.connect(('127.0.0.1', 8443))
    print 'connected', client.getpeername()

    client_ssl = Connection(Context(TLSv1_METHOD), client)
    client_ssl.set_connect_state()
    client_ssl.set_tlsext_host_name(argv[1])
    client_ssl.do_handshake()
    print 'Server subject is', client_ssl.get_peer_certificate().get_subject()
    client_ssl.close()
server.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def main():
    """
    Run an SNI-enabled server which selects between a few certificates in a
    C{dict} based on the handshake request it receives from a client.
    """
    port = socket()
    port.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    port.bind(('', 8443))
    port.listen(3)

    print 'Accepting...',
    stdout.flush()
    server, addr = port.accept()
    print 'accepted', addr

    server_context = Context(TLSv1_METHOD)
    server_context.set_tlsext_servername_callback(pick_certificate)

    server_ssl = Connection(server_context, server)
    server_ssl.set_accept_state()
    server_ssl.do_handshake()
    server.close()
SecureXMLRPCServer.py 文件源码 项目:2FAssassin 作者: maxwellkoh 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, server_address, RequestHandlerClass):
        SocketServer.BaseServer.__init__(
            self, server_address, RequestHandlerClass
        )

        # Same as normal, but make it secure:
        ctx = SSL.Context(SSL.SSLv23_METHOD)
        ctx.set_options(SSL.OP_NO_SSLv2)

        dir = os.curdir
        ctx.use_privatekey_file(os.path.join(dir, 'server.pkey'))
        ctx.use_certificate_file(os.path.join(dir, 'server.cert'))

        self.socket = SSLWrapper(
            SSL.Connection(
                ctx, socket.socket(self.address_family, self.socket_type)
            )
        )
        self.server_bind()
        self.server_activate()
sslold.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ssl(sock, keyfile=None, certfile=None):
    context = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        context.use_certificate_file(certfile)
    if keyfile is not None:
        context.use_privatekey_file(keyfile)
    context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
    timeout = sock.gettimeout()
    try:
        sock = sock._sock
    except AttributeError:
        pass
    connection = SSL.Connection(context, sock)
    ssl_sock = SSLObject(connection)
    ssl_sock.settimeout(timeout)

    try:
        sock.getpeername()
    except Exception:
        # no, no connection yet
        pass
    else:
        # yes, do the handshake
        ssl_sock.do_handshake()
    return ssl_sock
https.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def connect(self):
        """Create SSL socket and connect to peer
        """
        if getattr(self, 'ssl_context', None):
            if not isinstance(self.ssl_context, SSL.Context):
                raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                                'ssl_context" attribute; got %r instead' %
                                self.ssl_context)
            ssl_context = self.ssl_context
        else:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)

        sock = socket.create_connection((self.host, self.port), self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above      
        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        self.sock = SSLSocket(ssl_context, sock)

        # Go to client mode.
        self.sock.set_connect_state()
https.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, ssl_context, debuglevel=0):
        """
        @param ssl_context:SSL context
        @type ssl_context: OpenSSL.SSL.Context
        @param debuglevel: debug level for HTTPSHandler
        @type debuglevel: int
        """
        AbstractHTTPHandler.__init__(self, debuglevel)

        if ssl_context is not None:
            if not isinstance(ssl_context, SSL.Context):
                raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                                'ssl_context" keyword; got %r instead' %
                                ssl_context)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = SSL.Context(SSL.TLSv1_METHOD)
https.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect(self):
        """Create SSL socket and connect to peer
        """
        if getattr(self, 'ssl_context', None):
            if not isinstance(self.ssl_context, SSL.Context):
                raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                                'ssl_context" attribute; got %r instead' %
                                self.ssl_context)
            ssl_context = self.ssl_context
        else:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)

        sock = socket.create_connection((self.host, self.port), self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above      
        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        self.sock = SSLSocket(ssl_context, sock)

        # Go to client mode.
        self.sock.set_connect_state()
https.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, ssl_context, debuglevel=0):
        """
        @param ssl_context:SSL context
        @type ssl_context: OpenSSL.SSL.Context
        @param debuglevel: debug level for HTTPSHandler
        @type debuglevel: int
        """
        AbstractHTTPHandler.__init__(self, debuglevel)

        if ssl_context is not None:
            if not isinstance(ssl_context, SSL.Context):
                raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                                'ssl_context" keyword; got %r instead' %
                                ssl_context)
            self.ssl_context = ssl_context
        else:
            self.ssl_context = SSL.Context(SSL.TLSv1_METHOD)
__init__.py 文件源码 项目:lantern-detection 作者: gongxijun 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def bind(self, family, type, proto=0):
        """Create (or recreate) the actual socket object."""
        self.socket = socket.socket(family, type, proto)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
##        self.socket.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)
        if self.ssl_certificate and self.ssl_private_key:
            if SSL is None:
                raise ImportError("You must install pyOpenSSL to use HTTPS.")

            # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            ctx.use_privatekey_file(self.ssl_private_key)
            ctx.use_certificate_file(self.ssl_certificate)
            self.socket = SSLConnection(ctx, self.socket)
            self.populate_ssl_environ()
        self.socket.bind(self.bind_addr)
__init__.py 文件源码 项目:lantern-detection 作者: gongxijun 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def bind(self, family, type, proto=0):
        """Create (or recreate) the actual socket object."""
        self.socket = socket.socket(family, type, proto)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
##        self.socket.setsockopt(socket.SOL_SOCKET, socket.TCP_NODELAY, 1)
        if self.ssl_certificate and self.ssl_private_key:
            if SSL is None:
                raise ImportError("You must install pyOpenSSL to use HTTPS.")

            # See http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
            ctx = SSL.Context(SSL.SSLv23_METHOD)
            ctx.use_privatekey_file(self.ssl_private_key)
            ctx.use_certificate_file(self.ssl_certificate)
            self.socket = SSLConnection(ctx, self.socket)
            self.populate_ssl_environ()
        self.socket.bind(self.bind_addr)
test_https.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test03_ssl_verification_of_peer_fails(self):
        ctx = SSL.Context(SSL.TLSv1_METHOD)

        def verify_callback(conn, x509, errnum, errdepth, preverify_ok): 
            log.debug('SSL peer certificate verification failed for %r',
                      x509.get_subject())
            return preverify_ok 

        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ctx.set_verify_depth(9)

        # Set bad location - unit test dir has no CA certs to verify with
        ctx.load_verify_locations(None, Constants.UNITTEST_DIR)

        conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                               ssl_context=ctx)
        conn.connect()        
        self.assertRaises(SSL.Error, conn.request, 'GET', '/')
test_https.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test04_ssl_verification_with_subj_alt_name(self):
        ctx = SSL.Context(SSL.TLSv1_METHOD)

        verification = ServerSSLCertVerification(hostname='localhost')
        verify_callback = verification.get_verify_server_cert_func()

        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ctx.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        ctx.load_verify_locations(None, Constants.CACERT_DIR)

        conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                               ssl_context=ctx)
        conn.connect()
        conn.request('GET', '/')
        resp = conn.getresponse()
        print('Response = %s' % resp.read())
test_https.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test04_ssl_verification_with_subj_common_name(self):
        ctx = SSL.Context(SSL.TLSv1_METHOD)

        # Explicitly set verification of peer hostname using peer certificate
        # subject common name
        verification = ServerSSLCertVerification(hostname='localhost',
                                                 subj_alt_name_match=False)

        verify_callback = verification.get_verify_server_cert_func()

        ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
        ctx.set_verify_depth(9)

        # Set correct location for CA certs to verify with
        ctx.load_verify_locations(None, Constants.CACERT_DIR)

        conn = HTTPSConnection(Constants.HOSTNAME, port=Constants.PORT,
                               ssl_context=ctx)
        conn.connect()
        conn.request('GET', '/')
        resp = conn.getresponse()
        print('Response = %s' % resp.read())
https.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def connect(self):
        """Create SSL socket and connect to peer
        """
        if getattr(self, 'ssl_context', None):
            if not isinstance(self.ssl_context, SSL.Context):
                raise TypeError('Expecting OpenSSL.SSL.Context type for "'
                                'ssl_context" attribute; got %r instead' %
                                self.ssl_context)
            ssl_context = self.ssl_context
        else:
            ssl_context = SSL.Context(self.__class__.default_ssl_method)

        sock = socket.create_connection((self.host, self.port), self.timeout)

        # Tunnel if using a proxy - ONLY available for Python 2.6.2 and above      
        if getattr(self, '_tunnel_host', None):
            self.sock = sock
            self._tunnel()

        self.sock = SSLSocket(ssl_context, sock)

        # Go to client mode.
        self.sock.set_connect_state()
_sslverify.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, hostname, ctx):
        """
        Initialize L{ClientTLSOptions}.

        @param hostname: The hostname to verify as input by a human.
        @type hostname: L{unicode}

        @param ctx: an L{OpenSSL.SSL.Context} to use for new connections.
        @type ctx: L{OpenSSL.SSL.Context}.
        """
        self._ctx = ctx
        self._hostname = hostname
        self._hostnameBytes = _idnaBytes(hostname)
        self._hostnameASCII = self._hostnameBytes.decode("ascii")
        ctx.set_info_callback(
            _tolerateErrors(self._identityVerifyingInfoCallback)
        )
_sslverify.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _identityVerifyingInfoCallback(self, connection, where, ret):
        """
        U{info_callback
        <http://pythonhosted.org/pyOpenSSL/api/ssl.html#OpenSSL.SSL.Context.set_info_callback>
        } for pyOpenSSL that verifies the hostname in the presented certificate
        matches the one passed to this L{ClientTLSOptions}.

        @param connection: the connection which is handshaking.
        @type connection: L{OpenSSL.SSL.Connection}

        @param where: flags indicating progress through a TLS handshake.
        @type where: L{int}

        @param ret: ignored
        @type ret: ignored
        """
        if where & SSL.SSL_CB_HANDSHAKE_START:
            connection.set_tlsext_host_name(self._hostnameBytes)
        elif where & SSL.SSL_CB_HANDSHAKE_DONE:
            try:
                verifyHostname(connection, self._hostnameASCII)
            except VerificationError:
                f = Failure()
                transport = connection.get_app_data()
                transport.failVerification(f)
test_sslverify.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_extraChainFilesAreAddedIfSupplied(self):
        """
        If C{extraCertChain} is set and all prerequisites are met, the
        specified chain certificates are added to C{Context}s that get
        created.
        """
        opts = sslverify.OpenSSLCertificateOptions(
            privateKey=self.sKey,
            certificate=self.sCert,
            extraCertChain=self.extraCertChain,
        )
        opts._contextFactory = FakeContext
        ctx = opts.getContext()
        self.assertEqual(self.sKey, ctx._privateKey)
        self.assertEqual(self.sCert, ctx._certificate)
        self.assertEqual(self.extraCertChain, ctx._extraCertChain)
sslold.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def ssl(sock, keyfile=None, certfile=None):
    context = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        context.use_certificate_file(certfile)
    if keyfile is not None:
        context.use_privatekey_file(keyfile)
    context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
    timeout = sock.gettimeout()
    try:
        sock = sock._sock
    except AttributeError:
        pass
    connection = SSL.Connection(context, sock)
    ssl_sock = SSLObject(connection)
    ssl_sock.settimeout(timeout)

    try:
        sock.getpeername()
    except Exception:
        # no, no connection yet
        pass
    else:
        # yes, do the handshake
        ssl_sock.do_handshake()
    return ssl_sock
ddnsv2.py 文件源码 项目:simple-ddns 作者: ihciah 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def main():
    resolver = DNSResolver()
    factory = server.DNSServerFactory(
        clients=[resolver]
    )

    protocol = dns.DNSDatagramProtocol(controller=factory)
    httpserver = webserver.Site(HTTPServer(resolver))
    context = Context(TLSv1_METHOD)
    context.use_certificate_chain_file(SERVER_CONFIG["ssl_crt"])
    context.use_privatekey_file(SERVER_CONFIG["ssl_key"])

    reactor.listenUDP(SERVER_CONFIG["dns_port"], protocol)
    reactor.listenSSL(SERVER_CONFIG["http_port"], httpserver, ContextFactory(context))

    reactor.run()
streamer.py 文件源码 项目:estreamer 作者: spohara79 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.ctx = SSL.Context(SSL.TLSv1_METHOD)
        if self.verify:
            self.ctx.set_verify(SSL.VERIFY_PEER, self.validate_cert)
            self.ctx.load_verify_locations(self.verify)
            self.trusted_cert = crypto.load_certificate(crypto.FILETYPE_PEM, file(self.verify).read())
        self.ctx.use_privatekey(self.pkey)
        self.ctx.use_certificate(self.cert)
        self.sock = SSL.Connection(self.ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        self.sock.connect((self.host, self.port))
        return self


问题


面经


文章

微信
公众号

扫码关注公众号