example_xcom.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """Example DAG demonstrating the usage of XComs."""
  19. from __future__ import annotations
  20. import pendulum
  21. from airflow.decorators import task
  22. from airflow.models.dag import DAG
  23. from airflow.models.xcom_arg import XComArg
  24. from airflow.operators.bash import BashOperator
  25. value_1 = [1, 2, 3]
  26. value_2 = {"a": "b"}
  27. @task
  28. def push(ti=None):
  29. """Pushes an XCom without a specific target"""
  30. ti.xcom_push(key="value from pusher 1", value=value_1)
  31. @task
  32. def push_by_returning():
  33. """Pushes an XCom without a specific target, just by returning it"""
  34. return value_2
  35. def _compare_values(pulled_value, check_value):
  36. if pulled_value != check_value:
  37. raise ValueError(f"The two values differ {pulled_value} and {check_value}")
  38. @task
  39. def puller(pulled_value_2, ti=None):
  40. """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
  41. pulled_value_1 = ti.xcom_pull(task_ids="push", key="value from pusher 1")
  42. _compare_values(pulled_value_1, value_1)
  43. _compare_values(pulled_value_2, value_2)
  44. @task
  45. def pull_value_from_bash_push(ti=None):
  46. bash_pushed_via_return_value = ti.xcom_pull(key="return_value", task_ids="bash_push")
  47. bash_manually_pushed_value = ti.xcom_pull(key="manually_pushed_value", task_ids="bash_push")
  48. print(f"The xcom value pushed by task push via return value is {bash_pushed_via_return_value}")
  49. print(f"The xcom value pushed by task push manually is {bash_manually_pushed_value}")
  50. with DAG(
  51. "example_xcom",
  52. schedule="@once",
  53. start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
  54. catchup=False,
  55. tags=["example"],
  56. ) as dag:
  57. bash_push = BashOperator(
  58. task_id="bash_push",
  59. bash_command='echo "bash_push demo" && '
  60. 'echo "Manually set xcom value '
  61. '{{ ti.xcom_push(key="manually_pushed_value", value="manually_pushed_value") }}" && '
  62. 'echo "value_by_return"',
  63. )
  64. bash_pull = BashOperator(
  65. task_id="bash_pull",
  66. bash_command='echo "bash pull demo" && '
  67. f'echo "The xcom pushed manually is {XComArg(bash_push, key="manually_pushed_value")}" && '
  68. f'echo "The returned_value xcom is {XComArg(bash_push)}" && '
  69. 'echo "finished"',
  70. do_xcom_push=False,
  71. )
  72. python_pull_from_bash = pull_value_from_bash_push()
  73. [bash_pull, python_pull_from_bash] << bash_push
  74. puller(push_by_returning()) << push()