python类EmailMessage()的实例源码

mhtml.py 文件源码 项目:chrome-prerender 作者: bosondata 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add(self, location: str, content_type: str, payload: str, encoding: str = 'quoted-printable') -> None:
        """Attach one resource to the archive message held in ``self._msg``.

        :param location: value written to the part's ``Content-Location`` header.
        :param content_type: MIME type of the part; ``text/html`` parts are
            additionally tagged with ``charset=utf-8``.
        :param payload: body of the resource.  For ``'quoted-printable'`` and
            ``'base64'`` a ``str`` is UTF-8 encoded before being
            transfer-encoded (``bytes`` are used as they are); for
            ``'base64-encoded'`` it is stored unchanged.
        :param encoding: ``'quoted-printable'``, ``'base64'``, or
            ``'base64-encoded'`` when ``payload`` already is base64 text.
        :raises ValueError: if ``encoding`` is none of the values above.
        """
        if encoding not in ('quoted-printable', 'base64', 'base64-encoded'):
            raise ValueError('invalid encoding')

        resource = EmailMessage()
        if content_type == 'text/html':
            resource.add_header('Content-Type', 'text/html', charset='utf-8')
        else:
            resource['Content-Type'] = content_type

        if encoding == 'base64-encoded':  # Already base64 encoded
            resource['Content-Transfer-Encoding'] = 'base64'
            resource.set_payload(payload)
        else:
            # Both encoders only accept bytes-like input, so text has to be
            # encoded first (b64encode raises TypeError on str).
            raw = payload.encode() if isinstance(payload, str) else payload
            if encoding == 'quoted-printable':
                encoded = quopri.encodestring(raw)
            else:
                encoded = base64.b64encode(raw)
            resource['Content-Transfer-Encoding'] = encoding
            resource.set_payload(encoded)

        resource['Content-Location'] = location
        self._msg.attach(resource)
mail_shooter.py 文件源码 项目:DualisWatcher 作者: LucaVazz 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send(self, target: str, subject: str, html_content_with_cids: str, inline_png_cids_filenames: {str : str}):
        """Build an HTML mail with inline PNG images and send it over SMTP.

        The message gets an empty plain-text part plus the given HTML as an
        alternative.  Every entry of ``inline_png_cids_filenames`` maps a
        content id to a PNG file name (resolved relative to this module's
        directory); each file is attached to the HTML part as a related image
        under that content id.  The connection is upgraded with STARTTLS
        before logging in with ``self.username`` / ``self.password``.
        """
        message = EmailMessage()
        for header_name, header_value in (('Subject', subject),
                                          ('From', self.sender),
                                          ('To', target)):
            message[header_name] = header_value

        message.set_content('')
        message.add_alternative(html_content_with_cids, subtype='html')

        for cid, png_name in inline_png_cids_filenames.items():
            module_dir = os.path.dirname(__file__)
            png_path = os.path.abspath(os.path.join(module_dir, png_name))
            with open(png_path, 'rb') as png_handle:
                png_bytes = png_handle.read()
                # Index 1 is the HTML alternative added above.
                html_part = message.get_payload()[1]
                html_part.add_related(png_bytes, 'image', 'png', cid=cid)

        connection = smtplib.SMTP(self.smtp_server_host, self.smtp_server_port)
        with connection as smtp_connection:
            smtp_connection.starttls()
            smtp_connection.login(self.username, self.password)
            smtp_connection.send_message(message)
__init__.py 文件源码 项目:aioweb 作者: kreopt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
async def send_mail(app,
                    sender='%s service <%s>' % (settings.BRAND, settings.SERVER_EMAIL),
                    recipients=tuple(),
                    subject='',
                    body=''):
    """Send a plain-text e-mail through the application's SMTP connection.

    This is a coroutine: it uses ``async with`` / ``await`` and therefore has
    to be declared with ``async def``.

    :param app: application object; ``app.smtp`` is used as an async context
        manager yielding the connection.
    :param sender: value of the ``From`` header and envelope sender.
    :param recipients: addresses to send to; joined with ``', '`` for the
        ``To`` header.  When empty, nothing is sent.
    :param subject: value of the ``Subject`` header.
    :param body: plain-text content of the message.
    :return: ``None`` when there are no recipients, otherwise whatever
        ``conn.send_message`` returns.
    """
    if not recipients:
        return None

    msg = EmailMessage()
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ', '.join(recipients)
    msg.set_content(body)

    async with app.smtp as conn:
        return await conn.send_message(msg, sender, recipients, timeout=5)
