smtp.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. from collections.abc import Iterable
  19. from functools import cached_property
  20. from pathlib import Path
  21. from typing import Any
  22. from airflow.providers.common.compat.notifier import BaseNotifier
  23. from airflow.providers.smtp.hooks.smtp import SmtpHook
  24. class SmtpNotifier(BaseNotifier):
  25. """
  26. SMTP Notifier.
  27. Accepts keyword arguments. The only required arguments are `from_email` and `to`. Examples:
  28. .. code-block:: python
  29. EmptyOperator(task_id="task", on_failure_callback=SmtpNotifier(from_email=None, to="myemail@myemail.com"))
  30. EmptyOperator(
  31. task_id="task",
  32. on_failure_callback=SmtpNotifier(
  33. from_email="myemail@myemail.com",
  34. to="myemail@myemail.com",
  35. subject="Task {{ ti.task_id }} failed",
  36. ),
  37. )
  38. You can define a default template for subject and html_content in the SMTP connection configuration.
  39. :param smtp_conn_id: The :ref:`smtp connection id <howto/connection:smtp>`
  40. that contains the information used to authenticate the client.
  41. """
  42. template_fields = (
  43. "from_email",
  44. "to",
  45. "subject",
  46. "html_content",
  47. "files",
  48. "cc",
  49. "bcc",
  50. "mime_subtype",
  51. "mime_charset",
  52. "custom_headers",
  53. )
  54. def __init__(
  55. self,
  56. to: str | Iterable[str],
  57. from_email: str | None = None,
  58. subject: str | None = None,
  59. html_content: str | None = None,
  60. files: list[str] | None = None,
  61. cc: str | Iterable[str] | None = None,
  62. bcc: str | Iterable[str] | None = None,
  63. mime_subtype: str = "mixed",
  64. mime_charset: str = "utf-8",
  65. custom_headers: dict[str, Any] | None = None,
  66. smtp_conn_id: str = SmtpHook.default_conn_name,
  67. *,
  68. template: str | None = None,
  69. ):
  70. super().__init__()
  71. self.smtp_conn_id = smtp_conn_id
  72. self.from_email = from_email
  73. self.to = to
  74. self.files = files
  75. self.cc = cc
  76. self.bcc = bcc
  77. self.mime_subtype = mime_subtype
  78. self.mime_charset = mime_charset
  79. self.custom_headers = custom_headers
  80. self.subject = subject
  81. self.html_content = html_content
  82. if self.html_content is None and template is not None:
  83. self.html_content = self._read_template(template)
  84. @staticmethod
  85. def _read_template(template_path: str) -> str:
  86. return Path(template_path).read_text().replace("\n", "").strip()
  87. @cached_property
  88. def hook(self) -> SmtpHook:
  89. """Smtp Events Hook."""
  90. return SmtpHook(smtp_conn_id=self.smtp_conn_id)
  91. def notify(self, context):
  92. """Send a email via smtp server."""
  93. fields_to_re_render = []
  94. if self.from_email is None:
  95. if self.hook.from_email is not None:
  96. self.from_email = self.hook.from_email
  97. else:
  98. raise ValueError("You should provide `from_email` or define it in the connection")
  99. fields_to_re_render.append("from_email")
  100. if self.subject is None:
  101. smtp_default_templated_subject_path: str
  102. if self.hook.subject_template:
  103. smtp_default_templated_subject_path = self.hook.subject_template
  104. else:
  105. smtp_default_templated_subject_path = (
  106. Path(__file__).parent / "templates" / "email_subject.jinja2"
  107. ).as_posix()
  108. self.subject = self._read_template(smtp_default_templated_subject_path)
  109. fields_to_re_render.append("subject")
  110. if self.html_content is None:
  111. smtp_default_templated_html_content_path: str
  112. if self.hook.html_content_template:
  113. smtp_default_templated_html_content_path = self.hook.html_content_template
  114. else:
  115. smtp_default_templated_html_content_path = (
  116. Path(__file__).parent / "templates" / "email.html"
  117. ).as_posix()
  118. self.html_content = self._read_template(smtp_default_templated_html_content_path)
  119. fields_to_re_render.append("html_content")
  120. if fields_to_re_render:
  121. jinja_env = self.get_template_env(dag=context["dag"])
  122. self._do_render_template_fields(self, fields_to_re_render, context, jinja_env, set())
  123. with self.hook as smtp:
  124. smtp.send_email_smtp(
  125. smtp_conn_id=self.smtp_conn_id,
  126. from_email=self.from_email,
  127. to=self.to,
  128. subject=self.subject,
  129. html_content=self.html_content,
  130. files=self.files,
  131. cc=self.cc,
  132. bcc=self.bcc,
  133. mime_subtype=self.mime_subtype,
  134. mime_charset=self.mime_charset,
  135. custom_headers=self.custom_headers,
  136. )
# Module-level alias: ``send_smtp_notification`` is bound to the same class object as
# ``SmtpNotifier``, so either name can be used to build the notifier.
send_smtp_notification = SmtpNotifier