def close(self):
    """Shut down the connection to the email server, if one is open.

    Does nothing when ``self.connection`` is already ``None``. Otherwise a
    QUIT is attempted; whatever the outcome, ``self.connection`` is reset
    to ``None`` before this method returns or raises.

    Raises:
        smtplib.SMTPException: when quitting fails for a reason other than
            a dropped connection and ``self.fail_silently`` is false.
    """
    conn = self.connection
    if conn is None:
        return
    try:
        try:
            conn.quit()
        except (ssl.SSLError, smtplib.SMTPServerDisconnected):
            # quit() sometimes fails like this on a TLS connection, or
            # when the server has already dropped the connection; just
            # close the local side.
            conn.close()
        except smtplib.SMTPException:
            if not self.fail_silently:
                raise
    finally:
        self.connection = None
# Example source code using the Python class SMTPServerDisconnected()
def close(self):
    """Shut down the connection to the email server, if one is open.

    Does nothing when ``self.connection`` is already ``None``. Otherwise a
    QUIT is attempted; whatever the outcome, ``self.connection`` is reset
    to ``None`` before this method returns or raises.

    Raises:
        smtplib.SMTPException: when quitting fails for a reason other than
            a dropped connection and ``self.fail_silently`` is false.
    """
    conn = self.connection
    if conn is None:
        return
    try:
        try:
            conn.quit()
        except (ssl.SSLError, smtplib.SMTPServerDisconnected):
            # quit() sometimes fails like this on a TLS connection, or
            # when the server has already dropped the connection; just
            # close the local side.
            conn.close()
        except smtplib.SMTPException:
            if not self.fail_silently:
                raise
    finally:
        self.connection = None
def close(self):
    """Shut down the connection to the email server, if one is open.

    Does nothing when ``self.connection`` is already ``None``. Otherwise a
    QUIT is attempted; whatever the outcome, ``self.connection`` is reset
    to ``None`` before this method returns or raises.

    Raises:
        smtplib.SMTPException: when quitting fails for a reason other than
            a dropped connection and ``self.fail_silently`` is false.
    """
    conn = self.connection
    if conn is None:
        return
    try:
        try:
            conn.quit()
        except (ssl.SSLError, smtplib.SMTPServerDisconnected):
            # quit() sometimes fails like this on a TLS connection, or
            # when the server has already dropped the connection; just
            # close the local side.
            conn.close()
        except smtplib.SMTPException:
            if not self.fail_silently:
                raise
    finally:
        self.connection = None
def send_message(self, target):
    """Send one randomly chosen entry of ``self.messages`` to ``target``.

    On success the sent counter is incremented, persisted through
    ``write_to_messages_sent()`` and the delivery is logged. If the server
    has dropped the connection, that is logged and
    ``initialize_server()`` is called; the message is not re-sent.

    Args:
        target: recipient address handed to ``self.server.sendmail``.
    """
    try:
        pick = random.randrange(len(self.messages))
        body = self.messages[pick]
        self.server.sendmail(self.email, target, body)
        self.messages_sent += 1
        self.write_to_messages_sent()
        delivered = "%s sent to %s" % (self.email, target)
        self.write_to_log(delivered)
    except smtplib.SMTPServerDisconnected:
        self.write_to_log("Disconnected by ban, email %s" % self.email)
        self.initialize_server()
        self.write_to_log("Logged in again to %s" % self.email)
    # TODO: put a universal except in the thread class.
    # TODO: consider calling write_to_messages_sent only when the thread
    # is being shut down.
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def test_with_statement(self):
    # After the ``with`` block has exited, send() on the same object is
    # expected to raise SMTPServerDisconnected.
    with smtplib.SMTP(HOST, self.port) as client:
        reply_code, _ = client.noop()
        self.assertEqual(reply_code, 250)
    with self.assertRaises(smtplib.SMTPServerDisconnected):
        client.send(b'foo')

    # Same expectation when the connection was closed explicitly inside
    # the block before it exited.
    with smtplib.SMTP(HOST, self.port) as client:
        client.close()
    with self.assertRaises(smtplib.SMTPServerDisconnected):
        client.send(b'foo')
