123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- """Pools sub-commands."""
- from __future__ import annotations
- import json
- import os
- from json import JSONDecodeError
- from airflow.api.client import get_current_api_client
- from airflow.cli.simple_table import AirflowConsole
- from airflow.exceptions import PoolNotFound
- from airflow.utils import cli as cli_utils
- from airflow.utils.cli import suppress_logs_and_warning
- from airflow.utils.providers_configuration_loader import providers_configuration_loaded
- def _show_pools(pools, output):
- AirflowConsole().print_as(
- data=pools,
- output=output,
- mapper=lambda x: {
- "pool": x[0],
- "slots": x[1],
- "description": x[2],
- "include_deferred": x[3],
- },
- )
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def pool_list(args):
- """Display info of all the pools."""
- api_client = get_current_api_client()
- pools = api_client.get_pools()
- _show_pools(pools=pools, output=args.output)
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def pool_get(args):
- """Display pool info by a given name."""
- api_client = get_current_api_client()
- try:
- pools = [api_client.get_pool(name=args.pool)]
- _show_pools(pools=pools, output=args.output)
- except PoolNotFound:
- raise SystemExit(f"Pool {args.pool} does not exist")
- @cli_utils.action_cli
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def pool_set(args):
- """Create new pool with a given name and slots."""
- api_client = get_current_api_client()
- api_client.create_pool(
- name=args.pool, slots=args.slots, description=args.description, include_deferred=args.include_deferred
- )
- print(f"Pool {args.pool} created")
- @cli_utils.action_cli
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def pool_delete(args):
- """Delete pool by a given name."""
- api_client = get_current_api_client()
- try:
- api_client.delete_pool(name=args.pool)
- print(f"Pool {args.pool} deleted")
- except PoolNotFound:
- raise SystemExit(f"Pool {args.pool} does not exist")
- @cli_utils.action_cli
- @suppress_logs_and_warning
- @providers_configuration_loaded
- def pool_import(args):
- """Import pools from the file."""
- if not os.path.exists(args.file):
- raise SystemExit(f"Missing pools file {args.file}")
- pools, failed = pool_import_helper(args.file)
- if failed:
- raise SystemExit(f"Failed to update pool(s): {', '.join(failed)}")
- print(f"Uploaded {len(pools)} pool(s)")
- @providers_configuration_loaded
- def pool_export(args):
- """Export all the pools to the file."""
- pools = pool_export_helper(args.file)
- print(f"Exported {len(pools)} pools to {args.file}")
- def pool_import_helper(filepath):
- """Help import pools from the json file."""
- api_client = get_current_api_client()
- with open(filepath) as poolfile:
- data = poolfile.read()
- try:
- pools_json = json.loads(data)
- except JSONDecodeError as e:
- raise SystemExit(f"Invalid json file: {e}")
- pools = []
- failed = []
- for k, v in pools_json.items():
- if isinstance(v, dict) and "slots" in v and "description" in v:
- pools.append(
- api_client.create_pool(
- name=k,
- slots=v["slots"],
- description=v["description"],
- include_deferred=v.get("include_deferred", False),
- )
- )
- else:
- failed.append(k)
- return pools, failed
- def pool_export_helper(filepath):
- """Help export all the pools to the json file."""
- api_client = get_current_api_client()
- pool_dict = {}
- pools = api_client.get_pools()
- for pool in pools:
- pool_dict[pool[0]] = {"slots": pool[1], "description": pool[2], "include_deferred": pool[3]}
- with open(filepath, "w") as poolfile:
- poolfile.write(json.dumps(pool_dict, sort_keys=True, indent=4))
- return pools
|