def read(username, password, sender_of_interest):
    """Print unread Gmail mail from one sender, then flag it as seen.

    Connects to ``imap.gmail.com`` over SSL, prints the number of unseen
    INBOX messages, prints a list holding the text body of every unseen
    message whose sender matches *sender_of_interest*, and finally sets the
    Seen flag on those messages.  The connection is always logged out.

    :param username: account name used for the IMAP login.
    :param password: password for that account.
    :param sender_of_interest: value substituted into the ``FROM`` search key.
    """
    # Login to INBOX
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    try:
        imap.login(username, password)
        imap.select('INBOX')

        # Print the count of all unread messages
        status, response = imap.search(None, '(UNSEEN)')
        unread_msg_nums = response[0].split()
        print(len(unread_msg_nums))

        # Print all unread messages from a certain sender of interest
        status, response = imap.search(
            None, '(UNSEEN)', '(FROM "%s")' % (sender_of_interest))
        unread_msg_nums = response[0].split()
        da = []
        for e_id in unread_msg_nums:
            _, response = imap.fetch(e_id, '(UID BODY[TEXT])')
            da.append(response[0][1])
        print(da)

        # Mark them as seen.  A raw string keeps the backslash of the IMAP
        # flag without relying on an invalid escape sequence.
        for e_id in unread_msg_nums:
            imap.store(e_id, '+FLAGS', r'\Seen')
    finally:
        # Release the server-side session even when a step above fails.
        imap.logout()
python类IMAP4_SSL的实例源码
def init_server(self):
    """Open and authenticate the SMTP session and, when configured, IMAP.

    SMTP is always set up.  IMAP is only set up when ``self.imap_server`` is
    truthy; in that case the mailbox ``self.imap_mailbox`` is selected too.

    :raises PySMSException: on an SMTP error, a non-"OK" IMAP login or
        select reply, or an ``imaplib.IMAP4.error``.
    """
    self.logger.info("Initializing SMTP/IMAP servers.")

    # PySMS at minimum uses smtp server: implicit SSL when self.ssl is set,
    # otherwise a plain connection upgraded with STARTTLS.
    try:
        if not self.ssl:
            self.smtp = smtplib.SMTP(self.smtp_server, self.smtp_port)
            self.smtp.starttls()
        else:
            self.smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
        self.smtp.login(self.address, self.password)
    except smtplib.SMTPException:
        raise PySMSException("Unable to start smtp server, please check credentials.")

    # Nothing more to do unless responding functionality is enabled.
    if not self.imap_server:
        return

    try:
        imap_class = imaplib.IMAP4_SSL if self.ssl else imaplib.IMAP4
        self.imap = imap_class(self.imap_server)
        login_status, _ = self.imap.login(self.address, self.password)
        if login_status != "OK":
            raise PySMSException("Unable to login to IMAP server with given credentials.")
        select_status, _ = self.imap.select(self.imap_mailbox)
        if select_status != "OK":
            raise PySMSException("Unable to select mailbox: {0}".format(self.imap_mailbox))
    except imaplib.IMAP4.error:
        raise PySMSException("Unable to start IMAP server, please check address and SSL/TLS settings.")
def test_imap_old(user):
    """Manually exercise an XOAUTH2 login to Gmail through plain imaplib.

    Loads stored credentials, authenticates as *user*, selects
    ``[Gmail]/All Mail`` and prints the status and result of an
    ``X-GM-RAW`` search for ``vget``.

    :param user: account name passed to ``xoauth2_str``.
    """
    storage = Storage('credentials_file')
    credentials = storage.get()
    xoauth = xoauth2_str(user, credentials.access_token)
    conn = imaplib.IMAP4_SSL('imap.googlemail.com')
    conn.debug = 4
    conn.authenticate('XOAUTH2', lambda x: xoauth)
    conn.list()
    conn.select("[Gmail]/All Mail")
    # Once authenticated everything from the imaplib.IMAP4_SSL class will
    # work as per usual without any modification to your code.
    typ, msgnums = conn.search(None, 'X-GM-RAW', 'vget')
    # %-formatting prints the same text under Python 2 and Python 3.
    print('typ %s' % (typ,))
    print('num %s' % (msgnums,))
