Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show First 20 Lines • Show All 372 Lines • ▼ Show 20 Lines | class Timestamp(BaseModel): | ||||
@microseconds.validator | @microseconds.validator | ||||
def check_microseconds(self, attribute, value): | def check_microseconds(self, attribute, value): | ||||
"""Checks that microseconds are positive and < 1000000.""" | """Checks that microseconds are positive and < 1000000.""" | ||||
if not (0 <= value < 10 ** 6): | if not (0 <= value < 10 ** 6): | ||||
raise ValueError("Microseconds must be in [0, 1000000[.") | raise ValueError("Microseconds must be in [0, 1000000[.") | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True, init=False) | ||||
class TimestampWithTimezone(BaseModel): | class TimestampWithTimezone(BaseModel): | ||||
"""Represents a TZ-aware timestamp from a VCS.""" | """Represents a TZ-aware timestamp from a VCS.""" | ||||
object_type: Final = "timestamp_with_timezone" | object_type: Final = "timestamp_with_timezone" | ||||
timestamp = attr.ib(type=Timestamp, validator=type_validator()) | timestamp = attr.ib(type=Timestamp, validator=type_validator()) | ||||
offset = attr.ib(type=int, validator=type_validator()) | offset = attr.ib(type=int, validator=type_validator()) | ||||
negative_utc = attr.ib(type=bool, validator=type_validator()) | negative_utc = attr.ib(type=bool, validator=type_validator()) | ||||
offset_bytes = attr.ib(type=bytes, validator=type_validator()) | offset_bytes = attr.ib(type=bytes, validator=type_validator()) | ||||
"""Raw git representation of the timezone, as an offset from UTC. | """Raw git representation of the timezone, as an offset from UTC. | ||||
It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and | It should follow this format: ``+HHMM`` or ``-HHMM`` (including ``+0000`` and | ||||
``-0000``). | ``-0000``). | ||||
However, when created from git objects, it must be the exact bytes used in the | However, when created from git objects, it must be the exact bytes used in the | ||||
original objects, so it may differ from this format when they do. | original objects, so it may differ from this format when they do. | ||||
""" | """ | ||||
def __init__( | |||||
self, | |||||
timestamp: Timestamp, | |||||
offset: int = None, | |||||
negative_utc: bool = None, | |||||
offset_bytes: bytes = None, | |||||
olasd: I assume that these should be marked optional | |||||
vlorentz (author, unsubmitted, marked done): nope, I don't want callers to pass `None`. | |||||
olasd (unsubmitted, not marked done): Ah, right. | |||||
): | |||||
if offset_bytes is None: | |||||
if offset is None: | |||||
raise AttributeError("Neither 'offset' nor 'offset_bytes' was passed.") | |||||
if negative_utc is None: | |||||
raise AttributeError( | |||||
"Neither 'negative_utc' nor 'offset_bytes' was passed." | |||||
) | |||||
negative = offset < 0 or negative_utc | |||||
(hours, minutes) = divmod(abs(offset), 60) | |||||
offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode() | |||||
else: | |||||
offset = self._parse_offset_bytes(offset_bytes) | |||||
negative_utc = offset == 0 and offset_bytes.startswith(b"-") | |||||
self.__attrs_init__( # type: ignore | |||||
timestamp=timestamp, | |||||
offset=offset, | |||||
negative_utc=negative_utc, | |||||
offset_bytes=offset_bytes, | |||||
) | |||||
@offset.validator | @offset.validator | ||||
def check_offset(self, attribute, value): | def check_offset(self, attribute, value): | ||||
"""Checks the offset is a 16-bits signed integer (in theory, it | """Checks the offset is a 16-bits signed integer (in theory, it | ||||
should always be between -14 and +14 hours).""" | should always be between -14 and +14 hours).""" | ||||
if not (-(2 ** 15) <= value < 2 ** 15): | if not (-(2 ** 15) <= value < 2 ** 15): | ||||
# max 14 hours offset in theory, but you never know what | # max 14 hours offset in theory, but you never know what | ||||
# you'll find in the wild... | # you'll find in the wild... | ||||
raise ValueError("offset too large: %d minutes" % value) | raise ValueError("offset too large: %d minutes" % value) | ||||
self._check_offsets_match() | self._check_offsets_match() | ||||
@negative_utc.validator | @negative_utc.validator | ||||
def check_negative_utc(self, attribute, value): | def check_negative_utc(self, attribute, value): | ||||
if self.offset and value: | if self.offset and value: | ||||
raise ValueError("negative_utc can only be True is offset=0") | raise ValueError("negative_utc can only be True is offset=0") | ||||
self._check_offsets_match() | self._check_offsets_match() | ||||
@offset_bytes.default | |||||
def _default_offset_bytes(self): | |||||
negative = self.offset < 0 or self.negative_utc | |||||
(hours, minutes) = divmod(abs(self.offset), 60) | |||||
return f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode() | |||||
@offset_bytes.validator | @offset_bytes.validator | ||||
def check_offset_bytes(self, attribute, value): | def check_offset_bytes(self, attribute, value): | ||||
if not set(value) <= _OFFSET_CHARS: | if not set(value) <= _OFFSET_CHARS: | ||||
raise ValueError(f"invalid characters in offset_bytes: {value!r}") | raise ValueError(f"invalid characters in offset_bytes: {value!r}") | ||||
self._check_offsets_match() | self._check_offsets_match() | ||||
def _check_offsets_match(self): | @staticmethod | ||||
offset_str = self.offset_bytes.decode() | def _parse_offset_bytes(offset_bytes: bytes): | ||||
offset_str = offset_bytes.decode() | |||||
assert offset_str[0] in "+-" | assert offset_str[0] in "+-" | ||||
sign = int(offset_str[0] + "1") | sign = int(offset_str[0] + "1") | ||||
hours = int(offset_str[1:-2]) | hours = int(offset_str[1:-2]) | ||||
minutes = int(offset_str[-2:]) | minutes = int(offset_str[-2:]) | ||||
offset = sign * (hours * 60 + minutes) | offset = sign * (hours * 60 + minutes) | ||||
return offset | |||||
def _check_offsets_match(self): | |||||
offset = self._parse_offset_bytes(self.offset_bytes) | |||||
if offset != self.offset: | if offset != self.offset: | ||||
raise ValueError( | raise ValueError( | ||||
f"offset_bytes ({self.offset_bytes!r}) does not match offset " | f"offset_bytes ({self.offset_bytes!r}) does not match offset " | ||||
f"{divmod(self.offset, 60)}" | f"{divmod(self.offset, 60)}" | ||||
) | ) | ||||
if offset == 0 and self.negative_utc != self.offset_bytes.startswith(b"-"): | if offset == 0 and self.negative_utc != self.offset_bytes.startswith(b"-"): | ||||
raise ValueError( | raise ValueError( | ||||
f"offset_bytes ({self.offset_bytes!r}) does not match negative_utc " | f"offset_bytes ({self.offset_bytes!r}) does not match negative_utc " | ||||
f"({self.negative_utc})" | f"({self.negative_utc})" | ||||
) | ) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, time_representation: Union[Dict, datetime.datetime, int]): | def from_numeric_offset( | ||||
cls, timestamp: Timestamp, offset: int, negative_utc: bool | |||||
) -> "TimestampWithTimezone": | |||||
"""Returns a :class:`TimestampWithTimezone` instance from the old dictionary | |||||
format (with ``offset`` and ``negative_utc`` instead of ``offset_bytes``). | |||||
""" | |||||
negative = offset < 0 or negative_utc | |||||
(hours, minutes) = divmod(abs(offset), 60) | |||||
offset_bytes = f"{'-' if negative else '+'}{hours:02}{minutes:02}".encode() | |||||
tstz = TimestampWithTimezone( | |||||
timestamp=timestamp, | |||||
offset_bytes=offset_bytes, | |||||
offset=offset, | |||||
negative_utc=negative_utc, | |||||
) | |||||
assert tstz.offset == offset, (tstz.offset, offset) | |||||
return tstz | |||||
@classmethod | |||||
def from_dict( | |||||
cls, time_representation: Union[Dict, datetime.datetime, int] | |||||
) -> "TimestampWithTimezone": | |||||
"""Builds a TimestampWithTimezone from any of the formats | """Builds a TimestampWithTimezone from any of the formats | ||||
accepted by :func:`swh.model.normalize_timestamp`.""" | accepted by :func:`swh.model.normalize_timestamp`.""" | ||||
# TODO: this accept way more types than just dicts; find a better | # TODO: this accept way more types than just dicts; find a better | ||||
# name | # name | ||||
negative_utc = False | |||||
if isinstance(time_representation, dict): | if isinstance(time_representation, dict): | ||||
ts = time_representation["timestamp"] | ts = time_representation["timestamp"] | ||||
if isinstance(ts, dict): | if isinstance(ts, dict): | ||||
seconds = ts.get("seconds", 0) | seconds = ts.get("seconds", 0) | ||||
microseconds = ts.get("microseconds", 0) | microseconds = ts.get("microseconds", 0) | ||||
elif isinstance(ts, int): | elif isinstance(ts, int): | ||||
seconds = ts | seconds = ts | ||||
microseconds = 0 | microseconds = 0 | ||||
else: | else: | ||||
raise ValueError( | raise ValueError( | ||||
f"TimestampWithTimezone.from_dict received non-integer timestamp " | f"TimestampWithTimezone.from_dict received non-integer timestamp " | ||||
f"member {ts!r}" | f"member {ts!r}" | ||||
) | ) | ||||
timestamp = Timestamp(seconds=seconds, microseconds=microseconds) | |||||
if "offset_bytes" in time_representation: | |||||
return TimestampWithTimezone( | |||||
timestamp=timestamp, | |||||
offset_bytes=time_representation["offset_bytes"], | |||||
) | |||||
else: | |||||
# old format | |||||
offset = time_representation["offset"] | offset = time_representation["offset"] | ||||
if "negative_utc" in time_representation: | negative_utc = time_representation.get("negative_utc") or False | ||||
negative_utc = time_representation["negative_utc"] | return cls.from_numeric_offset(timestamp, offset, negative_utc) | ||||
if negative_utc is None: | |||||
negative_utc = False | |||||
elif isinstance(time_representation, datetime.datetime): | elif isinstance(time_representation, datetime.datetime): | ||||
# TODO: warn when using from_dict() on a datetime | |||||
utcoffset = time_representation.utcoffset() | utcoffset = time_representation.utcoffset() | ||||
time_representation = time_representation.astimezone(datetime.timezone.utc) | time_representation = time_representation.astimezone(datetime.timezone.utc) | ||||
microseconds = time_representation.microsecond | microseconds = time_representation.microsecond | ||||
if microseconds: | if microseconds: | ||||
time_representation = time_representation.replace(microsecond=0) | time_representation = time_representation.replace(microsecond=0) | ||||
seconds = int(time_representation.timestamp()) | seconds = int(time_representation.timestamp()) | ||||
if utcoffset is None: | if utcoffset is None: | ||||
raise ValueError( | raise ValueError( | ||||
f"TimestampWithTimezone.from_dict received datetime without " | f"TimestampWithTimezone.from_dict received datetime without " | ||||
f"timezone: {time_representation}" | f"timezone: {time_representation}" | ||||
) | ) | ||||
# utcoffset is an integer number of minutes | # utcoffset is an integer number of minutes | ||||
seconds_offset = utcoffset.total_seconds() | seconds_offset = utcoffset.total_seconds() | ||||
offset = int(seconds_offset) // 60 | offset = int(seconds_offset) // 60 | ||||
# TODO: warn if remainder is not zero | |||||
return cls.from_numeric_offset( | |||||
Timestamp(seconds=seconds, microseconds=microseconds), offset, False | |||||
) | |||||
elif isinstance(time_representation, int): | elif isinstance(time_representation, int): | ||||
# TODO: warn when using from_dict() on an int | |||||
seconds = time_representation | seconds = time_representation | ||||
microseconds = 0 | timestamp = Timestamp(seconds=time_representation, microseconds=0) | ||||
offset = 0 | return TimestampWithTimezone(timestamp=timestamp, offset_bytes=b"+0000") | ||||
else: | else: | ||||
raise ValueError( | raise ValueError( | ||||
f"TimestampWithTimezone.from_dict received non-integer timestamp: " | f"TimestampWithTimezone.from_dict received non-integer timestamp: " | ||||
f"{time_representation!r}" | f"{time_representation!r}" | ||||
) | ) | ||||
return cls( | |||||
timestamp=Timestamp(seconds=seconds, microseconds=microseconds), | |||||
offset=offset, | |||||
negative_utc=negative_utc, | |||||
) | |||||
@classmethod | @classmethod | ||||
def from_datetime(cls, dt: datetime.datetime): | def from_datetime(cls, dt: datetime.datetime) -> "TimestampWithTimezone": | ||||
return cls.from_dict(dt) | return cls.from_dict(dt) | ||||
def to_datetime(self) -> datetime.datetime: | def to_datetime(self) -> datetime.datetime: | ||||
"""Convert to a datetime (with a timezone set to the recorded fixed UTC offset) | """Convert to a datetime (with a timezone set to the recorded fixed UTC offset) | ||||
Beware that this conversion can be lossy: the negative_utc flag is not | Beware that this conversion can be lossy: ``-0000`` and 'weird' offsets | ||||
taken into consideration (since it cannot be represented in a | cannot be represented. Also note that it may fail due to type overflow. | ||||
datetime). Also note that it may fail due to type overflow. | |||||
""" | """ | ||||
timestamp = datetime.datetime.fromtimestamp( | timestamp = datetime.datetime.fromtimestamp( | ||||
self.timestamp.seconds, | self.timestamp.seconds, | ||||
datetime.timezone(datetime.timedelta(minutes=self.offset)), | datetime.timezone(datetime.timedelta(minutes=self.offset)), | ||||
) | ) | ||||
timestamp = timestamp.replace(microsecond=self.timestamp.microseconds) | timestamp = timestamp.replace(microsecond=self.timestamp.microseconds) | ||||
return timestamp | return timestamp | ||||
@classmethod | @classmethod | ||||
def from_iso8601(cls, s): | def from_iso8601(cls, s): | ||||
"""Builds a TimestampWithTimezone from an ISO8601-formatted string. | """Builds a TimestampWithTimezone from an ISO8601-formatted string. | ||||
""" | """ | ||||
dt = iso8601.parse_date(s) | dt = iso8601.parse_date(s) | ||||
tstz = cls.from_datetime(dt) | tstz = cls.from_datetime(dt) | ||||
if dt.tzname() == "-00:00": | if dt.tzname() == "-00:00": | ||||
assert tstz.offset_bytes == b"+0000" | assert tstz.offset_bytes == b"+0000" | ||||
tstz = attr.evolve(tstz, negative_utc=True, offset_bytes=b"-0000") | tstz = attr.evolve(tstz, offset_bytes=b"-0000", negative_utc=True) | ||||
return tstz | return tstz | ||||
@attr.s(frozen=True, slots=True) | @attr.s(frozen=True, slots=True) | ||||
class Origin(HashableObject, BaseModel): | class Origin(HashableObject, BaseModel): | ||||
"""Represents a software source: a VCS and an URL.""" | """Represents a software source: a VCS and an URL.""" | ||||
object_type: Final = "origin" | object_type: Final = "origin" | ||||
▲ Show 20 Lines • Show All 904 Lines • Show Last 20 Lines |
I assume that these should be marked optional