diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py
index ef2cdb2..d9b04a7 100644
--- a/swh/loader/package/loader.py
+++ b/swh/loader/package/loader.py
@@ -1,519 +1,638 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import logging
 import tempfile
 import os
-
+import sys
 from typing import (
     Any,
     Dict,
     Iterator,
     Generic,
     List,
     Mapping,
     Optional,
     Sequence,
     Tuple,
     TypeVar,
 )

 import attr
 import sentry_sdk

 from swh.core.tarball import uncompress
 from swh.core.config import SWHConfig
 from swh.model import from_disk
 from swh.model.collections import ImmutableDict
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import (
     BaseModel,
     Sha1Git,
     Revision,
     TargetType,
     Snapshot,
     Origin,
     OriginVisit,
     OriginVisitStatus,
+    MetadataAuthority,
+    MetadataFetcher,
+    MetadataTargetType,
+    RawExtrinsicMetadata,
 )
+from swh.model.identifiers import SWHID
 from swh.storage import get_storage
 from swh.storage.utils import now
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.loader.package.utils import download

 logger = logging.getLogger(__name__)


 @attr.s
 class BasePackageInfo:
     """Information needed by the loader about one artifact (one downloadable
     file) of a package version.

     The attributes listed in :attr:`ID_KEYS` form the artifact's identity,
     as returned by :meth:`artifact_identity`.
     """

     url = attr.ib(type=str)
     filename = attr.ib(type=Optional[str])

+    # The following attribute has kw_only=True in order to allow subclasses
+    # to add attributes. Without kw_only, attributes without default values cannot
+    # go after attributes with default values.
+    # See
+
+    revision_extrinsic_metadata = attr.ib(
+        type=List[Tuple[datetime.datetime, str, bytes]], default=[], kw_only=True,
+    )
+    """Tuple elements are respectively the 'discovery_date', 'format',
+    and 'metadata' fields of RawExtrinsicMetadata"""
+
+    # TODO: add support for metadata for origins, directories, and contents
+
     @property
     def ID_KEYS(self):
         raise NotImplementedError(f"{self.__class__.__name__} is missing ID_KEYS")

     def artifact_identity(self):
         return [getattr(self, k) for k in self.ID_KEYS]


 TPackageInfo = TypeVar("TPackageInfo", bound=BasePackageInfo)
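The `kw_only=True` comment above matters in practice: `attrs` forbids a mandatory attribute after a defaulted one unless the defaulted one is keyword-only. A minimal sketch (hypothetical subclass, not part of this patch) of how a loader-specific `BasePackageInfo` can still add mandatory attributes:

```python
import attr

from swh.loader.package.loader import BasePackageInfo


@attr.s
class ExamplePackageInfo(BasePackageInfo):
    # Mandatory positional attribute, legal despite the base class's
    # defaulted revision_extrinsic_metadata, because that one is kw_only.
    version = attr.ib(type=str)

    # Shadows the base class's ID_KEYS property with a concrete list.
    ID_KEYS = ["url", "version"]


p_info = ExamplePackageInfo(
    url="https://example.org/pkg-1.0.tgz", filename="pkg-1.0.tgz", version="1.0",
)
assert p_info.artifact_identity() == ["https://example.org/pkg-1.0.tgz", "1.0"]
```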


 class PackageLoader(Generic[TPackageInfo]):
     # Origin visit type (str) set by the loader
     visit_type = ""

+    DEFAULT_CONFIG = {
+        "create_authorities": ("bool", True),
+        "create_fetchers": ("bool", True),
+    }
+
     def __init__(self, url):
         """Loader's constructor. This raises an exception if the minimal
            required configuration is missing (cf. :func:`_check_configuration`).

         Args:
             url (str): Origin url to load data from

         """
         # This expects to use the environment variable SWH_CONFIG_FILENAME
         self.config = SWHConfig.parse_config_file()
         self._check_configuration()
         self.storage = get_storage(**self.config["storage"])
         self.url = url
         self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
         self.max_content_size = self.config["max_content_size"]

     def _check_configuration(self):
         """Checks that the minimal configuration required is set for the
         loader.

         If some required configuration is missing, an exception detailing
         the issue is raised.

         """
         if "storage" not in self.config:
             raise ValueError("Misconfiguration, at least the storage key should be set")

     def get_versions(self) -> Sequence[str]:
         """Return the list of all published package versions.

         Returns:
             Sequence of published versions

         """
         return []

     def get_package_info(self, version: str) -> Iterator[Tuple[str, TPackageInfo]]:
         """Given a release version of a package, retrieve the associated
            package information for such version.

         Args:
             version: Package version

         Returns:
             (branch name, package metadata)

         """
         yield from {}

     def build_revision(
         self, p_info: TPackageInfo, uncompressed_path: str, directory: Sha1Git
     ) -> Optional[Revision]:
         """Build the revision from the archive metadata (extrinsic
         artifact metadata) and the intrinsic metadata.

         Args:
             p_info: Package information
             uncompressed_path: Artifact uncompressed path on disk

         Returns:
             Revision object, or None if the needed metadata is missing

         """
         raise NotImplementedError("build_revision")

     def get_default_version(self) -> str:
         """Retrieve the latest release version if any.

         Returns:
             Latest version

         """
         return ""

     def last_snapshot(self) -> Optional[Snapshot]:
         """Retrieve the last snapshot out of the last visit.

         """
         return snapshot_get_latest(self.storage, self.url)

     def known_artifacts(
         self, snapshot: Optional[Snapshot]
     ) -> Dict[Sha1Git, BaseModel]:
         """Retrieve the known releases/artifacts for the origin.

         Args:
             snapshot: snapshot for the visit

         Returns:
             Dict whose keys are revision ids (bytes) and whose values are
             metadata dicts.

         """
         if not snapshot:
             return {}

         # retrieve only revisions (e.g. the aliases we do not want here)
         revs = [
             rev.target
             for rev in snapshot.branches.values()
             if rev and rev.target_type == TargetType.REVISION
         ]
         known_revisions = self.storage.revision_get(revs)
         return {
             revision["id"]: revision["metadata"]
             for revision in known_revisions
             if revision
         }

     def resolve_revision_from(
         self, known_artifacts: Dict, p_info: TPackageInfo,
     ) -> Optional[bytes]:
         """Resolve the revision from the known artifacts and an artifact
         metadata dict.

         If the artifact has already been downloaded, this will return the
         existing revision targeting that uncompressed artifact directory.
         Otherwise, this returns None.

         Args:
             known_artifacts: Known artifacts, as returned by
                 :meth:`known_artifacts`
             p_info: Package information

         Returns:
             None or revision identifier

         """
         return None
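For illustration, a subclass hook matching the `resolve_revision_from` contract might compare the incoming artifact against the `original_artifact` entries that `_load_revision` records in revision metadata. A sketch only; the `url` key is an assumption, and real loaders compare whatever identity their artifacts carry (checksums, version strings, ...):

```python
from typing import Any, Dict, Optional


def resolve_revision_from(
    self, known_artifacts: Dict, p_info: Any,
) -> Optional[bytes]:
    # Scan previously loaded revisions for one whose recorded
    # original_artifact matches this package info.
    for rev_id, metadata in known_artifacts.items():
        for artifact in (metadata or {}).get("original_artifact", []):
            if artifact.get("url") == p_info.url:
                return rev_id
    return None
```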

     def download_package(
         self, p_info: TPackageInfo, tmpdir: str
     ) -> List[Tuple[str, Mapping]]:
         """Download artifacts for a specific package. All downloads happen
         in the tmpdir folder.

         The default implementation expects the package info to describe a
         single artifact per package.

         Note that most implementations have one artifact per package. But
         some implementations have multiple artifacts per package (debian),
         and some have none, the package being the artifact (gnu).

         Args:
             p_info: Information on the package artifact to download
                 (url, filename, etc...)
             tmpdir: Location to retrieve such artifacts

         Returns:
             List of (path, computed hashes)

         """
         return [download(p_info.url, dest=tmpdir, filename=p_info.filename)]

     def uncompress(
         self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str
     ) -> str:
         """Uncompress the artifact(s) in the destination folder dest.

         Optionally, this could need to use the p_info dict for some more
         information (debian).

         """
         uncompressed_path = os.path.join(dest, "src")
         for a_path, _ in dl_artifacts:
             uncompress(a_path, dest=uncompressed_path)
         return uncompressed_path

     def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]:
         """Return an extra dict of branches that are used to update the set
         of branches.

         """
         return {}
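`extra_branches()` lets a loader inject branches that do not come from package versions; the dict shape matches the branch dicts built in `_load_snapshot` below. A hypothetical override (branch names assumed):

```python
from typing import Any, Dict, Mapping


def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]:
    # Example: alias a stable name to the branch of the newest version.
    return {
        b"releases/latest": {
            "target_type": "alias",
            "target": b"releases/1.0",
        },
    }
```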

     def load(self) -> Dict:
         """Load for a specific origin the associated contents.

         for each package version of the origin

         1. Fetch the files for one package version. By default, this can be
            implemented as a simple HTTP request. Loaders with more specific
            requirements can override this, e.g.: the PyPI loader checks the
            integrity of the downloaded files; the Debian loader has to
            download and check several files for one package version.

         2. Extract the downloaded files. By default, this would be a
            universal archive/tarball extraction. Loaders for specific
            formats can override this method (for instance, the Debian loader
            uses dpkg-source -x).

         3. Convert the extracted directory to a set of Software Heritage
            objects, using swh.model.from_disk.

         4. Extract the metadata from the unpacked directories. This would
            only be applicable for "smart" loaders like npm (parsing the
            package.json), PyPI (parsing the PKG-INFO file) or Debian
            (parsing debian/changelog and debian/control).

            On "minimal-metadata" sources such as the GNU archive, the lister
            should provide the minimal set of metadata needed to populate the
            revision/release objects (authors, dates) as an argument to the
            task.

         5. Generate the revision/release objects for the given version, from
            the data generated at steps 3 and 4.

         end for each

         6. Generate and load the snapshot for the visit

            Using the revisions/releases collected at step 5., and the branch
            information from step 0., generate a snapshot and load it into
            the Software Heritage archive

         """
         status_load = "uneventful"  # either: eventful, uneventful, failed
         status_visit = "full"  # either: partial, full
         tmp_revisions = {}  # type: Dict[str, List]
         snapshot = None

         def finalize_visit() -> Dict[str, Any]:
             """Finalize the visit:

             - flush eventual unflushed data to storage
             - update the origin visit's status
             - return the task's status

             """
             self.storage.flush()

             snapshot_id: Optional[bytes] = None
             if snapshot and snapshot.id:  # to prevent snapshot_id from being b""
                 snapshot_id = snapshot.id
             assert visit.visit
             visit_status = OriginVisitStatus(
                 origin=self.url,
                 visit=visit.visit,
                 date=now(),
                 status=status_visit,
                 snapshot=snapshot_id,
             )
             self.storage.origin_visit_status_add([visit_status])
             result: Dict[str, Any] = {
                 "status": status_load,
             }
             if snapshot_id:
                 result["snapshot_id"] = hash_to_hex(snapshot_id)
             return result

         # Prepare origin and origin_visit
         origin = Origin(url=self.url)
         try:
             self.storage.origin_add([origin])
             visit = self.storage.origin_visit_add(
                 [
                     OriginVisit(
                         origin=self.url,
                         date=self.visit_date,
                         type=self.visit_type,
                     )
                 ]
             )[0]
         except Exception as e:
             logger.exception("Failed to initialize origin_visit for %s", self.url)
             sentry_sdk.capture_exception(e)
             return {"status": "failed"}

         try:
             last_snapshot = self.last_snapshot()
             logger.debug("last snapshot: %s", last_snapshot)
             known_artifacts = self.known_artifacts(last_snapshot)
             logger.debug("known artifacts: %s", known_artifacts)
         except Exception as e:
             logger.exception("Failed to get previous state for %s", self.url)
             sentry_sdk.capture_exception(e)
             status_visit = "partial"
             status_load = "failed"
             return finalize_visit()

         load_exceptions: List[Exception] = []

         for version in self.get_versions():  # for each
             logger.debug("version: %s", version)
             tmp_revisions[version] = []
             # `p_` stands for `package_`
             for branch_name, p_info in self.get_package_info(version):
                 logger.debug("package_info: %s", p_info)
                 revision_id = self.resolve_revision_from(known_artifacts, p_info)
                 if revision_id is None:
                     try:
                         revision_id = self._load_revision(p_info, origin)
+                        if revision_id:
+                            self._load_extrinsic_revision_metadata(p_info, revision_id)
                         self.storage.flush()
                         status_load = "eventful"
                     except Exception as e:
                         self.storage.clear_buffers()
                         load_exceptions.append(e)
                         sentry_sdk.capture_exception(e)
                         logger.exception(
                             "Failed loading branch %s for %s", branch_name, self.url
                         )
                         continue

                 if revision_id is None:
                     continue

                 tmp_revisions[version].append((branch_name, revision_id))

         if load_exceptions:
             status_visit = "partial"

         if not tmp_revisions:
             # We could not load any revisions; fail completely
             status_visit = "partial"
             status_load = "failed"
             return finalize_visit()

         try:
             # Retrieve the default release version (the "latest" one)
             default_version = self.get_default_version()
             logger.debug("default version: %s", default_version)
             # Retrieve extra branches
             extra_branches = self.extra_branches()
             logger.debug("extra branches: %s", extra_branches)

             snapshot = self._load_snapshot(
                 default_version, tmp_revisions, extra_branches
             )
         except Exception as e:
             logger.exception("Failed to build snapshot for origin %s", self.url)
             sentry_sdk.capture_exception(e)
             status_visit = "partial"
             status_load = "failed"

         return finalize_visit()

     def _load_revision(self, p_info: TPackageInfo, origin) -> Optional[Sha1Git]:
         """Does all the loading of a revision itself:

         * downloads a package and uncompresses it
         * loads it from disk
         * adds contents, directories, and revision to self.storage
         * returns the revision's identifier, or None

         Raises an exception when unable to download or uncompress artifacts

         """
         with tempfile.TemporaryDirectory() as tmpdir:
             dl_artifacts = self.download_package(p_info, tmpdir)

             uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir)
             logger.debug("uncompressed_path: %s", uncompressed_path)

             directory = from_disk.Directory.from_disk(
                 path=uncompressed_path.encode("utf-8"),
                 max_content_length=self.max_content_size,
             )

             contents, skipped_contents, directories = from_disk.iter_directory(
                 directory
             )

             logger.debug("Number of skipped contents: %s", len(skipped_contents))
             self.storage.skipped_content_add(skipped_contents)
             logger.debug("Number of contents: %s", len(contents))
             self.storage.content_add(contents)

             logger.debug("Number of directories: %s", len(directories))
             self.storage.directory_add(directories)

             # FIXME: This should be release. cf. D409
             revision = self.build_revision(
                 p_info, uncompressed_path, directory=directory.hash
             )
             if not revision:
                 # Some artifacts are missing intrinsic metadata; skip those
                 return None

         extra_metadata: Tuple[str, Any] = (
             "original_artifact",
             [hashes for _, hashes in dl_artifacts],
         )
         if revision.metadata is not None:
             full_metadata = list(revision.metadata.items()) + [extra_metadata]
         else:
             full_metadata = [extra_metadata]

         revision = attr.evolve(revision, metadata=ImmutableDict(full_metadata))

         logger.debug("Revision: %s", revision)

         self.storage.revision_add([revision])
         return revision.id

     def _load_snapshot(
         self,
         default_version: str,
         revisions: Dict[str, List[Tuple[str, bytes]]],
         extra_branches: Dict[bytes, Mapping[str, Any]],
     ) -> Optional[Snapshot]:
         """Build a snapshot out of the current revisions stored and the
            extra branches. Then load it in the storage.

         """
         logger.debug("revisions: %s", revisions)
         # Build and load the snapshot
         branches = {}  # type: Dict[bytes, Mapping[str, Any]]
         for version, branch_name_revisions in revisions.items():
             if version == default_version and len(branch_name_revisions) == 1:
                 # only 1 branch (no ambiguity), we can create an alias
                 # branch 'HEAD'
                 branch_name, _ = branch_name_revisions[0]
                 # except for some corner case (deposit)
                 if branch_name != "HEAD":
                     branches[b"HEAD"] = {
                         "target_type": "alias",
                         "target": branch_name.encode("utf-8"),
                     }

             for branch_name, target in branch_name_revisions:
                 branches[branch_name.encode("utf-8")] = {
                     "target_type": "revision",
                     "target": target,
                 }

         # Deal with extra-branches
         for name, branch_target in extra_branches.items():
             if name in branches:
                 logger.error("Extra branch '%s' has been ignored", name)
             else:
                 branches[name] = branch_target

         snapshot_data = {"branches": branches}
         logger.debug("snapshot: %s", snapshot_data)
         snapshot = Snapshot.from_dict(snapshot_data)
         logger.debug("snapshot: %s", snapshot)
         self.storage.snapshot_add([snapshot])

         return snapshot
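Concretely, for an origin with a single default version published under one branch, the branches dict built above would look like this (a sketch; branch names assumed, revision id reused from the tests below):

```python
revision_id = bytes.fromhex("8ff44f081d43176474b267de5451f2c2e88089d0")

branches = {
    # Alias added because the default version has exactly one branch
    # and it is not already named "HEAD".
    b"HEAD": {
        "target_type": "alias",
        "target": b"releases/1.0",
    },
    b"releases/1.0": {
        "target_type": "revision",
        "target": revision_id,  # 20-byte sha1_git returned by _load_revision
    },
}
```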
""" logger.debug("revisions: %s", revisions) # Build and load the snapshot branches = {} # type: Dict[bytes, Mapping[str, Any]] for version, branch_name_revisions in revisions.items(): if version == default_version and len(branch_name_revisions) == 1: # only 1 branch (no ambiguity), we can create an alias # branch 'HEAD' branch_name, _ = branch_name_revisions[0] # except for some corner case (deposit) if branch_name != "HEAD": branches[b"HEAD"] = { "target_type": "alias", "target": branch_name.encode("utf-8"), } for branch_name, target in branch_name_revisions: branches[branch_name.encode("utf-8")] = { "target_type": "revision", "target": target, } # Deal with extra-branches for name, branch_target in extra_branches.items(): if name in branches: logger.error("Extra branch '%s' has been ignored", name) else: branches[name] = branch_target snapshot_data = {"branches": branches} logger.debug("snapshot: %s", snapshot_data) snapshot = Snapshot.from_dict(snapshot_data) logger.debug("snapshot: %s", snapshot) self.storage.snapshot_add([snapshot]) return snapshot + + def get_loader_name(self) -> str: + """Returns a fully qualified name of this loader.""" + return f"{self.__class__.__module__}.{self.__class__.__name__}" + + def get_loader_version(self) -> str: + """Returns the version of the current loader.""" + module_name = self.__class__.__module__ or "" + module_name_parts = module_name.split(".") + + # Iterate rootward through the package hierarchy until we find a parent of this + # loader's module with a __version__ attribute. + for prefix_size in range(len(module_name_parts), 0, -1): + package_name = ".".join(module_name_parts[0:prefix_size]) + module = sys.modules[package_name] + if hasattr(module, "__version__"): + return module.__version__ # type: ignore + + # If this loader's class has no parent package with a __version__, + # it should implement it itself. + raise NotImplementedError( + f"Could not dynamically find the version of {self.get_loader_name()}." + ) + + def get_metadata_fetcher(self) -> MetadataFetcher: + """Returns a MetadataFetcher instance representing this package loader; + which is used to for adding provenance information to extracted + extrinsic metadata, if any.""" + return MetadataFetcher( + name=self.get_loader_name(), version=self.get_loader_version(), metadata={}, + ) + + def get_metadata_authority(self) -> MetadataAuthority: + """For package loaders that get extrinsic metadata, returns the authority + the metadata are coming from. + """ + raise NotImplementedError("get_metadata_authority") + + def build_extrinsic_revision_metadata( + self, p_info: TPackageInfo, revision_id: Sha1Git + ) -> List[RawExtrinsicMetadata]: + if not p_info.revision_extrinsic_metadata: + # If this package loader doesn't write metadata, no need to require + # an implementation for get_metadata_authority. 
+
+    def build_extrinsic_revision_metadata(
+        self, p_info: TPackageInfo, revision_id: Sha1Git
+    ) -> List[RawExtrinsicMetadata]:
+        if not p_info.revision_extrinsic_metadata:
+            # If this package loader doesn't write metadata, no need to require
+            # an implementation of get_metadata_authority.
+            return []
+
+        authority = self.get_metadata_authority()
+        fetcher = self.get_metadata_fetcher()
+
+        metadata_objects = []
+
+        for (discovery_date, format, metadata) in p_info.revision_extrinsic_metadata:
+            metadata_objects.append(
+                RawExtrinsicMetadata(
+                    type=MetadataTargetType.REVISION,
+                    id=SWHID(object_type="revision", object_id=revision_id),
+                    discovery_date=discovery_date,
+                    authority=authority,
+                    fetcher=fetcher,
+                    format=format,
+                    metadata=metadata,
+                    origin=self.url,
+                )
+            )
+
+        return metadata_objects
+
+    def _load_extrinsic_revision_metadata(
+        self, p_info: TPackageInfo, revision_id: Sha1Git
+    ) -> None:
+        metadata_objects = self.build_extrinsic_revision_metadata(p_info, revision_id)
+
+        authorities = {
+            (
+                metadata_object.authority.type,
+                metadata_object.authority.url,
+            ): metadata_object.authority
+            for metadata_object in metadata_objects
+        }
+        if authorities:
+            self.storage.metadata_authority_add(authorities.values())
+
+        fetchers = {
+            (
+                metadata_object.fetcher.name,
+                metadata_object.fetcher.version,
+            ): metadata_object.fetcher
+            for metadata_object in metadata_objects
+        }
+        if fetchers:
+            self.storage.metadata_fetcher_add(fetchers.values())
+
+        if metadata_objects:
+            self.storage.object_metadata_add(metadata_objects)
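One observation on the new config knobs: `DEFAULT_CONFIG` declares `create_authorities` and `create_fetchers`, and the tests below toggle them, but `_load_extrinsic_revision_metadata` never consults them. If the intent is to skip the add calls when an authority/fetcher is known to pre-exist, the gating would presumably look like this sketch (flag semantics assumed):

```python
if self.config["create_authorities"] and authorities:
    self.storage.metadata_authority_add(list(authorities.values()))

if self.config["create_fetchers"] and fetchers:
    self.storage.metadata_fetcher_add(list(fetchers.values()))
```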
diff --git a/swh/loader/package/tests/test_loader_metadata.py b/swh/loader/package/tests/test_loader_metadata.py
new file mode 100644
index 0000000..a60a55d
--- /dev/null
+++ b/swh/loader/package/tests/test_loader_metadata.py
@@ -0,0 +1,158 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+from typing import Iterator, Optional, Sequence, Tuple
+
+import attr
+
+from swh.loader.package.loader import BasePackageInfo, PackageLoader
+from swh.model.hashutil import hash_to_bytes
+from swh.model.identifiers import SWHID
+from swh.model.model import (
+    MetadataAuthority,
+    MetadataAuthorityType,
+    MetadataFetcher,
+    MetadataTargetType,
+    RawExtrinsicMetadata,
+    Sha1Git,
+)
+from swh.storage import get_storage
+
+from swh.loader.package import __version__
+
+EMPTY_SNAPSHOT_ID = "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"
+FULL_SNAPSHOT_ID = "4a9b608c9f01860a627237dd2409d1d50ec4b054"
+
+AUTHORITY = MetadataAuthority(
+    type=MetadataAuthorityType.FORGE, url="http://example.org/",
+)
+ORIGIN_URL = "http://example.org/archive.tgz"
+
+REVISION_ID = hash_to_bytes("8ff44f081d43176474b267de5451f2c2e88089d0")
+REVISION_SWHID = SWHID(object_type="revision", object_id=REVISION_ID)
+
+
+FETCHER = MetadataFetcher(
+    name="swh.loader.package.tests.test_loader_metadata.MetadataTestLoader",
+    version=__version__,
+)
+
+METADATA = [
+    RawExtrinsicMetadata(
+        type=MetadataTargetType.REVISION,
+        id=REVISION_SWHID,
+        discovery_date=datetime.datetime.now(),
+        authority=AUTHORITY,
+        fetcher=FETCHER,
+        format="test-format1",
+        metadata=b"foo bar",
+        origin=ORIGIN_URL,
+    ),
+    RawExtrinsicMetadata(
+        type=MetadataTargetType.REVISION,
+        id=REVISION_SWHID,
+        discovery_date=datetime.datetime.now() + datetime.timedelta(seconds=1),
+        authority=AUTHORITY,
+        fetcher=FETCHER,
+        format="test-format2",
+        metadata=b"bar baz",
+        origin=ORIGIN_URL,
+    ),
+]
+
+
+class MetadataTestLoader(PackageLoader[BasePackageInfo]):
+    def get_versions(self) -> Sequence[str]:
+        return ["v1.0.0"]
+
+    def _load_revision(self, p_info: BasePackageInfo, origin) -> Optional[Sha1Git]:
+        return REVISION_ID
+
+    def get_metadata_authority(self):
+        return attr.evolve(AUTHORITY, metadata={})
+
+    def get_package_info(self, version: str) -> Iterator[Tuple[str, BasePackageInfo]]:
+        m0 = METADATA[0]
+        m1 = METADATA[1]
+        p_info = BasePackageInfo(
+            url=ORIGIN_URL,
+            filename="archive.tgz",
+            revision_extrinsic_metadata=[
+                (m0.discovery_date, m0.format, m0.metadata),
+                (m1.discovery_date, m1.format, m1.metadata),
+            ],
+        )
+
+        yield (version, p_info)
+
+
+def test_load_revision_metadata(swh_config, caplog):
+    storage = get_storage("memory")
+
+    loader = MetadataTestLoader(ORIGIN_URL)
+    loader.storage = storage
+
+    load_status = loader.load()
+    assert load_status == {
+        "status": "eventful",
+        "snapshot_id": FULL_SNAPSHOT_ID,
+    }
+
+    result = storage.object_metadata_get(
+        MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY,
+    )
+    assert result["next_page_token"] is None
+    assert result["results"] == METADATA
+
+    assert caplog.text == ""
+
+
+def test_existing_authority(swh_config, caplog):
+    storage = get_storage("memory")
+
+    loader = MetadataTestLoader(ORIGIN_URL)
+    loader.storage = storage
+    loader.config["create_authorities"] = False
+
+    storage.metadata_authority_add([attr.evolve(AUTHORITY, metadata={})])
+
+    load_status = loader.load()
+    assert load_status == {
+        "status": "eventful",
+        "snapshot_id": FULL_SNAPSHOT_ID,
+    }
+
+    result = storage.object_metadata_get(
+        MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY,
+    )
+    assert result["next_page_token"] is None
+    assert result["results"] == METADATA
+
+    assert caplog.text == ""
+
+
+def test_existing_fetcher(swh_config, caplog):
+    storage = get_storage("memory")
+
+    loader = MetadataTestLoader(ORIGIN_URL)
+    loader.storage = storage
+    loader.config["create_fetchers"] = False
+
+    storage.metadata_fetcher_add([attr.evolve(FETCHER, metadata={})])
+
+    load_status = loader.load()
+    assert load_status == {
+        "status": "eventful",
+        "snapshot_id": FULL_SNAPSHOT_ID,
+    }
+
+    result = storage.object_metadata_get(
+        MetadataTargetType.REVISION, REVISION_SWHID, AUTHORITY,
+    )
+    assert result["next_page_token"] is None
+    assert result["results"] == METADATA
+
+    assert caplog.text == ""