def open_connection(verbose=False):
    """Return a logged-in ``IMAP4_SSL`` connection described by ``~/.pymotw``.

    The file must provide ``hostname`` in section ``server`` and
    ``username`` / ``password`` in section ``account``.

    :param verbose: when true, print the host and user being used.
    :return: the authenticated ``imaplib.IMAP4_SSL`` object.
    """
    # Read the config file
    settings = configparser.ConfigParser()
    settings.read([os.path.expanduser('~/.pymotw')])

    # Connect to the server
    server_name = settings.get('server', 'hostname')
    if verbose:
        print('Connecting to', server_name)
    conn = imaplib.IMAP4_SSL(server_name)

    # Login to our account
    account = settings.get('account', 'username')
    secret = settings.get('account', 'password')
    if verbose:
        print('Logging in as', account)
    conn.login(account, secret)
    return conn
def connect(self):
    """Connects to and authenticates with an IMAP4 mail server.

    An SSL connection is used when both ``self.keyfile`` and
    ``self.certfile`` are set or when ``self.ssl`` is true; otherwise a
    plain connection is opened.  After login the default mailbox is selected.

    :return: the logged-in ``imaplib`` connection object.

    Any socket or IMAP error propagates to the caller unchanged.  The
    original handler only re-raised, so it was dropped together with its
    Python 2-only ``except X, err`` syntax.
    """
    import imaplib
    if (self.keyfile and self.certfile) or self.ssl:
        M = imaplib.IMAP4_SSL(self.host, self.port, self.keyfile, self.certfile)
    else:
        M = imaplib.IMAP4(self.host, self.port)
    M.login(self.username, self.password)
    M.select()
    return M
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None):
    """
    Open the IMAP connection and reset the login/folder state.

    :param host: host's name (default: localhost)
    :param port: port number (default: standard IMAP4 SSL port)
    :param ssl: use client class over SSL connection (IMAP4_SSL) if True, else use IMAP4
    :param keyfile: PEM formatted file that contains your private key (default: None)
    :param certfile: PEM formatted certificate chain file (default: None)
    :param ssl_context: SSLContext object that contains your certificate chain and private key (default: None)
    Note: if ssl_context is provided, then parameters keyfile or
    certfile should not be set otherwise ValueError is raised.
    """
    # Remember the connection parameters exactly as supplied.
    self._host, self._port = host, port
    self._keyfile, self._certfile = keyfile, certfile
    self._ssl_context = ssl_context

    # A falsy port falls back to the protocol's standard port.
    if not ssl:
        self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT)
    else:
        self.box = imaplib.IMAP4_SSL(
            host, port or imaplib.IMAP4_SSL_PORT, keyfile, certfile, ssl_context)

    # No credentials or folder are known until a later call sets them.
    self._username = self._password = None
    self._initial_folder = None
    self.folder = None
def read(hostname, username, password):
    """Return how many unread messages are in the account's INBOX.

    The IMAP server name is built as ``'imap.' + hostname.lower()``.  The
    INBOX is opened read-only, so no message flags are changed.

    The previous implementation returned the last character of the
    message-number list (not a count) and raised ``IndexError`` when there
    were no unread messages.

    :param hostname: mail domain appended to ``imap.``.
    :param username: login name.
    :param password: login password.
    :return: the unread count as an ``int``, or the string ``'Failed'`` when
        the search is not answered with ``OK`` or an IMAP error is raised.
    """
    # Get the imap server for the host
    hostname = 'imap.' + hostname.lower()
    # Start SSL connection with the host server
    server = IMAP4_SSL(hostname)
    try:
        # Login to the given account
        server.login(username, password)
        # Select the Inbox and make it read only
        server.select('INBOX', readonly=True)
        # Ask for the message numbers of all unread messages
        status, data = server.search(None, 'UnSeen')
        if status != 'OK':
            return 'Failed'
        # data holds one space-separated string of message numbers; an
        # empty/None entry means there is nothing unread.
        if not data or not data[0]:
            return 0
        return len(data[0].split())
    except IMAP4.error:
        return 'Failed'
    finally:
        # Best-effort cleanup: a failing LOGOUT must not hide the result.
        try:
            server.logout()
        except Exception:
            pass
