kubernetes_command.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Kubernetes sub-commands."""
  18. from __future__ import annotations
  19. import os
  20. import sys
  21. import warnings
  22. from datetime import datetime, timedelta
  23. from kubernetes import client
  24. from kubernetes.client.api_client import ApiClient
  25. from kubernetes.client.rest import ApiException
  26. from airflow.models import DagRun, TaskInstance
  27. from airflow.providers.cncf.kubernetes import pod_generator
  28. from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig
  29. from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
  30. from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id
  31. from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
  32. from airflow.utils import cli as cli_utils, yaml
  33. from airflow.utils.cli import get_dag
  34. from airflow.utils.providers_configuration_loader import providers_configuration_loaded
  35. warnings.warn(
  36. "Use kubernetes command from providers package, Use cncf.kubernetes provider >= 8.2.1",
  37. DeprecationWarning,
  38. stacklevel=2,
  39. )
  40. @cli_utils.action_cli
  41. @providers_configuration_loaded
  42. def generate_pod_yaml(args):
  43. """Generate yaml files for each task in the DAG. Used for testing output of KubernetesExecutor."""
  44. execution_date = args.execution_date
  45. dag = get_dag(subdir=args.subdir, dag_id=args.dag_id)
  46. yaml_output_path = args.output_path
  47. dr = DagRun(dag.dag_id, execution_date=execution_date)
  48. kube_config = KubeConfig()
  49. for task in dag.tasks:
  50. ti = TaskInstance(task, None)
  51. ti.dag_run = dr
  52. pod = PodGenerator.construct_pod(
  53. dag_id=args.dag_id,
  54. task_id=ti.task_id,
  55. pod_id=create_pod_id(args.dag_id, ti.task_id),
  56. try_number=ti.try_number,
  57. kube_image=kube_config.kube_image,
  58. date=ti.execution_date,
  59. args=ti.command_as_list(),
  60. pod_override_object=PodGenerator.from_obj(ti.executor_config),
  61. scheduler_job_id="worker-config",
  62. namespace=kube_config.executor_namespace,
  63. base_worker_pod=PodGenerator.deserialize_model_file(kube_config.pod_template_file),
  64. with_mutation_hook=True,
  65. )
  66. api_client = ApiClient()
  67. date_string = pod_generator.datetime_to_label_safe_datestring(execution_date)
  68. yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
  69. os.makedirs(os.path.dirname(yaml_output_path + "/airflow_yaml_output/"), exist_ok=True)
  70. with open(yaml_output_path + "/airflow_yaml_output/" + yaml_file_name, "w") as output:
  71. sanitized_pod = api_client.sanitize_for_serialization(pod)
  72. output.write(yaml.dump(sanitized_pod))
  73. print(f"YAML output can be found at {yaml_output_path}/airflow_yaml_output/")
  74. @cli_utils.action_cli
  75. @providers_configuration_loaded
  76. def cleanup_pods(args):
  77. """Clean up k8s pods in evicted/failed/succeeded/pending states."""
  78. namespace = args.namespace
  79. min_pending_minutes = args.min_pending_minutes
  80. # protect newly created pods from deletion
  81. if min_pending_minutes < 5:
  82. min_pending_minutes = 5
  83. # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/
  84. # All Containers in the Pod have terminated in success, and will not be restarted.
  85. pod_succeeded = "succeeded"
  86. # The Pod has been accepted by the Kubernetes cluster,
  87. # but one or more of the containers has not been set up and made ready to run.
  88. pod_pending = "pending"
  89. # All Containers in the Pod have terminated, and at least one Container has terminated in failure.
  90. # That is, the Container either exited with non-zero status or was terminated by the system.
  91. pod_failed = "failed"
  92. # https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
  93. pod_reason_evicted = "evicted"
  94. # If pod is failed and restartPolicy is:
  95. # * Always: Restart Container; Pod phase stays Running.
  96. # * OnFailure: Restart Container; Pod phase stays Running.
  97. # * Never: Pod phase becomes Failed.
  98. pod_restart_policy_never = "never"
  99. print("Loading Kubernetes configuration")
  100. kube_client = get_kube_client()
  101. print(f"Listing pods in namespace {namespace}")
  102. airflow_pod_labels = [
  103. "dag_id",
  104. "task_id",
  105. "try_number",
  106. "airflow_version",
  107. ]
  108. list_kwargs = {"namespace": namespace, "limit": 500, "label_selector": ",".join(airflow_pod_labels)}
  109. while True:
  110. pod_list = kube_client.list_namespaced_pod(**list_kwargs)
  111. for pod in pod_list.items:
  112. pod_name = pod.metadata.name
  113. print(f"Inspecting pod {pod_name}")
  114. pod_phase = pod.status.phase.lower()
  115. pod_reason = pod.status.reason.lower() if pod.status.reason else ""
  116. pod_restart_policy = pod.spec.restart_policy.lower()
  117. current_time = datetime.now(pod.metadata.creation_timestamp.tzinfo)
  118. if (
  119. pod_phase == pod_succeeded
  120. or (pod_phase == pod_failed and pod_restart_policy == pod_restart_policy_never)
  121. or (pod_reason == pod_reason_evicted)
  122. or (
  123. pod_phase == pod_pending
  124. and current_time - pod.metadata.creation_timestamp
  125. > timedelta(minutes=min_pending_minutes)
  126. )
  127. ):
  128. print(
  129. f'Deleting pod "{pod_name}" phase "{pod_phase}" and reason "{pod_reason}", '
  130. f'restart policy "{pod_restart_policy}"'
  131. )
  132. try:
  133. _delete_pod(pod.metadata.name, namespace)
  134. except ApiException as e:
  135. print(f"Can't remove POD: {e}", file=sys.stderr)
  136. else:
  137. print(f"No action taken on pod {pod_name}")
  138. continue_token = pod_list.metadata._continue
  139. if not continue_token:
  140. break
  141. list_kwargs["_continue"] = continue_token
  142. def _delete_pod(name, namespace):
  143. """
  144. Delete a namespaced pod.
  145. Helper Function for cleanup_pods.
  146. """
  147. kube_client = get_kube_client()
  148. delete_options = client.V1DeleteOptions()
  149. print(f'Deleting POD "{name}" from "{namespace}" namespace')
  150. api_response = kube_client.delete_namespaced_pod(name=name, namespace=namespace, body=delete_options)
  151. print(api_response)