datetime.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from typing import TYPE_CHECKING
  20. from airflow.serialization.serializers.timezone import (
  21. deserialize as deserialize_timezone,
  22. serialize as serialize_timezone,
  23. )
  24. from airflow.utils.module_loading import qualname
  25. from airflow.utils.timezone import parse_timezone
  26. if TYPE_CHECKING:
  27. import datetime
  28. from airflow.serialization.serde import U
  29. __version__ = 2
  30. serializers = ["datetime.date", "datetime.datetime", "datetime.timedelta", "pendulum.datetime.DateTime"]
  31. deserializers = serializers
  32. TIMESTAMP = "timestamp"
  33. TIMEZONE = "tz"
  34. def serialize(o: object) -> tuple[U, str, int, bool]:
  35. from datetime import date, datetime, timedelta
  36. if isinstance(o, datetime):
  37. qn = qualname(o)
  38. tz = serialize_timezone(o.tzinfo) if o.tzinfo else None
  39. return {TIMESTAMP: o.timestamp(), TIMEZONE: tz}, qn, __version__, True
  40. if isinstance(o, date):
  41. return o.isoformat(), qualname(o), __version__, True
  42. if isinstance(o, timedelta):
  43. return o.total_seconds(), qualname(o), __version__, True
  44. return "", "", 0, False
  45. def deserialize(classname: str, version: int, data: dict | str) -> datetime.date | datetime.timedelta:
  46. import datetime
  47. from pendulum import DateTime
  48. tz: datetime.tzinfo | None = None
  49. if isinstance(data, dict) and TIMEZONE in data:
  50. if version == 1:
  51. # try to deserialize unsupported timezones
  52. timezone_mapping = {
  53. "EDT": parse_timezone(-4 * 3600),
  54. "CDT": parse_timezone(-5 * 3600),
  55. "MDT": parse_timezone(-6 * 3600),
  56. "PDT": parse_timezone(-7 * 3600),
  57. "CEST": parse_timezone("CET"),
  58. }
  59. if data[TIMEZONE] in timezone_mapping:
  60. tz = timezone_mapping[data[TIMEZONE]]
  61. else:
  62. tz = parse_timezone(data[TIMEZONE])
  63. else:
  64. tz = (
  65. deserialize_timezone(data[TIMEZONE][1], data[TIMEZONE][2], data[TIMEZONE][0])
  66. if data[TIMEZONE]
  67. else None
  68. )
  69. if classname == qualname(datetime.datetime) and isinstance(data, dict):
  70. return datetime.datetime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
  71. if classname == qualname(DateTime) and isinstance(data, dict):
  72. return DateTime.fromtimestamp(float(data[TIMESTAMP]), tz=tz)
  73. if classname == qualname(datetime.timedelta) and isinstance(data, (str, float)):
  74. return datetime.timedelta(seconds=float(data))
  75. if classname == qualname(datetime.date) and isinstance(data, str):
  76. return datetime.date.fromisoformat(data)
  77. raise TypeError(f"unknown date/time format {classname}")