kubernetes.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import logging
  20. from typing import TYPE_CHECKING
  21. from airflow.utils.module_loading import qualname
# Lazy loading for performance reasons: the supported classes are listed by
# their dotted import path (as strings), and the kubernetes client itself is
# only imported inside ``serialize``, not when this module is imported.
serializers = [
    "kubernetes.client.models.v1_resource_requirements.V1ResourceRequirements",
    "kubernetes.client.models.v1_pod.V1Pod",
]

# ``U`` is referenced only in type annotations, so it is imported for type
# checkers only.
if TYPE_CHECKING:
    from airflow.serialization.serde import U

# Returned by ``serialize`` as the version element of its result tuple.
__version__ = 1

# No deserializers are declared by this module.
deserializers: list[type[object]] = []

# Module-level logger, used by ``serialize`` to report serialization failures.
log = logging.getLogger(__name__)
  32. def serialize(o: object) -> tuple[U, str, int, bool]:
  33. from kubernetes.client import models as k8s
  34. if not k8s:
  35. return "", "", 0, False
  36. if isinstance(o, (k8s.V1Pod, k8s.V1ResourceRequirements)):
  37. try:
  38. from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
  39. except ImportError:
  40. from airflow.kubernetes.pre_7_4_0_compatibility.pod_generator import ( # type: ignore[assignment]
  41. PodGenerator,
  42. )
  43. # We're running this in an except block, so we don't want it to fail
  44. # under any circumstances, e.g. accessing a non-existing attribute.
  45. def safe_get_name(pod):
  46. try:
  47. return pod.metadata.name
  48. except Exception:
  49. return None
  50. try:
  51. return PodGenerator.serialize_pod(o), qualname(o), __version__, True
  52. except Exception:
  53. log.warning("Serialization failed for pod %s", safe_get_name(o))
  54. log.debug("traceback for serialization error", exc_info=True)
  55. return "", "", 0, False
  56. return "", "", 0, False