def __init__(self, account, auth_code, name='', **config):
account_name, server_name = account.split('@')
self.smtp = 'smtp.' + server_name
self.imap = 'imap.' + server_name
self.server_name = server_name
self.smtp_port = 0
self.imap_port = 0
self.use_ssl = True
self.__dict__.update(SERVER_LIB.get(server_name, {}))
self.__dict__.update(config)
self.name = '%s <%s>' % (name or account_name, account)
self.account = account
self.auth_code = auth_code
st_SMTP = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP
st_IMAP = imaplib.IMAP4_SSL if self.use_ssl else imaplib.IMAP4
if self.smtp_port:
self.st_SMTP = lambda : st_SMTP(self.smtp, self.smtp_port)
else:
self.st_SMTP = lambda : st_SMTP(self.smtp)
if self.imap_port:
self.st_IMAP = lambda : st_IMAP(self.imap, self.imap_port)
else:
self.st_IMAP = lambda : st_IMAP(self.imap)
self.SMTP = lambda : SMTP(self)
self.IMAP = lambda : IMAP(self)
python类IMAP4_SSL的实例源码
def __init__(self):
self.c = imaplib.IMAP4_SSL(server)
self.c.login(gmail_user, gmail_pwd)
def __init__(self, target):
# Target comes as protocol://target:port
self.target = target
proto, host, port = target.split(':')
host = host[2:]
if int(port) == 993 or proto.upper() == 'IMAPS':
self.session = imaplib.IMAP4_SSL(host,int(port))
else:
#assume non-ssl IMAP
self.session = imaplib.IMAP4(host,port)
if 'AUTH=NTLM' not in self.session.capabilities:
logging.error('IMAP server does not support NTLM authentication!')
return False
self.authtag = self.session._new_tag()
self.lastresult = None
def fetch_unread_emails(profile, since=None, mark_read=False, limit=None):
"""
Fetches a list of unread email objects from a user's Gmail inbox.
Arguments:
profile -- contains information related to the user (e.g., Gmail
address)
since -- if provided, no emails before this date will be returned
markRead -- if True, marks all returned emails as read in target inbox
Returns:
A list of unread email objects.
"""
conn = imaplib.IMAP4_SSL('imap.gmail.com')
conn.debug = 0
conn.login(profile['gmail_address'], profile['gmail_password'])
conn.select(readonly=(not mark_read))
msgs = []
(retcode, messages) = conn.search(None, '(UNSEEN)')
if retcode == 'OK' and messages != ['']:
numUnread = len(messages[0].split(' '))
if limit and numUnread > limit:
return numUnread
for num in messages[0].split(' '):
# parse email RFC822 format
ret, data = conn.fetch(num, '(RFC822)')
msg = email.message_from_string(data[0][1])
if not since or get_date(msg) > since:
msgs.append(msg)
conn.close()
conn.logout()
return msgs
def __init__(self, *args, **kwargs): # {{{2
if settings.IMAP_SSL:
self.mailbox = IMAP4_SSL(
host = settings.IMAP_SERVER, port = settings.IMAP_PORT)
else:
self.mailbox = IMAP4( settings.IMAP_SERVER )
self.mailbox.login( settings.IMAP_LOGIN, settings.IMAP_PASSWD )
self.mailbox.select()
super( Command, self ).__init__( *args, **kwargs )
def __init__(self):
"""
Connects to Google's imap mailbox via SSL
"""
self.mailbox = imaplib.IMAP4_SSL('imap.gmail.com')
def authenticate(self, url, consumer, token):
if consumer is not None and not isinstance(consumer, oauth2.Consumer):
raise ValueError("Invalid consumer.")
if token is not None and not isinstance(token, oauth2.Token):
raise ValueError("Invalid token.")
imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
lambda x: oauth2.build_xoauth_string(url, consumer, token))
def signIn(verbose=False):
mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login(username, password)
if verbose:
print('Signed into ' + username)
return mail
def read(hostname, username, password):
hostname = 'imap.' + hostname
server = IMAP4_SSL(hostname)
try:
server.login(username, password)
server.select('INBOX', readonly=True)
msgnums = server.search(None, 'UnSeen')
if(msgnums[0] == 'OK'):
for num in msgnums[1]:
unread.append(num[-1])
else:
unread.append(0)
except IMAP4.error:
failed = True
def get_mails(mail_address, mail_pass):
mail = imaplib.IMAP4_SSL('mail.bilkent.edu.tr')
mail.login(mail_address, mail_pass)
mail.list()
mail.select("inbox")
result, data = mail.search(None, "ALL")
ids = data[0]
id_list = ids.split()
latest_email_id = id_list[-1]
result, data = mail.fetch(latest_email_id, "(RFC822)")
raw_email = data[0][1]
return raw_email
def authenticate(self, url, consumer, token):
if consumer is not None and not isinstance(consumer, oauth2.Consumer):
raise ValueError("Invalid consumer.")
if token is not None and not isinstance(token, oauth2.Token):
raise ValueError("Invalid token.")
imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
lambda x: oauth2.build_xoauth_string(url, consumer, token))
def fetch_unread_emails(self, since=None, markRead=False, limit=None):
"""
Fetches a list of unread email objects from a user's Gmail inbox.
Arguments:
since -- if provided, no emails before this date will be returned
markRead -- if True, marks all returned emails as read in target
inbox
Returns:
A list of unread email objects.
"""
conn = imaplib.IMAP4_SSL('imap.gmail.com')
conn.debug = 0
conn.login(self.profile['gmail_address'],
self.profile['gmail_password'])
conn.select(readonly=(not markRead))
msgs = []
(retcode, messages) = conn.search(None, '(UNSEEN)')
if retcode == 'OK' and messages != ['']:
numUnread = len(messages[0].split(' '))
if limit and numUnread > limit:
return numUnread
for num in messages[0].split(' '):
# parse email RFC822 format
ret, data = conn.fetch(num, '(RFC822)')
msg = email.message_from_string(data[0][1])
if not since or get_date(msg) > since:
msgs.append(msg)
conn.close()
conn.logout()
return msgs
def connect(self):
self._imap = imaplib.IMAP4_SSL(self._address)
return_code, data = self._imap.login(self._username, self._password)
if return_code != "OK":
raise Exception("Error logging in to IMAP as {}: {} {}".format(self._username, return_code, data))
def resetConnection(self):
parsed_url = self.url
try:
imap_server = os.environ['IMAP_SERVER']
except KeyError:
imap_server = parsed_url.hostname
# Try to close the connection cleanly
try:
self.conn.close()
except Exception:
pass
if (parsed_url.scheme == "imap"):
cl = imaplib.IMAP4
self.conn = cl(imap_server, 143)
elif (parsed_url.scheme == "imaps"):
cl = imaplib.IMAP4_SSL
self.conn = cl(imap_server, 993)
log.Debug("Type of imap class: %s" % (cl.__name__))
self.remote_dir = re.sub(r'^/', r'', parsed_url.path, 1)
# Login
if (not(globals.imap_full_address)):
self.conn.login(self.username, self.password)
self.conn.select(globals.imap_mailbox)
log.Info("IMAP connected")
else:
self.conn.login(self.username + "@" + parsed_url.hostname, self.password)
self.conn.select(globals.imap_mailbox)
log.Info("IMAP connected")
def fetchUnreadEmails(profile, since=None, markRead=False, limit=None):
"""
Fetches a list of unread email objects from a user's Gmail inbox.
Arguments:
profile -- contains information related to the user (e.g., Gmail
address)
since -- if provided, no emails before this date will be returned
markRead -- if True, marks all returned emails as read in target inbox
Returns:
A list of unread email objects.
"""
conn = imaplib.IMAP4_SSL('imap.gmail.com')
conn.debug = 0
conn.login(profile['gmail_address'], profile['gmail_password'])
conn.select(readonly=(not markRead))
msgs = []
(retcode, messages) = conn.search(None, '(UNSEEN)')
if retcode == 'OK' and messages != ['']:
numUnread = len(messages[0].split(' '))
if limit and numUnread > limit:
return numUnread
for num in messages[0].split(' '):
# parse email RFC822 format
ret, data = conn.fetch(num, '(RFC822)')
msg = email.message_from_string(data[0][1])
if not since or getDate(msg) > since:
msgs.append(msg)
conn.close()
conn.logout()
return msgs
def connect(self,imap_host):
"Connect with the host"
self.mail = imaplib.IMAP4_SSL(imap_host)
return self.mail
def compute_state(self, key):
if not key.username or not key.password:
self.warn('Username and password are not configured')
return None
if key.use_ssl:
mail = IMAP4_SSL(key.server, key.port)
else:
mail = IMAP4(key.server, key.port)
mail.login(key.username, key.password)
rc, message = mail.status(key.folder, '(UNSEEN)')
unread_str = message[0].decode('utf-8')
unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1))
return unread_count
def main():
global i
clear()
mail = imaplib.IMAP4_SSL(SMTP_SERVER)
mail.login(FROM_EMAIL, FROM_PWD)
mail.list()
mail.select('inbox')
inp = 'n'
while(inp!='q'):
print('Fetching mails. Please wait...')
if inp == 'n':
i += 1
elif inp == 'p' and i != 1:
i -= 1
else:
print('Please enter valid input.')
result, data = mail.uid('search', None, "ALL") # search and return uids instead
latest_email_uid = data[0].split()[-i] # fetch mails
result, data = mail.uid('fetch', latest_email_uid, '(RFC822)')
raw_email = data[0][1]
email_message = email.message_from_string(raw_email)
clear() # clear screen and print mail
print 'To:', email_message['To']
print 'Sent from:', email_message['From']
print 'Date:', email_message['Date']
print 'Subject:', email_message['Subject']
print '*'*30, 'MESSAGE', '*'*30
maintype = email_message.get_content_maintype()
#print maintype
if maintype == 'multipart': # get the body of the mail
print email_message.get_payload()[0].get_payload() # to get the plain text only
elif maintype == 'text':
line = email_message.get_payload()[ 0 ]
print line
print '*'*69
welcome()
inp = raw_input('>> Enter your choice: ').lower()
Gmail Inbox.py 文件源码
项目:Web-Scraping-and-Python-Codes
作者: dushyantRathore
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def readmail():
username = raw_input("Enter username : ")
password = raw_input("Enter password : ")
mail = imaplib.IMAP4_SSL(smtp_server)
mail.login(username, password)
mail.select('inbox')
type, data = mail.search(None, 'ALL')
mail_ids = data[0]
id_list = mail_ids.split()
first_id = int(id_list[0])
last_id = int(id_list[-1])
# print first_id
# print last_id
for i in range(last_id, first_id, -1):
typ, data = mail.fetch(i, '(RFC822)')
for response in data:
if isinstance(response, tuple):
msg = email.message_from_string(response[1])
email_subject = msg['subject']
email_from = msg['from']
print 'From : ' + email_from + '\n'
print 'Subject : ' + email_subject + '\n'
def connect(self):
self.imap_conn = imaplib.IMAP4_SSL(self.imap_server)