def as_mailobj(self):
    """Assemble this object's fields into a ``multipart/related`` message.

    Reads ``self.subject``, ``self.get_sender()``, ``self.recipients``,
    ``self.headers``, ``self.body`` and ``self.attachments``.

    Returns:
        The populated ``MIMEMultipart`` root message.
    """
    # Create the root message and fill in the from, to, and subject headers
    msgRoot = MIMEMultipart('related')
    msgRoot['Subject'] = self.subject
    msgRoot['From'] = self.get_sender()
    msgRoot['To'] = ", ".join(self.recipients)
    msgRoot.preamble = 'Generated by mailprod python tool'
    # Each extra header is a (name, value, params) triple; ``params`` is a
    # mapping that is expanded into header parameters.
    for name, value, params in self.headers:
        msgRoot.add_header(name, value, **params)
    # Body parts go in first, attachments after, each in their stored order.
    for part in self.body:
        msgRoot.attach(part)
    for attachment in self.attachments:
        msgRoot.attach(attachment)
    return msgRoot
python类MIMEMultipart()的实例源码
def test_mime_attachments_in_constructor(self):
eq = self.assertEqual
text1 = MIMEText('')
text2 = MIMEText('')
msg = MIMEMultipart(_subparts=(text1, text2))
eq(len(msg.get_payload()), 2)
eq(msg.get_payload(0), text1)
eq(msg.get_payload(1), text2)
# A general test of parser->model->generator idempotency.  In other words: read
# a message in, parse it into a message object tree, then, without touching the
# tree, regenerate the plain text.  The original text and the transformed text
# should be identical.  Note that we ignore the Unix-From line, since it may
# contain a changed date.
def send_email_account(remote_server, remote_port, username, password, email_from, email_to, subject, body):
    """Send a plain-text message through an authenticated SMTP account.

    When ``remote_server`` is ``smtp.gmail.com`` the work is delegated to
    ``send_email_gmail``.  For any other host the function connects to
    ``remote_server:remote_port``, logs in with ``username`` / ``password``
    and submits the message.  Errors raised while logging in or sending are
    printed rather than propagated; the connection is always closed.
    """
    if remote_server == "smtp.gmail.com":
        send_email_gmail(username, password, email_from, email_to, subject, body)
        return

    # Build the message before connecting so a failure here cannot leave an
    # open connection behind.
    msg = MIMEMultipart()
    msg['From'] = email_from
    msg['To'] = email_to
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    # connect to remote mail server and forward message on
    server = smtplib.SMTP(remote_server, remote_port)
    smtp_sendmail_return = ""
    try:
        server.login(username, password)
        smtp_sendmail_return = server.sendmail(email_from, email_to, msg.as_string())
    except Exception as e:
        # Single-argument print() behaves the same on Python 2 and Python 3.
        print('SMTP Exception:\n' + str(e) + '\n' + str(smtp_sendmail_return))
    finally:
        server.quit()
def send_email_direct(email_from, email_to, subject, body):
    """Deliver a plain-text message straight to the recipient domain's mail server.

    The domain is taken from the text after the last ``@`` in ``email_to``
    and resolved with ``get_mx_record``.  If the address has no domain, or
    no server is found, a message is printed and the function returns
    without sending.  Errors raised while sending are printed rather than
    propagated; the connection is always closed.
    """
    # find the appropriate mail server; rpartition() tolerates an address
    # without '@' instead of raising IndexError
    _, at_sign, domain = email_to.rpartition('@')
    remote_server = get_mx_record(domain) if at_sign and domain else None
    if remote_server is None:
        print("No valid email server could be found for [%s]!" % (email_to))
        return

    msg = MIMEMultipart()
    msg['From'] = email_from
    msg['To'] = email_to
    msg['Subject'] = subject
    msg.attach(MIMEText(body, 'plain'))

    # connect to remote mail server and forward message on
    server = smtplib.SMTP(remote_server, 25)
    smtp_sendmail_return = ""
    try:
        smtp_sendmail_return = server.sendmail(email_from, email_to, msg.as_string())
    except Exception as e:
        # Single-argument print() behaves the same on Python 2 and Python 3.
        print('SMTP Exception:\n' + str(e) + '\n' + str(smtp_sendmail_return))
    finally:
        server.quit()
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def sendmail(toaddr, subject, body):
    """Build a plain-text message and pass it to ``SMTPSession.send``.

    The sender is always ``ALERT_EMAIL_FROM``.

    :param list toaddr: recipient addresses; joined with ", " for the To header
    :param str subject: subject line
    :param str body: plain-text body
    :returns: this is a generator function; it yields whatever
        ``SMTPSession.send`` returns.  NOTE(review): presumably driven by a
        coroutine decorator that is not visible here -- confirm.
    """
    transport = SMTPSession
    origin = ALERT_EMAIL_FROM
    message = MIMEMultipart()
    message['From'] = origin
    message['To'] = ', '.join(toaddr)
    message['Subject'] = subject
    message.attach(MIMEText(body, 'plain'))
    payload = message.as_string()
    yield transport.send(origin, toaddr, payload)
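# NOTE: the original comment text here was lost to an encoding error. The lines below are a disabled partial-application variant.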
# send_alert_mail = partial(sendmail, session=SMTPSession,
# fromaddr=ALERT_EMAIL_FROM)
alert_io_task.py 文件源码
项目:lustre_task_driven_monitoring_framework
作者: GSI-HPC
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def _send_mail(self, args):
    """Send a plain-text mail described by a two-item ``args`` sequence.

    Args:
        args: sequence of exactly two items, ``(subject, text)``.

    Raises:
        RuntimeError: if ``args`` is ``None`` or does not hold exactly two
            items.

    The message goes from ``self.mail_sender`` to every address in
    ``self.mail_receiver_list`` via ``self.mail_server``.  The rendered
    message is also written to the debug log.
    """
    if args is None:
        raise RuntimeError("Passed argument for send mail is not set!")
    if len(args) != 2:
        raise RuntimeError("Passed argument for send mail has invalid number of arguments!")
    subject = args[0]
    text = args[1]
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = self.mail_sender
    msg['To'] = ', '.join(self.mail_receiver_list)
    msg.attach(MIMEText(text))
    msg_string = msg.as_string()
    logging.debug(msg_string)
    smtp_conn = smtplib.SMTP(self.mail_server)
    try:
        smtp_conn.sendmail(self.mail_sender, self.mail_receiver_list, msg_string)
    finally:
        # Close the connection even when sendmail() raises.
        smtp_conn.quit()
def send(To, Subject, Body, Cc=None, Bcc=None, html=False, files=None):
    """Send an email through the Gmail API using stored OAuth2 credentials.

    Args:
        To: iterable of recipient addresses (joined with ", ").
        Subject: subject line.
        Body: message body text.
        Cc: optional iterable of carbon-copy addresses.
        Bcc: optional iterable of blind-carbon-copy addresses.
        html: when true the body is attached as ``text/html``, otherwise
            as ``text/plain``.
        files: optional iterable of paths; each file is attached as an
            ``application`` part named after its basename.

    Credentials are loaded from ``CREDENTIALS_PATH``.
    """
    subtype = 'html' if html else 'plain'
    message = MIMEMultipart()
    message['To'] = ', '.join(To)
    message['Subject'] = Subject
    # An address header with no addresses is malformed, so Cc/Bcc are only
    # emitted when they actually carry recipients.
    if Cc:
        message['Cc'] = ', '.join(Cc)
    if Bcc:
        message['Bcc'] = ', '.join(Bcc)
    message.attach(MIMEText(Body, subtype))
    for f in files or ():
        with open(f, "rb") as In:
            part = MIMEApplication(In.read(), Name=basename(f))
        part['Content-Disposition'] = 'attachment; filename="%s"' % basename(f)
        message.attach(part)

    # urlsafe_b64encode() needs bytes and the API body needs text.  On
    # Python 2 both are ``str`` already; on Python 3 convert each way.
    raw = message.as_string()
    if not isinstance(raw, bytes):
        raw = raw.encode('utf-8')
    encoded = base64.urlsafe_b64encode(raw)
    if not isinstance(encoded, str):
        encoded = encoded.decode('ascii')
    request_body = {'raw': encoded}

    credentials = oauth2client.file.Storage(CREDENTIALS_PATH).get()
    Http = credentials.authorize(httplib2.Http())
    service = discovery.build('gmail', 'v1', http=Http)
    service.users().messages().send(userId='me', body=request_body).execute()
def sender(username, password, toaddress, email_text):
    """Mail ``email_text`` to ``toaddress`` through Gmail's SMTP server.

    ``username`` is used both as the sender address and as the login name.
    The subject is ``Tv shows [ YYYY/MM/DD ]`` with today's local date.
    """
    now = datetime.datetime.now()

    msg = MIMEMultipart()
    msg['From'] = username
    msg['To'] = toaddress
    msg['Subject'] = "Tv shows [ " + now.strftime('%Y/%m/%d') + " ]"
    msg.attach(MIMEText(email_text, 'plain'))
    text = msg.as_string()

    # The actual mail send
    server = smtplib.SMTP('smtp.gmail.com:587')
    try:
        server.starttls()
        server.login(username, password)
        server.sendmail(username, toaddress, text)
    finally:
        # Close the connection even if TLS, login or sending fails.
        server.quit()
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def test_get_msg_from_string_multipart(self):
    """A serialised multipart mail is parsed back with the expected part metadata.

    A one-part UTF-8 multipart message is rendered to a string and fed
    through the adaptor; the document at index 1 of ``wrapper.cdocs`` must
    then report base64 / text/plain / utf-8 and the encoded payload.
    """
    original = MIMEMultipart()
    original['Subject'] = 'Test multipart mail'
    original.attach(MIMEText(u'a utf8 message', _charset='utf-8'))
    adaptor = self.get_adaptor()
    parsed = adaptor.get_msg_from_string(MessageClass, original.as_string())
    # (expected value, attribute name) pairs, checked in the original order.
    expectations = (
        ('base64', 'content_transfer_encoding'),
        ('text/plain', 'content_type'),
        ('utf-8', 'charset'),
        ('YSB1dGY4IG1lc3NhZ2U=\n', 'raw'),
    )
    for expected, attribute in expectations:
        self.assertEqual(
            expected, getattr(parsed.wrapper.cdocs[1], attribute))
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def sendEmailPrintable(textToSend,fileToSend,addressToSend, pathToFile, subject):
    """Mail ``textToSend`` with one file attached, via ``emailServer`` on port 25.

    Args:
        textToSend: plain-text body.
        fileToSend: file name; used in the attachment header and appended
            to ``pathToFile`` to locate the file.
        addressToSend: comma-separated recipient addresses.
        pathToFile: directory prefix, concatenated directly with
            ``fileToSend`` (so it must already end with a separator).
        subject: subject line.

    The sender is ``printableFromAddress``.
    """
    msg = MIMEMultipart()
    msg['From'] = printableFromAddress
    msg['To'] = addressToSend
    msg['Subject'] = subject
    msg.attach(MIMEText(textToSend,'plain'))

    # ``with`` closes the file even if reading fails.
    with open(pathToFile+fileToSend, "rb") as attachment:
        payload = attachment.read()
    part = MIMEBase('application','octet-stream')
    part.set_payload(payload)
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', "attachment; filename= %s" % fileToSend)
    msg.attach(part)

    text = msg.as_string()
    server = smtplib.SMTP(emailServer, 25)
    #server.starttls()
    try:
        server.sendmail(printableFromAddress, addressToSend.split(","), text)
    finally:
        # The original wrote ``server.quit`` without parentheses, which
        # never ran; call it so the SMTP session is really closed.
        server.quit()
def sendmail(to_list,subject,content):
    """Send ``content`` as a UTF-8 plain-text mail to every address in ``to_list``.

    The sender is built from ``mail_user`` and ``mail_postfix``; the server
    and login come from ``mail_host``, ``mail_user`` and ``mail_pwd``.

    Returns:
        True when the message was handed to the server, False when any
        step raised (the exception is printed).
    """
    # ``email.header`` is the module name available on both Python 2.5+ and
    # Python 3; ``email.Header`` exists only on Python 2.
    from email.header import Header

    me = mail_user+"<"+mail_user+"@"+mail_postfix+">"
    msg = MIMEMultipart('related')
    msg['Subject'] = Header(subject,'utf-8')
    msg['From'] = me
    # Address lists in mail headers are comma-separated (RFC 5322).
    msg['To'] = ", ".join(to_list)
    msg.preamble = 'This is a multi-part message in MIME format.'
    msgAlternative = MIMEMultipart('alternative')
    msgText = MIMEText(content, 'plain', 'utf-8')
    msgAlternative.attach(msgText)
    msg.attach(msgAlternative)
    try:
        s = smtplib.SMTP()
        s.connect(mail_host)
        try:
            s.login(mail_user,mail_pwd)
            s.sendmail(me, to_list, msg.as_string())
        finally:
            # Close the session even when login or sending fails.
            s.quit()
    except Exception as e:
        print(e)
        return False
    return True
def sendhtmlmail(to_list,subject,content):
    """Send ``content`` as a UTF-8 HTML mail to every address in ``to_list``.

    The sender is built from ``mail_user`` and ``mail_postfix``; the server
    and login come from ``mail_host``, ``mail_user`` and ``mail_pwd``.

    Returns:
        True when the message was handed to the server, False when any
        step raised (the exception is printed).
    """
    # ``email.header`` is the module name available on both Python 2.5+ and
    # Python 3; ``email.Header`` exists only on Python 2.
    from email.header import Header

    me = mail_user+"<"+mail_user+"@"+mail_postfix+">"
    msg = MIMEMultipart('related')
    msg['Subject'] = Header(subject,'utf-8')
    msg['From'] = me
    # Address lists in mail headers are comma-separated (RFC 5322).
    msg['To'] = ", ".join(to_list)
    msg.preamble = 'This is a multi-part message in MIME format.'
    msgAlternative = MIMEMultipart('alternative')
    msgText = MIMEText(content, 'html', 'utf-8')
    msgAlternative.attach(msgText)
    msg.attach(msgAlternative)
    try:
        s = smtplib.SMTP()
        s.connect(mail_host)
        try:
            s.login(mail_user,mail_pwd)
            s.sendmail(me, to_list, msg.as_string())
        finally:
            # Close the session even when login or sending fails.
            s.quit()
    except Exception as e:
        print(e)
        return False
    return True
def send_mail(server, from_address, from_name, to_address, subject, body_arg, return_addr, tls):
    """Send ``body_arg`` as a ``multipart/alternative`` mail (plain + HTML copies).

    Args:
        server: SMTP host passed to ``smtplib.SMTP``.
        from_address: envelope sender, also used in the From header.
        from_name: display name placed in front of ``from_address``.
        to_address: recipient.
        subject: subject line.
        body_arg: text used for both the plain and the HTML part.
        return_addr: value of the Reply-To header.
        tls: accepted for interface compatibility; not used by this
            function.  NOTE(review): presumably meant to enable STARTTLS
            -- confirm before wiring it up.

    ``smtplib.SMTPException`` errors are reported on stdout, not raised.
    """
    msg = MIMEMultipart('alternative')
    msg['To'] = to_address
    msg['Date'] = formatdate(localtime = True)
    msg['Subject'] = subject
    msg.add_header('reply-to', return_addr)
    msg.add_header('from', from_name + "<" + from_address + ">")
    part1 = MIMEText(body_arg, 'plain')
    part2 = MIMEText(body_arg, 'html')
    msg.attach(part1)
    msg.attach(part2)
    try:
        smtp = smtplib.SMTP(server)
        try:
            smtp.set_debuglevel(VERBOSE)
            smtp.sendmail(from_address, to_address, msg.as_string())
        finally:
            # Close the session even when sending fails.
            smtp.quit()
    except smtplib.SMTPException:
        # Joined with single spaces to match what the Python 2 print
        # statement with comma-separated operands produced.
        print(" ".join([FAIL + "[-] Error: unable to send email to ", str(to_address), str(ENDC)]))
# --------------------- PARSE includes/config.massmail -------------------
def notify(self, event):
    """Email an HTML notification describing ``event``.

    ``event`` is read for the keys ``subject``, ``scanname``, ``targets``,
    ``server`` and ``scanid``.  Addresses, SMTP host/port and the HTML
    template come from ``self.emailer_settings``.  Any failure is logged
    through ``Logger.app.error`` and swallowed.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = self.emailer_settings['from_address']
        msg['To'] = self.emailer_settings['to_address']
        msg['Subject'] = "{0} {1}".format(event['subject'], event['scanname'])

        # Render the targets as list items; no targets renders as nothing.
        targets = "".join(["<li>{0}</li>".format(t) for t in event['targets']]) if event['targets'] else ""
        html = str(self.emailer_settings['email_template']).format(event['server'],
                                                                   event['scanname'],
                                                                   event['scanid'],
                                                                   event['subject'],
                                                                   targets)
        msg.attach(MIMEText(html, 'html'))

        mail_server = smtplib.SMTP(self.emailer_settings['smtp_host'], self.emailer_settings['smtp_port'])
        try:
            mail_server.sendmail(msg['From'], msg['To'], msg.as_string())
        finally:
            # Close the session even when sending fails.
            mail_server.quit()
    except Exception as e:  # we don't want email failure to stop us, just log that it happened
        Logger.app.error("Your emailer settings in config.ini is incorrectly configured. Error: {}".format(e))
def cloudscan_notify(self, recipient, subject, git_url, ssc_url, state, scan_id, scan_name):
    """Email an HTML notification built from ``self.email_template``.

    The template is formatted positionally with ``git_url``, ``ssc_url``,
    ``scan_name``, ``scan_id``, ``state`` and ``self.chatroom``, then sent
    from ``self.from_address`` to ``recipient`` through
    ``self.smtp_host:self.smtp_port``.  Any failure is logged through
    ``Logger.app.error`` and swallowed.
    """
    try:
        msg = MIMEMultipart()
        msg['From'] = self.from_address
        msg['To'] = recipient
        msg['Subject'] = subject

        html = str(self.email_template).format(git_url, ssc_url, scan_name, scan_id, state, self.chatroom)
        msg.attach(MIMEText(html, 'html'))

        mail_server = smtplib.SMTP(self.smtp_host, self.smtp_port)
        try:
            mail_server.sendmail(msg['From'], msg['To'], msg.as_string())
        finally:
            # Close the session even when sending fails.
            mail_server.quit()
    except Exception as e:  # we don't want email failure to stop us, just log that it happened
        Logger.app.error("Your emailer settings in config.ini is incorrectly configured. Error: {}".format(e))
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def send_mail_reinit(destination_address, token):
    """Email a password-reset link containing ``token`` to ``destination_address``.

    The message is sent from ``info_mail.mail`` through Gmail's SMTP server
    (port 587, STARTTLS), logging in with ``info_mail.mdp``.
    """
    source_address = info_mail.mail
    body = "Rendez-vous à cette adresse pour réinitialiser votre mot de passe - localhost:5000/changer-mot-de-passe/" + token
    subject = "Récupération du mot de passe CMS TP2"

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = source_address
    msg['To'] = destination_address
    msg.attach(MIMEText(body, 'plain'))
    text = msg.as_string()

    server = smtplib.SMTP('smtp.gmail.com', 587)
    try:
        server.starttls()
        server.login(source_address, info_mail.mdp)
        server.sendmail(source_address, destination_address, text)
    finally:
        # Close the session even if TLS, login or sending fails.
        server.quit()
def send_mail_invite(destination_address, token):
    """Email an admin sign-up link containing ``token`` to ``destination_address``.

    The message is sent from ``info_mail.mail`` through Gmail's SMTP server
    (port 587, STARTTLS), logging in with ``info_mail.mdp``.
    """
    source_address = info_mail.mail
    body = "Rendez-vous à cette adresse pour vous inscrire en tant qu'admin sur CMS TP2 - localhost:5000/inscription/" + token
    subject = "Invitation au rôle d'admin CMS TP2"

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = source_address
    msg['To'] = destination_address
    msg.attach(MIMEText(body, 'plain'))
    text = msg.as_string()

    server = smtplib.SMTP('smtp.gmail.com', 587)
    try:
        server.starttls()
        server.login(source_address, info_mail.mdp)
        server.sendmail(source_address, destination_address, text)
    finally:
        # Close the session even if TLS, login or sending fails.
        server.quit()
def send_mail( server, subject, sender, receiver, text, filepath ):
    """Send ``text`` with the image at ``filepath`` attached.

    Args:
        server: SMTP host passed to ``smtplib.SMTP``.
        subject: subject line.
        sender: sender address.
        receiver: iterable of recipient addresses (joined with ", ").
        text: message body.
        filepath: path of the image; its extension is used as the MIME
            image subtype.  Without an extension ``MIMEImage`` is left to
            detect the subtype itself.
    """
    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join( receiver )
    msg.attach( MIMEText( text ) )

    filename = os.path.basename( filepath )
    # splitext() takes the text after the LAST dot, so "a.b.png" yields
    # "png" and a name without a dot does not raise IndexError.
    filetype = os.path.splitext( filename )[1].lstrip( '.' ) or None
    # Image data is binary: read it in 'rb' mode and close the handle.
    with open( filepath, 'rb' ) as image_file:
        image_data = image_file.read()
    img = MIMEImage( image_data, _subtype=filetype )
    img.add_header( 'Content-Disposition', "attachment; filename= %s" % filename )
    msg.attach( img )

    mail_server = smtplib.SMTP( server )
    try:
        mail_server.sendmail( sender, receiver, msg.as_string() )
    finally:
        # Close the session even when sending fails.
        mail_server.quit()
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
es_query_ipv4_outhtml_outmail.py 文件源码
项目:es_email_intel
作者: xujun10110
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def send_mail(report_contents):
    """Mail the report as ``report.html`` together with an inline word-cloud image.

    The image is read from a fixed path; if it cannot be opened or is not
    recognised as an image, a 1x1 placeholder image is used instead.
    Subject, sender, recipients and server come from ``SUBJECT``,
    ``EMAIL_FROM``, ``EMAIL_TO`` and ``EMAIL_SERVER``.

    Args:
        report_contents: payload of the ``report.html`` attachment.
    """
    # ``email.encoders`` is the module name available on both Python 2.5+
    # and Python 3; the ``Encoders`` alias exists only on Python 2.
    from email import encoders

    msg = MIMEMultipart()
    msg['Subject'] = SUBJECT
    msg['From'] = EMAIL_FROM
    msg['To'] = ', '.join(EMAIL_TO)

    try:
        with open('/home/pierre/es_email_intel/wordcloud.png', 'rb') as fp:
            msgImage = MIMEImage(fp.read())
    except Exception:
        # Fall back to the placeholder.  ``Exception`` rather than a bare
        # ``except`` so KeyboardInterrupt / SystemExit still propagate.
        with open('/home/pierre/es_email_intel/1x1.png', 'rb') as fp:
            msgImage = MIMEImage(fp.read())
    msgImage.add_header('Content-ID', '<wordcloud>')
    msg.attach(msgImage)

    part = MIMEBase('application', "octet-stream")
    part.set_payload(report_contents)
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', 'attachment; filename="report.html"')
    msg.attach(part)

    server = smtplib.SMTP(EMAIL_SERVER)
    try:
        server.sendmail(EMAIL_FROM, EMAIL_TO, msg.as_string())
    finally:
        # The original never closed the SMTP session.
        server.quit()
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def test__all__(self):
module = __import__('email')
all = module.__all__
all.sort()
self.assertEqual(all, [
# Old names
'Charset', 'Encoders', 'Errors', 'Generator',
'Header', 'Iterators', 'MIMEAudio', 'MIMEBase',
'MIMEImage', 'MIMEMessage', 'MIMEMultipart',
'MIMENonMultipart', 'MIMEText', 'Message',
'Parser', 'Utils', 'base64MIME',
# new names
'base64mime', 'charset', 'encoders', 'errors', 'generator',
'header', 'iterators', 'message', 'message_from_file',
'message_from_string', 'mime', 'parser',
'quopriMIME', 'quoprimime', 'utils',
])
def __init__(self, to, efrom, subject, text, attach, user, pw, server, port, callback=None):
    """Store the delivery settings and pre-fill the message headers.

    Args:
        to: recipient, stored and used as the To header.
        efrom: sender, stored and used as the From header.
        subject: used as the Subject header.
        text: stored on the instance as ``self.text``.
        attach: stored on the instance as ``self.attach``.
        user: stored as ``self.gmail_user``.
        pw: stored as ``self.gmail_pwd``.
        server: stored as ``self.server``.
        port: stored as ``self.port``.
        callback: optional object stored as ``self.callbackfunction``.

    NOTE(review): the enclosing class is not visible here; it presumably
    subclasses ``threading.Thread`` -- confirm.
    """
    # Set up the Thread state first.  Thread.__init__() returns None, so
    # there is nothing worth keeping or returning from it.
    threading.Thread.__init__(self)
    self.msg = MIMEMultipart()
    self.text = text
    self.attach = attach
    self.to = to
    self.efrom = efrom
    self.callbackfunction = callback
    self.gmail_user = user
    self.gmail_pwd = pw
    self.server = server
    self.port = port
    self.msg['From'] = self.efrom
    self.msg['To'] = self.to
    self.msg['Subject'] = subject
    # Single-argument print() behaves the same on Python 2 and Python 3.
    print("SENDMAIL: Sending started.")