#!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import, division, print_function import binascii import calendar import copy import datetime import math import platform import random import re import struct import sys import traceback as _traceback from time import time # as pytz is optional in thirdparty libs but we need it for good support under # python2, just test that it's well installed import pytz # noqa from dateutil.relativedelta import relativedelta from dateutil.tz import tzutc def is_32bit(): """ Detect if Python is running in 32-bit mode. Compatible with Python 2.6 and later versions. Returns True if running on 32-bit Python, False for 64-bit. """ # Method 1: Check pointer size bits = struct.calcsize("P") * 8 # Method 2: Check platform architecture string try: architecture = platform.architecture()[0] except RuntimeError: architecture = None # Method 3: Check maxsize (sys.maxint in Python 2) try: # Python 2 is_small_maxsize = sys.maxint <= 2 ** 32 except AttributeError: # Python 3 is_small_maxsize = sys.maxsize <= 2 ** 32 # Evaluate all available methods is_32 = False if bits == 32: is_32 = True elif architecture and "32" in architecture: is_32 = True elif is_small_maxsize: is_32 = True return is_32 try: # https://github.com/python/cpython/issues/101069 detection if is_32bit(): datetime.datetime.fromtimestamp(3999999999) OVERFLOW32B_MODE = False except OverflowError: OVERFLOW32B_MODE = True try: from collections import OrderedDict except ImportError: OrderedDict = dict # py26 degraded mode, expanders order will not be immutable try: # py3 recent UTC_DT = datetime.timezone.utc except AttributeError: UTC_DT = pytz.utc EPOCH = datetime.datetime.fromtimestamp(0, UTC_DT) # fmt: off M_ALPHAS = { "jan": 1, "feb": 2, "mar": 3, "apr": 4, # noqa: E241 "may": 5, "jun": 6, "jul": 7, "aug": 8, # noqa: E241 "sep": 9, "oct": 10, "nov": 11, "dec": 12, } DOW_ALPHAS = { "sun": 0, "mon": 1, "tue": 2, "wed": 3, "thu": 4, "fri": 5, "sat": 6 } MINUTE_FIELD = 0 HOUR_FIELD = 1 DAY_FIELD = 2 MONTH_FIELD = 3 DOW_FIELD = 4 SECOND_FIELD = 5 YEAR_FIELD = 6 UNIX_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD) # noqa: E222 SECOND_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD) # noqa: E222 YEAR_FIELDS = (MINUTE_FIELD, HOUR_FIELD, DAY_FIELD, MONTH_FIELD, DOW_FIELD, SECOND_FIELD, YEAR_FIELD) # noqa: E222 # fmt: on step_search_re = re.compile(r"^([^-]+)-([^-/]+)(/(\d+))?$") only_int_re = re.compile(r"^\d+$") WEEKDAYS = "|".join(DOW_ALPHAS.keys()) MONTHS = "|".join(M_ALPHAS.keys()) star_or_int_re = re.compile(r"^(\d+|\*)$") special_dow_re = re.compile( (r"^(?P
((?P(({WEEKDAYS})(-({WEEKDAYS}))?)").format(WEEKDAYS=WEEKDAYS) + (r"|(({MONTHS})(-({MONTHS}))?)|\w+)#)|l)(?P \d+)$").format(MONTHS=MONTHS) ) re_star = re.compile("[*]") hash_expression_re = re.compile( r"^(?P h|r)(\((?P \d+)-(?P \d+)\))?(\/(?P \d+))?$" ) CRON_FIELDS = { "unix": UNIX_FIELDS, "second": SECOND_FIELDS, "year": YEAR_FIELDS, len(UNIX_FIELDS): UNIX_FIELDS, len(SECOND_FIELDS): SECOND_FIELDS, len(YEAR_FIELDS): YEAR_FIELDS, } UNIX_CRON_LEN = len(UNIX_FIELDS) SECOND_CRON_LEN = len(SECOND_FIELDS) YEAR_CRON_LEN = len(YEAR_FIELDS) # retrocompat VALID_LEN_EXPRESSION = set(a for a in CRON_FIELDS if isinstance(a, int)) TIMESTAMP_TO_DT_CACHE = {} EXPRESSIONS = {} MARKER = object() def timedelta_to_seconds(td): return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6 def datetime_to_timestamp(d): if d.tzinfo is not None: d = d.replace(tzinfo=None) - d.utcoffset() return timedelta_to_seconds(d - datetime.datetime(1970, 1, 1)) class CroniterError(ValueError): """General top-level Croniter base exception""" class CroniterBadTypeRangeError(TypeError): """.""" class CroniterBadCronError(CroniterError): """Syntax, unknown value, or range error within a cron expression""" class CroniterUnsupportedSyntaxError(CroniterBadCronError): """Valid cron syntax, but likely to produce inaccurate results""" # Extending CroniterBadCronError, which may be contridatory, but this allows # catching both errors with a single exception. From a user perspective # these will likely be handled the same way. class CroniterBadDateError(CroniterError): """Unable to find next/prev timestamp match""" class CroniterNotAlphaError(CroniterBadCronError): """Cron syntax contains an invalid day or month abbreviation""" class croniter(object): MONTHS_IN_YEAR = 12 # This helps with expanding `*` fields into `lower-upper` ranges. Each item # in this tuple maps to the corresponding field index RANGES = ( (0, 59), (0, 23), (1, 31), (1, 12), (0, 6), (0, 59), (1970, 2099), ) DAYS = (31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31) ALPHACONV = ( {}, # 0: min {}, # 1: hour {"l": "l"}, # 2: dom # 3: mon copy.deepcopy(M_ALPHAS), # 4: dow copy.deepcopy(DOW_ALPHAS), # 5: second {}, # 6: year {}, ) LOWMAP = ( {}, {}, {0: 1}, {0: 1}, {7: 0}, {}, {}, ) LEN_MEANS_ALL = ( 60, 24, 31, 12, 7, 60, 130, ) def __init__( self, expr_format, start_time=None, ret_type=float, day_or=True, max_years_between_matches=None, is_prev=False, hash_id=None, implement_cron_bug=False, second_at_beginning=None, expand_from_start_time=False, ): self._ret_type = ret_type self._day_or = day_or self._implement_cron_bug = implement_cron_bug self.second_at_beginning = bool(second_at_beginning) self._expand_from_start_time = expand_from_start_time if hash_id: if not isinstance(hash_id, (bytes, str)): raise TypeError("hash_id must be bytes or UTF-8 string") if not isinstance(hash_id, bytes): hash_id = hash_id.encode("UTF-8") self._max_years_btw_matches_explicitly_set = max_years_between_matches is not None if not self._max_years_btw_matches_explicitly_set: max_years_between_matches = 50 self._max_years_between_matches = max(int(max_years_between_matches), 1) if start_time is None: start_time = time() self.tzinfo = None self.start_time = None self.dst_start_time = None self.cur = None self.set_current(start_time, force=False) self.expanded, self.nth_weekday_of_month = self.expand( expr_format, hash_id=hash_id, from_timestamp=self.dst_start_time if self._expand_from_start_time else None, second_at_beginning=second_at_beginning, ) self.fields = CRON_FIELDS[len(self.expanded)] self.expressions = EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] self._is_prev = is_prev @classmethod def _alphaconv(cls, index, key, expressions): try: return cls.ALPHACONV[index][key] except KeyError: raise CroniterNotAlphaError("[{0}] is not acceptable".format(" ".join(expressions))) def get_next(self, ret_type=None, start_time=None, update_current=True): if start_time and self._expand_from_start_time: raise ValueError("start_time is not supported when using expand_from_start_time = True.") return self._get_next( ret_type=ret_type, start_time=start_time, is_prev=False, update_current=update_current, ) def get_prev(self, ret_type=None, start_time=None, update_current=True): return self._get_next( ret_type=ret_type, start_time=start_time, is_prev=True, update_current=update_current, ) def get_current(self, ret_type=None): ret_type = ret_type or self._ret_type if issubclass(ret_type, datetime.datetime): return self.timestamp_to_datetime(self.cur) return self.cur def set_current(self, start_time, force=True): if (force or (self.cur is None)) and start_time is not None: if isinstance(start_time, datetime.datetime): self.tzinfo = start_time.tzinfo start_time = self.datetime_to_timestamp(start_time) self.start_time = start_time self.dst_start_time = start_time self.cur = start_time return self.cur @staticmethod def datetime_to_timestamp(d): """ Converts a `datetime` object `d` into a UNIX timestamp. """ return datetime_to_timestamp(d) _datetime_to_timestamp = datetime_to_timestamp # retrocompat def timestamp_to_datetime(self, timestamp, tzinfo=MARKER): """ Converts a UNIX `timestamp` into a `datetime` object. """ if tzinfo is MARKER: # allow to give tzinfo=None even if self.tzinfo is set tzinfo = self.tzinfo k = timestamp if tzinfo: k = (timestamp, repr(tzinfo)) try: return TIMESTAMP_TO_DT_CACHE[k] except KeyError: pass if OVERFLOW32B_MODE: # degraded mode to workaround Y2038 # see https://github.com/python/cpython/issues/101069 result = EPOCH.replace(tzinfo=None) + datetime.timedelta(seconds=timestamp) else: result = datetime.datetime.fromtimestamp(timestamp, tz=tzutc()).replace(tzinfo=None) if tzinfo: result = result.replace(tzinfo=UTC_DT).astimezone(tzinfo) TIMESTAMP_TO_DT_CACHE[(result, repr(result.tzinfo))] = result return result _timestamp_to_datetime = timestamp_to_datetime # retrocompat @staticmethod def timedelta_to_seconds(td): """ Converts a 'datetime.timedelta' object `td` into seconds contained in the duration. Note: We cannot use `timedelta.total_seconds()` because this is not supported by Python 2.6. """ return timedelta_to_seconds(td) _timedelta_to_seconds = timedelta_to_seconds # retrocompat def _get_next( self, ret_type=None, start_time=None, is_prev=None, update_current=None, ): if update_current is None: update_current = True self.set_current(start_time, force=True) if is_prev is None: is_prev = self._is_prev self._is_prev = is_prev expanded = self.expanded[:] nth_weekday_of_month = self.nth_weekday_of_month.copy() ret_type = ret_type or self._ret_type if not issubclass(ret_type, (float, datetime.datetime)): raise TypeError("Invalid ret_type, only 'float' or 'datetime' is acceptable.") # exception to support day of month and day of week as defined in cron dom_dow_exception_processed = False if (expanded[DAY_FIELD][0] != "*" and expanded[DOW_FIELD][0] != "*") and self._day_or: # If requested, handle a bug in vixie cron/ISC cron where day_of_month and day_of_week form # an intersection (AND) instead of a union (OR) if either field is an asterisk or starts with an asterisk # (https://crontab.guru/cron-bug.html) if self._implement_cron_bug and ( re_star.match(self.expressions[DAY_FIELD]) or re_star.match(self.expressions[DOW_FIELD]) ): # To produce a schedule identical to the cron bug, we'll bypass the code that # makes a union of DOM and DOW, and instead skip to the code that does an intersect instead pass else: bak = expanded[DOW_FIELD] expanded[DOW_FIELD] = ["*"] t1 = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev) expanded[DOW_FIELD] = bak expanded[DAY_FIELD] = ["*"] t2 = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev) if not is_prev: result = t1 if t1 < t2 else t2 else: result = t1 if t1 > t2 else t2 dom_dow_exception_processed = True if not dom_dow_exception_processed: result = self._calc(self.cur, expanded, nth_weekday_of_month, is_prev) # DST Handling for cron job spanning across days dtstarttime = self._timestamp_to_datetime(self.dst_start_time) dtstarttime_utcoffset = dtstarttime.utcoffset() or datetime.timedelta(0) dtresult = self.timestamp_to_datetime(result) lag = lag_hours = 0 # do we trigger DST on next crontab (handle backward changes) dtresult_utcoffset = dtstarttime_utcoffset if dtresult and self.tzinfo: dtresult_utcoffset = dtresult.utcoffset() lag_hours = self._timedelta_to_seconds(dtresult - dtstarttime) / (60 * 60) lag = self._timedelta_to_seconds(dtresult_utcoffset - dtstarttime_utcoffset) hours_before_midnight = 24 - dtstarttime.hour if dtresult_utcoffset != dtstarttime_utcoffset: if (lag > 0 and abs(lag_hours) >= hours_before_midnight) or ( lag < 0 and ((3600 * abs(lag_hours) + abs(lag)) >= hours_before_midnight * 3600) ): dtresult_adjusted = dtresult - datetime.timedelta(seconds=lag) result_adjusted = self._datetime_to_timestamp(dtresult_adjusted) # Do the actual adjust only if the result time actually exists if self._timestamp_to_datetime(result_adjusted).tzinfo == dtresult_adjusted.tzinfo: dtresult = dtresult_adjusted result = result_adjusted self.dst_start_time = result if update_current: self.cur = result if issubclass(ret_type, datetime.datetime): result = dtresult return result # iterator protocol, to enable direct use of croniter # objects in a loop, like "for dt in croniter("5 0 * * *'): ..." # or for combining multiple croniters into single # dates feed using 'itertools' module def all_next(self, ret_type=None, start_time=None, update_current=None): """ Returns a generator yielding consecutive dates. May be used instead of an implicit call to __iter__ whenever a non-default `ret_type` needs to be specified. """ # In a Python 3.7+ world: contextlib.suppress and contextlib.nullcontext could be used instead try: while True: self._is_prev = False yield self._get_next( ret_type=ret_type, start_time=start_time, update_current=update_current, ) start_time = None except CroniterBadDateError: if self._max_years_btw_matches_explicitly_set: return raise def all_prev(self, ret_type=None, start_time=None, update_current=None): """ Returns a generator yielding previous dates. """ try: while True: self._is_prev = True yield self._get_next( ret_type=ret_type, start_time=start_time, update_current=update_current, ) start_time = None except CroniterBadDateError: if self._max_years_btw_matches_explicitly_set: return raise def iter(self, *args, **kwargs): return self.all_prev if self._is_prev else self.all_next def __iter__(self): return self __next__ = next = _get_next def _calc(self, now, expanded, nth_weekday_of_month, is_prev): if is_prev: now = math.ceil(now) nearest_diff_method = self._get_prev_nearest_diff sign = -1 offset = 1 if (len(expanded) > UNIX_CRON_LEN or now % 60 > 0) else 60 else: now = math.floor(now) nearest_diff_method = self._get_next_nearest_diff sign = 1 offset = 1 if (len(expanded) > UNIX_CRON_LEN) else 60 dst = now = self.timestamp_to_datetime(now + sign * offset) month, year = dst.month, dst.year current_year = now.year DAYS = self.DAYS def proc_year(d): if len(expanded) == YEAR_CRON_LEN: try: expanded[YEAR_FIELD].index("*") except ValueError: # use None as range_val to indicate no loop diff_year = nearest_diff_method(d.year, expanded[YEAR_FIELD], None) if diff_year is None: return None, d if diff_year != 0: if is_prev: d += relativedelta( years=diff_year, month=12, day=31, hour=23, minute=59, second=59, ) else: d += relativedelta( years=diff_year, month=1, day=1, hour=0, minute=0, second=0, ) return True, d return False, d def proc_month(d): try: expanded[MONTH_FIELD].index("*") except ValueError: diff_month = nearest_diff_method(d.month, expanded[MONTH_FIELD], self.MONTHS_IN_YEAR) reset_day = 1 if diff_month is not None and diff_month != 0: if is_prev: d += relativedelta(months=diff_month) reset_day = DAYS[d.month - 1] if d.month == 2 and self.is_leap(d.year) is True: reset_day += 1 d += relativedelta(day=reset_day, hour=23, minute=59, second=59) else: d += relativedelta(months=diff_month, day=reset_day, hour=0, minute=0, second=0) return True, d return False, d def proc_day_of_month(d): try: expanded[DAY_FIELD].index("*") except ValueError: days = DAYS[month - 1] if month == 2 and self.is_leap(year) is True: days += 1 if "l" in expanded[DAY_FIELD] and days == d.day: return False, d if is_prev: days_in_prev_month = DAYS[(month - 2) % self.MONTHS_IN_YEAR] diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days_in_prev_month) else: diff_day = nearest_diff_method(d.day, expanded[DAY_FIELD], days) if diff_day is not None and diff_day != 0: if is_prev: d += relativedelta(days=diff_day, hour=23, minute=59, second=59) else: d += relativedelta(days=diff_day, hour=0, minute=0, second=0) return True, d return False, d def proc_day_of_week(d): try: expanded[DOW_FIELD].index("*") except ValueError: diff_day_of_week = nearest_diff_method(d.isoweekday() % 7, expanded[DOW_FIELD], 7) if diff_day_of_week is not None and diff_day_of_week != 0: if is_prev: d += relativedelta(days=diff_day_of_week, hour=23, minute=59, second=59) else: d += relativedelta(days=diff_day_of_week, hour=0, minute=0, second=0) return True, d return False, d def proc_day_of_week_nth(d): if "*" in nth_weekday_of_month: s = nth_weekday_of_month["*"] for i in range(0, 7): if i in nth_weekday_of_month: nth_weekday_of_month[i].update(s) else: nth_weekday_of_month[i] = s del nth_weekday_of_month["*"] candidates = [] for wday, nth in nth_weekday_of_month.items(): c = self._get_nth_weekday_of_month(d.year, d.month, wday) for n in nth: if n == "l": candidate = c[-1] elif len(c) < n: continue else: candidate = c[n - 1] if (is_prev and candidate <= d.day) or (not is_prev and d.day <= candidate): candidates.append(candidate) if not candidates: if is_prev: d += relativedelta(days=-d.day, hour=23, minute=59, second=59) else: days = DAYS[month - 1] if month == 2 and self.is_leap(year) is True: days += 1 d += relativedelta(days=(days - d.day + 1), hour=0, minute=0, second=0) return True, d candidates.sort() diff_day = (candidates[-1] if is_prev else candidates[0]) - d.day if diff_day != 0: if is_prev: d += relativedelta(days=diff_day, hour=23, minute=59, second=59) else: d += relativedelta(days=diff_day, hour=0, minute=0, second=0) return True, d return False, d def proc_hour(d): try: expanded[HOUR_FIELD].index("*") except ValueError: diff_hour = nearest_diff_method(d.hour, expanded[HOUR_FIELD], 24) if diff_hour is not None and diff_hour != 0: if is_prev: d += relativedelta(hours=diff_hour, minute=59, second=59) else: d += relativedelta(hours=diff_hour, minute=0, second=0) return True, d return False, d def proc_minute(d): try: expanded[MINUTE_FIELD].index("*") except ValueError: diff_min = nearest_diff_method(d.minute, expanded[MINUTE_FIELD], 60) if diff_min is not None and diff_min != 0: if is_prev: d += relativedelta(minutes=diff_min, second=59) else: d += relativedelta(minutes=diff_min, second=0) return True, d return False, d def proc_second(d): if len(expanded) > UNIX_CRON_LEN: try: expanded[SECOND_FIELD].index("*") except ValueError: diff_sec = nearest_diff_method(d.second, expanded[SECOND_FIELD], 60) if diff_sec is not None and diff_sec != 0: d += relativedelta(seconds=diff_sec) return True, d else: d += relativedelta(second=0) return False, d procs = [ proc_year, proc_month, proc_day_of_month, (proc_day_of_week_nth if nth_weekday_of_month else proc_day_of_week), proc_hour, proc_minute, proc_second, ] while abs(year - current_year) <= self._max_years_between_matches: next = False stop = False for proc in procs: (changed, dst) = proc(dst) # `None` can be set mostly for year processing # so please see proc_year / _get_prev_nearest_diff / _get_next_nearest_diff if changed is None: stop = True break if changed: month, year = dst.month, dst.year next = True break if stop: break if next: continue return self.datetime_to_timestamp(dst.replace(microsecond=0)) if is_prev: raise CroniterBadDateError("failed to find prev date") raise CroniterBadDateError("failed to find next date") @staticmethod def _get_next_nearest(x, to_check): small = [item for item in to_check if item < x] large = [item for item in to_check if item >= x] large.extend(small) return large[0] @staticmethod def _get_prev_nearest(x, to_check): small = [item for item in to_check if item <= x] large = [item for item in to_check if item > x] small.reverse() large.reverse() small.extend(large) return small[0] @staticmethod def _get_next_nearest_diff(x, to_check, range_val): """ `range_val` is the range of a field. If no available time, we can move to next loop(like next month). `range_val` can also be set to `None` to indicate that there is no loop. ( Currently, should only used for `year` field ) """ for i, d in enumerate(to_check): if d == "l" and range_val is not None: # if 'l' then it is the last day of month # => its value of range_val d = range_val if d >= x: return d - x # When range_val is None and x not exists in to_check, # `None` will be returned to suggest no more available time if range_val is None: return None return to_check[0] - x + range_val @staticmethod def _get_prev_nearest_diff(x, to_check, range_val): """ `range_val` is the range of a field. If no available time, we can move to previous loop(like previous month). Range_val can also be set to `None` to indicate that there is no loop. ( Currently should only used for `year` field ) """ candidates = to_check[:] candidates.reverse() for d in candidates: if d != "l" and d <= x: return d - x if "l" in candidates: return -x # When range_val is None and x not exists in to_check, # `None` will be returned to suggest no more available time if range_val is None: return None candidate = candidates[0] for c in candidates: # fixed: c < range_val # this code will reject all 31 day of month, 12 month, 59 second, # 23 hour and so on. # if candidates has just a element, this will not harmful. # but candidates have multiple elements, then values equal to # range_val will rejected. if c <= range_val: candidate = c break # fix crontab "0 6 30 3 *" condidates only a element, then get_prev error return 2021-03-02 06:00:00 if candidate > range_val: return -range_val return candidate - x - range_val @staticmethod def _get_nth_weekday_of_month(year, month, day_of_week): """For a given year/month return a list of days in nth-day-of-month order. The last weekday of the month is always [-1]. """ w = (day_of_week + 6) % 7 c = calendar.Calendar(w).monthdayscalendar(year, month) if c[0][0] == 0: c.pop(0) return tuple(i[0] for i in c) @staticmethod def is_leap(year): return bool(year % 400 == 0 or (year % 4 == 0 and year % 100 != 0)) @classmethod def value_alias(cls, val, field_index, len_expressions=UNIX_CRON_LEN): if isinstance(len_expressions, (list, dict, tuple, set)): len_expressions = len(len_expressions) if val in cls.LOWMAP[field_index] and not ( # do not support 0 as a month either for classical 5 fields cron, # 6fields second repeat form or 7 fields year form # but still let conversion happen if day field is shifted (field_index in [DAY_FIELD, MONTH_FIELD] and len_expressions == UNIX_CRON_LEN) or (field_index in [MONTH_FIELD, DOW_FIELD] and len_expressions == SECOND_CRON_LEN) or (field_index in [DAY_FIELD, MONTH_FIELD, DOW_FIELD] and len_expressions == YEAR_CRON_LEN) ): val = cls.LOWMAP[field_index][val] return val @classmethod def _expand( cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None, ): # Split the expression in components, and normalize L -> l, MON -> mon, # etc. Keep expr_format untouched so we can use it in the exception # messages. expr_aliases = { "@midnight": ("0 0 * * *", "h h(0-2) * * * h"), "@hourly": ("0 * * * *", "h * * * * h"), "@daily": ("0 0 * * *", "h h * * * h"), "@weekly": ("0 0 * * 0", "h h * * h h"), "@monthly": ("0 0 1 * *", "h h h * * h"), "@yearly": ("0 0 1 1 *", "h h h h * h"), "@annually": ("0 0 1 1 *", "h h h h * h"), } efl = expr_format.lower() hash_id_expr = 1 if hash_id is not None else 0 try: efl = expr_aliases[efl][hash_id_expr] except KeyError: pass expressions = efl.split() if len(expressions) not in VALID_LEN_EXPRESSION: raise CroniterBadCronError("Exactly 5, 6 or 7 columns has to be specified for iterator expression.") if len(expressions) > UNIX_CRON_LEN and second_at_beginning: # move second to it's own(6th) field to process by same logical expressions.insert(SECOND_FIELD, expressions.pop(0)) expanded = [] nth_weekday_of_month = {} for field_index, expr in enumerate(expressions): for expanderid, expander in EXPANDERS.items(): expr = expander(cls).expand( efl, field_index, expr, hash_id=hash_id, from_timestamp=from_timestamp, ) if "?" in expr: if expr != "?": raise CroniterBadCronError( "[{0}] is not acceptable. Question mark can not used with other characters".format(expr_format) ) if field_index not in [DAY_FIELD, DOW_FIELD]: raise CroniterBadCronError( "[{0}] is not acceptable. Question mark can only used in day_of_month or day_of_week".format( expr_format ) ) # currently just trade `?` as `*` expr = "*" e_list = expr.split(",") res = [] while len(e_list) > 0: e = e_list.pop() nth = None if field_index == DOW_FIELD: # Handle special case in the dow expression: 2#3, l3 special_dow_rem = special_dow_re.match(str(e)) if special_dow_rem: g = special_dow_rem.groupdict() he, last = g.get("he", ""), g.get("last", "") if he: e = he try: nth = int(last) assert 5 >= nth >= 1 except (KeyError, ValueError, AssertionError): raise CroniterBadCronError( "[{0}] is not acceptable. Invalid day_of_week value: '{1}'".format( expr_format, nth ) ) elif last: e = last nth = g["pre"] # 'l' # Before matching step_search_re, normalize "*" to "{min}-{max}". # Example: in the minute field, "*/5" normalizes to "0-59/5" t = re.sub( r"^\*(\/.+)$", r"%d-%d\1" % (cls.RANGES[field_index][0], cls.RANGES[field_index][1]), str(e), ) m = step_search_re.search(t) if not m: # Before matching step_search_re, # normalize "{start}/{step}" to "{start}-{max}/{step}". # Example: in the minute field, "10/5" normalizes to "10-59/5" t = re.sub( r"^(.+)\/(.+)$", r"\1-%d/\2" % (cls.RANGES[field_index][1]), str(e), ) m = step_search_re.search(t) if m: # early abort if low/high are out of bounds (low, high, step) = m.group(1), m.group(2), m.group(4) or 1 if field_index == DAY_FIELD and high == "l": high = "31" if not only_int_re.search(low): low = "{0}".format(cls._alphaconv(field_index, low, expressions)) if not only_int_re.search(high): high = "{0}".format(cls._alphaconv(field_index, high, expressions)) # normally, it's already guarded by the RE that should not accept not-int values. if not only_int_re.search(str(step)): raise CroniterBadCronError( "[{0}] step '{2}' in field {1} is not acceptable".format(expr_format, field_index, step) ) step = int(step) for band in low, high: if not only_int_re.search(str(band)): raise CroniterBadCronError( "[{0}] bands '{2}-{3}' in field {1} are not acceptable".format( expr_format, field_index, low, high ) ) low, high = [cls.value_alias(int(_val), field_index, expressions) for _val in (low, high)] if max(low, high) > max(cls.RANGES[field_index][0], cls.RANGES[field_index][1]): raise CroniterBadCronError("{0} is out of bands".format(expr_format)) if from_timestamp: low = cls._get_low_from_current_date_number(field_index, int(step), int(from_timestamp)) # Handle when the second bound of the range is in backtracking order: # eg: X-Sun or X-7 (Sat-Sun) in DOW, or X-Jan (Apr-Jan) in MONTH if low > high: whole_field_range = list( range( cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, 1, ) ) # Add FirstBound -> ENDRANGE, respecting step rng = list(range(low, cls.RANGES[field_index][1] + 1, step)) # Then 0 -> SecondBound, but skipping n first occurences according to step # EG to respect such expressions : Apr-Jan/3 to_skip = 0 if rng: already_skipped = list(reversed(whole_field_range)).index(rng[-1]) curpos = whole_field_range.index(rng[-1]) if ((curpos + step) > len(whole_field_range)) and (already_skipped < step): to_skip = step - already_skipped rng += list(range(cls.RANGES[field_index][0] + to_skip, high + 1, step)) # if we include a range type: Jan-Jan, or Sun-Sun, # it means the whole cycle (all days of week, # all monthes of year, etc) elif low == high: rng = list( range( cls.RANGES[field_index][0], cls.RANGES[field_index][1] + 1, step, ) ) else: try: rng = list(range(low, high + 1, step)) except ValueError as exc: raise CroniterBadCronError("invalid range: {0}".format(exc)) rng = ( ["{0}#{1}".format(item, nth) for item in rng] if field_index == DOW_FIELD and nth and nth != "l" else rng ) e_list += [a for a in rng if a not in e_list] else: if t.startswith("-"): raise CroniterBadCronError( "[{0}] is not acceptable," "negative numbers not allowed".format(expr_format) ) if not star_or_int_re.search(t): t = cls._alphaconv(field_index, t, expressions) try: t = int(t) except ValueError: pass t = cls.value_alias(t, field_index, expressions) if t not in ["*", "l"] and ( int(t) < cls.RANGES[field_index][0] or int(t) > cls.RANGES[field_index][1] ): raise CroniterBadCronError("[{0}] is not acceptable, out of range".format(expr_format)) res.append(t) if field_index == DOW_FIELD and nth: if t not in nth_weekday_of_month: nth_weekday_of_month[t] = set() nth_weekday_of_month[t].add(nth) res = set(res) res = sorted(res, key=lambda i: "{:02}".format(i) if isinstance(i, int) else i) if len(res) == cls.LEN_MEANS_ALL[field_index]: # Make sure the wildcard is used in the correct way (avoid over-optimization) if (field_index == DAY_FIELD and "*" not in expressions[DOW_FIELD]) or ( field_index == DOW_FIELD and "*" not in expressions[DAY_FIELD] ): pass else: res = ["*"] expanded.append(["*"] if (len(res) == 1 and res[0] == "*") else res) # Check to make sure the dow combo in use is supported if nth_weekday_of_month: dow_expanded_set = set(expanded[DOW_FIELD]) dow_expanded_set = dow_expanded_set.difference(nth_weekday_of_month.keys()) dow_expanded_set.discard("*") # Skip: if it's all weeks instead of wildcard if dow_expanded_set and len(set(expanded[DOW_FIELD])) != cls.LEN_MEANS_ALL[DOW_FIELD]: raise CroniterUnsupportedSyntaxError( "day-of-week field does not support mixing literal values and nth day of week syntax. " "Cron: '{}' dow={} vs nth={}".format(expr_format, dow_expanded_set, nth_weekday_of_month) ) EXPRESSIONS[(expr_format, hash_id, second_at_beginning)] = expressions return expanded, nth_weekday_of_month @classmethod def expand( cls, expr_format, hash_id=None, second_at_beginning=False, from_timestamp=None, ): """ Expand a cron expression format into a noramlized format of list[list[int | 'l' | '*']]. The first list representing each element of the epxression, and each sub-list representing the allowed values for that expression component. A tuple is returned, the first value being the expanded epxression list, and the second being a `nth_weekday_of_month` mapping. Examples: # Every minute >>> croniter.expand('* * * * *') ([['*'], ['*'], ['*'], ['*'], ['*']], {}) # On the hour >>> croniter.expand('0 0 * * *') ([[0], [0], ['*'], ['*'], ['*']], {}) # Hours 0-5 and 10 monday through friday >>> croniter.expand('0-5,10 * * * mon-fri') ([[0, 1, 2, 3, 4, 5, 10], ['*'], ['*'], ['*'], [1, 2, 3, 4, 5]], {}) Note that some special values such as nth day of week are expanded to a special mapping format for later processing: # Every minute on the 3rd tuesday of the month >>> croniter.expand('* * * * 2#3') ([['*'], ['*'], ['*'], ['*'], [2]], {2: {3}}) # Every hour on the last day of the month >>> croniter.expand('0 * l * *') ([[0], ['*'], ['l'], ['*'], ['*']], {}) # On the hour every 15 seconds >>> croniter.expand('0 0 * * * */15') ([[0], [0], ['*'], ['*'], ['*'], [0, 15, 30, 45]], {}) """ try: return cls._expand( expr_format, hash_id=hash_id, second_at_beginning=second_at_beginning, from_timestamp=from_timestamp, ) except (ValueError,) as exc: if isinstance(exc, CroniterError): raise if int(sys.version[0]) >= 3: trace = _traceback.format_exc() raise CroniterBadCronError(trace) raise CroniterBadCronError("{0}".format(exc)) @classmethod def _get_low_from_current_date_number(cls, field_index, step, from_timestamp): dt = datetime.datetime.fromtimestamp(from_timestamp, tz=UTC_DT) if field_index == MINUTE_FIELD: return dt.minute % step if field_index == HOUR_FIELD: return dt.hour % step if field_index == DAY_FIELD: return ((dt.day - 1) % step) + 1 if field_index == MONTH_FIELD: return dt.month % step if field_index == DOW_FIELD: return (dt.weekday() + 1) % step raise ValueError("Can't get current date number for index larger than 4") @classmethod def is_valid( cls, expression, hash_id=None, encoding="UTF-8", second_at_beginning=False, ): if hash_id: if not isinstance(hash_id, (bytes, str)): raise TypeError("hash_id must be bytes or UTF-8 string") if not isinstance(hash_id, bytes): hash_id = hash_id.encode(encoding) try: cls.expand(expression, hash_id=hash_id, second_at_beginning=second_at_beginning) except CroniterError: return False return True @classmethod def match(cls, cron_expression, testdate, day_or=True, second_at_beginning=False): return cls.match_range(cron_expression, testdate, testdate, day_or, second_at_beginning) @classmethod def match_range( cls, cron_expression, from_datetime, to_datetime, day_or=True, second_at_beginning=False, ): cron = cls( cron_expression, to_datetime, ret_type=datetime.datetime, day_or=day_or, second_at_beginning=second_at_beginning, ) tdp = cron.get_current(datetime.datetime) if not tdp.microsecond: tdp += relativedelta(microseconds=1) cron.set_current(tdp, force=True) try: tdt = cron.get_prev() except CroniterBadDateError: return False precision_in_seconds = 1 if len(cron.expanded) > UNIX_CRON_LEN else 60 duration_in_second = (to_datetime - from_datetime).total_seconds() + precision_in_seconds return (max(tdp, tdt) - min(tdp, tdt)).total_seconds() < duration_in_second def croniter_range( start, stop, expr_format, ret_type=None, day_or=True, exclude_ends=False, _croniter=None, second_at_beginning=False, expand_from_start_time=False, ): """ Generator that provides all times from start to stop matching the given cron expression. If the cron expression matches either 'start' and/or 'stop', those times will be returned as well unless 'exclude_ends=True' is passed. You can think of this function as sibling to the builtin range function for datetime objects. Like range(start,stop,step), except that here 'step' is a cron expression. """ _croniter = _croniter or croniter auto_rt = datetime.datetime # type is used in first if branch for perfs reasons if type(start) is not type(stop) and not (isinstance(start, type(stop)) or isinstance(stop, type(start))): raise CroniterBadTypeRangeError( "The start and stop must be same type. {0} != {1}".format(type(start), type(stop)) ) if isinstance(start, (float, int)): start, stop = (datetime.datetime.fromtimestamp(t, tzutc()).replace(tzinfo=None) for t in (start, stop)) auto_rt = float if ret_type is None: ret_type = auto_rt if not exclude_ends: ms1 = relativedelta(microseconds=1) if start < stop: # Forward (normal) time order start -= ms1 stop += ms1 else: # Reverse time order start += ms1 stop -= ms1 year_span = math.floor(abs(stop.year - start.year)) + 1 ic = _croniter( expr_format, start, ret_type=datetime.datetime, day_or=day_or, max_years_between_matches=year_span, second_at_beginning=second_at_beginning, expand_from_start_time=expand_from_start_time, ) # define a continue (cont) condition function and step function for the main while loop if start < stop: # Forward def cont(v): return v < stop step = ic.get_next else: # Reverse def cont(v): return v > stop step = ic.get_prev try: dt = step() while cont(dt): if ret_type is float: yield ic.get_current(float) else: yield dt dt = step() except CroniterBadDateError: # Stop iteration when this exception is raised; no match found within the given year range return class HashExpander: def __init__(self, cronit): self.cron = cronit def do(self, idx, hash_type="h", hash_id=None, range_end=None, range_begin=None): """Return a hashed/random integer given range/hash information""" if range_end is None: range_end = self.cron.RANGES[idx][1] if range_begin is None: range_begin = self.cron.RANGES[idx][0] if hash_type == "r": crc = random.randint(0, 0xFFFFFFFF) else: crc = binascii.crc32(hash_id) & 0xFFFFFFFF return ((crc >> idx) % (range_end - range_begin + 1)) + range_begin def match(self, efl, idx, expr, hash_id=None, **kw): return hash_expression_re.match(expr) def expand(self, efl, idx, expr, hash_id=None, match="", **kw): """Expand a hashed/random expression to its normal representation""" if match == "": match = self.match(efl, idx, expr, hash_id, **kw) if not match: return expr m = match.groupdict() if m["hash_type"] == "h" and hash_id is None: raise CroniterBadCronError("Hashed definitions must include hash_id") if m["range_begin"] and m["range_end"]: if int(m["range_begin"]) >= int(m["range_end"]): raise CroniterBadCronError("Range end must be greater than range begin") if m["range_begin"] and m["range_end"] and m["divisor"]: # Example: H(30-59)/10 -> 34-59/10 (i.e. 34,44,54) if int(m["divisor"]) == 0: raise CroniterBadCronError("Bad expression: {0}".format(expr)) return "{0}-{1}/{2}".format( self.do( idx, hash_type=m["hash_type"], hash_id=hash_id, range_begin=int(m["range_begin"]), range_end=int(m["divisor"]) - 1 + int(m["range_begin"]), ), int(m["range_end"]), int(m["divisor"]), ) elif m["range_begin"] and m["range_end"]: # Example: H(0-29) -> 12 return str( self.do( idx, hash_type=m["hash_type"], hash_id=hash_id, range_end=int(m["range_end"]), range_begin=int(m["range_begin"]), ) ) elif m["divisor"]: # Example: H/15 -> 7-59/15 (i.e. 7,22,37,52) if int(m["divisor"]) == 0: raise CroniterBadCronError("Bad expression: {0}".format(expr)) return "{0}-{1}/{2}".format( self.do( idx, hash_type=m["hash_type"], hash_id=hash_id, range_begin=self.cron.RANGES[idx][0], range_end=int(m["divisor"]) - 1 + self.cron.RANGES[idx][0], ), self.cron.RANGES[idx][1], int(m["divisor"]), ) else: # Example: H -> 32 return str( self.do( idx, hash_type=m["hash_type"], hash_id=hash_id, ) ) EXPANDERS = OrderedDict( [ ("hash", HashExpander), ] )