simple_table.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import inspect
  19. import json
  20. from typing import TYPE_CHECKING, Any, Callable, Sequence
  21. from rich.box import ASCII_DOUBLE_HEAD
  22. from rich.console import Console
  23. from rich.syntax import Syntax
  24. from rich.table import Table
  25. from tabulate import tabulate
  26. from airflow.plugins_manager import PluginsDirectorySource
  27. from airflow.utils import yaml
  28. from airflow.utils.platform import is_tty
  29. if TYPE_CHECKING:
  30. from airflow.typing_compat import TypeGuard
  31. def is_data_sequence(data: Sequence[dict | Any]) -> TypeGuard[Sequence[dict]]:
  32. return all(isinstance(d, dict) for d in data)
  33. class AirflowConsole(Console):
  34. """Airflow rich console."""
  35. def __init__(self, show_header: bool = True, *args, **kwargs):
  36. super().__init__(*args, **kwargs)
  37. # Set the width to constant to pipe whole output from console
  38. self._width = 200 if not is_tty() else self._width
  39. # If show header in tables
  40. self.show_header = show_header
  41. def print_as_json(self, data: dict):
  42. """Render dict as json text representation."""
  43. json_content = json.dumps(data)
  44. self.print(Syntax(json_content, "json", theme="ansi_dark"), soft_wrap=True)
  45. def print_as_yaml(self, data: dict):
  46. """Render dict as yaml text representation."""
  47. yaml_content = yaml.dump(data)
  48. self.print(Syntax(yaml_content, "yaml", theme="ansi_dark"), soft_wrap=True)
  49. def print_as_table(self, data: list[dict]):
  50. """Render list of dictionaries as table."""
  51. if not data:
  52. self.print("No data found")
  53. return
  54. table = SimpleTable(show_header=self.show_header)
  55. for col in data[0]:
  56. table.add_column(col)
  57. for row in data:
  58. table.add_row(*(str(d) for d in row.values()))
  59. self.print(table)
  60. def print_as_plain_table(self, data: list[dict]):
  61. """Render list of dictionaries as a simple table than can be easily piped."""
  62. if not data:
  63. self.print("No data found")
  64. return
  65. rows = [d.values() for d in data]
  66. output = tabulate(rows, tablefmt="plain", headers=list(data[0]))
  67. print(output)
  68. def _normalize_data(self, value: Any, output: str) -> list | str | dict | None:
  69. if isinstance(value, (tuple, list)):
  70. if output == "table":
  71. return ",".join(str(self._normalize_data(x, output)) for x in value)
  72. return [self._normalize_data(x, output) for x in value]
  73. if isinstance(value, dict) and output != "table":
  74. return {k: self._normalize_data(v, output) for k, v in value.items()}
  75. if inspect.isclass(value) and not isinstance(value, PluginsDirectorySource):
  76. return value.__name__
  77. if value is None:
  78. return None
  79. return str(value)
  80. def print_as(
  81. self,
  82. data: Sequence[dict | Any],
  83. output: str,
  84. mapper: Callable[[Any], dict] | None = None,
  85. ) -> None:
  86. """Print provided using format specified by output argument."""
  87. output_to_renderer: dict[str, Callable[[Any], None]] = {
  88. "json": self.print_as_json,
  89. "yaml": self.print_as_yaml,
  90. "table": self.print_as_table,
  91. "plain": self.print_as_plain_table,
  92. }
  93. renderer = output_to_renderer.get(output)
  94. if not renderer:
  95. raise ValueError(f"Unknown formatter: {output}. Allowed options: {list(output_to_renderer)}")
  96. if mapper:
  97. dict_data: Sequence[dict] = [mapper(d) for d in data]
  98. elif is_data_sequence(data):
  99. dict_data = data
  100. else:
  101. raise ValueError("To tabulate non-dictionary data you need to provide `mapper` function")
  102. dict_data = [{k: self._normalize_data(v, output) for k, v in d.items()} for d in dict_data]
  103. renderer(dict_data)
  104. class SimpleTable(Table):
  105. """A rich Table with some default hardcoded for consistency."""
  106. def __init__(self, *args, **kwargs):
  107. super().__init__(*args, **kwargs)
  108. self.show_edge = kwargs.get("show_edge", False)
  109. self.pad_edge = kwargs.get("pad_edge", False)
  110. self.box = kwargs.get("box", ASCII_DOUBLE_HEAD)
  111. self.show_header = kwargs.get("show_header", False)
  112. self.title_style = kwargs.get("title_style", "bold green")
  113. self.title_justify = kwargs.get("title_justify", "left")
  114. self.caption = kwargs.get("caption", " ")
  115. def add_column(self, *args, **kwargs) -> None:
  116. """Add a column to the table. We use different default."""
  117. kwargs["overflow"] = kwargs.get("overflow") # to avoid truncating
  118. super().add_column(*args, **kwargs)