feedparser.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, _factory=None, *, policy=compat32):
        """Set up the parser state and pick the message factory.

        _factory, when given, is the callable used to create new message
        objects.  It is probed once with a ``policy`` keyword; if that raises
        TypeError it is treated as an old-style factory and no keywords are
        passed to it afterwards.  When _factory is omitted, message.Message is
        used for the compat32 policy and message.EmailMessage for any other.

        The policy keyword specifies the policy object stored on the parser
        and offered to the factory.  The default is compat32.

        """
        self.policy = policy
        self._factory_kwds = lambda: {'policy': self.policy}

        if _factory is not None:
            self._factory = _factory
            try:
                _factory(policy=self.policy)
            except TypeError:
                # Old-style factory: it does not take a policy keyword.
                self._factory_kwds = lambda: {}
        elif self.policy is compat32:
            # Temporary hack (post 3.4 feature freeze); the intended form is
            #   self._factory = policy.default_message_factory
            self._factory = message.Message
        else:
            self._factory = message.EmailMessage

        self._input = BufferedSubFile()
        self._msgstack = []
        self._parse = self._parsegen().__next__
        self._cur = self._last = None
        self._headersonly = False

    # Non-public interface for supporting Parser's headersonly flag
mhtml.py 文件源码 项目:chrome-prerender 作者: bosondata 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self):
        """Create the root message: MIME version 1.0, ``multipart/related`` with ``type=text/html``."""
        root = EmailMessage()
        root['MIME-Version'] = '1.0'
        root.add_header('Content-Type', 'multipart/related', type='text/html')
        self._msg = root
help_post.py 文件源码 项目:netsocadmin2 作者: UCCNetworkingSociety 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def send_help_email(username:str, user_email:str, subject:str, message:str) -> bool:
    """
    Sends an email to the netsoc email address containing the help data,
    CC'ing all the SysAdmins and the user requesting help.
    This enables us to reply to the email directly instead of copypasting the
    from address and disconnecting history.

    :param username the user requesting help
    :param user_email the user's email address
    :param subject the subject of the user's help requests
    :param message the user's actual message
    :returns True if the message was handed to the SMTP server, False if
        connecting, logging in or sending raised an error
    """
    message_body = \
    """
From: %s\n
Email: %s

%s

PS: Please "Reply All" to the emails so that you get a quicker response."""%(
        username, user_email, message)

    msg = EmailMessage()
    msg.set_content(message_body)
    msg["From"] = p.NETSOC_ADMIN_EMAIL_ADDRESS
    msg["To"] = p.NETSOC_EMAIL_ADDRESS
    msg["Subject"] = "[Netsoc Help] " + subject
    msg["Cc"] = tuple(p.SYSADMIN_EMAILS + [user_email])
    try:
        with smtplib.SMTP("smtp.sendgrid.net", 587) as s:
            s.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD)
            s.send_message(msg)
    except Exception:
        # Best-effort delivery: any failure is reported as False.  Catching
        # Exception rather than using a bare except lets KeyboardInterrupt
        # and SystemExit propagate instead of being reported as a failed send.
        return False
    return True
