diff --git a/docs/package-loader-tutorial.rst b/docs/package-loader-tutorial.rst index 7c09d9d..6fed28e 100644 --- a/docs/package-loader-tutorial.rst +++ b/docs/package-loader-tutorial.rst @@ -1,592 +1,650 @@ .. _package-loader-tutorial: Package Loader Tutorial ======================= In this tutorial, we will see how to write a loader for |swh| that loads packages from a package manager, such as PyPI or Debian's. First, you should be familiar with Python, unit-testing, |swh|'s :ref:`data-model` and :ref:`architecture`, and go through the :ref:`developer-setup`. Creating the files hierarchy ---------------------------- Once this is done, you should create a new directory (ie. a (sub)package from Python's point of view) for you loader. It can be either a subdirectory of ``swh-loader-core/swh/loader/package/`` like the other package loaders, or it can be in its own package. If you choose the latter, you should also create the base file of any Python package (such as ``setup.py``), you should import them from the `swh-py-template`_ repository. In the rest of this tutorial, we will assume you chose the former and your loader is named "New Loader", so your package loader is in ``swh-loader-core/swh/loader/package/newloader/``. Next, you should create boilerplate files needed for SWH loaders: ``__init__.py``, ``tasks.py``, ``tests/__init__.py``, and ``tests/test_tasks.py``; copy them from an existing package, such as ``swh-loader-core/swh/loader/package/pypi/``, and replace the names in those with your loader's. Finally, create an `entrypoint`_ in ``setup.py``, so your loader can be discovered by the SWH Celery workers:: entry_points=""" [swh.workers] loader.newloader=swh.loader.package.newloader:register """, .. _swh-py-template: https://forge.softwareheritage.org/source/swh-py-template/ .. _entrypoint: https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html Writing a minimal loader ------------------------ It is now time for the interesting part: writing the code to load packages from a package manager into the |swh| archive. Create a file named ``loader.py`` in your package's directory, with two empty classes (replace the names with what you think is relevant):: from typing import Optional import attr from swh.loader.package.loader import BasePackageInfo, PackageLoader from swh.model.model import Person, Revision, Sha1Git, TimestampWithTimezone @attr.s class NewPackageInfo(BasePackageInfo): pass class NewLoader(PackageLoader[NewPackageInfo]): visit_type = "newloader" We now have to fill some of the methods declared by :ref:class:`swh.loader.package.PackageLoader`: in your new ``NewLoader`` class. Listing versions ++++++++++++++++ ``get_versions`` should return the list of names of all versions of the origin defined at ``self.url`` by the default constructor; and ``get_default_version`` should return the name of the default version (usually the latest stable release). They are both implemented with an API call to the package repository. For example, for PyPI origin https://pypi.org/project/requests, this is done with a request to https://pypi.org/pypi/requests/json. Getting package information +++++++++++++++++++++++++++ Next, ``get_package_info`` takes as argument a version name (as returned by ``get_versions``) and yields ``(branch_name, p_info)`` tuples, where ``branch_name`` is a string and ``pkg_info`` is an instance of the ``NewPackageInfo`` class we defined earlier. Each of these tuples should match a single file the loader will download from the origin. Usually, there is only one file per versions, but this is not true for all package repositories (eg. CRAN and PyPI allow multiple version artifacts per version). As ``NewPackageInfo`` derives from :py:class:`swh.loader.package.BasePackageInfo`, it can be created like this:: return NewPackageInfo(url="https://...", filename="...-versionX.Y.tar.gz") The ``url`` must be a URL where to download the archive from. ``filename`` is optional, but it is nice to fill it when possible/relevant. The base ``PackageLoader`` will then take care of calling ``get_versions()`` to get all the versions, then call ``get_package_info()`` get the list of archives to download, download them, and load all the directories in the archive. This means you do not need to manage downloads yourself; and we are now done with interactions with the package repository. Building a revision +++++++++++++++++++ The final step for your minimal loader to work, is to implement ``build_revision``. This is a very important part, as it will create a revision object that will be inserted in |swh|, as a link between origins and the directories. This function takes three important arguments: * ``p_info`` is an object returned by ``get_package_info()`` * ``uncompressed_path`` is the location on the disk where the base ``PackageLoader`` extracted the archive, so you can access files from the archive. * ``directory`` is an :term:`intrinsic identifier` of the directory that was loaded from the archive The way to implement it depends very much on how the package manager works, but here is a rough idea:: def build_revision( self, p_info: NewPackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: author = Person(name="Jane Doe", email="jdoe@example.org") date = TimestampWithTimezone.from_iso8601("2021-04-01T11:55:20Z") return Revision( type=RevisionType.TAR, message="This is a new release of the project", author=author, date=date, committer=author, committer_date=date, parents=(), directory=directory, synthetic=True, ) The strings here are placeholders, and you should extract them from either the extracted archive (using ``uncompressed_path``), or from the package repository's API. The various classes used in this example are :py:class:`swh.model.model.Person`, :py:class:`swh.model.model.TimestampWithTimezone`, and :py:class:`swh.model.model.Revision`. Note that you have access to the ``NewPackageInfo`` object created by ``get_package_info()``, so you can extend the ``NewPackageInfo`` class to pass data between these two functions. A few caveats: * Make sure the timezone matches the source's * ``Person`` can also be built with just a ``fullname``, if there aren't distinct fields for name and email. When in doubt, it's better to just write the ``fullname`` than try to parse it * ``author`` and ``committer`` (resp. ``date`` and ``committer_date``) may be different if the release was written and published by different people (resp. dates). This is only relevant when loading from VCS, so you can usually ignore it in you package loader. Running your loader +++++++++++++++++++ With Docker ^^^^^^^^^^^ We recommend you use our `Docker environment`_ to test your loader. In short, install Docker, ``cd`` to ``swh-environment/docker/``, then `edit docker-compose.override.yml`_ to insert your new loader in the Docker environment, something like this will do:: version: '2' services: swh-loader-core: volumes: - "$HOME/swh-environment/swh-loader-core:/src/swh-loader-core" Then start the Docker environment:: docker-compose start Then, you can run your loader:: docker-compose exec swh-loader swh loader run newloader "https://example.org/~jdoe/project/" where ``newloader`` is the name you registered as an entrypoint in ``setup.py`` and ``https://example.org/~jdoe/project/`` is the origin URL, that will be set as the ``self.url`` attribute of your loader. For example, to run the PyPI loader, the command would be:: docker-compose exec swh-loader swh loader run pypi "https://pypi.org/project/requests/" If you get this error, make sure you properly configured ``docker-compose.override.yml``:: Error: Invalid value for '[...]': invalid choice: newloader Without Docker ^^^^^^^^^^^^^^ If you do not want to use the Docker environment, you will need to start an :ref:`swh-storage` instance yourself, and create a config file that references it:: storage: cls: remote url: http://localhost:5002/ Or alternatively, this more efficient configuration:: storage: cls: pipeline steps: - cls: buffer min_batch_size: content: 10000 content_bytes: 104857600 directory: 1000 revision: 1000 - cls: filter - cls: remote url: http://localhost:5002/ And run your loader with:: swh loader -C loader.yml run newloader "https://example.org/~jdoe/project/" where ``newloader`` is the name you registered as an entrypoint in ``setup.py`` and ``https://example.org/~jdoe/project/`` is the origin URL, that will be set as the ``self.url`` attribute of your loader. For example, with PyPI:: swh loader -C loader.yml run pypi "https://pypi.org/project/requests/" .. _Docker environment: https://forge.softwareheritage.org/source/swh-environment/browse/master/docker/ .. _edit docker-compose.override.yml: https://forge.softwareheritage.org/source/swh-environment/browse/master/docker/#install-a-swh-package-from Testing your loader +++++++++++++++++++ You must write tests for your loader. First, of course, unit tests for the internal functions of your loader, if any (eg. the functions used to extract metadata); but this is not covered in this tutorial. Most importantly, you should write integration tests for your loader, that will simulate an origin, run the loader, and check everything is loaded in the storage as it should be. As we do not want tests to directly query an origin (it makes tests flaky, hard to reproduce, and put unnecessary load on the origin), we usually mock it using the :py:func:`swh.core.pytest_plugin.requests_mock_datadir` fixture It works by creating a ``data/`` folder in your tests (such as ``swh/loader/package/newloader/tests/data/``) and downloading results from API calls there, in the structured documented in :py:func:`swh.core.pytest_plugin.requests_mock_datadir_factory` The files in the ``datadir/`` will then be served whenever the loader tries to access an URL. This is very dependent on the kind of repositories your loader will read from, so here is an example with the PyPI loader. The files ``swh/loader/package/pypi/tests/data/https_pypi.org/pypi_nexter_json`` and ``swh/loader/package/pypi/tests/data/https_files.pythonhosted.org/nexter-*`` are used in this test:: from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats def test_pypi_visit_1_release_with_2_artifacts(swh_storage, requests_mock_datadir): # Initialize the loader url = "https://pypi.org/project/nexter" loader = PyPILoader(swh_storage, url) # Run the loader, with a swh-storage instance, on the given URL. # HTTP calls will be mocked by the requests_mock_datadir fixture actual_load_status = loader.load() # Check the loader loaded exactly the snapshot we expected # (when writing your tests for the first time, you cannot know the # snapshot id without running your loader; so let it error and write # down the result here) expected_snapshot_id = hash_to_bytes("a27e638a4dad6fbfa273c6ebec1c4bf320fb84c6") assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id.hex(), } # Check the content of the snapshot. (ditto) expected_snapshot = Snapshot( id=expected_snapshot_id, branches={ b"releases/1.1.0/nexter-1.1.0.zip": SnapshotBranch( target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), target_type=TargetType.REVISION, ), b"releases/1.1.0/nexter-1.1.0.tar.gz": SnapshotBranch( target=hash_to_bytes("0bf88f5760cca7665d0af4d6575d9301134fe11a"), target_type=TargetType.REVISION, ), }, ) check_snapshot(expected_snapshot, swh_storage) # Check the visit was properly created with the right type assert_last_visit_matches( swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id ) # Then you could check the directory structure: directory_id = swh_storage.revision_get( [hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571")] )[0].directory entries = list(swh_storage.directory_ls(directory_id, recursive=True)) assert entries == [ ... ] Here are some scenarios you should test, when relevant: * No versions * One version * Two or more versions * More than one package per version, if relevant * Corrupt packages (missing metadata, ...), if relevant * API errors * etc. Making your loader incremental ------------------------------ In the previous sections, you wrote a fully functional loader for a new type of package repository. This is great! Please tell us about it, and -:ref:`submit it for review ` so we can give you some feedback. +:ref:`submit it for review ` so we can give you some feedback early. Now, we will see a key optimization for any package loader: skipping packages it already downloaded, using :term:`extids `. The rough idea it to find some way to uniquely identify packages before downloading them and encode it in a short string, the ExtID. Using checksums +++++++++++++++ Ideally, this short string is a checksum of the archive, provided by the API before downloading the archive itself. This is ideal, because this ensures that we detect changes in the package's content even if it keeps the same name and version number. If this is not the case of the repository you want to load from, skip to the next subsection. This is used for example by the PyPI loader (with a sha256sum) and the NPM loader (with a sha1sum). The Debian loader uses a similar scheme: as a single package is assembled from a set of tarballs, it only uses the hash of the ``.dsc`` file, which itself contains a hash of all the tarballs. This is implemented by overriding the ``extid`` method of you ``NewPackageInfo`` class, that returns the type of the ExtID (see below) and the ExtID itself:: from swh.loader.package.loader import PartialExtID EXTID_TYPE: str = "pypi-archive-sha256" @attr.s class NewPackageInfo(BasePackageInfo): sha256: str def extid(self) -> PartialExtID: return (EXTID_TYPE, hash_to_bytes(self.sha256)) and the loader's ``get_package_info`` method sets the right value in the ``sha256`` attribute. Using a custom manifest +++++++++++++++++++++++ Unfortunaly, this does not work for all packages, as some package repositories do not provide a checksum of the archives via their API. If this is the case of the repository you want to load from, you need to find a way around it. It highly depends on the repository, so this tutorial cannot cover how to do it. We do however provide an easy option that should work in most cases: creating a "manifest" of the archive with some metadata in it, and hashing it. For example, when loading from the GNU FTP servers, we have access to some metadata, that is somewhat good enough to deduplicate. We write them all in a string and hash that string. It is done like this:: import string @attr.s class ArchivePackageInfo(BasePackageInfo): length = attr.ib(type=int) """Size of the archive file""" time = attr.ib(type=Union[str, datetime.datetime]) """Timestamp of the archive file on the server""" version = attr.ib(type=str) EXTID_FORMAT = "package-manifest-sha256" MANIFEST_FORMAT = string.Template("$time $length $version $url") The default implementation of :py:func:`swh.loader.package.loader.BasePackageInfo.extid` will read this template, substitute the variables based on the object's attributes, compute the hash of the result, and return it. Note that, as mentioned before, this is not perfect because a tarball may be replaced with a different tarball of exactly the same length and modification time, and we won't detect it. But this is extremely unlikely, so we consider it to be good enough. Alternatively, if this is not good enough for your loader, you can simply not implement ExtIDs, and your loader will always load all tarballs. This can be bandwidth-heavy for both |swh| and the origin you are loaded from, so this decision should not be taken lightly. Choosing the ExtID type +++++++++++++++++++++++ The type of your ExtID should be a short ASCII string, that is both unique to your loader and descriptive of how it was computed. Why unique to the loader? Because different loaders may load the same archive differently. For example, if I was to create an archive with both a ``PKG-INFO`` and a ``package.json`` file, and submit it to both NPM and PyPI, both package repositories would have exactly the same tarball. But the NPM loader would create the revision based on authorship info in ``package.json``, and the PyPI loader based on ``PKG-INFO``. But we do not want the PyPI loader to assume it already created a revision itself, while the revision was created by the NPM loader! And why descriptive? This is simply for future-proofing; in case your loader changes the format of the ExtID (eg. by using a different hash algorithm). Testing your incremental loading ++++++++++++++++++++++++++++++++ If you followed the steps above, your loader is now able to detect what packages it already downloaded and skip them. This is what we call an incremental loader. It is now time to write tests to make sure your loader fulfills this promise. This time, we want to use ``requests_mock_datadir_visits`` instead of ``requests_mock_datadir``, because we want to mock the repository's API to emulate its results changing over time (eg. because a new version was published between two runs of the loader). See the documentation of :py:func:`swh.core.pytest_plugin.requests_mock_datadir_factory` for a description of the file layout to use. Let's take, once again, a look at ``swh/loader/package/pypi/tests/test_pypi.py``, to use as an example:: def test_pypi_incremental_visit(swh_storage, requests_mock_datadir_visits): """With prior visit, 2nd load will result with a different snapshot """ # Initialize the loader url = "https://pypi.org/project/0805nexter" loader = PyPILoader(swh_storage, url) # First visit visit1_actual_load_status = loader.load() visit1_stats = get_stats(swh_storage) # Make sure everything is in order expected_snapshot_id = hash_to_bytes("ba6e158ada75d0b3cfb209ffdf6daa4ed34a227a") assert visit1_actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id.hex(), } assert_last_visit_matches( swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id ) assert { "content": 6, "directory": 4, "origin": 1, "origin_visit": 1, "release": 0, "revision": 2, "skipped_content": 0, "snapshot": 1, } == visit1_stats # Reset internal state del loader._cached__raw_info del loader._cached_info # Second visit visit2_actual_load_status = loader.load() visit2_stats = get_stats(swh_storage) # Check the result of the visit assert visit2_actual_load_status["status"] == "eventful", visit2_actual_load_status expected_snapshot_id2 = hash_to_bytes("2e5149a7b0725d18231a37b342e9b7c4e121f283") assert visit2_actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id2.hex(), } assert_last_visit_matches( swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id2 ) assert { "content": 6 + 1, # 1 more content "directory": 4 + 2, # 2 more directories "origin": 1, "origin_visit": 1 + 1, "release": 0, "revision": 2 + 1, # 1 more revision "skipped_content": 0, "snapshot": 1 + 1, # 1 more snapshot } == visit2_stats # Check all content objects were loaded expected_contents = map( hash_to_bytes, [ "a61e24cdfdab3bb7817f6be85d37a3e666b34566", "938c33483285fd8ad57f15497f538320df82aeb8", "a27576d60e08c94a05006d2e6d540c0fdb5f38c8", "405859113963cb7a797642b45f171d6360425d16", "e5686aa568fdb1d19d7f1329267082fe40482d31", "83ecf6ec1114fd260ca7a833a2d165e71258c338", "92689fa2b7fb4d4fc6fb195bf73a50c87c030639", ], ) assert list(swh_storage.content_missing_per_sha1(expected_contents)) == [] # Check all directory objects were loaded expected_dirs = map( hash_to_bytes, [ "05219ba38bc542d4345d5638af1ed56c7d43ca7d", "cf019eb456cf6f78d8c4674596f1c9a97ece8f44", "b178b66bd22383d5f16f4f5c923d39ca798861b4", "c3a58f8b57433a4b56caaa5033ae2e0931405338", "e226e7e4ad03b4fc1403d69a18ebdd6f2edd2b3a", "52604d46843b898f5a43208045d09fcf8731631b", ], ) assert list(swh_storage.directory_missing(expected_dirs)) == [] # etc. Loading metadata ---------------- -TODO +Finally, an optional step: collecting and loading :term:`extrinsic metadata`. +This is metadata that your loader may collect while loading an origin. +For example, the PyPI loader collects some parts of the API response +(eg. https://pypi.org/pypi/requests/json) + +They are stored as raw bytestring, along with a format (an ASCII string) and +a date of discovery (usually the time your loader ran). + +This is done by adding them to the ``directory_extrinsic_metadata`` attribute of +your ``NewPackageInfo`` object when creating it in ``get_package_info`` +as :py:cls:`swh.loader.package.loader.RawExtrinsicMetadataCore` objects:: + + NewPackageInfo( + ..., + directory_extrinsic_metadata=[ + RawExtrinsicMetadataCore( + format="new-format", + metadata=b"foo bar baz", + discovery_date=datetime.datetime(...), + ) + ] + ) + +``format`` should be a human-readable ASCII string that unambiguously describes +the format. Readers of the metadata object will have a built-in list of formats +they understand, and will check if your metadata object is among them. +You should use one of the :ref:`known metadata formats ` +if possible, or add yours to this list. + +``metadata`` is the metadata object itself. When possible, it should be copied verbatim +from the source object you got, and should not be created by the loader. +If this is not possible, for example because it is extracted from a larger +JSON or XML document, make sure you do as little modifications as possible to reduce +the risks of corruption. + +``discovery_date`` is optional, and defaults to the time your loader started working. + + +In theory, you can write extrinsic metadata on any kind of objects, eg. by implementing +:py:meth:`swh.loader.package.loader.PackageLoader.get_extrinsic_origin_metadata`, +:py:meth:`swh.loader.package.loader.PackageLoader.get_extrinsic_snapshot_metadata`; +but this is rarely relevant in practice. +Be sure to check if loader can find any potentially interesting metadata, though! + + +Final words +----------- + +Congratulations, you made it to the end. +If you have not already, please `contact us`_ to tell us about your new loader, +and :ref:`submit your loader for review ` on our forge +so we can merge it and run it along our other loaders to archive more repositories. + +And if you have any change in mind to improve this tutorial for future readers, +please submit them too. + +Thank you for your contributions! + +.. _contact us: https://www.softwareheritage.org/community/developers/ diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py index 63bfd17..820a75f 100644 --- a/swh/loader/package/loader.py +++ b/swh/loader/package/loader.py @@ -1,1034 +1,1036 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import hashlib from itertools import islice import json import logging import os import string import sys import tempfile from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Mapping, Optional, Sequence, Set, Tuple, TypeVar, ) import attr import sentry_sdk from swh.core.tarball import uncompress from swh.loader.core.loader import BaseLoader from swh.loader.exception import NotFound from swh.loader.package.utils import download from swh.model import from_disk from swh.model.collections import ImmutableDict from swh.model.hashutil import hash_to_hex from swh.model.identifiers import ( CoreSWHID, ExtendedObjectType, ExtendedSWHID, ObjectType, ) from swh.model.model import ( ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Revision, Sha1Git, Snapshot, TargetType, ) from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface from swh.storage.utils import now logger = logging.getLogger(__name__) SWH_METADATA_AUTHORITY = MetadataAuthority( type=MetadataAuthorityType.REGISTRY, url="https://softwareheritage.org/", metadata={}, ) """Metadata authority for extrinsic metadata generated by Software Heritage. Used for metadata on "original artifacts", ie. length, filename, and checksums of downloaded archive files.""" PartialExtID = Tuple[str, bytes] """The ``extid_type`` and ``extid`` fields of an :ref:py:`ExtID` object.""" @attr.s class RawExtrinsicMetadataCore: """Contains the core of the metadata extracted by a loader, that will be used to build a full RawExtrinsicMetadata object by adding object identifier, context, and provenance information.""" format = attr.ib(type=str) metadata = attr.ib(type=bytes) discovery_date = attr.ib(type=Optional[datetime.datetime], default=None) """Defaults to the visit date.""" @attr.s class BasePackageInfo: """Compute the primary key for a dict using the id_keys as primary key composite. Args: d: A dict entry to compute the primary key on id_keys: Sequence of keys to use as primary key Returns: The identity for that dict entry """ url = attr.ib(type=str) filename = attr.ib(type=Optional[str]) MANIFEST_FORMAT: Optional[string.Template] = None """If not None, used by the default extid() implementation to format a manifest, before hashing it to produce an ExtID.""" EXTID_TYPE: str = "package-manifest-sha256" # The following attribute has kw_only=True in order to allow subclasses # to add attributes. Without kw_only, attributes without default values cannot # go after attributes with default values. # See directory_extrinsic_metadata = attr.ib( type=List[RawExtrinsicMetadataCore], default=[], kw_only=True, ) + """:term:`extrinsic metadata` collected by the loader, that will be attached to the + loaded directory and added to the Metadata storage.""" - # TODO: add support for metadata for directories and contents + # TODO: add support for metadata for revisions and contents def extid(self) -> Optional[PartialExtID]: """Returns a unique intrinsic identifier of this package info, or None if this package info is not 'deduplicatable' (meaning that we will always load it, instead of checking the ExtID storage to see if we already did)""" if self.MANIFEST_FORMAT is None: return None else: manifest = self.MANIFEST_FORMAT.substitute( {k: str(v) for (k, v) in attr.asdict(self).items()} ) return (self.EXTID_TYPE, hashlib.sha256(manifest.encode()).digest()) TPackageInfo = TypeVar("TPackageInfo", bound=BasePackageInfo) class PackageLoader(BaseLoader, Generic[TPackageInfo]): # Origin visit type (str) set by the loader visit_type = "" visit_date: datetime.datetime def __init__( self, storage: StorageInterface, url: str, max_content_size: Optional[int] = None, ): """Loader's constructor. This raises exception if the minimal required configuration is missing (cf. fn:`check` method). Args: storage: Storage instance url: Origin url to load data from """ super().__init__(storage=storage, max_content_size=max_content_size) self.url = url self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) def get_versions(self) -> Sequence[str]: """Return the list of all published package versions. Raises: `class:swh.loader.exception.NotFound` error when failing to read the published package versions. Returns: Sequence of published versions """ return [] def get_package_info(self, version: str) -> Iterator[Tuple[str, TPackageInfo]]: """Given a release version of a package, retrieve the associated package information for such version. Args: version: Package version Returns: (branch name, package metadata) """ yield from {} def build_revision( self, p_info: TPackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: """Build the revision from the archive metadata (extrinsic artifact metadata) and the intrinsic metadata. Args: p_info: Package information uncompressed_path: Artifact uncompressed path on disk Returns: Revision object """ raise NotImplementedError("build_revision") def get_default_version(self) -> str: """Retrieve the latest release version if any. Returns: Latest version """ return "" def last_snapshot(self) -> Optional[Snapshot]: """Retrieve the last snapshot out of the last visit. """ return snapshot_get_latest(self.storage, self.url) def known_artifacts( self, snapshot: Optional[Snapshot] ) -> Dict[Sha1Git, Optional[ImmutableDict[str, object]]]: """Retrieve the known releases/artifact for the origin. Args snapshot: snapshot for the visit Returns: Dict of keys revision id (bytes), values a metadata Dict. """ if not snapshot: return {} # retrieve only revisions (e.g the alias we do not want here) revs = [ rev.target for rev in snapshot.branches.values() if rev and rev.target_type == TargetType.REVISION ] known_revisions = self.storage.revision_get(revs) return { revision.id: revision.metadata for revision in known_revisions if revision } def new_packageinfo_to_extid(self, p_info: TPackageInfo) -> Optional[PartialExtID]: return p_info.extid() def known_artifact_to_extid(self, known_artifact: Dict) -> Optional[PartialExtID]: """Returns a unique intrinsic identifier of a downloaded artifact, used to check if a new artifact is the same.""" return None def resolve_revision_from_artifacts( self, known_artifacts: Dict[Sha1Git, Any], p_info: TPackageInfo, ) -> Optional[Sha1Git]: """Resolve the revision from known artifact metadata and a package info object. If the artifact has already been downloaded, this will return the existing revision targeting that uncompressed artifact directory. Otherwise, this returns None. Args: known_artifacts: dict from revision ids to revision metadata p_info: Package information Returns: None or revision identifier """ if not known_artifacts: # No known artifact, no need to compute the artifact's extid return None new_extid = self.new_packageinfo_to_extid(p_info) if new_extid is None: # This loader does not support deduplication, at least not for this # artifact. return None for rev_id, known_artifact in known_artifacts.items(): known_extid = self.known_artifact_to_extid(known_artifact) if new_extid == known_extid: return rev_id return None def _get_known_extids( self, packages_info: List[TPackageInfo] ) -> Dict[PartialExtID, List[CoreSWHID]]: """Compute the ExtIDs from new PackageInfo objects, searches which are already loaded in the archive, and returns them if any.""" # Compute the ExtIDs of all the new packages, grouped by extid type new_extids: Dict[str, List[bytes]] = {} for p_info in packages_info: res = p_info.extid() if res is not None: (extid_type, extid_extid) = res new_extids.setdefault(extid_type, []).append(extid_extid) # For each extid type, call extid_get_from_extid() with all the extids of # that type, and store them in the '(type, extid) -> target' map. known_extids: Dict[PartialExtID, List[CoreSWHID]] = {} for (extid_type, extids) in new_extids.items(): for extid in self.storage.extid_get_from_extid(extid_type, extids): if extid is not None: key = (extid.extid_type, extid.extid) known_extids.setdefault(key, []).append(extid.target) return known_extids def resolve_revision_from_extids( self, known_extids: Dict[PartialExtID, List[CoreSWHID]], p_info: TPackageInfo, revision_whitelist: Set[Sha1Git], ) -> Optional[Sha1Git]: """Resolve the revision from known ExtIDs and a package info object. If the artifact has already been downloaded, this will return the existing revision targeting that uncompressed artifact directory. Otherwise, this returns None. Args: known_extids: Dict built from a list of ExtID, with the target as value p_info: Package information revision_whitelist: Any ExtID with target not in this set is filtered out Returns: None or revision identifier """ new_extid = p_info.extid() if new_extid is None: return None for extid_target in known_extids.get(new_extid, []): if extid_target.object_id not in revision_whitelist: # There is a known ExtID for this package, but its target is not # in the snapshot. # This can happen for three reasons: # # 1. a loader crashed after writing the ExtID, but before writing # the snapshot # 2. some other loader loaded the same artifact, but produced # a different revision, causing an additional ExtID object # to be written. We will probably find this loader's ExtID # in a future iteration of this loop. # Note that for now, this is impossible, as each loader has a # completely different extid_type, but this is an implementation # detail of each loader. # 3. we took a snapshot, then the package disappeared, # then we took another snapshot, and the package reappeared # # In case of 1, we must actually load the package now, # so let's do it. # TODO: detect when we are in case 3 using revision_missing instead # of the snapshot. continue elif extid_target.object_type != ObjectType.REVISION: # We only support revisions for now. # Note that this case should never be reached unless there is a # collision between a revision hash and some non-revision object's # hash, but better safe than sorry. logger.warning( "%s is in the revision whitelist, but is not a revision.", hash_to_hex(extid_target.object_type), ) continue return extid_target.object_id return None def download_package( self, p_info: TPackageInfo, tmpdir: str ) -> List[Tuple[str, Mapping]]: """Download artifacts for a specific package. All downloads happen in in the tmpdir folder. Default implementation expects the artifacts package info to be about one artifact per package. Note that most implementation have 1 artifact per package. But some implementation have multiple artifacts per package (debian), some have none, the package is the artifact (gnu). Args: artifacts_package_info: Information on the package artifacts to download (url, filename, etc...) tmpdir: Location to retrieve such artifacts Returns: List of (path, computed hashes) """ return [download(p_info.url, dest=tmpdir, filename=p_info.filename)] def uncompress( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str ) -> str: """Uncompress the artifact(s) in the destination folder dest. Optionally, this could need to use the p_info dict for some more information (debian). """ uncompressed_path = os.path.join(dest, "src") for a_path, _ in dl_artifacts: uncompress(a_path, dest=uncompressed_path) return uncompressed_path def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]: """Return an extra dict of branches that are used to update the set of branches. """ return {} def finalize_visit( self, *, snapshot: Optional[Snapshot], visit: OriginVisit, status_visit: str, status_load: str, failed_branches: List[str], ) -> Dict[str, Any]: """Finalize the visit: - flush eventual unflushed data to storage - update origin visit's status - return the task's status """ self.storage.flush() snapshot_id: Optional[bytes] = None if snapshot and snapshot.id: # to prevent the snapshot.id to b"" snapshot_id = snapshot.id assert visit.visit visit_status = OriginVisitStatus( origin=self.url, visit=visit.visit, type=self.visit_type, date=now(), status=status_visit, snapshot=snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) result: Dict[str, Any] = { "status": status_load, } if snapshot_id: result["snapshot_id"] = hash_to_hex(snapshot_id) if failed_branches: logger.warning("%d failed branches", len(failed_branches)) for i, urls in enumerate(islice(failed_branches, 50)): prefix_url = "Failed branches: " if i == 0 else "" logger.warning("%s%s", prefix_url, urls) return result def load(self) -> Dict: """Load for a specific origin the associated contents. 1. Get the list of versions in an origin. 2. Get the snapshot from the previous run of the loader, and filter out versions that were already loaded, if their :term:`extids ` match Then, for each remaining version in the origin 3. Fetch the files for one package version By default, this can be implemented as a simple HTTP request. Loaders with more specific requirements can override this, e.g.: the PyPI loader checks the integrity of the downloaded files; the Debian loader has to download and check several files for one package version. 4. Extract the downloaded files. By default, this would be a universal archive/tarball extraction. Loaders for specific formats can override this method (for instance, the Debian loader uses dpkg-source -x). 5. Convert the extracted directory to a set of Software Heritage objects Using swh.model.from_disk. 6. Extract the metadata from the unpacked directories This would only be applicable for "smart" loaders like npm (parsing the package.json), PyPI (parsing the PKG-INFO file) or Debian (parsing debian/changelog and debian/control). On "minimal-metadata" sources such as the GNU archive, the lister should provide the minimal set of metadata needed to populate the revision/release objects (authors, dates) as an argument to the task. 7. Generate the revision/release objects for the given version. From the data generated at steps 3 and 4. end for each 8. Generate and load the snapshot for the visit Using the revisions/releases collected at step 7., and the branch information from step 2., generate a snapshot and load it into the Software Heritage archive """ status_load = "uneventful" # either: eventful, uneventful, failed status_visit = "full" # see swh.model.model.OriginVisitStatus snapshot = None failed_branches: List[str] = [] # Prepare origin and origin_visit origin = Origin(url=self.url) try: self.storage.origin_add([origin]) visit = list( self.storage.origin_visit_add( [ OriginVisit( origin=self.url, date=self.visit_date, type=self.visit_type, ) ] ) )[0] except Exception as e: logger.exception("Failed to initialize origin_visit for %s", self.url) sentry_sdk.capture_exception(e) return {"status": "failed"} # Get the previous snapshot for this origin. It is then used to see which # of the package's versions are already loaded in the archive. try: last_snapshot = self.last_snapshot() logger.debug("last snapshot: %s", last_snapshot) known_artifacts = self.known_artifacts(last_snapshot) logger.debug("known artifacts: %s", known_artifacts) except Exception as e: logger.exception("Failed to get previous state for %s", self.url) sentry_sdk.capture_exception(e) return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) load_exceptions: List[Exception] = [] # Get the list of all version names try: versions = self.get_versions() except NotFound: return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="not_found", status_load="failed", ) except Exception: return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) # Get the metadata of each version's package packages_info: List[Tuple[str, str, TPackageInfo]] = [ (version, branch_name, p_info) for version in versions for (branch_name, p_info) in self.get_package_info(version) ] # Compute the ExtID of each of these packages known_extids = self._get_known_extids( [p_info for (_, _, p_info) in packages_info] ) if last_snapshot is None: last_snapshot_targets: Set[Sha1Git] = set() else: last_snapshot_targets = { branch.target for branch in last_snapshot.branches.values() } new_extids: Set[ExtID] = set() tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = { version: [] for version in versions } for (version, branch_name, p_info) in packages_info: logger.debug("package_info: %s", p_info) # Check if the package was already loaded, using its ExtID revision_id = self.resolve_revision_from_extids( known_extids, p_info, last_snapshot_targets ) if revision_id is None: # No existing revision found from an acceptable ExtID, # search in the artifact data instead. # TODO: remove this after we finished migrating to ExtIDs. revision_id = self.resolve_revision_from_artifacts( known_artifacts, p_info ) if revision_id is None: # No matching revision found in the last snapshot, load it. try: res = self._load_revision(p_info, origin) if res: (revision_id, directory_id) = res assert revision_id assert directory_id self._load_extrinsic_directory_metadata( p_info, revision_id, directory_id ) self.storage.flush() status_load = "eventful" except Exception as e: self.storage.clear_buffers() load_exceptions.append(e) sentry_sdk.capture_exception(e) logger.exception( "Failed loading branch %s for %s", branch_name, self.url ) failed_branches.append(branch_name) continue if revision_id is None: continue partial_extid = p_info.extid() if partial_extid is not None: (extid_type, extid) = partial_extid revision_swhid = CoreSWHID( object_type=ObjectType.REVISION, object_id=revision_id ) new_extids.add( ExtID(extid_type=extid_type, extid=extid, target=revision_swhid) ) tmp_revisions[version].append((branch_name, revision_id)) if load_exceptions: status_visit = "partial" if not tmp_revisions: # We could not load any revisions; fail completely return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) try: # Retrieve the default release version (the "latest" one) default_version = self.get_default_version() logger.debug("default version: %s", default_version) # Retrieve extra branches extra_branches = self.extra_branches() logger.debug("extra branches: %s", extra_branches) snapshot = self._load_snapshot( default_version, tmp_revisions, extra_branches ) self.storage.flush() except Exception as e: logger.exception("Failed to build snapshot for origin %s", self.url) sentry_sdk.capture_exception(e) status_visit = "failed" status_load = "failed" if snapshot: try: metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id) self._load_metadata_objects(metadata_objects) except Exception as e: logger.exception( "Failed to load extrinsic snapshot metadata for %s", self.url ) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" try: metadata_objects = self.build_extrinsic_origin_metadata() self._load_metadata_objects(metadata_objects) except Exception as e: logger.exception( "Failed to load extrinsic origin metadata for %s", self.url ) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" self._load_extids(new_extids) return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit=status_visit, status_load=status_load, ) def _load_directory( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str ) -> Tuple[str, from_disk.Directory]: uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) logger.debug("uncompressed_path: %s", uncompressed_path) directory = from_disk.Directory.from_disk( path=uncompressed_path.encode("utf-8"), max_content_length=self.max_content_size, ) contents, skipped_contents, directories = from_disk.iter_directory(directory) logger.debug("Number of skipped contents: %s", len(skipped_contents)) self.storage.skipped_content_add(skipped_contents) logger.debug("Number of contents: %s", len(contents)) self.storage.content_add(contents) logger.debug("Number of directories: %s", len(directories)) self.storage.directory_add(directories) return (uncompressed_path, directory) def _load_revision( self, p_info: TPackageInfo, origin ) -> Optional[Tuple[Sha1Git, Sha1Git]]: """Does all the loading of a revision itself: * downloads a package and uncompresses it * loads it from disk * adds contents, directories, and revision to self.storage * returns (revision_id, directory_id) Raises exception when unable to download or uncompress artifacts """ with tempfile.TemporaryDirectory() as tmpdir: dl_artifacts = self.download_package(p_info, tmpdir) (uncompressed_path, directory) = self._load_directory(dl_artifacts, tmpdir) # FIXME: This should be release. cf. D409 revision = self.build_revision( p_info, uncompressed_path, directory=directory.hash ) if not revision: # Some artifacts are missing intrinsic metadata # skipping those return None metadata = [metadata for (filepath, metadata) in dl_artifacts] extra_metadata: Tuple[str, Any] = ( "original_artifact", metadata, ) if revision.metadata is not None: full_metadata = list(revision.metadata.items()) + [extra_metadata] else: full_metadata = [extra_metadata] # TODO: don't add these extrinsic metadata to the revision. revision = attr.evolve(revision, metadata=ImmutableDict(full_metadata)) original_artifact_metadata = RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory ), discovery_date=self.visit_date, authority=SWH_METADATA_AUTHORITY, fetcher=self.get_metadata_fetcher(), format="original-artifacts-json", metadata=json.dumps(metadata).encode(), origin=self.url, revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=revision.id), ) self._load_metadata_objects([original_artifact_metadata]) logger.debug("Revision: %s", revision) self.storage.revision_add([revision]) assert directory.hash return (revision.id, directory.hash) def _load_snapshot( self, default_version: str, revisions: Dict[str, List[Tuple[str, bytes]]], extra_branches: Dict[bytes, Mapping[str, Any]], ) -> Optional[Snapshot]: """Build snapshot out of the current revisions stored and extra branches. Then load it in the storage. """ logger.debug("revisions: %s", revisions) # Build and load the snapshot branches = {} # type: Dict[bytes, Mapping[str, Any]] for version, branch_name_revisions in revisions.items(): if version == default_version and len(branch_name_revisions) == 1: # only 1 branch (no ambiguity), we can create an alias # branch 'HEAD' branch_name, _ = branch_name_revisions[0] # except for some corner case (deposit) if branch_name != "HEAD": branches[b"HEAD"] = { "target_type": "alias", "target": branch_name.encode("utf-8"), } for branch_name, target in branch_name_revisions: branches[branch_name.encode("utf-8")] = { "target_type": "revision", "target": target, } # Deal with extra-branches for name, branch_target in extra_branches.items(): if name in branches: logger.error("Extra branch '%s' has been ignored", name) else: branches[name] = branch_target snapshot_data = {"branches": branches} logger.debug("snapshot: %s", snapshot_data) snapshot = Snapshot.from_dict(snapshot_data) logger.debug("snapshot: %s", snapshot) self.storage.snapshot_add([snapshot]) return snapshot def get_loader_name(self) -> str: """Returns a fully qualified name of this loader.""" return f"{self.__class__.__module__}.{self.__class__.__name__}" def get_loader_version(self) -> str: """Returns the version of the current loader.""" module_name = self.__class__.__module__ or "" module_name_parts = module_name.split(".") # Iterate rootward through the package hierarchy until we find a parent of this # loader's module with a __version__ attribute. for prefix_size in range(len(module_name_parts), 0, -1): package_name = ".".join(module_name_parts[0:prefix_size]) module = sys.modules[package_name] if hasattr(module, "__version__"): return module.__version__ # type: ignore # If this loader's class has no parent package with a __version__, # it should implement it itself. raise NotImplementedError( f"Could not dynamically find the version of {self.get_loader_name()}." ) def get_metadata_fetcher(self) -> MetadataFetcher: """Returns a MetadataFetcher instance representing this package loader; which is used to for adding provenance information to extracted extrinsic metadata, if any.""" return MetadataFetcher( name=self.get_loader_name(), version=self.get_loader_version(), metadata={}, ) def get_metadata_authority(self) -> MetadataAuthority: """For package loaders that get extrinsic metadata, returns the authority the metadata are coming from. """ raise NotImplementedError("get_metadata_authority") def get_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadataCore]: """Returns metadata items, used by build_extrinsic_origin_metadata.""" return [] def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]: """Builds a list of full RawExtrinsicMetadata objects, using metadata returned by get_extrinsic_origin_metadata.""" metadata_items = self.get_extrinsic_origin_metadata() if not metadata_items: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in metadata_items: metadata_objects.append( RawExtrinsicMetadata( target=Origin(self.url).swhid(), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, ) ) return metadata_objects def get_extrinsic_snapshot_metadata(self) -> List[RawExtrinsicMetadataCore]: """Returns metadata items, used by build_extrinsic_snapshot_metadata.""" return [] def build_extrinsic_snapshot_metadata( self, snapshot_id: Sha1Git ) -> List[RawExtrinsicMetadata]: """Builds a list of full RawExtrinsicMetadata objects, using metadata returned by get_extrinsic_snapshot_metadata.""" metadata_items = self.get_extrinsic_snapshot_metadata() if not metadata_items: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in metadata_items: metadata_objects.append( RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.SNAPSHOT, object_id=snapshot_id ), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, origin=self.url, ) ) return metadata_objects def build_extrinsic_directory_metadata( self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git, ) -> List[RawExtrinsicMetadata]: if not p_info.directory_extrinsic_metadata: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in p_info.directory_extrinsic_metadata: metadata_objects.append( RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=directory_id ), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, origin=self.url, revision=CoreSWHID( object_type=ObjectType.REVISION, object_id=revision_id ), ) ) return metadata_objects def _load_extrinsic_directory_metadata( self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git, ) -> None: metadata_objects = self.build_extrinsic_directory_metadata( p_info, revision_id, directory_id ) self._load_metadata_objects(metadata_objects) def _load_metadata_objects( self, metadata_objects: List[RawExtrinsicMetadata] ) -> None: if not metadata_objects: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return self._create_authorities(mo.authority for mo in metadata_objects) self._create_fetchers(mo.fetcher for mo in metadata_objects) self.storage.raw_extrinsic_metadata_add(metadata_objects) def _create_authorities(self, authorities: Iterable[MetadataAuthority]) -> None: deduplicated_authorities = { (authority.type, authority.url): authority for authority in authorities } if authorities: self.storage.metadata_authority_add(list(deduplicated_authorities.values())) def _create_fetchers(self, fetchers: Iterable[MetadataFetcher]) -> None: deduplicated_fetchers = { (fetcher.name, fetcher.version): fetcher for fetcher in fetchers } if fetchers: self.storage.metadata_fetcher_add(list(deduplicated_fetchers.values())) def _load_extids(self, extids: Set[ExtID]) -> None: if not extids: return try: self.storage.extid_add(list(extids)) except Exception as e: logger.exception("Failed to load new ExtIDs for %s", self.url) sentry_sdk.capture_exception(e) # No big deal, it just means the next visit will load the same versions # again.