python类PROTOCOL_TLSv1_2()的实例源码

mqtt_base.py 文件源码 项目:log-tick 作者: xxv 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def connect(self):
        print("Connecting to {host}:{port}...".format(**self.mqtt_config))
        if 'ca_certs' in self.mqtt_config:
            self.mqtt.tls_set(self.mqtt_config['ca_certs'], tls_version=ssl.PROTOCOL_TLSv1_2)

        if 'user' in self.mqtt_config:
            self.mqtt.username_pw_set(self.mqtt_config['user'], self.mqtt_config['password'])
        self.mqtt.connect(self.mqtt_config['host'], self.mqtt_config['port'])
media_server.py 文件源码 项目:kawaii-player 作者: kanishka-linux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def run(self):
        logger.info('starting server...')
        try:
            cert = os.path.join(home, 'cert.pem')
            if ui.https_media_server:
                if not os.path.exists(cert):
                    self.cert_signal.emit(cert)
            if not ui.https_media_server:
                server_address = ('', self.port)
                self.httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler)
                self.set_local_ip_val()
                self.media_server_start.emit('http')
            elif ui.https_media_server and os.path.exists(cert):
                server_address = ('', self.port)
                self.httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler)
                self.httpd.socket = ssl.wrap_socket(self.httpd.socket, certfile=cert, ssl_version=ssl.PROTOCOL_TLSv1_2)
                self.set_local_ip_val()
                self.media_server_start.emit('https')
            #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler)
        except OSError as e:
            e_str = str(e)
            logger.info(e_str)
            if 'errno 99' in e_str.lower():
                txt = 'Your local IP changed..or port is blocked.\n..Trying to find new IP'
                send_notification(txt)
                self.ip = get_lan_ip()
                txt = 'Your New Address is '+self.ip+':'+str(self.port) + '\n Please restart the application'
                send_notification(txt)
                change_config_file(self.ip, self.port)
                server_address = (self.ip, self.port)
                ui.local_ip_stream = self.ip
                #httpd = MyTCPServer(server_address, HTTPServer_RequestHandler)
                #httpd = ThreadedHTTPServerLocal(server_address, HTTPServer_RequestHandler)
            else:
                pass
        if self.httpd:
            logger.info('running server...at..'+self.ip+':'+str(self.port))
            #httpd.allow_reuse_address = True
            self.httpd.serve_forever()
            logger.info('quitting http server')
        else:
            logger.info('server not started')
sunny.py 文件源码 项目:python-ngrok 作者: hauntek 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def ngrok_auth(options):
    host = 'www.ngrok.cc'
    port = 443
    try:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ssl_client = ssl.wrap_socket(client, ssl_version=ssl.PROTOCOL_TLSv1) # ssl.PROTOCOL_TLSv1_2
        ssl_client.connect((host, port))
    except Exception:
        print('???????: https://www.ngrok.cc ??.')
        time.sleep(10)
        sys.exit()

    header = "POST " + "/api/clientid/clientid/%s" + " HTTP/1.1" + "\r\n"
    header += "Content-Type: text/html" + "\r\n"
    header += "Host: %s" + "\r\n"
    header += "\r\n"
    buf = header % (options, host)
    ssl_client.sendall(buf.encode('utf-8')) # ?????

    fd = ssl_client.makefile('rb', 0)
    body = bytes()
    while True:
        line = fd.readline().decode('utf-8')
        if line == "\n" or line == "\r\n":
            chunk_size = int(fd.readline(), 16)
            if chunk_size > 0:
                body = fd.read(chunk_size).decode('utf-8')
                break

    ssl_client.close()

    authData = json.loads(body)
    if authData['status'] != 200:
        print('????:%s, ErrorCode:%s' % (authData['msg'], authData['status']))
        time.sleep(10)
        sys.exit()

    print('????,???????...')
    # ??????,?????[???id]
    ngrok_adds(authData['data'])
    proto = authData['server'].split(':')
    return proto