def email_verify(plusmail, googlepass):
    """Log in to Gmail and hand the INBOX to ``proc_mail``.

    Sleeps five seconds first, then logs in, selects ``INBOX`` and, when
    the select succeeds, calls ``proc_mail`` with the connection before
    closing the mailbox.  Any ``imaplib.IMAP4.error`` raised along the way
    is reported as a failed login.

    :param plusmail: e-mail address used as the login name.
    :param googlepass: password for that address.
    """
    # Waiting 5 seconds before checking email
    time.sleep(5)
    email_address = plusmail
    M = imaplib.IMAP4_SSL('imap.gmail.com')
    try:
        M.login(email_address, googlepass)
        print("Logged in to: " + email_address)
        M.list()
        rv, _ = M.select("INBOX")
        if rv == 'OK':
            print("Processing mailbox...")
            proc_mail(M)
            M.close()
        M.logout()
    except imaplib.IMAP4.error:
        print("Unable to login to: " + email_address + ". Was not verified\n")
def listen():
    """Poll the Gmail INBOX forever and forward received payloads.

    Every two seconds the INBOX is searched for unseen messages whose
    subject contains ``det:toolkit``.  For each ``text/plain`` part, the
    first ``\\r\\n``-terminated line of the decoded payload is base64-decoded
    and passed to ``app_exfiltrate.retrieve_data``.  Errors while handling a
    single part are printed and do not stop the loop.

    Exits the process with status -1 when the initial login fails.  The
    failure log entry no longer contains the password.
    """
    app_exfiltrate.log_message('info', "[gmail] Listening for mails...")
    client_imap = imaplib.IMAP4_SSL(server)
    try:
        client_imap.login(gmail_user, gmail_pwd)
    except Exception:
        # Never write the secret to the log; keep the message shape.
        app_exfiltrate.log_message(
            'warning', "[gmail] Did not manage to authenticate with creds: {}:{}".format(gmail_user, '<redacted>'))
        sys.exit(-1)

    while True:
        client_imap.select("INBOX")
        typ, id_list = client_imap.uid(
            'search', None, "(UNSEEN SUBJECT 'det:toolkit')")
        for msg_id in id_list[0].split():
            msg_data = client_imap.uid('fetch', msg_id, '(RFC822)')
            raw_email = msg_data[1][0][1]
            raw_email_string = raw_email.decode('utf-8')
            email_message = email.message_from_string(raw_email_string)
            # Walk every MIME part of the message.
            for part in email_message.walk():
                if part.get_content_type() != "text/plain":
                    # ignore attachments/html
                    continue
                body = part.get_payload(decode=True)
                data = body.split('\r\n')[0]
                try:
                    app_exfiltrate.retrieve_data(base64.b64decode(data))
                except Exception as e:
                    print(e)
        time.sleep(2)
def imap(usr, pw, host):
    """Attempt an IMAP-over-SSL login and report how it went.

    :param usr: login name; lower-cased before use.
    :param pw: password.
    :param host: sequence whose first item is the server name and whose
        optional second item is the port (993 is used when it is absent).
    :return: characters 2-3 of the stringified login result on success,
        ``False`` when ``imaplib.IMAP4.error`` is raised, and ``"Error"``
        for any other exception.
    """
    socket.setdefaulttimeout(time_out)
    account = usr.lower()
    try:
        port = 993 if len(host) < 2 else int(host[1])
        session = imaplib.IMAP4_SSL(str(host[0]), port)
        reply = str(session.login(account, pw))
    except imaplib.IMAP4.error:
        return False
    except BaseException:
        return "Error"
    return reply[2: 4]
