diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index 622d56b..edef327 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -1,489 +1,492 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import logging
 import tempfile
 import os

 from typing import Any, Dict, Generator, List, Mapping, Optional, Sequence, Tuple

 import attr
 import sentry_sdk

 from swh.core.tarball import uncompress
 from swh.core.config import SWHConfig
 from swh.model import from_disk
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import (
     BaseModel,
     Sha1Git,
     Content,
     SkippedContent,
     Directory,
     Revision,
     TargetType,
     Snapshot,
     Origin,
 )
 from swh.storage import get_storage
 from swh.storage.algos.snapshot import snapshot_get_all_branches
 from swh.loader.package.utils import download

 logger = logging.getLogger(__name__)


 class PackageLoader:
     # Origin visit type (str) set by the loader
     visit_type = ""

     def __init__(self, url):
         """Loader's constructor. This raises an exception if the minimal
            required configuration is missing (cf. the `_check_configuration`
            method).

         Args:
             url (str): Origin url to load data from

         """
         # This expects to use the environment variable SWH_CONFIG_FILENAME
         self.config = SWHConfig.parse_config_file()
         self._check_configuration()
         self.storage = get_storage(**self.config["storage"])
         self.url = url
         self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
         self.max_content_size = self.config["max_content_size"]

     def _check_configuration(self):
         """Checks that the minimal configuration required for the loader is
         set. If some required configuration is missing, an exception
         detailing the issue is raised.

         """
         if "storage" not in self.config:
             raise ValueError("Misconfiguration, at least the storage key should be set")

     def get_versions(self) -> Sequence[str]:
         """Return the list of all published package versions.

         Returns:
             Sequence of published versions

         """
         return []

     def get_package_info(
         self, version: str
     ) -> Generator[Tuple[str, Mapping[str, Any]], None, None]:
         """Given a release version of a package, retrieve the associated
            package information for such version.

         Args:
             version: Package version

         Yields:
             (branch name, package metadata) tuples

         """
         yield from {}

     def build_revision(
         self, a_metadata: Dict, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Revision]:
         """Build the revision from the archive metadata (extrinsic
         artifact metadata) and the intrinsic metadata.

         Args:
             a_metadata: Artifact metadata
             uncompressed_path: Artifact uncompressed path on disk

         Returns:
             Revision object, or None if the artifact must be skipped

         """
         raise NotImplementedError("build_revision")

     def get_default_version(self) -> str:
         """Retrieve the latest release version if any.

         Returns:
             Latest version

         """
         return ""

     def last_snapshot(self) -> Optional[Snapshot]:
         """Retrieve the last snapshot

         """
         snapshot = None
         visit = self.storage.origin_visit_get_latest(self.url, require_snapshot=True)
         if visit and visit.get("snapshot"):
             snapshot = Snapshot.from_dict(
                 snapshot_get_all_branches(self.storage, visit["snapshot"])
             )
         return snapshot

     def known_artifacts(
         self, snapshot: Optional[Snapshot]
     ) -> Dict[Sha1Git, BaseModel]:
         """Retrieve the known releases/artifacts for the origin.

         Args:
             snapshot: snapshot for the visit

         Returns:
             Dict whose keys are revision ids (bytes) and whose values are
             metadata Dicts.
""" if not snapshot: return {} # retrieve only revisions (e.g the alias we do not want here) revs = [ rev.target for rev in snapshot.branches.values() if rev and rev.target_type == TargetType.REVISION ] known_revisions = self.storage.revision_get(revs) ret = {} for revision in known_revisions: if not revision: # revision_get can return None continue ret[revision["id"]] = revision["metadata"] return ret def resolve_revision_from( self, known_artifacts: Dict, artifact_metadata: Dict ) -> Optional[bytes]: """Resolve the revision from a snapshot and an artifact metadata dict. If the artifact has already been downloaded, this will return the existing revision targeting that uncompressed artifact directory. Otherwise, this returns None. Args: snapshot: Snapshot artifact_metadata: Information dict Returns: None or revision identifier """ return None def download_package( self, p_info: Mapping[str, Any], tmpdir: str ) -> List[Tuple[str, Mapping]]: """Download artifacts for a specific package. All downloads happen in in the tmpdir folder. Default implementation expects the artifacts package info to be about one artifact per package. Note that most implementation have 1 artifact per package. But some implementation have multiple artifacts per package (debian), some have none, the package is the artifact (gnu). Args: artifacts_package_info: Information on the package artifacts to download (url, filename, etc...) tmpdir: Location to retrieve such artifacts Returns: List of (path, computed hashes) """ a_uri = p_info["url"] filename = p_info.get("filename") return [download(a_uri, dest=tmpdir, filename=filename)] def uncompress( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str ) -> str: """Uncompress the artifact(s) in the destination folder dest. Optionally, this could need to use the p_info dict for some more information (debian). """ uncompressed_path = os.path.join(dest, "src") for a_path, _ in dl_artifacts: uncompress(a_path, dest=uncompressed_path) return uncompressed_path def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]: """Return an extra dict of branches that are used to update the set of branches. """ return {} def load(self) -> Dict: """Load for a specific origin the associated contents. for each package version of the origin 1. Fetch the files for one package version By default, this can be implemented as a simple HTTP request. Loaders with more specific requirements can override this, e.g.: the PyPI loader checks the integrity of the downloaded files; the Debian loader has to download and check several files for one package version. 2. Extract the downloaded files By default, this would be a universal archive/tarball extraction. Loaders for specific formats can override this method (for instance, the Debian loader uses dpkg-source -x). 3. Convert the extracted directory to a set of Software Heritage objects Using swh.model.from_disk. 4. Extract the metadata from the unpacked directories This would only be applicable for "smart" loaders like npm (parsing the package.json), PyPI (parsing the PKG-INFO file) or Debian (parsing debian/changelog and debian/control). On "minimal-metadata" sources such as the GNU archive, the lister should provide the minimal set of metadata needed to populate the revision/release objects (authors, dates) as an argument to the task. 5. Generate the revision/release objects for the given version. From the data generated at steps 3 and 4. end for each 6. 
         6. Generate and load the snapshot for the visit

            Using the revisions/releases collected at step 5 and the extra
            branch information from extra_branches(), generate a snapshot and
            load it into the Software Heritage archive.

         """
         status_load = "uneventful"  # either: eventful, uneventful, failed
         status_visit = "full"  # either: partial, full
         tmp_revisions = {}  # type: Dict[str, List]
         snapshot = None

         def finalize_visit() -> Dict[str, Any]:
             """Finalize the visit:

             - flush any unflushed data to storage
             - update the origin visit's status
             - return the task's status

             """
             self.storage.flush()
             self.storage.origin_visit_update(
                 origin=self.url,
                 visit_id=visit.visit,
                 status=status_visit,
                 snapshot=snapshot and snapshot.id,
             )
             result: Dict[str, Any] = {
                 "status": status_load,
             }
             if snapshot:
                 result["snapshot_id"] = hash_to_hex(snapshot.id)
             return result

         # Prepare origin and origin_visit
         origin = Origin(url=self.url)
         try:
             self.storage.origin_add_one(origin)
             visit = self.storage.origin_visit_add(
                 self.url, date=self.visit_date, type=self.visit_type
             )
         except Exception as e:
             logger.exception("Failed to initialize origin_visit for %s", self.url)
             sentry_sdk.capture_exception(e)
             return {"status": "failed"}

         try:
             last_snapshot = self.last_snapshot()
             logger.debug("last snapshot: %s", last_snapshot)
             known_artifacts = self.known_artifacts(last_snapshot)
             logger.debug("known artifacts: %s", known_artifacts)
         except Exception as e:
             logger.exception("Failed to get previous state for %s", self.url)
             sentry_sdk.capture_exception(e)
             status_visit = "partial"
             status_load = "failed"
             return finalize_visit()

-        load_exceptions = []
+        load_exceptions: List[Exception] = []

         for version in self.get_versions():  # for each
             logger.debug("version: %s", version)
             tmp_revisions[version] = []
             # `p_` stands for `package_`
             for branch_name, p_info in self.get_package_info(version):
                 logger.debug("package_info: %s", p_info)
                 revision_id = self.resolve_revision_from(known_artifacts, p_info["raw"])
                 if revision_id is None:
                     try:
                         revision_id = self._load_revision(p_info, origin)
-                        status_load = "eventful"
                     except Exception as e:
+                        self.storage.clear_buffers()
                         load_exceptions.append(e)
                         sentry_sdk.capture_exception(e)
                         logger.exception(
                             "Failed loading branch %s for %s", branch_name, self.url
                         )
                         continue
+                    else:
+                        # Flush loaded artifacts objects to storage
+                        self.storage.flush()
+                        status_load = "eventful"

                     if revision_id is None:
                         continue

                 tmp_revisions[version].append((branch_name, revision_id))

         if load_exceptions:
             status_visit = "partial"

         if not tmp_revisions:
             # We could not load any revisions; fail completely
             status_visit = "partial"
             status_load = "failed"
             return finalize_visit()

         try:
             # Retrieve the default release version (the "latest" one)
             default_version = self.get_default_version()
             logger.debug("default version: %s", default_version)
             # Retrieve extra branches
             extra_branches = self.extra_branches()
             logger.debug("extra branches: %s", extra_branches)

             snapshot = self._load_snapshot(
                 default_version, tmp_revisions, extra_branches
             )
         except Exception as e:
             logger.exception("Failed to build snapshot for origin %s", self.url)
             sentry_sdk.capture_exception(e)
             status_visit = "partial"
             status_load = "failed"

         return finalize_visit()

     def _load_revision(self, p_info, origin) -> Optional[Sha1Git]:
         """Does all the loading of a revision itself:

         * downloads a package and uncompresses it
         * loads it from disk
         * adds contents, directories, and revision to self.storage
         * returns the revision id, or None if the artifact is skipped

         Raises exception when unable to download or uncompress artifacts

         """
         with tempfile.TemporaryDirectory() as tmpdir:
             dl_artifacts = self.download_package(p_info, tmpdir)

             uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
             logger.debug("uncompressed_path: %s", uncompressed_path)

             directory = from_disk.Directory.from_disk(
                 path=uncompressed_path.encode("utf-8"),
                 max_content_length=self.max_content_size,
             )

             contents: List[Content] = []
             skipped_contents: List[SkippedContent] = []
             directories: List[Directory] = []

             for obj in directory.iter_tree():
                 obj = obj.to_model()
                 if isinstance(obj, Content):
                     # FIXME: read the data from disk later (when the
                     # storage buffer is flushed).
                     obj = obj.with_data()
                     contents.append(obj)
                 elif isinstance(obj, SkippedContent):
                     skipped_contents.append(obj)
                 elif isinstance(obj, Directory):
                     directories.append(obj)
                 else:
                     raise TypeError(f"Unexpected content type from disk: {obj}")

             logger.debug("Number of skipped contents: %s", len(skipped_contents))
             self.storage.skipped_content_add(skipped_contents)
             logger.debug("Number of contents: %s", len(contents))
             self.storage.content_add(contents)

             logger.debug("Number of directories: %s", len(directories))
             self.storage.directory_add(directories)

         # FIXME: This should be release. cf. D409
         revision = self.build_revision(
             p_info["raw"], uncompressed_path, directory=directory.hash
         )
         if not revision:
             # Some artifacts are missing intrinsic metadata;
             # skip those
             return None

         metadata = revision.metadata or {}
         metadata.update(
             {"original_artifact": [hashes for _, hashes in dl_artifacts],}
         )
         revision = attr.evolve(revision, metadata=metadata)

         logger.debug("Revision: %s", revision)

         self.storage.revision_add([revision])
-
         return revision.id

     def _load_snapshot(
         self,
         default_version: str,
         revisions: Dict[str, List[Tuple[str, bytes]]],
         extra_branches: Dict[bytes, Mapping[str, Any]],
     ) -> Optional[Snapshot]:
         """Build a snapshot out of the current revisions stored and the
         extra branches, then load it into the storage.
