def init_server(self):
    """Open the SMTP session and, when configured, the IMAP session.

    The SMTP connection is always created and logged in with
    ``self.address`` / ``self.password``.  When ``self.imap_server`` is set,
    an IMAP connection is created as well, logged in with the same
    credentials, and ``self.imap_mailbox`` is selected.

    Raises:
        PySMSException: if the SMTP setup raises ``smtplib.SMTPException``,
            if the IMAP login or the mailbox selection does not answer
            "OK", or if the IMAP setup raises ``imaplib.IMAP4.error``.
    """
    self.logger.info("Initializing SMTP/IMAP servers.")

    # PySMS at minimum uses smtp server
    try:
        if not self.ssl:
            # Plain connection first, then upgrade it with STARTTLS.
            self.smtp = smtplib.SMTP(self.smtp_server, self.smtp_port)
            self.smtp.starttls()
        else:
            self.smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
        self.smtp.login(self.address, self.password)
    except smtplib.SMTPException:
        raise PySMSException("Unable to start smtp server, please check credentials.")

    # Responding functionality is optional: nothing more to do without IMAP.
    if not self.imap_server:
        return

    try:
        imap_factory = imaplib.IMAP4_SSL if self.ssl else imaplib.IMAP4
        self.imap = imap_factory(self.imap_server)

        status, _ = self.imap.login(self.address, self.password)
        if status != "OK":
            raise PySMSException("Unable to login to IMAP server with given credentials.")

        status, _ = self.imap.select(self.imap_mailbox)
        if status != "OK":
            raise PySMSException("Unable to select mailbox: {0}".format(self.imap_mailbox))
    except imaplib.IMAP4.error:
        raise PySMSException("Unable to start IMAP server, please check address and SSL/TLS settings.")
# Example source code using the Python class IMAP4 (python类IMAP4的实例源码)
def __init__(self, hostname, port=None, ssl=True):
    """Create the IMAP4 transport for ``hostname``.

    :param hostname: server to connect to.
    :param port: port to connect to; when falsy, 993 is used with SSL and
        143 without.
    :param ssl: pick ``IMAP4_SSL`` when true, plain ``IMAP4`` otherwise.

    The transport class is stored in ``self.transport`` and the instance
    built from it in ``self.server``.
    """
    self.hostname = hostname
    self.port = port

    if ssl:
        self.transport = IMAP4_SSL
        if not self.port:
            self.port = 993
    else:
        self.transport = IMAP4
        if not self.port:
            self.port = 143

    self.server = self.transport(self.hostname, self.port)
    # Lazy %-style arguments: the message is only built when DEBUG is enabled.
    logger.debug("Created IMAP4 transport for %s:%s", self.hostname, self.port)
def connect(self):
    """Connects to and authenticates with an IMAP4 mail server.

    ``IMAP4_SSL`` is used when both ``self.keyfile`` and ``self.certfile``
    are set or when ``self.ssl`` is true; plain ``IMAP4`` otherwise.  After
    connecting, the method logs in with ``self.username`` /
    ``self.password`` and selects the default mailbox.

    :return: the connected ``imaplib`` client.
    :raises: any exception from connecting, logging in or selecting is
        propagated unchanged to the caller.
    """
    import imaplib

    if (self.keyfile and self.certfile) or self.ssl:
        M = imaplib.IMAP4_SSL(self.host, self.port, self.keyfile, self.certfile)
    else:
        M = imaplib.IMAP4(self.host, self.port)

    try:
        M.login(self.username, self.password)
        M.select()
    except Exception:
        # Do not leave the socket open when authentication or SELECT fails.
        M.shutdown()
        raise
    return M
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None):
    """Remember the connection settings and open the IMAP client.

    :param host: host's name (default: localhost)
    :param port: port number (default: the standard port of the chosen class)
    :param ssl: use ``imaplib.IMAP4_SSL`` if True, else ``imaplib.IMAP4``
    :param keyfile: PEM formatted file that contains your private key (default: None)
    :param certfile: PEM formatted certificate chain file (default: None)
    :param ssl_context: SSLContext object that contains your certificate chain
                        and private key (default: None)

    Note: if ssl_context is provided, then parameters keyfile or
    certfile should not be set otherwise ValueError is raised.
    """
    self._host, self._port = host, port
    self._keyfile, self._certfile = keyfile, certfile
    self._ssl_context = ssl_context

    # The client is stored in ``self.box``; key/cert material is only
    # forwarded on the SSL path.
    if not ssl:
        self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT)
    else:
        self.box = imaplib.IMAP4_SSL(
            host,
            port or imaplib.IMAP4_SSL_PORT,
            keyfile,
            certfile,
            ssl_context,
        )

    # Not known yet at construction time.
    self._username = self._password = None
    self._initial_folder = None
    self.folder = None
