123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import, division, print_function
- import binascii
- import calendar
- import copy
- import datetime
- import math
- import platform
- import random
- import re
- import struct
- import sys
- import traceback as _traceback
- from time import time
- # as pytz is optional in thirdparty libs but we need it for good support under
- # python2, just test that it's well installed
- import pytz # noqa
- from dateutil.relativedelta import relativedelta
- from dateutil.tz import tzutc
- def is_32bit():
- """
- Detect if Python is running in 32-bit mode.
- Compatible with Python 2.6 and later versions.
- Returns True if running on 32-bit Python, False for 64-bit.
- """
- # Method 1: Check pointer size
- bits = struct.calcsize("P") * 8
- # Method 2: Check platform architecture string
- try:
- architecture = platform.architecture()[0]
- except RuntimeError:
- architecture = None
- # Method 3: Check maxsize (sys.maxint in Python 2)
- try:
- # Python 2
- is_small_maxsize = sys.maxint <= 2 ** 32
- except AttributeError:
- # Python 3
- is_small_maxsize = sys.maxsize <= 2 ** 32
- # Evaluate all available methods
- is_32 = False
- if bits == 32:
- is_32 = True
- elif architecture and "32" in architecture:
- is_32 = True
- elif is_small_maxsize:
- is_32 = True
- return is_32
- try:
- # https://github.com/python/cpython/issues/101069 detection
- if is_32bit():
- datetime.datetime.fromtimestamp(3999999999)
- OVERFLOW32B_MODE = False
- except OverflowError:
- OVERFLOW32B_MODE = True
- try:
- from collections import OrderedDict
- except ImportError:
- OrderedDict = dict # py26 degraded mode, expanders order will not be immutable
- try:
- # py3 recent
- UTC_DT = datetime.timezone.utc
- except AttributeError:
- UTC_DT = pytz.utc
- EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT)
- # fmt: off
- M_ALPHAS = {
- "jan": 1, "feb": 2, "mar": 3, "apr": 4, # noqa: E241
- "may": 5, "jun": 6, "jul": 7, "aug": 8, # noqa: E241
- "sep": 9, "oct": 10, "nov": 11, "dec": 12,
- }
- DOW_ALPHAS = {
- "sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6
- }
- MINUTE_FIELD = 0
- HOUR_FIELD = 1
- DAY_FIELD = 2
- MONTH_FIELD = 3
- DOW_FIELD = 4
- SECOND_FIELD = 5
- YEAR_FIELD = 6
- UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD) # noqa: E222
- SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD) # noqa: E222
- YEAR_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD, YEAR_FIELD) # noqa: E222
- # fmt: on
- step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$")
- only_int_re = re.compile(r"^\d+$")
- WEEKDAYS = "|".join(DOW_ALPHAS.keys())
- MONTHS = "|".join(M_ALPHAS.keys())
- star_or_int_re = re.compile(r"^(\d+|\*)$")
- special_dow_re = re.compile(
- (r"^(?P<pre>((?P<he>(({WEEKDAYS})(-({WEEKDAYS}))?)").format(WEEKDAYS=WEEKDAYS)
- + (r"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P<last>\d+)$").format(MONTHS=MONTHS)
- )
- re_star = re.compile("[*]")
- hash_expression_re = re.compile(
- r"^(?P<hash_type>h|r)(\((?P<range_begin>\d+)-(?P<range_end>\d+)\))?(\/(?P<divisor>\d+))?$"
- )
- CRON_FIELDS = {
- "unix": UNIX_FIELDS,
- "second": SECOND_FIELDS,
- "year": YEAR_FIELDS,
- len(UNIX_FIELDS): UNIX_FIELDS,
- len(SECOND_FIELDS): SECOND_FIELDS,
- len(YEAR_FIELDS): YEAR_FIELDS,
- }
- UNIX_CRON_LEN = len(UNIX_FIELDS)
- SECOND_CRON_LEN = len(SECOND_FIELDS)
- YEAR_CRON_LEN = len(YEAR_FIELDS)
- # retrocompat
- VALID_LEN_EXPRESSION = set(a for a in CRON_FIELDS if isinstance(a, int))
- TIMESTAMP_TO_DT_CACHE = {}
- EXPRESSIONS = {}
- MARKER = object()
- def timedelta_to_seconds(td):
- return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6
- def datetime_to_timestamp(d):
- if d.tzinfo is not None:
- d = d.replace(tzinfo=None) - d.utcoffset()
- return timedelta_to_seconds(d - datetime.datetime(1970, 1, 1))
- class CroniterError(ValueError):
- """General top-level Croniter base exception"""
- class CroniterBadTypeRangeError(TypeError):
- """."""
- class CroniterBadCronError(CroniterError):
- """Syntax, unknown value, or range error within a cron expression"""
- class CroniterUnsupportedSyntaxError(CroniterBadCronError):
- """Valid cron syntax, but likely to produce inaccurate results"""
- # Extending CroniterBadCronError, which may be contridatory, but this allows
- # catching both errors with a single exception. From a user perspective
- # these will likely be handled the same way.
- class CroniterBadDateError(CroniterError):
- """Unable to find next/prev timestamp match"""
- class CroniterNotAlphaError(CroniterBadCronError):
- """Cron syntax contains an invalid day or month abbreviation"""
- class croniter(object):
- MONTHS_IN_YEAR = 12
- # This helps with expanding `*` fields into `lower-upper` ranges. Each item
- # in this tuple maps to the corresponding field index
- RANGES = (
- (0, 59),
- (0, 23),
- (1, 31),
- (1, 12),
- (0, 6),
- (0, 59),
- (1970, 2099),
- )
- DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
- ALPHACONV = (
- {}, # 0: min
- {}, # 1: hour
- {"l": "l"}, # 2: dom
- # 3: mon
- copy.deepcopy(M_ALPHAS),
- # 4: dow
- copy.deepcopy(DOW_ALPHAS),
- # 5: second
- {},
- # 6: year
- {},
- )
- LOWMAP = (
- {},
- {},
- {0: 1},
- {0: 1},
- {7: 0},
- {},
- {},
- )
- LEN_MEANS_ALL = (
- 60,
- 24,
- 31,
- 12,
- 7,
- 60,
- 130,
- )
- def __init__(
- self,
- expr_format,
- start_time=None,
- ret_type=float,
- day_or=True,
- max_years_between_matches=None,
- is_prev=False,
- hash_id=None,
- implement_cron_bug=False,
- second_at_beginning=None,
- expand_from_start_time=False,
- ):
- self._ret_type = ret_type
- self._day_or = day_or
- self._implement_cron_bug = implement_cron_bug
- self.second_at_beginning = bool(second_at_beginning)
- self._expand_from_start_time = expand_from_start_time
- if hash_id:
- if not isinstance(hash_id, (bytes, str)):
- raise TypeError("hash_id must be bytes or UTF-8 string")
- if not isinstance(hash_id, bytes):
- hash_id = hash_id.encode("UTF-8")
- self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None
- if not self._max_years_btw_matches_explicitly_set:
- max_years_between_matches = 50
- self._max_years_between_matches = max(int(max_years_between_matches), 1)
- if start_time is None:
- start_time = time()
- self.tzinfo = None
- self.start_time = None
- self.dst_start_time = None
- self.cur = None
- self.set_current(start_time, force=False)
- self.expanded, self.nth_weekday_of_month = self.expand(
- expr_format,
- hash_id=hash_id,
- from_timestamp=self.dst_start_time if self._expand_from_start_time else None,
- second_at_beginning=second_at_beginning,
- )
- self.fields = CRON_FIELDS[len(self.expanded)]
- self.expressions = EXPRESSIONS[(expr_format, hash_id, second_at_beginning)]
- self._is_prev = is_prev
- @classmethod
- def _alphaconv(cls, index, key, expressions):
- try:
- return cls.ALPHACONV[index][key]
- except KeyError:
- raise CroniterNotAlphaError("[{0}] is not acceptable".format(" ".join(expressions)))
- def get_next(self, ret_type=None, start_time=None, update_current=True):
- if start_time and self._expand_from_start_time:
- raise ValueError("start_time is not supported when using expand_from_start_time = True.")
- return self._get_next(
- ret_type=ret_type,
- start_time=start_time,
- is_prev=False,
- update_current=update_current,
- )
- def get_prev(self, ret_type=None, start_time=None, update_current=True):
- return self._get_next(
- ret_type=ret_type,
- start_time=start_time,
- is_prev=True,
- update_current=update_current,
- )
- def get_current(self, ret_type=None):
- ret_type = ret_type or self._ret_type
- if issubclass(ret_type, datetime.datetime):
- return self.timestamp_to_datetime(self.cur)
- return self.cur
- def set_current(self, start_time, force=True):
- if (force or (self.cur is None)) and start_time is not None:
- if isinstance(start_time, datetime.datetime):
- self.tzinfo = start_time.tzinfo
- start_time = self.datetime_to_timestamp(start_time)
- self.start_time = start_time
- self.dst_start_time = start_time
- self.cur = start_time
- return self.cur
- @staticmethod
- def datetime_to_timestamp(d):
- """
- Converts a `datetime` object `d` into a UNIX timestamp.
- """
- return datetime_to_timestamp(d)
- _datetime_to_timestamp = datetime_to_timestamp # retrocompat
- def timestamp_to_datetime(self, timestamp, tzinfo=MARKER):
- """
- Converts a UNIX `timestamp` into a `datetime` object.
- """
- if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set
- tzinfo = self.tzinfo
- k = timestamp
- if tzinfo:
- k = (timestamp, repr(tzinfo))
- try:
- return TIMESTAMP_TO_DT_CACHE[k]
- except KeyError:
- pass
- if OVERFLOW32B_MODE:
- # degraded mode to workaround Y2038
- # see https://github.com/python/cpython/issues/101069
- result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp)
- else:
- result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None)
- if tzinfo:
- result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo)
- TIMESTAMP_TO_DT_CACHE[(result, repr(result.tzinfo))] = result
- return result
- _timestamp_to_datetime = timestamp_to_datetime # retrocompat
- @staticmethod
- def timedelta_to_seconds(td):
- """
- Converts a 'datetime.timedelta' object `td` into seconds contained in
- the duration.
- Note: We cannot use `timedelta.total_seconds()` because this is not
- supported by Python 2.6.
- """
- return timedelta_to_seconds(td)
- _timedelta_to_seconds = timedelta_to_seconds # retrocompat
- def _get_next(
- self,
- ret_type=None,
- start_time=None,
- is_prev=None,
- update_current=None,
- ):
- if update_current is None:
- update_current = True
- self.set_current(start_time, force=True)
- if is_prev is None:
- is_prev = self._is_prev
- self._is_prev = is_prev
- expanded = self.expanded[:]
- nth_weekday_of_month = self.nth_weekday_of_month.copy()
- ret_type = ret_type or self._ret_type
- if not issubclass(ret_type, (float, datetime.datetime)):
- raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.")
- # exception to support day of month and day of week as defined in cron
- dom_dow_exception_processed = False
- if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or:
- # If requested, handle a bug in vixie cron/ISC cron where day_of_month and day_of_week form
- # an intersection (AND) instead of a union (OR) if either field is an asterisk or starts with an asterisk
- # (https://crontab.guru/cron-bug.html)
- if self._implement_cron_bug and (
- re_star.match(self.expressions[DAY_FIELD]) or re_star.match(self.expressions[DOW_FIELD])
- ):
- # To produce a schedule identical to the cron bug, we'll bypass the code that
- # makes a union of DOM and DOW, and instead skip to the code that does an intersect instead
- pass
- else:
- bak = expanded[DOW_FIELD]
- expanded[DOW_FIELD] = ["*"]
- t1 = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev)
- expanded[DOW_FIELD] = bak
- expanded[DAY_FIELD] = ["*"]
- t2 = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev)
- if not is_prev:
- result = t1 if t1 < t2 else t2
- else:
- result = t1 if t1 > t2 else t2
- dom_dow_exception_processed = True
- if not dom_dow_exception_processed:
- result = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev)
- # DST Handling for cron job spanning across days
- dtstarttime = self._timestamp_to_datetime(self.dst_start_time)
- dtstarttime_utcoffset = dtstarttime.utcoffset() or datetime.timedelta(0)
- dtresult = self.timestamp_to_datetime(result)
- lag = lag_hours = 0
- # do we trigger DST on next crontab (handle backward changes)
- dtresult_utcoffset = dtstarttime_utcoffset
- if dtresult and self.tzinfo:
- dtresult_utcoffset = dtresult.utcoffset()
- lag_hours = self._timedelta_to_seconds(dtresult - dtstarttime) / (60 * 60)
- lag = self._timedelta_to_seconds(dtresult_utcoffset - dtstarttime_utcoffset)
- hours_before_midnight = 24 - dtstarttime.hour
- if dtresult_utcoffset != dtstarttime_utcoffset:
- if (lag > 0 and abs(lag_hours) >= hours_before_midnight) or (
- lag < 0 and ((3600 * abs(lag_hours) + abs(lag)) >= hours_before_midnight * 3600)
- ):
- dtresult_adjusted = dtresult - datetime.timedelta(seconds=lag)
- result_adjusted = self._datetime_to_timestamp(dtresult_adjusted)
- # Do the actual adjust only if the result time actually exists
- if self._timestamp_to_datetime(result_adjusted).tzinfo == dtresult_adjusted.tzinfo:
- dtresult = dtresult_adjusted
- result = result_adjusted
- self.dst_start_time = result
- if update_current:
- self.cur = result
- if issubclass(ret_type, datetime.datetime):
- result = dtresult
- return result
- # iterator protocol, to enable direct use of croniter
- # objects in a loop, like "for dt in croniter("5 0 * * *'): ..."
- # or for combining multiple croniters into single
- # dates feed using 'itertools' module
- def all_next(self, ret_type=None, start_time=None, update_current=None):
- """
- Returns a generator yielding consecutive dates.
- May be used instead of an implicit call to __iter__ whenever a
- non-default `ret_type` needs to be specified.
- """
- # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could be used instead
- try:
- while True:
- self._is_prev = False
- yield self._get_next(
- ret_type=ret_type,
- start_time=start_time,
- update_current=update_current,
- )
- start_time = None
- except CroniterBadDateError:
- if self._max_years_btw_matches_explicitly_set:
- return
- raise
- def all_prev(self, ret_type=None, start_time=None, update_current=None):
- """
- Returns a generator yielding previous dates.
- """
- try:
- while True:
- self._is_prev = True
- yield self._get_next(
- ret_type=ret_type,
- start_time=start_time,
- update_current=update_current,
- )
- start_time = None
- except CroniterBadDateError:
- if self._max_years_btw_matches_explicitly_set:
- return
- raise
- def iter(self, *args, **kwargs):
- return self.all_prev if self._is_prev else self.all_next
- def __iter__(self):
- return self
- __next__ = next = _get_next
- def _calc(self, now, expanded, nth_weekday_of_month, is_prev):
- if is_prev:
- now = math.ceil(now)
- nearest_diff_method = self._get_prev_nearest_diff
- sign = -1
- offset = 1 if (len(expanded) > UNIX_CRON_LEN or now % 60 > 0) else 60
- else:
- now = math.floor(now)
- nearest_diff_method = self._get_next_nearest_diff
- sign = 1
- offset = 1 if (len(expanded) > UNIX_CRON_LEN) else 60
- dst = now = self.timestamp_to_datetime(now + sign * offset)
- month, year = dst.month, dst.year
- current_year = now.year
- DAYS = self.DAYS
- def proc_year(d):
- if len(expanded) == YEAR_CRON_LEN:
- try:
- expanded[YEAR_FIELD].index("*")
- except ValueError:
- # use None as range_val to indicate no loop
- diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None)
- if diff_year is None:
- return None, d
- if diff_year != 0:
- if is_prev:
- d += relativedelta(
- years=diff_year,
- month=12,
- day=31,
- hour=23,
- minute=59,
- second=59,
- )
- else:
- d += relativedelta(
- years=diff_year,
- month=1,
- day=1,
- hour=0,
- minute=0,
- second=0,
- )
- return True, d
- return False, d
- def proc_month(d):
- try:
- expanded[MONTH_FIELD].index("*")
- except ValueError:
- diff_month = nearest_diff_method(d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR)
- reset_day = 1
- if diff_month is not None and diff_month != 0:
- if is_prev:
- d += relativedelta(months=diff_month)
- reset_day = DAYS[d.month - 1]
- if d.month == 2 and self.is_leap(d.year) is True:
- reset_day += 1
- d += relativedelta(day=reset_day, hour=23, minute=59, second=59)
- else:
- d += relativedelta(months=diff_month, day=reset_day, hour=0, minute=0, second=0)
- return True, d
- return False, d
- def proc_day_of_month(d):
- try:
- expanded[DAY_FIELD].index("*")
- except ValueError:
- days = DAYS[month - 1]
- if month == 2 and self.is_leap(year) is True:
- days += 1
- if "l" in expanded[DAY_FIELD] and days == d.day:
- return False, d
- if is_prev:
- days_in_prev_month = DAYS[(month - 2) % self.MONTHS_IN_YEAR]
- diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month)
- else:
- diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days)
- if diff_day is not None and diff_day != 0:
- if is_prev:
- d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
- else:
- d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
- return True, d
- return False, d
- def proc_day_of_week(d):
- try:
- expanded[DOW_FIELD].index("*")
- except ValueError:
- diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7)
- if diff_day_of_week is not None and diff_day_of_week != 0:
- if is_prev:
- d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59)
- else:
- d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0)
- return True, d
- return False, d
- def proc_day_of_week_nth(d):
- if "*" in nth_weekday_of_month:
- s = nth_weekday_of_month["*"]
- for i in range(0, 7):
- if i in nth_weekday_of_month:
- nth_weekday_of_month[i].update(s)
- else:
- nth_weekday_of_month[i] = s
- del nth_weekday_of_month["*"]
- candidates = []
- for wday, nth in nth_weekday_of_month.items():
- c = self._get_nth_weekday_of_month(d.year, d.month, wday)
- for n in nth:
- if n == "l":
- candidate = c[-1]
- elif len(c) < n:
- continue
- else:
- candidate = c[n - 1]
- if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate):
- candidates.append(candidate)
- if not candidates:
- if is_prev:
- d += relativedelta(days=-d.day, hour=23, minute=59, second=59)
- else:
- days = DAYS[month - 1]
- if month == 2 and self.is_leap(year) is True:
- days += 1
- d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0)
- return True, d
- candidates.sort()
- diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day
- if diff_day != 0:
- if is_prev:
- d += relativedelta(days=diff_day, hour=23, minute=59, second=59)
- else:
- d += relativedelta(days=diff_day, hour=0, minute=0, second=0)
- return True, d
- return False, d
- def proc_hour(d):
- try:
- expanded[HOUR_FIELD].index("*")
- except ValueError:
- diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24)
- if diff_hour is not None and diff_hour != 0:
- if is_prev:
- d += relativedelta(hours=diff_hour, minute=59, second=59)
- else:
- d += relativedelta(hours=diff_hour, minute=0, second=0)
- return True, d
- return False, d
- def proc_minute(d):
- try:
- expanded[MINUTE_FIELD].index("*")
- except ValueError:
- diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60)
- if diff_min is not None and diff_min != 0:
- if is_prev:
- d += relativedelta(minutes=diff_min, second=59)
- else:
- d += relativedelta(minutes=diff_min, second=0)
- return True, d
- return False, d
- def proc_second(d):
- if len(expanded) > UNIX_CRON_LEN:
- try:
- expanded[SECOND_FIELD].index("*")
- except ValueError:
- diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60)
- if diff_sec is not None and diff_sec != 0:
- d += relativedelta(seconds=diff_sec)
- return True, d
- else:
- d += relativedelta(second=0)
- return False, d
- procs = [
- proc_year,
- proc_month,
- proc_day_of_month,
- (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week),
- proc_hour,
- proc_minute,
- proc_second,
- ]
- while abs(year - current_year) <= self._max_years_between_matches:
- next = False
- stop = False
- for proc in procs:
- (changed, dst) = proc(dst)
- # `None` can be set mostly for year processing
- # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff
- if changed is None:
- stop = True
- break
- if changed:
- month, year = dst.month, dst.year
- next = True
- break
- if stop:
- break
- if next:
- continue
- return self.datetime_to_timestamp(dst.replace(microsecond=0))
- if is_prev:
- raise CroniterBadDateError("failed to find prev date")
- raise CroniterBadDateError("failed to find next date")
- @staticmethod
- def _get_next_nearest(x, to_check):
- small = [item for item in to_check if item < x]
- large = [item for item in to_check if item >= x]
- large.extend(small)
- return large[0]
- @staticmethod
- def _get_prev_nearest(x, to_check):
- small = [item for item in to_check if item <= x]
- large = [item for item in to_check if item > x]
- small.reverse()
- large.reverse()
- small.extend(large)
- return small[0]
- @staticmethod
- def _get_next_nearest_diff(x, to_check, range_val):
- """
- `range_val` is the range of a field.
- If no available time, we can move to next loop(like next month).
- `range_val` can also be set to `None` to indicate that there is no loop.
- ( Currently, should only used for `year` field )
- """
- for i, d in enumerate(to_check):
- if d == "l" and range_val is not None:
- # if 'l' then it is the last day of month
- # => its value of range_val
- d = range_val
- if d >= x:
- return d - x
- # When range_val is None and x not exists in to_check,
- # `None` will be returned to suggest no more available time
- if range_val is None:
- return None
- return to_check[0] - x + range_val
- @staticmethod
- def _get_prev_nearest_diff(x, to_check, range_val):
- """
- `range_val` is the range of a field.
- If no available time, we can move to previous loop(like previous month).
- Range_val can also be set to `None` to indicate that there is no loop.
- ( Currently should only used for `year` field )
- """
- candidates = to_check[:]
- candidates.reverse()
- for d in candidates:
- if d != "l" and d <= x:
- return d - x
- if "l" in candidates:
- return -x
- # When range_val is None and x not exists in to_check,
- # `None` will be returned to suggest no more available time
- if range_val is None:
- return None
- candidate = candidates[0]
- for c in candidates:
- # fixed: c < range_val
- # this code will reject all 31 day of month, 12 month, 59 second,
- # 23 hour and so on.
- # if candidates has just a element, this will not harmful.
- # but candidates have multiple elements, then values equal to
- # range_val will rejected.
- if c <= range_val:
- candidate = c
- break
- # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error return 2021-03-02 06:00:00
- if candidate > range_val:
- return -range_val
- return candidate - x - range_val
- @staticmethod
- def _get_nth_weekday_of_month(year, month, day_of_week):
- """For a given year/month return a list of days in nth-day-of-month order.
- The last weekday of the month is always [-1].
- """
- w = (day_of_week + 6) % 7
- c = calendar.Calendar(w).monthdayscalendar(year, month)
- if c[0][0] == 0:
- c.pop(0)
- return tuple(i[0] for i in c)
- @staticmethod
- def is_leap(year):
- return bool(year % 400 == 0 or (year % 4 == 0 and year % 100 != 0))
- @classmethod
- def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN):
- if isinstance(len_expressions, (list, dict, tuple, set)):
- len_expressions = len(len_expressions)
- if val in cls.LOWMAP[field_index] and not (
- # do not support 0 as a month either for classical 5 fields cron,
- # 6fields second repeat form or 7 fields year form
- # but still let conversion happen if day field is shifted
- (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN)
- or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN)
- or (field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD] and len_expressions == YEAR_CRON_LEN)
- ):
- val = cls.LOWMAP[field_index][val]
- return val
- @classmethod
- def _expand(
- cls,
- expr_format,
- hash_id=None,
- second_at_beginning=False,
- from_timestamp=None,
- ):
- # Split the expression in components, and normalize L -> l, MON -> mon,
- # etc. Keep expr_format untouched so we can use it in the exception
- # messages.
- expr_aliases = {
- "@midnight": ("0 0 * * *", "h h(0-2) * * * h"),
- "@hourly": ("0 * * * *", "h * * * * h"),
- "@daily": ("0 0 * * *", "h h * * * h"),
- "@weekly": ("0 0 * * 0", "h h * * h h"),
- "@monthly": ("0 0 1 * *", "h h h * * h"),
- "@yearly": ("0 0 1 1 *", "h h h h * h"),
- "@annually": ("0 0 1 1 *", "h h h h * h"),
- }
- efl = expr_format.lower()
- hash_id_expr = 1 if hash_id is not None else 0
- try:
- efl = expr_aliases[efl][hash_id_expr]
- except KeyError:
- pass
- expressions = efl.split()
- if len(expressions) not in VALID_LEN_EXPRESSION:
- raise CroniterBadCronError("Exactly 5, 6 or 7 columns has to be specified for iterator expression.")
- if len(expressions) > UNIX_CRON_LEN and second_at_beginning:
- # move second to it's own(6th) field to process by same logical
- expressions.insert(SECOND_FIELD, expressions.pop(0))
- expanded = []
- nth_weekday_of_month = {}
- for field_index, expr in enumerate(expressions):
- for expanderid, expander in EXPANDERS.items():
- expr = expander(cls).expand(
- efl,
- field_index,
- expr,
- hash_id=hash_id,
- from_timestamp=from_timestamp,
- )
- if "?" in expr:
- if expr != "?":
- raise CroniterBadCronError(
- "[{0}] is not acceptable. Question mark can not used with other characters".format(expr_format)
- )
- if field_index not in [DAY_FIELD, DOW_FIELD]:
- raise CroniterBadCronError(
- "[{0}] is not acceptable. Question mark can only used in day_of_month or day_of_week".format(
- expr_format
- )
- )
- # currently just trade `?` as `*`
- expr = "*"
- e_list = expr.split(",")
- res = []
- while len(e_list) > 0:
- e = e_list.pop()
- nth = None
- if field_index == DOW_FIELD:
- # Handle special case in the dow expression: 2#3, l3
- special_dow_rem = special_dow_re.match(str(e))
- if special_dow_rem:
- g = special_dow_rem.groupdict()
- he, last = g.get("he", ""), g.get("last", "")
- if he:
- e = he
- try:
- nth = int(last)
- assert 5 >= nth >= 1
- except (KeyError, ValueError, AssertionError):
- raise CroniterBadCronError(
- "[{0}] is not acceptable. Invalid day_of_week value: '{1}'".format(
- expr_format, nth
- )
- )
- elif last:
- e = last
- nth = g["pre"] # 'l'
- # Before matching step_search_re, normalize "*" to "{min}-{max}".
- # Example: in the minute field, "*/5" normalizes to "0-59/5"
- t = re.sub(
- r"^\*(\/.+)$",
- r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]),
- str(e),
- )
- m = step_search_re.search(t)
- if not m:
- # Before matching step_search_re,
- # normalize "{start}/{step}" to "{start}-{max}/{step}".
- # Example: in the minute field, "10/5" normalizes to "10-59/5"
- t = re.sub(
- r"^(.+)\/(.+)$",
- r"\1-%d/\2" % (cls.RANGES[field_index][1]),
- str(e),
- )
- m = step_search_re.search(t)
- if m:
- # early abort if low/high are out of bounds
- (low, high, step) = m.group(1), m.group(2), m.group(4) or 1
- if field_index == DAY_FIELD and high == "l":
- high = "31"
- if not only_int_re.search(low):
- low = "{0}".format(cls._alphaconv(field_index, low, expressions))
- if not only_int_re.search(high):
- high = "{0}".format(cls._alphaconv(field_index, high, expressions))
- # normally, it's already guarded by the RE that should not accept not-int values.
- if not only_int_re.search(str(step)):
- raise CroniterBadCronError(
- "[{0}] step '{2}' in field {1} is not acceptable".format(expr_format, field_index, step)
- )
- step = int(step)
- for band in low, high:
- if not only_int_re.search(str(band)):
- raise CroniterBadCronError(
- "[{0}] bands '{2}-{3}' in field {1} are not acceptable".format(
- expr_format, field_index, low, high
- )
- )
- low, high = [cls.value_alias(int(_val), field_index, expressions) for _val in (low, high)]
- if max(low, high) > max(cls.RANGES[field_index][0], cls.RANGES[field_index][1]):
- raise CroniterBadCronError("{0} is out of bands".format(expr_format))
- if from_timestamp:
- low = cls._get_low_from_current_date_number(field_index, int(step), int(from_timestamp))
- # Handle when the second bound of the range is in backtracking order:
- # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH
- if low > high:
- whole_field_range = list(
- range(
- cls.RANGES[field_index][0],
- cls.RANGES[field_index][1] + 1,
- 1,
- )
- )
- # Add FirstBound -> ENDRANGE, respecting step
- rng = list(range(low, cls.RANGES[field_index][1] + 1, step))
- # Then 0 -> SecondBound, but skipping n first occurences according to step
- # EG to respect such expressions : Apr-Jan/3
- to_skip = 0
- if rng:
- already_skipped = list(reversed(whole_field_range)).index(rng[-1])
- curpos = whole_field_range.index(rng[-1])
- if ((curpos + step) > len(whole_field_range)) and (already_skipped < step):
- to_skip = step - already_skipped
- rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step))
- # if we include a range type: Jan-Jan, or Sun-Sun,
- # it means the whole cycle (all days of week, # all monthes of year, etc)
- elif low == high:
- rng = list(
- range(
- cls.RANGES[field_index][0],
- cls.RANGES[field_index][1] + 1,
- step,
- )
- )
- else:
- try:
- rng = list(range(low, high + 1, step))
- except ValueError as exc:
- raise CroniterBadCronError("invalid range: {0}".format(exc))
- rng = (
- ["{0}#{1}".format(item, nth) for item in rng]
- if field_index == DOW_FIELD and nth and nth != "l"
- else rng
- )
- e_list += [a for a in rng if a not in e_list]
- else:
- if t.startswith("-"):
- raise CroniterBadCronError(
- "[{0}] is not acceptable," "negative numbers not allowed".format(expr_format)
- )
- if not star_or_int_re.search(t):
- t = cls._alphaconv(field_index, t, expressions)
- try:
- t = int(t)
- except ValueError:
- pass
- t = cls.value_alias(t, field_index, expressions)
- if t not in ["*", "l"] and (
- int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1]
- ):
- raise CroniterBadCronError("[{0}] is not acceptable, out of range".format(expr_format))
- res.append(t)
- if field_index == DOW_FIELD and nth:
- if t not in nth_weekday_of_month:
- nth_weekday_of_month[t] = set()
- nth_weekday_of_month[t].add(nth)
- res = set(res)
- res = sorted(res, key=lambda i: "{:02}".format(i) if isinstance(i, int) else i)
- if len(res) == cls.LEN_MEANS_ALL[field_index]:
- # Make sure the wildcard is used in the correct way (avoid over-optimization)
- if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or (
- field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD]
- ):
- pass
- else:
- res = ["*"]
- expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res)
- # Check to make sure the dow combo in use is supported
- if nth_weekday_of_month:
- dow_expanded_set = set(expanded[DOW_FIELD])
- dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys())
- dow_expanded_set.discard("*")
- # Skip: if it's all weeks instead of wildcard
- if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]:
- raise CroniterUnsupportedSyntaxError(
- "day-of-week field does not support mixing literal values and nth day of week syntax. "
- "Cron: '{}' dow={} vs nth={}".format(expr_format, dow_expanded_set, nth_weekday_of_month)
- )
- EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] = expressions
- return expanded, nth_weekday_of_month
- @classmethod
- def expand(
- cls,
- expr_format,
- hash_id=None,
- second_at_beginning=False,
- from_timestamp=None,
- ):
- """
- Expand a cron expression format into a noramlized format of
- list[list[int | 'l' | '*']]. The first list representing each element
- of the epxression, and each sub-list representing the allowed values
- for that expression component.
- A tuple is returned, the first value being the expanded epxression
- list, and the second being a `nth_weekday_of_month` mapping.
- Examples:
- # Every minute
- >>> croniter.expand('* * * * *')
- ([['*'], ['*'], ['*'], ['*'], ['*']], {})
- # On the hour
- >>> croniter.expand('0 0 * * *')
- ([[0], [0], ['*'], ['*'], ['*']], {})
- # Hours 0-5 and 10 monday through friday
- >>> croniter.expand('0-5,10 * * * mon-fri')
- ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {})
- Note that some special values such as nth day of week are expanded to a
- special mapping format for later processing:
- # Every minute on the 3rd tuesday of the month
- >>> croniter.expand('* * * * 2#3')
- ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}})
- # Every hour on the last day of the month
- >>> croniter.expand('0 * l * *')
- ([[0], ['*'], ['l'], ['*'], ['*']], {})
- # On the hour every 15 seconds
- >>> croniter.expand('0 0 * * * */15')
- ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {})
- """
- try:
- return cls._expand(
- expr_format,
- hash_id=hash_id,
- second_at_beginning=second_at_beginning,
- from_timestamp=from_timestamp,
- )
- except (ValueError,) as exc:
- if isinstance(exc, CroniterError):
- raise
- if int(sys.version[0]) >= 3:
- trace = _traceback.format_exc()
- raise CroniterBadCronError(trace)
- raise CroniterBadCronError("{0}".format(exc))
- @classmethod
- def _get_low_from_current_date_number(cls, field_index, step, from_timestamp):
- dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT)
- if field_index == MINUTE_FIELD:
- return dt.minute % step
- if field_index == HOUR_FIELD:
- return dt.hour % step
- if field_index == DAY_FIELD:
- return ((dt.day - 1) % step) + 1
- if field_index == MONTH_FIELD:
- return dt.month % step
- if field_index == DOW_FIELD:
- return (dt.weekday() + 1) % step
- raise ValueError("Can't get current date number for index larger than 4")
- @classmethod
- def is_valid(
- cls,
- expression,
- hash_id=None,
- encoding="UTF-8",
- second_at_beginning=False,
- ):
- if hash_id:
- if not isinstance(hash_id, (bytes, str)):
- raise TypeError("hash_id must be bytes or UTF-8 string")
- if not isinstance(hash_id, bytes):
- hash_id = hash_id.encode(encoding)
- try:
- cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning)
- except CroniterError:
- return False
- return True
- @classmethod
- def match(cls, cron_expression, testdate, day_or=True, second_at_beginning=False):
- return cls.match_range(cron_expression, testdate, testdate, day_or, second_at_beginning)
- @classmethod
- def match_range(
- cls,
- cron_expression,
- from_datetime,
- to_datetime,
- day_or=True,
- second_at_beginning=False,
- ):
- cron = cls(
- cron_expression,
- to_datetime,
- ret_type=datetime.datetime,
- day_or=day_or,
- second_at_beginning=second_at_beginning,
- )
- tdp = cron.get_current(datetime.datetime)
- if not tdp.microsecond:
- tdp += relativedelta(microseconds=1)
- cron.set_current(tdp, force=True)
- try:
- tdt = cron.get_prev()
- except CroniterBadDateError:
- return False
- precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60
- duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds
- return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second
- def croniter_range(
- start,
- stop,
- expr_format,
- ret_type=None,
- day_or=True,
- exclude_ends=False,
- _croniter=None,
- second_at_beginning=False,
- expand_from_start_time=False,
- ):
- """
- Generator that provides all times from start to stop matching the given cron expression.
- If the cron expression matches either 'start' and/or 'stop', those times will be returned as
- well unless 'exclude_ends=True' is passed.
- You can think of this function as sibling to the builtin range function for datetime objects.
- Like range(start,stop,step), except that here 'step' is a cron expression.
- """
- _croniter = _croniter or croniter
- auto_rt = datetime.datetime
- # type is used in first if branch for perfs reasons
- if type(start) is not type(stop) and not (isinstance(start, type(stop)) or isinstance(stop, type(start))):
- raise CroniterBadTypeRangeError(
- "The start and stop must be same type. {0} != {1}".format(type(start), type(stop))
- )
- if isinstance(start, (float, int)):
- start, stop = (datetime.datetime.fromtimestamp(t, tzutc()).replace(tzinfo=None) for t in (start, stop))
- auto_rt = float
- if ret_type is None:
- ret_type = auto_rt
- if not exclude_ends:
- ms1 = relativedelta(microseconds=1)
- if start < stop: # Forward (normal) time order
- start -= ms1
- stop += ms1
- else: # Reverse time order
- start += ms1
- stop -= ms1
- year_span = math.floor(abs(stop.year - start.year)) + 1
- ic = _croniter(
- expr_format,
- start,
- ret_type=datetime.datetime,
- day_or=day_or,
- max_years_between_matches=year_span,
- second_at_beginning=second_at_beginning,
- expand_from_start_time=expand_from_start_time,
- )
- # define a continue (cont) condition function and step function for the main while loop
- if start < stop: # Forward
- def cont(v):
- return v < stop
- step = ic.get_next
- else: # Reverse
- def cont(v):
- return v > stop
- step = ic.get_prev
- try:
- dt = step()
- while cont(dt):
- if ret_type is float:
- yield ic.get_current(float)
- else:
- yield dt
- dt = step()
- except CroniterBadDateError:
- # Stop iteration when this exception is raised; no match found within the given year range
- return
- class HashExpander:
- def __init__(self, cronit):
- self.cron = cronit
- def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None):
- """Return a hashed/random integer given range/hash information"""
- if range_end is None:
- range_end = self.cron.RANGES[idx][1]
- if range_begin is None:
- range_begin = self.cron.RANGES[idx][0]
- if hash_type == "r":
- crc = random.randint(0, 0xFFFFFFFF)
- else:
- crc = binascii.crc32(hash_id) & 0xFFFFFFFF
- return ((crc >> idx) % (range_end - range_begin + 1)) + range_begin
- def match(self, efl, idx, expr, hash_id=None, **kw):
- return hash_expression_re.match(expr)
- def expand(self, efl, idx, expr, hash_id=None, match="", **kw):
- """Expand a hashed/random expression to its normal representation"""
- if match == "":
- match = self.match(efl, idx, expr, hash_id, **kw)
- if not match:
- return expr
- m = match.groupdict()
- if m["hash_type"] == "h" and hash_id is None:
- raise CroniterBadCronError("Hashed definitions must include hash_id")
- if m["range_begin"] and m["range_end"]:
- if int(m["range_begin"]) >= int(m["range_end"]):
- raise CroniterBadCronError("Range end must be greater than range begin")
- if m["range_begin"] and m["range_end"] and m["divisor"]:
- # Example: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54)
- if int(m["divisor"]) == 0:
- raise CroniterBadCronError("Bad expression: {0}".format(expr))
- return "{0}-{1}/{2}".format(
- self.do(
- idx,
- hash_type=m["hash_type"],
- hash_id=hash_id,
- range_begin=int(m["range_begin"]),
- range_end=int(m["divisor"]) - 1 + int(m["range_begin"]),
- ),
- int(m["range_end"]),
- int(m["divisor"]),
- )
- elif m["range_begin"] and m["range_end"]:
- # Example: H(0-29) -> 12
- return str(
- self.do(
- idx,
- hash_type=m["hash_type"],
- hash_id=hash_id,
- range_end=int(m["range_end"]),
- range_begin=int(m["range_begin"]),
- )
- )
- elif m["divisor"]:
- # Example: H/15 -> 7-59/15 (i.e. 7,22,37,52)
- if int(m["divisor"]) == 0:
- raise CroniterBadCronError("Bad expression: {0}".format(expr))
- return "{0}-{1}/{2}".format(
- self.do(
- idx,
- hash_type=m["hash_type"],
- hash_id=hash_id,
- range_begin=self.cron.RANGES[idx][0],
- range_end=int(m["divisor"]) - 1 + self.cron.RANGES[idx][0],
- ),
- self.cron.RANGES[idx][1],
- int(m["divisor"]),
- )
- else:
- # Example: H -> 32
- return str(
- self.do(
- idx,
- hash_type=m["hash_type"],
- hash_id=hash_id,
- )
- )
- EXPANDERS = OrderedDict(
- [
- ("hash", HashExpander),
- ]
- )
|