natapp.py 文件源码 项目:python-ngrok 作者: hauntek 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def natapp_auth(options):
    host = 'auth.natapp.cn'
    port = 443
    try:
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ssl_client = ssl.wrap_socket(client, ssl_version=ssl.PROTOCOL_TLSv1) # ssl.PROTOCOL_TLSv1_2
        ssl_client.connect((host, port))
    except Exception:
        print('???????: https://auth.natapp.cn ??.')
        time.sleep(10)
        sys.exit()

    data = {
        'Authtoken': options['authtoken'],
        'Clienttoken': options['clienttoken'],
        'Token': 'fffeephptokenkhd672'
    }
    query = json.dumps(data)

    header = "POST " + "/auth" + " HTTP/1.1" + "\r\n"
    header += "Content-Type: text/html" + "\r\n"
    header += "Host: auth.natapp.cn" + "\r\n"
    header += "Content-Length: %d" + "\r\n"
    header += "\r\n" + "%s"
    buf = header % (len(query), query)
    ssl_client.sendall(buf.encode('utf-8')) # ?????

    fd = ssl_client.makefile('rb', 0)
    body = bytes()
    while True:
        line = fd.readline().decode('utf-8')
        if line == "\n" or line == "\r\n":
            chunk_size = int(fd.readline(), 16)
            if chunk_size > 0:
                body = fd.read(chunk_size).decode('utf-8')
                break

    ssl_client.close()

    authData = json.loads(body)
    if authData['success'] == False:
        print('????:%s, ErrorCode:%s' % (authData['msg'], authData['errorCode']))
        time.sleep(10)
        sys.exit()

    print('????,???????...')
    proto = authData['data']['ServerAddr'].split(':')
    return proto
tls_server.py 文件源码 项目:packetweaver 作者: ANSSI-FR 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main(self):
        # Check Python version
        py_ver = sys.version_info
        if (
            py_ver.major < 2
            or (
                py_ver.major == 2
                and (
                    py_ver.minor < 7
                    or (py_ver.minor >= 7 and py_ver.micro < 10)
                )
            )
        ):
            raise Exception('Your version of Python and Python-ssl are too old. Please upgrade to more "current" versions')

        # Set up SSL/TLS context
        tls_version_table = {
            'SSLv3': ssl.PROTOCOL_SSLv23,
            'TLSv1': ssl.PROTOCOL_TLSv1,
            'TLSv1.1': ssl.PROTOCOL_TLSv1_1,
            'TLSv1.2': ssl.PROTOCOL_TLSv1_2,
        }

        tls_version = tls_version_table[self.version]

        ctx = ssl.SSLContext(tls_version)
        if not isinstance(self.alpn, type(None)):
            ctx.set_alpn_protocols(','.join(self.alpn))
        ctx.set_ciphers(self.cipher_suites)
        if not isinstance(self.cacert_file, type(None)):
            ctx.load_verify_locations(cafile=self.cacert_file)

        ctx.load_cert_chain(self.cert_file, self.key_file)

        if self.protocol == 'IPv4':
            server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            server_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)

        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

        ssl_sock = ctx.wrap_socket(server_sock, server_side=True)

        ssl_sock.bind(('' if isinstance(self.ip_dst, type(None)) else self.ip_dst, self.port_dst))

        ssl_sock.listen(self.backlog_size)
        ssl_sock.settimeout(self.timeout)

        self._serve(ssl_sock)

        try:
            server_sock = ssl_sock.unwrap()
            server_sock.shutdown(socket.SHUT_RDWR)
        except:
            pass
        finally:
            server_sock.close()
