cli_action_loggers.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """
  19. An Action Logger module.
The callback registries are module-level state, so the module acts as a singleton:
callbacks registered anywhere are shared throughout the same Python process.
  22. """
  23. from __future__ import annotations
  24. import json
  25. import logging
  26. from typing import TYPE_CHECKING, Callable
  27. from airflow.api_internal.internal_api_call import internal_api_call
  28. from airflow.utils.session import NEW_SESSION, provide_session
  29. if TYPE_CHECKING:
  30. from sqlalchemy.orm import Session
# Module-level logger used by the callback registration and dispatch functions below.
logger = logging.getLogger(__name__)
  32. def register_pre_exec_callback(action_logger):
  33. """
  34. Register more action_logger function callback for pre-execution.
  35. This function callback is expected to be called with keyword args.
  36. For more about the arguments that is being passed to the callback,
  37. refer to airflow.utils.cli.action_logging().
  38. :param action_logger: An action logger function
  39. :return: None
  40. """
  41. logger.debug("Adding %s to pre execution callback", action_logger)
  42. __pre_exec_callbacks.append(action_logger)
  43. def register_post_exec_callback(action_logger):
  44. """
  45. Register more action_logger function callback for post-execution.
  46. This function callback is expected to be called with keyword args.
  47. For more about the arguments that is being passed to the callback,
  48. refer to airflow.utils.cli.action_logging().
  49. :param action_logger: An action logger function
  50. :return: None
  51. """
  52. logger.debug("Adding %s to post execution callback", action_logger)
  53. __post_exec_callbacks.append(action_logger)
  54. def on_pre_execution(**kwargs):
  55. """
  56. Call callbacks before execution.
  57. Note that any exception from callback will be logged but won't be propagated.
  58. :param kwargs:
  59. :return: None
  60. """
  61. logger.debug("Calling callbacks: %s", __pre_exec_callbacks)
  62. for callback in __pre_exec_callbacks:
  63. try:
  64. callback(**kwargs)
  65. except Exception:
  66. logger.exception("Failed on pre-execution callback using %s", callback)
  67. def on_post_execution(**kwargs):
  68. """
  69. Call callbacks after execution.
  70. As it's being called after execution, it can capture status of execution,
  71. duration, etc. Note that any exception from callback will be logged but
  72. won't be propagated.
  73. :param kwargs:
  74. :return: None
  75. """
  76. logger.debug("Calling callbacks: %s", __post_exec_callbacks)
  77. for callback in __post_exec_callbacks:
  78. try:
  79. callback(**kwargs)
  80. except Exception:
  81. logger.exception("Failed on post-execution callback using %s", callback)
  82. def default_action_log(sub_command, user, task_id, dag_id, execution_date, host_name, full_command, **_):
  83. """
  84. Behave similar to ``action_logging``; default action logger callback.
  85. The difference is this function uses the global ORM session, and pushes a
  86. ``Log`` row into the database instead of actually logging.
  87. """
  88. _default_action_log_internal(
  89. sub_command=sub_command,
  90. user=user,
  91. task_id=task_id,
  92. dag_id=dag_id,
  93. execution_date=execution_date,
  94. host_name=host_name,
  95. full_command=full_command,
  96. )
  97. @internal_api_call
  98. @provide_session
  99. def _default_action_log_internal(
  100. *,
  101. sub_command,
  102. user,
  103. task_id,
  104. dag_id,
  105. execution_date,
  106. host_name,
  107. full_command,
  108. session: Session = NEW_SESSION,
  109. ):
  110. """
  111. RPC portion of default_action_log.
  112. To use RPC, we need to accept a session, which is provided by the RPC call handler.
  113. But, the action log callback system may already be forwarding a session, so to avoid
  114. a collision, I have made this internal function instead of making default_action_log
  115. an RPC function.
  116. """
  117. from sqlalchemy.exc import OperationalError, ProgrammingError
  118. from airflow.models.log import Log
  119. from airflow.utils import timezone
  120. try:
  121. # Use bulk_insert_mappings here to avoid importing all models (which using the classes does) early
  122. # on in the CLI
  123. session.bulk_insert_mappings(
  124. Log,
  125. [
  126. {
  127. "event": f"cli_{sub_command}",
  128. "task_instance": None,
  129. "owner": user,
  130. "extra": json.dumps({"host_name": host_name, "full_command": full_command}),
  131. "task_id": task_id,
  132. "dag_id": dag_id,
  133. "execution_date": execution_date,
  134. "dttm": timezone.utcnow(),
  135. }
  136. ],
  137. )
  138. session.commit()
  139. except (OperationalError, ProgrammingError) as e:
  140. expected = [
  141. '"log" does not exist', # postgres
  142. "no such table", # sqlite
  143. "log' doesn't exist", # mysql
  144. ]
  145. error_is_ok = e.args and any(x in e.args[0] for x in expected)
  146. if not error_is_ok:
  147. logger.warning("Failed to log action %s", e)
  148. session.rollback()
  149. except Exception as e:
  150. logger.warning("Failed to log action %s", e)
  151. session.rollback()
# Process-wide callback registries, appended to by register_pre_exec_callback /
# register_post_exec_callback and iterated by on_pre_execution / on_post_execution.
# They must be defined before the registration call below runs.
__pre_exec_callbacks: list[Callable] = []
__post_exec_callbacks: list[Callable] = []
# By default, register default action log into pre-execution callback, so it runs
# whenever on_pre_execution() is invoked.
register_pre_exec_callback(default_action_log)