diff --git a/docs/README.rst b/docs/README.rst index b164662..4e27671 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -1,32 +1,30 @@ -.. _swh-loader-core: - Software Heritage - Loader foundations ====================================== The Software Heritage Loader Core is a low-level loading utilities and helpers used by :term:`loaders `. The main entry points are classes: - :class:`swh.loader.core.loader.BaseLoader` for loaders (e.g. svn) - :class:`swh.loader.core.loader.DVCSLoader` for DVCS loaders (e.g. hg, git, ...) - :class:`swh.loader.package.loader.PackageLoader` for Package loaders (e.g. PyPI, Npm, ...) Package loaders --------------- This package also implements many package loaders directly, out of convenience, as they usually are quite similar and each fits in a single file. They all roughly follow these steps, explained in the :py:meth:`swh.loader.package.loader.PackageLoader.load` documentation. See the :ref:`package-loader-tutorial` for details. VCS loaders ----------- Unlike package loaders, VCS loaders remain in separate packages, as they often need more advanced conversions and very VCS-specific operations. This usually involves getting the branches of a repository and recursively loading revisions in the history (and directory trees in these revisions), until a known revision is found diff --git a/docs/index.rst b/docs/index.rst index 8c3db9c..c518590 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,18 +1,21 @@ +.. _swh-loader-core: + .. include:: README.rst .. toctree:: :maxdepth: 2 :caption: Contents: package-loader-tutorial vcs-loader-overview Reference Documentation ----------------------- .. toctree:: :maxdepth: 2 cli /apidoc/swh.loader.core + /apidoc/swh.loader.package diff --git a/docs/package-loader-tutorial.rst b/docs/package-loader-tutorial.rst index 6fed28e..6880e1e 100644 --- a/docs/package-loader-tutorial.rst +++ b/docs/package-loader-tutorial.rst @@ -1,650 +1,650 @@ .. _package-loader-tutorial: Package Loader Tutorial ======================= In this tutorial, we will see how to write a loader for |swh| that loads packages from a package manager, such as PyPI or Debian's. First, you should be familiar with Python, unit-testing, |swh|'s :ref:`data-model` and :ref:`architecture`, and go through the :ref:`developer-setup`. Creating the files hierarchy ---------------------------- Once this is done, you should create a new directory (ie. a (sub)package from Python's point of view) for you loader. It can be either a subdirectory of ``swh-loader-core/swh/loader/package/`` like the other package loaders, or it can be in its own package. If you choose the latter, you should also create the base file of any Python package (such as ``setup.py``), you should import them from the `swh-py-template`_ repository. In the rest of this tutorial, we will assume you chose the former and your loader is named "New Loader", so your package loader is in ``swh-loader-core/swh/loader/package/newloader/``. Next, you should create boilerplate files needed for SWH loaders: ``__init__.py``, ``tasks.py``, ``tests/__init__.py``, and ``tests/test_tasks.py``; copy them from an existing package, such as ``swh-loader-core/swh/loader/package/pypi/``, and replace the names in those with your loader's. Finally, create an `entrypoint`_ in ``setup.py``, so your loader can be discovered by the SWH Celery workers:: entry_points=""" [swh.workers] loader.newloader=swh.loader.package.newloader:register """, .. _swh-py-template: https://forge.softwareheritage.org/source/swh-py-template/ .. _entrypoint: https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html Writing a minimal loader ------------------------ It is now time for the interesting part: writing the code to load packages from a package manager into the |swh| archive. Create a file named ``loader.py`` in your package's directory, with two empty classes (replace the names with what you think is relevant):: from typing import Optional import attr from swh.loader.package.loader import BasePackageInfo, PackageLoader from swh.model.model import Person, Revision, Sha1Git, TimestampWithTimezone @attr.s class NewPackageInfo(BasePackageInfo): pass class NewLoader(PackageLoader[NewPackageInfo]): visit_type = "newloader" We now have to fill some of the methods declared by -:ref:class:`swh.loader.package.PackageLoader`: in your new ``NewLoader`` class. +:class:`swh.loader.package.PackageLoader`: in your new ``NewLoader`` class. Listing versions ++++++++++++++++ ``get_versions`` should return the list of names of all versions of the origin defined at ``self.url`` by the default constructor; and ``get_default_version`` should return the name of the default version (usually the latest stable release). They are both implemented with an API call to the package repository. For example, for PyPI origin https://pypi.org/project/requests, this is done with a request to https://pypi.org/pypi/requests/json. Getting package information +++++++++++++++++++++++++++ Next, ``get_package_info`` takes as argument a version name (as returned by ``get_versions``) and yields ``(branch_name, p_info)`` tuples, where ``branch_name`` is a string and ``pkg_info`` is an instance of the ``NewPackageInfo`` class we defined earlier. Each of these tuples should match a single file the loader will download from the origin. Usually, there is only one file per versions, but this is not true for all package repositories (eg. CRAN and PyPI allow multiple version artifacts per version). As ``NewPackageInfo`` derives from :py:class:`swh.loader.package.BasePackageInfo`, it can be created like this:: return NewPackageInfo(url="https://...", filename="...-versionX.Y.tar.gz") The ``url`` must be a URL where to download the archive from. ``filename`` is optional, but it is nice to fill it when possible/relevant. The base ``PackageLoader`` will then take care of calling ``get_versions()`` to get all the versions, then call ``get_package_info()`` get the list of archives to download, download them, and load all the directories in the archive. This means you do not need to manage downloads yourself; and we are now done with interactions with the package repository. Building a revision +++++++++++++++++++ The final step for your minimal loader to work, is to implement ``build_revision``. This is a very important part, as it will create a revision object that will be inserted in |swh|, as a link between origins and the directories. This function takes three important arguments: * ``p_info`` is an object returned by ``get_package_info()`` * ``uncompressed_path`` is the location on the disk where the base ``PackageLoader`` extracted the archive, so you can access files from the archive. * ``directory`` is an :term:`intrinsic identifier` of the directory that was loaded from the archive The way to implement it depends very much on how the package manager works, but here is a rough idea:: def build_revision( self, p_info: NewPackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: author = Person(name="Jane Doe", email="jdoe@example.org") date = TimestampWithTimezone.from_iso8601("2021-04-01T11:55:20Z") return Revision( type=RevisionType.TAR, message="This is a new release of the project", author=author, date=date, committer=author, committer_date=date, parents=(), directory=directory, synthetic=True, ) The strings here are placeholders, and you should extract them from either the extracted archive (using ``uncompressed_path``), or from the package repository's API. The various classes used in this example are :py:class:`swh.model.model.Person`, :py:class:`swh.model.model.TimestampWithTimezone`, and :py:class:`swh.model.model.Revision`. Note that you have access to the ``NewPackageInfo`` object created by ``get_package_info()``, so you can extend the ``NewPackageInfo`` class to pass data between these two functions. A few caveats: * Make sure the timezone matches the source's * ``Person`` can also be built with just a ``fullname``, if there aren't distinct fields for name and email. When in doubt, it's better to just write the ``fullname`` than try to parse it * ``author`` and ``committer`` (resp. ``date`` and ``committer_date``) may be different if the release was written and published by different people (resp. dates). This is only relevant when loading from VCS, so you can usually ignore it in you package loader. Running your loader +++++++++++++++++++ With Docker ^^^^^^^^^^^ We recommend you use our `Docker environment`_ to test your loader. In short, install Docker, ``cd`` to ``swh-environment/docker/``, then `edit docker-compose.override.yml`_ to insert your new loader in the Docker environment, something like this will do:: version: '2' services: swh-loader-core: volumes: - "$HOME/swh-environment/swh-loader-core:/src/swh-loader-core" Then start the Docker environment:: docker-compose start Then, you can run your loader:: docker-compose exec swh-loader swh loader run newloader "https://example.org/~jdoe/project/" where ``newloader`` is the name you registered as an entrypoint in ``setup.py`` and ``https://example.org/~jdoe/project/`` is the origin URL, that will be set as the ``self.url`` attribute of your loader. For example, to run the PyPI loader, the command would be:: docker-compose exec swh-loader swh loader run pypi "https://pypi.org/project/requests/" If you get this error, make sure you properly configured ``docker-compose.override.yml``:: Error: Invalid value for '[...]': invalid choice: newloader Without Docker ^^^^^^^^^^^^^^ If you do not want to use the Docker environment, you will need to start an :ref:`swh-storage` instance yourself, and create a config file that references it:: storage: cls: remote url: http://localhost:5002/ Or alternatively, this more efficient configuration:: storage: cls: pipeline steps: - cls: buffer min_batch_size: content: 10000 content_bytes: 104857600 directory: 1000 revision: 1000 - cls: filter - cls: remote url: http://localhost:5002/ And run your loader with:: swh loader -C loader.yml run newloader "https://example.org/~jdoe/project/" where ``newloader`` is the name you registered as an entrypoint in ``setup.py`` and ``https://example.org/~jdoe/project/`` is the origin URL, that will be set as the ``self.url`` attribute of your loader. For example, with PyPI:: swh loader -C loader.yml run pypi "https://pypi.org/project/requests/" .. _Docker environment: https://forge.softwareheritage.org/source/swh-environment/browse/master/docker/ .. _edit docker-compose.override.yml: https://forge.softwareheritage.org/source/swh-environment/browse/master/docker/#install-a-swh-package-from Testing your loader +++++++++++++++++++ You must write tests for your loader. First, of course, unit tests for the internal functions of your loader, if any (eg. the functions used to extract metadata); but this is not covered in this tutorial. Most importantly, you should write integration tests for your loader, that will simulate an origin, run the loader, and check everything is loaded in the storage as it should be. As we do not want tests to directly query an origin (it makes tests flaky, hard to reproduce, and put unnecessary load on the origin), we usually mock it using the :py:func:`swh.core.pytest_plugin.requests_mock_datadir` fixture It works by creating a ``data/`` folder in your tests (such as ``swh/loader/package/newloader/tests/data/``) and downloading results from API calls there, in the structured documented in :py:func:`swh.core.pytest_plugin.requests_mock_datadir_factory` The files in the ``datadir/`` will then be served whenever the loader tries to access an URL. This is very dependent on the kind of repositories your loader will read from, so here is an example with the PyPI loader. The files ``swh/loader/package/pypi/tests/data/https_pypi.org/pypi_nexter_json`` and ``swh/loader/package/pypi/tests/data/https_files.pythonhosted.org/nexter-*`` are used in this test:: from swh.loader.tests import assert_last_visit_matches, check_snapshot, get_stats def test_pypi_visit_1_release_with_2_artifacts(swh_storage, requests_mock_datadir): # Initialize the loader url = "https://pypi.org/project/nexter" loader = PyPILoader(swh_storage, url) # Run the loader, with a swh-storage instance, on the given URL. # HTTP calls will be mocked by the requests_mock_datadir fixture actual_load_status = loader.load() # Check the loader loaded exactly the snapshot we expected # (when writing your tests for the first time, you cannot know the # snapshot id without running your loader; so let it error and write # down the result here) expected_snapshot_id = hash_to_bytes("a27e638a4dad6fbfa273c6ebec1c4bf320fb84c6") assert actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id.hex(), } # Check the content of the snapshot. (ditto) expected_snapshot = Snapshot( id=expected_snapshot_id, branches={ b"releases/1.1.0/nexter-1.1.0.zip": SnapshotBranch( target=hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571"), target_type=TargetType.REVISION, ), b"releases/1.1.0/nexter-1.1.0.tar.gz": SnapshotBranch( target=hash_to_bytes("0bf88f5760cca7665d0af4d6575d9301134fe11a"), target_type=TargetType.REVISION, ), }, ) check_snapshot(expected_snapshot, swh_storage) # Check the visit was properly created with the right type assert_last_visit_matches( swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot.id ) # Then you could check the directory structure: directory_id = swh_storage.revision_get( [hash_to_bytes("4c99891f93b81450385777235a37b5e966dd1571")] )[0].directory entries = list(swh_storage.directory_ls(directory_id, recursive=True)) assert entries == [ ... ] Here are some scenarios you should test, when relevant: * No versions * One version * Two or more versions * More than one package per version, if relevant * Corrupt packages (missing metadata, ...), if relevant * API errors * etc. Making your loader incremental ------------------------------ In the previous sections, you wrote a fully functional loader for a new type of package repository. This is great! Please tell us about it, and :ref:`submit it for review ` so we can give you some feedback early. Now, we will see a key optimization for any package loader: skipping packages it already downloaded, using :term:`extids `. The rough idea it to find some way to uniquely identify packages before downloading them and encode it in a short string, the ExtID. Using checksums +++++++++++++++ Ideally, this short string is a checksum of the archive, provided by the API before downloading the archive itself. This is ideal, because this ensures that we detect changes in the package's content even if it keeps the same name and version number. If this is not the case of the repository you want to load from, skip to the next subsection. This is used for example by the PyPI loader (with a sha256sum) and the NPM loader (with a sha1sum). The Debian loader uses a similar scheme: as a single package is assembled from a set of tarballs, it only uses the hash of the ``.dsc`` file, which itself contains a hash of all the tarballs. This is implemented by overriding the ``extid`` method of you ``NewPackageInfo`` class, that returns the type of the ExtID (see below) and the ExtID itself:: from swh.loader.package.loader import PartialExtID EXTID_TYPE: str = "pypi-archive-sha256" @attr.s class NewPackageInfo(BasePackageInfo): sha256: str def extid(self) -> PartialExtID: return (EXTID_TYPE, hash_to_bytes(self.sha256)) and the loader's ``get_package_info`` method sets the right value in the ``sha256`` attribute. Using a custom manifest +++++++++++++++++++++++ Unfortunaly, this does not work for all packages, as some package repositories do not provide a checksum of the archives via their API. If this is the case of the repository you want to load from, you need to find a way around it. It highly depends on the repository, so this tutorial cannot cover how to do it. We do however provide an easy option that should work in most cases: creating a "manifest" of the archive with some metadata in it, and hashing it. For example, when loading from the GNU FTP servers, we have access to some metadata, that is somewhat good enough to deduplicate. We write them all in a string and hash that string. It is done like this:: import string @attr.s class ArchivePackageInfo(BasePackageInfo): length = attr.ib(type=int) """Size of the archive file""" time = attr.ib(type=Union[str, datetime.datetime]) """Timestamp of the archive file on the server""" version = attr.ib(type=str) EXTID_FORMAT = "package-manifest-sha256" MANIFEST_FORMAT = string.Template("$time $length $version $url") The default implementation of :py:func:`swh.loader.package.loader.BasePackageInfo.extid` will read this template, substitute the variables based on the object's attributes, compute the hash of the result, and return it. Note that, as mentioned before, this is not perfect because a tarball may be replaced with a different tarball of exactly the same length and modification time, and we won't detect it. But this is extremely unlikely, so we consider it to be good enough. Alternatively, if this is not good enough for your loader, you can simply not implement ExtIDs, and your loader will always load all tarballs. This can be bandwidth-heavy for both |swh| and the origin you are loaded from, so this decision should not be taken lightly. Choosing the ExtID type +++++++++++++++++++++++ The type of your ExtID should be a short ASCII string, that is both unique to your loader and descriptive of how it was computed. Why unique to the loader? Because different loaders may load the same archive differently. For example, if I was to create an archive with both a ``PKG-INFO`` and a ``package.json`` file, and submit it to both NPM and PyPI, both package repositories would have exactly the same tarball. But the NPM loader would create the revision based on authorship info in ``package.json``, and the PyPI loader based on ``PKG-INFO``. But we do not want the PyPI loader to assume it already created a revision itself, while the revision was created by the NPM loader! And why descriptive? This is simply for future-proofing; in case your loader changes the format of the ExtID (eg. by using a different hash algorithm). Testing your incremental loading ++++++++++++++++++++++++++++++++ If you followed the steps above, your loader is now able to detect what packages it already downloaded and skip them. This is what we call an incremental loader. It is now time to write tests to make sure your loader fulfills this promise. This time, we want to use ``requests_mock_datadir_visits`` instead of ``requests_mock_datadir``, because we want to mock the repository's API to emulate its results changing over time (eg. because a new version was published between two runs of the loader). See the documentation of :py:func:`swh.core.pytest_plugin.requests_mock_datadir_factory` for a description of the file layout to use. Let's take, once again, a look at ``swh/loader/package/pypi/tests/test_pypi.py``, to use as an example:: def test_pypi_incremental_visit(swh_storage, requests_mock_datadir_visits): """With prior visit, 2nd load will result with a different snapshot """ # Initialize the loader url = "https://pypi.org/project/0805nexter" loader = PyPILoader(swh_storage, url) # First visit visit1_actual_load_status = loader.load() visit1_stats = get_stats(swh_storage) # Make sure everything is in order expected_snapshot_id = hash_to_bytes("ba6e158ada75d0b3cfb209ffdf6daa4ed34a227a") assert visit1_actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id.hex(), } assert_last_visit_matches( swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id ) assert { "content": 6, "directory": 4, "origin": 1, "origin_visit": 1, "release": 0, "revision": 2, "skipped_content": 0, "snapshot": 1, } == visit1_stats # Reset internal state del loader._cached__raw_info del loader._cached_info # Second visit visit2_actual_load_status = loader.load() visit2_stats = get_stats(swh_storage) # Check the result of the visit assert visit2_actual_load_status["status"] == "eventful", visit2_actual_load_status expected_snapshot_id2 = hash_to_bytes("2e5149a7b0725d18231a37b342e9b7c4e121f283") assert visit2_actual_load_status == { "status": "eventful", "snapshot_id": expected_snapshot_id2.hex(), } assert_last_visit_matches( swh_storage, url, status="full", type="pypi", snapshot=expected_snapshot_id2 ) assert { "content": 6 + 1, # 1 more content "directory": 4 + 2, # 2 more directories "origin": 1, "origin_visit": 1 + 1, "release": 0, "revision": 2 + 1, # 1 more revision "skipped_content": 0, "snapshot": 1 + 1, # 1 more snapshot } == visit2_stats # Check all content objects were loaded expected_contents = map( hash_to_bytes, [ "a61e24cdfdab3bb7817f6be85d37a3e666b34566", "938c33483285fd8ad57f15497f538320df82aeb8", "a27576d60e08c94a05006d2e6d540c0fdb5f38c8", "405859113963cb7a797642b45f171d6360425d16", "e5686aa568fdb1d19d7f1329267082fe40482d31", "83ecf6ec1114fd260ca7a833a2d165e71258c338", "92689fa2b7fb4d4fc6fb195bf73a50c87c030639", ], ) assert list(swh_storage.content_missing_per_sha1(expected_contents)) == [] # Check all directory objects were loaded expected_dirs = map( hash_to_bytes, [ "05219ba38bc542d4345d5638af1ed56c7d43ca7d", "cf019eb456cf6f78d8c4674596f1c9a97ece8f44", "b178b66bd22383d5f16f4f5c923d39ca798861b4", "c3a58f8b57433a4b56caaa5033ae2e0931405338", "e226e7e4ad03b4fc1403d69a18ebdd6f2edd2b3a", "52604d46843b898f5a43208045d09fcf8731631b", ], ) assert list(swh_storage.directory_missing(expected_dirs)) == [] # etc. Loading metadata ---------------- Finally, an optional step: collecting and loading :term:`extrinsic metadata`. This is metadata that your loader may collect while loading an origin. For example, the PyPI loader collects some parts of the API response (eg. https://pypi.org/pypi/requests/json) They are stored as raw bytestring, along with a format (an ASCII string) and a date of discovery (usually the time your loader ran). This is done by adding them to the ``directory_extrinsic_metadata`` attribute of your ``NewPackageInfo`` object when creating it in ``get_package_info`` -as :py:cls:`swh.loader.package.loader.RawExtrinsicMetadataCore` objects:: +as :class:`swh.loader.package.loader.RawExtrinsicMetadataCore` objects:: NewPackageInfo( ..., directory_extrinsic_metadata=[ RawExtrinsicMetadataCore( format="new-format", metadata=b"foo bar baz", discovery_date=datetime.datetime(...), ) ] ) ``format`` should be a human-readable ASCII string that unambiguously describes the format. Readers of the metadata object will have a built-in list of formats they understand, and will check if your metadata object is among them. -You should use one of the :ref:`known metadata formats ` +You should use one of the :ref:`known metadata formats ` if possible, or add yours to this list. ``metadata`` is the metadata object itself. When possible, it should be copied verbatim from the source object you got, and should not be created by the loader. If this is not possible, for example because it is extracted from a larger JSON or XML document, make sure you do as little modifications as possible to reduce the risks of corruption. ``discovery_date`` is optional, and defaults to the time your loader started working. In theory, you can write extrinsic metadata on any kind of objects, eg. by implementing :py:meth:`swh.loader.package.loader.PackageLoader.get_extrinsic_origin_metadata`, :py:meth:`swh.loader.package.loader.PackageLoader.get_extrinsic_snapshot_metadata`; but this is rarely relevant in practice. Be sure to check if loader can find any potentially interesting metadata, though! Final words ----------- Congratulations, you made it to the end. If you have not already, please `contact us`_ to tell us about your new loader, and :ref:`submit your loader for review ` on our forge so we can merge it and run it along our other loaders to archive more repositories. And if you have any change in mind to improve this tutorial for future readers, please submit them too. Thank you for your contributions! .. _contact us: https://www.softwareheritage.org/community/developers/ diff --git a/docs/vcs-loader-overview.rst b/docs/vcs-loader-overview.rst index b94ca57..69abb0c 100644 --- a/docs/vcs-loader-overview.rst +++ b/docs/vcs-loader-overview.rst @@ -1,144 +1,144 @@ .. _vcs-loader-tutorial: VCS Loader Overview =================== In this overview, we will see how to write a loader for |swh| that loads :term:`artifacts ` from a Version Control System, such as Git, Mercurial, or Subversion First, you should be familiar with Python, unit-testing, |swh|'s :ref:`data-model` and :ref:`architecture`, and go through the :ref:`developer-setup`. As seen in the :ref:`swh-loader-core homepage `, SWH loaders can be sorted into two large categories: Package Loaders and VCS loaders. This page is an overview of how to write a VCS loader. This is not a tutorial, because VCS loaders are hooked deeply into their respective VCS' internals; unlike :ref:`Package Loaders ` which are somewhat uniform (list tarballs, download tarballs, load content of tarball, done). Architecture ------------ A loader is a Python package, usually a subpackage of ``swh.loader`` but in its own directory (eg. ``swh-loader-git/swh/loader/git``, as ``swh.loader`` is a :pep:`namespace package <420>`), based on the `swh-py-template`_ repository. It has at least one `entrypoint`_, declared in ``setup.py`` to be recognized by ``swh-loader-core``:: entry_points=""" [swh.workers] loader.newloader=swh.loader.newloader:register """, This entrypoint declares the task name (to be run by SWH Celery workers) and the loader class. For example, for the Subversion loader:: from typing import Any, Dict def register() -> Dict[str, Any]: from swh.loader.svn.loader import SvnLoader return { "task_modules": ["%s.tasks" % __name__], "loader": SvnLoader, } The bulk of the work is done by the returned ``loader`` class: it loads artifacts from the upstream VCS and writes them to the |swh| archive. Because of the heterogeneity of VCS loaders, it has a lot of freedom in how to achieve this. Once the initial setup is done (see the next section), its ``load`` method is called, and it is expected to do all this work as a black box. .. _swh-py-template: https://forge.softwareheritage.org/source/swh-py-template/ .. _entrypoint: https://setuptools.readthedocs.io/en/latest/userguide/entry_point.html Base classes ------------ All loaders inherit from :class:`swh.loader.core.loader.BaseLoader`, which takes care of all the SWH-specific setup and finalization: * Reading the configuration * Connecting to the :term:`storage database` * Storing :term:`origin` and :term:`visit` objects It also provides a default implementation of the ``load`` method, which takes care of: * calling its ``fetch_data`` (from the VCS) and ``store_data`` (to SWH) in a loop * on error, notifies swh-storage the loading failed, reports the error to the monitoring infrastructure (Sentry), and cleanup * on success, cleanup and notify swh-storage the loading succeeded See :meth:`its documentation ` for details. Distributed VCS loaders will usually want to inherit from its child, :class:`swh.loader.core.DVCSLoader`, which takes care of implementing ``store_data``. Classes inheriting from ``DVCSLoader`` only need to implement ``fetch_data``, and a method for each object type: ``get_contents``, ``get_directories``, ``get_revisions``, ``get_releases``, and ``get_snapshot``, each returning an iterable of the corresponding object from :mod:`swh.model.model` (except ``get_snapshot``, which returns a single one). If you are writing a DVCS loader, this allows your loader to fetch all the objects locally, then return them lazily on demand. Incremental loading ------------------- +------------------- Loading a repository from scratch can be costly, so ``swh-storage`` provides ways to remember what objects in the repository were already loaded, through :term:`extids `. They are represented by :class:`swh.model.model.ExtID`, which is essentially a 3-tuple that contains a SWHID, an id internal to the VCS type, (which is the actual "extid" itself), and the type of this id (eg. ``hg-nodeid``). When your loader is done loading, it can store extids for some of its objects (eg. the heads/tips of each branch of the :term:`snapshot` and some intermediate revisions in the history), with :meth:`swh.storage.interface.StorageInterface.extid_add`. And when it starts loading a known repository, fetches the previous snapshot using :func:`swh.storage.algos.snapshot.snapshot_get_latest`, then the extids it stores using :meth:`swh.storage.interface.StorageInterface.extid_get_from_target` for each of the branch targets. This way, it can find which objects from the origin were already loaded, without having to download them first. .. note:: For legacy reasons, the Subversion loader uses an alternative to ExtID, which is to encode the repository UUID and the revision ID (an incremental integer) directly in :attr:`swh.model.model.Revision.extra_headers`. This is discouraged because it prevents deduplication across repositories, and ``extra_headers`` does not have a well-defined schema. Integrity --------- Loaders may be interrupted at any point, for various reasons (unhandled crash, out of memory, hardware failure, blocking IO, system or daemon restart, etc.) Therefore, they must take great care that if a load was interrupted, the next load will finish loading all objects. If they don't, this may happen: 1. loader loads revision ``R``, pointing to directory ``D`` 2. loader starts loading ``D``, but crashes before it does 3. [loader restarts] 4. loader sees ``R`` is already loaded, so it doesn't load its children And ``D`` will never be loaded. The solution to this is to load objects in topological order of the DAG. Another reason to load objects in topological order is that it avoid having "holes" in the graph (aka. dangling references), even temporarily. Holes in the graph cause bad user experiences, when users click a link from an existing object and get a "not found" error. diff --git a/swh/loader/core/loader.py b/swh/loader/core/loader.py index ba503de..4763817 100644 --- a/swh/loader/core/loader.py +++ b/swh/loader/core/loader.py @@ -1,463 +1,468 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import hashlib import logging import os from typing import Any, Dict, Iterable, Optional from swh.core.config import load_from_envvar from swh.loader.exception import NotFound from swh.model.model import ( BaseContent, Content, Directory, Origin, OriginVisit, OriginVisitStatus, Release, Revision, Sha1Git, SkippedContent, Snapshot, ) from swh.storage import get_storage from swh.storage.interface import StorageInterface from swh.storage.utils import now DEFAULT_CONFIG: Dict[str, Any] = { "max_content_size": 100 * 1024 * 1024, } class BaseLoader: """Base class for (D)VCS loaders (e.g Svn, Git, Mercurial, ...) or PackageLoader (e.g PyPI, Npm, CRAN, ...) A loader retrieves origin information (git/mercurial/svn repositories, pypi/npm/... package artifacts), ingests the contents/directories/revisions/releases/snapshot read from those artifacts and send them to the archive through the storage backend. The main entry point for the loader is the :func:`load` function. 2 static methods (:func:`from_config`, :func:`from_configfile`) centralizes and eases the loader instantiation from either configuration dict or configuration file. Some class examples: - :class:`SvnLoader` - :class:`GitLoader` - :class:`PyPILoader` - :class:`NpmLoader` """ + visit_date: Optional[datetime.datetime] + origin: Optional[Origin] + origin_metadata: Dict[str, Any] + loaded_snapshot_id: Optional[Sha1Git] + def __init__( self, storage: StorageInterface, logging_class: Optional[str] = None, save_data_path: Optional[str] = None, max_content_size: Optional[int] = None, ): super().__init__() self.storage = storage self.max_content_size = int(max_content_size) if max_content_size else None if logging_class is None: logging_class = "%s.%s" % ( self.__class__.__module__, self.__class__.__name__, ) self.log = logging.getLogger(logging_class) _log = logging.getLogger("requests.packages.urllib3.connectionpool") _log.setLevel(logging.WARN) # possibly overridden in self.prepare method - self.visit_date: Optional[datetime.datetime] = None - self.origin: Optional[Origin] = None + self.visit_date = None + self.origin = None if not hasattr(self, "visit_type"): self.visit_type: Optional[str] = None - self.origin_metadata: Dict[str, Any] = {} - self.loaded_snapshot_id: Optional[Sha1Git] = None + self.origin_metadata = {} + self.loaded_snapshot_id = None if save_data_path: path = save_data_path os.stat(path) if not os.access(path, os.R_OK | os.W_OK): raise PermissionError("Permission denied: %r" % path) self.save_data_path = save_data_path @classmethod def from_config(cls, storage: Dict[str, Any], **config: Any): """Instantiate a loader from a configuration dict. This is basically a backwards-compatibility shim for the CLI. Args: storage: instantiation config for the storage config: the configuration dict for the loader, with the following keys: - credentials (optional): credentials list for the scheduler - any other kwargs passed to the loader. Returns: the instantiated loader """ # Drop the legacy config keys which aren't used for this generation of loader. for legacy_key in ("storage", "celery"): config.pop(legacy_key, None) # Instantiate the storage storage_instance = get_storage(**storage) return cls(storage=storage_instance, **config) @classmethod def from_configfile(cls, **kwargs: Any): """Instantiate a loader from the configuration loaded from the SWH_CONFIG_FILENAME envvar, with potential extra keyword arguments if their value is not None. Args: kwargs: kwargs passed to the loader instantiation """ config = dict(load_from_envvar(DEFAULT_CONFIG)) config.update({k: v for k, v in kwargs.items() if v is not None}) return cls.from_config(**config) def save_data(self) -> None: """Save the data associated to the current load""" raise NotImplementedError def get_save_data_path(self) -> str: """The path to which we archive the loader's raw data""" if not hasattr(self, "__save_data_path"): year = str(self.visit_date.year) # type: ignore assert self.origin url = self.origin.url.encode("utf-8") origin_url_hash = hashlib.sha1(url).hexdigest() path = "%s/sha1:%s/%s/%s" % ( self.save_data_path, origin_url_hash[0:2], origin_url_hash, year, ) os.makedirs(path, exist_ok=True) self.__save_data_path = path return self.__save_data_path def flush(self) -> None: """Flush any potential buffered data not sent to swh-storage. """ self.storage.flush() def cleanup(self) -> None: """Last step executed by the loader. """ raise NotImplementedError def prepare_origin_visit(self) -> None: """First step executed by the loader to prepare origin and visit references. Set/update self.origin, and optionally self.origin_url, self.visit_date. """ raise NotImplementedError def _store_origin_visit(self) -> None: """Store origin and visit references. Sets the self.visit references. """ assert self.origin self.storage.origin_add([self.origin]) if not self.visit_date: # now as default visit_date if not provided self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) assert isinstance(self.visit_date, datetime.datetime) assert isinstance(self.visit_type, str) self.visit = list( self.storage.origin_visit_add( [ OriginVisit( origin=self.origin.url, date=self.visit_date, type=self.visit_type, ) ] ) )[0] def prepare(self) -> None: """Second step executed by the loader to prepare some state needed by the loader. Raises NotFound exception if the origin to ingest is not found. """ raise NotImplementedError def get_origin(self) -> Origin: """Get the origin that is currently being loaded. self.origin should be set in :func:`prepare_origin` Returns: dict: an origin ready to be sent to storage by :func:`origin_add`. """ assert self.origin return self.origin def fetch_data(self) -> bool: """Fetch the data from the source the loader is currently loading (ex: git/hg/svn/... repository). Returns: a value that is interpreted as a boolean. If True, fetch_data needs to be called again to complete loading. """ raise NotImplementedError def store_data(self): """Store fetched data in the database. Should call the :func:`maybe_load_xyz` methods, which handle the bundles sent to storage, rather than send directly. """ raise NotImplementedError def store_metadata(self) -> None: """Store fetched metadata in the database. For more information, see implementation in :class:`DepositLoader`. """ pass def load_status(self) -> Dict[str, str]: """Detailed loading status. Defaults to logging an eventful load. Returns: a dictionary that is eventually passed back as the task's result to the scheduler, allowing tuning of the task recurrence mechanism. """ return { "status": "eventful", } def post_load(self, success: bool = True) -> None: """Permit the loader to do some additional actions according to status after the loading is done. The flag success indicates the loading's status. Defaults to doing nothing. This is up to the implementer of this method to make sure this does not break. Args: success (bool): the success status of the loading """ pass def visit_status(self) -> str: """Detailed visit status. Defaults to logging a full visit. """ return "full" def pre_cleanup(self) -> None: """As a first step, will try and check for dangling data to cleanup. This should do its best to avoid raising issues. """ pass def load(self) -> Dict[str, str]: r"""Loading logic for the loader to follow: - 1. Call :meth:`prepare_origin_visit` to prepare the origin and visit we will associate loading data to - 2. Store the actual ``origin_visit`` to storage - 3. Call :meth:`prepare` to prepare any eventual state - 4. Call :meth:`get_origin` to get the origin we work with and store - while True: - 5. Call :meth:`fetch_data` to fetch the data to store - 6. Call :meth:`store_data` to store the data - 7. Call :meth:`cleanup` to clean up any eventual state put in place in :meth:`prepare` method. """ try: self.pre_cleanup() except Exception: msg = "Cleaning up dangling data failed! Continue loading." self.log.warning(msg) self.prepare_origin_visit() self._store_origin_visit() assert ( self.origin ), "The method `prepare_origin_visit` call should set the origin (Origin)" assert ( self.visit.visit ), "The method `_store_origin_visit` should set the visit (OriginVisit)" self.log.info( "Load origin '%s' with type '%s'", self.origin.url, self.visit.type ) try: self.prepare() while True: more_data_to_fetch = self.fetch_data() self.store_data() if not more_data_to_fetch: break self.store_metadata() visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, type=self.visit_type, date=now(), status=self.visit_status(), snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) self.post_load() except Exception as e: if isinstance(e, NotFound): status = "not_found" task_status = "uneventful" else: status = "partial" if self.loaded_snapshot_id else "failed" task_status = "failed" self.log.exception( "Loading failure, updating to `%s` status", status, extra={ "swh_task_args": [], "swh_task_kwargs": {"origin": self.origin.url}, }, ) visit_status = OriginVisitStatus( origin=self.origin.url, visit=self.visit.visit, type=self.visit_type, date=now(), status=status, snapshot=self.loaded_snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) self.post_load(success=False) return {"status": task_status} finally: self.flush() self.cleanup() return self.load_status() class DVCSLoader(BaseLoader): """This base class is a pattern for dvcs loaders (e.g. git, mercurial). Those loaders are able to load all the data in one go. For example, the loader defined in swh-loader-git :class:`BulkUpdater`. For other loaders (stateful one, (e.g :class:`SWHSvnLoader`), inherit directly from :class:`BaseLoader`. """ def cleanup(self) -> None: """Clean up an eventual state installed for computations.""" pass def has_contents(self) -> bool: """Checks whether we need to load contents""" return True def get_contents(self) -> Iterable[BaseContent]: """Get the contents that need to be loaded""" raise NotImplementedError def has_directories(self) -> bool: """Checks whether we need to load directories""" return True def get_directories(self) -> Iterable[Directory]: """Get the directories that need to be loaded""" raise NotImplementedError def has_revisions(self) -> bool: """Checks whether we need to load revisions""" return True def get_revisions(self) -> Iterable[Revision]: """Get the revisions that need to be loaded""" raise NotImplementedError def has_releases(self) -> bool: """Checks whether we need to load releases""" return True def get_releases(self) -> Iterable[Release]: """Get the releases that need to be loaded""" raise NotImplementedError def get_snapshot(self) -> Snapshot: """Get the snapshot that needs to be loaded""" raise NotImplementedError def eventful(self) -> bool: """Whether the load was eventful""" raise NotImplementedError def store_data(self) -> None: assert self.origin if self.save_data_path: self.save_data() if self.has_contents(): for obj in self.get_contents(): if isinstance(obj, Content): self.storage.content_add([obj]) elif isinstance(obj, SkippedContent): self.storage.skipped_content_add([obj]) else: raise TypeError(f"Unexpected content type: {obj}") if self.has_directories(): for directory in self.get_directories(): self.storage.directory_add([directory]) if self.has_revisions(): for revision in self.get_revisions(): self.storage.revision_add([revision]) if self.has_releases(): for release in self.get_releases(): self.storage.release_add([release]) snapshot = self.get_snapshot() self.storage.snapshot_add([snapshot]) self.flush() self.loaded_snapshot_id = snapshot.id diff --git a/swh/loader/core/utils.py b/swh/loader/core/utils.py index 602c22c..89684f2 100644 --- a/swh/loader/core/utils.py +++ b/swh/loader/core/utils.py @@ -1,48 +1,48 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import psutil def clean_dangling_folders(dirpath, pattern_check, log=None): """Clean up potential dangling temporary working folder rooted at `dirpath`. Those folders must match a dedicated pattern and not belonging to a live pid. Args: dirpath (str): Path to check for dangling files pattern_check (str): A dedicated pattern to check on first - level directory (e.g `swh.loader.mercurial.`, - `swh.loader.svn.`) - log (Logger): Optional logger + level directory (e.g `swh.loader.mercurial.`, + `swh.loader.svn.`) + log (Logger): Optional logger """ if not os.path.exists(dirpath): return for filename in os.listdir(dirpath): path_to_cleanup = os.path.join(dirpath, filename) try: # pattern: `swh.loader.svn-pid.{noise}` if ( pattern_check not in filename or "-" not in filename ): # silently ignore unknown patterns continue _, pid = filename.split("-") pid = int(pid.split(".")[0]) if psutil.pid_exists(pid): if log: log.debug("PID %s is live, skipping" % pid) continue # could be removed concurrently, so check before removal if os.path.exists(path_to_cleanup): shutil.rmtree(path_to_cleanup) except Exception as e: if log: msg = "Fail to clean dangling path %s: %s" % (path_to_cleanup, e) log.warn(msg) diff --git a/swh/loader/package/loader.py b/swh/loader/package/loader.py index 5656e3c..72469a9 100644 --- a/swh/loader/package/loader.py +++ b/swh/loader/package/loader.py @@ -1,977 +1,977 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import hashlib from itertools import islice import json import logging import os import string import sys import tempfile from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Mapping, Optional, Sequence, Set, Tuple, TypeVar, ) import attr import sentry_sdk from swh.core.tarball import uncompress from swh.loader.core.loader import BaseLoader from swh.loader.exception import NotFound from swh.loader.package.utils import download from swh.model import from_disk from swh.model.collections import ImmutableDict from swh.model.hashutil import hash_to_hex from swh.model.identifiers import ( CoreSWHID, ExtendedObjectType, ExtendedSWHID, ObjectType, ) from swh.model.model import ( ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Revision, Sha1Git, Snapshot, TargetType, ) from swh.storage.algos.snapshot import snapshot_get_latest from swh.storage.interface import StorageInterface from swh.storage.utils import now logger = logging.getLogger(__name__) SWH_METADATA_AUTHORITY = MetadataAuthority( type=MetadataAuthorityType.REGISTRY, url="https://softwareheritage.org/", metadata={}, ) """Metadata authority for extrinsic metadata generated by Software Heritage. Used for metadata on "original artifacts", ie. length, filename, and checksums of downloaded archive files.""" PartialExtID = Tuple[str, bytes] -"""The ``extid_type`` and ``extid`` fields of an :ref:py:`ExtID` object.""" +"""The ``extid_type`` and ``extid`` fields of an :class:`ExtID` object.""" @attr.s class RawExtrinsicMetadataCore: """Contains the core of the metadata extracted by a loader, that will be used to build a full RawExtrinsicMetadata object by adding object identifier, context, and provenance information.""" format = attr.ib(type=str) metadata = attr.ib(type=bytes) discovery_date = attr.ib(type=Optional[datetime.datetime], default=None) """Defaults to the visit date.""" @attr.s class BasePackageInfo: """Compute the primary key for a dict using the id_keys as primary key composite. Args: d: A dict entry to compute the primary key on id_keys: Sequence of keys to use as primary key Returns: The identity for that dict entry """ url = attr.ib(type=str) filename = attr.ib(type=Optional[str]) MANIFEST_FORMAT: Optional[string.Template] = None """If not None, used by the default extid() implementation to format a manifest, before hashing it to produce an ExtID.""" EXTID_TYPE: str = "package-manifest-sha256" # The following attribute has kw_only=True in order to allow subclasses # to add attributes. Without kw_only, attributes without default values cannot # go after attributes with default values. # See directory_extrinsic_metadata = attr.ib( type=List[RawExtrinsicMetadataCore], default=[], kw_only=True, ) """:term:`extrinsic metadata` collected by the loader, that will be attached to the loaded directory and added to the Metadata storage.""" # TODO: add support for metadata for revisions and contents def extid(self) -> Optional[PartialExtID]: """Returns a unique intrinsic identifier of this package info, or None if this package info is not 'deduplicatable' (meaning that we will always load it, instead of checking the ExtID storage to see if we already did)""" if self.MANIFEST_FORMAT is None: return None else: manifest = self.MANIFEST_FORMAT.substitute( {k: str(v) for (k, v) in attr.asdict(self).items()} ) return (self.EXTID_TYPE, hashlib.sha256(manifest.encode()).digest()) TPackageInfo = TypeVar("TPackageInfo", bound=BasePackageInfo) class PackageLoader(BaseLoader, Generic[TPackageInfo]): # Origin visit type (str) set by the loader visit_type = "" visit_date: datetime.datetime def __init__( self, storage: StorageInterface, url: str, max_content_size: Optional[int] = None, ): """Loader's constructor. This raises exception if the minimal required configuration is missing (cf. fn:`check` method). Args: storage: Storage instance url: Origin url to load data from """ super().__init__(storage=storage, max_content_size=max_content_size) self.url = url self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) def get_versions(self) -> Sequence[str]: """Return the list of all published package versions. Raises: - `class:swh.loader.exception.NotFound` error when failing to read the + class:`swh.loader.exception.NotFound` error when failing to read the published package versions. Returns: Sequence of published versions """ return [] def get_package_info(self, version: str) -> Iterator[Tuple[str, TPackageInfo]]: """Given a release version of a package, retrieve the associated package information for such version. Args: version: Package version Returns: (branch name, package metadata) """ yield from {} def build_revision( self, p_info: TPackageInfo, uncompressed_path: str, directory: Sha1Git ) -> Optional[Revision]: """Build the revision from the archive metadata (extrinsic artifact metadata) and the intrinsic metadata. Args: p_info: Package information uncompressed_path: Artifact uncompressed path on disk Returns: Revision object """ raise NotImplementedError("build_revision") def get_default_version(self) -> str: """Retrieve the latest release version if any. Returns: Latest version """ return "" def last_snapshot(self) -> Optional[Snapshot]: """Retrieve the last snapshot out of the last visit. """ return snapshot_get_latest(self.storage, self.url) def known_artifacts( self, snapshot: Optional[Snapshot] ) -> Dict[Sha1Git, Optional[ImmutableDict[str, object]]]: """Retrieve the known releases/artifact for the origin. Args snapshot: snapshot for the visit Returns: Dict of keys revision id (bytes), values a metadata Dict. """ if not snapshot: return {} # retrieve only revisions (e.g the alias we do not want here) revs = [ rev.target for rev in snapshot.branches.values() if rev and rev.target_type == TargetType.REVISION ] known_revisions = self.storage.revision_get(revs) return { revision.id: revision.metadata for revision in known_revisions if revision } def new_packageinfo_to_extid(self, p_info: TPackageInfo) -> Optional[PartialExtID]: return p_info.extid() def _get_known_extids( self, packages_info: List[TPackageInfo] ) -> Dict[PartialExtID, List[CoreSWHID]]: """Compute the ExtIDs from new PackageInfo objects, searches which are already loaded in the archive, and returns them if any.""" # Compute the ExtIDs of all the new packages, grouped by extid type new_extids: Dict[str, List[bytes]] = {} for p_info in packages_info: res = p_info.extid() if res is not None: (extid_type, extid_extid) = res new_extids.setdefault(extid_type, []).append(extid_extid) # For each extid type, call extid_get_from_extid() with all the extids of # that type, and store them in the '(type, extid) -> target' map. known_extids: Dict[PartialExtID, List[CoreSWHID]] = {} for (extid_type, extids) in new_extids.items(): for extid in self.storage.extid_get_from_extid(extid_type, extids): if extid is not None: key = (extid.extid_type, extid.extid) known_extids.setdefault(key, []).append(extid.target) return known_extids def resolve_revision_from_extids( self, known_extids: Dict[PartialExtID, List[CoreSWHID]], p_info: TPackageInfo, revision_whitelist: Set[Sha1Git], ) -> Optional[Sha1Git]: """Resolve the revision from known ExtIDs and a package info object. If the artifact has already been downloaded, this will return the existing revision targeting that uncompressed artifact directory. Otherwise, this returns None. Args: known_extids: Dict built from a list of ExtID, with the target as value p_info: Package information revision_whitelist: Any ExtID with target not in this set is filtered out Returns: None or revision identifier """ new_extid = p_info.extid() if new_extid is None: return None for extid_target in known_extids.get(new_extid, []): if extid_target.object_id not in revision_whitelist: # There is a known ExtID for this package, but its target is not # in the snapshot. # This can happen for three reasons: # # 1. a loader crashed after writing the ExtID, but before writing # the snapshot # 2. some other loader loaded the same artifact, but produced # a different revision, causing an additional ExtID object # to be written. We will probably find this loader's ExtID # in a future iteration of this loop. # Note that for now, this is impossible, as each loader has a # completely different extid_type, but this is an implementation # detail of each loader. # 3. we took a snapshot, then the package disappeared, # then we took another snapshot, and the package reappeared # # In case of 1, we must actually load the package now, # so let's do it. # TODO: detect when we are in case 3 using revision_missing instead # of the snapshot. continue elif extid_target.object_type != ObjectType.REVISION: # We only support revisions for now. # Note that this case should never be reached unless there is a # collision between a revision hash and some non-revision object's # hash, but better safe than sorry. logger.warning( "%s is in the revision whitelist, but is not a revision.", hash_to_hex(extid_target.object_type), ) continue return extid_target.object_id return None def download_package( self, p_info: TPackageInfo, tmpdir: str ) -> List[Tuple[str, Mapping]]: """Download artifacts for a specific package. All downloads happen in in the tmpdir folder. Default implementation expects the artifacts package info to be about one artifact per package. Note that most implementation have 1 artifact per package. But some implementation have multiple artifacts per package (debian), some have none, the package is the artifact (gnu). Args: artifacts_package_info: Information on the package artifacts to download (url, filename, etc...) tmpdir: Location to retrieve such artifacts Returns: List of (path, computed hashes) """ return [download(p_info.url, dest=tmpdir, filename=p_info.filename)] def uncompress( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], dest: str ) -> str: """Uncompress the artifact(s) in the destination folder dest. Optionally, this could need to use the p_info dict for some more information (debian). """ uncompressed_path = os.path.join(dest, "src") for a_path, _ in dl_artifacts: uncompress(a_path, dest=uncompressed_path) return uncompressed_path def extra_branches(self) -> Dict[bytes, Mapping[str, Any]]: """Return an extra dict of branches that are used to update the set of branches. """ return {} def finalize_visit( self, *, snapshot: Optional[Snapshot], visit: OriginVisit, status_visit: str, status_load: str, failed_branches: List[str], ) -> Dict[str, Any]: """Finalize the visit: - flush eventual unflushed data to storage - update origin visit's status - return the task's status """ self.storage.flush() snapshot_id: Optional[bytes] = None if snapshot and snapshot.id: # to prevent the snapshot.id to b"" snapshot_id = snapshot.id assert visit.visit visit_status = OriginVisitStatus( origin=self.url, visit=visit.visit, type=self.visit_type, date=now(), status=status_visit, snapshot=snapshot_id, ) self.storage.origin_visit_status_add([visit_status]) result: Dict[str, Any] = { "status": status_load, } if snapshot_id: result["snapshot_id"] = hash_to_hex(snapshot_id) if failed_branches: logger.warning("%d failed branches", len(failed_branches)) for i, urls in enumerate(islice(failed_branches, 50)): prefix_url = "Failed branches: " if i == 0 else "" logger.warning("%s%s", prefix_url, urls) return result def load(self) -> Dict: """Load for a specific origin the associated contents. 1. Get the list of versions in an origin. 2. Get the snapshot from the previous run of the loader, and filter out versions that were already loaded, if their :term:`extids ` match Then, for each remaining version in the origin 3. Fetch the files for one package version By default, this can be implemented as a simple HTTP request. Loaders with more specific requirements can override this, e.g.: the PyPI loader checks the integrity of the downloaded files; the Debian loader has to download and check several files for one package version. 4. Extract the downloaded files. By default, this would be a universal archive/tarball extraction. Loaders for specific formats can override this method (for instance, the Debian loader uses dpkg-source -x). 5. Convert the extracted directory to a set of Software Heritage objects Using swh.model.from_disk. 6. Extract the metadata from the unpacked directories This would only be applicable for "smart" loaders like npm (parsing the package.json), PyPI (parsing the PKG-INFO file) or Debian (parsing debian/changelog and debian/control). On "minimal-metadata" sources such as the GNU archive, the lister should provide the minimal set of metadata needed to populate the revision/release objects (authors, dates) as an argument to the task. 7. Generate the revision/release objects for the given version. From the data generated at steps 3 and 4. end for each 8. Generate and load the snapshot for the visit Using the revisions/releases collected at step 7., and the branch information from step 2., generate a snapshot and load it into the Software Heritage archive """ status_load = "uneventful" # either: eventful, uneventful, failed status_visit = "full" # see swh.model.model.OriginVisitStatus snapshot = None failed_branches: List[str] = [] # Prepare origin and origin_visit origin = Origin(url=self.url) try: self.storage.origin_add([origin]) visit = list( self.storage.origin_visit_add( [ OriginVisit( origin=self.url, date=self.visit_date, type=self.visit_type, ) ] ) )[0] except Exception as e: logger.exception("Failed to initialize origin_visit for %s", self.url) sentry_sdk.capture_exception(e) return {"status": "failed"} # Get the previous snapshot for this origin. It is then used to see which # of the package's versions are already loaded in the archive. try: last_snapshot = self.last_snapshot() logger.debug("last snapshot: %s", last_snapshot) known_artifacts = self.known_artifacts(last_snapshot) logger.debug("known artifacts: %s", known_artifacts) except Exception as e: logger.exception("Failed to get previous state for %s", self.url) sentry_sdk.capture_exception(e) return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) load_exceptions: List[Exception] = [] # Get the list of all version names try: versions = self.get_versions() except NotFound: return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="not_found", status_load="failed", ) except Exception: return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) # Get the metadata of each version's package packages_info: List[Tuple[str, str, TPackageInfo]] = [ (version, branch_name, p_info) for version in versions for (branch_name, p_info) in self.get_package_info(version) ] # Compute the ExtID of each of these packages known_extids = self._get_known_extids( [p_info for (_, _, p_info) in packages_info] ) if last_snapshot is None: last_snapshot_targets: Set[Sha1Git] = set() else: last_snapshot_targets = { branch.target for branch in last_snapshot.branches.values() } new_extids: Set[ExtID] = set() tmp_revisions: Dict[str, List[Tuple[str, Sha1Git]]] = { version: [] for version in versions } for (version, branch_name, p_info) in packages_info: logger.debug("package_info: %s", p_info) # Check if the package was already loaded, using its ExtID revision_id = self.resolve_revision_from_extids( known_extids, p_info, last_snapshot_targets ) if revision_id is None: # No matching revision found in the last snapshot, load it. try: res = self._load_revision(p_info, origin) if res: (revision_id, directory_id) = res assert revision_id assert directory_id self._load_extrinsic_directory_metadata( p_info, revision_id, directory_id ) self.storage.flush() status_load = "eventful" except Exception as e: self.storage.clear_buffers() load_exceptions.append(e) sentry_sdk.capture_exception(e) logger.exception( "Failed loading branch %s for %s", branch_name, self.url ) failed_branches.append(branch_name) continue if revision_id is None: continue partial_extid = p_info.extid() if partial_extid is not None: (extid_type, extid) = partial_extid revision_swhid = CoreSWHID( object_type=ObjectType.REVISION, object_id=revision_id ) new_extids.add( ExtID(extid_type=extid_type, extid=extid, target=revision_swhid) ) tmp_revisions[version].append((branch_name, revision_id)) if load_exceptions: status_visit = "partial" if not tmp_revisions: # We could not load any revisions; fail completely return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit="failed", status_load="failed", ) try: # Retrieve the default release version (the "latest" one) default_version = self.get_default_version() logger.debug("default version: %s", default_version) # Retrieve extra branches extra_branches = self.extra_branches() logger.debug("extra branches: %s", extra_branches) snapshot = self._load_snapshot( default_version, tmp_revisions, extra_branches ) self.storage.flush() except Exception as e: logger.exception("Failed to build snapshot for origin %s", self.url) sentry_sdk.capture_exception(e) status_visit = "failed" status_load = "failed" if snapshot: try: metadata_objects = self.build_extrinsic_snapshot_metadata(snapshot.id) self._load_metadata_objects(metadata_objects) except Exception as e: logger.exception( "Failed to load extrinsic snapshot metadata for %s", self.url ) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" try: metadata_objects = self.build_extrinsic_origin_metadata() self._load_metadata_objects(metadata_objects) except Exception as e: logger.exception( "Failed to load extrinsic origin metadata for %s", self.url ) sentry_sdk.capture_exception(e) status_visit = "partial" status_load = "failed" self._load_extids(new_extids) return self.finalize_visit( snapshot=snapshot, visit=visit, failed_branches=failed_branches, status_visit=status_visit, status_load=status_load, ) def _load_directory( self, dl_artifacts: List[Tuple[str, Mapping[str, Any]]], tmpdir: str ) -> Tuple[str, from_disk.Directory]: uncompressed_path = self.uncompress(dl_artifacts, dest=tmpdir) logger.debug("uncompressed_path: %s", uncompressed_path) directory = from_disk.Directory.from_disk( path=uncompressed_path.encode("utf-8"), max_content_length=self.max_content_size, ) contents, skipped_contents, directories = from_disk.iter_directory(directory) logger.debug("Number of skipped contents: %s", len(skipped_contents)) self.storage.skipped_content_add(skipped_contents) logger.debug("Number of contents: %s", len(contents)) self.storage.content_add(contents) logger.debug("Number of directories: %s", len(directories)) self.storage.directory_add(directories) return (uncompressed_path, directory) def _load_revision( self, p_info: TPackageInfo, origin ) -> Optional[Tuple[Sha1Git, Sha1Git]]: """Does all the loading of a revision itself: * downloads a package and uncompresses it * loads it from disk * adds contents, directories, and revision to self.storage * returns (revision_id, directory_id) Raises exception when unable to download or uncompress artifacts """ with tempfile.TemporaryDirectory() as tmpdir: dl_artifacts = self.download_package(p_info, tmpdir) (uncompressed_path, directory) = self._load_directory(dl_artifacts, tmpdir) # FIXME: This should be release. cf. D409 revision = self.build_revision( p_info, uncompressed_path, directory=directory.hash ) if not revision: # Some artifacts are missing intrinsic metadata # skipping those return None metadata = [metadata for (filepath, metadata) in dl_artifacts] original_artifact_metadata = RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=revision.directory ), discovery_date=self.visit_date, authority=SWH_METADATA_AUTHORITY, fetcher=self.get_metadata_fetcher(), format="original-artifacts-json", metadata=json.dumps(metadata).encode(), origin=self.url, revision=CoreSWHID(object_type=ObjectType.REVISION, object_id=revision.id), ) self._load_metadata_objects([original_artifact_metadata]) logger.debug("Revision: %s", revision) self.storage.revision_add([revision]) assert directory.hash return (revision.id, directory.hash) def _load_snapshot( self, default_version: str, revisions: Dict[str, List[Tuple[str, bytes]]], extra_branches: Dict[bytes, Mapping[str, Any]], ) -> Optional[Snapshot]: """Build snapshot out of the current revisions stored and extra branches. Then load it in the storage. """ logger.debug("revisions: %s", revisions) # Build and load the snapshot branches = {} # type: Dict[bytes, Mapping[str, Any]] for version, branch_name_revisions in revisions.items(): if version == default_version and len(branch_name_revisions) == 1: # only 1 branch (no ambiguity), we can create an alias # branch 'HEAD' branch_name, _ = branch_name_revisions[0] # except for some corner case (deposit) if branch_name != "HEAD": branches[b"HEAD"] = { "target_type": "alias", "target": branch_name.encode("utf-8"), } for branch_name, target in branch_name_revisions: branches[branch_name.encode("utf-8")] = { "target_type": "revision", "target": target, } # Deal with extra-branches for name, branch_target in extra_branches.items(): if name in branches: logger.error("Extra branch '%s' has been ignored", name) else: branches[name] = branch_target snapshot_data = {"branches": branches} logger.debug("snapshot: %s", snapshot_data) snapshot = Snapshot.from_dict(snapshot_data) logger.debug("snapshot: %s", snapshot) self.storage.snapshot_add([snapshot]) return snapshot def get_loader_name(self) -> str: """Returns a fully qualified name of this loader.""" return f"{self.__class__.__module__}.{self.__class__.__name__}" def get_loader_version(self) -> str: """Returns the version of the current loader.""" module_name = self.__class__.__module__ or "" module_name_parts = module_name.split(".") # Iterate rootward through the package hierarchy until we find a parent of this # loader's module with a __version__ attribute. for prefix_size in range(len(module_name_parts), 0, -1): package_name = ".".join(module_name_parts[0:prefix_size]) module = sys.modules[package_name] if hasattr(module, "__version__"): return module.__version__ # type: ignore # If this loader's class has no parent package with a __version__, # it should implement it itself. raise NotImplementedError( f"Could not dynamically find the version of {self.get_loader_name()}." ) def get_metadata_fetcher(self) -> MetadataFetcher: """Returns a MetadataFetcher instance representing this package loader; which is used to for adding provenance information to extracted extrinsic metadata, if any.""" return MetadataFetcher( name=self.get_loader_name(), version=self.get_loader_version(), metadata={}, ) def get_metadata_authority(self) -> MetadataAuthority: """For package loaders that get extrinsic metadata, returns the authority the metadata are coming from. """ raise NotImplementedError("get_metadata_authority") def get_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadataCore]: """Returns metadata items, used by build_extrinsic_origin_metadata.""" return [] def build_extrinsic_origin_metadata(self) -> List[RawExtrinsicMetadata]: """Builds a list of full RawExtrinsicMetadata objects, using metadata returned by get_extrinsic_origin_metadata.""" metadata_items = self.get_extrinsic_origin_metadata() if not metadata_items: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in metadata_items: metadata_objects.append( RawExtrinsicMetadata( target=Origin(self.url).swhid(), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, ) ) return metadata_objects def get_extrinsic_snapshot_metadata(self) -> List[RawExtrinsicMetadataCore]: """Returns metadata items, used by build_extrinsic_snapshot_metadata.""" return [] def build_extrinsic_snapshot_metadata( self, snapshot_id: Sha1Git ) -> List[RawExtrinsicMetadata]: """Builds a list of full RawExtrinsicMetadata objects, using metadata returned by get_extrinsic_snapshot_metadata.""" metadata_items = self.get_extrinsic_snapshot_metadata() if not metadata_items: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in metadata_items: metadata_objects.append( RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.SNAPSHOT, object_id=snapshot_id ), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, origin=self.url, ) ) return metadata_objects def build_extrinsic_directory_metadata( self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git, ) -> List[RawExtrinsicMetadata]: if not p_info.directory_extrinsic_metadata: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return [] authority = self.get_metadata_authority() fetcher = self.get_metadata_fetcher() metadata_objects = [] for item in p_info.directory_extrinsic_metadata: metadata_objects.append( RawExtrinsicMetadata( target=ExtendedSWHID( object_type=ExtendedObjectType.DIRECTORY, object_id=directory_id ), discovery_date=item.discovery_date or self.visit_date, authority=authority, fetcher=fetcher, format=item.format, metadata=item.metadata, origin=self.url, revision=CoreSWHID( object_type=ObjectType.REVISION, object_id=revision_id ), ) ) return metadata_objects def _load_extrinsic_directory_metadata( self, p_info: TPackageInfo, revision_id: Sha1Git, directory_id: Sha1Git, ) -> None: metadata_objects = self.build_extrinsic_directory_metadata( p_info, revision_id, directory_id ) self._load_metadata_objects(metadata_objects) def _load_metadata_objects( self, metadata_objects: List[RawExtrinsicMetadata] ) -> None: if not metadata_objects: # If this package loader doesn't write metadata, no need to require # an implementation for get_metadata_authority. return self._create_authorities(mo.authority for mo in metadata_objects) self._create_fetchers(mo.fetcher for mo in metadata_objects) self.storage.raw_extrinsic_metadata_add(metadata_objects) def _create_authorities(self, authorities: Iterable[MetadataAuthority]) -> None: deduplicated_authorities = { (authority.type, authority.url): authority for authority in authorities } if authorities: self.storage.metadata_authority_add(list(deduplicated_authorities.values())) def _create_fetchers(self, fetchers: Iterable[MetadataFetcher]) -> None: deduplicated_fetchers = { (fetcher.name, fetcher.version): fetcher for fetcher in fetchers } if fetchers: self.storage.metadata_fetcher_add(list(deduplicated_fetchers.values())) def _load_extids(self, extids: Set[ExtID]) -> None: if not extids: return try: self.storage.extid_add(list(extids)) except Exception as e: logger.exception("Failed to load new ExtIDs for %s", self.url) sentry_sdk.capture_exception(e) # No big deal, it just means the next visit will load the same versions # again.