123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- {#
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
- -#}
{#
 Template contract. Everything below is rendered into a standalone Python script.

 Template variables read here:
   pickling_library          name of the module imported and used for load() / dump()
   expect_airflow            when truthy, try to integrate Airflow plugin macros first
   python_callable_source    source text that is pasted in to define the callable
   python_callable           name of the callable that gets invoked
   modified_dag_module_name  optional; when defined, a stub module with that name which
                             exposes the callable is registered in sys.modules
   op_args / op_kwargs       when either is truthy, call arguments are loaded from sys.argv[1]
   string_args_global        treated as true when undefined; gates loading of sys.argv[3]

 Command-line arguments of the rendered script:
   sys.argv[1]  binary input file holding a serialized dict with "args" and "kwargs" keys
   sys.argv[2]  binary output file; receives the serialized return value unless it is None
   sys.argv[3]  text file whose stripped lines become the global virtualenv_string_args
   sys.argv[4]  text file that receives str(exception) when the callable raises
-#}
from __future__ import annotations

import {{ pickling_library }}
import sys

{% if expect_airflow %}
 {# Check whether Airflow is available in the environment.
 # If it is, we'll want to ensure that we integrate any macros that are being provided
 # by plugins prior to unpickling the task context.
 # No interpreter-version guard is needed here: the "from __future__ import annotations"
 # line above already makes this script a SyntaxError on anything older than Python 3.7. #}
try:
    from airflow.plugins_manager import integrate_macros_plugins
    integrate_macros_plugins()
except ImportError:
    {# Airflow is not available in this environment, therefore we won't
     # be able to integrate any plugin macros. #}
    pass
{% endif %}

# Script
{{ python_callable_source }}

# monkey patching for the cases when python_callable is part of the dag module.
{% if modified_dag_module_name is defined %}

import types

{{ modified_dag_module_name }} = types.ModuleType("{{ modified_dag_module_name }}")

{{ modified_dag_module_name }}.{{ python_callable }} = {{ python_callable }}

sys.modules["{{ modified_dag_module_name }}"] = {{ modified_dag_module_name }}

{% endif %}

{% if op_args or op_kwargs %}
# NOTE: the argument file is deserialized without validation, so it must come
# from a trusted producer.
with open(sys.argv[1], "rb") as file:
    arg_dict = {{ pickling_library }}.load(file)
{% else %}
arg_dict = {"args": [], "kwargs": {}}
{% endif %}

{% if string_args_global | default(true) -%}
# Read string args
with open(sys.argv[3], "r") as file:
    virtualenv_string_args = [line.strip() for line in file]
{% endif %}

try:
    res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
except Exception as e:
    # Persist the message for the parent process, then let the failure propagate.
    with open(sys.argv[4], "w") as file:
        file.write(str(e))
    raise

# Write output
# The output file is always opened (and therefore created/truncated); it is left
# empty when the callable returned None.
with open(sys.argv[2], "wb") as file:
    if res is not None:
        {{ pickling_library }}.dump(res, file)
|