#/-----------------IMAP-------------------------#
#------GETUNKNOWN--HOST--------------------------#
def getunknown_imap(subb):
    """Guess the IMAP host name for a mail domain.

    Each candidate ``<prefix>.<subb>`` is contacted over SSL and a dummy
    login is attempted.  A server that answers with an IMAP error (i.e. it
    rejected the dummy credentials) is taken to be the IMAP host.

    Previously the first candidate that could not be reached triggered the
    outer handler and ended the search, so later prefixes were never tried.
    Unreachable candidates are now skipped.

    :param subb: domain to append to each candidate prefix.
    :return: the first host that answers with an IMAP error, else ``None``.
    """
    socket.setdefaulttimeout(time_out)
    try:
        # TODO: Change to dynamic matchers
        prefixes = [
            'imap',
            'mail',
            'pop',
            'pop3',
            'imap-mail',
            'inbound',
            'mx',
            'imaps',
            'smtp',
            'm',
        ]
        for prefix in prefixes:
            host = prefix + '.' + subb
            try:
                mail = imaplib.IMAP4_SSL(str(host))
                mail.login('test', 'test')
            except imaplib.IMAP4.error:
                return host
            except Exception:
                # DNS failure, refused connection, timeout, TLS error...:
                # this candidate is not an IMAP server, try the next one.
                continue
    except BaseException:
        # Kept from the original: this probe never raises to its caller.
        return None
    return None
def reconnect(self):
    """Re-open ``self.mail`` and log in, retrying on failure.

    Up to ``constants.MAX_RECONNECT_RETRIES + 1`` attempts are made, with a
    pause of ``constants.SMTP_RECONNECT_SLEEP_TIME`` seconds after each
    failure.  Returns as soon as a login succeeds.  After the final failed
    attempt a critical message is logged and an error alert is sent.
    """
    attempts_made = 0
    while attempts_made <= constants.MAX_RECONNECT_RETRIES:
        attempts_made += 1
        try:
            connection = imaplib.IMAP4_SSL(pub_config['gmail']['imap'])
            self.mail = connection
            connection.login(private_config['gmail']['username'],
                             private_config['gmail']['password'])
        except imaplib.IMAP4.error as login_error:
            logging.error("Login to retrieve emails failed!")
            logging.error(login_error)
        except Exception as other_error:
            logging.error("Error from IMAP! Trying to reconnect...")
            logging.error(other_error)
        else:
            return

        time.sleep(constants.SMTP_RECONNECT_SLEEP_TIME)

        if attempts_made <= constants.MAX_RECONNECT_RETRIES:
            continue
        # Retries exhausted: raise the alarm through the SMTP alerter.
        from pnu.outbound.alerter import smtp
        logging.critical("Could not connect to IMAP handler")
        smtp.send_error("Could not connect to IMAP handler")
def connect(self, raise_errors=True):
    """Open an SSL IMAP connection, store it on ``self.imap`` and return it.

    The host and port come from ``self.GMAIL_IMAP_HOST`` and
    ``self.GMAIL_IMAP_PORT``.

    :param raise_errors: accepted but not consulted here; connection errors
        always propagate to the caller.
    :return: the new ``imaplib.IMAP4_SSL`` connection.
    """
    connection = imaplib.IMAP4_SSL(self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT)
    self.imap = connection
    return connection
def readMail(self, subject):
    """Load positions from the first mail whose subject matches.

    Searches the default mailbox (opened read-only) on
    ``imap.mail.yahoo.com`` for *subject*, prints the message number and
    the Date/From/Subject headers of the first hit, and stores the result
    of ``getPositionsFromMessage`` on ``self.positions``.  The connection
    is always logged out.

    :param subject: text passed to the IMAP ``SUBJECT`` search key.
    :raises IndexError: when no message matches *subject*.
    """
    # SECURITY(review): the account name and password are hard-coded below.
    # They are left in place so behaviour is unchanged, but they should be
    # moved to configuration and the password rotated.
    connection = imaplib.IMAP4_SSL('imap.mail.yahoo.com')
    try:
        connection.login('pnikoulis', 'ai024709th3669')
        connection.select(readonly=True)
        typ, data = connection.search(None, 'SUBJECT', subject)
        ids = data[0].split()
        if not ids:
            # Same exception type as before, but with a usable message.
            raise IndexError('no message found with subject %r' % (subject,))
        msg_id = ids[0]
        typ, data = connection.fetch(msg_id, '(RFC822)')
    finally:
        connection.logout()
    message = email.message_from_string(data[0][1])
    # %-formatting prints the same text under Python 2 and Python 3.
    print('------------------------------------------> Message %s' % (msg_id,))
    print('Date: %s' % (message['Date'],))
    print('From: %s' % (message['From'],))
    print('Subject: %s' % (message['Subject'],))
    self.positions = getPositionsFromMessage(message)
