_utils.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # Copyright 2016 Julien Danjou
  2. # Copyright 2016 Joshua Harlow
  3. # Copyright 2013-2014 Ray Holder
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import functools
  17. import inspect
  18. import sys
  19. import typing
  20. from datetime import timedelta
  21. # sys.maxsize:
  22. # An integer giving the maximum value a variable of type Py_ssize_t can take.
  23. MAX_WAIT = sys.maxsize / 2
  24. def find_ordinal(pos_num: int) -> str:
  25. # See: https://en.wikipedia.org/wiki/English_numerals#Ordinal_numbers
  26. if pos_num == 0:
  27. return "th"
  28. elif pos_num == 1:
  29. return "st"
  30. elif pos_num == 2:
  31. return "nd"
  32. elif pos_num == 3:
  33. return "rd"
  34. elif 4 <= pos_num <= 20:
  35. return "th"
  36. else:
  37. return find_ordinal(pos_num % 10)
  38. def to_ordinal(pos_num: int) -> str:
  39. return f"{pos_num}{find_ordinal(pos_num)}"
  40. def get_callback_name(cb: typing.Callable[..., typing.Any]) -> str:
  41. """Get a callback fully-qualified name.
  42. If no name can be produced ``repr(cb)`` is called and returned.
  43. """
  44. segments = []
  45. try:
  46. segments.append(cb.__qualname__)
  47. except AttributeError:
  48. try:
  49. segments.append(cb.__name__)
  50. except AttributeError:
  51. pass
  52. if not segments:
  53. return repr(cb)
  54. else:
  55. try:
  56. # When running under sphinx it appears this can be none?
  57. if cb.__module__:
  58. segments.insert(0, cb.__module__)
  59. except AttributeError:
  60. pass
  61. return ".".join(segments)
  62. time_unit_type = typing.Union[int, float, timedelta]
  63. def to_seconds(time_unit: time_unit_type) -> float:
  64. return float(
  65. time_unit.total_seconds() if isinstance(time_unit, timedelta) else time_unit
  66. )
  67. def is_coroutine_callable(call: typing.Callable[..., typing.Any]) -> bool:
  68. if inspect.isclass(call):
  69. return False
  70. if inspect.iscoroutinefunction(call):
  71. return True
  72. partial_call = isinstance(call, functools.partial) and call.func
  73. dunder_call = partial_call or getattr(call, "__call__", None)
  74. return inspect.iscoroutinefunction(dunder_call)
  75. def wrap_to_async_func(
  76. call: typing.Callable[..., typing.Any],
  77. ) -> typing.Callable[..., typing.Awaitable[typing.Any]]:
  78. if is_coroutine_callable(call):
  79. return call
  80. async def inner(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
  81. return call(*args, **kwargs)
  82. return inner