""" logger.debug("revisions: %s", revisions) # Build and load the snapshot branches = {} # type: Dict[bytes, Mapping[str, Any]] for version, branch_name_revisions in revisions.items(): if version == default_version and len(branch_name_revisions) == 1: # only 1 branch (no ambiguity), we can create an alias # branch 'HEAD' branch_name, _ = branch_name_revisions[0] # except for some corner case (deposit) if branch_name != "HEAD": branches[b"HEAD"] = { "target_type": "alias", "target": branch_name.encode("utf-8"), } for branch_name, target in branch_name_revisions: branches[branch_name.encode("utf-8")] = { "target_type": "revision", "target": target, } # Deal with extra-branches for name, branch_target in extra_branches.items(): if name in branches: logger.error("Extra branch '%s' has been ignored", name) else: branches[name] = branch_target snapshot_data = {"branches": branches} logger.debug("snapshot: %s", snapshot_data) snapshot = Snapshot.from_dict(snapshot_data) logger.debug("snapshot: %s", snapshot) self.storage.snapshot_add([snapshot]) return snapshot diff --git a/swh/loader/package/nixguix/tests/test_nixguix.py b/swh/loader/package/nixguix/tests/test_nixguix.py index 503081a..32f22f9 100644 --- a/swh/loader/package/nixguix/tests/test_nixguix.py +++ b/swh/loader/package/nixguix/tests/test_nixguix.py @@ -1,304 +1,368 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest + +from typing import Dict, Optional, Tuple + from json.decoder import JSONDecodeError from swh.loader.package.nixguix.loader import ( NixGuixLoader, retrieve_sources, clean_sources, ) from swh.loader.package.tests.common import get_stats, check_snapshot +from swh.loader.package.utils import download +from swh.storage.exc import HashCollision sources_url = "https://nix-community.github.io/nixpkgs-swh/sources.json" def test_retrieve_sources(swh_config, requests_mock_datadir): j = retrieve_sources(sources_url) assert "sources" in j.keys() assert len(j["sources"]) == 2 def test_retrieve_non_existing(swh_config, requests_mock_datadir): with pytest.raises(ValueError): NixGuixLoader("https://non-existing-url") def test_retrieve_non_json(swh_config, requests_mock_datadir): with pytest.raises(JSONDecodeError): NixGuixLoader("https://example.com/file.txt") def test_clean_sources_invalid_schema(swh_config, requests_mock_datadir): sources = {} with pytest.raises(ValueError, match="sources structure invalid, missing: .*"): clean_sources(sources) def test_clean_sources_invalid_version(swh_config, requests_mock_datadir): sources = {"version": 2, "sources": [], "revision": "my-revision"} with pytest.raises( ValueError, match="sources structure version .* is not supported" ): clean_sources(sources) def test_clean_sources_invalid_sources(swh_config, requests_mock_datadir): sources = { "version": 1, "sources": [ # Valid source {"type": "url", "urls": ["my-url"], "integrity": "my-integrity"}, # integrity is missing {"type": "url", "urls": ["my-url"],}, # urls is not a list {"type": "url", "urls": "my-url", "integrity": "my-integrity"}, # type is not url {"type": "git", "urls": ["my-url"], "integrity": "my-integrity"}, ], "revision": "my-revision", } clean = clean_sources(sources) assert len(clean["sources"]) == 1 def test_loader_one_visit(swh_config, requests_mock_datadir): loader = NixGuixLoader(sources_url) res = loader.load() assert 
res["status"] == "eventful" stats = get_stats(loader.storage) assert { "content": 1, "directory": 3, "origin": 1, "origin_visit": 1, "person": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } == stats origin_visit = loader.storage.origin_visit_get_latest(sources_url) # The visit is partial because urls pointing to non tarball file # are not handled yet assert origin_visit["status"] == "partial" assert origin_visit["type"] == "nixguix" def test_uncompress_failure(swh_config, requests_mock_datadir): """Non tarball files are currently not supported and the uncompress function fails on such kind of files. However, even in this case of failure (because of the url https://example.com/file.txt), a snapshot and a visit has to be created (with a status partial since all files are not archived). """ loader = NixGuixLoader(sources_url) loader_status = loader.load() urls = [s["urls"][0] for s in loader.sources] assert "https://example.com/file.txt" in urls assert loader_status["status"] == "eventful" origin_visit = loader.storage.origin_visit_get_latest(sources_url) # The visit is partial because urls pointing to non tarball files # are not handled yet assert origin_visit["status"] == "partial" def test_loader_incremental(swh_config, requests_mock_datadir): """Ensure a second visit do not download artifact already downloaded by the previous visit. """ loader = NixGuixLoader(sources_url) load_status = loader.load() loader.load() expected_snapshot_id = "0c5881c74283793ebe9a09a105a9381e41380383" assert load_status == {"status": "eventful", "snapshot_id": expected_snapshot_id} expected_branches = { "evaluation": { "target": "cc4e04c26672dd74e5fd0fecb78b435fb55368f7", "target_type": "revision", }, "https://github.com/owner-1/repository-1/revision-1.tgz": { "target": "488ad4e7b8e2511258725063cf43a2b897c503b4", "target_type": "revision", }, } expected_snapshot = { "id": expected_snapshot_id, "branches": expected_branches, } check_snapshot(expected_snapshot, storage=loader.storage) urls = [ m.url for m in requests_mock_datadir.request_history if m.url == ("https://github.com/owner-1/repository-1/revision-1.tgz") ] # The artifact # 'https://github.com/owner-1/repository-1/revision-1.tgz' is only # visited one time assert len(urls) == 1 def test_loader_two_visits(swh_config, requests_mock_datadir_visits): """To ensure there is only one origin, but two visits, two revisions and two snapshots are created. The first visit creates a snapshot containing one tarball. The second visit creates a snapshot containing the same tarball and another tarball. 
""" loader = NixGuixLoader(sources_url) load_status = loader.load() expected_snapshot_id = "0c5881c74283793ebe9a09a105a9381e41380383" assert load_status == {"status": "eventful", "snapshot_id": expected_snapshot_id} expected_branches = { "evaluation": { "target": "cc4e04c26672dd74e5fd0fecb78b435fb55368f7", "target_type": "revision", }, "https://github.com/owner-1/repository-1/revision-1.tgz": { "target": "488ad4e7b8e2511258725063cf43a2b897c503b4", "target_type": "revision", }, } expected_snapshot = { "id": expected_snapshot_id, "branches": expected_branches, } check_snapshot(expected_snapshot, storage=loader.storage) stats = get_stats(loader.storage) assert { "content": 1, "directory": 3, "origin": 1, "origin_visit": 1, "person": 1, "release": 0, "revision": 1, "skipped_content": 0, "snapshot": 1, } == stats loader = NixGuixLoader(sources_url) load_status = loader.load() expected_snapshot_id = "b0bfa75cbd0cc90aac3b9e95fb0f59c731176d97" assert load_status == {"status": "eventful", "snapshot_id": expected_snapshot_id} # This ensures visits are incremental. Indeed, if we request a # second time an url, because of the requests_mock_datadir_visits # fixture, the file has to end with `_visit1`. expected_branches = { "evaluation": { "target": "602140776b2ce6c9159bcf52ada73a297c063d5e", "target_type": "revision", }, "https://github.com/owner-1/repository-1/revision-1.tgz": { "target": "488ad4e7b8e2511258725063cf43a2b897c503b4", "target_type": "revision", }, "https://github.com/owner-2/repository-1/revision-1.tgz": { "target": "85e0bad74e33e390aaeb74f139853ae3863ee544", "target_type": "revision", }, } expected_snapshot = { "id": expected_snapshot_id, "branches": expected_branches, } check_snapshot(expected_snapshot, storage=loader.storage) stats = get_stats(loader.storage) assert { "content": 2, "directory": 5, "origin": 1, "origin_visit": 2, "person": 1, "release": 0, "revision": 2, "skipped_content": 0, "snapshot": 2, } == stats def test_resolve_revision_from(swh_config, requests_mock_datadir): loader = NixGuixLoader(sources_url) known_artifacts = { "id1": {"extrinsic": {"raw": {"url": "url1", "integrity": "integrity1"}}}, "id2": {"extrinsic": {"raw": {"url": "url2", "integrity": "integrity2"}}}, } metadata = {"url": "url1", "integrity": "integrity1"} assert loader.resolve_revision_from(known_artifacts, metadata) == "id1" metadata = {"url": "url3", "integrity": "integrity3"} assert loader.resolve_revision_from(known_artifacts, metadata) == None # noqa def test_evaluation_branch(swh_config, requests_mock_datadir): loader = NixGuixLoader(sources_url) res = loader.load() assert res["status"] == "eventful" expected_branches = { "https://github.com/owner-1/repository-1/revision-1.tgz": { "target": "488ad4e7b8e2511258725063cf43a2b897c503b4", "target_type": "revision", }, "evaluation": { "target": "cc4e04c26672dd74e5fd0fecb78b435fb55368f7", "target_type": "revision", }, } expected_snapshot = { "id": "0c5881c74283793ebe9a09a105a9381e41380383", "branches": expected_branches, } check_snapshot(expected_snapshot, storage=loader.storage) def test_eoferror(swh_config, requests_mock_datadir): """Load a truncated archive which is invalid to make the uncompress function raising the exception EOFError. We then check if a snapshot is created, meaning this error is well managed. 
""" sources = ( "https://nix-community.github.io/nixpkgs-swh/sources-EOFError.json" # noqa ) loader = NixGuixLoader(sources) loader.load() expected_branches = { "evaluation": { "target": "cc4e04c26672dd74e5fd0fecb78b435fb55368f7", "target_type": "revision", }, } expected_snapshot = { "id": "4257fa2350168c6bfec726a06452ea27a2c0cb33", "branches": expected_branches, } check_snapshot(expected_snapshot, storage=loader.storage) + + +def fake_download( + url: str, + dest: str, + hashes: Dict = {}, + filename: Optional[str] = None, + auth: Optional[Tuple[str, str]] = None, +) -> Tuple[str, Dict]: + """Fake download which raises HashCollision (for the sake of test simpliciy, + let's accept that makes sense) + + For tests purpose only. + + """ + if url == "https://example.com/file.txt": + # instead of failing because it's a file not dealt with by the nix guix + # loader, make it raise a hash collision + raise HashCollision("sha1", "f92d74e3874587aaf443d1db961d4e26dde13e9c", []) + return download(url, dest, hashes, filename, auth) + + +def test_raise_exception(swh_config, requests_mock_datadir, mocker): + mock_download = mocker.patch("swh.loader.package.loader.download") + mock_download.side_effect = fake_download + + loader = NixGuixLoader(sources_url) + res = loader.load() + + expected_snapshot_id = "0c5881c74283793ebe9a09a105a9381e41380383" + assert res == { + "status": "eventful", + "snapshot_id": expected_snapshot_id, + } + + expected_branches = { + "https://github.com/owner-1/repository-1/revision-1.tgz": { + "target": "488ad4e7b8e2511258725063cf43a2b897c503b4", + "target_type": "revision", + }, + "evaluation": { + "target": "cc4e04c26672dd74e5fd0fecb78b435fb55368f7", + "target_type": "revision", + }, + } + expected_snapshot = { + "id": expected_snapshot_id, + "branches": expected_branches, + } + + check_snapshot(expected_snapshot, storage=loader.storage) + + assert len(mock_download.mock_calls) == 2 + + origin_visit = loader.storage.origin_visit_get_latest(sources_url) + + # The visit is partial because some hash collision were detected + assert origin_visit["status"] == "partial" + assert origin_visit["type"] == "nixguix"