policies.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

import pluggy

# Marker used below to declare the policy hook *specifications*; the project
# name "airflow.policy" scopes these hooks to Airflow's policy plugin system.
local_settings_hookspec = pluggy.HookspecMarker("airflow.policy")
# Marker that plugin authors apply to their functions to register them as
# *implementations* of the hooks specified in this module.
hookimpl = pluggy.HookimplMarker("airflow.policy")

__all__: list[str] = ["hookimpl"]

if TYPE_CHECKING:
    # Imported only for type annotations, to avoid importing heavy model
    # modules (and potential circular imports) at runtime.
    from airflow.models.baseoperator import BaseOperator
    from airflow.models.dag import DAG
    from airflow.models.taskinstance import TaskInstance
@local_settings_hookspec
def task_policy(task: BaseOperator) -> None:
    """
    Allow altering tasks after they are loaded in the DagBag.

    It allows administrator to rewire some task's parameters. Alternatively you can raise
    ``AirflowClusterPolicyViolation`` exception to stop DAG from being executed.

    Here are a few examples of how this can be useful:

    * You could enforce a specific queue (say the ``spark`` queue) for tasks using the ``SparkOperator`` to
      make sure that these tasks get wired to the right workers
    * You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours

    :param task: task to be mutated in place
    """
@local_settings_hookspec
def dag_policy(dag: DAG) -> None:
    """
    Allow altering DAGs after they are loaded in the DagBag.

    It allows administrator to rewire some DAG's parameters.
    Alternatively you can raise ``AirflowClusterPolicyViolation`` exception
    to stop DAG from being executed.

    Here are a few examples of how this can be useful:

    * You could enforce default user for DAGs
    * Check if every DAG has configured tags

    :param dag: dag to be mutated in place
    """
@local_settings_hookspec
def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    """
    Allow altering task instances before being queued by the Airflow scheduler.

    This could be used, for instance, to modify the task instance during retries.

    :param task_instance: task instance to be mutated in place
    """
@local_settings_hookspec
def pod_mutation_hook(pod) -> None:
    """
    Mutate pod before scheduling.

    This setting allows altering ``kubernetes.client.models.V1Pod`` object before they are passed to the
    Kubernetes client for scheduling.

    This could be used, for instance, to add sidecar or init containers to every worker pod launched by
    KubernetesExecutor or KubernetesPodOperator.

    :param pod: the ``V1Pod`` to be mutated in place
    """
@local_settings_hookspec(firstresult=True)
def get_airflow_context_vars(context) -> dict[str, str]:  # type: ignore[empty-body]
    """
    Inject airflow context vars into default airflow context vars.

    This setting allows getting the airflow context vars, which are key value pairs. They are then injected
    to default airflow context vars, which in the end are available as environment variables when running
    tasks. ``dag_id``, ``task_id``, ``execution_date``, ``dag_run_id``, ``try_number`` are reserved keys.

    :param context: The context for the task_instance of interest.
    :return: mapping of extra context variable names to their string values
    """
@local_settings_hookspec(firstresult=True)
def get_dagbag_import_timeout(dag_file_path: str) -> int | float:  # type: ignore[empty-body]
    """
    Allow for dynamic control of the DAG file parsing timeout based on the DAG file path.

    It is useful when there are a few DAG files requiring longer parsing times, while others do not.
    You can control them separately instead of having one value for all DAG files.

    If the return value is less than or equal to 0, it means no timeout during the DAG parsing.

    :param dag_file_path: path of the DAG file being parsed
    :return: the parsing timeout for this DAG file; values <= 0 disable the timeout
    """
  84. class DefaultPolicy:
  85. """
  86. Default implementations of the policy functions.
  87. :meta private:
  88. """
  89. # Default implementations of the policy functions
  90. @staticmethod
  91. @hookimpl
  92. def get_dagbag_import_timeout(dag_file_path: str):
  93. from airflow.configuration import conf
  94. return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")
  95. @staticmethod
  96. @hookimpl
  97. def get_airflow_context_vars(context):
  98. return {}
  99. def make_plugin_from_local_settings(pm: pluggy.PluginManager, module, names: set[str]):
  100. """
  101. Turn the functions from airflow_local_settings module into a custom/local plugin.
  102. Allows plugin-registered functions to co-operate with pluggy/setuptool
  103. entrypoint plugins of the same methods.
  104. Airflow local settings will be "win" (i.e. they have the final say) as they are the last plugin
  105. registered.
  106. :meta private:
  107. """
  108. import inspect
  109. import textwrap
  110. import attr
  111. hook_methods = set()
  112. def _make_shim_fn(name, desired_sig, target):
  113. # Functions defined in airflow_local_settings are called by positional parameters, so the names don't
  114. # have to match what we define in the "template" policy.
  115. #
  116. # However Pluggy validates the names match (and will raise an error if they don't!)
  117. #
  118. # To maintain compat, if we detect the names don't match, we will wrap it with a dynamically created
  119. # shim function that looks somewhat like this:
  120. #
  121. # def dag_policy_name_mismatch_shim(dag):
  122. # airflow_local_settings.dag_policy(dag)
  123. #
  124. codestr = textwrap.dedent(
  125. f"""
  126. def {name}_name_mismatch_shim{desired_sig}:
  127. return __target({' ,'.join(desired_sig.parameters)})
  128. """
  129. )
  130. code = compile(codestr, "<policy-shim>", "single")
  131. scope = {"__target": target}
  132. exec(code, scope, scope)
  133. return scope[f"{name}_name_mismatch_shim"]
  134. @attr.define(frozen=True)
  135. class AirflowLocalSettingsPolicy:
  136. hook_methods: tuple[str, ...]
  137. __name__ = "AirflowLocalSettingsPolicy"
  138. def __dir__(self):
  139. return self.hook_methods
  140. for name in names:
  141. if not hasattr(pm.hook, name):
  142. continue
  143. policy = getattr(module, name)
  144. if not policy:
  145. continue
  146. local_sig = inspect.signature(policy)
  147. policy_sig = inspect.signature(globals()[name])
  148. # We only care if names/order/number of parameters match, not type hints
  149. if local_sig.parameters.keys() != policy_sig.parameters.keys():
  150. policy = _make_shim_fn(name, policy_sig, target=policy)
  151. setattr(AirflowLocalSettingsPolicy, name, staticmethod(hookimpl(policy, specname=name)))
  152. hook_methods.add(name)
  153. if hook_methods:
  154. pm.register(AirflowLocalSettingsPolicy(hook_methods=tuple(hook_methods)))
  155. return hook_methods