#------------------------------------------------------------------------
# Check if a position is in portfolio (by checking symbol and exit date)
#------------------------------------------------------------------------
def readMail(self, subject):
    """Load positions from the first mail whose subject matches.

    Searches the default mailbox (opened read-only) on
    ``imap.mail.yahoo.com`` for *subject*, prints the message number and
    the Date/From/Subject headers of the first hit, and stores the result
    of ``getPositionsFromMessage`` on ``self.positions``.  The connection
    is always logged out.

    :param subject: text passed to the IMAP ``SUBJECT`` search key.
    :raises IndexError: when no message matches *subject*.
    """
    # SECURITY(review): the account name and password are hard-coded below.
    # They are left in place so behaviour is unchanged, but they should be
    # moved to configuration and the password rotated.
    connection = imaplib.IMAP4_SSL('imap.mail.yahoo.com')
    try:
        connection.login('pnikoulis', 'ai024709th3669')
        connection.select(readonly=True)
        typ, data = connection.search(None, 'SUBJECT', subject)
        ids = data[0].split()
        if not ids:
            # Same exception type as before, but with a usable message.
            raise IndexError('no message found with subject %r' % (subject,))
        msg_id = ids[0]
        typ, data = connection.fetch(msg_id, '(RFC822)')
    finally:
        connection.logout()
    message = email.message_from_string(data[0][1])
    # %-formatting prints the same text under Python 2 and Python 3.
    print('------------------------------------------> Message %s' % (msg_id,))
    print('Date: %s' % (message['Date'],))
    print('From: %s' % (message['From'],))
    print('Subject: %s' % (message['Subject'],))
    self.positions = getPositionsFromMessage(message)
#------------------------------------------------------------------------
# Check if a position is in portfolio (by checking symbol and exit date)
#------------------------------------------------------------------------
def connect(self):
    """Return a mail-server connection built from ``self.source_id``.

    When the record has no usable ``source_id`` the parent class
    implementation is used.  Otherwise an IMAP or POP3 connection (SSL or
    plain, per ``source_id.is_ssl``) is opened and authenticated with
    ``self.user`` / ``self.password``, and ``MAIL_TIMEOUT`` is applied to
    its socket.

    :return: the authenticated connection object.
    """
    if not (self.source_id and self.source_id.id):
        return super(vieterp_fetchmail_server, self).connect()

    self.ensure_one()
    source = self.source_id
    # NOTE(review): the IMAP test reads source_id.type while the POP test
    # reads self.type -- confirm this asymmetry is intended.
    if source.type == 'imap':
        imap_class = IMAP4_SSL if source.is_ssl else IMAP4
        connection = imap_class(source.server, int(source.port))
        connection.login(self.user, self.password)
    elif self.type == 'pop':
        pop_class = POP3_SSL if source.is_ssl else POP3
        connection = pop_class(source.server, int(source.port))
        # TODO: use this to remove only unread messages
        # connection.user("recent:"+server.user)
        connection.user(self.user)
        connection.pass_(self.password)
    # Add timeout on socket
    connection.sock.settimeout(MAIL_TIMEOUT)
    return connection
def read(self):
    """Print every message in the default mailbox of ``self.ID``.

    The server name is ``'imap.'`` followed by the part of ``self.ID``
    after the ``@``.  Any exception is caught and reported on stdout as
    ``Reading is failed : <error>``.  The connection, once opened, is
    logged out even when a later step fails.
    """
    server = None
    try:
        # connect to server
        domain = self.ID[self.ID.index('@') + 1:]
        server = imaplib.IMAP4_SSL('imap.' + domain)
        # login
        server.login(self.ID, self.PASSWORD)
        server.list()
        server.select()
        typ, data = server.search(None, 'ALL')
        for num in data[0].split():
            typ, data = server.fetch(num, '(RFC822)')
            print('Message %s\n%s\n' % (num, data[0][1]))
    except Exception as e:
        print("Reading is failed : {}".format(e))
    finally:
        if server is not None:
            # Best-effort cleanup: a failing LOGOUT must not raise from a
            # method that otherwise reports all errors by printing.
            try:
                server.logout()
            except Exception:
                pass
