python类IMAP4_SSL的实例源码

test.py 文件源码 项目:Personal_AI_Assistant 作者: PratylenClub 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def read(username, password, sender_of_interest):
    # Login to INBOX
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    imap.login(username, password)
    imap.select('INBOX')

    status, response = imap.search(None, '(UNSEEN)')
    unread_msg_nums = response[0].split()

    # Print the count of all unread messages
    print len(unread_msg_nums)

    # Print all unread messages from a certain sender of interest
    status, response = imap.search(None, '(UNSEEN)', '(FROM "%s")' % (sender_of_interest))
    unread_msg_nums = response[0].split()
    da = []
    for e_id in unread_msg_nums:
        _, response = imap.fetch(e_id, '(UID BODY[TEXT])')
        da.append(response[0][1])
    print da

    # Mark them as seen
    for e_id in unread_msg_nums:
        imap.store(e_id, '+FLAGS', '\Seen')
PySMS.py 文件源码 项目:PySMS 作者: clayshieh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def init_server(self):
        self.logger.info("Initializing SMTP/IMAP servers.")
        # PySMS at minimum uses smtp server
        try:
            if self.ssl:
                self.smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
            else:
                self.smtp = smtplib.SMTP(self.smtp_server, self.smtp_port)
                self.smtp.starttls()
            self.smtp.login(self.address, self.password)
        except smtplib.SMTPException:
            raise PySMSException("Unable to start smtp server, please check credentials.")

        # If responding functionality is enabled
        if self.imap_server:
            try:
                if self.ssl:
                    self.imap = imaplib.IMAP4_SSL(self.imap_server)
                else:
                    self.imap = imaplib.IMAP4(self.imap_server)
                r, data = self.imap.login(self.address, self.password)
                if r == "OK":
                    r, data = self.imap.select(self.imap_mailbox)
                    if r != "OK":
                        raise PySMSException("Unable to select mailbox: {0}".format(self.imap_mailbox))
                else:
                    raise PySMSException("Unable to login to IMAP server with given credentials.")
            except imaplib.IMAP4.error:
                raise PySMSException("Unable to start IMAP server, please check address and SSL/TLS settings.")
cli.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_imap_old(user):
    storage = Storage('credentials_file')
    credentials = storage.get()
    xoauth = xoauth2_str(user, credentials.access_token)
    conn = imaplib.IMAP4_SSL('imap.googlemail.com')
    conn.debug = 4

    conn.authenticate('XOAUTH2', lambda x: xoauth)

    status, labels = conn.list()

    conn.select("[Gmail]/All Mail")
    # Once authenticated everything from the impalib.IMAP4_SSL class will
    # work as per usual without any modification to your code.
    typ, msgnums = conn.search(None, 'X-GM-RAW', 'vget')

    print 'typ', typ
    print 'num', msgnums
    # conn.select('INBOX')
    # print conn.list()
imaplib_connect.py 文件源码 项目:pymotw3 作者: reingart 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def open_connection(verbose=False):
    # Read the config file
    config = configparser.ConfigParser()
    config.read([os.path.expanduser('~/.pymotw')])

    # Connect to the server
    hostname = config.get('server', 'hostname')
    if verbose:
        print('Connecting to', hostname)
    connection = imaplib.IMAP4_SSL(hostname)

    # Login to our account
    username = config.get('account', 'username')
    password = config.get('account', 'password')
    if verbose:
        print('Logging in as', username)
    connection.login(username, password)
    return connection
check_for_articles_from_email.py 文件源码 项目:prestashop-sync 作者: dragoon 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect(self):
        """Connects to and authenticates with an IMAP4 mail server"""

        import imaplib

        M = None
        try:
            if (self.keyfile and self.certfile) or self.ssl:
                M = imaplib.IMAP4_SSL(self.host, self.port, self.keyfile, self.certfile)
            else:
                M = imaplib.IMAP4(self.host, self.port)

            M.login(self.username, self.password)
            M.select()
        except socket.error, err:
            raise
        else:
            return M