__init__.py 文件源码 项目:commissaire-http 作者: projectatomic 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def __init__(self, bind_host, bind_port, dispatcher,
                 tls_pem_file=None, tls_clientverify_file=None):
        """
        Initializes a new CommissaireHttpServer instance.

        :param bind_host: Host adapter to listen on.
        :type bind_host: str
        :param bind_port: Host port to listen on.
        :type bind_port: int
        :param dispatcher: Dispatcher instance (WSGI) to route and respond.
        :type dispatcher: commissaire_http.dispatcher.Dispatcher
        :param tls_pem_file: Full path to the PEM file for TLS.
        :type tls_pem_file: str
        :param tls_clientverify_file: Full path to CA to verify certs.
        :type tls_clientverify_file: str
        """
        self._bind_host = bind_host
        self._bind_port = bind_port
        self._tls_pem_file = tls_pem_file
        self._tls_clientverify_file = tls_clientverify_file
        self.dispatcher = dispatcher
        self._httpd = make_server(
            self._bind_host,
            self._bind_port,
            RoutesMiddleware(
                self.dispatcher.dispatch,
                self.dispatcher.router),
            server_class=ThreadedWSGIServer,
            handler_class=CommissaireRequestHandler)

        # If we are given a PEM file then wrap the socket
        if tls_pem_file:
            import ssl
            client_side_cert_kwargs = {}
            if self._tls_clientverify_file:
                client_side_cert_kwargs = {
                    'cert_reqs': ssl.CERT_REQUIRED,
                    'ca_certs': self._tls_clientverify_file,
                }
                self.logger.info(
                    'Requiring client side certificate CA validation.')

            self._httpd.socket = ssl.wrap_socket(
                self._httpd.socket,
                certfile=self._tls_pem_file,
                ssl_version=ssl.PROTOCOL_TLSv1_2,
                server_side=True,
                **client_side_cert_kwargs)
            self.logger.info('Using TLS with %s', self._tls_pem_file)

        self.logger.debug(
            'Created httpd server: %s:%s', self._bind_host, self._bind_port)
AmqpLink.py 文件源码 项目:py-IoticAgent 作者: Iotic-Labs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __get_ssl_context(cls, sslca=None):
        """Make an SSLConext for this Python version using public or sslca
        """
        if ((version_info[0] == 2 and (version_info[1] >= 7 and version_info[2] >= 5)) or
                (version_info[0] == 3 and version_info[1] >= 4)):
            logger.debug('SSL method for 2.7.5+ / 3.4+')
            # pylint: disable=no-name-in-module
            from ssl import SSLContext, PROTOCOL_TLSv1_2, CERT_REQUIRED, OP_NO_COMPRESSION
            ctx = SSLContext(PROTOCOL_TLSv1_2)
            ctx.set_ciphers('HIGH:!SSLv3:!TLSv1:!aNULL:@STRENGTH')
            # see CRIME security exploit
            ctx.options |= OP_NO_COMPRESSION
            # the following options are used to verify the identity of the broker
            if sslca:
                ctx.load_verify_locations(sslca)
                ctx.verify_mode = CERT_REQUIRED
                ctx.check_hostname = False
            else:
                # Verify public certifcates if sslca is None (default)
                from ssl import Purpose  # pylint: disable=no-name-in-module
                ctx.load_default_certs(purpose=Purpose.SERVER_AUTH)
                ctx.verify_mode = CERT_REQUIRED
                ctx.check_hostname = True

        elif version_info[0] == 3 and version_info[1] < 4:
            logger.debug('Using SSL method for 3.2+, < 3.4')
            # pylint: disable=no-name-in-module
            from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_SSLv23, OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_TLSv1
            ctx = SSLContext(PROTOCOL_SSLv23)
            ctx.options |= (OP_NO_SSLv2 | OP_NO_SSLv3 | OP_NO_TLSv1)
            ctx.set_ciphers('HIGH:!SSLv3:!TLSv1:!aNULL:@STRENGTH')
            # the following options are used to verify the identity of the broker
            if sslca:
                ctx.load_verify_locations(sslca)
                ctx.verify_mode = CERT_REQUIRED
            else:
                # Verify public certifcates if sslca is None (default)
                ctx.set_default_verify_paths()
                ctx.verify_mode = CERT_REQUIRED

        else:
            raise Exception("Unsupported Python version %s" % '.'.join(str(item) for item in version_info[:3]))

        return ctx


问题


面经


文章

微信
公众号

扫码关注公众号