Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/archive/loader.py
# Copyright (C) 2019-2021 The Software Heritage developers | # Copyright (C) 2019-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import hashlib | import hashlib | ||||
import logging | import logging | ||||
from os import path | from os import path | ||||
▲ Show 20 Lines • Show All 130 Lines • ▼ Show 20 Lines | class ArchiveLoader(PackageLoader[ArchivePackageInfo]): | ||||
def build_release( | def build_release( | ||||
self, p_info: ArchivePackageInfo, uncompressed_path: str, directory: Sha1Git | self, p_info: ArchivePackageInfo, uncompressed_path: str, directory: Sha1Git | ||||
) -> Optional[Release]: | ) -> Optional[Release]: | ||||
time = p_info.time # assume it's a timestamp | time = p_info.time # assume it's a timestamp | ||||
if isinstance(time, str): # otherwise, assume it's a parsable date | if isinstance(time, str): # otherwise, assume it's a parsable date | ||||
parsed_time = iso8601.parse_date(time) | parsed_time = iso8601.parse_date(time) | ||||
else: | else: | ||||
parsed_time = time | parsed_time = time | ||||
normalized_time = TimestampWithTimezone.from_datetime(parsed_time) | normalized_time = ( | ||||
TimestampWithTimezone.from_datetime(parsed_time) | |||||
if parsed_time is not None | |||||
else None | |||||
) | |||||
msg = f"Synthetic release for archive at {p_info.url}\n" | msg = f"Synthetic release for archive at {p_info.url}\n" | ||||
return Release( | return Release( | ||||
name=p_info.version.encode(), | name=p_info.version.encode(), | ||||
message=msg.encode(), | message=msg.encode(), | ||||
date=normalized_time, | date=normalized_time, | ||||
author=EMPTY_AUTHOR, | author=EMPTY_AUTHOR, | ||||
target=directory, | target=directory, | ||||
target_type=ObjectType.DIRECTORY, | target_type=ObjectType.DIRECTORY, | ||||
synthetic=True, | synthetic=True, | ||||
) | ) | ||||
def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]: | def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]: | ||||
if not self.snapshot_append: | if not self.snapshot_append: | ||||
return {} | return {} | ||||
last_snapshot = self.last_snapshot() | last_snapshot = self.last_snapshot() | ||||
return last_snapshot.to_dict()["branches"] if last_snapshot else {} | return last_snapshot.to_dict()["branches"] if last_snapshot else {} |