def test_aborted_authentication(self):
    """A client-aborted AUTHENTICATE exchange must raise ``IMAP4.error``."""

    class AbortAwareServer(SimpleIMAPHandler):

        def cmd_AUTHENTICATE(self, tag, args):
            # Send the continuation prompt, then wait for the client's reply.
            self._send_textline('+')
            self.response = yield

            client_aborted = self.response == b'*\r\n'
            if not client_aborted:
                self._send_tagged(tag, 'OK', 'MYAUTH successful')
            else:
                self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted')

    with self.reaped_pair(AbortAwareServer) as (server, client):
        with self.assertRaises(imaplib.IMAP4.error):
            # The authobject returns None for every challenge.
            _status, _payload = client.authenticate('MYAUTH', lambda x: None)
def connect(self):
    """Open a mail connection using the server settings of ``self.source_id``.

    Without a usable ``source_id`` the call is delegated to the parent
    class.  Otherwise an IMAP or POP3 client (SSL variant when
    ``source_id.is_ssl`` is set) is created, authenticated with
    ``self.user`` / ``self.password``, given a socket timeout of
    ``MAIL_TIMEOUT`` and returned.
    """
    if not (self.source_id and self.source_id.id):
        return super(vieterp_fetchmail_server, self).connect()

    self.ensure_one()
    if self.source_id.type == 'imap':
        imap_cls = IMAP4_SSL if self.source_id.is_ssl else IMAP4
        connection = imap_cls(self.source_id.server, int(self.source_id.port))
        connection.login(self.user, self.password)
    # NOTE(review): this branch tests ``self.type`` while the IMAP branch
    # tests ``self.source_id.type`` — confirm which one is intended.
    elif self.type == 'pop':
        pop_cls = POP3_SSL if self.source_id.is_ssl else POP3
        connection = pop_cls(self.source_id.server, int(self.source_id.port))
        # TODO: use this to remove only unread messages
        # connection.user("recent:"+server.user)
        connection.user(self.user)
        connection.pass_(self.password)

    # Add timeout on socket
    connection.sock.settimeout(MAIL_TIMEOUT)
    return connection
def connect(self, mailserver, port="143"):
    """Open a plain IMAP4 connection and store it in ``self.srv``.

    :param mailserver: host name of the IMAP server.
    :param port: port to connect to (string or integer, default 143).

    The connection attempt is best effort: on any failure, including a port
    that is not a number, ``self.srv`` is set to ``None`` and nothing is
    raised.
    """
    self.mailserver = mailserver
    self.port = port
    try:
        # The port used to be ignored, so every connection went to 143.
        self.srv = imaplib.IMAP4(self.mailserver, int(self.port))
    except Exception:
        # Still best effort, but no longer swallows KeyboardInterrupt/SystemExit.
        self.srv = None
def validate(self, user, password):
    """Check ``user`` / ``password`` by logging in on ``self.srv``.

    :return: ``True`` when the login succeeds; ``False`` when there is no
        server connection or the login raises ``ssl.SSLError`` or
        ``imaplib.IMAP4.error``.
    """
    if not self.srv:
        # Always answer with a bool (this path used to return None).
        return False

    self.user = user
    self.password = password
    try:
        self.srv.login(user, password)
    except (ssl.SSLError, imaplib.IMAP4.error):
        return False
    return True
