python.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from typing import TYPE_CHECKING, Any, Callable, Mapping, Sequence
  20. from airflow.sensors.base import BaseSensorOperator, PokeReturnValue
  21. from airflow.utils.context import context_merge
  22. from airflow.utils.operator_helpers import determine_kwargs
  23. if TYPE_CHECKING:
  24. from airflow.utils.context import Context
  25. class PythonSensor(BaseSensorOperator):
  26. """
  27. Waits for a Python callable to return True.
  28. User could put input argument in templates_dict
  29. e.g ``templates_dict = {'start_ds': 1970}``
  30. and access the argument by calling ``kwargs['templates_dict']['start_ds']``
  31. in the callable
  32. :param python_callable: A reference to an object that is callable
  33. :param op_kwargs: a dictionary of keyword arguments that will get unpacked
  34. in your function
  35. :param op_args: a list of positional arguments that will get unpacked when
  36. calling your callable
  37. :param templates_dict: a dictionary where the values are templates that
  38. will get templated by the Airflow engine sometime between
  39. ``__init__`` and ``execute`` takes place and are made available
  40. in your callable's context after the template has been applied.
  41. .. seealso::
  42. For more information on how to use this sensor, take a look at the guide:
  43. :ref:`howto/operator:PythonSensor`
  44. """
  45. template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
  46. def __init__(
  47. self,
  48. *,
  49. python_callable: Callable,
  50. op_args: list | None = None,
  51. op_kwargs: Mapping[str, Any] | None = None,
  52. templates_dict: dict | None = None,
  53. **kwargs,
  54. ):
  55. super().__init__(**kwargs)
  56. self.python_callable = python_callable
  57. self.op_args = op_args or []
  58. self.op_kwargs = op_kwargs or {}
  59. self.templates_dict = templates_dict
  60. def poke(self, context: Context) -> PokeReturnValue | bool:
  61. context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
  62. self.op_kwargs = determine_kwargs(self.python_callable, self.op_args, context)
  63. self.log.info("Poking callable: %s", str(self.python_callable))
  64. return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  65. if isinstance(return_value, PokeReturnValue):
  66. return return_value
  67. else:
  68. return PokeReturnValue(bool(return_value))