diff --git a/swh/loader/package/archive/loader.py b/swh/loader/package/archive/loader.py index 82d2cc5..b4b99db 100644 --- a/swh/loader/package/archive/loader.py +++ b/swh/loader/package/archive/loader.py @@ -1,174 +1,185 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime +import hashlib import logging from os import path +import string from typing import Any, Dict, Iterator, Optional, Sequence, Tuple, Union import attr import iso8601 from swh.loader.package.loader import BasePackageInfo, PackageLoader from swh.loader.package.utils import release_name from swh.model.model import ( Person, Revision, RevisionType, Sha1Git, TimestampWithTimezone, ) from swh.storage.interface import StorageInterface logger = logging.getLogger(__name__) SWH_PERSON = Person( name=b"Software Heritage", fullname=b"Software Heritage", email=b"robot@softwareheritage.org", ) REVISION_MESSAGE = b"swh-loader-package: synthetic revision message" @attr.s class ArchivePackageInfo(BasePackageInfo): raw_info = attr.ib(type=Dict[str, Any]) length = attr.ib(type=int) """Size of the archive file""" time = attr.ib(type=Union[str, datetime.datetime]) """Timestamp of the archive file on the server""" version = attr.ib(type=str) - # default keys for gnu - ID_KEYS = ["time", "url", "length", "version"] + # default format for gnu + MANIFEST_FORMAT = string.Template("$time $length $version $url") - def artifact_identity(self, id_keys=None): - if id_keys is None: - id_keys = self.ID_KEYS + def extid(self, manifest_format: Optional[string.Template] = None) -> bytes: + """Returns a unique intrinsic identifier of this package info + + ``manifest_format`` allows overriding the class' default MANIFEST_FORMAT""" + manifest_format = manifest_format or self.MANIFEST_FORMAT # TODO: use parsed attributes instead of self.raw_info - return [self.raw_info.get(k) for k in id_keys] + manifest = manifest_format.substitute( + {k: str(v) for (k, v) in self.raw_info.items()} + ) + return hashlib.sha256(manifest.encode()).digest() @classmethod def from_metadata(cls, a_metadata: Dict[str, Any]) -> "ArchivePackageInfo": url = a_metadata["url"] filename = a_metadata.get("filename") return cls( url=url, filename=filename if filename else path.split(url)[-1], raw_info=a_metadata, length=a_metadata["length"], time=a_metadata["time"], version=a_metadata["version"], ) class ArchiveLoader(PackageLoader[ArchivePackageInfo]): """Load archive origin's artifact files into swh archive """ visit_type = "tar" def __init__( self, storage: StorageInterface, url: str, artifacts: Sequence[Dict[str, Any]], - identity_artifact_keys: Optional[Sequence[str]] = None, + extid_manifest_format: Optional[str] = None, max_content_size: Optional[int] = None, ): - """Loader constructor. + f"""Loader constructor. For now, this is the lister's task output. Args: url: Origin url artifacts: List of artifact information with keys: - **time**: last modification time as either isoformat date string or timestamp - **url**: the artifact url to retrieve filename - **filename**: optionally, the file's name - **version**: artifact's version - **length**: artifact's length - identity_artifact_keys: Optional List of keys forming the - "identity" of an artifact + extid_manifest_format: template string used to format a manifest, + which is hashed to get the extid of a package. + Defaults to {ArchivePackageInfo.MANIFEST_FORMAT!r} """ super().__init__(storage=storage, url=url, max_content_size=max_content_size) self.artifacts = artifacts # assume order is enforced in the lister - self.identity_artifact_keys = identity_artifact_keys + self.extid_manifest_format = ( + None + if extid_manifest_format is None + else string.Template(extid_manifest_format) + ) def get_versions(self) -> Sequence[str]: versions = [] for archive in self.artifacts: v = archive.get("version") if v: versions.append(v) return versions def get_default_version(self) -> str: # It's the most recent, so for this loader, it's the last one return self.artifacts[-1]["version"] def get_package_info( self, version: str ) -> Iterator[Tuple[str, ArchivePackageInfo]]: for a_metadata in self.artifacts: p_info = ArchivePackageInfo.from_metadata(a_metadata) if version == p_info.version: # FIXME: this code assumes we have only 1 artifact per # versioned package yield release_name(version), p_info + def extid_from_reference_artifact(self, reference_artifact: Dict) -> bytes: + reference_artifact_info = ArchivePackageInfo.from_metadata(reference_artifact) + return reference_artifact_info.extid(manifest_format=self.extid_manifest_format) + def resolve_revision_from( self, known_artifacts: Dict, p_info: ArchivePackageInfo ) -> Optional[bytes]: - identity = p_info.artifact_identity(id_keys=self.identity_artifact_keys) + extid = p_info.extid(manifest_format=self.extid_manifest_format) for rev_id, known_artifact in known_artifacts.items(): logging.debug("known_artifact: %s", known_artifact) reference_artifact = known_artifact["extrinsic"]["raw"] - reference_artifact_info = ArchivePackageInfo.from_metadata( - reference_artifact - ) - known_identity = reference_artifact_info.artifact_identity( - id_keys=self.identity_artifact_keys - ) - if identity == known_identity: + known_extid = self.extid_from_reference_artifact(reference_artifact) + if extid == known_extid: return rev_id return None def build_revision( self, p_info: ArchivePackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: time = p_info.time # assume it's a timestamp if isinstance(time, str): # otherwise, assume it's a parsable date parsed_time = iso8601.parse_date(time) else: parsed_time = time normalized_time = TimestampWithTimezone.from_datetime(parsed_time) return Revision( type=RevisionType.TAR, message=REVISION_MESSAGE, date=normalized_time, author=SWH_PERSON, committer=SWH_PERSON, committer_date=normalized_time, parents=(), directory=directory, synthetic=True, metadata={ "intrinsic": {}, "extrinsic": { "provider": self.url, "when": self.visit_date.isoformat(), "raw": p_info.raw_info, }, }, ) diff --git a/swh/loader/package/archive/tests/test_archive.py b/swh/loader/package/archive/tests/test_archive.py index 9c88fe6..aee018a 100644 --- a/swh/loader/package/archive/tests/test_archive.py +++ b/swh/loader/package/archive/tests/test_archive.py @@ -1,374 +1,372 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import hashlib +import string + import attr +import pytest from swh.loader.package.archive.loader import ArchiveLoader, ArchivePackageInfo from swh.loader.package.tests.common import check_metadata_paths from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot, SnapshotBranch, TargetType URL = "https://ftp.gnu.org/gnu/8sync/" GNU_ARTIFACTS = [ { "time": 944729610, "url": "https://ftp.gnu.org/gnu/8sync/8sync-0.1.0.tar.gz", "length": 221837, "filename": "8sync-0.1.0.tar.gz", "version": "0.1.0", } ] _expected_new_contents_first_visit = [ "e9258d81faf5881a2f96a77ba609396f82cb97ad", "1170cf105b04b7e2822a0e09d2acf71da7b9a130", "fbd27c3f41f2668624ffc80b7ba5db9b92ff27ac", "0057bec9b5422aff9256af240b177ac0e3ac2608", "2b8d0d0b43a1078fc708930c8ddc2956a86c566e", "27de3b3bc6545d2a797aeeb4657c0e215a0c2e55", "2e6db43f5cd764e677f416ff0d0c78c7a82ef19b", "ae9be03bd2a06ed8f4f118d3fe76330bb1d77f62", "edeb33282b2bffa0e608e9d2fd960fd08093c0ea", "d64e64d4c73679323f8d4cde2643331ba6c20af9", "7a756602914be889c0a2d3952c710144b3e64cb0", "84fb589b554fcb7f32b806951dcf19518d67b08f", "8624bcdae55baeef00cd11d5dfcfa60f68710a02", "e08441aeab02704cfbd435d6445f7c072f8f524e", "f67935bc3a83a67259cda4b2d43373bd56703844", "809788434b433eb2e3cfabd5d591c9a659d5e3d8", "7d7c6c8c5ebaeff879f61f37083a3854184f6c41", "b99fec102eb24bffd53ab61fc30d59e810f116a2", "7d149b28eaa228b3871c91f0d5a95a2fa7cb0c68", "f0c97052e567948adf03e641301e9983c478ccff", "7fb724242e2b62b85ca64190c31dcae5303e19b3", "4f9709e64a9134fe8aefb36fd827b84d8b617ab5", "7350628ccf194c2c3afba4ac588c33e3f3ac778d", "0bb892d9391aa706dc2c3b1906567df43cbe06a2", "49d4c0ce1a16601f1e265d446b6c5ea6b512f27c", "6b5cc594ac466351450f7f64a0b79fdaf4435ad3", "3046e5d1f70297e2a507b98224b6222c9688d610", "1572607d456d7f633bc6065a2b3048496d679a31", ] _expected_new_directories_first_visit = [ "daabc65ec75d487b1335ffc101c0ac11c803f8fc", "263be23b4a8101d3ad0d9831319a3e0f2b065f36", "7f6e63ba6eb3e2236f65892cd822041f1a01dd5c", "4db0a3ecbc976083e2dac01a62f93729698429a3", "dfef1c80e1098dd5deda664bb44a9ab1f738af13", "eca971d346ea54d95a6e19d5051f900237fafdaa", "3aebc29ed1fccc4a6f2f2010fb8e57882406b528", ] _expected_new_revisions_first_visit = { "44183488c0774ce3c957fa19ba695cf18a4a42b3": ( "3aebc29ed1fccc4a6f2f2010fb8e57882406b528" ) } def test_archive_visit_with_no_artifact_found(swh_storage, requests_mock_datadir): url = URL unknown_artifact_url = "https://ftp.g.o/unknown/8sync-0.1.0.tar.gz" loader = ArchiveLoader( swh_storage, url, artifacts=[ { "time": 944729610, "url": unknown_artifact_url, # unknown artifact "length": 221837, "filename": "8sync-0.1.0.tar.gz", "version": "0.1.0", } ], ) actual_load_status = loader.load() assert actual_load_status["status"] == "uneventful" assert actual_load_status["snapshot_id"] is not None stats = get_stats(swh_storage) assert { "content": 0, "directory": 0, "origin": 1, "origin_visit": 1, "release": 0, "revision": 0, "skipped_content": 0, "snapshot": 1, } == stats assert_last_visit_matches(swh_storage, url, status="partial", type="tar") def test_archive_check_revision_metadata_structure(swh_storage, requests_mock_datadir): loader = ArchiveLoader(swh_storage, URL, artifacts=GNU_ARTIFACTS) actual_load_status = loader.load() assert actual_load_status["status"] == "eventful" assert actual_load_status["snapshot_id"] is not None assert_last_visit_matches(swh_storage, URL, status="full", type="tar") expected_revision_id = hash_to_bytes("44183488c0774ce3c957fa19ba695cf18a4a42b3") revision = swh_storage.revision_get([expected_revision_id])[0] assert revision is not None check_metadata_paths( revision.metadata, paths=[ ("intrinsic", dict), ("extrinsic.provider", str), ("extrinsic.when", str), ("extrinsic.raw", dict), ("original_artifact", list), ], ) for original_artifact in revision.metadata["original_artifact"]: check_metadata_paths( original_artifact, paths=[("filename", str), ("length", int), ("checksums", dict),], ) def test_archive_visit_with_release_artifact_no_prior_visit( swh_storage, requests_mock_datadir ): """With no prior visit, load a gnu project ends up with 1 snapshot """ loader = ArchiveLoader(swh_storage, URL, artifacts=GNU_ARTIFACTS) actual_load_status = loader.load() assert actual_load_status["status"] == "eventful" expected_snapshot_first_visit_id = hash_to_bytes( "c419397fd912039825ebdbea378bc6283f006bf5" ) assert ( hash_to_bytes(actual_load_status["snapshot_id"]) == expected_snapshot_first_visit_id ) assert_last_visit_matches(swh_storage, URL, status="full", type="tar") stats = get_stats(swh_storage) assert { "content": len(_expected_new_contents_first_visit), "directory": len(_expected_new_directories_first_visit), "origin": 1, "origin_visit": 1, "release": 0, "revision": len(_expected_new_revisions_first_visit), "skipped_content": 0, "snapshot": 1, } == stats expected_contents = map(hash_to_bytes, _expected_new_contents_first_visit) assert list(swh_storage.content_missing_per_sha1(expected_contents)) == [] expected_dirs = map(hash_to_bytes, _expected_new_directories_first_visit) assert list(swh_storage.directory_missing(expected_dirs)) == [] expected_revs = map(hash_to_bytes, _expected_new_revisions_first_visit) assert list(swh_storage.revision_missing(expected_revs)) == [] expected_snapshot = Snapshot( id=expected_snapshot_first_visit_id, branches={ b"HEAD": SnapshotBranch( target_type=TargetType.ALIAS, target=b"releases/0.1.0", ), b"releases/0.1.0": SnapshotBranch( target_type=TargetType.REVISION, target=hash_to_bytes("44183488c0774ce3c957fa19ba695cf18a4a42b3"), ), }, ) check_snapshot(expected_snapshot, swh_storage) def test_archive_2_visits_without_change(swh_storage, requests_mock_datadir): """With no prior visit, load a gnu project ends up with 1 snapshot """ url = URL loader = ArchiveLoader(swh_storage, url, artifacts=GNU_ARTIFACTS) actual_load_status = loader.load() assert actual_load_status["status"] == "eventful" assert actual_load_status["snapshot_id"] is not None assert_last_visit_matches(swh_storage, url, status="full", type="tar") actual_load_status2 = loader.load() assert actual_load_status2["status"] == "uneventful" assert actual_load_status2["snapshot_id"] is not None assert actual_load_status["snapshot_id"] == actual_load_status2["snapshot_id"] assert_last_visit_matches(swh_storage, url, status="full", type="tar") urls = [ m.url for m in requests_mock_datadir.request_history if m.url.startswith("https://ftp.gnu.org") ] assert len(urls) == 1 def test_archive_2_visits_with_new_artifact(swh_storage, requests_mock_datadir): """With no prior visit, load a gnu project ends up with 1 snapshot """ url = URL artifact1 = GNU_ARTIFACTS[0] loader = ArchiveLoader(swh_storage, url, [artifact1]) actual_load_status = loader.load() assert actual_load_status["status"] == "eventful" assert actual_load_status["snapshot_id"] is not None assert_last_visit_matches(swh_storage, url, status="full", type="tar") stats = get_stats(swh_storage) assert { "content": len(_expected_new_contents_first_visit), "directory": len(_expected_new_directories_first_visit), "origin": 1, "origin_visit": 1, "release": 0, "revision": len(_expected_new_revisions_first_visit), "skipped_content": 0, "snapshot": 1, } == stats urls = [ m.url for m in requests_mock_datadir.request_history if m.url.startswith("https://ftp.gnu.org") ] assert len(urls) == 1 artifact2 = { "time": 1480991830, "url": "https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz", "length": 238466, "filename": "8sync-0.2.0.tar.gz", "version": "0.2.0", } loader2 = ArchiveLoader(swh_storage, url, [artifact1, artifact2]) stats2 = get_stats(swh_storage) assert stats == stats2 # ensure we share the storage actual_load_status2 = loader2.load() assert actual_load_status2["status"] == "eventful" assert actual_load_status2["snapshot_id"] is not None stats2 = get_stats(swh_storage) assert { "content": len(_expected_new_contents_first_visit) + 14, "directory": len(_expected_new_directories_first_visit) + 8, "origin": 1, "origin_visit": 1 + 1, "release": 0, "revision": len(_expected_new_revisions_first_visit) + 1, "skipped_content": 0, "snapshot": 1 + 1, } == stats2 assert_last_visit_matches(swh_storage, url, status="full", type="tar") urls = [ m.url for m in requests_mock_datadir.request_history if m.url.startswith("https://ftp.gnu.org") ] # 1 artifact (2nd time no modification) + 1 new artifact assert len(urls) == 2 def test_archive_2_visits_without_change_not_gnu(swh_storage, requests_mock_datadir): """Load a project archive (not gnu) ends up with 1 snapshot """ url = "https://something.else.org/8sync/" artifacts = [ # this is not a gnu artifact { "time": "1999-12-09T09:53:30+00:00", # it's also not a timestamp "sha256": "d5d1051e59b2be6f065a9fc6aedd3a391e44d0274b78b9bb4e2b57a09134dbe4", # noqa # keep a gnu artifact reference to avoid adding other test files "url": "https://ftp.gnu.org/gnu/8sync/8sync-0.2.0.tar.gz", "length": 238466, "filename": "8sync-0.2.0.tar.gz", "version": "0.2.0", } ] # Here the loader defines the id_keys to use for existence in the snapshot # It's not the default archive loader which loader = ArchiveLoader( swh_storage, url, artifacts=artifacts, - identity_artifact_keys=["sha256", "length", "url"], + extid_manifest_format="$sha256 $length $url", ) actual_load_status = loader.load() assert actual_load_status["status"] == "eventful" assert actual_load_status["snapshot_id"] is not None assert_last_visit_matches(swh_storage, url, status="full", type="tar") actual_load_status2 = loader.load() assert actual_load_status2["status"] == "uneventful" assert actual_load_status2["snapshot_id"] == actual_load_status["snapshot_id"] assert_last_visit_matches(swh_storage, url, status="full", type="tar") urls = [ m.url for m in requests_mock_datadir.request_history if m.url.startswith("https://ftp.gnu.org") ] assert len(urls) == 1 -def test_archive_artifact_identity(): +def test_archive_extid(): """Compute primary key should return the right identity """ @attr.s class TestPackageInfo(ArchivePackageInfo): a = attr.ib() b = attr.ib() metadata = GNU_ARTIFACTS[0] p_info = TestPackageInfo( raw_info={**metadata, "a": 1, "b": 2}, a=1, b=2, **metadata, ) - for id_keys, expected_id in [ - (["a", "b"], [1, 2]), - ([], []), - (["a", "key-that-does-not-exist"], [1, None]), - ( - None, - [ - metadata["time"], - metadata["url"], - metadata["length"], - metadata["version"], - ], - ), + for manifest_format, expected_manifest in [ + (string.Template("$a $b"), b"1 2"), + (string.Template(""), b""), + (None, "{time} {length} {version} {url}".format(**metadata).encode()), ]: - actual_id = p_info.artifact_identity(id_keys=id_keys) - assert actual_id == expected_id + actual_id = p_info.extid(manifest_format=manifest_format) + assert actual_id == hashlib.sha256(expected_manifest).digest() + + with pytest.raises(KeyError): + p_info.extid(manifest_format=string.Template("$a $unknown_key")) diff --git a/swh/loader/package/cran/loader.py b/swh/loader/package/cran/loader.py index 4af0e8f..38adeb0 100644 --- a/swh/loader/package/cran/loader.py +++ b/swh/loader/package/cran/loader.py @@ -1,209 +1,211 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from datetime import timezone import logging import os from os import path import re +import string from typing import Any, Dict, Iterator, List, Mapping, Optional, Tuple import attr import dateutil.parser from debian.deb822 import Deb822 from swh.loader.package.loader import BasePackageInfo, PackageLoader from swh.loader.package.utils import release_name from swh.model.model import ( Person, Revision, RevisionType, Sha1Git, TimestampWithTimezone, ) from swh.storage.interface import StorageInterface logger = logging.getLogger(__name__) DATE_PATTERN = re.compile(r"^(?P\d{4})-(?P\d{2})$") @attr.s class CRANPackageInfo(BasePackageInfo): raw_info = attr.ib(type=Dict[str, Any]) version = attr.ib(type=str) - ID_KEYS = ["url", "version"] + MANIFEST_FORMAT = string.Template("$version $url") @classmethod def from_metadata(cls, a_metadata: Dict[str, Any]) -> "CRANPackageInfo": url = a_metadata["url"] return CRANPackageInfo( url=url, filename=path.basename(url), raw_info=a_metadata, version=a_metadata["version"], ) class CRANLoader(PackageLoader[CRANPackageInfo]): visit_type = "cran" def __init__( self, storage: StorageInterface, url: str, artifacts: List[Dict], max_content_size: Optional[int] = None, ): """Loader constructor. Args: url: Origin url to retrieve cran artifact(s) from artifacts: List of associated artifact for the origin url """ super().__init__(storage=storage, url=url, max_content_size=max_content_size) # explicit what we consider the artifact identity self.artifacts = artifacts def get_versions(self) -> List[str]: versions = [] for artifact in self.artifacts: versions.append(artifact["version"]) return versions def get_default_version(self) -> str: return self.artifacts[-1]["version"] def get_package_info(self, version: str) -> Iterator[Tuple[str, CRANPackageInfo]]: for a_metadata in self.artifacts: p_info = CRANPackageInfo.from_metadata(a_metadata) if version == p_info.version: yield release_name(version), p_info + def extid_from_known_artifact(self, known_artifact: Dict) -> bytes: + return CRANPackageInfo.from_metadata(known_artifact).extid() + def resolve_revision_from( self, known_artifacts: Mapping[bytes, Mapping], p_info: CRANPackageInfo, ) -> Optional[bytes]: """Given known_artifacts per revision, try to determine the revision for artifact_metadata """ - new_identity = p_info.artifact_identity() + new_extid = p_info.extid() for rev_id, known_artifact_meta in known_artifacts.items(): logging.debug("known_artifact_meta: %s", known_artifact_meta) known_artifact = known_artifact_meta["extrinsic"]["raw"] - known_identity = CRANPackageInfo.from_metadata( - known_artifact - ).artifact_identity() - if new_identity == known_identity: + known_extid = self.extid_from_known_artifact(known_artifact) + if new_extid == known_extid: return rev_id return None def build_revision( self, p_info: CRANPackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: # a_metadata is empty metadata = extract_intrinsic_metadata(uncompressed_path) date = parse_date(metadata.get("Date")) author = Person.from_fullname(metadata.get("Maintainer", "").encode()) version = metadata.get("Version", p_info.version) return Revision( message=version.encode("utf-8"), type=RevisionType.TAR, date=date, author=author, committer=author, committer_date=date, parents=(), directory=directory, synthetic=True, metadata={ "intrinsic": {"tool": "DESCRIPTION", "raw": metadata,}, "extrinsic": { "provider": self.url, "when": self.visit_date.isoformat(), "raw": p_info.raw_info, }, }, ) def parse_debian_control(filepath: str) -> Dict[str, Any]: """Parse debian control at filepath""" metadata: Dict = {} logger.debug("Debian control file %s", filepath) for paragraph in Deb822.iter_paragraphs(open(filepath, "rb")): logger.debug("paragraph: %s", paragraph) metadata.update(**paragraph) logger.debug("metadata parsed: %s", metadata) return metadata def extract_intrinsic_metadata(dir_path: str) -> Dict[str, Any]: """Given an uncompressed path holding the DESCRIPTION file, returns a DESCRIPTION parsed structure as a dict. Cran origins describes their intrinsic metadata within a DESCRIPTION file at the root tree of a tarball. This DESCRIPTION uses a simple file format called DCF, the Debian control format. The release artifact contains at their root one folder. For example: $ tar tvf zprint-0.0.6.tar.gz drwxr-xr-x root/root 0 2018-08-22 11:01 zprint-0.0.6/ ... Args: dir_path (str): Path to the uncompressed directory representing a release artifact from pypi. Returns: the DESCRIPTION parsed structure as a dict (or empty dict if missing) """ # Retrieve the root folder of the archive if not os.path.exists(dir_path): return {} lst = os.listdir(dir_path) if len(lst) != 1: return {} project_dirname = lst[0] description_path = os.path.join(dir_path, project_dirname, "DESCRIPTION") if not os.path.exists(description_path): return {} return parse_debian_control(description_path) def parse_date(date: Optional[str]) -> Optional[TimestampWithTimezone]: """Parse a date into a datetime """ assert not date or isinstance(date, str) dt: Optional[datetime.datetime] = None if not date: return None try: specific_date = DATE_PATTERN.match(date) if specific_date: year = int(specific_date.group("year")) month = int(specific_date.group("month")) dt = datetime.datetime(year, month, 1) else: dt = dateutil.parser.parse(date) if not dt.tzinfo: # up for discussion the timezone needs to be set or # normalize_timestamp is not happy: ValueError: normalize_timestamp # received datetime without timezone: 2001-06-08 00:00:00 dt = dt.replace(tzinfo=timezone.utc) except Exception as e: logger.warning("Fail to parse date %s. Reason: %s", date, e) if dt: return TimestampWithTimezone.from_datetime(dt) else: return None diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py index 3638527..71b4b7e 100644 --- a/swh/loader/package/loader.py +++ b/swh/loader/package/loader.py @@ -1,840 +1,851 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime +import hashlib from itertools import islice import json import logging import os +import string import sys import tempfile from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Mapping, Optional, Sequence, Tuple, TypeVar, ) import attr import sentry_sdk from swh.core.tarball import uncompress from swh.loader.core.loader import BaseLoader from swh.loader.exception import NotFound from swh.loader.package.utils import download from swh.model import from_disk from swh.model.collections import ImmutableDict from swh.model.hashutil import hash_to_hex from swh.model.identifiers import ( CoreSWHID, ExtendedObjectType, ExtendedSWHID, ObjectType, ) from swh.model.model import ( MetadataAuthority, MetadataAuthorityType, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Revision, Sha1Git, Snapshot, TargetType, ) from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface from swh.storage.utils import now logger = logging.getLogger(__name__) SWH_METADATA_AUTHORITY = MetadataAuthority( type=MetadataAuthorityType.REGISTRY, url="https://softwareheritage.org/", metadata={}, ) """Metadata authority for extrinsic metadata generated by Software Heritage. Used for metadata on "original artifacts", ie. length, filename, and checksums of downloaded archive files.""" @attr.s class RawExtrinsicMetadataCore: """Contains the core of the metadata extracted by a loader, that will be used to build a full RawExtrinsicMetadata object by adding object identifier, context, and provenance information.""" format = attr.ib(type=str) metadata = attr.ib(type=bytes) discovery_date = attr.ib(type=Optional[datetime.datetime], default=None) """Defaults to the visit date.""" @attr.s class BasePackageInfo: """Compute the primary key for a dict using the id_keys as primary key composite. Args: d: A dict entry to compute the primary key on id_keys: Sequence of keys to use as primary key Returns: The identity for that dict entry """ url = attr.ib(type=str) filename = attr.ib(type=Optional[str]) # The following attribute has kw_only=True in order to allow subclasses # to add attributes. Without kw_only, attributes without default values cannot # go after attributes with default values. # See directory_extrinsic_metadata = attr.ib( type=List[RawExtrinsicMetadataCore], default=[], kw_only=True, ) # TODO: add support for metadata for directories and contents @property - def ID_KEYS(self): - raise NotImplementedError(f"{self.__class__.__name__} is missing ID_KEYS") + def MANIFEST_FORMAT(self) -> string.Template: + """A string.Template object used to format a manifest, which is hashed + to get the extid of this package info object""" + raise NotImplementedError( + f"{self.__class__.__name__} is missing MANIFEST_FORMAT " + f"or an override of extid()" + ) - def artifact_identity(self): - return [getattr(self, k) for k in self.ID_KEYS] + def extid(self) -> bytes: + """Returns a unique intrinsic identifier of this package info""" + manifest = self.MANIFEST_FORMAT.substitute( + {k: str(v) for (k, v) in attr.asdict(self).items()} + ) + return hashlib.sha256(manifest.encode()).digest() TPackageInfo = TypeVar("TPackageInfo", bound=BasePackageInfo) class PackageLoader(BaseLoader, Generic[TPackageInfo]): # Origin visit type (str) set by the loader visit_type = "" visit_date: datetime.datetime def __init__( self, storage: StorageInterface, url: str, max_content_size: Optional[int] = None, ): """Loader's constructor. This raises exception if the minimal required configuration is missing (cf. fn:`check` method). Args: storage: Storage instance url: Origin url to load data from """ super().__init__(storage=storage, max_content_size=max_content_size) self.url = url self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) def get_versions(self) -> Sequence[str]: """Return the list of all published package versions. Raises: `class:swh.loader.exception.NotFound` error when failing to read the published package versions. Returns: Sequence of published versions """ return [] def get_package_info(self, version: str) -> Iterator[Tuple[str, TPackageInfo]]: """Given a release version of a package, retrieve the associated package information for such version. Args: version: Package version Returns: (branch name, package metadata) """ yield from {} def build_revision( self, p_info: TPackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: """Build the revision from the archive metadata (extrinsic artifact metadata) and the intrinsic metadata. Args: p_info: Package information uncompressed_path: Artifact uncompressed path on disk Returns: Revision object """ raise NotImplementedError("build_revision") def get_default_version(self) -> str: """Retrieve the latest release version if any. Returns: Latest version """ return "" def last_snapshot(self) -> Optional[Snapshot]: """Retrieve the last snapshot out of the last visit. """ return snapshot_get_latest(self.storage, self.url) def known_artifacts( self, snapshot: Optional[Snapshot] ) -> Dict[Sha1Git, Optional[ImmutableDict[str, object]]]: """Retrieve the known releases/artifact for the origin. Args snapshot: snapshot for the visit Returns: Dict of keys revision id (bytes), values a metadata Dict. """ if not snapshot: return {} # retrieve only revisions (e.g the alias we do not want here) revs = [ rev.target for rev in snapshot.branches.values() if rev and rev.target_type == TargetType.REVISION ] known_revisions = self.storage.revision_get(revs) return { revision.id: revision.metadata for revision in known_revisions if revision } def resolve_revision_from( self, known_artifacts: Dict, p_info: TPackageInfo, ) -> Optional[bytes]: """Resolve the revision from a snapshot and an artifact metadata dict. If the artifact has already been downloaded, this will return the existing revision targeting that uncompressed artifact directory. Otherwise, this returns None. Args: snapshot: Snapshot p_info: Package information Returns: None or revision identifier """ return None def download_package( self, p_info: TPackageInfo, tmpdir: str ) -> List[Tuple[str, Mapping]]: """Download artifacts for a specific package. All downloads happen in in the tmpdir folder. Default implementation expects the artifacts package info to be about one artifact per package. Note that most implementation have 1 artifact per package. But some implementation have multiple artifacts per package (debian), some have none, the package is the artifact (gnu). Args: artifacts_package_info: Information on the package artifacts to download (url, filename, etc...) tmpdir: Location to retrieve such artifacts Returns: List of (path, computed hashes) """ return [download(p_info.url, dest=tmpdir, filename=p_info.filename)] def uncompress( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str ) -> str: """Uncompress the artifact(s) in the destination folder dest. Optionally, this could need to use the p_info dict for some more information (debian). """ uncompressed_path = os.path.join(dest, "src") for a_path, _ in dl_artifacts: uncompress(a_path, dest=uncompressed_path) return uncompressed_path def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]: """Return an extra dict of branches that are used to update the set of branches. """ return {} def finalize_visit( self, *, snapshot: Optional[Snapshot], visit: OriginVisit, status_visit: str, status_load: str, failed_branches: List[str], ) -> Dict[str, Any]: """Finalize the visit: - flush eventual unflushed data to storage - update origin visit's status - return the task's status """ self.storage.flush() snapshot_id: Optional[bytes] = None if snapshot and snapshot.id: # to prevent the snapshot.id to b"" snapshot_id = snapshot.id assert visit.visit visit_status = OriginVisitStatus( origin=self.url, visit=visit.visit, type=self.visit_type, date=now(), status=status_visit, snapshot=snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) result: Dict[str, Any] = { "status": status_load, } if snapshot_id: result["snapshot_id"] = hash_to_hex(snapshot_id) if failed_branches: logger.warning("%d failed branches", len(failed_branches)) for i, urls in enumerate(islice(failed_branches, 50)): prefix_url = "Failed branches: " if i == 0 else "" logger.warning("%s%s", prefix_url, urls) return result def load(self) -> Dict: """Load for a specific origin the associated contents. for each package version of the origin 1. Fetch the files for one package version By default, this can be implemented as a simple HTTP request. Loaders with more specific requirements can override this, e.g.: the PyPI loader checks the integrity of the downloaded files; the Debian loader has to download and check several files for one package version. 2. Extract the downloaded files By default, this would be a universal archive/tarball extraction. Loaders for specific formats can override this method (for instance, the Debian loader uses dpkg-source -x). 3. Convert the extracted directory to a set of Software Heritage objects Using swh.model.from_disk. 4. Extract the metadata from the unpacked directories This would only be applicable for "smart" loaders like npm (parsing the package.json), PyPI (parsing the PKG-INFO file) or Debian (parsing debian/changelog and debian/control). On "minimal-metadata" sources such as the GNU archive, the lister should provide the minimal set of metadata needed to populate the revision/release objects (authors, dates) as an argument to the task. 5. Generate the revision/release objects for the given version. From the data generated at steps 3 and 4. end for each 6. Generate and load the snapshot for the visit Using the revisions/releases collected at step 5., and the branch information from step 0., generate a snapshot and load it into the Software Heritage archive """ status_load = "uneventful" # either: eventful, uneventful, failed status_visit = "full" # see swh.model.model.OriginVisitStatus tmp_revisions = {} # type: Dict[str, List] snapshot = None failed_branches: List[str] = [] # Prepare origin and origin_visit origin = Origin(url=self.url) try: self.storage.origin_add([origin]) visit = list( self.storage.origin_visit_add( [ OriginVisit( origin=self.url, date=self.visit_date, type=self.visit_type, ) ] ) )[0] except Exception as e: logger.exception("Failed to initialize origin_visit for %s", self.url) sentry_sdk.capture_exception(e) return {"status": "failed"} try: last_snapshot = self.last_snapshot() logger.debug("last snapshot: %s", last_snapshot) known_artifacts = self.known_artifacts(last_snapshot) logger.debug("known artifacts: %s", known_artifacts) except Exception as e: logger.exception("Failed to get previous state for %s", self.url) sentry_sdk.capture_exception(e) return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) load_exceptions: List[Exception] = [] try: versions = self.get_versions() except NotFound: return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="not_found", status_load="failed", ) except Exception: return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) for version in versions: logger.debug("version: %s", version) tmp_revisions[version] = [] # `p_` stands for `package_` for branch_name, p_info in self.get_package_info(version): logger.debug("package_info: %s", p_info) revision_id = self.resolve_revision_from(known_artifacts, p_info) if revision_id is None: try: res = self._load_revision(p_info, origin) if res: (revision_id, directory_id) = res assert revision_id assert directory_id self._load_extrinsic_directory_metadata( p_info, revision_id, directory_id ) self.storage.flush() status_load = "eventful" except Exception as e: self.storage.clear_buffers() load_exceptions.append(e) sentry_sdk.capture_exception(e) logger.exception( "Failed loading branch %s for %s", branch_name, self.url ) failed_branches.append(branch_name) continue if revision_id is None: continue tmp_revisions[version].append((branch_name, revision_id)) if load_exceptions: status_visit = "partial" if not tmp_revisions: # We could not load any revisions; fail completely return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) try: # Retrieve the default release version (the "latest" one) default_version = self.get_default_version() logger.debug("default version: %s", default_version) # Retrieve extra branches extra_branches = self.extra_branches() logger.debug("extra branches: %s", extra_branches) snapshot = self._load_snapshot( default_version, tmp_revisions, extra_branches ) self.storage.flush() except Exception as e: logger.exception("Failed to build snapshot for origin %s", self.url) sentry_sdk.capture_exception(e) status_visit = "failed" status_load = "failed" if snapshot: try: metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id) self._load_metadata_objects(metadata_objects) except Exception as e: logger.exception( "Failed to load extrinsic snapshot metadata for %s", self.url ) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" try: metadata_objects = self.build_extrinsic_origin_metadata() self._load_metadata_objects(metadata_objects) except Exception as e: logger.exception( "Failed to load extrinsic origin metadata for %s", self.url ) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit=status_visit, status_load=status_load, ) def _load_directory( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str ) -> Tuple[str, from_disk.Directory]: uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) logger.debug("uncompressed_path: %s", uncompressed_path) directory = from_disk.Directory.from_disk( path=uncompressed_path.encode("utf-8"), max_content_length=self.max_content_size, ) contents, skipped_contents, directories = from_disk.iter_directory(directory) logger.debug("Number of skipped contents: %s", len(skipped_contents)) self.storage.skipped_content_add(skipped_contents) logger.debug("Number of contents: %s", len(contents)) self.storage.content_add(contents) logger.debug("Number of directories: %s", len(directories)) self.storage.directory_add(directories) return (uncompressed_path, directory) def _load_revision( self, p_info: TPackageInfo, origin ) -> Optional[Tuple[Sha1Git, Sha1Git]]: """Does all the loading of a revision itself: * downloads a package and uncompresses it * loads it from disk * adds contents, directories, and revision to self.storage * returns (revision_id, directory_id) Raises exception when unable to download or uncompress artifacts """ with tempfile.TemporaryDirectory() as tmpdir: dl_artifacts = self.download_package(p_info, tmpdir) (uncompressed_path, directory) = self._load_directory(dl_artifacts, tmpdir) # FIXME: This should be release. cf. D409 revision = self.build_revision( p_info, uncompressed_path, directory=directory.hash ) if not revision: # Some artifacts are missing intrinsic metadata # skipping those return None metadata = [metadata for (filepath, metadata) in dl_artifacts] extra_metadata: Tuple[str, Any] = ( "original_artifact", metadata, ) if revision.metadata is not None: full_metadata = list(revision.metadata.items()) + [extra_metadata] else: full_metadata = [extra_metadata] # TODO: don't add these extrinsic metadata to the revision. revision = attr.evolve(revision, metadata=ImmutableDict(full_metadata)) original_artifact_metadata = RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory ), discovery_date=self.visit_date, authority=SWH_METADATA_AUTHORITY, fetcher=self.get_metadata_fetcher(), format="original-artifacts-json", metadata=json.dumps(metadata).encode(), origin=self.url, revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=revision.id), ) self._load_metadata_objects([original_artifact_metadata]) logger.debug("Revision: %s", revision) self.storage.revision_add([revision]) assert directory.hash return (revision.id, directory.hash) def _load_snapshot( self, default_version: str, revisions: Dict[str, List[Tuple[str, bytes]]], extra_branches: Dict[bytes, Mapping[str, Any]], ) -> Optional[Snapshot]: """Build snapshot out of the current revisions stored and extra branches. Then load it in the storage. """ logger.debug("revisions: %s", revisions) # Build and load the snapshot branches = {} # type: Dict[bytes, Mapping[str, Any]] for version, branch_name_revisions in revisions.items(): if version == default_version and len(branch_name_revisions) == 1: # only 1 branch (no ambiguity), we can create an alias # branch 'HEAD' branch_name, _ = branch_name_revisions[0] # except for some corner case (deposit) if branch_name != "HEAD": branches[b"HEAD"] = { "target_type": "alias", "target": branch_name.encode("utf-8"), } for branch_name, target in branch_name_revisions: branches[branch_name.encode("utf-8")] = { "target_type": "revision", "target": target, } # Deal with extra-branches for name, branch_target in extra_branches.items(): if name in branches: logger.error("Extra branch '%s' has been ignored", name) else: branches[name] = branch_target snapshot_data = {"branches": branches} logger.debug("snapshot: %s", snapshot_data) snapshot = Snapshot.from_dict(snapshot_data) logger.debug("snapshot: %s", snapshot) self.storage.snapshot_add([snapshot]) return snapshot def get_loader_name(self) -> str: """Returns a fully qualified name of this loader.""" return f"{self.__class__.__module__}.{self.__class__.__name__}" def get_loader_version(self) -> str: """Returns the version of the current loader.""" module_name = self.__class__.__module__ or "" module_name_parts = module_name.split(".") # Iterate rootward through the package hierarchy until we find a parent of this # loader's module with a __version__ attribute. for prefix_size in range(len(module_name_parts), 0, -1): package_name = ".".join(module_name_parts[0:prefix_size]) module = sys.modules[package_name] if hasattr(module, "__version__"): return module.__version__ # type: ignore # If this loader's class has no parent package with a __version__, # it should implement it itself. raise NotImplementedError( f"Could not dynamically find the version of {self.get_loader_name()}." ) def get_metadata_fetcher(self) -> MetadataFetcher: """Returns a MetadataFetcher instance representing this package loader; which is used to for adding provenance information to extracted extrinsic metadata, if any.""" return MetadataFetcher( name=self.get_loader_name(), version=self.get_loader_version(), metadata={}, ) def get_metadata_authority(self) -> MetadataAuthority: """For package loaders that get extrinsic metadata, returns the authority the metadata are coming from. """ raise NotImplementedError("get_metadata_authority") def get_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadataCore]: """Returns metadata items, used by build_extrinsic_origin_metadata.""" return [] def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]: """Builds a list of full RawExtrinsicMetadata objects, using metadata returned by get_extrinsic_origin_metadata.""" metadata_items = self.get_extrinsic_origin_metadata() if not metadata_items: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in metadata_items: metadata_objects.append( RawExtrinsicMetadata( target=Origin(self.url).swhid(), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, ) ) return metadata_objects def get_extrinsic_snapshot_metadata(self) -> List[RawExtrinsicMetadataCore]: """Returns metadata items, used by build_extrinsic_snapshot_metadata.""" return [] def build_extrinsic_snapshot_metadata( self, snapshot_id: Sha1Git ) -> List[RawExtrinsicMetadata]: """Builds a list of full RawExtrinsicMetadata objects, using metadata returned by get_extrinsic_snapshot_metadata.""" metadata_items = self.get_extrinsic_snapshot_metadata() if not metadata_items: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in metadata_items: metadata_objects.append( RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.SNAPSHOT, object_id=snapshot_id ), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, origin=self.url, ) ) return metadata_objects def build_extrinsic_directory_metadata( self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git, ) -> List[RawExtrinsicMetadata]: if not p_info.directory_extrinsic_metadata: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in p_info.directory_extrinsic_metadata: metadata_objects.append( RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=directory_id ), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, origin=self.url, revision=CoreSWHID( object_type=ObjectType.REVISION, object_id=revision_id ), ) ) return metadata_objects def _load_extrinsic_directory_metadata( self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git, ) -> None: metadata_objects = self.build_extrinsic_directory_metadata( p_info, revision_id, directory_id ) self._load_metadata_objects(metadata_objects) def _load_metadata_objects( self, metadata_objects: List[RawExtrinsicMetadata] ) -> None: if not metadata_objects: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return self._create_authorities(mo.authority for mo in metadata_objects) self._create_fetchers(mo.fetcher for mo in metadata_objects) self.storage.raw_extrinsic_metadata_add(metadata_objects) def _create_authorities(self, authorities: Iterable[MetadataAuthority]) -> None: deduplicated_authorities = { (authority.type, authority.url): authority for authority in authorities } if authorities: self.storage.metadata_authority_add(list(deduplicated_authorities.values())) def _create_fetchers(self, fetchers: Iterable[MetadataFetcher]) -> None: deduplicated_fetchers = { (fetcher.name, fetcher.version): fetcher for fetcher in fetchers } if fetchers: self.storage.metadata_fetcher_add(list(deduplicated_fetchers.values())) diff --git a/swh/loader/package/tests/test_loader.py b/swh/loader/package/tests/test_loader.py index ac9ee79..2403e78 100644 --- a/swh/loader/package/tests/test_loader.py +++ b/swh/loader/package/tests/test_loader.py @@ -1,87 +1,90 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import hashlib +import string + import attr import pytest from swh.loader.package.loader import BasePackageInfo, PackageLoader class FakeStorage: def origin_add(self, origins): raise ValueError("We refuse to add an origin") def origin_visit_get_latest(self, origin): return None class FakeStorage2(FakeStorage): def origin_add(self, origins): pass def origin_visit_add(self, visits): raise ValueError("We refuse to add an origin visit") def test_loader_origin_visit_failure(swh_storage): """Failure to add origin or origin visit should failed immediately """ loader = PackageLoader(swh_storage, "some-url") loader.storage = FakeStorage() actual_load_status = loader.load() assert actual_load_status == {"status": "failed"} loader.storage = FakeStorage2() actual_load_status2 = loader.load() assert actual_load_status2 == {"status": "failed"} -def test_artifact_identity(): +def test_extid(): """Compute primary key should return the right identity """ @attr.s class TestPackageInfo(BasePackageInfo): a = attr.ib() b = attr.ib() length = attr.ib() filename = attr.ib() version = attr.ib() - ID_KEYS = ["a", "b"] + MANIFEST_FORMAT = string.Template("$a $b") p_info = TestPackageInfo( url="http://example.org/", a=1, b=2, length=221837, filename="8sync-0.1.0.tar.gz", version="0.1.0", ) - actual_id = p_info.artifact_identity() - assert actual_id == [1, 2] + actual_id = p_info.extid() + assert actual_id == hashlib.sha256(b"1 2").digest() def test_no_env_swh_config_filename_raise(monkeypatch): """No SWH_CONFIG_FILENAME environment variable makes package loader init raise """ class DummyPackageLoader(PackageLoader): """A dummy package loader for test purpose""" pass monkeypatch.delenv("SWH_CONFIG_FILENAME", raising=False) with pytest.raises( AssertionError, match="SWH_CONFIG_FILENAME environment variable is undefined" ): DummyPackageLoader.from_configfile(url="some-url")