python类SSLSocket()的实例源码

test_ftplib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_context(self):
    """Check that FTP_TLS honours an explicitly supplied SSLContext.

    Key material passed together with a context must raise ValueError;
    afterwards both the control and the data socket must use the context.
    """
    self.client.quit()
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Every way of combining key/cert files with a context is rejected.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=tls_context,
                          **extra)

    self.client = ftplib.FTP_TLS(context=tls_context, timeout=10)
    client = self.client
    client.connect(self.server.host, self.server.port)
    # The control channel is plain until auth() is called.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, tls_context)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, tls_context)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
ftplib.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline()
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
netutil.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Wrap ``socket`` in TLS and return the resulting ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    an `ssl.SSLContext` object or a dictionary of options.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(socket, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(socket, **kwargs)
netutil.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Wrap ``socket`` in TLS and return the resulting ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    an `ssl.SSLContext` object or a dictionary of options.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(socket, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(socket, **kwargs)
netutil.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Wrap ``socket`` in TLS and return the resulting ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    an `ssl.SSLContext` object or a dictionary of options.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(socket, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(socket, **kwargs)
ftplib.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
test_ssl_context.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testHttpsContext(self):
    """Check that an HTTPS connection is backed by a verifying SSLContext."""
    http = httplib2.Http(ca_certs=self.ca_certs_path)

    # Establish connection to local server
    http.request('https://localhost:%d/' % (self.port))

    # Look up the connection that the request above left in the client.
    connection = http.connections['https:localhost:%d' % self.port]
    tls_sock = connection.sock

    self.assertIsInstance(tls_sock, ssl.SSLSocket)
    self.assertTrue(hasattr(connection.sock, 'context'))
    tls_context = connection.sock.context
    self.assertIsInstance(tls_context, ssl.SSLContext)
    # Hostname checking and certificate verification must both be on.
    self.assertTrue(tls_context.check_hostname)
    self.assertEqual(tls_sock.server_hostname, 'localhost')
    self.assertEqual(tls_context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(tls_context.protocol, ssl.PROTOCOL_SSLv23)
tornado2.py 文件源码 项目:http2 作者: mSOHU 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(cls, s, ssl_options, server_hostname=None, **kwargs):
    """Wrap the socket ``s`` in TLS and return the ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    a dictionary of options or an `ssl.SSLContext` object.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(s, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(s, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(s, **kwargs)
ftplib.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
ftplib.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
ftplib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def auth(self):
    '''Switch the control connection to TLS/SSL and return the reply.

    Raises ValueError when ``self.sock`` is already an ``ssl.SSLSocket``.
    Sends ``AUTH TLS`` when ``self.ssl_version`` is PROTOCOL_TLSv1 and
    ``AUTH SSL`` otherwise, then replaces ``self.sock`` and ``self.file``
    with their wrapped counterparts.
    '''
    if isinstance(self.sock, ssl.SSLSocket):
        raise ValueError("Already using TLS")
    wants_tls = self.ssl_version == ssl.PROTOCOL_TLSv1
    resp = self.voidcmd('AUTH TLS' if wants_tls else 'AUTH SSL')
    if self.context is None:
        # No context supplied: fall back to the key/cert file settings.
        self.sock = ssl.wrap_socket(self.sock, self.keyfile,
                                    self.certfile,
                                    ssl_version=self.ssl_version)
    else:
        self.sock = self.context.wrap_socket(self.sock)
    # The text-mode file object must be rebuilt on top of the new socket.
    self.file = self.sock.makefile(mode='r', encoding=self.encoding)
    return resp
ftplib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
    """Run ``cmd`` in ASCII mode and pass each received line to ``callback``.

    Sends ``TYPE A``, opens the data connection with
    ``self.transfercmd(cmd)`` and reads it as text decoded with
    ``self.encoding``.  A trailing CRLF (or a bare newline) is removed
    before the line is handed to ``callback``; when ``callback`` is None,
    ``print_line`` is used.  Returns the result of ``self.voidresp()``.
    """
    if callback is None:
        callback = print_line
    self.sendcmd('TYPE A')
    # Both resources are managed by one ``with`` so the data socket is
    # closed even if makefile() or closing the file object raises.  The
    # file object is closed first, then the socket.
    with self.transfercmd(cmd) as conn, \
            conn.makefile('r', encoding=self.encoding) as fp:
        while 1:
            line = fp.readline()
            if self.debugging > 2:
                print('*retr*', repr(line))
            if not line:
                break
            if line[-2:] == CRLF:
                line = line[:-2]
            elif line[-1:] == '\n':
                line = line[:-1]
            callback(line)
        # shutdown ssl layer
        if isinstance(conn, ssl.SSLSocket):
            conn.unwrap()
    return self.voidresp()
test_nntplib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_starttls(self):
    """Check that starttls() replaces the socket and file with TLS ones."""
    plain_file = self.server.file
    plain_sock = self.server.sock
    try:
        self.server.starttls()
    except nntplib.NNTPPermanentError:
        # skipTest() raises, so nothing below runs for such servers.
        self.skipTest("STARTTLS not supported by server.")
    # Check that the socket and internal pseudo-file really were
    # changed.
    self.assertNotEqual(plain_file, self.server.file)
    self.assertNotEqual(plain_sock, self.server.sock)
    # Check that the new socket really is an SSL one
    self.assertIsInstance(self.server.sock, ssl.SSLSocket)
    # Check that trying starttls when it's already active fails.
    self.assertRaises(ValueError, self.server.starttls)
test_poplib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_context(self):
    """Check that POP3_SSL honours an explicitly supplied SSLContext.

    Key material passed together with a context must raise ValueError;
    a client built with only the context must use it for its socket.
    """
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    host, port = self.server.host, self.server.port
    # Every way of combining key/cert files with a context is rejected.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'keyfile': CERTFILE, 'certfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, poplib.POP3_SSL, host, port,
                          context=tls_context, **extra)

    self.client.quit()
    self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                  context=tls_context)
    tls_sock = self.client.sock
    self.assertIsInstance(tls_sock, ssl.SSLSocket)
    self.assertIs(self.client.sock.context, tls_context)
    # The secured session must still answer commands.
    self.assertTrue(self.client.noop().startswith(b'+OK'))
test_ftplib.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_data_connection(self):
    """Check that PROT P / PROT C switch data-channel encryption on and off."""
    def run_list(check_type):
        # Open one data connection, apply the type assertion to its
        # socket, then consume the server's completion reply.
        with self.client.transfercmd('list') as data_sock:
            check_type(data_sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    # clear text
    run_list(self.assertNotIsInstance)

    # secured, after PROT P
    self.client.prot_p()
    run_list(self.assertIsInstance)

    # PROT C is issued, the connection must be in cleartext again
    self.client.prot_c()
    run_list(self.assertNotIsInstance)
ftplib.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
test_ftplib.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_data_connection(self):
    """Check that PROT P / PROT C switch data-channel encryption on and off."""
    def run_list(check_type):
        # Open one data connection and apply the type assertion to it.
        sock = self.client.transfercmd('list')
        try:
            check_type(sock, ssl.SSLSocket)
        finally:
            # Close the socket even when the assertion fails, so a
            # failing check does not leak the data connection.
            sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    # clear text
    run_list(self.assertNotIsInstance)

    # secured, after PROT P
    self.client.prot_p()
    run_list(self.assertIsInstance)

    # PROT C is issued, the connection must be in cleartext again
    self.client.prot_c()
    run_list(self.assertNotIsInstance)
test_ftplib.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_context(self):
    """Check that FTP_TLS honours an explicitly supplied SSLContext.

    Key material passed together with a context must raise ValueError;
    afterwards both the control and the data socket must use the context.
    """
    self.client.quit()
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Every way of combining key/cert files with a context is rejected.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=tls_context,
                          **extra)

    self.client = ftplib.FTP_TLS(context=tls_context, timeout=TIMEOUT)
    client = self.client
    client.connect(self.server.host, self.server.port)
    # The control channel is plain until auth() is called.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, tls_context)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    data_sock = client.transfercmd('list')
    try:
        self.assertIs(data_sock.context, tls_context)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
    finally:
        data_sock.close()
ftplib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
test_ftplib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_data_connection(self):
    """Check that PROT P / PROT C switch data-channel encryption on and off."""
    def run_list(check_type):
        # Open one data connection and apply the type assertion to it.
        sock = self.client.transfercmd('list')
        try:
            check_type(sock, ssl.SSLSocket)
        finally:
            # Close the socket even when the assertion fails, so a
            # failing check does not leak the data connection.
            sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    # clear text
    run_list(self.assertNotIsInstance)

    # secured, after PROT P
    self.client.prot_p()
    run_list(self.assertIsInstance)

    # PROT C is issued, the connection must be in cleartext again
    self.client.prot_c()
    run_list(self.assertNotIsInstance)
test_ftplib.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_context(self):
    """Check that FTP_TLS honours an explicitly supplied SSLContext.

    Key material passed together with a context must raise ValueError;
    afterwards both the control and the data socket must use the context.
    """
    self.client.quit()
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Every way of combining key/cert files with a context is rejected.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=tls_context,
                          **extra)

    self.client = ftplib.FTP_TLS(context=tls_context, timeout=TIMEOUT)
    client = self.client
    client.connect(self.server.host, self.server.port)
    # The control channel is plain until auth() is called.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, tls_context)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    data_sock = client.transfercmd('list')
    try:
        self.assertIs(data_sock.context, tls_context)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
    finally:
        data_sock.close()
ftplib.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
netutil.py 文件源码 项目:My-Web-Server-Framework-With-Python2.7 作者: syjsu 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Wrap ``socket`` in TLS and return the resulting ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    an `ssl.SSLContext` object or a dictionary of options.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(socket, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(socket, **kwargs)
test_nntplib.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_starttls(self):
    """Check that starttls() replaces the socket and file with TLS ones."""
    plain_file = self.server.file
    plain_sock = self.server.sock
    try:
        self.server.starttls()
    except nntplib.NNTPPermanentError:
        # skipTest() raises, so nothing below runs for such servers.
        self.skipTest("STARTTLS not supported by server.")
    # Check that the socket and internal pseudo-file really were
    # changed.
    self.assertNotEqual(plain_file, self.server.file)
    self.assertNotEqual(plain_sock, self.server.sock)
    # Check that the new socket really is an SSL one
    self.assertIsInstance(self.server.sock, ssl.SSLSocket)
    # Check that trying starttls when it's already active fails.
    self.assertRaises(ValueError, self.server.starttls)
test_poplib.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_context(self):
    """Check that POP3_SSL honours an explicitly supplied SSLContext.

    Key material passed together with a context must raise ValueError;
    a client built with only the context must use it for its socket.
    """
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    host, port = self.server.host, self.server.port
    # Every way of combining key/cert files with a context is rejected.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'keyfile': CERTFILE, 'certfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, poplib.POP3_SSL, host, port,
                          context=tls_context, **extra)

    self.client.quit()
    self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                  context=tls_context)
    tls_sock = self.client.sock
    self.assertIsInstance(tls_sock, ssl.SSLSocket)
    self.assertIs(self.client.sock.context, tls_context)
    # The secured session must still answer commands.
    self.assertTrue(self.client.noop().startswith(b'+OK'))
test_ftplib.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_data_connection(self):
    """Check that PROT P / PROT C switch data-channel encryption on and off."""
    def run_list(check_type):
        # Open one data connection, apply the type assertion to its
        # socket, then consume the server's completion reply.
        with self.client.transfercmd('list') as data_sock:
            check_type(data_sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

    # clear text
    run_list(self.assertNotIsInstance)

    # secured, after PROT P
    self.client.prot_p()
    run_list(self.assertIsInstance)

    # PROT C is issued, the connection must be in cleartext again
    self.client.prot_c()
    run_list(self.assertNotIsInstance)
test_ftplib.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_context(self):
    """Check that FTP_TLS honours an explicitly supplied SSLContext.

    Key material passed together with a context must raise ValueError;
    afterwards both the control and the data socket must use the context.
    """
    self.client.quit()
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # Every way of combining key/cert files with a context is rejected.
    conflicting = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=tls_context,
                          **extra)

    self.client = ftplib.FTP_TLS(context=tls_context, timeout=2)
    client = self.client
    client.connect(self.server.host, self.server.port)
    # The control channel is plain until auth() is called.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, tls_context)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, tls_context)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
netutil.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Wrap ``socket`` in TLS and return the resulting ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    a dictionary of options or an `ssl.SSLContext` object.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(socket, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(socket, **kwargs)
netutil.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Wrap ``socket`` in TLS and return the resulting ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    an `ssl.SSLContext` object or a dictionary of options.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(socket, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(socket, **kwargs)
netutil.py 文件源码 项目:annotated-py-tornado 作者: hhstore 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Wrap ``socket`` in TLS and return the resulting ``ssl.SSLSocket``.

    ``ssl_options`` is passed through `ssl_options_to_context`; it may be
    an `ssl.SSLContext` object or a dictionary of options.  When the
    result is an `ssl.SSLContext`, its ``wrap_socket`` method is used and
    ``server_hostname`` is forwarded only if it is not None and the
    ``ssl`` module reports SNI support.  Otherwise the result is treated
    as a mapping of keyword arguments for the module-level
    ``ssl.wrap_socket``.  Extra ``kwargs`` are forwarded to whichever
    ``wrap_socket`` is called; on the mapping path they override
    same-named options.
    """
    context = ssl_options_to_context(ssl_options)
    has_context = hasattr(ssl, 'SSLContext') and isinstance(
        context, ssl.SSLContext)
    if not has_context:
        # No SSLContext available: ``context`` is a plain options mapping.
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
    # The default matters: two-argument getattr() raises AttributeError
    # when HAS_SNI is missing, instead of falling back to the non-SNI path.
    if server_hostname is not None and getattr(ssl, 'HAS_SNI', False):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(socket, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(socket, **kwargs)


问题


面经


文章

微信
公众号

扫码关注公众号