help_post.py 文件源码 项目:netsocadmin2 作者: UCCNetworkingSociety 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def send_sudo_request_email(username:str, user_email:str):
    """
    Emails a confirmation that a sudo account on feynman was requested.

    The message goes from the Netsoc admin address to the Netsoc address,
    CC'ing every SysAdmin and the requesting user.

    :param username the server username of the user who made the request;
        used in the greeting and in the subject line
    :param user_email the email address of that user; added to the CC list
    """
    body_template = """
Hi {username},

Thank you for making a request for an account with sudo privileges on feynman.netsoc.co.

We will be in touch shortly. 

Best,

The UCC Netsoc SysAdmin Team.

PS: Please "Reply All" to the emails so that you get a quicker response.

"""
    subject_template = "[Netsoc Help] Sudo request on Feynman for {user}"

    msg = EmailMessage()
    msg.set_content(body_template.format(username=username))
    msg["From"] = p.NETSOC_ADMIN_EMAIL_ADDRESS
    msg["To"] = p.NETSOC_EMAIL_ADDRESS
    msg["Subject"] = subject_template.format(user=username)
    cc_addresses = p.SYSADMIN_EMAILS + [user_email]
    msg["Cc"] = tuple(cc_addresses)

    server = smtplib.SMTP("smtp.sendgrid.net", 587)
    with server as s:
        s.login(p.SENDGRID_USERNAME, p.SENDGRID_PASSWORD)
        s.send_message(msg)
views.py 文件源码 项目:WebGames 作者: Julien00859 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
async def signup(req):
    """Register a guest and e-mail them the registration challenge.

    This is a coroutine (it awaits ``User.is_free`` and
    ``challenges.create_for``), so it has to be declared with ``async def``.

    ``req.json`` must provide ``name``, ``email`` and ``password``.  The mail
    body is read from ``mails/challenge.txt`` and formatted with the
    configured domain, the URL scheme and the generated challenge.

    :raises InvalidUsage: when a required field is missing (including a
        request without a JSON body) or the name/email is already taken.
    :return: a text response confirming the address the challenge was sent to.
    """
    payload = req.json or {}
    missing = [key for key in ("name", "email", "password") if key not in payload]
    if missing:
        # Never log the payload itself: it carries the plaintext password.
        logger.debug(f"Signup request is missing arguments: {missing}")
        raise InvalidUsage("Missing argument")

    name, address = payload["name"], payload["email"]
    if not await User.is_free(name, address):
        logger.debug(f"Signup rejected: name {name!r} or email {address!r} is already taken")
        raise InvalidUsage("Username or email already taken")

    guest = Guest(name, address, User.hashpwd(payload["password"]))

    chlg = await challenges.create_for(guest)

    logger.info(f"Guest signed up with name: {guest.name} and email: {guest.email}. Challenge generated: {chlg}")

    # Read the template and release the file before talking to the server.
    with open("mails" + sep + "challenge.txt") as mailtext:
        template = mailtext.read()

    mail = EmailMessage()
    mail.set_content(template.format(domain=req.app.config.domain,
                                     scheme="https" if req.app.config.schemessl else "http",
                                     challenge=chlg))
    mail["Subject"] = "WebGames Registration Challenge"
    mail["From"] = req.app.config.smtpuser
    mail["To"] = guest.email

    # NOTE(review): this SMTP client is synchronous and blocks while sending;
    # confirm that is acceptable inside this coroutine.
    with SMTP(req.app.config.smtphost, req.app.config.smtpport) as smtp:
        if req.app.config.smtpssl:
            smtp.ehlo()
            smtp.starttls()
            smtp.ehlo()
        else:
            smtp.helo()
        smtp.login(req.app.config.smtpuser, req.app.config.smtppwd)
        smtp.send_message(mail)

    return text(f"Challenge sent to {guest.email}")
handlers.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def emit(self, record):
        """
        Emit a record.

        Format the record and send it to the specified addressees.

        The port defaults to smtplib.SMTP_PORT when self.mailport is falsy.
        Credentials are only used when self.username is set, and STARTTLS is
        only negotiated when self.secure is not None as well.  Any exception
        is passed to self.handleError(record) instead of propagating.
        """
        try:
            import smtplib
            from email.message import EmailMessage
            import email.utils

            port = self.mailport or smtplib.SMTP_PORT
            smtp = smtplib.SMTP(self.mailhost, port, timeout=self.timeout)
            try:
                msg = EmailMessage()
                msg['From'] = self.fromaddr
                msg['To'] = ','.join(self.toaddrs)
                msg['Subject'] = self.getSubject(record)
                msg['Date'] = email.utils.localtime()
                msg.set_content(self.format(record))
                if self.username:
                    if self.secure is not None:
                        smtp.ehlo()
                        smtp.starttls(*self.secure)
                        smtp.ehlo()
                    smtp.login(self.username, self.password)
                smtp.send_message(msg)
                smtp.quit()
            finally:
                # Release the socket even when formatting, login or sending
                # fails; after a successful quit() this is a no-op.
                smtp.close()
        except Exception:
            self.handleError(record)