def run(self):
    """Attempt one IMAP login with the next word returned by ``getword()``.

    Reads the module-level names ``user``, ``ip`` and ``work`` (defined
    outside this block).  The last character of ``user`` is dropped before
    use — presumably a trailing newline; verify against the caller.  On a
    successful login the session is closed, ``work`` is joined and the
    interpreter exit is requested with status 2.  IMAP and socket errors are
    printed and otherwise ignored.
    """
    value = getword()
    try:
        print("-" * 12)
        print("User: %s Password: %s" % (user[:-1], value))
        M = imaplib.IMAP4(ip)
        # Was ``M = login(...)``: an undefined global instead of the method.
        M.login(user[:-1], value)
        print("\t\nLogin successful: %s %s" % (user, value))
        # NOTE(review): no mailbox is selected before close() — confirm the
        # server accepts CLOSE in this state.
        M.close()
        M.logout()
        work.join()
        sys.exit(2)
    except (imaplib.IMAP4.error, socket.gaierror, socket.error, socket.herror) as msg:
        print("An error occurred: %s" % (msg,))
def run(self):
    """Attempt one IMAP login with the next word returned by ``getword()``.

    Reads the module-level names ``user``, ``ipaddr`` and ``work`` (defined
    outside this block); the server is ``ipaddr[0]``.  The last character of
    ``user`` is dropped before use — presumably a trailing newline; verify
    against the caller.  On a successful login the session is closed,
    ``work`` is joined and the interpreter exit is requested with status 2.
    IMAP and socket errors are printed and otherwise ignored.
    """
    value = getword()
    try:
        print("-" * 12)
        print("User: %s Password: %s" % (user[:-1], value))
        M = imaplib.IMAP4(ipaddr[0])
        # Was ``M = login(...)``: an undefined global instead of the method.
        M.login(user[:-1], value)
        print("\t\nLogin successful: %s %s" % (user, value))
        # NOTE(review): no mailbox is selected before close() — confirm the
        # server accepts CLOSE in this state.
        M.close()
        M.logout()
        work.join()
        sys.exit(2)
    except (imaplib.IMAP4.error, socket.gaierror, socket.error, socket.herror) as msg:
        print("An error occurred: %s" % (msg,))
def run(self):
    """Attempt one IMAP login with the pair returned by ``getword()``.

    ``getword()`` (defined outside this block) is unpacked as
    ``(value, user)``; the server is taken from ``sys.argv[1]``.  On a
    successful login the session is closed, the module-level ``work`` is
    joined and the interpreter exit is requested with status 2.  IMAP and
    socket errors are printed and otherwise ignored.
    """
    value, user = getword()
    try:
        print("-" * 12)
        print("User: %s Password: %s" % (user, value))
        M = imaplib.IMAP4(sys.argv[1])
        # Was ``M = login(...)``: an undefined global instead of the method.
        M.login(user, value)
        print("\t\nLogin successful: %s %s" % (user, value))
        # NOTE(review): no mailbox is selected before close() — confirm the
        # server accepts CLOSE in this state.
        M.close()
        M.logout()
        work.join()
        sys.exit(2)
    except (imaplib.IMAP4.error, socket.gaierror, socket.error, socket.herror) as msg:
        print("An error occurred: %s" % (msg,))
def __init__(self, host, port, timeout=None):
    """Store the connection timeout, then run the ``imaplib.IMAP4`` initializer.

    :param host: forwarded unchanged to ``imaplib.IMAP4.__init__``.
    :param port: forwarded unchanged to ``imaplib.IMAP4.__init__``.
    :param timeout: saved as ``self._timeout``; not used by this method itself.
    """
    # Assigned before delegating — presumably an overridden open() reads
    # self._timeout while the base initializer connects (TODO confirm).
    self._timeout = timeout
    imaplib.IMAP4.__init__(self, host, port)