main.py 文件源码 项目:imap_tools 作者: ikvk 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None):
        """
        :param host: host's name (default: localhost)
        :param port: port number (default: standard IMAP4 SSL port)
        :param ssl: use client class over SSL connection (IMAP4_SSL) if True, else use IMAP4
        :param keyfile: PEM formatted file that contains your private key (default: None)
        :param certfile: PEM formatted certificate chain file (default: None)
        :param ssl_context: SSLContext object that contains your certificate chain and private key (default: None)
        Note: if ssl_context is provided, then parameters keyfile or
              certfile should not be set otherwise ValueError is raised.
        """
        self._host = host
        self._port = port
        self._keyfile = keyfile
        self._certfile = certfile
        self._ssl_context = ssl_context
        if ssl:
            self.box = imaplib.IMAP4_SSL(
                host, port or imaplib.IMAP4_SSL_PORT, keyfile, certfile, ssl_context)
        else:
            self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT)
        self._username = None
        self._password = None
        self._initial_folder = None
        self.folder = None
emailCheck.py 文件源码 项目:textMe 作者: RParkerE 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def read(hostname, username, password):
    # Get the imap server for the host
    hostname = 'imap.' + hostname.lower()
    # Start SSL connection with the host server
    server = IMAP4_SSL(hostname)
    try:
        # Login to the given account
        server.login(username, password)
        # Select the Inbox and make it read only
        server.select('INBOX', readonly=True)
        # Check how many unread messages are in the inbox
        msgnums = server.search(None, 'UnSeen')
        if(msgnums[0] == 'OK'):
            # List how many unread messages there are [1,2, 3, ...]
            for num in msgnums[1]:
                # Return number of unread messages (The last one in the list msgnums)
                return (num[-1])
        else:
            return ('Failed')
    except IMAP4.error:
        return ('Failed')
gmailv.py 文件源码 项目:Pikaptcha 作者: sriyegna 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def email_verify(plusmail, googlepass):
    time.sleep(5)
    #Waiting 5 seconds before checking email
    email_address = plusmail
    M = imaplib.IMAP4_SSL('imap.gmail.com')    
    try:
        M.login(email_address, googlepass)
        print "Logged in to: " + email_address

        rv, mailboxes = M.list()
        rv, data = M.select("INBOX")
        if rv == 'OK':
            print "Processing mailbox..."
            proc_mail(M)
            M.close()
        M.logout()
    except imaplib.IMAP4.error:
        print "Unable to login to: " + email_address + ". Was not verified\n"
gmail.py 文件源码 项目:DET 作者: sensepost 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def listen():
    app_exfiltrate.log_message('info', "[gmail] Listening for mails...")
    client_imap = imaplib.IMAP4_SSL(server)
    try:
        client_imap.login(gmail_user, gmail_pwd)
    except:
        app_exfiltrate.log_message(
            'warning', "[gmail] Did not manage to authenticate with creds: {}:{}".format(gmail_user, gmail_pwd))
        sys.exit(-1)

    while True:
        client_imap.select("INBOX")
        typ, id_list = client_imap.uid(
            'search', None, "(UNSEEN SUBJECT 'det:toolkit')")
        for msg_id in id_list[0].split():
            msg_data = client_imap.uid('fetch', msg_id, '(RFC822)')
            raw_email = msg_data[1][0][1]
            # continue inside the same for loop as above
            raw_email_string = raw_email.decode('utf-8')
            # converts byte literal to string removing b''
            email_message = email.message_from_string(raw_email_string)
            # this will loop through all the available multiparts in mail
            for part in email_message.walk():
                if part.get_content_type() == "text/plain":  # ignore attachments/html
                    body = part.get_payload(decode=True)
                    data = body.split('\r\n')[0]
                    # print data
                    try:
                        app_exfiltrate.retrieve_data(base64.b64decode(data))
                    except Exception, e:
                        print e
                else:
                    continue
        time.sleep(2)
atr3.py 文件源码 项目:Atlantr 作者: SUP3RIA 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def imap(usr, pw, host):
    socket.setdefaulttimeout(time_out)
    usr = usr.lower()
    try:
        if len(host) < 2:
            port = 993
        else:
            port = int(host[1])
        mail = imaplib.IMAP4_SSL(str(host[0]), port)
        a = str(mail.login(usr, pw))
        return a[2: 4]
    except imaplib.IMAP4.error:
        return False
    except BaseException:
        return "Error"

