Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/archive/loader.py
Show First 20 Lines • Show All 133 Lines • ▼ Show 20 Lines | class ArchiveLoader(PackageLoader[ArchivePackageInfo]): | ||||
) -> Iterator[Tuple[str, ArchivePackageInfo]]: | ) -> Iterator[Tuple[str, ArchivePackageInfo]]: | ||||
for a_metadata in self.artifacts: | for a_metadata in self.artifacts: | ||||
p_info = ArchivePackageInfo.from_metadata(a_metadata) | p_info = ArchivePackageInfo.from_metadata(a_metadata) | ||||
if version == p_info.version: | if version == p_info.version: | ||||
# FIXME: this code assumes we have only 1 artifact per | # FIXME: this code assumes we have only 1 artifact per | ||||
# versioned package | # versioned package | ||||
yield release_name(version), p_info | yield release_name(version), p_info | ||||
def extid_from_reference_artifact(self, reference_artifact: Dict) -> bytes: | def new_packageinfo_to_extid(self, p_info: ArchivePackageInfo) -> Optional[bytes]: | ||||
reference_artifact_info = ArchivePackageInfo.from_metadata(reference_artifact) | return p_info.extid(manifest_format=self.extid_manifest_format) | ||||
return reference_artifact_info.extid(manifest_format=self.extid_manifest_format) | |||||
def known_artifact_to_extid(self, known_artifact: Dict) -> Optional[bytes]: | |||||
def resolve_revision_from( | known_artifact_info = ArchivePackageInfo.from_metadata( | ||||
self, known_artifacts: Dict, p_info: ArchivePackageInfo | known_artifact["extrinsic"]["raw"] | ||||
) -> Optional[bytes]: | ) | ||||
extid = p_info.extid(manifest_format=self.extid_manifest_format) | return known_artifact_info.extid(manifest_format=self.extid_manifest_format) | ||||
for rev_id, known_artifact in known_artifacts.items(): | |||||
logging.debug("known_artifact: %s", known_artifact) | |||||
reference_artifact = known_artifact["extrinsic"]["raw"] | |||||
known_extid = self.extid_from_reference_artifact(reference_artifact) | |||||
if extid == known_extid: | |||||
return rev_id | |||||
return None | |||||
def build_revision( | def build_revision( | ||||
self, p_info: ArchivePackageInfo, uncompressed_path: str, directory: Sha1Git | self, p_info: ArchivePackageInfo, uncompressed_path: str, directory: Sha1Git | ||||
) -> Optional[Revision]: | ) -> Optional[Revision]: | ||||
time = p_info.time # assume it's a timestamp | time = p_info.time # assume it's a timestamp | ||||
if isinstance(time, str): # otherwise, assume it's a parsable date | if isinstance(time, str): # otherwise, assume it's a parsable date | ||||
parsed_time = iso8601.parse_date(time) | parsed_time = iso8601.parse_date(time) | ||||
else: | else: | ||||
Show All 21 Lines |