def login(self, username, password):
    """Open SSL IMAP and SMTP sessions and log in to both.

    The connections are stored on ``self.M`` (IMAP) and ``self.S`` (SMTP),
    the second element of each login reply on ``self.response`` /
    ``self.response_s``, and *username* on ``self.username``.

    :return: tuple of the first element of the IMAP and SMTP login replies.
    """
    imap_conn = imaplib.IMAP4_SSL(self.IMAP_SERVER, self.IMAP_PORT)
    self.M = imap_conn
    smtp_conn = smtplib.SMTP_SSL(self.SMTP_SERVER, self.SMTP_PORT)
    self.S = smtp_conn
    imap_status, self.response = imap_conn.login(username, password)
    smtp_status, self.response_s = smtp_conn.login(username, password)
    self.username = username
    return imap_status, smtp_status
def test_imap(user):
    """Manually exercise an OAuth2 Gmail login through ``IMAPClient``.

    Logs in as *user*, selects the folder carrying the ``\\All`` flag and
    prints the result of an ``X-GM-RAW`` search for ``uniquetokenXXX``.

    :param user: account name passed to ``oauth2_login``.
    :raises Error: when no folder carries the ``\\All`` flag.
    """
    credentials = get_credentials()
    conn = IMAPClient('imap.googlemail.com', use_uid=True, ssl=True)
    conn.oauth2_login(user, credentials.access_token)
    folders = conn.list_folders()
    try:
        # Raw string: the flag really starts with a backslash.
        all_box = next(box for (flags, _, box) in folders if r'\All' in flags)
    except StopIteration:
        raise Error('all message box not found')

    logging.debug('All message box is {}'.format(all_box))
    conn.select_folder(all_box)
    msgs = conn.search('X-GM-RAW uniquetokenXXX')
    print(msgs)
def connect(self, mailserver, port="993"):
    """Open an SSL IMAP connection and keep it on ``self.srv``.

    ``self.mailserver`` and ``self.port`` are always recorded.  When the
    connection cannot be established ``self.srv`` is set to ``None``
    instead of raising.

    :param mailserver: host name or address of the IMAP server.
    :param port: port to connect to (default ``"993"``).
    """
    self.mailserver = mailserver
    self.port = port
    try:
        self.srv = imaplib.IMAP4_SSL(self.mailserver, self.port)
    except Exception:
        # Deliberately best-effort, but no longer swallows
        # KeyboardInterrupt / SystemExit as the bare ``except`` did.
        self.srv = None
#-----------------------------------------------------------------------------
# POP3 subclass of Pillager Class
#-----------------------------------------------------------------------------
def __init__(self, host, port, certfile=None, keyfile=None, timeout=None, ca_certs=None):
    """Record the timeout and CA bundle, then run the base constructor.

    :param host: server host name.
    :param port: server port.
    :param certfile: certificate chain file handed to ``IMAP4_SSL``.
    :param keyfile: private key file handed to ``IMAP4_SSL``.
    :param timeout: stored on ``self._timeout``.
    :param ca_certs: stored on ``self.ca_certs``.
    """
    # Set before the base constructor runs, as in the original ordering.
    self._timeout = timeout
    self.ca_certs = ca_certs
    # IMAP4_SSL.__init__ takes (host, port, keyfile, certfile); passing
    # certfile/keyfile positionally in this method's order swapped them.
    imaplib.IMAP4_SSL.__init__(self, host, port, keyfile=keyfile, certfile=certfile)
def __init__(self, host, port, certfile=None, keyfile=None, timeout=None, ca_certs=None):
    """Record the timeout and CA bundle, then run the base constructor.

    :param host: server host name.
    :param port: server port.
    :param certfile: certificate chain file handed to ``IMAP4_SSL``.
    :param keyfile: private key file handed to ``IMAP4_SSL``.
    :param timeout: stored on ``self._timeout``.
    :param ca_certs: stored on ``self.ca_certs``.
    """
    # Set before the base constructor runs, as in the original ordering.
    self._timeout = timeout
    self.ca_certs = ca_certs
    # IMAP4_SSL.__init__ takes (host, port, keyfile, certfile); passing
    # certfile/keyfile positionally in this method's order swapped them.
    imaplib.IMAP4_SSL.__init__(self, host, port, keyfile=keyfile, certfile=certfile)
