python类SSLSocket()的实例源码

ftplib.py 文件源码 项目:xxNet 作者: drzorm 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
ftplib.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
test_ftplib.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_data_connection(self):
        # clear text
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        sock = self.client.transfercmd('list')
        self.assertIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")
test_nntplib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_starttls(self):
        file = self.server.file
        sock = self.server.sock
        try:
            self.server.starttls()
        except nntplib.NNTPPermanentError:
            self.skipTest("STARTTLS not supported by server.")
        else:
            # Check that the socket and internal pseudo-file really were
            # changed.
            self.assertNotEqual(file, self.server.file)
            self.assertNotEqual(sock, self.server.sock)
            # Check that the new socket really is an SSL one
            self.assertIsInstance(self.server.sock, ssl.SSLSocket)
            # Check that trying starttls when it's already active fails.
            self.assertRaises(ValueError, self.server.starttls)
test_poplib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_context(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, keyfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, certfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, keyfile=CERTFILE,
                            certfile=CERTFILE, context=ctx)

        self.client.quit()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                        context=ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.assertIs(self.client.sock.context, ctx)
        self.assertTrue(self.client.noop().startswith(b'+OK'))
test_ftplib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_data_connection(self):
        # clear text
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")
test_ftplib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
ftplib.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline()
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
test_ftplib.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_data_connection(self):
        # clear text
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        sock = self.client.transfercmd('list')
        self.assertIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        sock = self.client.transfercmd('list')
        self.assertNotIsInstance(sock, ssl.SSLSocket)
        sock.close()
        self.assertEqual(self.client.voidresp(), "226 transfer complete")
netutil.py 文件源码 项目:get_started_with_respeaker 作者: respeaker 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either a dictionary (as accepted by
    `ssl_options_to_context`) or an `ssl.SSLContext` object.
    Additional keyword arguments are passed to ``wrap_socket``
    (either the `~ssl.SSLContext` method or the `ssl` module function
    as appropriate).
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket, server_hostname=server_hostname,
                                       **kwargs)
        else:
            return context.wrap_socket(socket, **kwargs)
    else:
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
test_functional_ssl.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_prot(self):
        self.client.login(secure=False)
        msg = "503 PROT not allowed on insecure control connection."
        self.assertRaisesWithMsg(ftplib.error_perm, msg,
                                 self.client.sendcmd, 'prot p')
        self.client.login(secure=True)
        # secured
        self.client.prot_p()
        sock = self.client.transfercmd('list')
        with contextlib.closing(sock):
            while True:
                if not sock.recv(1024):
                    self.client.voidresp()
                    break
            self.assertTrue(isinstance(sock, ssl.SSLSocket))
            # unsecured
            self.client.prot_c()
        sock = self.client.transfercmd('list')
        with contextlib.closing(sock):
            while True:
                if not sock.recv(1024):
                    self.client.voidresp()
                    break
            self.assertFalse(isinstance(sock, ssl.SSLSocket))
ftplib.py 文件源码 项目:empyrion-python-api 作者: huhlig 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def retrlines(self, cmd, callback = None):
            if callback is None: callback = print_line
            resp = self.sendcmd('TYPE A')
            conn = self.transfercmd(cmd)
            fp = conn.makefile('rb')
            try:
                while 1:
                    line = fp.readline(self.maxline + 1)
                    if len(line) > self.maxline:
                        raise Error("got more than %d bytes" % self.maxline)
                    if self.debugging > 2: print '*retr*', repr(line)
                    if not line:
                        break
                    if line[-2:] == CRLF:
                        line = line[:-2]
                    elif line[-1:] == '\n':
                        line = line[:-1]
                    callback(line)
                # shutdown ssl layer
                if isinstance(conn, ssl.SSLSocket):
                    conn.unwrap()
            finally:
                fp.close()
                conn.close()
            return self.voidresp()
