123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- import datetime
- from functools import cached_property
- from typing import TYPE_CHECKING, Optional
- from airflow.executors.executor_loader import ExecutorLoader
- from airflow.jobs.base_job_runner import BaseJobRunner
- from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict
- def check_runner_initialized(job_runner: Optional[BaseJobRunner], job_type: str) -> BaseJobRunner:
- if job_runner is None:
- raise ValueError(f"In order to run {job_type} you need to initialize the {job_type}Runner first.")
- return job_runner
- class JobPydantic(BaseModelPydantic):
- """Serializable representation of the Job ORM SqlAlchemyModel used by internal API."""
- id: Optional[int]
- dag_id: Optional[str]
- state: Optional[str]
- job_type: Optional[str]
- start_date: Optional[datetime.datetime]
- end_date: Optional[datetime.datetime]
- latest_heartbeat: datetime.datetime
- executor_class: Optional[str]
- hostname: Optional[str]
- unixname: Optional[str]
- grace_multiplier: float = 2.1
- model_config = ConfigDict(from_attributes=True)
- @cached_property
- def executor(self):
- return ExecutorLoader.get_default_executor()
- @cached_property
- def heartrate(self) -> float:
- from airflow.jobs.job import Job
- if TYPE_CHECKING:
- assert self.job_type is not None
- return Job._heartrate(self.job_type)
- def is_alive(self) -> bool:
- """Is this job currently alive."""
- from airflow.jobs.job import Job, health_check_threshold
- return Job._is_alive(
- state=self.state,
- health_check_threshold_value=health_check_threshold(self.job_type, self.heartrate),
- latest_heartbeat=self.latest_heartbeat,
- )
|