def __init__(self, target):
    """Open an IMAP session to ``target`` and verify it offers NTLM auth.

    Args:
        target: Address in the form ``protocol://host:port``.

    Raises:
        imaplib.IMAP4.error: If the server's capabilities do not include
            ``AUTH=NTLM``.
    """
    # Target comes as protocol://target:port
    self.target = target
    proto, host, port = target.split(':')
    # Strip the leading '//' that remains from the 'protocol://' prefix.
    host = host[2:]
    # Convert once so both connection types receive an integer port.
    port = int(port)
    if port == 993 or proto.upper() == 'IMAPS':
        self.session = imaplib.IMAP4_SSL(host, port)
    else:
        # Assume non-SSL IMAP.
        self.session = imaplib.IMAP4(host, port)
    if 'AUTH=NTLM' not in self.session.capabilities:
        logging.error('IMAP server does not support NTLM authentication!')
        # __init__ must return None; returning False here would surface as
        # an unrelated TypeError, so fail with an explicit IMAP error.
        raise imaplib.IMAP4.error(
            'IMAP server does not support NTLM authentication!')
    self.authtag = self.session._new_tag()
    self.lastresult = None
python类IMAP4的实例源码
def __init__(self, *args, **kwargs): # {{{2
    """Connect and log in to the configured IMAP mailbox, then init the base.

    Uses ``IMAP4_SSL`` with ``settings.IMAP_SERVER`` / ``settings.IMAP_PORT``
    when ``settings.IMAP_SSL`` is set, otherwise a plain ``IMAP4`` connection
    to ``settings.IMAP_SERVER``. The connection is stored as ``self.mailbox``,
    logged in with ``settings.IMAP_LOGIN`` / ``settings.IMAP_PASSWD`` and has
    the default mailbox selected before the base initialiser runs.
    """
    if not settings.IMAP_SSL:
        self.mailbox = IMAP4(settings.IMAP_SERVER)
    else:
        self.mailbox = IMAP4_SSL(host=settings.IMAP_SERVER,
                                 port=settings.IMAP_PORT)

    credentials = (settings.IMAP_LOGIN, settings.IMAP_PASSWD)
    self.mailbox.login(*credentials)
    self.mailbox.select()

    super(Command, self).__init__(*args, **kwargs)
def test_issue5949(self):
    # The server writes only a partial greeting and then hits EOF; creating
    # the client must raise IMAP4.abort.

    class EOFHandler(socketserver.StreamRequestHandler):

        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write(b'* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_line_termination(self):
    # The CAPABILITY line below ends in a bare '\n'; creating the client
    # against such a server must raise IMAP4.abort.

    class BadNewlineHandler(SimpleIMAPHandler):

        def cmd_CAPABILITY(self, tag, args):
            self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
            self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    with self.reaped_server(BadNewlineHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_bad_auth_name(self):
    # The server answers AUTHENTICATE with a tagged NO; the client call
    # must raise IMAP4.error.

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            reply = 'unrecognized authentication type {}'.format(args[0])
            self._send_tagged(tag, 'NO', reply)

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'METHOD', lambda: 1)
def test_invalid_authentication(self):
    # The server sends a continuation request, takes the client's response,
    # then rejects it with a tagged NO; authenticate() must raise
    # IMAP4.error.

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            # The value sent into this generator is kept as the response.
            self.response = yield
            self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'MYAUTH', lambda x: b'fake')
def test_linetoolong(self):
    # A greeting line carrying imaplib._MAXLINE filler bytes must make
    # client creation raise IMAP4.error.

    class TooLongHandler(SimpleIMAPHandler):

        def handle(self):
            # Send a very long response line
            filler = b'x' * imaplib._MAXLINE
            self.wfile.write(b''.join((b'* OK ', filler, b'\r\n')))

    with self.reaped_server(TooLongHandler) as server:
        with self.assertRaises(imaplib.IMAP4.error):
            self.imap_class(*server.server_address)
