operator_resources.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from airflow.configuration import conf
  20. from airflow.exceptions import AirflowException
  21. # Constants for resources (megabytes are the base unit)
  22. MB = 1
  23. GB = 1024 * MB
  24. TB = 1024 * GB
  25. PB = 1024 * TB
  26. EB = 1024 * PB
  27. class Resource:
  28. """
  29. Represents a resource requirement in an execution environment for an operator.
  30. :param name: Name of the resource
  31. :param units_str: The string representing the units of a resource (e.g. MB for a CPU
  32. resource) to be used for display purposes
  33. :param qty: The number of units of the specified resource that are required for
  34. execution of the operator.
  35. """
  36. def __init__(self, name, units_str, qty):
  37. if qty < 0:
  38. raise AirflowException(
  39. f"Received resource quantity {qty} for resource {name}, "
  40. f"but resource quantity must be non-negative."
  41. )
  42. self._name = name
  43. self._units_str = units_str
  44. self._qty = qty
  45. def __eq__(self, other):
  46. if not isinstance(other, self.__class__):
  47. return NotImplemented
  48. return self.__dict__ == other.__dict__
  49. def __repr__(self):
  50. return str(self.__dict__)
  51. @property
  52. def name(self):
  53. """Name of the resource."""
  54. return self._name
  55. @property
  56. def units_str(self):
  57. """The string representing the units of a resource."""
  58. return self._units_str
  59. @property
  60. def qty(self):
  61. """The number of units of the specified resource that are required for execution of the operator."""
  62. return self._qty
  63. def to_dict(self):
  64. return {
  65. "name": self.name,
  66. "qty": self.qty,
  67. "units_str": self.units_str,
  68. }
  69. class CpuResource(Resource):
  70. """Represents a CPU requirement in an execution environment for an operator."""
  71. def __init__(self, qty):
  72. super().__init__("CPU", "core(s)", qty)
  73. class RamResource(Resource):
  74. """Represents a RAM requirement in an execution environment for an operator."""
  75. def __init__(self, qty):
  76. super().__init__("RAM", "MB", qty)
  77. class DiskResource(Resource):
  78. """Represents a disk requirement in an execution environment for an operator."""
  79. def __init__(self, qty):
  80. super().__init__("Disk", "MB", qty)
  81. class GpuResource(Resource):
  82. """Represents a GPU requirement in an execution environment for an operator."""
  83. def __init__(self, qty):
  84. super().__init__("GPU", "gpu(s)", qty)
  85. class Resources:
  86. """
  87. The resources required by an operator.
  88. Resources that are not specified will use the default values from the airflow config.
  89. :param cpus: The number of cpu cores that are required
  90. :param ram: The amount of RAM required
  91. :param disk: The amount of disk space required
  92. :param gpus: The number of gpu units that are required
  93. """
  94. def __init__(
  95. self,
  96. cpus=conf.getint("operators", "default_cpus"),
  97. ram=conf.getint("operators", "default_ram"),
  98. disk=conf.getint("operators", "default_disk"),
  99. gpus=conf.getint("operators", "default_gpus"),
  100. ):
  101. self.cpus = CpuResource(cpus)
  102. self.ram = RamResource(ram)
  103. self.disk = DiskResource(disk)
  104. self.gpus = GpuResource(gpus)
  105. def __eq__(self, other):
  106. if not isinstance(other, self.__class__):
  107. return NotImplemented
  108. return self.__dict__ == other.__dict__
  109. def __repr__(self):
  110. return str(self.__dict__)
  111. def to_dict(self):
  112. return {
  113. "cpus": self.cpus.to_dict(),
  114. "ram": self.ram.to_dict(),
  115. "disk": self.disk.to_dict(),
  116. "gpus": self.gpus.to_dict(),
  117. }
  118. @classmethod
  119. def from_dict(cls, resources_dict: dict):
  120. """Create resources from resources dict."""
  121. cpus = resources_dict["cpus"]["qty"]
  122. ram = resources_dict["ram"]["qty"]
  123. disk = resources_dict["disk"]["qty"]
  124. gpus = resources_dict["gpus"]["qty"]
  125. return cls(cpus=cpus, ram=ram, disk=disk, gpus=gpus)