#/-----------------IMAP-------------------------#


#------GETUNKNOWN--HOST--------------------------#
atr3.py 文件源码 项目:Atlantr 作者: SUP3RIA 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def getunknown_imap(subb):
    socket.setdefaulttimeout(time_out)
    try:
        # TODO: Change to dynamic matchers
        sub = [
            'imap',
            'mail',
            'pop',
            'pop3',
            'imap-mail',
            'inbound',
            'mx',
            'imaps',
            'smtp',
            'm']
        for host in sub:
            host = host + '.' + subb
            try:
                mail = imaplib.IMAP4_SSL(str(host))
                mail.login('test', 'test')
            except imaplib.IMAP4.error:
                return host
    except BaseException:
        return None
requester.py 文件源码 项目:pnu 作者: erasaur 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def reconnect(self):
        auth_attempts = 0

        while auth_attempts <= constants.MAX_RECONNECT_RETRIES:
            auth_attempts += 1

            try:
                self.mail = imaplib.IMAP4_SSL(pub_config['gmail']['imap'])
                u, data = self.mail.login(private_config['gmail']['username'],
                                          private_config['gmail']['password'])
                return
            except imaplib.IMAP4.error as e:
                logging.error("Login to retrieve emails failed!")
                logging.error(e)

            except Exception as e:
                logging.error("Error from IMAP! Trying to reconnect...")
                logging.error(e)

            time.sleep(constants.SMTP_RECONNECT_SLEEP_TIME)

        if auth_attempts > constants.MAX_RECONNECT_RETRIES:
            from pnu.outbound.alerter import smtp
            logging.critical("Could not connect to IMAP handler")
            smtp.send_error("Could not connect to IMAP handler")
gmail.py 文件源码 项目:qxf2-page-object-model 作者: qxf2 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def connect(self, raise_errors=True):
        # try:
        #     self.imap = imaplib.IMAP4_SSL(self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT)
        # except socket.error:
        #     if raise_errors:
        #         raise Exception('Connection failure.')
        #     self.imap = None

        self.imap = imaplib.IMAP4_SSL(self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT)

        # self.smtp = smtplib.SMTP(self.server,self.port)
        # self.smtp.set_debuglevel(self.debug)
        # self.smtp.ehlo()
        # self.smtp.starttls()
        # self.smtp.ehlo()

        return self.imap
diffs-20161024.py 文件源码 项目:Positions 作者: nikoulis 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def readMail(self, subject):
        connection = imaplib.IMAP4_SSL('imap.mail.yahoo.com')
        connection.login('pnikoulis', 'ai024709th3669')
        connection.select(readonly=True)
        typ, data = connection.search(None, 'SUBJECT', subject)
        ids = data[0].split()
        id = ids[0]
        typ, data = connection.fetch(id, '(RFC822)')
        message = email.message_from_string(data[0][1])
        print '------------------------------------------> Message', id
        print 'Date:', message['Date']
        print 'From:', message['From']
        print 'Subject:', message['Subject']
        self.positions = getPositionsFromMessage(message)

    #------------------------------------------------------------------------
    # Check if a position is in portfolio (by checking symbol and exit date)
    #------------------------------------------------------------------------
diffs.py 文件源码 项目:Positions 作者: nikoulis 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def readMail(self, subject):
        connection = imaplib.IMAP4_SSL('imap.mail.yahoo.com')
        connection.login('pnikoulis', 'ai024709th3669')
        connection.select(readonly=True)
        typ, data = connection.search(None, 'SUBJECT', subject)
        ids = data[0].split()
        id = ids[0]
        typ, data = connection.fetch(id, '(RFC822)')
        message = email.message_from_string(data[0][1])
        print '------------------------------------------> Message', id
        print 'Date:', message['Date']
        print 'From:', message['From']
        print 'Subject:', message['Subject']
        self.positions = getPositionsFromMessage(message)

    #------------------------------------------------------------------------
    # Check if a position is in portfolio (by checking symbol and exit date)
    #------------------------------------------------------------------------