def _connect_imap(self):
    """Connect to ``self._server`` over SSL and log in.

    The connection is stored on ``self._mail_server`` before the login is
    attempted with ``self._user`` / ``self._password``.
    """
    print("CONNECTION ...")
    connection = imaplib.IMAP4_SSL(self._server)
    self._mail_server = connection
    connection.login(self._user, self._password)
def compute_state(self, key):
    """Return the number of unseen messages in ``key.folder``.

    :param key: object providing ``username``, ``password``, ``use_ssl``,
        ``server``, ``port`` and ``folder``.
    :return: the UNSEEN count as an ``int``, or ``None`` (after calling
        ``self.warn``) when the username or password is missing.
    """
    if not key.username or not key.password:
        self.warn('Username and password are not configured')
        return None

    if key.use_ssl:
        mail = IMAP4_SSL(key.server, key.port)
    else:
        mail = IMAP4(key.server, key.port)

    try:
        mail.login(key.username, key.password)
        rc, message = mail.status(key.folder, '(UNSEEN)')
    finally:
        # Each call opens a new connection; close it so repeated calls do
        # not accumulate open sessions.
        mail.logout()

    unread_str = message[0].decode('utf-8')
    # Raw string so that \d reaches the regex engine unchanged.
    unread_count = int(re.search(r'UNSEEN (\d+)', unread_str).group(1))
    return unread_count
def __init__(self, target):
    """Open an IMAP session to *target* and prepare for NTLM authentication.

    :param target: string of the form ``protocol://host:port``.
    :raises RuntimeError: when the server does not advertise ``AUTH=NTLM``.
        The original ``return False`` is illegal in ``__init__`` and made
        Python raise an unrelated ``TypeError`` at this point.
    """
    # Target comes as protocol://target:port
    self.target = target
    proto, host, port = target.split(':')
    host = host[2:]  # drop the leading '//'
    port = int(port)
    if port == 993 or proto.upper() == 'IMAPS':
        self.session = imaplib.IMAP4_SSL(host, port)
    else:
        # assume non-ssl IMAP; the port is passed as an int in both branches
        self.session = imaplib.IMAP4(host, port)
    if 'AUTH=NTLM' not in self.session.capabilities:
        logging.error('IMAP server does not support NTLM authentication!')
        raise RuntimeError('IMAP server does not support NTLM authentication!')
    self.authtag = self.session._new_tag()
    self.lastresult = None
def authenticate(self, url, consumer, token):
    """Authenticate with the ``XOAUTH`` mechanism.

    :param url: passed to ``oauth2.build_xoauth_string``.
    :param consumer: ``oauth2.Consumer`` instance or ``None``.
    :param token: ``oauth2.Token`` instance or ``None``.
    :raises ValueError: when *consumer* or *token* has the wrong type.
    """
    consumer_ok = consumer is None or isinstance(consumer, oauth2.Consumer)
    if not consumer_ok:
        raise ValueError("Invalid consumer.")

    token_ok = token is None or isinstance(token, oauth2.Token)
    if not token_ok:
        raise ValueError("Invalid token.")

    def xoauth_response(challenge):
        # The server challenge is ignored; the reply depends only on the
        # url/consumer/token captured from the enclosing call.
        return oauth2.build_xoauth_string(url, consumer, token)

    imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
def authenticate(self, url, consumer, token):
    """Authenticate with the ``XOAUTH`` mechanism.

    :param url: passed to ``oauth2.build_xoauth_string``.
    :param consumer: ``oauth2.Consumer`` instance or ``None``.
    :param token: ``oauth2.Token`` instance or ``None``.
    :raises ValueError: when *consumer* or *token* has the wrong type.
    """
    consumer_ok = consumer is None or isinstance(consumer, oauth2.Consumer)
    if not consumer_ok:
        raise ValueError("Invalid consumer.")

    token_ok = token is None or isinstance(token, oauth2.Token)
    if not token_ok:
        raise ValueError("Invalid token.")

    def xoauth_response(challenge):
        # The server challenge is ignored; the reply depends only on the
        # url/consumer/token captured from the enclosing call.
        return oauth2.build_xoauth_string(url, consumer, token)

    imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