def run_mailbox(self, min_delay=5.0, max_delay=60.0):
    """Serve queued mailbox calls over one lazily (re)connected IMAP session.

    Every item received from ``idiokit.next()`` is unpacked as
    ``(event, name, args, keys)``.  The attribute called ``name`` is looked
    up on the mailbox and run through ``idiokit.thread`` with ``*args`` and
    ``**keys``; the outcome is reported through ``event.succeed`` or
    ``event.fail``.

    :param min_delay: first wait, in seconds, after a failed connection attempt.
    :param max_delay: upper bound, in seconds, for that wait; the wait doubles
        after every failed attempt until it reaches this bound.

    NOTE(review): this is a generator driven by idiokit — presumably it is
    wrapped as an idiokit stream outside this view; confirm.
    """
    mailbox = None
    try:
        while True:
            # Wait for the next queued request.
            item = yield idiokit.next()
            # Retry loop for this one request: left only once the request
            # has been answered or turns out to be already settled.
            while True:
                delay = min(min_delay, max_delay)
                # (Re)connect with a doubling back-off until it succeeds.
                while mailbox is None:
                    try:
                        mailbox = yield idiokit.thread(self.connect)
                    except (imaplib.IMAP4.abort, socket.error) as error:
                        self.log.error("Failed IMAP connection ({0})".format(utils.format_exception(error)))
                    else:
                        break
                    self.log.info("Retrying connection in {0:.2f} seconds".format(delay))
                    yield idiokit.sleep(delay)
                    delay = min(2 * delay, max_delay)
                event, name, args, keys = item
                # Skip requests whose event already has a result set.
                if event.result().unsafe_is_set():
                    break
                try:
                    method = getattr(mailbox, name)
                    result = yield idiokit.thread(method, *args, **keys)
                # Handler order matters: in imaplib, IMAP4.abort is a
                # subclass of IMAP4.error, so it has to be listed first.
                except (imaplib.IMAP4.abort, socket.error) as error:
                    # Connection lost: drop it and let the outer loop
                    # reconnect and retry the same item.
                    yield idiokit.thread(self.disconnect, mailbox)
                    self.log.error("Lost IMAP connection ({0})".format(utils.format_exception(error)))
                    mailbox = None
                except imaplib.IMAP4.error as error:
                    # Command-level failure: report it and keep the connection.
                    event.fail(type(error), error, None)
                    break
                else:
                    event.succeed(result)
                    break
    finally:
        # Whatever ends the generator, release a still-open connection.
        if mailbox is not None:
            yield idiokit.thread(self.disconnect, mailbox)
def connect(self):
    """Connect to the configured IMAP server, log in and select the mailbox.

    ``_IMAP4`` is used when ``self.mail_disable_ssl`` is set, otherwise
    ``_IMAP4_SSL`` with ``self.mail_ca_certs``; both receive
    ``self.mail_connection_timeout``.

    :return: the logged-in mailbox object with ``self.mail_box`` selected
        read-write.
    :raises imaplib.IMAP4.abort: when SELECT does not answer "OK".  Any
        exception raised by the login or the SELECT is re-raised after the
        connection has been logged out.
    """
    self.log.info("Connecting to IMAP server {0!r} port {1}".format(
        self.mail_server, self.mail_port))

    if self.mail_disable_ssl:
        mailbox = _IMAP4(
            self.mail_server,
            self.mail_port,
            timeout=self.mail_connection_timeout
        )
    else:
        mailbox = _IMAP4_SSL(
            self.mail_server,
            self.mail_port,
            timeout=self.mail_connection_timeout,
            ca_certs=self.mail_ca_certs
        )

    self.log.info("Logging in to IMAP server {0!r} port {1}".format(
        self.mail_server, self.mail_port))
    try:
        # Login sits inside the guarded region so that a rejected login does
        # not leave the freshly opened connection behind.
        mailbox.login(self.mail_user, self.mail_password)
        status, msgs = mailbox.select(self.mail_box, readonly=False)
        if status != "OK":
            # Raise even when the server sent no explanatory message; the
            # previous per-message loop fell through on an empty list.
            if msgs:
                reason = msgs[0]
            else:
                reason = "unable to select mailbox {0!r}".format(self.mail_box)
            raise imaplib.IMAP4.abort(reason)
    except BaseException:
        mailbox.logout()
        raise

    self.log.info("Logged in to IMAP server {0!r} port {1}".format(
        self.mail_server, self.mail_port))
    return mailbox
def disconnect(self, mailbox):
    """Close the selected mailbox and log out, ignoring IMAP and socket errors.

    Both steps are always attempted: a failing ``close()`` does not prevent
    the ``logout()``.
    """
    for step in ("close", "logout"):
        try:
            getattr(mailbox, step)()
        except (imaplib.IMAP4.error, socket.error):
            # Best-effort teardown: the connection may already be gone.
            pass