fetchmail_server.py 文件源码 项目:vieterp-mailbox 作者: vieterp 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def connect(self):
        if self.source_id and self.source_id.id:
            self.ensure_one()
            if self.source_id.type == 'imap':
                if self.source_id.is_ssl:
                    connection = IMAP4_SSL(self.source_id.server, int(self.source_id.port))
                else:
                    connection = IMAP4(self.source_id.server, int(self.source_id.port))
                connection.login(self.user, self.password)
            elif self.type == 'pop':
                if self.source_id.is_ssl:
                    connection = POP3_SSL(self.source_id.server, int(self.source_id.port))
                else:
                    connection = POP3(self.source_id.server, int(self.source_id.port))
                # TODO: use this to remove only unread messages
                # connection.user("recent:"+server.user)
                connection.user(self.user)
                connection.pass_(self.password)
            # Add timeout on socket
            connection.sock.settimeout(MAIL_TIMEOUT)
            return connection
        return super(vieterp_fetchmail_server, self).connect()
bgs_mail.py 文件源码 项目:Python 作者: Gitcha-Beginners 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def read(self):
        try:
            # connect to server
            server = imaplib.IMAP4_SSL('imap.' + self.ID[self.ID.index('@') + 1:])

            # login
            server.login(self.ID, self.PASSWORD)
            server.list()
            server.select()

            typ, data = server.search(None, 'ALL')
            for num in data[0].split():
                typ, data = server.fetch(num, '(RFC822)')
                print('Message %s\n%s\n' % (num, data[0][1]))

            server.logout()
        except Exception as e:
            print("Reading is failed : {}".format(e.__str__()))
recipe-578203.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def login(self, username, password):
        self.M = imaplib.IMAP4_SSL(self.IMAP_SERVER, self.IMAP_PORT)
        self.S = smtplib.SMTP_SSL(self.SMTP_SERVER, self.SMTP_PORT)
        rc, self.response = self.M.login(username, password)
        sc, self.response_s = self.S.login(username, password)
        self.username = username
        return rc, sc
cli.py 文件源码 项目:sndlatr 作者: Schibum 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_imap(user):
    credentials = get_credentials()
    conn = IMAPClient('imap.googlemail.com', use_uid=True, ssl=True)
    # conn.debug = 4

    conn.oauth2_login(user, credentials.access_token)

    # status, labels = conn.list()
    folders = conn.list_folders()
    try:
        all_box = next(box for (flags, _, box) in folders if '\All' in flags)
    except StopIteration:
        raise Error('all message box not found')

    logging.debug('All message box is {}'.format(all_box))

    conn.select_folder(all_box)
    # Once authenticated everything from the impalib.IMAP4_SSL class will
    # work as per usual without any modification to your code.
    # typ, msgnums = conn.search('X-GM-RAW vget')
    tid = int('14095f27c538b207', 16)
    # msgs = conn.search('X-GM-THRID {}'.format(tid))
    msgs = conn.search('X-GM-RAW uniquetokenXXX')

    print msgs
    # print conn.fetch(msgs, 'X-GM-MSGID')
    # print conn.fetch(msgs, 'RFC822')
    # conn.select('INBOX')
    # print conn.list()
mailpillager.py 文件源码 项目:SPF 作者: Exploit-install 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def connect(self, mailserver, port="993"):
        self.mailserver = mailserver
        self.port = port
        try:
            self.srv = imaplib.IMAP4_SSL(self.mailserver, self.port)
        except:
            self.srv = None
            pass

#-----------------------------------------------------------------------------
# POP3 subclass of Pillager Class
#-----------------------------------------------------------------------------
imapbot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, host, port, certfile=None, keyfile=None, timeout=None, ca_certs=None):
        self._timeout = timeout
        self.ca_certs = ca_certs

        imaplib.IMAP4_SSL.__init__(self, host, port, certfile, keyfile)
imapbot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, host, port, certfile=None, keyfile=None, timeout=None, ca_certs=None):
        self._timeout = timeout
        self.ca_certs = ca_certs

        imaplib.IMAP4_SSL.__init__(self, host, port, certfile, keyfile)
mail.py 文件源码 项目:lama 作者: CSE-POST 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _connect_imap(self):
        print("CONNECTION ...")
        self._mail_server = imaplib.IMAP4_SSL(self._server)
        self._mail_server.login(self._user, self._password)
