| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- """
- Hook for connecting to a mail server over SMTP and sending emails, optionally with attachments.
- It uses the ``smtplib`` module from the Python standard library.
- """
- from __future__ import annotations
- import collections.abc
- import os
- import re
- import smtplib
- import ssl
- from collections.abc import Iterable
- from email.mime.application import MIMEApplication
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from email.utils import formatdate
- from pathlib import Path
- from typing import TYPE_CHECKING, Any
- from airflow.exceptions import AirflowException, AirflowNotFoundException
- from airflow.hooks.base import BaseHook
- if TYPE_CHECKING:
- from airflow.models.connection import Connection
class SmtpHook(BaseHook):
    """
    This hook connects to a mail server by using the smtp protocol.

    .. note:: Please call this Hook as context manager via `with`
        to automatically open and close the connection to the mail server.

    :param smtp_conn_id: The :ref:`smtp connection id <howto/connection:smtp>`
        that contains the information used to authenticate the client.
    """

    conn_name_attr = "smtp_conn_id"
    default_conn_name = "smtp_default"
    conn_type = "smtp"
    hook_name = "SMTP"

    def __init__(self, smtp_conn_id: str = default_conn_name) -> None:
        super().__init__()
        self.smtp_conn_id = smtp_conn_id
        # Loaded lazily by ``get_conn``.
        self.smtp_connection: Connection | None = None
        # Live client; ``None`` until ``get_conn`` connects and again after ``__exit__``.
        self.smtp_client: smtplib.SMTP_SSL | smtplib.SMTP | None = None

    def __enter__(self) -> SmtpHook:
        return self.get_conn()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close_client()

    def _close_client(self) -> None:
        """Close the current client, if any, and forget it so the next ``get_conn`` reconnects."""
        if self.smtp_client is not None:
            self.smtp_client.close()
            self.smtp_client = None

    def get_conn(self) -> SmtpHook:
        """
        Login to the smtp server.

        .. note:: Please call this Hook as context manager via `with`
            to automatically open and close the connection to the smtp server.

        Building the client is attempted up to ``retry_limit`` times (and always at least
        once) while the server keeps disconnecting; STARTTLS and login are then performed
        according to the connection settings.

        :return: an authorized SmtpHook object.
        :raises AirflowException: if the connection cannot be found, or if the server
            disconnected on every attempt to build the client.
        """
        if self.smtp_client:
            return self
        try:
            self.smtp_connection = self.get_connection(self.smtp_conn_id)
        except AirflowNotFoundException as err:
            raise AirflowException("SMTP connection is not found.") from err
        max_attempts = self._max_attempts
        for attempt in range(1, max_attempts + 1):
            try:
                client = self._build_client()
            except smtplib.SMTPServerDisconnected as err:
                if attempt == max_attempts:
                    raise AirflowException("Unable to connect to smtp server") from err
            else:
                self.smtp_client = self._secure_and_login(client)
                break
        return self

    def _build_client(self) -> smtplib.SMTP_SSL | smtplib.SMTP:
        """
        Create a new, not yet authenticated, SMTP client from the connection settings.

        :raises RuntimeError: if the ``ssl_context`` extra is neither ``"default"`` nor ``"none"``.
        """
        SMTP: type[smtplib.SMTP_SSL] | type[smtplib.SMTP]
        if self.use_ssl:
            SMTP = smtplib.SMTP_SSL
        else:
            SMTP = smtplib.SMTP

        smtp_kwargs: dict[str, Any] = {"host": self.host}
        if self.port:
            smtp_kwargs["port"] = self.port
        smtp_kwargs["timeout"] = self.timeout

        if self.use_ssl:
            ssl_context_string = self.ssl_context
            if ssl_context_string is None or ssl_context_string == "default":
                ssl_context = ssl.create_default_context()
            elif ssl_context_string == "none":
                ssl_context = None
            else:
                raise RuntimeError(
                    f"The connection extra field `ssl_context` must "
                    f"be set to 'default' or 'none' but it is set to '{ssl_context_string}'."
                )
            smtp_kwargs["context"] = ssl_context
        return SMTP(**smtp_kwargs)

    def _secure_and_login(
        self, client: smtplib.SMTP_SSL | smtplib.SMTP
    ) -> smtplib.SMTP_SSL | smtplib.SMTP:
        """
        Upgrade ``client`` with STARTTLS and log in, according to the connection settings.

        If either step fails the client is closed before the error is re-raised, so a
        half-initialised client never leaks its socket.

        :param client: A freshly built client from ``_build_client``.
        :return: The same client, ready to send mail.
        """
        try:
            # An implicit-SSL client is already encrypted; issuing STARTTLS on it would
            # attempt TLS inside TLS, so it is only done on plain SMTP clients.
            if self.smtp_starttls and not self.use_ssl:
                # NOTE(review): starttls() is called without a context, so the
                # ``ssl_context`` extra only affects implicit SSL; confirm whether
                # certificate verification is also expected for STARTTLS.
                client.starttls()
            if self.smtp_user and self.smtp_password:
                client.login(self.smtp_user, self.smtp_password)
        except Exception:
            client.close()
            raise
        return client

    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Return connection widgets to add to connection form."""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import BooleanField, IntegerField, StringField
        from wtforms.validators import NumberRange

        return {
            "from_email": StringField(lazy_gettext("From email"), widget=BS3TextFieldWidget()),
            "timeout": IntegerField(
                lazy_gettext("Connection timeout"),
                validators=[NumberRange(min=0)],
                widget=BS3TextFieldWidget(),
                default=30,
            ),
            "retry_limit": IntegerField(
                lazy_gettext("Number of Retries"),
                validators=[NumberRange(min=0)],
                widget=BS3TextFieldWidget(),
                default=5,
            ),
            "disable_tls": BooleanField(lazy_gettext("Disable TLS"), default=False),
            "disable_ssl": BooleanField(lazy_gettext("Disable SSL"), default=False),
            "subject_template": StringField(
                lazy_gettext("Path to the subject template"), widget=BS3TextFieldWidget()
            ),
            "html_content_template": StringField(
                lazy_gettext("Path to the html content template"), widget=BS3TextFieldWidget()
            ),
        }

    def test_connection(self) -> tuple[bool, str]:
        """
        Test SMTP connectivity from UI.

        :return: ``(True, message)`` when the server answers NOOP with status 250,
            otherwise ``(False, reason)``.
        """
        opened_here = self.smtp_client is None
        try:
            smtp_client = self.get_conn().smtp_client
            if smtp_client:
                status = smtp_client.noop()[0]
                if status == 250:
                    return True, "Connection successfully tested"
        except Exception as e:
            return False, str(e)
        finally:
            # Do not leak a connection opened only for this test, but leave a
            # connection the caller opened beforehand untouched.
            if opened_here:
                self._close_client()
        return False, "Failed to establish connection"

    def send_email_smtp(
        self,
        *,
        to: str | Iterable[str],
        subject: str | None = None,
        html_content: str | None = None,
        from_email: str | None = None,
        files: list[str] | None = None,
        dryrun: bool = False,
        cc: str | Iterable[str] | None = None,
        bcc: str | Iterable[str] | None = None,
        mime_subtype: str = "mixed",
        mime_charset: str = "utf-8",
        custom_headers: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        """
        Send an email with html content.

        :param to: Recipient email address or list of addresses.
        :param subject: Email subject. If it's None, the hook will check if there is a path to a subject
            file provided in the connection, and raises an exception if not.
        :param html_content: Email body in HTML format. If it's None, the hook will check if there is a path
            to a html content file provided in the connection, and raises an exception if not.
        :param from_email: Sender email address. If it's None, the hook will check if there is an email
            provided in the connection, and raises an exception if not.
        :param files: List of file paths to attach to the email.
        :param dryrun: If True, the email will not be sent, but all other actions will be performed.
        :param cc: Carbon copy recipient email address or list of addresses.
        :param bcc: Blind carbon copy recipient email address or list of addresses.
        :param mime_subtype: MIME subtype of the email.
        :param mime_charset: MIME charset of the email.
        :param custom_headers: Dictionary of custom headers to include in the email.
        :param kwargs: Accepted for compatibility; currently unused.
        :raises AirflowException: if the client is not connected, or the sender, subject
            or body can neither be taken from the arguments nor from the connection.
        :raises smtplib.SMTPServerDisconnected: if the server disconnects on every attempt
            (``retry_limit`` attempts, at least one), reconnecting between attempts.

        Example (requires a configured ``smtp_default`` connection):

        .. code-block:: python

            with SmtpHook() as hook:
                hook.send_email_smtp(
                    to="test@example.com",
                    subject="foo",
                    html_content="<b>Foo</b> bar",
                    files=["/dev/null"],
                    dryrun=True,
                )
        """
        if not self.smtp_client:
            raise AirflowException("The 'smtp_client' should be initialized before!")
        from_email = from_email or self.from_email
        if not from_email:
            raise AirflowException("You should provide `from_email` or define it in the connection.")
        if not subject:
            if self.subject_template is None:
                raise AirflowException(
                    "You should provide `subject` or define `subject_template` in the connection."
                )
            subject = self._read_template(self.subject_template)
        if not html_content:
            if self.html_content_template is None:
                raise AirflowException(
                    "You should provide `html_content` or define `html_content_template` in the connection."
                )
            html_content = self._read_template(self.html_content_template)

        mime_msg, recipients = self._build_mime_message(
            mail_from=from_email,
            to=to,
            subject=subject,
            html_content=html_content,
            files=files,
            cc=cc,
            bcc=bcc,
            mime_subtype=mime_subtype,
            mime_charset=mime_charset,
            custom_headers=custom_headers,
        )
        if dryrun:
            return

        message = mime_msg.as_string()
        max_attempts = self._max_attempts
        for attempt in range(1, max_attempts + 1):
            try:
                if attempt > 1:
                    # smtplib drops the socket when the server disconnects, so retrying on
                    # the same client cannot succeed; open a fresh, authenticated one first.
                    self._close_client()
                    self.smtp_client = self._secure_and_login(self._build_client())
                self.smtp_client.sendmail(from_addr=from_email, to_addrs=recipients, msg=message)
            except smtplib.SMTPServerDisconnected:
                if attempt == max_attempts:
                    raise
            else:
                break

    def _build_mime_message(
        self,
        mail_from: str | None,
        to: str | Iterable[str],
        subject: str,
        html_content: str,
        files: list[str] | None = None,
        cc: str | Iterable[str] | None = None,
        bcc: str | Iterable[str] | None = None,
        mime_subtype: str = "mixed",
        mime_charset: str = "utf-8",
        custom_headers: dict[str, Any] | None = None,
    ) -> tuple[MIMEMultipart, list[str]]:
        """
        Build a MIME message that can be used to send an email and returns a full list of recipients.

        :param mail_from: Email address to set as the email's "From" field.
        :param to: A string or iterable of strings containing email addresses
            to set as the email's "To" field.
        :param subject: The subject of the email.
        :param html_content: The content of the email in HTML format.
        :param files: A list of paths to files to be attached to the email.
        :param cc: A string or iterable of strings containing email addresses
            to set as the email's "CC" field.
        :param bcc: A string or iterable of strings containing email addresses
            to set as the email's "BCC" field.
        :param mime_subtype: The subtype of the MIME message. Default: "mixed".
        :param mime_charset: The charset of the email. Default: "utf-8".
        :param custom_headers: Additional headers to add to the MIME message.
            No validations are run on these values, and they should be able to be encoded.
        :return: A tuple containing the email as a MIMEMultipart object and
            a list of recipient email addresses.
        """
        to = self._get_email_address_list(to)

        msg = MIMEMultipart(mime_subtype)
        msg["Subject"] = subject
        if mail_from:
            msg["From"] = mail_from
        msg["To"] = ", ".join(to)
        # Copy so that extending the recipients never mutates the "To" list.
        recipients = list(to)
        if cc:
            cc = self._get_email_address_list(cc)
            msg["CC"] = ", ".join(cc)
            recipients += cc

        if bcc:
            # don't add bcc in header
            bcc = self._get_email_address_list(bcc)
            recipients += bcc

        msg["Date"] = formatdate(localtime=True)
        mime_text = MIMEText(html_content, "html", mime_charset)
        msg.attach(mime_text)

        for fname in files or []:
            basename = os.path.basename(fname)
            with open(fname, "rb") as file:
                part = MIMEApplication(file.read(), Name=basename)
                part["Content-Disposition"] = f'attachment; filename="{basename}"'
                part["Content-ID"] = f"<{basename}>"
                msg.attach(part)

        if custom_headers:
            for header_key, header_value in custom_headers.items():
                msg[header_key] = header_value

        return msg, recipients

    def _get_email_address_list(self, addresses: str | Iterable[str]) -> list[str]:
        """
        Return a list of email addresses from the provided input.

        :param addresses: A string or iterable of strings containing email addresses.
        :return: A list of email addresses.
        :raises TypeError: If the input is not a string or iterable of strings.
        """
        if isinstance(addresses, str):
            return self._get_email_list_from_str(addresses)
        if isinstance(addresses, collections.abc.Iterable):
            # Materialise first: a one-shot iterator (e.g. a generator) would otherwise be
            # exhausted by the type check below and produce an empty list.
            address_list = list(addresses)
            if not all(isinstance(item, str) for item in address_list):
                raise TypeError("The items in your iterable must be strings.")
            return address_list
        raise TypeError(f"Unexpected argument type: Received '{type(addresses).__name__}'.")

    def _get_email_list_from_str(self, addresses: str) -> list[str]:
        """
        Extract a list of email addresses from a string.

        The string can contain multiple email addresses separated by
        any of the following delimiters: ',' or ';'. Surrounding whitespace is
        removed and empty entries (e.g. from a trailing delimiter) are dropped.

        :param addresses: A string containing one or more email addresses.
        :return: A list of email addresses.
        """
        pattern = r"\s*[,;]\s*"
        return [address for address in re.split(pattern, addresses.strip()) if address]

    @property
    def conn(self) -> Connection:
        if not self.smtp_connection:
            raise AirflowException("The smtp connection should be loaded before!")
        return self.smtp_connection

    @property
    def smtp_retry_limit(self) -> int:
        return int(self.conn.extra_dejson.get("retry_limit", 5))

    @property
    def _max_attempts(self) -> int:
        # The form accepts ``retry_limit = 0``; always try at least once so a connection is
        # opened and emails are not silently dropped.
        return max(1, self.smtp_retry_limit)

    @property
    def from_email(self) -> str | None:
        return self.conn.extra_dejson.get("from_email")

    @property
    def smtp_user(self) -> str | None:
        return self.conn.login

    @property
    def smtp_password(self) -> str | None:
        return self.conn.password

    @property
    def smtp_starttls(self) -> bool:
        return not self._extra_flag(self.conn.extra_dejson.get("disable_tls", False))

    @property
    def host(self) -> str:
        return self.conn.host

    @property
    def port(self) -> int | None:
        return self.conn.port

    @property
    def timeout(self) -> int:
        return int(self.conn.extra_dejson.get("timeout", 30))

    @property
    def use_ssl(self) -> bool:
        return not self._extra_flag(self.conn.extra_dejson.get("disable_ssl", False))

    @property
    def subject_template(self) -> str | None:
        return self.conn.extra_dejson.get("subject_template")

    @property
    def html_content_template(self) -> str | None:
        return self.conn.extra_dejson.get("html_content_template")

    @property
    def ssl_context(self) -> str | None:
        return self.conn.extra_dejson.get("ssl_context")

    @staticmethod
    def _extra_flag(value: Any) -> bool:
        """
        Interpret a boolean connection extra.

        Extras may be stored as strings, and ``bool("false")`` is ``True``; explicit
        false-like strings are therefore mapped to ``False``. Any other value falls back
        to ``bool(value)``.
        """
        if isinstance(value, str):
            return value.strip().lower() not in ("", "false", "0", "no", "off", "n", "f")
        return bool(value)

    @staticmethod
    def _read_template(template_path: str) -> str:
        """
        Read the content of a template file.

        :param template_path: The path to the template file.
        :return: The content of the template file.
        """
        return Path(template_path).read_text()

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom field behaviour."""
        return {
            "hidden_fields": ["schema", "extra"],
            "relabeling": {},
        }
|