11_04_email_facade.py 文件源码
项目:Python_Master-the-Art-of-Design-Patterns
作者: PacktPublishing
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def get_inbox(self):
    """Fetch every message from the default mailbox on ``self.host``.

    Logs in with ``self.username`` / ``self.password`` (UTF-8 encoded),
    searches for ``ALL`` messages and fetches each one with ``(RFC822)``.
    The session is always logged out afterwards, including on failure.

    Returns:
        list: One entry per message, taken from element ``[0][1]`` of the
        FETCH response data.
    """
    mailbox = imaplib.IMAP4(self.host)
    try:
        mailbox.login(bytes(self.username, 'utf8'),
                      bytes(self.password, 'utf8'))
        mailbox.select()
        _, data = mailbox.search(None, 'ALL')
        messages = []
        for num in data[0].split():
            _, message = mailbox.fetch(num, '(RFC822)')
            messages.append(message[0][1])
        return messages
    finally:
        # End the session so the connection is not leaked. This is
        # best-effort: a failing LOGOUT must not hide the fetched messages
        # or replace the error that brought us here.
        try:
            mailbox.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
def test_issue5949(self):
    # The server writes only a partial greeting and then hits EOF; creating
    # the client must raise IMAP4.abort.

    class EOFHandler(SocketServer.StreamRequestHandler):

        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write('* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_linetoolong(self):
    # A greeting line carrying imaplib._MAXLINE filler characters must make
    # client creation raise IMAP4.error.

    class TooLongHandler(SimpleIMAPHandler):

        def handle(self):
            # Send a very long response line
            filler = 'x' * imaplib._MAXLINE
            self.wfile.write(''.join(('* OK ', filler, '\r\n')))

    with self.reaped_server(TooLongHandler) as server:
        with self.assertRaises(imaplib.IMAP4.error):
            self.imap_class(*server.server_address)
def test_issue5949(self):
    # The server writes only a partial greeting and then hits EOF; creating
    # the client must raise IMAP4.abort.

    class EOFHandler(socketserver.StreamRequestHandler):

        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write(b'* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_line_termination(self):
    # The CAPABILITY line below ends in a bare '\n'; creating the client
    # against such a server must raise IMAP4.abort.

    class BadNewlineHandler(SimpleIMAPHandler):

        def cmd_CAPABILITY(self, tag, args):
            self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
            self._send_tagged(tag, 'OK', 'CAPABILITY completed')

    with self.reaped_server(BadNewlineHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_bad_auth_name(self):
    # The server answers AUTHENTICATE with a tagged NO; the client call
    # must raise IMAP4.error.

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            reply = 'unrecognized authentication type {}'.format(args[0])
            self._send_tagged(tag, 'NO', reply)

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'METHOD', lambda: 1)
def test_invalid_authentication(self):
    # The server sends a continuation request, takes the client's response,
    # then rejects it with a tagged NO; authenticate() must raise
    # IMAP4.error.

    class MyServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            self._send_textline('+')
            # The value sent into this generator is kept as the response.
            self.response = yield
            self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')

    with self.reaped_pair(MyServer) as (server, client):
        self.assertRaises(imaplib.IMAP4.error,
                          client.authenticate, 'MYAUTH', lambda x: b'fake')
def test_issue5949(self):
    # The server writes only a partial greeting and then hits EOF; creating
    # the client must raise IMAP4.abort.

    class EOFHandler(SocketServer.StreamRequestHandler):

        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write('* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def resetConnection(self):
    """Drop the current IMAP connection and open a freshly logged-in one.

    The server name comes from the ``IMAP_SERVER`` environment variable when
    it is set, otherwise from the hostname of ``self.url``. The URL scheme
    picks the client class and port: ``imap`` uses ``imaplib.IMAP4`` on 143,
    ``imaps`` uses ``imaplib.IMAP4_SSL`` on 993. ``self.remote_dir`` is set
    to the URL path without its leading slash. When
    ``globals.imap_full_address`` is set, the login name is
    ``username@hostname``; otherwise it is the bare username. After login,
    ``globals.imap_mailbox`` is selected.
    """
    url = self.url
    if 'IMAP_SERVER' in os.environ:
        server_name = os.environ['IMAP_SERVER']
    else:
        server_name = url.hostname

    # Try to close the connection cleanly
    try:
        self.conn.close()
    except Exception:
        pass

    if url.scheme == "imap":
        cl = imaplib.IMAP4
        self.conn = cl(server_name, 143)
    elif url.scheme == "imaps":
        cl = imaplib.IMAP4_SSL
        self.conn = cl(server_name, 993)

    log.Debug("Type of imap class: %s" % (cl.__name__))
    self.remote_dir = re.sub(r'^/', r'', url.path, 1)

    # Login
    if globals.imap_full_address:
        login_name = self.username + "@" + url.hostname
    else:
        login_name = self.username
    self.conn.login(login_name, self.password)
    self.conn.select(globals.imap_mailbox)
    log.Info("IMAP connected")
def _put(self, source_path, remote_filename):
    """Append the file at ``source_path`` to the IMAP mailbox, with retries.

    The message body is built by ``self.prepareBody`` and appended to
    ``globals.imap_mailbox``. On ``imaplib.IMAP4.abort`` or a socket error
    the attempt is retried after 30 seconds, reconnecting through
    ``self.resetConnection()`` first. Save attempts and reconnect attempts
    draw on the same budget: ``globals.timeout``, or 2880 when that is 0.

    Args:
        source_path: Object whose ``open("rb")`` returns the file to upload.
        remote_filename: Name passed to ``prepareBody`` and used in log
            messages.
    """
    f = source_path.open("rb")
    stored = False
    try:
        allowedTimeout = globals.timeout
        if allowedTimeout == 0:
            # Allow a total timeout of 1 day (2880 attempts, 30s apart)
            allowedTimeout = 2880
        while allowedTimeout > 0:
            try:
                self.conn.select(remote_filename)
                # NOTE(review): the same file object is handed to
                # prepareBody on every retry; if it reads from f, a retry
                # starts where the previous attempt stopped -- confirm.
                body = self.prepareBody(f, remote_filename)
                # If we don't select the IMAP folder before
                # append, the message goes into the INBOX.
                self.conn.select(globals.imap_mailbox)
                self.conn.append(globals.imap_mailbox, None, None, body)
                stored = True
                break
            except (imaplib.IMAP4.abort, socket.error, socket.sslerror):
                allowedTimeout -= 1
                log.Info("Error saving '%s', retrying in 30s " % remote_filename)
                time.sleep(30)
                # Re-establish the connection before the next save attempt.
                while allowedTimeout > 0:
                    try:
                        self.resetConnection()
                        break
                    except (imaplib.IMAP4.abort, socket.error, socket.sslerror):
                        allowedTimeout -= 1
                        log.Info("Error reconnecting, retrying in 30s ")
                        time.sleep(30)
    finally:
        # The source file was previously left open on every path.
        f.close()

    if stored:
        log.Info("IMAP mail with '%s' subject stored" % remote_filename)
    else:
        # Only report success when the append actually went through.
        # NOTE(review): the method still returns normally here, as before;
        # consider raising the backend's error type instead.
        log.Info("Giving up saving '%s', retries exhausted" % remote_filename)
def __init__(self, timeout, host='', port=993, keyfile=None, certfile=None):
    """Record the connection settings and run the ``imaplib.IMAP4`` initialiser.

    Args:
        timeout: Stored as ``self.timeout``.
        host: Server host handed to ``imaplib.IMAP4.__init__``.
        port: Server port handed to ``imaplib.IMAP4.__init__``.
        keyfile: Stored as ``self.keyfile``.
        certfile: Stored as ``self.certfile``.
    """
    self.timeout = timeout
    self.keyfile, self.certfile = keyfile, certfile
    # No SSL object exists until one is assigned later.
    self.sslobj = None
    # The base initialiser runs last, once every attribute above is set.
    imaplib.IMAP4.__init__(self, host, port)
# pylint: disable=W0222
def open(self, host, port):
    """Connect to the remote server at ``host:port`` and wrap it in SSL.

    The host, port, connected socket (``self.sock``) and SSL object
    (``self.sslobj``) are stored on the instance. This connection is the
    one used by the routines read, readline, send and shutdown.
    """
    self.host, self.port = host, port
    address = (host, port)
    # NOTE(review): no timeout is passed to create_connection -- confirm
    # whether self.timeout is meant to apply to this connection.
    self.sock = socket.create_connection(address)
    self.sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
def socket(self):
    """Return the socket instance used for the IMAP4 server connection.

    Usage: ``sock = <instance>.socket()``
    """
    connection = self.sock
    return connection