diff --git a/PKG-INFO b/PKG-INFO index 83e34df0..fab895dd 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,250 +1,250 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.39.0 +Version: 0.40.0 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal License-File: LICENSE License-File: AUTHORS swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects the path to `cassandra` to either be unspecified, it is then looked up at `/usr/sbin/cassandra`, either specified through the environment variable `SWH_CASSANDRA_BIN`. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. 
For example, at the time of writing, Cassandra does not support Java 14, so one may want to use Java 11: ``` (swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 (swh) :~/swh-storage$ tox [...] ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: postgresql db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` This configuration uses: - a local storage instance whose db connection targets the local `softwareheritage-dev` database, - a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the content identifier (sha1) determines where the raw content is stored on disk: the first directory level is named after its first 2 hex characters, the second level after the next 2, the third level after the next 2, and the file holding the raw content after the complete hash. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the Python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage API on port 5002. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote url: http://localhost:5002/ ``` You could directly define a postgresql storage with the following snippet: ``` storage: cls: postgresql db: service=swh-dev objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` ## Cassandra As an alternative to PostgreSQL, swh-storage can use Cassandra as a database backend. It can be used like this: ``` storage: cls: cassandra hosts: - localhost objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` The Cassandra swh-storage implementation supports both Cassandra >= 4.0-alpha2 and ScyllaDB >= 4.4 (and possibly earlier versions, but this is untested). While the main code supports both transparently, running tests or configuring the schema requires specific code when using ScyllaDB, enabled by setting the `SWH_USE_SCYLLADB=1` environment variable. diff --git a/docs/extrinsic-metadata-specification.rst b/docs/extrinsic-metadata-specification.rst index f1522ec7..c4185651 100644 --- a/docs/extrinsic-metadata-specification.rst +++ b/docs/extrinsic-metadata-specification.rst @@ -1,347 +1,343 @@ :orphan: .. _extrinsic-metadata-specification: Extrinsic metadata specification ================================ :term:`Extrinsic metadata` is information about software that is not part of the source code itself but still closely related to the software. Typical sources for extrinsic metadata are: the hosting place of a repository, which can offer metadata via its web view or API; external registries like collaborative curation initiatives; and out-of-band information available at source code archival time. Since they are not part of the source code, a dedicated mechanism to fetch and store them is needed. This specification assumes the reader is familiar with Software Heritage's :ref:`architecture` and :ref:`data-model`. Metadata sources ---------------- Authorities ^^^^^^^^^^^ Metadata authorities are entities that provide metadata about an :term:`origin`. Metadata authorities include: code hosting places, :term:`deposit` submitters, and registries (eg. Wikidata). An authority is uniquely defined by these properties: * its type, representing the kind of authority, which is one of these values: * ``deposit_client``, for metadata pushed to Software Heritage at the same time as a software artifact * ``forge``, for metadata pulled from the same source as the one hosting the software artifacts (which includes package managers) * ``registry``, for metadata pulled from a third-party * its URL, which unambiguously identifies an instance of the authority type. 
Examples: =============== ================================= type url =============== ================================= deposit_client https://hal.archives-ouvertes.fr/ deposit_client https://hal.inria.fr/ deposit_client https://software.intel.com/ forge https://gitlab.com/ forge https://gitlab.inria.fr/ forge https://0xacab.org/ forge https://github.com/ registry https://www.wikidata.org/ registry https://swmath.org/ registry https://ascl.net/ =============== ================================= Metadata fetchers ^^^^^^^^^^^^^^^^^ Metadata fetchers are software components used to fetch metadata from a metadata authority, and ingest them into the Software Heritage archive. A metadata fetcher is uniquely defined by these properties: * its type * its version Examples: * :term:`loaders `, which may either discover metadata as a side-effect of loading source code, or be dedicated to fetching metadata. * :term:`listers `, which may discover metadata as a side-effect of discovering origins. * :term:`deposit` submitters, which push metadata to SWH from a third-party; usually at the same time as a :term:`software artifact` * crawlers, which fetch metadata from an authority in a way that is none of the above (eg. by querying a specific API of the origin's forge). Storage API ----------- Authorities and metadata fetchers ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The :term:`storage` API offers these endpoints to manipulate metadata -authorities and metadata fetchers: +Data model +~~~~~~~~~~ -* ``metadata_authority_add(type, url, metadata)`` - which adds a new metadata authority to the storage. +The :term:`storage` API uses these structures to represent metadata +authorities and metadata fetchers (simplified Python code):: -* ``metadata_authority_get(type, url)`` - which looks up a known authority (there is at most one) and if it is - known, returns a dictionary with keys ``type``, ``url``, and ``metadata``. + class MetadataAuthorityType(Enum): + DEPOSIT_CLIENT = "deposit_client" + FORGE = "forge" + REGISTRY = "registry" -* ``metadata_fetcher_add(name, version, metadata)`` - which adds a new metadata fetcher to the storage. + class MetadataAuthority(BaseModel): + """Represents an entity that provides metadata about an origin or + software artifact.""" -* ``metadata_fetcher_get(name, version)`` - which looks up a known fetcher (there is at most one) and if it is - known, returns a dictionary with keys ``name``, ``version``, and - ``metadata``. + object_type = "metadata_authority" -These `metadata` fields contain JSON-encodable dictionaries -with information about the authority/fetcher, in a format specific to each -authority/fetcher. -With authority, the `metadata` field is reserved for information describing -and qualifying the authority. -With fetchers, the `metadata` field is reserved for configuration metadata -and other technical usage. + type: MetadataAuthorityType + url: str -Origin metadata -^^^^^^^^^^^^^^^ + class MetadataFetcher(BaseModel): + """Represents a software component used to fetch metadata from a metadata + authority, and ingest them into the Software Heritage archive.""" -Extrinsic metadata are stored in SWH's :term:`storage database`. 
-The storage API offers three endpoints to manipulate origin metadata: + object_type = "metadata_fetcher" -* Adding metadata:: + name: str + version: str - raw_extrinsic_metadata_add( - "origin", origin_url, discovery_date, - authority, fetcher, - format, metadata - ) +Storage API +~~~~~~~~~~~ - which adds a new `metadata` byte string obtained from a given authority - and associated to the origin. - `discovery_date` is a Python datetime. - `authority` must be a dict containing keys `type` and `url`, and - `fetcher` a dict containing keys `name` and `version`. - The authority and fetcher must be known to the storage before using this - endpoint. - `format` is a text field indicating the format of the content of the - `metadata` byte string, see `extrinsic-metadata-formats`_. +* ``metadata_authority_add(authorities: List[MetadataAuthority])`` + which adds a list of ``MetadataAuthority`` to the storage. -* Getting latest metadata:: +* ``metadata_authority_get(type: MetadataAuthorityType, url: str) -> Optional[MetadataAuthority]`` + which looks up a known authority (there is at most one) and if it is + known, returns the corresponding ``MetadataAuthority`` - raw_extrinsic_metadata_get_latest( - "origin", origin_url, authority - ) +* ``metadata_fetcher_add(fetchers: List[MetadataFetcher])`` + which adds a list of ``MetadataFetcher`` to the storage. - where `authority` must be a dict containing keys `type` and `url`, - which returns a dictionary corresponding to the latest metadata entry - added from this origin, in the format:: +* ``metadata_fetcher_get(name: str, version: str) -> Optional[MetadataFetcher]`` + which looks up a known fetcher (there is at most one) and if it is + known, returns the corresponding ``MetadataFetcher`` - { - 'origin_url': ..., - 'authority': {'type': ..., 'url': ...}, - 'fetcher': {'name': ..., 'version': ...}, - 'discovery_date': ..., - 'format': '...', - 'metadata': b'...' - } +Artifact metadata +^^^^^^^^^^^^^^^^^ +Data model +~~~~~~~~~~ -* Getting all metadata:: +The storage database stores metadata on origins, and all software artifacts +supported by the data model. +They are represented using this structure (simplified Python code):: - raw_extrinsic_metadata_get( - "origin", origin_url, - authority, - page_token, limit - ) + class RawExtrinsicMetadata(HashableObject, BaseModel): + object_type = "raw_extrinsic_metadata" - where `authority` must be a dict containing keys `type` and `url` - which returns a dictionary with keys: + # target object + target: ExtendedSWHID - * `next_page_token`, which is an opaque token to be used as - `page_token` for retrieving the next page. if absent, there is - no more pages to gather. - * `results`: list of dictionaries, one for each metadata item - deposited, corresponding to the given origin and obtained from the - specified authority. + # source + discovery_date: datetime.datetime + authority: MetadataAuthority + fetcher: MetadataFetcher - Each of these dictionaries is in the following format:: + # the metadata itself + format: str + metadata: bytes - { - 'authority': {'type': ..., 'url': ...}, - 'fetcher': {'name': ..., 'version': ...}, - 'discovery_date': ..., - 'format': '...', - 'metadata': b'...' - } - -The parameters ``page_token`` and ``limit`` are used for pagination based on -an arbitrary order. An initial query to ``origin_metadata_get`` must set -``page_token`` to ``None``, and further query must use the value from the -previous query's ``next_page_token`` to get the next page of results. 
- -``metadata`` is a bytes array (eventually encoded using Base64). + # context + origin: Optional[str] = None + visit: Optional[int] = None + snapshot: Optional[CoreSWHID] = None + release: Optional[CoreSWHID] = None + revision: Optional[CoreSWHID] = None + path: Optional[bytes] = None + directory: Optional[CoreSWHID] = None + + id: Sha1Git + +The ``target`` may be: + +* a regular :ref:`core SWHID `, +* a SWHID-like string with type ``ori`` and the SHA1 of an origin URL +* a SWHID-like string with type ``emd`` and the SHA1 of another + ``RawExtrinsicMetadata`` object (to represent metadata on metadata objects) + +``id`` is a sha1 hash of the ``RawExtrinsicMetadata`` object itself; +it may be used as the target of other ``RawExtrinsicMetadata`` objects. + +``discovery_date`` is a Python datetime. +``authority`` is a ``MetadataAuthority`` object and ``fetcher`` a +``MetadataFetcher`` object, as described above; both must be known to the +storage before adding metadata that references them. +``format`` is a text field indicating the format of the content of the +``metadata`` byte string, see `extrinsic-metadata-formats`_. + +``metadata`` is a byte array. Its format is specific to each authority and is treated as an opaque value by the storage. Unifying these various formats into a common language is outside the scope of this specification. -Artifact metadata -^^^^^^^^^^^^^^^^^ - -In addition to origin metadata, the storage database stores metadata on -all software artifacts supported by the data model. - -This works similarly to origin metadata, with one major difference: -extrinsic metadata can be given on a specific artifact within a specified -context (for example: a directory in a specific revision from a specific +Finally, the remaining fields allow metadata to be given on a specific artifact within +a specified context (for example: a directory in a specific revision from a specific visit on a specific origin) which will be stored along the metadata itself. For example, two origins may develop the same file independently; the information about authorship, licensing or even description may vary about the same artifact in a different context. This is why it is important to qualify the metadata with the complete context for which it is intended, if any.
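As an illustration (a sketch only, not actual archive data: the origin URL, visit id, and fetcher name/version below are made-up values, and import paths may vary slightly across ``swh.model`` versions), a metadata object about a directory observed during a visit of an origin could be built like this::

    from datetime import datetime, timezone

    from swh.model.model import (
        MetadataAuthority,
        MetadataAuthorityType,
        MetadataFetcher,
        RawExtrinsicMetadata,
    )
    from swh.model.swhids import ExtendedSWHID

    RawExtrinsicMetadata(
        # the object the metadata is about
        target=ExtendedSWHID.from_string(
            "swh:1:dir:460a586d1c95d120811eaadb398d534e019b5243"
        ),
        # where and when the metadata was found
        discovery_date=datetime(2021, 8, 1, tzinfo=timezone.utc),
        authority=MetadataAuthority(
            type=MetadataAuthorityType.FORGE, url="https://gitlab.com/"
        ),
        fetcher=MetadataFetcher(name="swh.loader.git", version="1.0.0"),
        # the metadata itself
        format="original-artifacts-json",
        metadata=b"[]",
        # context: the origin and visit in which the directory was found
        origin="https://gitlab.com/example/project",
        visit=1,
    )

The ``id`` field is not passed explicitly; it is computed from the other fields.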
-The same two endpoints as for origin can be used, but with a different -value for the first argument: - -* Adding metadata:: - - raw_extrinsic_metadata_add( - type, id, context, discovery_date, - authority, fetcher, - format, metadata - ) - - -* Getting all metadata:: - - raw_extrinsic_metadata_get( - type, id, - authority, - after, - page_token, limit - ) - - -definited similarly to ``origin_metadata_add`` and ``origin_metadata_get``, -but where ``id`` is a core SWHID (with type matching ````), -and with an extra ``context`` (argument when adding metadata, and dictionary -key when getting them) that is a dictionary with keys -depending on the artifact ``type``: +The allowed context fields for each ``target`` type are: -* for ``snapshot``: ``origin`` (a URL) and ``visit`` (an integer) -* for ``release``: those above, plus ``snapshot`` +* for ``emd`` (extrinsic metadata) and ``ori`` (origin): none +* for ``snp`` (snapshot): ``origin`` (a URL) and ``visit`` (an integer) +* for ``rel`` (release): those above, plus ``snapshot`` (the core SWHID of a snapshot) -* for ``revision``: all those above, plus ``release`` +* for ``rev`` (revision): all those above, plus ``release`` (the core SWHID of a release) -* for ``directory``: all those above, plus ``revision`` +* for ``dir`` (directory): all those above, plus ``revision`` (the core SWHID of a revision) and ``path`` (a byte string), representing the path to this directory from the root of the ``revision`` -* for ``content``: all those above, plus ``directory`` +* for ``cnt`` (content): all those above, plus ``directory`` (the core SWHID of a directory) All keys are optional, but should be provided whenever possible. The dictionary may be empty, if metadata is fully independent from context. In all cases, ``visit`` should only be provided if ``origin`` is (as visit ids are only unique with respect to an origin). +Storage API +~~~~~~~~~~~ + +The storage API offers two endpoints to manipulate extrinsic metadata: + +* Adding metadata:: + + raw_extrinsic_metadata_add(metadata: List[RawExtrinsicMetadata]) + + which adds a list of ``RawExtrinsicMetadata`` objects, whose ``metadata`` field + is a byte string obtained from a given authority and associated with the ``target``. + + +* Getting all metadata:: + + raw_extrinsic_metadata_get( + target: ExtendedSWHID, + authority: MetadataAuthority, + after: Optional[datetime.datetime] = None, + page_token: Optional[bytes] = None, + limit: int = 1000, + ) -> PagedResult[RawExtrinsicMetadata]: + + returns a list of ``RawExtrinsicMetadata`` with the given ``target`` and from + the given ``authority``. + If ``after`` is provided, only objects whose discovery date is more recent are + returned. + + ``PagedResult`` is a structure containing the results themselves, + and a ``next_page_token`` used to fetch the next list of results, if any. + .. _extrinsic-metadata-formats: -Extrinsic metadata format ------------------------- +Extrinsic metadata formats -------------------------- Here is a list of all the metadata formats stored: ``pypi-project-json`` The metadata is a release entry from a PyPI project's JSON file, extracted and re-serialized. ``replicate-npm-package-json`` ditto, but from a replicate.npmjs.com project ``nixguix-sources-json`` ditto, but from https://nix-community.github.io/nixpkgs-swh/ ``original-artifacts-json`` tarball data, see below ``sword-v2-atom-codemeta`` XML Atom document, with Codemeta metadata, as sent by a deposit client, see the :ref:`Deposit protocol reference `.
``sword-v2-atom-codemeta-v2`` Deprecated alias of ``sword-v2-atom-codemeta`` ``sword-v2-atom-codemeta-v2-in-json`` Deprecated, JSON serialization of a ``sword-v2-atom-codemeta`` document. ``xml-deposit-info`` Information about a deposit, to identify the provenance of a metadata object sent via swh-deposit, see below Details on some of these formats: original-artifacts-json ^^^^^^^^^^^^^^^^^^^^^^^ This is a loosely defined format, originally used as a ``metadata`` column on the ``revision`` table that changed over the years. It is a JSON array, and each entry is a JSON object representing an archive (tarball, zipball, ...) that was unpackaged by the SWH loader before loading its content in Software Heritage. When writing this specification, it was stabilized to this format:: [ { "length": , "filename": "", "checksums": { "sha1": "", "sha256": "", }, "url": "" }, ... ] Older ``original-artifacts-json`` were migrated to use this format, but may be missing some of the keys. xml-deposit-info ^^^^^^^^^^^^^^^^ Deposits with code objects are loaded as their own origin, so we can look them up in the deposit database from their metadata (which hold the origin as a context). This is not true for metadata-only deposits, because we don't create an origin for them; so we need to store this information somewhere. The naive solution would be to insert them in the Atom entry provided by the client, but it means altering a document before we archive it, which potentially corrupts it or loses part of the data. Therefore, on each metadata-only deposit, the deposit creates an extra "metametadata" object, with the original metadata object as target, and using this format:: {{ deposit.id }} {{ deposit.client.provider_url }} {{ deposit.collection.name }} diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt index 9a12e5c8..4d54e902 100644 --- a/requirements-swh-journal.txt +++ b/requirements-swh-journal.txt @@ -1 +1 @@ -swh.journal >= 0.6.2 +swh.journal >= 0.9 diff --git a/requirements-test.txt b/requirements-test.txt index 67e0830d..8662be4f 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,13 +1,15 @@ hypothesis >= 3.11.0 pytest pytest-mock # pytz is in fact a dep of swh.model[testing] and should not be necessary, but # the dep on swh.model in the main requirements-swh.txt file shadows this one # adding the [testing] extra. 
swh.model[testing] >= 0.0.50 pytz +pytest-redis pytest-xdist types-python-dateutil types-pytz types-pyyaml +types-redis types-requests diff --git a/requirements.txt b/requirements.txt index 06048429..05531f9d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,11 @@ +aiohttp +cassandra-driver >= 3.19.0, != 3.21.0 click +deprecated flask +iso8601 +mypy_extensions psycopg2 -aiohttp +redis tenacity -cassandra-driver >= 3.19.0, != 3.21.0 -deprecated typing-extensions -mypy_extensions -iso8601 diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 83e34df0..fab895dd 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,250 +1,250 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.39.0 +Version: 0.40.0 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: journal License-File: LICENSE License-File: AUTHORS swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects the path to `cassandra` to either be unspecified, it is then looked up at `/usr/sbin/cassandra`, either specified through the environment variable `SWH_CASSANDRA_BIN`. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. 
``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. For example, at the time of writing, Cassandra does not support Java 14, so one may want to use Java 11: ``` (swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64 (swh) :~/swh-storage$ tox [...] ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: postgresql db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` This configuration uses: - a local storage instance whose db connection targets the local `softwareheritage-dev` database, - a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the content identifier (sha1) determines where the raw content is stored on disk: the first directory level is named after its first 2 hex characters, the second level after the next 2, the third level after the next 2, and the file holding the raw content after the complete hash. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the Python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage API on port 5002. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote url: http://localhost:5002/ ``` You could directly define a postgresql storage with the following snippet: ``` storage: cls: postgresql db: service=swh-dev objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` ## Cassandra As an alternative to PostgreSQL, swh-storage can use Cassandra as a database backend. It can be used like this: ``` storage: cls: cassandra hosts: - localhost objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` The Cassandra swh-storage implementation supports both Cassandra >= 4.0-alpha2 and ScyllaDB >= 4.4 (and possibly earlier versions, but this is untested). While the main code supports both transparently, running tests or configuring the schema requires specific code when using ScyllaDB, enabled by setting the `SWH_USE_SCYLLADB=1` environment variable. diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index 22de944d..25752023 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,30 +1,33 @@ +aiohttp +cassandra-driver!=3.21.0,>=3.19.0 click +deprecated flask +iso8601 +mypy_extensions psycopg2 -aiohttp +redis tenacity -cassandra-driver!=3.21.0,>=3.19.0 -deprecated typing-extensions -mypy_extensions -iso8601 swh.core[db,http]>=0.14.0 swh.counters>=v0.8.0 swh.model>=2.1.0 swh.objstorage>=0.2.2 [journal] -swh.journal>=0.6.2 +swh.journal>=0.9 [testing] hypothesis>=3.11.0 pytest pytest-mock swh.model[testing]>=0.0.50 pytz +pytest-redis pytest-xdist types-python-dateutil types-pytz types-pyyaml +types-redis types-requests -swh.journal>=0.6.2 +swh.journal>=0.9 diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py index b27cc6bb..a97a11fe 100644 --- a/swh/storage/backfill.py +++ b/swh/storage/backfill.py @@ -1,658 +1,658 @@ # Copyright (C) 2017-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Storage backfiller. The backfiller goal is to produce back part or all of the objects from a storage to the journal topics Current implementation consists in the JournalBackfiller class. It simply reads the objects from the storage and sends every object identifier back to the journal. 
""" import logging from typing import Any, Callable, Dict, Optional from swh.core.db import BaseDb from swh.model.model import ( BaseModel, Directory, DirectoryEntry, ExtID, RawExtrinsicMetadata, Release, Revision, Snapshot, SnapshotBranch, TargetType, ) from swh.model.swhids import ExtendedObjectType from swh.storage.postgresql.converters import ( db_to_extid, db_to_raw_extrinsic_metadata, db_to_release, db_to_revision, ) -from swh.storage.replay import object_converter_fn +from swh.storage.replay import OBJECT_CONVERTERS from swh.storage.writer import JournalWriter logger = logging.getLogger(__name__) PARTITION_KEY = { "content": "sha1", "skipped_content": "sha1", "directory": "id", "extid": "target", "metadata_authority": "type, url", "metadata_fetcher": "name, version", "raw_extrinsic_metadata": "target", "revision": "revision.id", "release": "release.id", "snapshot": "id", "origin": "id", "origin_visit": "origin_visit.origin", "origin_visit_status": "origin_visit_status.origin", } COLUMNS = { "content": [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", "ctime", ], "skipped_content": [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", "reason", ], "directory": ["id", "dir_entries", "file_entries", "rev_entries"], "extid": ["extid_type", "extid", "extid_version", "target_type", "target"], "metadata_authority": ["type", "url"], "metadata_fetcher": ["name", "version"], "origin": ["url"], "origin_visit": ["visit", "type", ("origin.url", "origin"), "date",], "origin_visit_status": [ ("origin_visit_status.visit", "visit"), ("origin.url", "origin"), ("origin_visit_status.date", "date"), "type", "snapshot", "status", "metadata", ], "raw_extrinsic_metadata": [ "raw_extrinsic_metadata.type", "raw_extrinsic_metadata.target", "metadata_authority.type", "metadata_authority.url", "metadata_fetcher.name", "metadata_fetcher.version", "discovery_date", "format", "raw_extrinsic_metadata.metadata", "origin", "visit", "snapshot", "release", "revision", "path", "directory", ], "revision": [ ("revision.id", "id"), "date", "date_offset", "date_neg_utc_offset", "committer_date", "committer_date_offset", "committer_date_neg_utc_offset", "type", "directory", "message", "synthetic", "metadata", "extra_headers", ( "array(select parent_id::bytea from revision_history rh " "where rh.id = revision.id order by rh.parent_rank asc)", "parents", ), ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ("c.id", "committer_id"), ("c.name", "committer_name"), ("c.email", "committer_email"), ("c.fullname", "committer_fullname"), ], "release": [ ("release.id", "id"), "date", "date_offset", "date_neg_utc_offset", "comment", ("release.name", "name"), "synthetic", "target", "target_type", ("a.id", "author_id"), ("a.name", "author_name"), ("a.email", "author_email"), ("a.fullname", "author_fullname"), ], "snapshot": ["id", "object_id"], } JOINS = { "release": ["person a on release.author=a.id"], "revision": [ "person a on revision.author=a.id", "person c on revision.committer=c.id", ], "origin_visit": ["origin on origin_visit.origin=origin.id"], "origin_visit_status": ["origin on origin_visit_status.origin=origin.id",], "raw_extrinsic_metadata": [ "metadata_authority on " "raw_extrinsic_metadata.authority_id=metadata_authority.id", "metadata_fetcher on raw_extrinsic_metadata.fetcher_id=metadata_fetcher.id", ], } EXTRA_WHERE = { # hack to force the right index usage on table extid "extid": "target_type in ('revision', 'release', 
'content', 'directory')" } def directory_converter(db: BaseDb, directory_d: Dict[str, Any]) -> Directory: """Convert directory from the flat representation to swh model compatible objects. """ columns = ["target", "name", "perms"] query_template = """ select %(columns)s from directory_entry_%(type)s where id in %%s """ types = ["file", "dir", "rev"] entries = [] with db.cursor() as cur: for type in types: ids = directory_d.pop("%s_entries" % type) if not ids: continue query = query_template % { "columns": ",".join(columns), "type": type, } cur.execute(query, (tuple(ids),)) for row in cur: entry_d = dict(zip(columns, row)) entry = DirectoryEntry( name=entry_d["name"], type=type, target=entry_d["target"], perms=entry_d["perms"], ) entries.append(entry) return Directory(id=directory_d["id"], entries=tuple(entries),) def raw_extrinsic_metadata_converter( db: BaseDb, metadata: Dict[str, Any] ) -> RawExtrinsicMetadata: """Convert a raw extrinsic metadata from the flat representation to swh model compatible objects. """ return db_to_raw_extrinsic_metadata(metadata) def extid_converter(db: BaseDb, extid: Dict[str, Any]) -> ExtID: """Convert an extid from the flat representation to swh model compatible objects. """ return db_to_extid(extid) def revision_converter(db: BaseDb, revision_d: Dict[str, Any]) -> Revision: """Convert revision from the flat representation to swh model compatible objects. """ revision = db_to_revision(revision_d) assert revision is not None, revision_d["id"] return revision def release_converter(db: BaseDb, release_d: Dict[str, Any]) -> Release: """Convert release from the flat representation to swh model compatible objects. """ release = db_to_release(release_d) assert release is not None, release_d["id"] return release def snapshot_converter(db: BaseDb, snapshot_d: Dict[str, Any]) -> Snapshot: """Convert snapshot from the flat representation to swh model compatible objects. """ columns = ["name", "target", "target_type"] query = """ select %s from snapshot_branches sbs inner join snapshot_branch sb on sb.object_id=sbs.branch_id where sbs.snapshot_id=%%s """ % ", ".join( columns ) with db.cursor() as cur: cur.execute(query, (snapshot_d["object_id"],)) branches = {} for name, *row in cur: branch_d = dict(zip(columns[1:], row)) if branch_d["target"] is not None and branch_d["target_type"] is not None: branch: Optional[SnapshotBranch] = SnapshotBranch( target=branch_d["target"], target_type=TargetType(branch_d["target_type"]), ) else: branch = None branches[name] = branch return Snapshot(id=snapshot_d["id"], branches=branches,) CONVERTERS: Dict[str, Callable[[BaseDb, Dict[str, Any]], BaseModel]] = { "directory": directory_converter, "extid": extid_converter, "raw_extrinsic_metadata": raw_extrinsic_metadata_converter, "revision": revision_converter, "release": release_converter, "snapshot": snapshot_converter, } def object_to_offset(object_id, numbits): """Compute the index of the range containing object id, when dividing space into 2^numbits. 
Args: object_id (str): The hex representation of object_id numbits (int): Number of bits in which we divide input space Returns: The index of the range containing object id """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 truncated_id = object_id[: length * 2] if len(truncated_id) < length * 2: truncated_id += "0" * (length * 2 - len(truncated_id)) truncated_id_bytes = bytes.fromhex(truncated_id) return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits def byte_ranges(numbits, start_object=None, end_object=None): """Generate start/end pairs of bytes spanning numbits bits and constrained by optional start_object and end_object. Args: numbits (int): Number of bits in which we divide input space start_object (str): Hex object id contained in the first range returned end_object (str): Hex object id contained in the last range returned Yields: 2^numbits pairs of bytes """ q, r = divmod(numbits, 8) length = q + (r != 0) shift_bits = 8 - r if r else 0 def to_bytes(i): return int.to_bytes(i << shift_bits, length=length, byteorder="big") start_offset = 0 end_offset = 1 << numbits if start_object is not None: start_offset = object_to_offset(start_object, numbits) if end_object is not None: end_offset = object_to_offset(end_object, numbits) + 1 for start in range(start_offset, end_offset): end = start + 1 if start == 0: yield None, to_bytes(end) elif end == 1 << numbits: yield to_bytes(start), None else: yield to_bytes(start), to_bytes(end) def raw_extrinsic_metadata_target_ranges(start_object=None, end_object=None): """Generate ranges of values for the `target` attribute of `raw_extrinsic_metadata` objects. This generates one range for all values before the first SWHID (which would correspond to raw origin URLs), then a number of hex-based ranges for each known type of SWHID (2**12 ranges for directories, 2**8 ranges for all other types). Finally, it generates one extra range for values above all possible SWHIDs. """ if start_object is None: start_object = "" swhid_target_types = sorted(type.value for type in ExtendedObjectType) first_swhid = f"swh:1:{swhid_target_types[0]}:" # Generate a range for url targets, if the starting object is before SWHIDs if start_object < first_swhid: yield start_object, ( first_swhid if end_object is None or end_object >= first_swhid else end_object ) if end_object is not None and end_object <= first_swhid: return # Prime the following loop, which uses the upper bound of the previous range # as lower bound, to account for potential targets between two valid types # of SWHIDs (even though they would eventually be rejected by the # RawExtrinsicMetadata parser, they /might/ exist...) 
end_swhid = first_swhid # Generate ranges for swhid targets for target_type in swhid_target_types: finished = False base_swhid = f"swh:1:{target_type}:" last_swhid = base_swhid + ("f" * 40) if start_object > last_swhid: continue # Generate 2**8 or 2**12 ranges for _, end in byte_ranges(12 if target_type == "dir" else 8): # Reuse previous uppper bound start_swhid = end_swhid # Use last_swhid for this object type if on the last byte range end_swhid = (base_swhid + end.hex()) if end is not None else last_swhid # Ignore out of bounds ranges if start_object >= end_swhid: continue # Potentially clamp start of range to the first object requested start_swhid = max(start_swhid, start_object) # Handle ending the loop early if the last requested object id is in # the current range if end_object is not None and end_swhid >= end_object: end_swhid = end_object finished = True yield start_swhid, end_swhid if finished: return # Generate one final range for potential raw origin URLs after the last # valid SWHID start_swhid = max(start_object, end_swhid) yield start_swhid, end_object def integer_ranges(start, end, block_size=1000): for start in range(start, end, block_size): if start == 0: yield None, block_size elif start + block_size > end: yield start, end else: yield start, start + block_size RANGE_GENERATORS = { "content": lambda start, end: byte_ranges(24, start, end), "skipped_content": lambda start, end: [(None, None)], "directory": lambda start, end: byte_ranges(24, start, end), "extid": lambda start, end: byte_ranges(24, start, end), "revision": lambda start, end: byte_ranges(24, start, end), "release": lambda start, end: byte_ranges(16, start, end), "raw_extrinsic_metadata": raw_extrinsic_metadata_target_ranges, "snapshot": lambda start, end: byte_ranges(16, start, end), "origin": integer_ranges, "origin_visit": integer_ranges, "origin_visit_status": integer_ranges, } def compute_query(obj_type, start, end): columns = COLUMNS.get(obj_type) join_specs = JOINS.get(obj_type, []) join_clause = "\n".join("left join %s" % clause for clause in join_specs) additional_where = EXTRA_WHERE.get(obj_type) where = [] where_args = [] if start: where.append("%(keys)s >= %%s") where_args.append(start) if end: where.append("%(keys)s < %%s") where_args.append(end) if additional_where: where.append(additional_where) where_clause = "" if where: where_clause = ("where " + " and ".join(where)) % { "keys": "(%s)" % PARTITION_KEY[obj_type] } column_specs = [] column_aliases = [] for column in columns: if isinstance(column, str): column_specs.append(column) column_aliases.append(column) else: column_specs.append("%s as %s" % column) column_aliases.append(column[1]) query = """ select %(columns)s from %(table)s %(join)s %(where)s """ % { "columns": ",".join(column_specs), "table": obj_type, "join": join_clause, "where": where_clause, } return query, where_args, column_aliases def fetch(db, obj_type, start, end): """Fetch all obj_type's identifiers from db. This opens one connection, stream objects and when done, close the connection. 
Args: db (BaseDb): Db connection object obj_type (str): Object type start (Union[bytes|Tuple]): Range start identifier end (Union[bytes|Tuple]): Range end identifier Raises: ValueError if obj_type is not supported Yields: Objects in the given range """ query, where_args, column_aliases = compute_query(obj_type, start, end) converter = CONVERTERS.get(obj_type) with db.cursor() as cursor: logger.debug("Fetching data for table %s", obj_type) logger.debug("query: %s %s", query, where_args) cursor.execute(query, where_args) for row in cursor: record = dict(zip(column_aliases, row)) if converter: record = converter(db, record) else: - record = object_converter_fn[obj_type](record) + record = OBJECT_CONVERTERS[obj_type](record) logger.debug("record: %s", record) yield record def _format_range_bound(bound): if isinstance(bound, bytes): return bound.hex() else: return str(bound) MANDATORY_KEYS = ["storage", "journal_writer"] class JournalBackfiller: """Class in charge of reading the storage's objects and sends those back to the journal's topics. This is designed to be run periodically. """ def __init__(self, config=None): self.config = config self.check_config(config) def check_config(self, config): missing_keys = [] for key in MANDATORY_KEYS: if not config.get(key): missing_keys.append(key) if missing_keys: raise ValueError( "Configuration error: The following keys must be" " provided: %s" % (",".join(missing_keys),) ) if "cls" not in config["storage"] or config["storage"]["cls"] not in ( "local", "postgresql", ): raise ValueError( "swh storage backfiller must be configured to use a local" " (PostgreSQL) storage" ) def parse_arguments(self, object_type, start_object, end_object): """Parse arguments Raises: ValueError for unsupported object type ValueError if object ids are not parseable Returns: Parsed start and end object ids """ if object_type not in COLUMNS: raise ValueError( "Object type %s is not supported. " "The only possible values are %s" % (object_type, ", ".join(sorted(COLUMNS.keys()))) ) if object_type in ["origin", "origin_visit", "origin_visit_status"]: if start_object: start_object = int(start_object) else: start_object = 0 if end_object: end_object = int(end_object) else: end_object = 100 * 1000 * 1000 # hard-coded limit return start_object, end_object def run(self, object_type, start_object, end_object, dry_run=False): """Reads storage's subscribed object types and send them to the journal's reading topic. 
""" start_object, end_object = self.parse_arguments( object_type, start_object, end_object ) db = BaseDb.connect(self.config["storage"]["db"]) writer = JournalWriter({"cls": "kafka", **self.config["journal_writer"]}) assert writer.journal is not None for range_start, range_end in RANGE_GENERATORS[object_type]( start_object, end_object ): logger.info( "Processing %s range %s to %s", object_type, _format_range_bound(range_start), _format_range_bound(range_end), ) objects = fetch(db, object_type, start=range_start, end=range_end) if not dry_run: writer.write_additions(object_type, objects) else: # only consume the objects iterator to check for any potential # decoding/encoding errors for obj in objects: pass if __name__ == "__main__": print('Please use the "swh-journal backfiller run" command') diff --git a/swh/storage/cli.py b/swh/storage/cli.py index 34fd17e0..881d0f40 100644 --- a/swh/storage/cli.py +++ b/swh/storage/cli.py @@ -1,227 +1,276 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import os from typing import Dict, Optional import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group +from swh.storage.replay import ModelObjectDeserializer + try: from systemd.daemon import notify except ImportError: notify = None @swh_cli_group.group(name="storage", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--check-config", default=None, type=click.Choice(["no", "read", "write"]), help=( "Check the configuration of the storage at startup for read or write access; " "if set, override the value present in the configuration file if any. " "Defaults to 'read' for the 'backfill' command, and 'write' for 'rpc-server' " "and 'replay' commands." ), ) @click.pass_context def storage(ctx, config_file, check_config): """Software Heritage Storage tools.""" from swh.core import config if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") ctx.ensure_object(dict) ctx.obj["config"] = conf ctx.obj["check_config"] = check_config @storage.command(name="rpc-serve") @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5002, type=click.INT, metavar="PORT", show_default=True, help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, host, port, debug): """Software Heritage Storage RPC server. Do NOT use this in a production environment. 
""" from swh.storage.api.server import app if "log_level" in ctx.obj: logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"]) ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") app.config.update(ctx.obj["config"]) app.run(host, port=int(port), debug=bool(debug)) @storage.command() @click.argument("object_type") @click.option("--start-object", default=None) @click.option("--end-object", default=None) @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfill(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "read") # for "lazy" loading from swh.storage.backfill import JournalBackfiller try: from systemd.daemon import notify except ImportError: notify = None conf = ctx.obj["config"] backfiller = JournalBackfiller(conf) if notify: notify("READY=1") try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run, ) except KeyboardInterrupt: if notify: notify("STOPPING=1") ctx.exit(0) @storage.command() @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to " "run forever.", ) +@click.option( + "--type", + "-t", + "object_types", + default=[], + type=click.Choice( + # use a hardcoded list to prevent having to load the + # replay module at cli loading time + [ + "origin", + "origin_visit", + "origin_visit_status", + "snapshot", + "revision", + "release", + "directory", + "content", + "skipped_content", + "metadata_authority", + "metadata_fetcher", + "raw_extrinsic_metadata", + "extid", + ] + ), + help="Object types to replay", + multiple=True, +) @click.pass_context -def replay(ctx, stop_after_objects): +def replay(ctx, stop_after_objects, object_types): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ import functools from swh.journal.client import get_journal_client from swh.storage import get_storage from swh.storage.replay import process_replay_objects ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") conf = ctx.obj["config"] storage = get_storage(**conf.pop("storage")) + if "error_reporter" in conf: + from redis import Redis + + reporter = Redis(**conf["error_reporter"]).set + else: + reporter = None + validate = conf.get("privileged", False) + + if not validate and reporter: + ctx.fail( + "Invalid configuration: you cannot have 'error_reporter' set if " + "'privileged' is False; we cannot validate anonymized objects." 
+ ) + + deserializer = ModelObjectDeserializer(reporter=reporter, validate=validate) + client_cfg = conf.pop("journal_client") + client_cfg["value_deserializer"] = deserializer.convert + if object_types: + client_cfg["object_types"] = object_types if stop_after_objects: client_cfg["stop_after_objects"] = stop_after_objects + try: client = get_journal_client(**client_cfg) except ValueError as exc: ctx.fail(exc) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str): """Helper function to inject the setting of check_config option in the storage config dict according to the expected default value (default value depends on the command, eg. backfill can be read-only). """ if check_config is not None: if check_config == "no": storage_cfg.pop("check_config", None) else: storage_cfg["check_config"] = {"check_write": check_config == "write"} else: if "check_config" not in storage_cfg: storage_cfg["check_config"] = {"check_write": default == "write"} def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_STORAGE") if __name__ == "__main__": main() diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py index 14b21c5e..1c29df49 100644 --- a/swh/storage/fixer.py +++ b/swh/storage/fixer.py @@ -1,331 +1,65 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import copy -import datetime import logging -from typing import Any, Dict, List, Optional +from typing import Any, Callable, Dict, List -from swh.model.model import Origin, TimestampWithTimezone +from swh.model.model import Origin logger = logging.getLogger(__name__) def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: """Filters-out invalid 'perms' key that leaked from swh.model.from_disk to the journal. >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) {'sha1_git': b'foo'} >>> _fix_content({'sha1_git': b'bar'}) {'sha1_git': b'bar'} """ content = content.copy() content.pop("perms", None) return content -def _fix_revision_pypi_empty_string(rev): - """PyPI loader failed to encode empty strings as bytes, see: - swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 - or https://forge.softwareheritage.org/D1772 - """ - rev = { - **rev, - "author": rev["author"].copy(), - "committer": rev["committer"].copy(), - } - if rev["author"].get("email") == "": - rev["author"]["email"] = b"" - if rev["author"].get("name") == "": - rev["author"]["name"] = b"" - if rev["committer"].get("email") == "": - rev["committer"]["email"] = b"" - if rev["committer"].get("name") == "": - rev["committer"]["name"] = b"" - return rev - - -def _fix_revision_transplant_source(rev): - if rev.get("metadata") and rev["metadata"].get("extra_headers"): - rev = copy.deepcopy(rev) - rev["metadata"]["extra_headers"] = [ - [key, value.encode("ascii")] - if key == "transplant_source" and isinstance(value, str) - else [key, value] - for (key, value) in rev["metadata"]["extra_headers"] - ] - return rev - - -def _check_date(date): - """Returns whether the date can be represented in backends with sane - limits on timestamps and timezones (resp. 
signed 64-bits and - signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). - """ - if date is None: - return True - try: - TimestampWithTimezone.from_dict(date) - except ValueError: - return False - else: - return True - - -def _check_revision_date(rev): - """Exclude revisions with invalid dates. - See https://forge.softwareheritage.org/T1339""" - return _check_date(rev["date"]) and _check_date(rev["committer_date"]) - - -def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: - """Fix various legacy revision issues. - - Fix author/committer person: - - >>> from pprint import pprint - >>> date = { - ... 'timestamp': { - ... 'seconds': 1565096932, - ... 'microseconds': 0, - ... }, - ... 'offset': 0, - ... } - >>> rev0 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... }) - >>> rev0['author'] - {'fullname': b'', 'name': b'', 'email': b''} - >>> rev0['committer'] - {'fullname': b'', 'name': b'', 'email': b''} - - Fix type of 'transplant_source' extra headers: - - >>> rev1 = _fix_revision({ - ... 'id': b'rev-id', - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': date, - ... 'metadata': { - ... 'extra_headers': [ - ... ['time_offset_seconds', b'-3600'], - ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] - ... ]}, - ... 'type': 'git', - ... 'message': '', - ... 'directory': b'dir-id', - ... 'synthetic': False, - ... }) - >>> pprint(rev1['metadata']['extra_headers']) - [['time_offset_seconds', b'-3600'], - ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] - - Revision with invalid date are filtered: - - >>> from copy import deepcopy - >>> invalid_date1 = deepcopy(date) - >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date1, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date2 = deepcopy(date) - >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': invalid_date2, - ... 'committer_date': date, - ... }) - >>> rev is None - True - - >>> invalid_date3 = deepcopy(date) - >>> invalid_date3['offset'] = 2**20 # > 10^15 - >>> rev = _fix_revision({ - ... 'author': {'fullname': b'', 'name': '', 'email': ''}, - ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, - ... 'date': date, - ... 'committer_date': invalid_date3, - ... }) - >>> rev is None - True - - """ # noqa - rev = _fix_revision_pypi_empty_string(revision) - rev = _fix_revision_transplant_source(rev) - if not _check_revision_date(rev): - logger.warning( - "Invalid revision date detected: %(revision)s", {"revision": rev} - ) - return None - return rev - - -def _fix_origin(origin: Dict) -> Dict: - """Fix legacy origin with type which is no longer part of the model. - - >>> from pprint import pprint - >>> pprint(_fix_origin({ - ... 'url': 'http://foo', - ... 
})) - {'url': 'http://foo'} - >>> pprint(_fix_origin({ - ... 'url': 'http://bar', - ... 'type': 'foo', - ... })) - {'url': 'http://bar'} - - """ - o = origin.copy() - o.pop("type", None) - return o - - -def _fix_origin_visit(visit: Dict) -> Dict: - """Fix various legacy origin visit issues. - - `visit['origin']` is a dict instead of an URL: - - >>> from datetime import datetime, timezone - >>> from pprint import pprint - >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) - >>> pprint(_fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'type': 'git', - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'origin': 'http://foo', - 'type': 'git'} - - `visit['type']` is missing , but `origin['visit']['type']` exists: - - >>> pprint(_fix_origin_visit( - ... {'origin': {'type': 'hg', 'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'origin': 'http://foo', - 'type': 'hg'} - - >>> pprint(_fix_origin_visit( - ... {'origin': {'type': 'hg', 'url': 'http://foo'}, - ... 'date': '2020-02-27 14:39:19+00:00', - ... 'status': 'ongoing', - ... 'snapshot': None, - ... })) - {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), - 'origin': 'http://foo', - 'type': 'hg'} - - Old visit format (origin_visit with no type) raises: - - >>> _fix_origin_visit({ - ... 'origin': {'url': 'http://foo'}, - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... - - >>> _fix_origin_visit({ - ... 'origin': 'http://foo', - ... 'date': date, - ... 'status': 'ongoing', - ... 'snapshot': None - ... }) - Traceback (most recent call last): - ... - ValueError: Old origin visit format detected... - - """ # noqa - visit = visit.copy() - if "type" not in visit: - if isinstance(visit["origin"], dict) and "type" in visit["origin"]: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit["type"] = visit["origin"]["type"] - else: - # Very old schema version: 'type' is missing, stop early - - # We expect the journal's origin_visit topic to no longer reference - # such visits. If it does, the replayer must crash so we can fix - # the journal's topic. - raise ValueError(f"Old origin visit format detected: {visit}") - if isinstance(visit["origin"], dict): - # Old version of the schema: visit['origin'] was a dict. - visit["origin"] = visit["origin"]["url"] - date = visit["date"] - if isinstance(date, str): - visit["date"] = datetime.datetime.fromisoformat(date) - # Those are no longer part of the model - for key in ["status", "snapshot", "metadata"]: - visit.pop(key, None) - return visit - - def _fix_raw_extrinsic_metadata(obj_dict: Dict) -> Dict: """Fix legacy RawExtrinsicMetadata with type which is no longer part of the model. >>> _fix_raw_extrinsic_metadata({ ... 'type': 'directory', ... 'target': 'swh:1:dir:460a586d1c95d120811eaadb398d534e019b5243', ... }) {'target': 'swh:1:dir:460a586d1c95d120811eaadb398d534e019b5243'} >>> _fix_raw_extrinsic_metadata({ ... 'type': 'origin', ... 'target': 'https://inria.halpreprod.archives-ouvertes.fr/hal-01667309', ... 
}) {'target': 'swh:1:ori:155291d5b9ada4570672510509f93fcfd9809882'} """ o = obj_dict.copy() if o.pop("type", None) == "origin": o["target"] = str(Origin(o["target"]).swhid()) return o +object_fixers: Dict[str, Callable[[Dict], Dict]] = { + "content": _fix_content, + "raw_extrinsic_metadata": _fix_raw_extrinsic_metadata, +} + + def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: """ Fix legacy objects from the journal to bring them up to date with the latest storage schema. """ - if object_type == "content": - return [_fix_content(v) for v in objects] - elif object_type == "revision": - revisions = [_fix_revision(v) for v in objects] - return [rev for rev in revisions if rev is not None] - elif object_type == "origin": - return [_fix_origin(v) for v in objects] - elif object_type == "origin_visit": - return [_fix_origin_visit(v) for v in objects] - elif object_type == "raw_extrinsic_metadata": - return [_fix_raw_extrinsic_metadata(v) for v in objects] - else: - return objects + if object_type in object_fixers: + fixer = object_fixers[object_type] + objects = [fixer(v) for v in objects] + return objects diff --git a/swh/storage/replay.py b/swh/storage/replay.py index a022c566..eb86a16b 100644 --- a/swh/storage/replay.py +++ b/swh/storage/replay.py @@ -1,184 +1,231 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import Counter +from functools import partial import logging -from typing import Any, Callable, Container, Dict, List +from typing import Any, Callable +from typing import Counter as CounterT +from typing import Dict, List, Optional, TypeVar, Union, cast try: from systemd.daemon import notify except ImportError: notify = None from swh.core.statsd import statsd +from swh.journal.serializers import kafka_to_value +from swh.model.hashutil import hash_to_hex from swh.model.model import ( BaseContent, BaseModel, Content, Directory, ExtID, + HashableObject, MetadataAuthority, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Release, Revision, SkippedContent, Snapshot, ) -from swh.storage.exc import HashCollision -from swh.storage.fixer import fix_objects +from swh.storage.exc import HashCollision, StorageArgumentException from swh.storage.interface import StorageInterface +from swh.storage.utils import remove_keys logger = logging.getLogger(__name__) GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" -object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { +OBJECT_CONVERTERS: Dict[str, Callable[[Dict], BaseModel]] = { "origin": Origin.from_dict, "origin_visit": OriginVisit.from_dict, "origin_visit_status": OriginVisitStatus.from_dict, "snapshot": Snapshot.from_dict, "revision": Revision.from_dict, "release": Release.from_dict, "directory": Directory.from_dict, "content": Content.from_dict, "skipped_content": SkippedContent.from_dict, "metadata_authority": MetadataAuthority.from_dict, "metadata_fetcher": MetadataFetcher.from_dict, "raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict, "extid": ExtID.from_dict, } +# Deprecated, for BW compat only. 
+object_converter_fn = OBJECT_CONVERTERS + + +OBJECT_FIXERS = { + "revision": partial(remove_keys, keys=("metadata",)), +} + + +class ModelObjectDeserializer: + + """A swh.journal object deserializer that checks object validity and reports + invalid objects + + The deserializer will directly produce BaseModel objects from journal + objects representations. + + If validation is activated and the object is hashable, it will check if the + computed hash matches the identifier of the object. + + If the object is invalid and a 'reporter' function is given, it will be + called with 2 arguments:: + + reporter(object_id, journal_msg) + + Where 'object_id' is a string representation of the object identifier (from + the journal message), and 'journal_msg' is the row message (bytes) + retrieved from the journal. + + If 'raise_on_error' is True, a 'StorageArgumentException' exception is + raised. + + Typical usage:: + + deserializer = ModelObjectDeserializer(validate=True, reporter=reporter_cb) + client = get_journal_client( + cls="kafka", value_deserializer=deserializer, **cfg) + + """ + + def __init__( + self, + validate: bool = True, + raise_on_error: bool = False, + reporter: Optional[Callable[[str, bytes], None]] = None, + ): + self.validate = validate + self.reporter = reporter + self.raise_on_error = raise_on_error + + def convert(self, object_type: str, msg: bytes) -> Optional[BaseModel]: + dict_repr = kafka_to_value(msg) + if object_type in OBJECT_FIXERS: + dict_repr = OBJECT_FIXERS[object_type](dict_repr) + obj = OBJECT_CONVERTERS[object_type](dict_repr) + if self.validate: + if isinstance(obj, HashableObject): + cid = obj.compute_hash() + if obj.id != cid: + error_msg = ( + f"Object has id {hash_to_hex(obj.id)}, " + f"but it should be {hash_to_hex(cid)}: {obj}" + ) + logger.error(error_msg) + self.report_failure(msg, obj) + if self.raise_on_error: + raise StorageArgumentException(error_msg) + return None + return obj + + def report_failure(self, msg: bytes, obj: BaseModel): + if self.reporter: + oid: str = "" + if hasattr(obj, "swhid"): + swhid = obj.swhid() # type: ignore[attr-defined] + oid = str(swhid) + elif isinstance(obj, HashableObject): + uid = obj.compute_hash() + oid = f"{obj.object_type}:{uid.hex()}" # type: ignore[attr-defined] + if oid: + self.reporter(oid, msg) def process_replay_objects( - all_objects: Dict[str, List[Dict[str, Any]]], *, storage: StorageInterface + all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface ) -> None: for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): _insert_objects(object_type, objects, storage) statsd.increment( GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} ) if notify: notify("WATCHDOG=1") +ContentType = TypeVar("ContentType", bound=BaseContent) + + def collision_aware_content_add( - content_add_fn: Callable[[List[Any]], Dict[str, int]], contents: List[BaseContent], -) -> None: + contents: List[ContentType], + content_add_fn: Callable[[List[ContentType]], Dict[str, int]], +) -> Dict[str, int]: """Add contents to storage. If a hash collision is detected, an error is logged. Then this adds the other non colliding contents to the storage. 
Args: content_add_fn: Storage content callable contents: List of contents or skipped contents to add to storage """ if not contents: - return + return {} colliding_content_hashes: List[Dict[str, Any]] = [] + results: CounterT[str] = Counter() while True: try: - content_add_fn(contents) + results.update(content_add_fn(contents)) except HashCollision as e: colliding_content_hashes.append( { "algo": e.algo, "hash": e.hash_id, # hex hash id "objects": e.colliding_contents, # hex hashes } ) colliding_hashes = e.colliding_content_hashes() # Drop the colliding contents from the transaction contents = [c for c in contents if c.hashes() not in colliding_hashes] else: # Successfully added contents, we are done break if colliding_content_hashes: for collision in colliding_content_hashes: logger.error("Collision detected: %(collision)s", {"collision": collision}) - - -def dict_key_dropper(d: Dict, keys_to_drop: Container) -> Dict: - """Returns a copy of the dict d without any key listed in keys_to_drop""" - return {k: v for (k, v) in d.items() if k not in keys_to_drop} + return dict(results) def _insert_objects( - object_type: str, objects: List[Dict], storage: StorageInterface + object_type: str, objects: List[BaseModel], storage: StorageInterface ) -> None: """Insert objects of type object_type in the storage. """ - objects = fix_objects(object_type, objects) - - if object_type == "content": - # for bw compat, skipped content should now be delivered in the skipped_content - # topic - contents: List[BaseContent] = [] - skipped_contents: List[BaseContent] = [] - for content in objects: - c = BaseContent.from_dict(content) - if isinstance(c, SkippedContent): - logger.warning( - "Received a series of skipped_content in the " - "content topic, this should not happen anymore" - ) - skipped_contents.append(c) - else: - contents.append(c) - collision_aware_content_add(storage.skipped_content_add, skipped_contents) - collision_aware_content_add(storage.content_add_metadata, contents) - elif object_type == "skipped_content": - skipped_contents = [SkippedContent.from_dict(obj) for obj in objects] - collision_aware_content_add(storage.skipped_content_add, skipped_contents) + if object_type not in OBJECT_CONVERTERS: + logger.warning("Received a series of %s, this should not happen", object_type) + return + + method = getattr(storage, f"{object_type}_add") + if object_type == "skipped_content": + method = partial(collision_aware_content_add, content_add_fn=method) + elif object_type == "content": + method = partial( + collision_aware_content_add, content_add_fn=storage.content_add_metadata + ) elif object_type in ("origin_visit", "origin_visit_status"): origins: List[Origin] = [] - converter_fn = object_converter_fn[object_type] - model_objs = [] - for obj in objects: - origins.append(Origin(url=obj["origin"])) - model_objs.append(converter_fn(obj)) + for obj in cast(List[Union[OriginVisit, OriginVisitStatus]], objects): + origins.append(Origin(url=obj.origin)) storage.origin_add(origins) - method = getattr(storage, f"{object_type}_add") - method(model_objs) elif object_type == "raw_extrinsic_metadata": - converted = [RawExtrinsicMetadata.from_dict(o) for o in objects] - authorities = {emd.authority for emd in converted} - fetchers = {emd.fetcher for emd in converted} + emds = cast(List[RawExtrinsicMetadata], objects) + authorities = {emd.authority for emd in emds} + fetchers = {emd.fetcher for emd in emds} storage.metadata_authority_add(list(authorities)) storage.metadata_fetcher_add(list(fetchers)) - 
storage.raw_extrinsic_metadata_add(converted) - elif object_type == "revision": - # drop the metadata field from the revision (is any); this field is - # about to be dropped from the data model (in favor of - # raw_extrinsic_metadata) and there can be bogus values in the existing - # journal (metadata with \0000 in it) - method = getattr(storage, object_type + "_add") - method( - [ - object_converter_fn[object_type](dict_key_dropper(o, ("metadata",))) - for o in objects - ] - ) - elif object_type in ( - "directory", - "extid", - "revision", - "release", - "snapshot", - "origin", - "metadata_fetcher", - "metadata_authority", - ): - method = getattr(storage, object_type + "_add") - method([object_converter_fn[object_type](o) for o in objects]) - else: - logger.warning("Received a series of %s, this should not happen", object_type) + method(objects) diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py index b1a90520..787cad12 100644 --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -1,298 +1,301 @@ # Copyright (C) 2019-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging from unittest.mock import patch import pytest from swh.journal.client import JournalClient from swh.model.tests.swh_model_data import TEST_OBJECTS from swh.storage import get_storage from swh.storage.backfill import ( PARTITION_KEY, JournalBackfiller, byte_ranges, compute_query, raw_extrinsic_metadata_target_ranges, ) from swh.storage.in_memory import InMemoryStorage -from swh.storage.replay import process_replay_objects +from swh.storage.replay import ModelObjectDeserializer, process_replay_objects from swh.storage.tests.test_replay import check_replayed TEST_CONFIG = { "journal_writer": { "brokers": ["localhost"], "prefix": "swh.tmp_journal.new", "client_id": "swh.journal.client.test", }, "storage": {"cls": "postgresql", "db": "service=swh-dev"}, } def test_config_ko_missing_mandatory_key(): """Missing configuration key will make the initialization fail """ for key in TEST_CONFIG.keys(): config = TEST_CONFIG.copy() config.pop(key) with pytest.raises(ValueError) as e: JournalBackfiller(config) error = "Configuration error: The following keys must be provided: %s" % ( ",".join([key]), ) assert e.value.args[0] == error def test_config_ko_unknown_object_type(): """Parse arguments will fail if the object type is unknown """ backfiller = JournalBackfiller(TEST_CONFIG) with pytest.raises(ValueError) as e: backfiller.parse_arguments("unknown-object-type", 1, 2) error = ( "Object type unknown-object-type is not supported. 
" "The only possible values are %s" % (", ".join(sorted(PARTITION_KEY))) ) assert e.value.args[0] == error def test_compute_query_content(): query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") assert where_args == ["\x000000", "\x000001"] assert column_aliases == [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", "ctime", ] assert ( query == """ select sha1,sha1_git,sha256,blake2s256,length,status,ctime from content where (sha1) >= %s and (sha1) < %s """ ) def test_compute_query_skipped_content(): query, where_args, column_aliases = compute_query("skipped_content", None, None) assert where_args == [] assert column_aliases == [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", "reason", ] assert ( query == """ select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason from skipped_content """ ) def test_compute_query_origin_visit(): query, where_args, column_aliases = compute_query("origin_visit", 1, 10) assert where_args == [1, 10] assert column_aliases == [ "visit", "type", "origin", "date", ] assert ( query == """ select visit,type,origin.url as origin,date from origin_visit left join origin on origin_visit.origin=origin.id where (origin_visit.origin) >= %s and (origin_visit.origin) < %s """ ) def test_compute_query_release(): query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") assert where_args == ["\x000002", "\x000003"] assert column_aliases == [ "id", "date", "date_offset", "date_neg_utc_offset", "comment", "name", "synthetic", "target", "target_type", "author_id", "author_name", "author_email", "author_fullname", ] assert ( query == """ select release.id as id,date,date_offset,date_neg_utc_offset,comment,release.name as name,synthetic,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname from release left join person a on release.author=a.id where (release.id) >= %s and (release.id) < %s """ # noqa ) @pytest.mark.parametrize("numbits", [2, 3, 8, 16]) def test_byte_ranges(numbits): ranges = list(byte_ranges(numbits)) assert len(ranges) == 2 ** numbits assert ranges[0][0] is None assert ranges[-1][1] is None bounds = [] for i, (left, right) in enumerate(zip(ranges[:-1], ranges[1:])): assert left[1] == right[0], f"Mismatched bounds in {i}th range" bounds.append(left[1]) assert bounds == sorted(bounds) def test_raw_extrinsic_metadata_target_ranges(): ranges = list(raw_extrinsic_metadata_target_ranges()) assert ranges[0][0] == "" assert ranges[-1][1] is None bounds = [] for i, (left, right) in enumerate(zip(ranges[:-1], ranges[1:])): assert left[1] == right[0], f"Mismatched bounds in {i}th range" bounds.append(left[1]) assert bounds == sorted(bounds) RANGE_GENERATORS = { "content": lambda start, end: [(None, None)], "skipped_content": lambda start, end: [(None, None)], "directory": lambda start, end: [(None, None)], "extid": lambda start, end: [(None, None)], "metadata_authority": lambda start, end: [(None, None)], "metadata_fetcher": lambda start, end: [(None, None)], "revision": lambda start, end: [(None, None)], "release": lambda start, end: [(None, None)], "snapshot": lambda start, end: [(None, None)], "origin": lambda start, end: [(None, 10000)], "origin_visit": lambda start, end: [(None, 10000)], "origin_visit_status": lambda start, end: [(None, 10000)], "raw_extrinsic_metadata": lambda start, end: [(None, None)], } @patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS) def test_backfiller( 
swh_storage_backend_config, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): prefix1 = f"{kafka_prefix}-1" prefix2 = f"{kafka_prefix}-2" journal1 = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer-1", "prefix": prefix1, } swh_storage_backend_config["journal_writer"] = journal1 storage = get_storage(**swh_storage_backend_config) - # fill the storage and the journal (under prefix1) for object_type, objects in TEST_OBJECTS.items(): method = getattr(storage, object_type + "_add") method(objects) # now apply the backfiller on the storage to fill the journal under prefix2 backfiller_config = { "journal_writer": { "brokers": [kafka_server], "client_id": "kafka_writer-2", "prefix": prefix2, }, "storage": swh_storage_backend_config, } # Backfilling backfiller = JournalBackfiller(backfiller_config) for object_type in TEST_OBJECTS: backfiller.run(object_type, None, None) # Trace log messages for unhandled object types in the replayer caplog.set_level(logging.DEBUG, "swh.storage.replay") # now check journal content are the same under both topics # use the replayer scaffolding to fill storages to make is a bit easier # Replaying #1 + deserializer = ModelObjectDeserializer() sto1 = get_storage(cls="memory") replayer1 = JournalClient( brokers=kafka_server, group_id=f"{kafka_consumer_group}-1", prefix=prefix1, stop_on_eof=True, + value_deserializer=deserializer.convert, ) + worker_fn1 = functools.partial(process_replay_objects, storage=sto1) replayer1.process(worker_fn1) # Replaying #2 sto2 = get_storage(cls="memory") replayer2 = JournalClient( brokers=kafka_server, group_id=f"{kafka_consumer_group}-2", prefix=prefix2, stop_on_eof=True, + value_deserializer=deserializer.convert, ) worker_fn2 = functools.partial(process_replay_objects, storage=sto2) replayer2.process(worker_fn2) # Compare storages assert isinstance(sto1, InMemoryStorage) # needed to help mypy assert isinstance(sto2, InMemoryStorage) check_replayed(sto1, sto2) for record in caplog.records: assert ( "this should not happen" not in record.message ), "Replayer ignored some message types, see captured logging" diff --git a/swh/storage/tests/test_cli.py b/swh/storage/tests/test_cli.py index 87121b93..f472dbf1 100644 --- a/swh/storage/tests/test_cli.py +++ b/swh/storage/tests/test_cli.py @@ -1,111 +1,125 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import logging import re import tempfile from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import pytest import yaml from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.model.model import Snapshot, SnapshotBranch, TargetType from swh.storage import get_storage from swh.storage.cli import storage as cli +from swh.storage.replay import OBJECT_CONVERTERS logger = logging.getLogger(__name__) CLI_CONFIG = { "storage": {"cls": "memory",}, } @pytest.fixture def swh_storage(): """An swh-storage object that gets injected into the CLI functions.""" storage = get_storage(**CLI_CONFIG["storage"]) with patch("swh.storage.get_storage") as get_storage_mock: get_storage_mock.return_value = storage yield storage @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.journal.replay import copy_object, obj_in_objstorage monkeypatch.setattr(copy_object.retry, "sleep", lambda 
x: None) monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) def invoke(*args, env=None, journal_config=None): config = copy.deepcopy(CLI_CONFIG) if journal_config: config["journal_client"] = journal_config.copy() config["journal_client"]["cls"] = "kafka" runner = CliRunner() with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + config_fd.name] + list(args) ret = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) return ret def test_replay( swh_storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, ): kafka_prefix += ".swh.journal.objects" producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test-producer", "acks": "all", } ) snapshot = Snapshot( branches={ b"HEAD": SnapshotBranch( target_type=TargetType.REVISION, target=b"\x01" * 20, ) }, ) snapshot_dict = snapshot.to_dict() producer.produce( topic=kafka_prefix + ".snapshot", key=key_to_kafka(snapshot.id), value=value_to_kafka(snapshot_dict), ) producer.flush() logger.debug("Flushed producer") result = invoke( "replay", "--stop-after-objects", "1", journal_config={ "brokers": [kafka_server], "group_id": kafka_consumer_group, "prefix": kafka_prefix, }, ) expected = r"Done.\n" assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert swh_storage.snapshot_get(snapshot.id) == { **snapshot_dict, "next_branch": None, } + + +def test_replay_type_list(): + result = invoke("replay", "--help",) + assert result.exit_code == 0, result.output + types_in_help = re.findall("--type [[]([a-z_|]+)[]]", result.output) + assert len(types_in_help) == 1 + types = types_in_help[0].split("|") + + assert sorted(types) == sorted(list(OBJECT_CONVERTERS.keys())), ( + "Make sure the list of accepted types in cli.py " + "matches implementation in replay.py" + ) diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py index 6562f2aa..8c13fa95 100644 --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -1,375 +1,544 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dataclasses import datetime import functools import logging -from typing import Any, Container, Dict, Optional +import re +from typing import Any, Container, Dict, Optional, cast import attr import pytest from swh.journal.client import JournalClient -from swh.journal.serializers import key_to_kafka, value_to_kafka +from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes, hash_to_hex from swh.model.model import Revision, RevisionType from swh.model.tests.swh_model_data import ( COMMITTERS, DATES, DUPLICATE_CONTENTS, REVISIONS, ) from swh.model.tests.swh_model_data import TEST_OBJECTS as _TEST_OBJECTS from swh.storage import get_storage from swh.storage.cassandra.model import ContentRow, SkippedContentRow +from swh.storage.exc import StorageArgumentException from swh.storage.in_memory import InMemoryStorage -from swh.storage.replay import process_replay_objects +from swh.storage.replay import ModelObjectDeserializer, process_replay_objects UTC = datetime.timezone.utc TEST_OBJECTS = _TEST_OBJECTS.copy() +# add a revision with metadata to check 
this later is dropped while being replayed TEST_OBJECTS["revision"] = list(_TEST_OBJECTS["revision"]) + [ Revision( - id=hash_to_bytes("a569b03ebe6e5f9f2f6077355c40d89bd6986d0c"), + id=hash_to_bytes("51d9d94ab08d3f75512e3a9fd15132e0a7ca7928"), message=b"hello again", date=DATES[1], committer=COMMITTERS[1], author=COMMITTERS[0], committer_date=DATES[0], type=RevisionType.GIT, directory=b"\x03" * 20, synthetic=False, metadata={"something": "interesting"}, parents=(REVISIONS[0].id,), ), ] +WRONG_ID_REG = re.compile( + "Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*" +) def nullify_ctime(obj): if isinstance(obj, (ContentRow, SkippedContentRow)): return dataclasses.replace(obj, ctime=None) else: return obj @pytest.fixture() def replayer_storage_and_client( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str ): journal_writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, } storage_config: Dict[str, Any] = { "cls": "memory", "journal_writer": journal_writer_config, } storage = get_storage(**storage_config) + deserializer = ModelObjectDeserializer() replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, + value_deserializer=deserializer.convert, ) yield storage, replayer def test_storage_replayer(replayer_storage_and_client, caplog): """Optimal replayer scenario. This: - writes objects to a source storage - replayer consumes objects from the topic and replays them - a destination storage is filled from this In the end, both storages should have the same content. """ src, replayer = replayer_storage_and_client # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) if object_type == "origin_visit": nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well nb_sent += len(objects) caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted assert isinstance(src, InMemoryStorage) # needed to help mypy assert isinstance(dst, InMemoryStorage) check_replayed(src, dst) collision = 0 for record in caplog.records: logtext = record.getMessage() if "Colliding contents:" in logtext: collision += 1 assert collision == 0, "No collision should be detected" -def test_storage_play_with_collision(replayer_storage_and_client, caplog): +def test_storage_replay_with_collision(replayer_storage_and_client, caplog): """Another replayer scenario with collisions. 
This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ src, replayer = replayer_storage_and_client # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) if object_type == "origin_visit": nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well nb_sent += len(objects) # Create collision in input data # These should not be written in the destination producer = src.journal_writer.journal.producer prefix = src.journal_writer.journal._prefix for content in DUPLICATE_CONTENTS: topic = f"{prefix}.content" key = content.sha1 now = datetime.datetime.now(tz=UTC) content = attr.evolve(content, ctime=now) producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content.to_dict()), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted # check the logs for the collision being properly detected nb_collisions = 0 actual_collision: Dict for record in caplog.records: logtext = record.getMessage() if "Collision detected:" in logtext: nb_collisions += 1 actual_collision = record.args["collision"] assert nb_collisions == 1, "1 collision should be detected" algo = "sha1" assert actual_collision["algo"] == algo expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0].get_hash(algo)) assert actual_collision["hash"] == expected_colliding_hash actual_colliding_hashes = actual_collision["objects"] assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) for content in DUPLICATE_CONTENTS: expected_content_hashes = { k: hash_to_hex(v) for k, v in content.hashes().items() } assert expected_content_hashes in actual_colliding_hashes # all objects from the src should exists in the dst storage assert isinstance(src, InMemoryStorage) # needed to help mypy assert isinstance(dst, InMemoryStorage) # needed to help mypy check_replayed(src, dst, exclude=["contents"]) # but the dst has one content more (one of the 2 colliding ones) assert ( len(list(src._cql_runner._contents.iter_all())) == len(list(dst._cql_runner._contents.iter_all())) - 1 ) def test_replay_skipped_content(replayer_storage_and_client): """Test the 'skipped_content' topic is properly replayed.""" src, replayer = replayer_storage_and_client _check_replay_skipped_content(src, replayer, "skipped_content") -def test_replay_skipped_content_bwcompat(replayer_storage_and_client): - """Test the 'content' topic can be used to replay SkippedContent objects.""" - src, replayer = replayer_storage_and_client - _check_replay_skipped_content(src, replayer, "content") - - # utility functions def check_replayed( src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None, expected_anonymized=False, ): """Simple utility function to compare the content of 2 in_memory storages""" def fix_expected(attr, row): if expected_anonymized: if attr == "releases": row = dataclasses.replace( row, author=row.author and row.author.anonymize() ) elif attr == "revisions": row = dataclasses.replace( row, author=row.author.anonymize(), committer=row.committer.anonymize(), ) if attr == "revisions": # the replayer should now drop the metadata attribute; 
see # swh/storgae/replay.py:_insert_objects() row.metadata = "null" return row for attr_ in ( "contents", "skipped_contents", "directories", "extid", "revisions", "releases", "snapshots", "origins", "origin_visits", "origin_visit_statuses", "raw_extrinsic_metadata", ): if exclude and attr_ in exclude: continue expected_objects = [ (id, nullify_ctime(fix_expected(attr_, obj))) for id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all()) ] got_objects = [ (id, nullify_ctime(obj)) for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()) ] assert got_objects == expected_objects, f"Mismatch object list for {attr_}" def _check_replay_skipped_content(storage, replayer, topic): skipped_contents = _gen_skipped_contents(100) nb_sent = len(skipped_contents) producer = storage.journal_writer.journal.producer prefix = storage.journal_writer.journal._prefix for i, obj in enumerate(skipped_contents): producer.produce( topic=f"{prefix}.{topic}", key=key_to_kafka({"sha1": obj["sha1"]}), value=value_to_kafka(obj), ) producer.flush() dst_storage = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst_storage) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted for content in skipped_contents: assert not storage.content_find({"sha1": content["sha1"]}) # no skipped_content_find API endpoint, so use this instead assert not list(dst_storage.skipped_content_missing(skipped_contents)) def _updated(d1, d2): d1.update(d2) d1.pop("data", None) return d1 def _gen_skipped_contents(n=10): # we do not use the hypothesis strategy here because this does not play well with # pytest fixtures, and it makes test execution very slow algos = DEFAULT_ALGORITHMS | {"length"} now = datetime.datetime.now(tz=UTC) return [ _updated( MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(), { "status": "absent", "reason": "why not", "origin": f"https://somewhere/{i}", "ctime": now, }, ) for i in range(n) ] @pytest.mark.parametrize("privileged", [True, False]) -def test_storage_play_anonymized( +def test_storage_replay_anonymized( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, privileged: bool, ): """Optimal replayer scenario. 
This: - writes objects to the topic - replayer consumes objects from the topic and replay them This tests the behavior with both a privileged and non-privileged replayer """ writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": True, } src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} storage = get_storage(**src_config) # Fill the src storage nb_sent = 0 for obj_type, objs in TEST_OBJECTS.items(): if obj_type in ("origin_visit", "origin_visit_status"): # these are unrelated with what we want to test here continue method = getattr(storage, obj_type + "_add") method(objs) nb_sent += len(objs) # Fill a destination storage from Kafka, potentially using privileged topics dst_storage = get_storage(cls="memory") + deserializer = ModelObjectDeserializer( + validate=False + ) # we cannot validate an anonymized replay replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, privileged=privileged, + value_deserializer=deserializer.convert, ) worker_fn = functools.partial(process_replay_objects, storage=dst_storage) nb_inserted = replayer.process(worker_fn) replayer.consumer.commit() assert nb_sent == nb_inserted # Check the contents of the destination storage, and whether the anonymization was # properly used assert isinstance(storage, InMemoryStorage) # needed to help mypy assert isinstance(dst_storage, InMemoryStorage) check_replayed(storage, dst_storage, expected_anonymized=not privileged) + + +def test_storage_replayer_with_validation_ok( + replayer_storage_and_client, caplog, redisdb +): + """Optimal replayer scenario + + with validation activated and reporter set to a redis db. + + - writes objects to a source storage + - replayer consumes objects from the topic and replays them + - a destination storage is filled from this + - nothing has been reported in the redis db + - both storages should have the same content + """ + src, replayer = replayer_storage_and_client + replayer.deserializer = ModelObjectDeserializer(validate=True, reporter=redisdb.set) + + # Fill Kafka using a source storage + nb_sent = 0 + for object_type, objects in TEST_OBJECTS.items(): + method = getattr(src, object_type + "_add") + method(objects) + if object_type == "origin_visit": + nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well + nb_sent += len(objects) + + # Fill the destination storage from Kafka + dst = get_storage(cls="memory") + worker_fn = functools.partial(process_replay_objects, storage=dst) + nb_inserted = replayer.process(worker_fn) + assert nb_sent == nb_inserted + + # check we do not have invalid objects reported + invalid = 0 + for record in caplog.records: + logtext = record.getMessage() + if WRONG_ID_REG.match(logtext): + invalid += 1 + assert invalid == 0, "Invalid objects should not be detected" + assert not redisdb.keys() + # so the dst should be the same as src storage + check_replayed(cast(InMemoryStorage, src), cast(InMemoryStorage, dst)) + + +def test_storage_replayer_with_validation_nok( + replayer_storage_and_client, caplog, redisdb +): + """Replayer scenario with invalid objects + + with validation and reporter set to a redis db. 
+ + - writes objects to a source storage + - replayer consumes objects from the topic and replays them + - the destination storage is filled with only valid objects + - the redis db contains the invalid (raw kafka mesg) objects + """ + src, replayer = replayer_storage_and_client + replayer.value_deserializer = ModelObjectDeserializer( + validate=True, reporter=redisdb.set + ).convert + + caplog.set_level(logging.ERROR, "swh.journal.replay") + + # Fill Kafka using a source storage + nb_sent = 0 + for object_type, objects in TEST_OBJECTS.items(): + method = getattr(src, object_type + "_add") + method(objects) + if object_type == "origin_visit": + nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well + nb_sent += len(objects) + + # insert invalid objects + for object_type in ("revision", "directory", "release", "snapshot"): + method = getattr(src, object_type + "_add") + method([attr.evolve(TEST_OBJECTS[object_type][0], id=b"\x00" * 20)]) + nb_sent += 1 + + # Fill the destination storage from Kafka + dst = get_storage(cls="memory") + worker_fn = functools.partial(process_replay_objects, storage=dst) + nb_inserted = replayer.process(worker_fn) + assert nb_sent == nb_inserted + + # check we do have invalid objects reported + invalid = 0 + for record in caplog.records: + logtext = record.getMessage() + if WRONG_ID_REG.match(logtext): + invalid += 1 + assert invalid == 4, "Invalid objects should be detected" + assert set(redisdb.keys()) == { + f"swh:1:{typ}:{'0'*40}".encode() for typ in ("rel", "rev", "snp", "dir") + } + + for key in redisdb.keys(): + # check the stored value looks right + rawvalue = redisdb.get(key) + value = kafka_to_value(rawvalue) + assert isinstance(value, dict) + assert "id" in value + assert value["id"] == b"\x00" * 20 + + # check that invalid objects did not reach the dst storage + for attr_ in ( + "directories", + "revisions", + "releases", + "snapshots", + ): + for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()): + assert id != b"\x00" * 20 + + +def test_storage_replayer_with_validation_nok_raises( + replayer_storage_and_client, caplog, redisdb +): + """Replayer scenario with invalid objects + + with raise_on_error set to True + + This: + - writes both valid & invalid objects to a source storage + - a StorageArgumentException should be raised while replayer consumes + objects from the topic and replays them + """ + src, replayer = replayer_storage_and_client + replayer.value_deserializer = ModelObjectDeserializer( + validate=True, reporter=redisdb.set, raise_on_error=True + ).convert + + caplog.set_level(logging.ERROR, "swh.journal.replay") + + # Fill Kafka using a source storage + nb_sent = 0 + for object_type, objects in TEST_OBJECTS.items(): + method = getattr(src, object_type + "_add") + method(objects) + if object_type == "origin_visit": + nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well + nb_sent += len(objects) + + # insert invalid objects + for object_type in ("revision", "directory", "release", "snapshot"): + method = getattr(src, object_type + "_add") + method([attr.evolve(TEST_OBJECTS[object_type][0], id=b"\x00" * 20)]) + nb_sent += 1 + + # Fill the destination storage from Kafka + dst = get_storage(cls="memory") + worker_fn = functools.partial(process_replay_objects, storage=dst) + with pytest.raises(StorageArgumentException): + replayer.process(worker_fn) + + # check we do have invalid objects reported + invalid = 0 + for record in caplog.records: + logtext = record.getMessage() + if 
WRONG_ID_REG.match(logtext): + invalid += 1 + assert invalid == 1, "One invalid object should be detected" + assert len(redisdb.keys()) == 1
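
For reference, the following is a minimal sketch of how the pieces introduced in this diff fit together outside the test suite: a `ModelObjectDeserializer` with validation and a reporter callback, plugged into a journal client whose decoded objects are replayed into a storage via `process_replay_objects`. The broker address, consumer group, topic prefix and the `report` callback are placeholders for illustration, not values taken from the source; adjust them to your deployment.

```
# Sketch only: wiring a validating deserializer into a journal replayer,
# following the CLI and test code in this diff.
import functools

from swh.journal.client import get_journal_client
from swh.storage import get_storage
from swh.storage.replay import ModelObjectDeserializer, process_replay_objects


def report(object_id: str, journal_msg: bytes) -> None:
    # Stand-in reporter (the tests use redisdb.set): receives the string
    # identifier of the invalid object and the raw journal message.
    print(f"invalid object: {object_id} ({len(journal_msg)} bytes)")


deserializer = ModelObjectDeserializer(validate=True, reporter=report)
client = get_journal_client(
    cls="kafka",
    brokers=["localhost:9092"],        # assumption: local Kafka broker
    group_id="swh-storage-replayer",   # assumption: any consumer group id
    prefix="swh.journal.objects",      # assumption: topic prefix in use
    stop_after_objects=1000,
    value_deserializer=deserializer.convert,
)

storage = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=storage)
try:
    client.process(worker_fn)
finally:
    client.close()
```

The `replay` CLI command shown earlier in this diff performs essentially the same wiring, reading the equivalent parameters from the `journal_client` section of its configuration file.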