def __init__(self, host, port, timeout=None):
    """Store the connection timeout, then run the ``imaplib.IMAP4`` initializer.

    :param host: forwarded unchanged to ``imaplib.IMAP4.__init__``.
    :param port: forwarded unchanged to ``imaplib.IMAP4.__init__``.
    :param timeout: saved as ``self._timeout``; not used by this method itself.
    """
    # Assigned before delegating — presumably an overridden open() reads
    # self._timeout while the base initializer connects (TODO confirm).
    self._timeout = timeout
    imaplib.IMAP4.__init__(self, host, port)
def connect(self):
    """Connect to the configured IMAP server, log in and select the mailbox.

    ``_IMAP4`` is used when ``self.mail_disable_ssl`` is set, otherwise
    ``_IMAP4_SSL`` with ``self.mail_ca_certs``; both receive
    ``self.mail_connection_timeout``.

    :return: the logged-in mailbox object with ``self.mail_box`` selected
        read-write.
    :raises imaplib.IMAP4.abort: when SELECT does not answer "OK".  Any
        exception raised by the login or the SELECT is re-raised after the
        connection has been logged out.
    """
    self.log.info("Connecting to IMAP server {0!r} port {1}".format(
        self.mail_server, self.mail_port))

    if self.mail_disable_ssl:
        mailbox = _IMAP4(
            self.mail_server,
            self.mail_port,
            timeout=self.mail_connection_timeout
        )
    else:
        mailbox = _IMAP4_SSL(
            self.mail_server,
            self.mail_port,
            timeout=self.mail_connection_timeout,
            ca_certs=self.mail_ca_certs
        )

    self.log.info("Logging in to IMAP server {0!r} port {1}".format(
        self.mail_server, self.mail_port))
    try:
        # Login sits inside the guarded region so that a rejected login does
        # not leave the freshly opened connection behind.
        mailbox.login(self.mail_user, self.mail_password)
        status, msgs = mailbox.select(self.mail_box, readonly=False)
        if status != "OK":
            # Raise even when the server sent no explanatory message; the
            # previous per-message loop fell through on an empty list.
            if msgs:
                reason = msgs[0]
            else:
                reason = "unable to select mailbox {0!r}".format(self.mail_box)
            raise imaplib.IMAP4.abort(reason)
    except BaseException:
        mailbox.logout()
        raise

    self.log.info("Logged in to IMAP server {0!r} port {1}".format(
        self.mail_server, self.mail_port))
    return mailbox
def disconnect(self, mailbox):
    """Close the selected mailbox and log out, ignoring IMAP and socket errors.

    Both steps are always attempted: a failing ``close()`` does not prevent
    the ``logout()``.
    """
    for step in ("close", "logout"):
        try:
            getattr(mailbox, step)()
        except (imaplib.IMAP4.error, socket.error):
            # Best-effort teardown: the connection may already be gone.
            pass
def compute_state(self, key):
    """Return the number of unseen messages in ``key.folder``.

    :param key: object providing ``username``, ``password``, ``use_ssl``,
        ``server``, ``port`` and ``folder``.
    :return: the UNSEEN count reported by the server, or ``None`` (after a
        warning) when username or password is missing.
    :raises AttributeError: when the STATUS reply contains no UNSEEN count.
    """
    if not key.username or not key.password:
        self.warn('Username and password are not configured')
        return None

    if key.use_ssl:
        mail = IMAP4_SSL(key.server, key.port)
    else:
        mail = IMAP4(key.server, key.port)

    try:
        mail.login(key.username, key.password)
        _, message = mail.status(key.folder, '(UNSEEN)')
        unread_str = message[0].decode('utf-8')
        # Raw string: "\d" in a plain literal is an invalid escape sequence.
        return int(re.search(r'UNSEEN (\d+)', unread_str).group(1))
    finally:
        # The connection used to be left open after every call.  Closing is
        # best effort so that it never hides an error raised above.
        try:
            mail.logout()
        except (IMAP4.error, OSError):
            pass