email_tasks.py 文件源码 项目:pillar 作者: armadillica 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def send_email(self: celery.Task, to_name: str, to_addr: str, subject: str, text: str, html: str):
    """Send a plain-text + HTML email to one recipient.

    The sender comes from the MAIL_DEFAULT_FROM_NAME / MAIL_DEFAULT_FROM_ADDR
    settings.  Nothing is sent when the TESTING setting is truthy.  On an
    OSError while talking to the SMTP server the error is logged and the task
    is retried after MAIL_RETRY seconds.
    """
    # WARNING: when changing the signature of this function, also change the
    # self.retry() call below.
    config = current_app.config

    # Assemble the message: text body first, HTML as the alternative.
    message = EmailMessage()
    message['Subject'] = subject
    message['From'] = Address(config['MAIL_DEFAULT_FROM_NAME'],
                              addr_spec=config['MAIL_DEFAULT_FROM_ADDR'])
    recipient = Address(to_name, addr_spec=to_addr)
    message['To'] = (recipient,)
    message.set_content(text)
    message.add_alternative(html, subtype='html')

    # Never deliver anything while running under test.
    if config['TESTING']:
        log.warning('not sending mail to %s <%s> because we are TESTING', to_name, to_addr)
        return
    log.info('sending email to %s <%s>', to_name, to_addr)

    # Hand the message to the configured SMTP server.
    try:
        connection = smtplib.SMTP(config['SMTP_HOST'], config['SMTP_PORT'],
                                  timeout=config['SMTP_TIMEOUT'])
        with connection as smtp:
            has_credentials = config.get('SMTP_USERNAME') and config.get('SMTP_PASSWORD')
            if has_credentials:
                smtp.login(config['SMTP_USERNAME'], config['SMTP_PASSWORD'])
            smtp.send_message(message)
    except OSError as ex:  # IOError is an alias of OSError
        log.exception('error sending email to %s <%s>, will retry later: %s',
                      to_name, to_addr, ex)
        retry_args = (to_name, to_addr, subject, text, html)
        self.retry(retry_args, countdown=config['MAIL_RETRY'])
    else:
        log.info('mail to %s <%s> successfully sent', to_name, to_addr)
feedparser.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, _factory=None, *, policy=compat32):
        """Set up the parser state and pick the message factory.

        _factory, when given, is the callable used to create new message
        objects.  It is probed once with a ``policy`` keyword; if that raises
        TypeError it is treated as an old-style factory and no keywords are
        passed to it afterwards.  When _factory is omitted, message.Message is
        used for the compat32 policy and message.EmailMessage for any other.

        The policy keyword specifies the policy object stored on the parser
        and offered to the factory.  The default is compat32.

        """
        self.policy = policy
        self._factory_kwds = lambda: {'policy': self.policy}

        if _factory is not None:
            self._factory = _factory
            try:
                _factory(policy=self.policy)
            except TypeError:
                # Old-style factory: it does not take a policy keyword.
                self._factory_kwds = lambda: {}
        elif self.policy is compat32:
            # Temporary hack (post 3.4 feature freeze); the intended form is
            #   self._factory = policy.default_message_factory
            self._factory = message.Message
        else:
            self._factory = message.EmailMessage

        self._input = BufferedSubFile()
        self._msgstack = []
        self._parse = self._parsegen().__next__
        self._cur = self._last = None
        self._headersonly = False

    # Non-public interface for supporting Parser's headersonly flag


问题


面经


文章

微信
公众号

扫码关注公众号