def disconnect(self):
    """End the SMTP session if one is open and reset the connection state.

    When ``self.connected`` is true, QUIT is sent through
    ``self.smtp_server``; a server that has already dropped the connection
    is tolerated. Afterwards ``self.connected`` is ``False`` and
    ``self.smtp_server`` is ``None``.
    """
    if self.connected:
        try:
            self.smtp_server.quit()
        except smtplib.SMTPServerDisconnected:
            # The server is already gone, so there is nothing left to
            # quit; deliberately ignored.
            pass
    self.connected = False
    self.smtp_server = None
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def test_with_statement(self):
    # After the ``with`` block has exited, send() on the same object is
    # expected to raise SMTPServerDisconnected.
    with smtplib.SMTP(HOST, self.port) as client:
        reply_code, _ = client.noop()
        self.assertEqual(reply_code, 250)
    with self.assertRaises(smtplib.SMTPServerDisconnected):
        client.send(b'foo')

    # Same expectation when the connection was closed explicitly inside
    # the block before it exited.
    with smtplib.SMTP(HOST, self.port) as client:
        client.close()
    with self.assertRaises(smtplib.SMTPServerDisconnected):
        client.send(b'foo')
def test_with_statement_QUIT_failure(self):
    # The test server's QUIT reply is overridden with a 421 failure while
    # the session is open (presumably via the channel's quit_response
    # attribute -- confirm against the test server). The failure is
    # expected to surface from the ``with`` statement as an
    # SMTPResponseException carrying that code and text.
    with self.assertRaises(smtplib.SMTPResponseException) as caught:
        with smtplib.SMTP(HOST, self.port) as client:
            client.noop()
            self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
    failure = caught.exception
    self.assertEqual(failure.smtp_code, 421)
    self.assertEqual(failure.smtp_error, b'QUIT FAILED')
#TODO: add tests for correct AUTH method fallback now that the
#test infrastructure can support it.
# Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# should raise exceptions (at present the attempt in SMTP.send
# to reference the nonexistent 'sock' attribute of the SMTP object
# causes an AttributeError)
smtp = smtplib.SMTP()
self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
self.assertRaises(smtplib.SMTPServerDisconnected,
smtp.send, 'test msg')
def test_with_statement(self):
    # After the ``with`` block has exited, send() on the same object is
    # expected to raise SMTPServerDisconnected.
    with smtplib.SMTP(HOST, self.port) as client:
        reply_code, _ = client.noop()
        self.assertEqual(reply_code, 250)
    with self.assertRaises(smtplib.SMTPServerDisconnected):
        client.send(b'foo')

    # Same expectation when the connection was closed explicitly inside
    # the block before it exited.
    with smtplib.SMTP(HOST, self.port) as client:
        client.close()
    with self.assertRaises(smtplib.SMTPServerDisconnected):
        client.send(b'foo')
def test_with_statement_QUIT_failure(self):
    # The test server's QUIT reply is overridden with a 421 failure while
    # the session is open (presumably via the channel's quit_response
    # attribute -- confirm against the test server). The failure is
    # expected to surface from the ``with`` statement as an
    # SMTPResponseException carrying that code and text.
    with self.assertRaises(smtplib.SMTPResponseException) as caught:
        with smtplib.SMTP(HOST, self.port) as client:
            client.noop()
            self.serv._SMTPchannel.quit_response = '421 QUIT FAILED'
    failure = caught.exception
    self.assertEqual(failure.smtp_code, 421)
    self.assertEqual(failure.smtp_error, b'QUIT FAILED')
#TODO: add tests for correct AUTH method fallback now that the
#test infrastructure can support it.
# Issue 17498: make sure _rset does not raise SMTPServerDisconnected exception
def start(self):
    """Run the mail loop: deliver queued mails until ``self.running`` is false.

    Each queue entry is a 5-tuple ``(sender, receivers, subject, message,
    attachments)`` that is passed to ``self._send_mail`` together with a
    single SMTP connection to ``self.mta``. The queue is polled with a
    2 second timeout so a change of ``self.running`` is noticed.

    If the server has dropped the connection, the connection is
    re-established and the same mail is sent once more. Any other failure,
    including a failed reconnect or retry, is logged and the loop carries
    on with the next mail instead of terminating. The SMTP session is
    always shut down when the loop ends.
    """
    smtp = smtplib.SMTP(self.mta)
    try:
        while self.running:
            try:
                try:
                    queued = MAIL_QUEUE.get(block=True, timeout=2)
                except Empty:
                    # Nothing to send; go back and re-check self.running.
                    continue
                sender, receivers, subject, message, attachments = queued
                try:
                    self._send_mail(smtp, sender, receivers, subject, message, attachments)
                except smtplib.SMTPServerDisconnected:
                    # Reconnect and retry once. This runs inside the outer
                    # ``try`` so that a failing reconnect or retry is
                    # logged below rather than escaping the loop.
                    smtp.connect(self.mta)
                    self._send_mail(smtp, sender, receivers, subject, message, attachments)
            except Exception:
                logger.exception("Unhandled exception in mail thread")
    finally:
        try:
            smtp.quit()
        except smtplib.SMTPServerDisconnected:
            # The server already dropped the connection (for example
            # after a failed reconnect); release the local side only.
            smtp.close()