callback_requests.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import json
  19. from typing import TYPE_CHECKING
  20. from airflow.utils.state import TaskInstanceState
  21. if TYPE_CHECKING:
  22. from airflow.models.taskinstance import SimpleTaskInstance
  23. class CallbackRequest:
  24. """
  25. Base Class with information about the callback to be executed.
  26. :param full_filepath: File Path to use to run the callback
  27. :param msg: Additional Message that can be used for logging
  28. :param processor_subdir: Directory used by Dag Processor when parsed the dag.
  29. """
  30. def __init__(
  31. self,
  32. full_filepath: str,
  33. processor_subdir: str | None = None,
  34. msg: str | None = None,
  35. ):
  36. self.full_filepath = full_filepath
  37. self.processor_subdir = processor_subdir
  38. self.msg = msg
  39. def __eq__(self, other):
  40. if isinstance(other, self.__class__):
  41. return self.__dict__ == other.__dict__
  42. return NotImplemented
  43. def __repr__(self):
  44. return str(self.__dict__)
  45. def to_json(self) -> str:
  46. return json.dumps(self.__dict__)
  47. @classmethod
  48. def from_json(cls, json_str: str):
  49. json_object = json.loads(json_str)
  50. return cls(**json_object)
  51. class TaskCallbackRequest(CallbackRequest):
  52. """
  53. Task callback status information.
  54. A Class with information about the success/failure TI callback to be executed. Currently, only failure
  55. callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.
  56. :param full_filepath: File Path to use to run the callback
  57. :param simple_task_instance: Simplified Task Instance representation
  58. :param msg: Additional Message that can be used for logging to determine failure/zombie
  59. :param processor_subdir: Directory used by Dag Processor when parsed the dag.
  60. :param task_callback_type: e.g. whether on success, on failure, on retry.
  61. """
  62. def __init__(
  63. self,
  64. full_filepath: str,
  65. simple_task_instance: SimpleTaskInstance,
  66. processor_subdir: str | None = None,
  67. msg: str | None = None,
  68. task_callback_type: TaskInstanceState | None = None,
  69. ):
  70. super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
  71. self.simple_task_instance = simple_task_instance
  72. self.task_callback_type = task_callback_type
  73. @property
  74. def is_failure_callback(self) -> bool:
  75. """Returns True if the callback is a failure callback."""
  76. if self.task_callback_type is None:
  77. return True
  78. return self.task_callback_type in {
  79. TaskInstanceState.FAILED,
  80. TaskInstanceState.UP_FOR_RETRY,
  81. TaskInstanceState.UPSTREAM_FAILED,
  82. }
  83. def to_json(self) -> str:
  84. from airflow.serialization.serialized_objects import BaseSerialization
  85. val = BaseSerialization.serialize(self.__dict__, strict=True)
  86. return json.dumps(val)
  87. @classmethod
  88. def from_json(cls, json_str: str):
  89. from airflow.serialization.serialized_objects import BaseSerialization
  90. val = json.loads(json_str)
  91. return cls(**BaseSerialization.deserialize(val))
  92. class DagCallbackRequest(CallbackRequest):
  93. """
  94. A Class with information about the success/failure DAG callback to be executed.
  95. :param full_filepath: File Path to use to run the callback
  96. :param dag_id: DAG ID
  97. :param run_id: Run ID for the DagRun
  98. :param processor_subdir: Directory used by Dag Processor when parsed the dag.
  99. :param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
  100. :param msg: Additional Message that can be used for logging
  101. """
  102. def __init__(
  103. self,
  104. full_filepath: str,
  105. dag_id: str,
  106. run_id: str,
  107. processor_subdir: str | None,
  108. is_failure_callback: bool | None = True,
  109. msg: str | None = None,
  110. ):
  111. super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
  112. self.dag_id = dag_id
  113. self.run_id = run_id
  114. self.is_failure_callback = is_failure_callback
  115. class SlaCallbackRequest(CallbackRequest):
  116. """
  117. A class with information about the SLA callback to be executed.
  118. :param full_filepath: File Path to use to run the callback
  119. :param dag_id: DAG ID
  120. :param processor_subdir: Directory used by Dag Processor when parsed the dag.
  121. """
  122. def __init__(
  123. self,
  124. full_filepath: str,
  125. dag_id: str,
  126. processor_subdir: str | None,
  127. msg: str | None = None,
  128. ):
  129. super().__init__(full_filepath, processor_subdir=processor_subdir, msg=msg)
  130. self.dag_id = dag_id