def __init__(self, target):
    """Open an IMAP session to ``target`` and check that it offers NTLM.

    :param target: string of the form ``protocol://host:port``.
    :raises ValueError: when ``target`` does not split into exactly three
        ``:``-separated parts or the port is not a number.
    :raises imaplib.IMAP4.error: when the server does not advertise
        ``AUTH=NTLM``.
    """
    # Target comes as protocol://target:port
    self.target = target
    proto, host, port = target.split(':')
    host = host[2:]  # drop the leading "//"
    port = int(port)  # parsed once so both branches pass an integer

    if port == 993 or proto.upper() == 'IMAPS':
        self.session = imaplib.IMAP4_SSL(host, port)
    else:
        # assume non-ssl IMAP
        self.session = imaplib.IMAP4(host, port)

    if 'AUTH=NTLM' not in self.session.capabilities:
        logging.error('IMAP server does not support NTLM authentication!')
        # ``return False`` is illegal in __init__ (Python turns it into an
        # unrelated TypeError); fail with an explicit error instead.
        raise imaplib.IMAP4.error(
            'IMAP server does not support NTLM authentication!')

    self.authtag = self.session._new_tag()
    self.lastresult = None
def is_authenticated(self, user, password):
    """Check ``user`` / ``password`` by logging in to the configured IMAP server.

    Configuration (section ``auth``):
        ``imap_host``: ``host`` or ``host:port`` (default port 143); an IPv6
            address may be written in brackets, with or without a port.
        ``imap_secure``: when true (the default) a failed STARTTLS is an
            error; when false the login continues unencrypted.

    :return: ``True`` when the login succeeds, ``False`` when the server
        rejects the credentials.
    :raises RuntimeError: when the host cannot be parsed, when secure mode
        is requested on Python < 3.4, or when talking to the server fails.
    """
    host = ""
    if self.configuration.has_option("auth", "imap_host"):
        host = self.configuration.get("auth", "imap_host")
    secure = True
    if self.configuration.has_option("auth", "imap_secure"):
        secure = self.configuration.getboolean("auth", "imap_secure")

    try:
        # A bracketed IPv6 literal without a port ("[::1]") contains colons
        # but no port suffix, so it must not be split.
        if ":" in host and not host.rstrip().endswith("]"):
            address, port = host.rsplit(":", maxsplit=1)
        else:
            address, port = host, 143
        address, port = address.strip("[] "), int(port)
    except ValueError as e:
        raise RuntimeError(
            "Failed to parse address %r: %s" % (host, e)) from e

    if sys.version_info < (3, 4) and secure:
        raise RuntimeError("Secure IMAP is not availabe in Python < 3.4")

    try:
        connection = imaplib.IMAP4(host=address, port=port)
        try:
            if sys.version_info < (3, 4):
                connection.starttls()
            else:
                connection.starttls(ssl.create_default_context())
        except (imaplib.IMAP4.error, ssl.CertificateError) as e:
            if secure:
                raise
            self.logger.debug("Failed to establish secure connection: %s",
                              e, exc_info=True)
        try:
            connection.login(user, password)
        except imaplib.IMAP4.error as e:
            self.logger.debug(
                "IMAP authentication failed: %s", e, exc_info=True)
            # Release the connection on rejected credentials too; failures
            # while closing must not turn the answer into an error.
            try:
                connection.logout()
            except (OSError, imaplib.IMAP4.error):
                pass
            return False
        connection.logout()
        return True
    except (OSError, imaplib.IMAP4.error) as e:
        raise RuntimeError("Failed to communicate with IMAP server %r: "
                           "%s" % (host, e)) from e
def fetch(self):
    """Fetches email messages from an IMAP4 server.

    :return: dict mapping each message number returned by ``SEARCH ALL`` to
        the result of ``self.parse_email`` on that message's RFC822 payload.
        The dict is empty when the search does not answer "OK"; messages
        whose fetch does not answer "OK" or carries no payload are skipped.
    """
    messages = {}

    typ, found = self.handle.search(None, 'ALL')
    if typ != 'OK' or not found or not found[0]:
        return messages

    for num in found[0].split():
        typ, payload = self.handle.fetch(num, '(RFC822)')
        # A usable reply starts with an (envelope, body) tuple; anything
        # else (e.g. None) means there is nothing to parse for this number.
        if typ != 'OK' or not payload or not isinstance(payload[0], tuple):
            continue
        messages[num] = self.parse_email(payload[0][1])
    return messages
def disconnect(self):
    """Closes the IMAP4 handle.

    Expunges and closes the selected mailbox, then logs out.  The logout is
    attempted even when expunge or close raises; that exception still
    propagates to the caller.
    """
    try:
        self.handle.expunge()
        self.handle.close()
    finally:
        # Always release the connection, even after a failed expunge/close.
        self.handle.logout()