netutil.py 文件源码 项目:teleport 作者: eomsoft 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either an `ssl.SSLContext` object or a
    dictionary (as accepted by `ssl_options_to_context`).  Additional
    keyword arguments are passed to ``wrap_socket`` (either the
    `~ssl.SSLContext` method or the `ssl` module function as
    appropriate).
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket, server_hostname=server_hostname,
                                       **kwargs)
        else:
            return context.wrap_socket(socket, **kwargs)
    else:
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
netutil.py 文件源码 项目:projects-2017-2 作者: ncss 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either an `ssl.SSLContext` object or a
    dictionary (as accepted by `ssl_options_to_context`).  Additional
    keyword arguments are passed to ``wrap_socket`` (either the
    `~ssl.SSLContext` method or the `ssl` module function as
    appropriate).
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket, server_hostname=server_hostname,
                                       **kwargs)
        else:
            return context.wrap_socket(socket, **kwargs)
    else:
        return ssl.wrap_socket(socket, **dict(context, **kwargs))
netutil.py 文件源码 项目:aweasome_learning 作者: Knight-ZXW 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either an `ssl.SSLContext` object or a
    dictionary (as accepted by `ssl_options_to_context`).  Additional
    keyword arguments are passed to ``wrap_socket`` (either the
    `~ssl.SSLContext` method or the `ssl` module function as
    appropriate).
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket, server_hostname=server_hostname,
                                       **kwargs)
        else:
            return context.wrap_socket(socket, **kwargs)
    else:
        return ssl.wrap_socket(socket, **dict(context, **kwargs))  # type: ignore
netutil.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
    """Returns an ``ssl.SSLSocket`` wrapping the given socket.

    ``ssl_options`` may be either an `ssl.SSLContext` object or a
    dictionary (as accepted by `ssl_options_to_context`).  Additional
    keyword arguments are passed to ``wrap_socket`` (either the
    `~ssl.SSLContext` method or the `ssl` module function as
    appropriate).
    """
    context = ssl_options_to_context(ssl_options)
    if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
        if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
            # Python doesn't have server-side SNI support so we can't
            # really unittest this, but it can be manually tested with
            # python3.2 -m tornado.httpclient https://sni.velox.ch
            return context.wrap_socket(socket, server_hostname=server_hostname,
                                       **kwargs)
        else:
            return context.wrap_socket(socket, **kwargs)
    else:
        return ssl.wrap_socket(socket, **dict(context, **kwargs))  # type: ignore
test_nntplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_starttls(self):
        file = self.server.file
        sock = self.server.sock
        try:
            self.server.starttls()
        except nntplib.NNTPPermanentError:
            self.skipTest("STARTTLS not supported by server.")
        else:
            # Check that the socket and internal pseudo-file really were
            # changed.
            self.assertNotEqual(file, self.server.file)
            self.assertNotEqual(sock, self.server.sock)
            # Check that the new socket really is an SSL one
            self.assertIsInstance(self.server.sock, ssl.SSLSocket)
            # Check that trying starttls when it's already active fails.
            self.assertRaises(ValueError, self.server.starttls)
test_poplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_context(self):
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, keyfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, certfile=CERTFILE, context=ctx)
        self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
                            self.server.port, keyfile=CERTFILE,
                            certfile=CERTFILE, context=ctx)

        self.client.quit()
        self.client = poplib.POP3_SSL(self.server.host, self.server.port,
                                        context=ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)
        self.assertIs(self.client.sock.context, ctx)
        self.assertTrue(self.client.noop().startswith(b'+OK'))
test_ftplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_data_connection(self):
        # clear text
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # secured, after PROT P
        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")

        # PROT C is issued, the connection must be in cleartext again
        self.client.prot_c()
        with self.client.transfercmd('list') as sock:
            self.assertNotIsInstance(sock, ssl.SSLSocket)
        self.assertEqual(self.client.voidresp(), "226 transfer complete")
test_ftplib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)


问题


面经


文章

微信
公众号

扫码关注公众号