5_5_check_remote_email_via_imap.py 文件源码
项目:Python-Network-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def check_email(username):
    """Prompt for a password and print the first message in ``Inbox``.

    Connects to ``GOOGLE_IMAP_SERVER`` on port 993, logs in as *username*
    with the password typed at the prompt, prints the first message
    returned by an ``ALL`` search (if any), then closes and logs out.

    :param username: account name used for the login.
    """
    mailbox = imaplib.IMAP4_SSL(GOOGLE_IMAP_SERVER, '993')
    password = getpass.getpass(prompt="Enter your Google password: ")
    mailbox.login(username, password)
    mailbox.select('Inbox')

    _, search_data = mailbox.search(None, 'ALL')
    message_ids = search_data[0].split()
    # Only the first message is shown.
    if message_ids:
        first_id = message_ids[0]
        _, fetched = mailbox.fetch(first_id, '(RFC822)')
        print('Message %s\n%s\n' % (first_id, fetched[0][1]))

    mailbox.close()
    mailbox.logout()
def do_init(self):
    """Connect to Gmail over SSL and log in, storing the session on ``self.conn``.

    :return: ``None`` on success; on failure the error text that was also
        printed.
    """
    try:
        self.conn = imaplib.IMAP4_SSL('imap.gmail.com')
        self.conn.debug = 0
        self.conn.login(self.gmail_address, self.password)
    except Exception:
        # ``Exception`` instead of a bare ``except`` so Ctrl-C and
        # SystemExit are not turned into a "wrong credentials" message.
        response = ("Either your credentials are wrong mate, or there is some problem going on, do me a favor, I know "
                    "you won't but whatever, just inform me in the forums.")
        print(response)
        return response
def read_message(assistant, instance_vlc, player_vlc, username, password, sender_of_interest=None):
    """Read unread Gmail messages aloud and mark them as seen.

    Searches the INBOX for unseen messages (optionally restricted to
    *sender_of_interest*), announces through ``assistant.speak`` whether
    there is anything to read, passes "sender. subject. Content :body" for
    each message to ``read_nicely_text``, and finally sets the Seen flag on
    every message that was read.  The connection is always logged out.

    :param assistant: object whose ``speak`` method announces the status.
    :param instance_vlc: forwarded to ``read_nicely_text``.
    :param player_vlc: forwarded to ``read_nicely_text``.
    :param username: account name used for the IMAP login.
    :param password: password for that account.
    :param sender_of_interest: optional value for the ``FROM`` search key.
    """
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    try:
        imap.login(username, password)
        imap.select('INBOX')

        # One search call; the FROM criterion is added only when requested.
        criteria = ['(UNSEEN)']
        if sender_of_interest is not None:
            criteria.append('(FROM "%s")' % (sender_of_interest))
        status, response = imap.search(None, *criteria)
        unread_msg_nums = response[0].split()

        if unread_msg_nums:
            assistant.speak("Very good! Please wait, my assistant will read your messages!")
        else:
            assistant.speak("You do not have new messages!")

        for e_id in unread_msg_nums:
            _, header = imap.fetch(e_id, '(UID BODY[HEADER])')
            header = header[0][-1]
            header = "".join(header.split("\r"))
            header = [h for h in header.split("\n") if h != ""]
            # NOTE(review): subject and sender are taken by position (last
            # and third-from-last non-empty header line) -- presumably
            # tuned to one particular header layout; confirm.
            subject = header[-1]
            sender = header[-3].split("@")[0]
            _, text = imap.fetch(e_id, '(UID BODY[1])')
            text = text[0][1]
            text = "Content :" + text
            message = sender + ". " + subject + ". " + text
            read_nicely_text(message, instance_vlc, player_vlc)

        # Mark them as seen.  A raw string keeps the backslash of the IMAP
        # flag without relying on an invalid escape sequence.
        for e_id in unread_msg_nums:
            imap.store(e_id, '+FLAGS', r'\Seen')
    finally:
        imap.logout()