def test_issue5949(self):
    """A greeting cut off by EOF must raise ``IMAP4.abort`` on connect."""

    class EOFHandler(socketserver.StreamRequestHandler):

        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write(b'* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_line_termination(self):
    """A CAPABILITY line ending in a bare LF must raise ``IMAP4.abort``."""

    class BadNewlineHandler(SimpleIMAPHandler):

        def cmd_CAPABILITY(self, tag, args):
            # The untagged line ends with "\n" only, not "\r\n".
            self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
            completion = '{} OK CAPABILITY completed\r\n'.format(tag)
            self._send(completion.encode('ASCII'))

    with self.reaped_server(BadNewlineHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def __init__(self, account, auth_code, name='', **config):
    """Derive server settings from ``account`` and prepare connection factories.

    :param account: address of the form ``user@server``.
    :param auth_code: stored as ``self.auth_code``.
    :param name: display name; the part of ``account`` before ``@`` is used
        when empty.
    :param config: extra attributes; they override both the derived defaults
        and the ``SERVER_LIB`` entry for the server.

    ``self.st_SMTP`` / ``self.st_IMAP`` are zero-argument callables that
    build the smtplib / imaplib client; ``self.SMTP`` / ``self.IMAP`` are
    zero-argument callables that build ``SMTP(self)`` / ``IMAP(self)``.
    """
    account_name, server_name = account.split('@')

    # Defaults first, then per-server presets, then caller overrides.
    self.smtp = 'smtp.' + server_name
    self.imap = 'imap.' + server_name
    self.server_name = server_name
    self.smtp_port = self.imap_port = 0
    self.use_ssl = True
    for overrides in (SERVER_LIB.get(server_name, {}), config):
        self.__dict__.update(overrides)

    self.name = '%s <%s>' % (name or account_name, account)
    self.account = account
    self.auth_code = auth_code

    smtp_cls = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP
    imap_cls = imaplib.IMAP4_SSL if self.use_ssl else imaplib.IMAP4

    # Whether a port is passed is decided now; host and port values are read
    # from the instance when the factory is called.
    if not self.smtp_port:
        self.st_SMTP = lambda: smtp_cls(self.smtp)
    else:
        self.st_SMTP = lambda: smtp_cls(self.smtp, self.smtp_port)

    if not self.imap_port:
        self.st_IMAP = lambda: imap_cls(self.imap)
    else:
        self.st_IMAP = lambda: imap_cls(self.imap, self.imap_port)

    self.SMTP = lambda: SMTP(self)
    self.IMAP = lambda: IMAP(self)
def test_issue5949(self):
    """A greeting cut off by EOF must raise ``IMAP4.abort`` on connect."""

    class EOFHandler(SocketServer.StreamRequestHandler):

        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write('* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_linetoolong(self):
    """A greeting longer than ``imaplib._MAXLINE`` must raise ``IMAP4.error``."""

    class TooLongHandler(SimpleIMAPHandler):

        def handle(self):
            # Send a very long response line
            oversized = '* OK ' + imaplib._MAXLINE*'x' + '\r\n'
            self.wfile.write(oversized)

    with self.reaped_server(TooLongHandler) as server:
        with self.assertRaises(imaplib.IMAP4.error):
            self.imap_class(*server.server_address)
def test_issue5949(self):
    """A greeting cut off by EOF must raise ``IMAP4.abort`` on connect."""

    class EOFHandler(SocketServer.StreamRequestHandler):

        def handle(self):
            # EOF without sending a complete welcome message.
            self.wfile.write('* OK')

    with self.reaped_server(EOFHandler) as server:
        with self.assertRaises(imaplib.IMAP4.abort):
            self.imap_class(*server.server_address)
def test_linetoolong(self):
    """A greeting longer than ``imaplib._MAXLINE`` must raise ``IMAP4.error``."""

    class TooLongHandler(SimpleIMAPHandler):

        def handle(self):
            # Send a very long response line
            oversized = '* OK ' + imaplib._MAXLINE*'x' + '\r\n'
            self.wfile.write(oversized)

    with self.reaped_server(TooLongHandler) as server:
        with self.assertRaises(imaplib.IMAP4.error):
            self.imap_class(*server.server_address)