def retrlines(self, cmd, callback = None):
if callback is None: callback = print_line
resp = self.sendcmd('TYPE A')
conn = self.transfercmd(cmd)
fp = conn.makefile('rb')
try:
while 1:
line = fp.readline(self.maxline + 1)
if len(line) > self.maxline:
raise Error("got more than %d bytes" % self.maxline)
if self.debugging > 2: print '*retr*', repr(line)
if not line:
break
if line[-2:] == CRLF:
line = line[:-2]
elif line[-1:] == '\n':
line = line[:-1]
callback(line)
# shutdown ssl layer
if isinstance(conn, ssl.SSLSocket):
conn.unwrap()
finally:
fp.close()
conn.close()
return self.voidresp()
python类SSLSocket()的实例源码
def retrlines(self, cmd, callback = None):
if callback is None: callback = print_line
resp = self.sendcmd('TYPE A')
conn = self.transfercmd(cmd)
fp = conn.makefile('rb')
try:
while 1:
line = fp.readline(self.maxline + 1)
if len(line) > self.maxline:
raise Error("got more than %d bytes" % self.maxline)
if self.debugging > 2: print '*retr*', repr(line)
if not line:
break
if line[-2:] == CRLF:
line = line[:-2]
elif line[-1:] == '\n':
line = line[:-1]
callback(line)
# shutdown ssl layer
if isinstance(conn, ssl.SSLSocket):
conn.unwrap()
finally:
fp.close()
conn.close()
return self.voidresp()
def test_data_connection(self):
# clear text
sock = self.client.transfercmd('list')
self.assertNotIsInstance(sock, ssl.SSLSocket)
sock.close()
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# secured, after PROT P
self.client.prot_p()
sock = self.client.transfercmd('list')
self.assertIsInstance(sock, ssl.SSLSocket)
sock.close()
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# PROT C is issued, the connection must be in cleartext again
self.client.prot_c()
sock = self.client.transfercmd('list')
self.assertNotIsInstance(sock, ssl.SSLSocket)
sock.close()
self.assertEqual(self.client.voidresp(), "226 transfer complete")
def test_starttls(self):
file = self.server.file
sock = self.server.sock
try:
self.server.starttls()
except nntplib.NNTPPermanentError:
self.skipTest("STARTTLS not supported by server.")
else:
# Check that the socket and internal pseudo-file really were
# changed.
self.assertNotEqual(file, self.server.file)
self.assertNotEqual(sock, self.server.sock)
# Check that the new socket really is an SSL one
self.assertIsInstance(self.server.sock, ssl.SSLSocket)
# Check that trying starttls when it's already active fails.
self.assertRaises(ValueError, self.server.starttls)
def test_context(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, certfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE,
certfile=CERTFILE, context=ctx)
self.client.quit()
self.client = poplib.POP3_SSL(self.server.host, self.server.port,
context=ctx)
self.assertIsInstance(self.client.sock, ssl.SSLSocket)
self.assertIs(self.client.sock.context, ctx)
self.assertTrue(self.client.noop().startswith(b'+OK'))
def test_data_connection(self):
# clear text
with self.client.transfercmd('list') as sock:
self.assertNotIsInstance(sock, ssl.SSLSocket)
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# secured, after PROT P
self.client.prot_p()
with self.client.transfercmd('list') as sock:
self.assertIsInstance(sock, ssl.SSLSocket)
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# PROT C is issued, the connection must be in cleartext again
self.client.prot_c()
with self.client.transfercmd('list') as sock:
self.assertNotIsInstance(sock, ssl.SSLSocket)
self.assertEqual(self.client.voidresp(), "226 transfer complete")
def test_context(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
context=ctx)
self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
context=ctx)
self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
keyfile=CERTFILE, context=ctx)
self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
self.client.connect(self.server.host, self.server.port)
self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
self.client.auth()
self.assertIs(self.client.sock.context, ctx)
self.assertIsInstance(self.client.sock, ssl.SSLSocket)
self.client.prot_p()
with self.client.transfercmd('list') as sock:
self.assertIs(sock.context, ctx)
self.assertIsInstance(sock, ssl.SSLSocket)
def retrlines(self, cmd, callback = None):
if callback is None: callback = print_line
resp = self.sendcmd('TYPE A')
conn = self.transfercmd(cmd)
fp = conn.makefile('rb')
try:
while 1:
line = fp.readline()
if self.debugging > 2: print '*retr*', repr(line)
if not line:
break
if line[-2:] == CRLF:
line = line[:-2]
elif line[-1:] == '\n':
line = line[:-1]
callback(line)
# shutdown ssl layer
if isinstance(conn, ssl.SSLSocket):
conn.unwrap()
finally:
fp.close()
conn.close()
return self.voidresp()
def test_data_connection(self):
# clear text
sock = self.client.transfercmd('list')
self.assertNotIsInstance(sock, ssl.SSLSocket)
sock.close()
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# secured, after PROT P
self.client.prot_p()
sock = self.client.transfercmd('list')
self.assertIsInstance(sock, ssl.SSLSocket)
sock.close()
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# PROT C is issued, the connection must be in cleartext again
self.client.prot_c()
sock = self.client.transfercmd('list')
self.assertNotIsInstance(sock, ssl.SSLSocket)
sock.close()
self.assertEqual(self.client.voidresp(), "226 transfer complete")
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
``ssl_options`` may be either a dictionary (as accepted by
`ssl_options_to_context`) or an `ssl.SSLContext` object.
Additional keyword arguments are passed to ``wrap_socket``
(either the `~ssl.SSLContext` method or the `ssl` module function
as appropriate).
"""
context = ssl_options_to_context(ssl_options)
if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
# Python doesn't have server-side SNI support so we can't
# really unittest this, but it can be manually tested with
# python3.2 -m tornado.httpclient https://sni.velox.ch
return context.wrap_socket(socket, server_hostname=server_hostname,
**kwargs)
else:
return context.wrap_socket(socket, **kwargs)
else:
return ssl.wrap_socket(socket, **dict(context, **kwargs))
def test_prot(self):
self.client.login(secure=False)
msg = "503 PROT not allowed on insecure control connection."
self.assertRaisesWithMsg(ftplib.error_perm, msg,
self.client.sendcmd, 'prot p')
self.client.login(secure=True)
# secured
self.client.prot_p()
sock = self.client.transfercmd('list')
with contextlib.closing(sock):
while True:
if not sock.recv(1024):
self.client.voidresp()
break
self.assertTrue(isinstance(sock, ssl.SSLSocket))
# unsecured
self.client.prot_c()
sock = self.client.transfercmd('list')
with contextlib.closing(sock):
while True:
if not sock.recv(1024):
self.client.voidresp()
break
self.assertFalse(isinstance(sock, ssl.SSLSocket))
def retrlines(self, cmd, callback = None):
if callback is None: callback = print_line
resp = self.sendcmd('TYPE A')
conn = self.transfercmd(cmd)
fp = conn.makefile('rb')
try:
while 1:
line = fp.readline(self.maxline + 1)
if len(line) > self.maxline:
raise Error("got more than %d bytes" % self.maxline)
if self.debugging > 2: print '*retr*', repr(line)
if not line:
break
if line[-2:] == CRLF:
line = line[:-2]
elif line[-1:] == '\n':
line = line[:-1]
callback(line)
# shutdown ssl layer
if isinstance(conn, ssl.SSLSocket):
conn.unwrap()
finally:
fp.close()
conn.close()
return self.voidresp()
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
``ssl_options`` may be either an `ssl.SSLContext` object or a
dictionary (as accepted by `ssl_options_to_context`). Additional
keyword arguments are passed to ``wrap_socket`` (either the
`~ssl.SSLContext` method or the `ssl` module function as
appropriate).
"""
context = ssl_options_to_context(ssl_options)
if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
# Python doesn't have server-side SNI support so we can't
# really unittest this, but it can be manually tested with
# python3.2 -m tornado.httpclient https://sni.velox.ch
return context.wrap_socket(socket, server_hostname=server_hostname,
**kwargs)
else:
return context.wrap_socket(socket, **kwargs)
else:
return ssl.wrap_socket(socket, **dict(context, **kwargs))
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
``ssl_options`` may be either an `ssl.SSLContext` object or a
dictionary (as accepted by `ssl_options_to_context`). Additional
keyword arguments are passed to ``wrap_socket`` (either the
`~ssl.SSLContext` method or the `ssl` module function as
appropriate).
"""
context = ssl_options_to_context(ssl_options)
if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
# Python doesn't have server-side SNI support so we can't
# really unittest this, but it can be manually tested with
# python3.2 -m tornado.httpclient https://sni.velox.ch
return context.wrap_socket(socket, server_hostname=server_hostname,
**kwargs)
else:
return context.wrap_socket(socket, **kwargs)
else:
return ssl.wrap_socket(socket, **dict(context, **kwargs))
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
``ssl_options`` may be either an `ssl.SSLContext` object or a
dictionary (as accepted by `ssl_options_to_context`). Additional
keyword arguments are passed to ``wrap_socket`` (either the
`~ssl.SSLContext` method or the `ssl` module function as
appropriate).
"""
context = ssl_options_to_context(ssl_options)
if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
# Python doesn't have server-side SNI support so we can't
# really unittest this, but it can be manually tested with
# python3.2 -m tornado.httpclient https://sni.velox.ch
return context.wrap_socket(socket, server_hostname=server_hostname,
**kwargs)
else:
return context.wrap_socket(socket, **kwargs)
else:
return ssl.wrap_socket(socket, **dict(context, **kwargs)) # type: ignore
def ssl_wrap_socket(socket, ssl_options, server_hostname=None, **kwargs):
"""Returns an ``ssl.SSLSocket`` wrapping the given socket.
``ssl_options`` may be either an `ssl.SSLContext` object or a
dictionary (as accepted by `ssl_options_to_context`). Additional
keyword arguments are passed to ``wrap_socket`` (either the
`~ssl.SSLContext` method or the `ssl` module function as
appropriate).
"""
context = ssl_options_to_context(ssl_options)
if hasattr(ssl, 'SSLContext') and isinstance(context, ssl.SSLContext):
if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
# Python doesn't have server-side SNI support so we can't
# really unittest this, but it can be manually tested with
# python3.2 -m tornado.httpclient https://sni.velox.ch
return context.wrap_socket(socket, server_hostname=server_hostname,
**kwargs)
else:
return context.wrap_socket(socket, **kwargs)
else:
return ssl.wrap_socket(socket, **dict(context, **kwargs)) # type: ignore
def test_starttls(self):
file = self.server.file
sock = self.server.sock
try:
self.server.starttls()
except nntplib.NNTPPermanentError:
self.skipTest("STARTTLS not supported by server.")
else:
# Check that the socket and internal pseudo-file really were
# changed.
self.assertNotEqual(file, self.server.file)
self.assertNotEqual(sock, self.server.sock)
# Check that the new socket really is an SSL one
self.assertIsInstance(self.server.sock, ssl.SSLSocket)
# Check that trying starttls when it's already active fails.
self.assertRaises(ValueError, self.server.starttls)
def test_context(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, certfile=CERTFILE, context=ctx)
self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host,
self.server.port, keyfile=CERTFILE,
certfile=CERTFILE, context=ctx)
self.client.quit()
self.client = poplib.POP3_SSL(self.server.host, self.server.port,
context=ctx)
self.assertIsInstance(self.client.sock, ssl.SSLSocket)
self.assertIs(self.client.sock.context, ctx)
self.assertTrue(self.client.noop().startswith(b'+OK'))
def test_data_connection(self):
# clear text
with self.client.transfercmd('list') as sock:
self.assertNotIsInstance(sock, ssl.SSLSocket)
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# secured, after PROT P
self.client.prot_p()
with self.client.transfercmd('list') as sock:
self.assertIsInstance(sock, ssl.SSLSocket)
self.assertEqual(self.client.voidresp(), "226 transfer complete")
# PROT C is issued, the connection must be in cleartext again
self.client.prot_c()
with self.client.transfercmd('list') as sock:
self.assertNotIsInstance(sock, ssl.SSLSocket)
self.assertEqual(self.client.voidresp(), "226 transfer complete")
def test_context(self):
self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
context=ctx)
self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
context=ctx)
self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
keyfile=CERTFILE, context=ctx)
self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
self.client.connect(self.server.host, self.server.port)
self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
self.client.auth()
self.assertIs(self.client.sock.context, ctx)
self.assertIsInstance(self.client.sock, ssl.SSLSocket)
self.client.prot_p()
with self.client.transfercmd('list') as sock:
self.assertIs(sock.context, ctx)
self.assertIsInstance(sock, ssl.SSLSocket)