mail.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def compute_state(self, key):
        if not key.username or not key.password:
            self.warn('Username and password are not configured')
            return None
        if key.use_ssl:
            mail = IMAP4_SSL(key.server, key.port)
        else:
            mail = IMAP4(key.server, key.port)
        mail.login(key.username, key.password)
        rc, message = mail.status(key.folder, '(UNSEEN)')
        unread_str = message[0].decode('utf-8')
        unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1))
        return unread_count
imaprelayclient.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, target):
        # Target comes as protocol://target:port
        self.target = target
        proto, host, port = target.split(':')
        host = host[2:]
        if int(port) == 993 or proto.upper() == 'IMAPS':
            self.session = imaplib.IMAP4_SSL(host,int(port))
        else:
            #assume non-ssl IMAP
            self.session = imaplib.IMAP4(host,port)
        if 'AUTH=NTLM' not in self.session.capabilities:
            logging.error('IMAP server does not support NTLM authentication!')
            return False
        self.authtag = self.session._new_tag()
        self.lastresult = None
imap.py 文件源码 项目:Taigabot 作者: FrozenPigs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
            lambda x: oauth2.build_xoauth_string(url, consumer, token))
imap.py 文件源码 项目:Fetch 作者: bourdakos1 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def authenticate(self, url, consumer, token):
        if consumer is not None and not isinstance(consumer, oauth2.Consumer):
            raise ValueError("Invalid consumer.")

        if token is not None and not isinstance(token, oauth2.Token):
            raise ValueError("Invalid token.")

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH',
            lambda x: oauth2.build_xoauth_string(url, consumer, token))
5_5_check_remote_email_via_imap.py 文件源码 项目:Python-Network-Programming-Cookbook-Second-Edition 作者: PacktPublishing 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def check_email(username): 
    mailbox = imaplib.IMAP4_SSL(GOOGLE_IMAP_SERVER, '993') 
    password = getpass.getpass(prompt="Enter your Google password: ") 
    mailbox.login(username, password)
    mailbox.select('Inbox')
    typ, data = mailbox.search(None, 'ALL')
    for num in data[0].split():
        typ, data = mailbox.fetch(num, '(RFC822)')
        print ('Message %s\n%s\n' % (num, data[0][1]))
        break
    mailbox.close()
    mailbox.logout()
gmail_module.py 文件源码 项目:stephanie-va 作者: SlapBot 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def do_init(self):
        try:
            self.conn = imaplib.IMAP4_SSL('imap.gmail.com')
            self.conn.debug = 0
            self.conn.login(self.gmail_address, self.password)
        except:
            response = ("Either your credentials are wrong mate, or there is some problem going on, do me a favor, I know "
                        "you won't but whatever, just inform me in the forums.")
            print(response)
            return response
Functions.py 文件源码 项目:Personal_AI_Assistant 作者: PratylenClub 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read_message(assistant, instance_vlc, player_vlc, username, password, sender_of_interest=None):
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    imap.login(username, password)
    imap.select('INBOX')
    if sender_of_interest is None:
        status, response = imap.search(None, '(UNSEEN)')
        unread_msg_nums = response[0].split()
    else:
        status, response = imap.search(None, '(UNSEEN)', '(FROM "%s")' % (sender_of_interest))
        unread_msg_nums = response[0].split()
    if unread_msg_nums:
        assistant.speak("Very good! Please wait, my assistant will read your messages!")
    else:
        assistant.speak("You do not have new messages!")
    for e_id in unread_msg_nums:
        _, header = imap.fetch(e_id, '(UID BODY[HEADER])')
        header = header[0][-1]
        header = "".join(header.split("\r"))
        header = [h for h in header.split("\n") if h != ""]
        subject = header[-1]
        sender = header[-3].split("@")[0]
        _, text = imap.fetch(e_id, '(UID BODY[1])')
        text = text[0][1]
        text = "Content :"+text
        message = sender + ". " + subject + ". " + text
        read_nicely_text(message, instance_vlc, player_vlc)
    # Mark them as seen
    for e_id in unread_msg_nums:
        imap.store(e_id, '+FLAGS', '\Seen')


问题


面经


文章

微信
公众号

扫码关注公众号