diff --git a/CONTRIBUTORS b/CONTRIBUTORS index b89f3e04..94c2b86e 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -1,3 +1,4 @@ Daniele Serafini Ishan Bhanuka +Kumar Shivendu Quentin Campos diff --git a/PKG-INFO b/PKG-INFO index 2969975b..dd946283 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,224 +1,223 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.27.3 +Version: 0.27.4 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects the path to `cassandra` to either be unspecified, it is then looked up at `/usr/sbin/cassandra`, either specified through the environment variable `SWH_CASSANDRA_BIN`. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. For example, at the time of writing this, Cassandra does not support java 14, so one may want to use for example java 11: ``` (swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-14-openjdk-amd64/bin/java (swh) :~/swh-storage$ tox [...] ``` ## Development The storage server can be locally started. 
It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage api at 5002 port. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local db: service=swh-dev objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing -Provides-Extra: schemata Provides-Extra: journal diff --git a/docs/cli.rst b/docs/cli.rst index 74e78cc2..d7c349e0 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -1,8 +1,8 @@ .. _swh-storage-cli: Command-line interface ====================== .. click:: swh.storage.cli:storage - :prog: swh storage - :nested: full + :prog: swh storage + :nested: full diff --git a/docs/extrinsic-metadata-specification.rst b/docs/extrinsic-metadata-specification.rst index 351ab825..f1522ec7 100644 --- a/docs/extrinsic-metadata-specification.rst +++ b/docs/extrinsic-metadata-specification.rst @@ -1,345 +1,347 @@ :orphan: .. _extrinsic-metadata-specification: Extrinsic metadata specification ================================ :term:`Extrinsic metadata` is information about software that is not part of the source code itself but still closely related to the software. Typical sources for extrinsic metadata are: the hosting place of a repository, which can offer metadata via its web view or API; external registries like collaborative curation initiatives; and out-of-band information available at source code archival time. Since they are not part of the source code, a dedicated mechanism to fetch and store them is needed. This specification assumes the reader is familiar with Software Heritage's :ref:`architecture` and :ref:`data-model`. Metadata sources ---------------- Authorities ^^^^^^^^^^^ Metadata authorities are entities that provide metadata about an :term:`origin`. Metadata authorities include: code hosting places, :term:`deposit` submitters, and registries (eg. Wikidata). An authority is uniquely defined by these properties: - * its type, representing the kind of authority, which is one of these values: - * `deposit_client`, for metadata pushed to Software Heritage at the same time - as a software artifact - * `forge`, for metadata pulled from the same source as the one hosting - the software artifacts (which includes package managers) - * `registry`, for metadata pulled from a third-party - * its URL, which unambiguously identifies an instance of the authority type. +* its type, representing the kind of authority, which is one of these values: + + * ``deposit_client``, for metadata pushed to Software Heritage at the same time + as a software artifact + * ``forge``, for metadata pulled from the same source as the one hosting + the software artifacts (which includes package managers) + * ``registry``, for metadata pulled from a third-party + +* its URL, which unambiguously identifies an instance of the authority type. 
Examples: =============== ================================= type url =============== ================================= deposit_client https://hal.archives-ouvertes.fr/ deposit_client https://hal.inria.fr/ deposit_client https://software.intel.com/ forge https://gitlab.com/ forge https://gitlab.inria.fr/ forge https://0xacab.org/ forge https://github.com/ registry https://www.wikidata.org/ registry https://swmath.org/ registry https://ascl.net/ =============== ================================= Metadata fetchers ^^^^^^^^^^^^^^^^^ Metadata fetchers are software components used to fetch metadata from a metadata authority, and ingest them into the Software Heritage archive. A metadata fetcher is uniquely defined by these properties: * its type * its version Examples: * :term:`loaders `, which may either discover metadata as a side-effect of loading source code, or be dedicated to fetching metadata. * :term:`listers `, which may discover metadata as a side-effect of discovering origins. * :term:`deposit` submitters, which push metadata to SWH from a third-party; usually at the same time as a :term:`software artifact` * crawlers, which fetch metadata from an authority in a way that is none of the above (eg. by querying a specific API of the origin's forge). Storage API ----------- Authorities and metadata fetchers ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The :term:`storage` API offers these endpoints to manipulate metadata authorities and metadata fetchers: * ``metadata_authority_add(type, url, metadata)`` which adds a new metadata authority to the storage. * ``metadata_authority_get(type, url)`` which looks up a known authority (there is at most one) and if it is known, returns a dictionary with keys ``type``, ``url``, and ``metadata``. * ``metadata_fetcher_add(name, version, metadata)`` which adds a new metadata fetcher to the storage. * ``metadata_fetcher_get(name, version)`` which looks up a known fetcher (there is at most one) and if it is known, returns a dictionary with keys ``name``, ``version``, and ``metadata``. These `metadata` fields contain JSON-encodable dictionaries with information about the authority/fetcher, in a format specific to each authority/fetcher. With authority, the `metadata` field is reserved for information describing and qualifying the authority. With fetchers, the `metadata` field is reserved for configuration metadata and other technical usage. Origin metadata ^^^^^^^^^^^^^^^ Extrinsic metadata are stored in SWH's :term:`storage database`. The storage API offers three endpoints to manipulate origin metadata: * Adding metadata:: raw_extrinsic_metadata_add( "origin", origin_url, discovery_date, authority, fetcher, format, metadata ) which adds a new `metadata` byte string obtained from a given authority and associated to the origin. `discovery_date` is a Python datetime. `authority` must be a dict containing keys `type` and `url`, and `fetcher` a dict containing keys `name` and `version`. The authority and fetcher must be known to the storage before using this endpoint. `format` is a text field indicating the format of the content of the `metadata` byte string, see `extrinsic-metadata-formats`_. 
* Getting latest metadata:: raw_extrinsic_metadata_get_latest( "origin", origin_url, authority ) where `authority` must be a dict containing keys `type` and `url`, which returns a dictionary corresponding to the latest metadata entry added from this origin, in the format:: { 'origin_url': ..., 'authority': {'type': ..., 'url': ...}, 'fetcher': {'name': ..., 'version': ...}, 'discovery_date': ..., 'format': '...', 'metadata': b'...' } * Getting all metadata:: raw_extrinsic_metadata_get( "origin", origin_url, authority, page_token, limit ) where `authority` must be a dict containing keys `type` and `url`, which returns a dictionary with keys: * `next_page_token`, which is an opaque token to be used as `page_token` for retrieving the next page. If absent, there are no more pages to gather. * `results`: list of dictionaries, one for each metadata item deposited, corresponding to the given origin and obtained from the specified authority. Each of these dictionaries is in the following format:: { 'authority': {'type': ..., 'url': ...}, 'fetcher': {'name': ..., 'version': ...}, 'discovery_date': ..., 'format': '...', 'metadata': b'...' } The parameters ``page_token`` and ``limit`` are used for pagination based on an arbitrary order. An initial query to ``raw_extrinsic_metadata_get`` must set ``page_token`` to ``None``, and further queries must use the value from the previous query's ``next_page_token`` to get the next page of results. ``metadata`` is a bytes array (possibly encoded using Base64). Its format is specific to each authority and is treated as an opaque value by the storage. Unifying these various formats into a common language is outside the scope of this specification. Artifact metadata ^^^^^^^^^^^^^^^^^ In addition to origin metadata, the storage database stores metadata on all software artifacts supported by the data model. This works similarly to origin metadata, with one major difference: extrinsic metadata can be given on a specific artifact within a specified context (for example: a directory in a specific revision from a specific visit on a specific origin), which will be stored along with the metadata itself. For example, two origins may develop the same file independently; the information about authorship, licensing or even description of the same artifact may vary between contexts. This is why it is important to qualify the metadata with the complete context for which it is intended, if any.
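For instance (an illustrative sketch: the URL is hypothetical, the SWHID is elided, and the exact set of allowed keys is specified below), metadata on a directory reached through a given revision of a given origin could be qualified with a context such as::

    {
        'origin': 'https://example.org/user/project',
        'visit': 1,
        'revision': 'swh:1:rev:...',
        'path': b'/src/',
    }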
The same two endpoints as for origin can be used, but with a different value for the first argument: * Adding metadata:: raw_extrinsic_metadata_add( type, id, context, discovery_date, authority, fetcher, format, metadata ) * Getting all metadata:: raw_extrinsic_metadata_get( type, id, authority, after, page_token, limit ) defined similarly to the origin versions of ``raw_extrinsic_metadata_add`` and ``raw_extrinsic_metadata_get`` above, but where ``id`` is a core SWHID (with an object type matching the given ``type``), and with an extra ``context`` (argument when adding metadata, and dictionary key when getting them) that is a dictionary with keys depending on the artifact ``type``: * for ``snapshot``: ``origin`` (a URL) and ``visit`` (an integer) * for ``release``: those above, plus ``snapshot`` (the core SWHID of a snapshot) * for ``revision``: all those above, plus ``release`` (the core SWHID of a release) * for ``directory``: all those above, plus ``revision`` (the core SWHID of a revision) and ``path`` (a byte string), representing the path to this directory from the root of the ``revision`` * for ``content``: all those above, plus ``directory`` (the core SWHID of a directory) All keys are optional, but should be provided whenever possible. The dictionary may be empty if the metadata is fully independent of context. In all cases, ``visit`` should only be provided if ``origin`` is (as visit ids are only unique with respect to an origin). .. _extrinsic-metadata-formats: Extrinsic metadata format ------------------------- Here is a list of all the metadata formats stored: ``pypi-project-json`` The metadata is a release entry from a PyPI project's JSON file, extracted and re-serialized. ``replicate-npm-package-json`` ditto, but from a replicate.npmjs.com project ``nixguix-sources-json`` ditto, but from https://nix-community.github.io/nixpkgs-swh/ ``original-artifacts-json`` tarball data, see below ``sword-v2-atom-codemeta`` XML Atom document, with Codemeta metadata, as sent by a deposit client, see the :ref:`Deposit protocol reference `. ``sword-v2-atom-codemeta-v2`` Deprecated alias of ``sword-v2-atom-codemeta`` ``sword-v2-atom-codemeta-v2-in-json`` Deprecated, JSON serialization of a ``sword-v2-atom-codemeta`` document. ``xml-deposit-info`` Information about a deposit, to identify the provenance of a metadata object sent via swh-deposit, see below Details on some of these formats: original-artifacts-json ^^^^^^^^^^^^^^^^^^^^^^^ This is a loosely defined format that changed over the years, originally used as a ``metadata`` column on the ``revision`` table. It is a JSON array, and each entry is a JSON object representing an archive (tarball, zipball, ...) that was unpackaged by the SWH loader before loading its content into Software Heritage. When writing this specification, it was stabilized to this format:: [ { "length": <length>, "filename": "<filename>", "checksums": { "sha1": "<sha1>", "sha256": "<sha256>", }, "url": "<url>" }, ... ] Older ``original-artifacts-json`` entries were migrated to use this format, but may be missing some of the keys. xml-deposit-info ^^^^^^^^^^^^^^^^ Deposits with code objects are loaded as their own origin, so we can look them up in the deposit database from their metadata (which holds the origin as a context). This is not true for metadata-only deposits, because we don't create an origin for them, so we need to store this information somewhere. The naive solution would be to insert it into the Atom entry provided by the client, but that would mean altering a document before we archive it, which potentially corrupts it or loses part of the data.
Therefore, on each metadata-only deposit, the deposit creates an extra "metametadata" object, with the original metadata object as target, and using this format:: {{ deposit.id }} {{ deposit.client.provider_url }} {{ deposit.collection.name }} diff --git a/mypy.ini b/mypy.ini index da53e716..b5e02744 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,63 +1,60 @@ [mypy] namespace_packages = True # due to the conditional import logic on swh.journal, in some cases a specific # type: ignore is needed, in other it isn't... warn_unused_ignores = False -# support for sqlalchemy magic: see https://github.com/dropbox/sqlalchemy-stubs -plugins = sqlmypy - # 3rd party libraries without stubs (yet) [mypy-cassandra.*] ignore_missing_imports = True [mypy-confluent_kafka.*] ignore_missing_imports = True [mypy-deprecated.*] ignore_missing_imports = True # only shipped indirectly via hypothesis [mypy-django.*] ignore_missing_imports = True [mypy-iso8601.*] ignore_missing_imports = True [mypy-msgpack.*] ignore_missing_imports = True [mypy-multiprocessing.util] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-psycopg2.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-pytest_cov.*] ignore_missing_imports = True [mypy-pytest_kafka.*] ignore_missing_imports = True [mypy-systemd.daemon.*] ignore_missing_imports = True [mypy-tenacity.*] ignore_missing_imports = True # temporary work-around for landing typing support in spite of the current # journal<->storage dependency loop [mypy-swh.journal.*] ignore_missing_imports = True [mypy-pytest_postgresql.*] ignore_missing_imports = True diff --git a/requirements-test.txt b/requirements-test.txt index 07dc055f..c6a0316d 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,10 +1,9 @@ hypothesis >= 3.11.0 pytest pytest-mock -sqlalchemy-stubs # pytz is in fact a dep of swh.model[testing] and should not be necessary, but # the dep on swh.model in the main requirements-swh.txt file shadows this one # adding the [testing] extra. 
swh.model[testing] >= 0.0.50 pytz pytest-xdist diff --git a/setup.py b/setup.py index a980a6ba..c5263a3d 100755 --- a/setup.py +++ b/setup.py @@ -1,75 +1,74 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import open from os import path from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.storage", description="Software Heritage storage manager", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DSTO/", setup_requires=["setuptools-scm"], packages=find_packages(), use_scm_version=True, scripts=["bin/swh-storage-add-dir",], entry_points=""" [swh.cli.subcommands] storage=swh.storage.cli """, install_requires=parse_requirements() + parse_requirements("swh"), extras_require={ "testing": (parse_requirements("test") + parse_requirements("swh-journal")), - "schemata": ["SQLAlchemy"], "journal": parse_requirements("swh-journal"), }, include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-storage", "Documentation": "https://docs.softwareheritage.org/devel/swh-storage/", }, ) diff --git a/sql/Makefile b/sql/Makefile index 7aa86d35..fa852cae 100644 --- a/sql/Makefile +++ b/sql/Makefile @@ -1,71 +1,70 @@ # Depends: postgresql-client, postgresql-autodoc -DBNAME = softwareheritage-dev +# When running with pifpaf, $PGDATABASE is already set in the environment +PGDATABASE ?= softwareheritage-dev DOCDIR = autodoc SQL_FILES = $(sort $(wildcard $(CURDIR)/../swh/storage/sql/*.sql)) PSQL_BIN = psql PSQL_FLAGS = --echo-errors -X -v ON_ERROR_STOP=1 PSQL = $(PSQL_BIN) $(PSQL_FLAGS) PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS)) all: createdb: createdb-stamp createdb-stamp: $(SQL_FILES) -ifeq ($(PIFPAF),) - -dropdb $(DBNAME) -endif - createdb $(DBNAME) + -dropdb $(PGDATABASE) + createdb $(PGDATABASE) ifeq ($(PIFPAF),) touch $@ else rm -f $@ endif filldb: filldb-stamp filldb-stamp: createdb-stamp - cat $(SQL_FILES) | $(PSQL) $(DBNAME) + cat $(SQL_FILES) | $(PSQL) $(PGDATABASE) ifeq ($(PIFPAF),) touch $@ else rm -f $@ endif dropdb: - -dropdb $(DBNAME) + -dropdb $(PGDATABASE) dumpdb: swh.dump swh.dump: filldb-stamp - pg_dump -Fc $(DBNAME) > $@ + pg_dump -Fc $(PGDATABASE) > $@ $(DOCDIR): test -d $(DOCDIR)/ || mkdir $(DOCDIR) doc: autodoc-stamp $(DOCDIR)/db-schema.pdf $(DOCDIR)/db-schema.svg 
autodoc-stamp: filldb-stamp $(DOCDIR) - postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/db-schema + postgresql_autodoc -d $(PGDATABASE) -f $(DOCDIR)/db-schema cp -a $(DOCDIR)/db-schema.dot $(DOCDIR)/db-schema.dot.orig ifeq ($(PIFPAF),) touch $@ else rm -f $@ endif $(DOCDIR)/db-schema.dot: clusters.dot autodoc-stamp $(DOCDIR) bin/dot_add_content $(DOCDIR)/db-schema.dot.orig clusters.dot > $(DOCDIR)/db-schema.dot $(DOCDIR)/db-schema.pdf: $(DOCDIR)/db-schema.dot autodoc-stamp dot -T pdf $< > $@ $(DOCDIR)/db-schema.svg: $(DOCDIR)/db-schema.dot autodoc-stamp dot -T svg $< > $@ clean: rm -rf *-stamp $(DOCDIR)/ distclean: clean dropdb rm -f swh.dump .PHONY: all initdb createdb dropdb doc clean diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 2969975b..dd946283 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,224 +1,223 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.27.3 +Version: 0.27.4 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects the path to `cassandra` to either be unspecified, it is then looked up at `/usr/sbin/cassandra`, either specified through the environment variable `SWH_CASSANDRA_BIN`. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] 
========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` Note: it is possible to set the `JAVA_HOME` environment variable to specify the version of the JVM to be used by Cassandra. For example, at the time of writing this, Cassandra does not support java 14, so one may want to use for example java 11: ``` (swh) :~/swh-storage$ export JAVA_HOME=/usr/lib/jvm/java-14-openjdk-amd64/bin/java (swh) :~/swh-storage$ tox [...] ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage api at 5002 port. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local db: service=swh-dev objstorage: cls: pathslicing root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing -Provides-Extra: schemata Provides-Extra: journal diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index d5df35c0..d78ca92c 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,29 +1,25 @@ click flask psycopg2 aiohttp tenacity cassandra-driver!=3.21.0,>=3.19.0 deprecated typing-extensions mypy_extensions iso8601 swh.core[db,http]>=0.5 swh.model>=2.1.0 swh.objstorage>=0.2.2 [journal] swh.journal>=0.6.2 -[schemata] -SQLAlchemy - [testing] hypothesis>=3.11.0 pytest pytest-mock -sqlalchemy-stubs swh.model[testing]>=0.0.50 pytz pytest-xdist swh.journal>=0.6.2 diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py index 37d1bebe..5518f91c 100644 --- a/swh/storage/__init__.py +++ b/swh/storage/__init__.py @@ -1,110 +1,111 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import importlib from typing import TYPE_CHECKING, Any, Dict, List import warnings if TYPE_CHECKING: from .interface import StorageInterface STORAGE_IMPLEMENTATIONS = { "local": ".postgresql.storage.Storage", "remote": ".api.client.RemoteStorage", "memory": ".in_memory.InMemoryStorage", "filter": ".filter.FilteringProxyStorage", "buffer": ".buffer.BufferingProxyStorage", "retry": ".retry.RetryingProxyStorage", "cassandra": ".cassandra.CassandraStorage", "validate": ".validate.ValidatingProxyStorage", } def get_storage(cls: str, **kwargs) -> "StorageInterface": """Get a storage object of class `storage_class` with arguments `storage_args`. Args: - cls (str): storage's class, can be: - - ``local`` to use a postgresql database - - ``cassandra`` to use a cassandra database - - ``remote`` to connect to a swh-storage server - - ``memory`` for an in-memory storage, useful for fast tests - - ``filter``, ``buffer``, ... to use specific storage "proxies", see their - respective documentations + cls (str): + storage's class, can be: + - ``local`` to use a postgresql database + - ``cassandra`` to use a cassandra database + - ``remote`` to connect to a swh-storage server + - ``memory`` for an in-memory storage, useful for fast tests + - ``filter``, ``buffer``, ... to use specific storage "proxies", see their + respective documentations args (dict): dictionary with keys Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. 
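    Example:
        A minimal sketch, using the ``remote`` backend with the URL from the
        README's sample configuration (illustrative only)::

            storage = get_storage("remote", url="http://localhost:5002/")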
""" if "args" in kwargs: warnings.warn( 'Explicit "args" key is deprecated, use keys directly instead.', DeprecationWarning, ) kwargs = kwargs["args"] if cls == "pipeline": return get_storage_pipeline(**kwargs) class_path = STORAGE_IMPLEMENTATIONS.get(cls) if class_path is None: raise ValueError( "Unknown storage class `%s`. Supported: %s" % (cls, ", ".join(STORAGE_IMPLEMENTATIONS)) ) (module_path, class_name) = class_path.rsplit(".", 1) module = importlib.import_module(module_path, package=__package__) Storage = getattr(module, class_name) check_config = kwargs.pop("check_config", {}) storage = Storage(**kwargs) if check_config: if not storage.check_config(**check_config): raise EnvironmentError("storage check config failed") return storage def get_storage_pipeline( steps: List[Dict[str, Any]], check_config=None ) -> "StorageInterface": """Recursively get a storage object that may use other storage objects as backends. Args: steps (List[dict]): List of dicts that may be used as kwargs for `get_storage`. Returns: an instance of swh.storage.Storage or compatible class Raises: ValueError if passed an unknown storage class. """ storage_config = None for step in reversed(steps): if "args" in step: warnings.warn( 'Explicit "args" key is deprecated, use keys directly ' "instead.", DeprecationWarning, ) step = { "cls": step["cls"], **step["args"], } if storage_config: step["storage"] = storage_config step["check_config"] = check_config storage_config = step if storage_config is None: raise ValueError("'pipeline' has no steps.") return get_storage(**storage_config) diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py index 0389754d..01f376f9 100644 --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -1,182 +1,183 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import partial from typing import Dict, Iterable, Mapping, Sequence, Tuple from typing_extensions import Literal from swh.core.utils import grouper from swh.model.model import BaseModel, Content, SkippedContent from swh.storage import get_storage from swh.storage.interface import StorageInterface LObjectType = Literal[ "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ] OBJECT_TYPES: Tuple[LObjectType, ...] = ( "content", "skipped_content", "directory", "revision", "release", "snapshot", "extid", ) DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { "content": 10000, "content_bytes": 100 * 1024 * 1024, "skipped_content": 10000, "directory": 25000, "revision": 100000, "release": 100000, "snapshot": 25000, "extid": 10000, } class BufferingProxyStorage: """Storage implementation in charge of accumulating objects prior to discussing with the "main" storage. Deduplicates values based on a tuple of keys depending on the object type. Sample configuration use case for buffering storage: .. 
code-block:: yaml storage: cls: buffer args: storage: cls: remote args: http://storage.internal.staging.swh.network:5002/ min_batch_size: content: 10000 content_bytes: 100000000 skipped_content: 10000 directory: 5000 revision: 1000 release: 10000 snapshot: 5000 """ def __init__(self, storage: Mapping, min_batch_size: Mapping = {}): self.storage: StorageInterface = get_storage(**storage) self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = { k: {} for k in OBJECT_TYPES } self._contents_size: int = 0 def __getattr__(self, key: str): if key.endswith("_add"): object_type = key.rsplit("_", 1)[0] if object_type in OBJECT_TYPES: return partial(self.object_add, object_type=object_type, keys=["id"],) if key == "storage": raise AttributeError(key) return getattr(self.storage, key) def content_add(self, contents: Sequence[Content]) -> Dict: """Push contents to write to the storage in the buffer. Following policies apply: - - if the buffer's threshold is hit, flush content to the storage. - - otherwise, if the total size of buffered contents's threshold is hit, - flush content to the storage. + + - if the buffer's threshold is hit, flush content to the storage. + - otherwise, if the total size of buffered contents's threshold is hit, + flush content to the storage. """ stats = self.object_add( contents, object_type="content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) if not stats: # We did not flush already self._contents_size += sum(c.length for c in contents) if self._contents_size >= self._buffer_thresholds["content_bytes"]: return self.flush(["content"]) return stats def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict: return self.object_add( contents, object_type="skipped_content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) def object_add( self, objects: Sequence[BaseModel], *, object_type: LObjectType, keys: Iterable[str], ) -> Dict[str, int]: """Push objects to write to the storage in the buffer. Flushes the buffer to the storage if the threshold is hit. """ buffer_ = self._objects[object_type] for obj in objects: obj_key = tuple(getattr(obj, key) for key in keys) buffer_[obj_key] = obj if len(buffer_) >= self._buffer_thresholds[object_type]: return self.flush() return {} def flush( self, object_types: Sequence[LObjectType] = OBJECT_TYPES ) -> Dict[str, int]: summary: Dict[str, int] = {} def update_summary(stats): for k, v in stats.items(): summary[k] = v + summary.get(k, 0) for object_type in object_types: buffer_ = self._objects[object_type] batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) for batch in batches: add_fn = getattr(self.storage, "%s_add" % object_type) stats = add_fn(list(batch)) update_summary(stats) # Flush underlying storage stats = self.storage.flush(object_types) update_summary(stats) self.clear_buffers(object_types) return summary def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: """Clear objects from current buffer. WARNING: data that has not been flushed to storage will be lost when this method is called. This should only be called when `flush` fails and you want to continue your processing. 
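        A minimal sketch of that pattern (``backend_config`` is only a
        placeholder for a real backend configuration)::

            proxy = BufferingProxyStorage(storage=backend_config)
            try:
                proxy.flush()
            except Exception:
                # give up on the buffered objects and keep going
                proxy.clear_buffers()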
""" for object_type in object_types: buffer_ = self._objects[object_type] buffer_.clear() if object_type == "content": self._contents_size = 0 self.storage.clear_buffers(object_types) diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py index 9a34c098..99f9ccf1 100644 --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -1,1162 +1,1194 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import dataclasses import datetime import functools import logging import random from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union, ) from cassandra import CoordinationFailure from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import BoundStatement, PreparedStatement, dict_factory from mypy_extensions import NamedArg from tenacity import ( retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential, ) from swh.model.identifiers import CoreSWHID from swh.model.model import ( Content, Person, Sha1Git, SkippedContent, Timestamp, TimestampWithTimezone, ) from swh.storage.interface import ListOrder from ..utils import remove_keys from .common import TOKEN_BEGIN, TOKEN_END, hash_url from .model import ( MAGIC_NULL_PK, BaseRow, ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDByTargetRow, ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, ObjectCountRow, OriginRow, OriginVisitRow, OriginVisitStatusRow, RawExtrinsicMetadataRow, ReleaseRow, RevisionParentRow, RevisionRow, SkippedContentRow, SnapshotBranchRow, SnapshotRow, + content_index_table_name, ) from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS logger = logging.getLogger(__name__) _execution_profiles = { EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), row_factory=dict_factory, ), } # Configuration for cassandra-driver's access to servers: # * hit the right server directly when sending a query (TokenAwarePolicy), # * if there's more than one, then pick one at random that's in the same # datacenter as the client (DCAwareRoundRobinPolicy) def create_keyspace( hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True ): cluster = Cluster(hosts, port=port, execution_profiles=_execution_profiles) session = cluster.connect() extra_params = "" if not durable_writes: extra_params = "AND durable_writes = false" session.execute( """CREATE KEYSPACE IF NOT EXISTS "%s" WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } %s; """ % (keyspace, extra_params) ) session.execute('USE "%s"' % keyspace) for query in CREATE_TABLES_QUERIES: session.execute(query) TRet = TypeVar("TRet") def _prepared_statement( query: str, ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: """Returns a decorator usable on methods of CqlRunner, to inject them with a 'statement' argument, that is a prepared statement corresponding to the query. 
This only works on methods of CqlRunner, as preparing a statement requires a connection to a Cassandra server.""" def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: statement: PreparedStatement = self._session.prepare(query) self._prepared_statements[f.__name__] = statement return f( self, *args, **kwargs, statement=self._prepared_statements[f.__name__] ) return newf return decorator TArg = TypeVar("TArg") TSelf = TypeVar("TSelf") def _prepared_insert_statement( row_class: Type[BaseRow], ) -> Callable[ [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa Callable[[TSelf, TArg], TRet], ]: """Shorthand for using `_prepared_statement` for `INSERT INTO` statements.""" columns = row_class.cols() return _prepared_statement( "INSERT INTO %s (%s) VALUES (%s)" % (row_class.TABLE, ", ".join(columns), ", ".join("?" for _ in columns),) ) def _prepared_exists_statement( table_name: str, ) -> Callable[ [Callable[[TSelf, TArg, NamedArg(Any, "statement")], TRet]], # noqa Callable[[TSelf, TArg], TRet], ]: """Shorthand for using `_prepared_statement` for queries that only check which ids in a list exist in the table.""" return _prepared_statement(f"SELECT id FROM {table_name} WHERE id IN ?") def _prepared_select_statement( row_class: Type[BaseRow], clauses: str = "", cols: Optional[List[str]] = None, ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: if cols is None: cols = row_class.cols() return _prepared_statement( f"SELECT {', '.join(cols)} FROM {row_class.TABLE} {clauses}" ) def _prepared_select_statements( row_class: Type[BaseRow], queries: Dict[Any, str], ) -> Callable[[Callable[..., TRet]], Callable[..., TRet]]: """Like _prepared_statement, but supports multiple statements, passed a dict, and passes a dict of prepared statements to the decorated method""" cols = row_class.cols() statement_start = f"SELECT {', '.join(cols)} FROM {row_class.TABLE} " def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> TRet: if f.__name__ not in self._prepared_statements: self._prepared_statements[f.__name__] = { key: self._session.prepare(statement_start + query) for (key, query) in queries.items() } return f( self, *args, **kwargs, statements=self._prepared_statements[f.__name__] ) return newf return decorator def _next_bytes_value(value: bytes) -> bytes: """Returns the next bytes value by incrementing the integer representation of the provided value and converting it back to bytes. For instance when prefix is b"abcd", it returns b"abce". 
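    Restating that example as a doctest::

        >>> _next_bytes_value(b"abcd")
        b'abce'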
""" next_value_int = int.from_bytes(value, byteorder="big") + 1 return next_value_int.to_bytes( (next_value_int.bit_length() + 7) // 8, byteorder="big" ) class CqlRunner: """Class managing prepared statements and building queries to be sent to Cassandra.""" def __init__(self, hosts: List[str], keyspace: str, port: int): self._cluster = Cluster( hosts, port=port, execution_profiles=_execution_profiles ) self._session = self._cluster.connect(keyspace) self._cluster.register_user_type( keyspace, "microtimestamp_with_timezone", TimestampWithTimezone ) self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) self._cluster.register_user_type(keyspace, "person", Person) # directly a PreparedStatement for methods decorated with # @_prepared_statements (and its wrappers, _prepared_insert_statement, # _prepared_exists_statement, and _prepared_select_statement); # and a dict of PreparedStatements with @_prepared_select_statements self._prepared_statements: Dict[ str, Union[PreparedStatement, Dict[Any, PreparedStatement]] ] = {} ########################## # Common utility functions ########################## MAX_RETRIES = 3 @retry( wait=wait_random_exponential(multiplier=1, max=10), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_exception_type(CoordinationFailure), ) def _execute_with_retries(self, statement, args) -> ResultSet: return self._session.execute(statement, args, timeout=1000.0) @_prepared_statement( "UPDATE object_count SET count = count + ? " "WHERE partition_key = 0 AND object_type = ?" ) def _increment_counter( self, object_type: str, nb: int, *, statement: PreparedStatement ) -> None: self._execute_with_retries(statement, [nb, object_type]) def _add_one(self, statement, obj: BaseRow) -> None: self._increment_counter(obj.TABLE, 1) self._execute_with_retries(statement, dataclasses.astuple(obj)) _T = TypeVar("_T", bound=BaseRow) def _get_random_row(self, row_class: Type[_T], statement) -> Optional[_T]: # noqa """Takes a prepared statement of the form "SELECT * FROM WHERE token() > ? LIMIT 1" and uses it to return a random row""" token = random.randint(TOKEN_BEGIN, TOKEN_END) rows = self._execute_with_retries(statement, [token]) if not rows: # There are no row with a greater token; wrap around to get # the row with the smallest token rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) if rows: return row_class.from_dict(rows.one()) # type: ignore else: return None def _missing(self, statement, ids): rows = self._execute_with_retries(statement, [ids]) found_ids = {row["id"] for row in rows} return [id_ for id_ in ids if id_ not in found_ids] ########################## # 'content' table ########################## def _content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("content", 1) @_prepared_insert_statement(ContentRow) def content_add_prepare( self, content: ContentRow, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" statement = statement.bind(dataclasses.astuple(content)) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. 
This is equivalent to # "SELECT token({', '.join(ContentRow.PARTITION_KEY)}) FROM content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._content_add_finalize, statement) return (token, finalizer) @_prepared_select_statement( ContentRow, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}" ) def content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[ContentRow]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] for algo in HASH_ALGORITHMS] ) ) assert len(rows) <= 1 if rows: return ContentRow(**rows[0]) else: return None @_prepared_select_statement( ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) = ?" ) def content_get_from_token(self, token, *, statement) -> Iterable[ContentRow]: return map(ContentRow.from_dict, self._execute_with_retries(statement, [token])) @_prepared_select_statement( ContentRow, f"WHERE token({', '.join(ContentRow.PARTITION_KEY)}) > ? LIMIT 1" ) def content_get_random(self, *, statement) -> Optional[ContentRow]: return self._get_random_row(ContentRow, statement) @_prepared_statement( - ( - "SELECT token({0}) AS tok, {1} FROM content " - "WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?" - ).format(", ".join(ContentRow.PARTITION_KEY), ", ".join(ContentRow.cols())) + """ + SELECT token({pk}) AS tok, {cols} FROM {table} + WHERE token({pk}) >= ? AND token({pk}) <= ? LIMIT ? + """.format( + pk=", ".join(ContentRow.PARTITION_KEY), + cols=", ".join(ContentRow.cols()), + table=ContentRow.TABLE, + ) ) def content_get_token_range( self, start: int, end: int, limit: int, *, statement ) -> Iterable[Tuple[int, ContentRow]]: """Returns an iterable of (token, row)""" return ( (row["tok"], ContentRow.from_dict(remove_keys(row, ("tok",)))) for row in self._execute_with_retries(statement, [start, end, limit]) ) ########################## # 'content_by_*' tables ########################## @_prepared_statement( - "SELECT sha1_git AS id FROM content_by_sha1_git WHERE sha1_git IN ?" + f""" + SELECT sha1_git AS id + FROM {content_index_table_name("sha1_git", skipped_content=False)} + WHERE sha1_git IN ? 
+ """ ) def content_missing_by_sha1_git( self, ids: List[bytes], *, statement ) -> List[bytes]: return self._missing(statement, ids) def content_index_add_one(self, algo: str, content: Content, token: int) -> None: """Adds a row mapping content[algo] to the token of the Content in the main 'content' table.""" - query = ( - f"INSERT INTO content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" - ) + query = f""" + INSERT INTO {content_index_table_name(algo, skipped_content=False)} + ({algo}, target_token) + VALUES (%s, %s) + """ self._execute_with_retries(query, [content.get_hash(algo), token]) def content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS - query = f"SELECT target_token FROM content_by_{algo} WHERE {algo} = %s" + query = f""" + SELECT target_token + FROM {content_index_table_name(algo, skipped_content=False)} + WHERE {algo} = %s + """ return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) ) ########################## # 'skipped_content' table ########################## def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by skipped_content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("skipped_content", 1) @_prepared_insert_statement(SkippedContentRow) def skipped_content_add_prepare( self, content, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'skipped_content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" # Replace NULLs (which are not allowed in the partition key) with # an empty byte string for key in SkippedContentRow.PARTITION_KEY: if getattr(content, key) is None: setattr(content, key, MAGIC_NULL_PK) statement = statement.bind(dataclasses.astuple(content)) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(SkippedContentRow.PARTITION_KEY)}) # FROM skipped_content WHERE ..." 
# after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'skipped_content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._skipped_content_add_finalize, statement) return (token, finalizer) @_prepared_select_statement( SkippedContentRow, f"WHERE {' AND '.join(map('%s = ?'.__mod__, HASH_ALGORITHMS))}", ) def skipped_content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[SkippedContentRow]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] or MAGIC_NULL_PK for algo in HASH_ALGORITHMS], ) ) assert len(rows) <= 1 if rows: return SkippedContentRow.from_dict(rows[0]) else: return None @_prepared_select_statement( SkippedContentRow, f"WHERE token({', '.join(SkippedContentRow.PARTITION_KEY)}) = ?", ) def skipped_content_get_from_token( self, token, *, statement ) -> Iterable[SkippedContentRow]: return map( SkippedContentRow.from_dict, self._execute_with_retries(statement, [token]) ) ########################## # 'skipped_content_by_*' tables ########################## def skipped_content_index_add_one( self, algo: str, content: SkippedContent, token: int ) -> None: """Adds a row mapping content[algo] to the token of the SkippedContent in the main 'skipped_content' table.""" query = ( f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" ) self._execute_with_retries( query, [content.get_hash(algo) or MAGIC_NULL_PK, token] ) def skipped_content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS - query = f"SELECT target_token FROM skipped_content_by_{algo} WHERE {algo} = %s" + query = f""" + SELECT target_token + FROM {content_index_table_name(algo, skipped_content=True)} + WHERE {algo} = %s + """ return ( row["target_token"] for row in self._execute_with_retries(query, [hash_]) ) ########################## # 'revision' table ########################## @_prepared_exists_statement("revision") def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(RevisionRow) def revision_add_one(self, revision: RevisionRow, *, statement) -> None: self._add_one(statement, revision) - @_prepared_statement("SELECT id FROM revision WHERE id IN ?") + @_prepared_statement(f"SELECT id FROM {RevisionRow.TABLE} WHERE id IN ?") def revision_get_ids(self, revision_ids, *, statement) -> Iterable[int]: return ( row["id"] for row in self._execute_with_retries(statement, [revision_ids]) ) @_prepared_select_statement(RevisionRow, "WHERE id IN ?") def revision_get( self, revision_ids: List[Sha1Git], *, statement ) -> Iterable[RevisionRow]: return map( RevisionRow.from_dict, self._execute_with_retries(statement, [revision_ids]) ) @_prepared_select_statement(RevisionRow, "WHERE token(id) > ? 
LIMIT 1") def revision_get_random(self, *, statement) -> Optional[RevisionRow]: return self._get_random_row(RevisionRow, statement) ########################## # 'revision_parent' table ########################## @_prepared_insert_statement(RevisionParentRow) def revision_parent_add_one( self, revision_parent: RevisionParentRow, *, statement ) -> None: self._add_one(statement, revision_parent) - @_prepared_statement("SELECT parent_id FROM revision_parent WHERE id = ?") + @_prepared_statement( + f"SELECT parent_id FROM {RevisionParentRow.TABLE} WHERE id = ?" + ) def revision_parent_get( self, revision_id: Sha1Git, *, statement ) -> Iterable[bytes]: return ( row["parent_id"] for row in self._execute_with_retries(statement, [revision_id]) ) ########################## # 'release' table ########################## @_prepared_exists_statement("release") def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(ReleaseRow) def release_add_one(self, release: ReleaseRow, *, statement) -> None: self._add_one(statement, release) @_prepared_select_statement(ReleaseRow, "WHERE id in ?") def release_get(self, release_ids: List[str], *, statement) -> Iterable[ReleaseRow]: return map( ReleaseRow.from_dict, self._execute_with_retries(statement, [release_ids]) ) @_prepared_select_statement(ReleaseRow, "WHERE token(id) > ? LIMIT 1") def release_get_random(self, *, statement) -> Optional[ReleaseRow]: return self._get_random_row(ReleaseRow, statement) ########################## # 'directory' table ########################## @_prepared_exists_statement("directory") def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(DirectoryRow) def directory_add_one(self, directory: DirectoryRow, *, statement) -> None: """Called after all calls to directory_entry_add_one, to commit/finalize the directory.""" self._add_one(statement, directory) @_prepared_select_statement(DirectoryRow, "WHERE token(id) > ? LIMIT 1") def directory_get_random(self, *, statement) -> Optional[DirectoryRow]: return self._get_random_row(DirectoryRow, statement) ########################## # 'directory_entry' table ########################## @_prepared_insert_statement(DirectoryEntryRow) def directory_entry_add_one(self, entry: DirectoryEntryRow, *, statement) -> None: self._add_one(statement, entry) @_prepared_select_statement(DirectoryEntryRow, "WHERE directory_id IN ?") def directory_entry_get( self, directory_ids, *, statement ) -> Iterable[DirectoryEntryRow]: return map( DirectoryEntryRow.from_dict, self._execute_with_retries(statement, [directory_ids]), ) ########################## # 'snapshot' table ########################## @_prepared_exists_statement("snapshot") def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement(SnapshotRow) def snapshot_add_one(self, snapshot: SnapshotRow, *, statement) -> None: self._add_one(statement, snapshot) @_prepared_select_statement(SnapshotRow, "WHERE token(id) > ? 
LIMIT 1") def snapshot_get_random(self, *, statement) -> Optional[SnapshotRow]: return self._get_random_row(SnapshotRow, statement) ########################## # 'snapshot_branch' table ########################## @_prepared_insert_statement(SnapshotBranchRow) def snapshot_branch_add_one(self, branch: SnapshotBranchRow, *, statement) -> None: self._add_one(statement, branch) @_prepared_statement( - "SELECT ascii_bins_count(target_type) AS counts " - "FROM snapshot_branch " - "WHERE snapshot_id = ? AND name >= ?" + f""" + SELECT ascii_bins_count(target_type) AS counts + FROM {SnapshotBranchRow.TABLE} + WHERE snapshot_id = ? AND name >= ? + """ ) def snapshot_count_branches_from_name( self, snapshot_id: Sha1Git, from_: bytes, *, statement ) -> Dict[Optional[str], int]: row = self._execute_with_retries(statement, [snapshot_id, from_]).one() (nb_none, counts) = row["counts"] return {None: nb_none, **counts} @_prepared_statement( - "SELECT ascii_bins_count(target_type) AS counts " - "FROM snapshot_branch " - "WHERE snapshot_id = ? AND name < ?" + f""" + SELECT ascii_bins_count(target_type) AS counts + FROM {SnapshotBranchRow.TABLE} + WHERE snapshot_id = ? AND name < ? + """ ) def snapshot_count_branches_before_name( self, snapshot_id: Sha1Git, before: bytes, *, statement, ) -> Dict[Optional[str], int]: row = self._execute_with_retries(statement, [snapshot_id, before]).one() (nb_none, counts) = row["counts"] return {None: nb_none, **counts} def snapshot_count_branches( self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Dict[Optional[str], int]: """Returns a dictionary from type names to the number of branches of that type.""" prefix = branch_name_exclude_prefix if prefix is None: return self.snapshot_count_branches_from_name(snapshot_id, b"") else: # counts branches before exclude prefix counts = Counter( self.snapshot_count_branches_before_name(snapshot_id, prefix) ) # no need to execute that part if each bit of the prefix equals 1 if prefix.replace(b"\xff", b"") != b"": # counts branches after exclude prefix and update counters counts.update( self.snapshot_count_branches_from_name( snapshot_id, _next_bytes_value(prefix) ) ) return counts @_prepared_select_statement( SnapshotBranchRow, "WHERE snapshot_id = ? AND name >= ? LIMIT ?" ) def snapshot_branch_get_from_name( self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement ) -> Iterable[SnapshotBranchRow]: return map( SnapshotBranchRow.from_dict, self._execute_with_retries(statement, [snapshot_id, from_, limit]), ) @_prepared_select_statement( SnapshotBranchRow, "WHERE snapshot_id = ? AND name >= ? AND name < ? LIMIT ?" 
) def snapshot_branch_get_range( self, snapshot_id: Sha1Git, from_: bytes, before: bytes, limit: int, *, statement, ) -> Iterable[SnapshotBranchRow]: return map( SnapshotBranchRow.from_dict, self._execute_with_retries(statement, [snapshot_id, from_, before, limit]), ) def snapshot_branch_get( self, snapshot_id: Sha1Git, from_: bytes, limit: int, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Iterable[SnapshotBranchRow]: prefix = branch_name_exclude_prefix if prefix is None: return self.snapshot_branch_get_from_name(snapshot_id, from_, limit) else: # get branches before the exclude prefix branches = list( self.snapshot_branch_get_range(snapshot_id, from_, prefix, limit) ) nb_branches = len(branches) # no need to execute that part if limit is reached # or if each bit of the prefix equals 1 if nb_branches < limit and prefix.replace(b"\xff", b"") != b"": # get branches after the exclude prefix and update list to return branches.extend( self.snapshot_branch_get_from_name( snapshot_id, _next_bytes_value(prefix), limit - nb_branches ) ) return branches ########################## # 'origin' table ########################## @_prepared_insert_statement(OriginRow) def origin_add_one(self, origin: OriginRow, *, statement) -> None: self._add_one(statement, origin) @_prepared_select_statement(OriginRow, "WHERE sha1 = ?") def origin_get_by_sha1(self, sha1: bytes, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [sha1])) def origin_get_by_url(self, url: str) -> Iterable[OriginRow]: return self.origin_get_by_sha1(hash_url(url)) @_prepared_statement( - f'SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} ' - f"FROM origin WHERE token(sha1) >= ? LIMIT ?" + f""" + SELECT token(sha1) AS tok, {", ".join(OriginRow.cols())} + FROM {OriginRow.TABLE} + WHERE token(sha1) >= ? LIMIT ? + """ ) def origin_list( self, start_token: int, limit: int, *, statement ) -> Iterable[Tuple[int, OriginRow]]: """Returns an iterable of (token, origin)""" return ( (row["tok"], OriginRow.from_dict(remove_keys(row, ("tok",)))) for row in self._execute_with_retries(statement, [start_token, limit]) ) @_prepared_select_statement(OriginRow) def origin_iter_all(self, *, statement) -> Iterable[OriginRow]: return map(OriginRow.from_dict, self._execute_with_retries(statement, [])) - @_prepared_statement("SELECT next_visit_id FROM origin WHERE sha1 = ?") + @_prepared_statement(f"SELECT next_visit_id FROM {OriginRow.TABLE} WHERE sha1 = ?") def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int: rows = list(self._execute_with_retries(statement, [origin_sha1])) assert len(rows) == 1 # TODO: error handling return rows[0]["next_visit_id"] @_prepared_statement( - "UPDATE origin SET next_visit_id=? WHERE sha1 = ? IF next_visit_id=?" + f""" + UPDATE {OriginRow.TABLE} + SET next_visit_id=? + WHERE sha1 = ? IF next_visit_id=? 
+ """ ) def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int: origin_sha1 = hash_url(origin_url) next_id = self._origin_get_next_visit_id(origin_sha1) while True: res = list( self._execute_with_retries( statement, [next_id + 1, origin_sha1, next_id] ) ) assert len(res) == 1 if res[0]["[applied]"]: # No data race return next_id else: # Someone else updated it before we did, let's try again next_id = res[0]["next_visit_id"] # TODO: abort after too many attempts return next_id ########################## # 'origin_visit' table ########################## @_prepared_select_statements( OriginVisitRow, { (True, ListOrder.ASC): ( "WHERE origin = ? AND visit > ? ORDER BY visit ASC LIMIT ?" ), (True, ListOrder.DESC): ( "WHERE origin = ? AND visit < ? ORDER BY visit DESC LIMIT ?" ), (False, ListOrder.ASC): "WHERE origin = ? ORDER BY visit ASC LIMIT ?", (False, ListOrder.DESC): "WHERE origin = ? ORDER BY visit DESC LIMIT ?", }, ) def origin_visit_get( self, origin_url: str, last_visit: Optional[int], limit: int, order: ListOrder, *, statements, ) -> Iterable[OriginVisitRow]: args: List[Any] = [origin_url] if last_visit is not None: args.append(last_visit) args.append(limit) statement = statements[(last_visit is not None, order)] return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, args) ) @_prepared_insert_statement(OriginVisitRow) def origin_visit_add_one(self, visit: OriginVisitRow, *, statement) -> None: self._add_one(statement, visit) @_prepared_select_statement(OriginVisitRow, "WHERE origin = ? AND visit = ?") def origin_visit_get_one( self, origin_url: str, visit_id: int, *, statement ) -> Optional[OriginVisitRow]: # TODO: error handling rows = list(self._execute_with_retries(statement, [origin_url, visit_id])) if rows: return OriginVisitRow.from_dict(rows[0]) else: return None @_prepared_select_statement(OriginVisitRow, "WHERE origin = ?") def origin_visit_get_all( self, origin_url: str, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [origin_url]), ) @_prepared_select_statement(OriginVisitRow, "WHERE token(origin) >= ?") def _origin_visit_iter_from( self, min_token: int, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [min_token]) ) @_prepared_select_statement(OriginVisitRow, "WHERE token(origin) < ?") def _origin_visit_iter_to( self, max_token: int, *, statement ) -> Iterable[OriginVisitRow]: return map( OriginVisitRow.from_dict, self._execute_with_retries(statement, [max_token]) ) def origin_visit_iter(self, start_token: int) -> Iterator[OriginVisitRow]: """Returns all origin visits in order from this token, and wraps around the token space.""" yield from self._origin_visit_iter_from(start_token) yield from self._origin_visit_iter_to(start_token) ########################## # 'origin_visit_status' table ########################## @_prepared_select_statements( OriginVisitStatusRow, { (True, ListOrder.ASC): ( "WHERE origin = ? AND visit = ? AND date >= ? " "ORDER BY visit ASC LIMIT ?" ), (True, ListOrder.DESC): ( "WHERE origin = ? AND visit = ? AND date <= ? " "ORDER BY visit DESC LIMIT ?" ), (False, ListOrder.ASC): ( "WHERE origin = ? AND visit = ? ORDER BY visit ASC LIMIT ?" ), (False, ListOrder.DESC): ( "WHERE origin = ? AND visit = ? ORDER BY visit DESC LIMIT ?" 
), }, ) def origin_visit_status_get_range( self, origin: str, visit: int, date_from: Optional[datetime.datetime], limit: int, order: ListOrder, *, statements, ) -> Iterable[OriginVisitStatusRow]: args: List[Any] = [origin, visit] if date_from is not None: args.append(date_from) args.append(limit) statement = statements[(date_from is not None, order)] return map( OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, args) ) @_prepared_insert_statement(OriginVisitStatusRow) def origin_visit_status_add_one( self, visit_update: OriginVisitStatusRow, *, statement ) -> None: self._add_one(statement, visit_update) def origin_visit_status_get_latest( self, origin: str, visit: int, ) -> Optional[OriginVisitStatusRow]: """Given an origin visit id, return its latest origin_visit_status """ return next(self.origin_visit_status_get(origin, visit), None) @_prepared_select_statement( OriginVisitStatusRow, "WHERE origin = ? AND visit = ? ORDER BY date DESC" ) def origin_visit_status_get( self, origin: str, visit: int, *, statement, ) -> Iterator[OriginVisitStatusRow]: """Return all origin visit statuses for a given visit """ return map( OriginVisitStatusRow.from_dict, self._execute_with_retries(statement, [origin, visit]), ) ########################## # 'metadata_authority' table ########################## @_prepared_insert_statement(MetadataAuthorityRow) def metadata_authority_add(self, authority: MetadataAuthorityRow, *, statement): self._add_one(statement, authority) @_prepared_select_statement(MetadataAuthorityRow, "WHERE type = ? AND url = ?") def metadata_authority_get( self, type, url, *, statement ) -> Optional[MetadataAuthorityRow]: rows = list(self._execute_with_retries(statement, [type, url])) if rows: return MetadataAuthorityRow.from_dict(rows[0]) else: return None ########################## # 'metadata_fetcher' table ########################## @_prepared_insert_statement(MetadataFetcherRow) def metadata_fetcher_add(self, fetcher, *, statement): self._add_one(statement, fetcher) @_prepared_select_statement(MetadataFetcherRow, "WHERE name = ? AND version = ?") def metadata_fetcher_get( self, name, version, *, statement ) -> Optional[MetadataFetcherRow]: rows = list(self._execute_with_retries(statement, [name, version])) if rows: return MetadataFetcherRow.from_dict(rows[0]) else: return None ######################### # 'raw_extrinsic_metadata' table ######################### @_prepared_insert_statement(RawExtrinsicMetadataRow) def raw_extrinsic_metadata_add(self, raw_extrinsic_metadata, *, statement): self._add_one(statement, raw_extrinsic_metadata) @_prepared_select_statement( RawExtrinsicMetadataRow, "WHERE target=? AND authority_url=? AND discovery_date>? AND authority_type=?", ) def raw_extrinsic_metadata_get_after_date( self, target: str, authority_type: str, authority_url: str, after: datetime.datetime, *, statement, ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [target, authority_url, after, authority_type] ), ) @_prepared_select_statement( RawExtrinsicMetadataRow, "WHERE target=? AND authority_type=? AND authority_url=? 
" "AND (discovery_date, id) > (?, ?)", ) def raw_extrinsic_metadata_get_after_date_and_id( self, target: str, authority_type: str, authority_url: str, after_date: datetime.datetime, after_id: bytes, *, statement, ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [target, authority_type, authority_url, after_date, after_id,], ), ) @_prepared_select_statement( RawExtrinsicMetadataRow, "WHERE target=? AND authority_url=? AND authority_type=?", ) def raw_extrinsic_metadata_get( self, target: str, authority_type: str, authority_url: str, *, statement ) -> Iterable[RawExtrinsicMetadataRow]: return map( RawExtrinsicMetadataRow.from_dict, self._execute_with_retries( statement, [target, authority_url, authority_type] ), ) ########################## # 'extid' table ########################## def _extid_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by extid_add_prepare, to be called when the extid row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("extid", 1) @_prepared_insert_statement(ExtIDRow) def extid_add_prepare( self, extid: ExtIDRow, *, statement ) -> Tuple[int, Callable[[], None]]: statement = statement.bind(dataclasses.astuple(extid)) token_class = self._cluster.metadata.token_map.token_class token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._extid_add_finalize, statement) return (token, finalizer) @_prepared_select_statement( ExtIDRow, "WHERE extid_type=? AND extid=? AND target_type=? AND target=?", ) def extid_get_from_pk( self, extid_type: str, extid: bytes, target: CoreSWHID, *, statement, ) -> Optional[ExtIDRow]: rows = list( self._execute_with_retries( statement, [extid_type, extid, target.object_type.value, target.object_id], ), ) assert len(rows) <= 1 if rows: return ExtIDRow(**rows[0]) else: return None @_prepared_select_statement( ExtIDRow, "WHERE token(extid_type, extid) = ?", ) def extid_get_from_token(self, token: int, *, statement) -> Iterable[ExtIDRow]: return map(ExtIDRow.from_dict, self._execute_with_retries(statement, [token]),) @_prepared_select_statement( ExtIDRow, "WHERE extid_type=? 
AND extid=?", ) def extid_get_from_extid( self, extid_type: str, extid: bytes, *, statement ) -> Iterable[ExtIDRow]: return map( ExtIDRow.from_dict, self._execute_with_retries(statement, [extid_type, extid]), ) def extid_get_from_target( self, target_type: str, target: bytes ) -> Iterable[ExtIDRow]: for token in self._extid_get_tokens_from_target(target_type, target): if token is not None: for extid in self.extid_get_from_token(token): # re-check the extid against target (in case of murmur3 collision) if ( extid is not None and extid.target_type == target_type and extid.target == target ): yield extid ########################## - # 'extid_by_*' tables + # 'extid_by_target' table ########################## - def extid_index_add_one(self, extid: ExtIDRow, token: int) -> None: + @_prepared_insert_statement(ExtIDByTargetRow) + def extid_index_add_one(self, row: ExtIDByTargetRow, *, statement) -> None: """Adds a row mapping extid[target_type, target] to the token of the ExtID in the main 'extid' table.""" - query = ( - "INSERT INTO extid_by_target (target_type, target, target_token) " - "VALUES (%s, %s, %s)" - ) - self._execute_with_retries(query, [extid.target_type, extid.target, token]) + self._add_one(statement, row) + @_prepared_statement( + f""" + SELECT target_token + FROM {ExtIDByTargetRow.TABLE} + WHERE target_type = ? AND target = ? + """ + ) def _extid_get_tokens_from_target( - self, target_type: str, target: bytes + self, target_type: str, target: bytes, *, statement ) -> Iterable[int]: - query = ( - "SELECT target_token " - "FROM extid_by_target " - "WHERE target_type = %s AND target = %s" - ) return ( row["target_token"] - for row in self._execute_with_retries(query, [target_type, target]) + for row in self._execute_with_retries(statement, [target_type, target]) ) ########################## # Miscellaneous ########################## @_prepared_statement("SELECT uuid() FROM revision LIMIT 1;") def check_read(self, *, statement): self._execute_with_retries(statement, []) @_prepared_select_statement(ObjectCountRow, "WHERE partition_key=0") def stat_counters(self, *, statement) -> Iterable[ObjectCountRow]: return map(ObjectCountRow.from_dict, self._execute_with_retries(statement, [])) diff --git a/swh/storage/cassandra/model.py b/swh/storage/cassandra/model.py index f842178a..a7803ae7 100644 --- a/swh/storage/cassandra/model.py +++ b/swh/storage/cassandra/model.py @@ -1,294 +1,318 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Classes representing tables in the Cassandra database. They are very close to classes found in swh.model.model, but most of them are subtly different: * Large objects are split into other classes (eg. RevisionRow has no 'parents' field, because parents are stored in a different table, represented by RevisionParentRow) * They have a "cols" field, which returns the list of column names of the table * They only use types that map directly to Cassandra's schema (ie. no enums) Therefore, this model doesn't reuse swh.model.model, except for types that can be mapped to UDTs (Person and TimestampWithTimezone). 
""" import dataclasses import datetime from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type, TypeVar from swh.model.model import Person, TimestampWithTimezone MAGIC_NULL_PK = b"" """ NULLs (or all-empty blobs) are not allowed in primary keys; instead we use a special value that can't possibly be a valid hash. """ T = TypeVar("T", bound="BaseRow") +def content_index_table_name(algo: str, skipped_content: bool) -> str: + """Given an algorithm name, returns the name of one of the 'content_by_*' + and 'skipped_content_by_*' tables that serve as index for the 'content' + and 'skipped_content' tables based on this algorithm's hashes. + + For now it is a simple substitution, but future versions may append a version + number to it, if needed for schema updates.""" + if skipped_content: + return f"skipped_content_by_{algo}" + else: + return f"content_by_{algo}" + + class BaseRow: TABLE: ClassVar[str] PARTITION_KEY: ClassVar[Tuple[str, ...]] CLUSTERING_KEY: ClassVar[Tuple[str, ...]] = () @classmethod def from_dict(cls: Type[T], d: Dict[str, Any]) -> T: return cls(**d) # type: ignore @classmethod def cols(cls) -> List[str]: return [field.name for field in dataclasses.fields(cls)] def to_dict(self) -> Dict[str, Any]: return dataclasses.asdict(self) @dataclasses.dataclass class ContentRow(BaseRow): TABLE = "content" PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256") sha1: bytes sha1_git: bytes sha256: bytes blake2s256: bytes length: int ctime: datetime.datetime status: str @dataclasses.dataclass class SkippedContentRow(BaseRow): TABLE = "skipped_content" PARTITION_KEY = ("sha1", "sha1_git", "sha256", "blake2s256") sha1: Optional[bytes] sha1_git: Optional[bytes] sha256: Optional[bytes] blake2s256: Optional[bytes] length: Optional[int] ctime: Optional[datetime.datetime] status: str reason: str origin: str @classmethod def from_dict(cls, d: Dict[str, Any]) -> "SkippedContentRow": d = d.copy() for k in ("sha1", "sha1_git", "sha256", "blake2s256"): if d[k] == MAGIC_NULL_PK: d[k] = None return super().from_dict(d) @dataclasses.dataclass class DirectoryRow(BaseRow): TABLE = "directory" PARTITION_KEY = ("id",) id: bytes @dataclasses.dataclass class DirectoryEntryRow(BaseRow): TABLE = "directory_entry" PARTITION_KEY = ("directory_id",) CLUSTERING_KEY = ("name",) directory_id: bytes name: bytes target: bytes perms: int type: str @dataclasses.dataclass class RevisionRow(BaseRow): TABLE = "revision" PARTITION_KEY = ("id",) id: bytes date: Optional[TimestampWithTimezone] committer_date: Optional[TimestampWithTimezone] type: str directory: bytes message: bytes author: Person committer: Person synthetic: bool metadata: str extra_headers: dict @dataclasses.dataclass class RevisionParentRow(BaseRow): TABLE = "revision_parent" PARTITION_KEY = ("id",) CLUSTERING_KEY = ("parent_rank",) id: bytes parent_rank: int parent_id: bytes @dataclasses.dataclass class ReleaseRow(BaseRow): TABLE = "release" PARTITION_KEY = ("id",) id: bytes target_type: str target: bytes date: TimestampWithTimezone name: bytes message: bytes author: Person synthetic: bool @dataclasses.dataclass class SnapshotRow(BaseRow): TABLE = "snapshot" PARTITION_KEY = ("id",) id: bytes @dataclasses.dataclass class SnapshotBranchRow(BaseRow): TABLE = "snapshot_branch" PARTITION_KEY = ("snapshot_id",) CLUSTERING_KEY = ("name",) snapshot_id: bytes name: bytes target_type: Optional[str] target: Optional[bytes] @dataclasses.dataclass class OriginVisitRow(BaseRow): TABLE = "origin_visit" PARTITION_KEY = ("origin",) CLUSTERING_KEY = ("visit",) 
origin: str visit: int date: datetime.datetime type: str @dataclasses.dataclass class OriginVisitStatusRow(BaseRow): TABLE = "origin_visit_status" PARTITION_KEY = ("origin",) CLUSTERING_KEY = ("visit", "date") origin: str visit: int date: datetime.datetime type: str status: str metadata: str snapshot: bytes @classmethod def from_dict(cls: Type[T], d: Dict[str, Any]) -> T: return cls(**d) # type: ignore @dataclasses.dataclass class OriginRow(BaseRow): TABLE = "origin" PARTITION_KEY = ("sha1",) sha1: bytes url: str next_visit_id: int @dataclasses.dataclass class MetadataAuthorityRow(BaseRow): TABLE = "metadata_authority" PARTITION_KEY = ("url",) CLUSTERING_KEY = ("type",) url: str type: str metadata: str @dataclasses.dataclass class MetadataFetcherRow(BaseRow): TABLE = "metadata_fetcher" PARTITION_KEY = ("name",) CLUSTERING_KEY = ("version",) name: str version: str metadata: str @dataclasses.dataclass class RawExtrinsicMetadataRow(BaseRow): TABLE = "raw_extrinsic_metadata" PARTITION_KEY = ("target",) CLUSTERING_KEY = ( "authority_type", "authority_url", "discovery_date", "id", ) id: bytes type: str target: str authority_type: str authority_url: str discovery_date: datetime.datetime fetcher_name: str fetcher_version: str format: str metadata: bytes origin: Optional[str] visit: Optional[int] snapshot: Optional[str] release: Optional[str] revision: Optional[str] path: Optional[bytes] directory: Optional[str] @dataclasses.dataclass class ObjectCountRow(BaseRow): TABLE = "object_count" PARTITION_KEY = ("partition_key",) CLUSTERING_KEY = ("object_type",) partition_key: int object_type: str count: int @dataclasses.dataclass class ExtIDRow(BaseRow): TABLE = "extid" PARTITION_KEY = ("target", "target_type", "extid", "extid_type") extid_type: str extid: bytes target_type: str target: bytes + + +@dataclasses.dataclass +class ExtIDByTargetRow(BaseRow): + TABLE = "extid_by_target" + PARTITION_KEY = ("target_type", "target") + CLUSTERING_KEY = ("target_token",) + + target_type: str + target: bytes + target_token: int diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py index 8fae868e..ad8e18a4 100644 --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -1,1413 +1,1420 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import datetime import itertools import json import random import re from typing import ( Any, Callable, Dict, Iterable, List, Optional, Sequence, Set, Tuple, Union, ) import attr from swh.core.api.classes import stream_results from swh.core.api.serializers import msgpack_dumps, msgpack_loads from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.identifiers import CoreSWHID, ExtendedSWHID from swh.model.identifiers import ObjectType as SwhidObjectType from swh.model.model import ( Content, Directory, DirectoryEntry, ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Release, Revision, Sha1Git, SkippedContent, Snapshot, SnapshotBranch, TargetType, ) from swh.storage.interface import ( VISIT_STATUSES, ListOrder, PagedResult, PartialBranches, Sha1, ) from swh.storage.objstorage import ObjStorage from swh.storage.utils import map_optional, now from swh.storage.writer import JournalWriter from . 
import converters from ..exc import HashCollision, StorageArgumentException from ..utils import remove_keys from .common import TOKEN_BEGIN, TOKEN_END, hash_url from .cql import CqlRunner from .model import ( ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDByTargetRow, ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, OriginRow, OriginVisitRow, OriginVisitStatusRow, RawExtrinsicMetadataRow, RevisionParentRow, SkippedContentRow, SnapshotBranchRow, SnapshotRow, ) from .schema import HASH_ALGORITHMS # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 class CassandraStorage: def __init__(self, hosts, keyspace, objstorage, port=9042, journal_writer=None): self._cql_runner: CqlRunner = CqlRunner(hosts, keyspace, port) self.journal_writer: JournalWriter = JournalWriter(journal_writer) self.objstorage: ObjStorage = ObjStorage(objstorage) def check_config(self, *, check_write: bool) -> bool: self._cql_runner.check_read() return True def _content_get_from_hash(self, algo, hash_) -> Iterable: """From the name of a hash algorithm and a value of that hash, looks up the "hash -> token" secondary table (content_by_{algo}) to get tokens. Then, looks up the main table (content) to get all contents with that token, and filters out contents whose hash doesn't match.""" found_tokens = self._cql_runner.content_get_tokens_from_single_hash(algo, hash_) for token in found_tokens: assert isinstance(token, int), found_tokens # Query the main table ('content'). res = self._cql_runner.content_get_from_token(token) for row in res: # re-check the the hash (in case of murmur3 collision) if getattr(row, algo) == hash_: yield row def _content_add(self, contents: List[Content], with_data: bool) -> Dict: # Filter-out content already in the database. contents = [ c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict()) ] if with_data: # First insert to the objstorage, if the endpoint is # `content_add` (as opposed to `content_add_metadata`). # Must add to the objstorage before the DB and journal. Otherwise: # 1. in case of a crash the DB may "believe" we have the content, but # we didn't have time to write to the objstorage before the crash # 2. the objstorage mirroring, which reads from the journal, may attempt to # read from the objstorage before we finished writing it summary = self.objstorage.content_add( c for c in contents if c.status != "absent" ) content_add_bytes = summary["content:add:bytes"] self.journal_writer.content_add(contents) content_add = 0 for content in contents: content_add += 1 # Check for sha1 or sha1_git collisions. This test is not atomic # with the insertion, so it won't detect a collision if both # contents are inserted at the same time, but it's good enough. # # The proper way to do it would probably be a BATCH, but this # would be inefficient because of the number of partitions we # need to affect (len(HASH_ALGORITHMS)+1, which is currently 5) for algo in {"sha1", "sha1_git"}: collisions = [] # Get tokens of 'content' rows with the same value for # sha1/sha1_git rows = self._content_get_from_hash(algo, content.get_hash(algo)) for row in rows: if getattr(row, algo) != content.get_hash(algo): # collision of token(partition key), ignore this # row continue for other_algo in HASH_ALGORITHMS: if getattr(row, other_algo) != content.get_hash(other_algo): # This hash didn't match; discard the row. 
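# (At this point the hash used as lookup key matches the content being
# added, but another hash differs: two distinct contents collide on sha1
# or sha1_git. Record the conflicting row's hashes so the HashCollision
# raised below can report both sides.)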
collisions.append( {k: getattr(row, k) for k in HASH_ALGORITHMS} ) if collisions: collisions.append(content.hashes()) raise HashCollision(algo, content.get_hash(algo), collisions) (token, insertion_finalizer) = self._cql_runner.content_add_prepare( ContentRow(**remove_keys(content.to_dict(), ("data",))) ) # Then add to index tables for algo in HASH_ALGORITHMS: self._cql_runner.content_index_add_one(algo, content, token) # Then to the main table insertion_finalizer() summary = { "content:add": content_add, } if with_data: summary["content:add:bytes"] = content_add_bytes return summary def content_add(self, content: List[Content]) -> Dict: - contents = [attr.evolve(c, ctime=now()) for c in content] + to_add = { + (c.sha1, c.sha1_git, c.sha256, c.blake2s256): c for c in content + }.values() + contents = [attr.evolve(c, ctime=now()) for c in to_add] return self._content_add(list(contents), with_data=True) def content_update( self, contents: List[Dict[str, Any]], keys: List[str] = [] ) -> None: raise NotImplementedError( "content_update is not supported by the Cassandra backend" ) def content_add_metadata(self, content: List[Content]) -> Dict: return self._content_add(content, with_data=False) def content_get_data(self, content: Sha1) -> Optional[bytes]: # FIXME: Make this method support slicing the `data` return self.objstorage.content_get(content) def content_get_partition( self, partition_id: int, nb_partitions: int, page_token: Optional[str] = None, limit: int = 1000, ) -> PagedResult[Content]: if limit is None: raise StorageArgumentException("limit should not be None") # Compute start and end of the range of tokens covered by the # requested partition partition_size = (TOKEN_END - TOKEN_BEGIN) // nb_partitions range_start = TOKEN_BEGIN + partition_id * partition_size range_end = TOKEN_BEGIN + (partition_id + 1) * partition_size # offset the range start according to the `page_token`. if page_token is not None: if not (range_start <= int(page_token) <= range_end): raise StorageArgumentException("Invalid page_token.") range_start = int(page_token) next_page_token: Optional[str] = None rows = self._cql_runner.content_get_token_range( range_start, range_end, limit + 1 ) contents = [] for counter, (tok, row) in enumerate(rows): if row.status == "absent": continue row_d = row.to_dict() if counter >= limit: next_page_token = str(tok) break row_d.pop("ctime") contents.append(Content(**row_d)) assert len(contents) <= limit return PagedResult(results=contents, next_page_token=next_page_token) def content_get(self, contents: List[Sha1]) -> List[Optional[Content]]: contents_by_sha1: Dict[Sha1, Optional[Content]] = {} for sha1 in contents: # Get all (sha1, sha1_git, sha256, blake2s256) whose sha1 # matches the argument, from the index table ('content_by_sha1') for row in self._content_get_from_hash("sha1", sha1): row_d = row.to_dict() row_d.pop("ctime") content = Content(**row_d) contents_by_sha1[content.sha1] = content return [contents_by_sha1.get(sha1) for sha1 in contents] def content_find(self, content: Dict[str, Any]) -> List[Content]: # Find an algorithm that is common to all the requested contents. # It will be used to do an initial filtering efficiently. 
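# For example, if the caller provides both sha1 and sha256, one of those
# two algorithms is picked to query its content_by_* index table, and each
# candidate row is then re-checked against every hash the caller supplied.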
filter_algos = list(set(content).intersection(HASH_ALGORITHMS)) if not filter_algos: raise StorageArgumentException( "content keys must contain at least one " f"of: {', '.join(sorted(HASH_ALGORITHMS))}" ) common_algo = filter_algos[0] results = [] rows = self._content_get_from_hash(common_algo, content[common_algo]) for row in rows: # Re-check all the hashes, in case of collisions (either of the # hash of the partition key, or the hashes in it) for algo in HASH_ALGORITHMS: if content.get(algo) and getattr(row, algo) != content[algo]: # This hash didn't match; discard the row. break else: # All hashes match, keep this row. row_d = row.to_dict() row_d["ctime"] = row.ctime.replace(tzinfo=datetime.timezone.utc) results.append(Content(**row_d)) return results def content_missing( self, contents: List[Dict[str, Any]], key_hash: str = "sha1" ) -> Iterable[bytes]: if key_hash not in DEFAULT_ALGORITHMS: raise StorageArgumentException( "key_hash should be one of {','.join(DEFAULT_ALGORITHMS)}" ) for content in contents: res = self.content_find(content) if not res: yield content[key_hash] def content_missing_per_sha1(self, contents: List[bytes]) -> Iterable[bytes]: return self.content_missing([{"sha1": c} for c in contents]) def content_missing_per_sha1_git( self, contents: List[Sha1Git] ) -> Iterable[Sha1Git]: return self.content_missing( [{"sha1_git": c} for c in contents], key_hash="sha1_git" ) def content_get_random(self) -> Sha1Git: content = self._cql_runner.content_get_random() assert content, "Could not find any content" return content.sha1_git def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict: # Filter-out content already in the database. contents = [ c for c in contents if not self._cql_runner.skipped_content_get_from_pk(c.to_dict()) ] self.journal_writer.skipped_content_add(contents) for content in contents: # Compute token of the row in the main table (token, insertion_finalizer) = self._cql_runner.skipped_content_add_prepare( SkippedContentRow.from_dict({"origin": None, **content.to_dict()}) ) # Then add to index tables for algo in HASH_ALGORITHMS: self._cql_runner.skipped_content_index_add_one(algo, content, token) # Then to the main table insertion_finalizer() return {"skipped_content:add": len(contents)} def skipped_content_add(self, content: List[SkippedContent]) -> Dict: contents = [attr.evolve(c, ctime=now()) for c in content] return self._skipped_content_add(contents) def skipped_content_missing( self, contents: List[Dict[str, Any]] ) -> Iterable[Dict[str, Any]]: for content in contents: if not self._cql_runner.skipped_content_get_from_pk(content): yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} def directory_add(self, directories: List[Directory]) -> Dict: + to_add = {d.id: d for d in directories}.values() # Filter out directories that are already inserted. - missing = self.directory_missing([dir_.id for dir_ in directories]) + missing = self.directory_missing([dir_.id for dir_ in to_add]) directories = [dir_ for dir_ in directories if dir_.id in missing] self.journal_writer.directory_add(directories) for directory in directories: # Add directory entries to the 'directory_entry' table for entry in directory.entries: self._cql_runner.directory_entry_add_one( DirectoryEntryRow(directory_id=directory.id, **entry.to_dict()) ) # Add the directory *after* adding all the entries, so someone # calling snapshot_get_branch in the meantime won't end up # with half the entries. 
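# (directory_missing, and therefore _directory_ls, only consider a
# directory present once this row exists, which is what makes the
# ordering above safe for concurrent readers.)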
self._cql_runner.directory_add_one(DirectoryRow(id=directory.id)) return {"directory:add": len(directories)} def directory_missing(self, directories: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.directory_missing(directories) def _join_dentry_to_content(self, dentry: DirectoryEntry) -> Dict[str, Any]: contents: Union[List[Content], List[SkippedContentRow]] keys = ( "status", "sha1", "sha1_git", "sha256", "length", ) ret = dict.fromkeys(keys) ret.update(dentry.to_dict()) if ret["type"] == "file": contents = self.content_find({"sha1_git": ret["target"]}) if not contents: tokens = list( self._cql_runner.skipped_content_get_tokens_from_single_hash( "sha1_git", ret["target"] ) ) if tokens: contents = list( self._cql_runner.skipped_content_get_from_token(tokens[0]) ) if contents: content = contents[0] for key in keys: ret[key] = getattr(content, key) return ret def _directory_ls( self, directory_id: Sha1Git, recursive: bool, prefix: bytes = b"" ) -> Iterable[Dict[str, Any]]: if self.directory_missing([directory_id]): return rows = list(self._cql_runner.directory_entry_get([directory_id])) for row in rows: entry_d = row.to_dict() # Build and yield the directory entry dict del entry_d["directory_id"] entry = DirectoryEntry.from_dict(entry_d) ret = self._join_dentry_to_content(entry) ret["name"] = prefix + ret["name"] ret["dir_id"] = directory_id yield ret if recursive and ret["type"] == "dir": yield from self._directory_ls( ret["target"], True, prefix + ret["name"] + b"/" ) def directory_entry_get_by_path( self, directory: Sha1Git, paths: List[bytes] ) -> Optional[Dict[str, Any]]: return self._directory_entry_get_by_path(directory, paths, b"") def _directory_entry_get_by_path( self, directory: Sha1Git, paths: List[bytes], prefix: bytes ) -> Optional[Dict[str, Any]]: if not paths: return None contents = list(self.directory_ls(directory)) if not contents: return None def _get_entry(entries, name): """Finds the entry with the requested name, prepends the prefix (to get its full path), and returns it. If no entry has that name, returns None.""" for entry in entries: if entry["name"] == name: entry = entry.copy() entry["name"] = prefix + entry["name"] return entry first_item = _get_entry(contents, paths[0]) if len(paths) == 1: return first_item if not first_item or first_item["type"] != "dir": return None return self._directory_entry_get_by_path( first_item["target"], paths[1:], prefix + paths[0] + b"/" ) def directory_ls( self, directory: Sha1Git, recursive: bool = False ) -> Iterable[Dict[str, Any]]: yield from self._directory_ls(directory, recursive) def directory_get_random(self) -> Sha1Git: directory = self._cql_runner.directory_get_random() assert directory, "Could not find any directory" return directory.id def revision_add(self, revisions: List[Revision]) -> Dict: # Filter-out revisions already in the database - missing = self.revision_missing([rev.id for rev in revisions]) + to_add = {r.id: r for r in revisions}.values() + missing = self.revision_missing([rev.id for rev in to_add]) revisions = [rev for rev in revisions if rev.id in missing] self.journal_writer.revision_add(revisions) for revision in revisions: revobject = converters.revision_to_db(revision) if revobject: # Add parents first for (rank, parent) in enumerate(revision.parents): self._cql_runner.revision_parent_add_one( RevisionParentRow( id=revobject.id, parent_rank=rank, parent_id=parent ) ) # Then write the main revision row. 
# Writing this after all parents were written ensures that # read endpoints don't return a partial view while writing # the parents self._cql_runner.revision_add_one(revobject) return {"revision:add": len(revisions)} def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.revision_missing(revisions) def revision_get(self, revision_ids: List[Sha1Git]) -> List[Optional[Revision]]: rows = self._cql_runner.revision_get(revision_ids) revisions: Dict[Sha1Git, Revision] = {} for row in rows: # TODO: use a single query to get all parents? # (it might have lower latency, but requires more code and more # bandwidth, because revision id would be part of each returned # row) parents = tuple(self._cql_runner.revision_parent_get(row.id)) # parent_rank is the clustering key, so results are already # sorted by rank. rev = converters.revision_from_db(row, parents=parents) revisions[rev.id] = rev return [revisions.get(rev_id) for rev_id in revision_ids] def _get_parent_revs( self, rev_ids: Iterable[Sha1Git], seen: Set[Sha1Git], limit: Optional[int], short: bool, ) -> Union[ Iterable[Dict[str, Any]], Iterable[Tuple[Sha1Git, Tuple[Sha1Git, ...]]], ]: if limit and len(seen) >= limit: return rev_ids = [id_ for id_ in rev_ids if id_ not in seen] if not rev_ids: return seen |= set(rev_ids) # We need this query, even if short=True, to return consistent # results (ie. not return only a subset of a revision's parents # if it is being written) if short: ids = self._cql_runner.revision_get_ids(rev_ids) for id_ in ids: # TODO: use a single query to get all parents? # (it might have less latency, but requires less code and more # bandwidth (because revision id would be part of each returned # row) parents = tuple(self._cql_runner.revision_parent_get(id_)) # parent_rank is the clustering key, so results are already # sorted by rank. yield (id_, parents) yield from self._get_parent_revs(parents, seen, limit, short) else: rows = self._cql_runner.revision_get(rev_ids) for row in rows: # TODO: use a single query to get all parents? # (it might have less latency, but requires less code and more # bandwidth (because revision id would be part of each returned # row) parents = tuple(self._cql_runner.revision_parent_get(row.id)) # parent_rank is the clustering key, so results are already # sorted by rank. 
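# Convert the row and its parents back into a swh.model Revision and
# yield it as a dict; the recursive call below then walks this revision's
# parents, with `seen` deduplicating revisions and `limit` bounding the
# size of the traversal.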
rev = converters.revision_from_db(row, parents=parents) yield rev.to_dict() yield from self._get_parent_revs(parents, seen, limit, short) def revision_log( self, revisions: List[Sha1Git], limit: Optional[int] = None ) -> Iterable[Optional[Dict[str, Any]]]: seen: Set[Sha1Git] = set() yield from self._get_parent_revs(revisions, seen, limit, False) def revision_shortlog( self, revisions: List[Sha1Git], limit: Optional[int] = None ) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]: seen: Set[Sha1Git] = set() yield from self._get_parent_revs(revisions, seen, limit, True) def revision_get_random(self) -> Sha1Git: revision = self._cql_runner.revision_get_random() assert revision, "Could not find any revision" return revision.id def release_add(self, releases: List[Release]) -> Dict: - to_add = [] - for rel in releases: - if rel not in to_add: - to_add.append(rel) + to_add = {r.id: r for r in releases}.values() missing = set(self.release_missing([rel.id for rel in to_add])) - to_add = [rel for rel in to_add if rel.id in missing] - - self.journal_writer.release_add(to_add) + releases = [rel for rel in to_add if rel.id in missing] + self.journal_writer.release_add(releases) - for release in to_add: + for release in releases: if release: self._cql_runner.release_add_one(converters.release_to_db(release)) - return {"release:add": len(to_add)} + return {"release:add": len(releases)} def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.release_missing(releases) def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]: rows = self._cql_runner.release_get(releases) rels: Dict[Sha1Git, Release] = {} for row in rows: release = converters.release_from_db(row) rels[row.id] = release return [rels.get(rel_id) for rel_id in releases] def release_get_random(self) -> Sha1Git: release = self._cql_runner.release_get_random() assert release, "Could not find any release" return release.id def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: - missing = self._cql_runner.snapshot_missing([snp.id for snp in snapshots]) + to_add = {s.id: s for s in snapshots}.values() + missing = self._cql_runner.snapshot_missing([snp.id for snp in to_add]) snapshots = [snp for snp in snapshots if snp.id in missing] for snapshot in snapshots: self.journal_writer.snapshot_add([snapshot]) # Add branches for (branch_name, branch) in snapshot.branches.items(): if branch is None: target_type: Optional[str] = None target: Optional[bytes] = None else: target_type = branch.target_type.value target = branch.target self._cql_runner.snapshot_branch_add_one( SnapshotBranchRow( snapshot_id=snapshot.id, name=branch_name, target_type=target_type, target=target, ) ) # Add the snapshot *after* adding all the branches, so someone # calling snapshot_get_branch in the meantime won't end up # with half the branches. 
self._cql_runner.snapshot_add_one(SnapshotRow(id=snapshot.id)) return {"snapshot:add": len(snapshots)} def snapshot_missing(self, snapshots: List[Sha1Git]) -> Iterable[Sha1Git]: return self._cql_runner.snapshot_missing(snapshots) def snapshot_get(self, snapshot_id: Sha1Git) -> Optional[Dict[str, Any]]: d = self.snapshot_get_branches(snapshot_id) if d is None: return None return { "id": d["id"], "branches": { name: branch.to_dict() if branch else None for (name, branch) in d["branches"].items() }, "next_branch": d["next_branch"], } def snapshot_count_branches( self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Optional[Dict[Optional[str], int]]: if self._cql_runner.snapshot_missing([snapshot_id]): # Makes sure we don't fetch branches for a snapshot that is # being added. return None return self._cql_runner.snapshot_count_branches( snapshot_id, branch_name_exclude_prefix ) def snapshot_get_branches( self, snapshot_id: Sha1Git, branches_from: bytes = b"", branches_count: int = 1000, target_types: Optional[List[str]] = None, branch_name_include_substring: Optional[bytes] = None, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Optional[PartialBranches]: if self._cql_runner.snapshot_missing([snapshot_id]): # Makes sure we don't fetch branches for a snapshot that is # being added. return None branches: List = [] while len(branches) < branches_count + 1: new_branches = list( self._cql_runner.snapshot_branch_get( snapshot_id, branches_from, branches_count + 1, branch_name_exclude_prefix, ) ) if not new_branches: break branches_from = new_branches[-1].name new_branches_filtered = new_branches # Filter by target_type if target_types: new_branches_filtered = [ branch for branch in new_branches_filtered if branch.target is not None and branch.target_type in target_types ] # Filter by branches_name_pattern if branch_name_include_substring: new_branches_filtered = [ branch for branch in new_branches_filtered if branch.name is not None and ( branch_name_include_substring is None or branch_name_include_substring in branch.name ) ] branches.extend(new_branches_filtered) if len(new_branches) < branches_count + 1: break if len(branches) > branches_count: last_branch = branches.pop(-1).name else: last_branch = None return PartialBranches( id=snapshot_id, branches={ branch.name: None if branch.target is None else SnapshotBranch( target=branch.target, target_type=TargetType(branch.target_type) ) for branch in branches }, next_branch=last_branch, ) def snapshot_get_random(self) -> Sha1Git: snapshot = self._cql_runner.snapshot_get_random() assert snapshot, "Could not find any snapshot" return snapshot.id def object_find_by_sha1_git(self, ids: List[Sha1Git]) -> Dict[Sha1Git, List[Dict]]: results: Dict[Sha1Git, List[Dict]] = {id_: [] for id_ in ids} missing_ids = set(ids) # Mind the order, revision is the most likely one for a given ID, # so we check revisions first. queries: List[Tuple[str, Callable[[List[Sha1Git]], List[Sha1Git]]]] = [ ("revision", self._cql_runner.revision_missing), ("release", self._cql_runner.release_missing), ("content", self._cql_runner.content_missing_by_sha1_git), ("directory", self._cql_runner.directory_missing), ] for (object_type, query_fn) in queries: found_ids = missing_ids - set(query_fn(list(missing_ids))) for sha1_git in found_ids: results[sha1_git].append( {"sha1_git": sha1_git, "type": object_type,} ) missing_ids.remove(sha1_git) if not missing_ids: # We found everything, skipping the next queries. 
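# (At most one *_missing query is issued per object type; the break
# merely skips the remaining types once every id has been classified.)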
break return results def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]: return [self.origin_get_one(origin) for origin in origins] def origin_get_one(self, origin_url: str) -> Optional[Origin]: """Given an origin url, return the origin if it exists, None otherwise """ rows = list(self._cql_runner.origin_get_by_url(origin_url)) if rows: assert len(rows) == 1 return Origin(url=rows[0].url) else: return None def origin_get_by_sha1(self, sha1s: List[bytes]) -> List[Optional[Dict[str, Any]]]: results = [] for sha1 in sha1s: rows = list(self._cql_runner.origin_get_by_sha1(sha1)) origin = {"url": rows[0].url} if rows else None results.append(origin) return results def origin_list( self, page_token: Optional[str] = None, limit: int = 100 ) -> PagedResult[Origin]: # Compute what token to begin the listing from start_token = TOKEN_BEGIN if page_token: start_token = int(page_token) if not (TOKEN_BEGIN <= start_token <= TOKEN_END): raise StorageArgumentException("Invalid page_token.") next_page_token = None origins = [] # Take one more origin so we can reuse it as the next page token if any for (tok, row) in self._cql_runner.origin_list(start_token, limit + 1): origins.append(Origin(url=row.url)) # keep reference of the last id for pagination purposes last_id = tok if len(origins) > limit: # last origin id is the next page token next_page_token = str(last_id) # excluding that origin from the result to respect the limit size origins = origins[:limit] assert len(origins) <= limit return PagedResult(results=origins, next_page_token=next_page_token) def origin_search( self, url_pattern: str, page_token: Optional[str] = None, limit: int = 50, regexp: bool = False, with_visit: bool = False, visit_types: Optional[List[str]] = None, ) -> PagedResult[Origin]: # TODO: remove this endpoint, swh-search should be used instead. 
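# This implementation loads every origin row in memory (origin_iter_all)
# and filters them in Python, so its cost grows with the total number of
# origins; the page_token is simply an offset into the filtered list.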
next_page_token = None offset = int(page_token) if page_token else 0 origin_rows = [row for row in self._cql_runner.origin_iter_all()] if regexp: pat = re.compile(url_pattern) origin_rows = [row for row in origin_rows if pat.search(row.url)] else: origin_rows = [row for row in origin_rows if url_pattern in row.url] if with_visit: origin_rows = [row for row in origin_rows if row.next_visit_id > 1] if visit_types: def _has_visit_types(origin, visit_types): for origin_visit in stream_results(self.origin_visit_get, origin): if origin_visit.type in visit_types: return True return False origin_rows = [ row for row in origin_rows if _has_visit_types(row.url, visit_types) ] origins = [Origin(url=row.url) for row in origin_rows] origins = origins[offset : offset + limit + 1] if len(origins) > limit: # next offset next_page_token = str(offset + limit) # excluding that origin from the result to respect the limit size origins = origins[:limit] assert len(origins) <= limit return PagedResult(results=origins, next_page_token=next_page_token) def origin_count( self, url_pattern: str, regexp: bool = False, with_visit: bool = False ) -> int: raise NotImplementedError( "The Cassandra backend does not implement origin_count" ) def origin_add(self, origins: List[Origin]) -> Dict[str, int]: - to_add = [ori for ori in origins if self.origin_get_one(ori.url) is None] - # keep only one occurrence of each given origin while keeping the list - # sorted as originally given - to_add = sorted(set(to_add), key=to_add.index) - self.journal_writer.origin_add(to_add) - for origin in to_add: + to_add = {o.url: o for o in origins}.values() + origins = [ori for ori in to_add if self.origin_get_one(ori.url) is None] + + self.journal_writer.origin_add(origins) + for origin in origins: self._cql_runner.origin_add_one( OriginRow(sha1=hash_url(origin.url), url=origin.url, next_visit_id=1) ) - return {"origin:add": len(to_add)} + return {"origin:add": len(origins)} def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: for visit in visits: origin = self.origin_get_one(visit.origin) if not origin: # Cannot add a visit without an origin raise StorageArgumentException("Unknown origin %s", visit.origin) all_visits = [] nb_visits = 0 for visit in visits: nb_visits += 1 if not visit.visit: visit_id = self._cql_runner.origin_generate_unique_visit_id( visit.origin ) visit = attr.evolve(visit, visit=visit_id) self.journal_writer.origin_visit_add([visit]) self._cql_runner.origin_visit_add_one(OriginVisitRow(**visit.to_dict())) assert visit.visit is not None all_visits.append(visit) self._origin_visit_status_add( OriginVisitStatus( origin=visit.origin, visit=visit.visit, date=visit.date, type=visit.type, status="created", snapshot=None, ) ) return all_visits def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None: """Add an origin visit status""" if visit_status.type is None: visit_row = self._cql_runner.origin_visit_get_one( visit_status.origin, visit_status.visit ) if visit_row is None: raise StorageArgumentException( f"Unknown origin visit {visit_status.visit} " f"of origin {visit_status.origin}" ) visit_status = attr.evolve(visit_status, type=visit_row.type) self.journal_writer.origin_visit_status_add([visit_status]) self._cql_runner.origin_visit_status_add_one( converters.visit_status_to_row(visit_status) ) def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus]) -> None: # First round to check existence (fail early if any is ko) for visit_status in visit_statuses: 
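# An unknown origin anywhere in the batch aborts the whole call before
# any status is written.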
origin_url = self.origin_get_one(visit_status.origin) if not origin_url: raise StorageArgumentException(f"Unknown origin {visit_status.origin}") for visit_status in visit_statuses: self._origin_visit_status_add(visit_status) def _origin_visit_apply_status( self, visit: Dict[str, Any], visit_status: OriginVisitStatusRow ) -> Dict[str, Any]: """Retrieve the latest visit status information for the origin visit. Then merge it with the visit and return it. """ return { # default to the values in visit **visit, # override with the last update **visit_status.to_dict(), # visit['origin'] is the URL (via a join), while # visit_status['origin'] is only an id. "origin": visit["origin"], # but keep the date of the creation of the origin visit "date": visit["date"], # We use the visit type from origin visit # if it's not present on the origin visit status "type": visit_status.type or visit["type"], } def _origin_visit_get_latest_status(self, visit: OriginVisit) -> OriginVisitStatus: """Retrieve the latest visit status information for the origin visit object. """ assert visit.visit row = self._cql_runner.origin_visit_status_get_latest(visit.origin, visit.visit) assert row is not None visit_status = converters.row_to_visit_status(row) return attr.evolve(visit_status, origin=visit.origin) @staticmethod def _format_origin_visit_row(visit): return { **visit.to_dict(), "origin": visit.origin, "date": visit.date.replace(tzinfo=datetime.timezone.utc), } def origin_visit_get( self, origin: str, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10, ) -> PagedResult[OriginVisit]: if not isinstance(order, ListOrder): raise StorageArgumentException("order must be a ListOrder value") if page_token and not isinstance(page_token, str): raise StorageArgumentException("page_token must be a string.") next_page_token = None visit_from = None if page_token is None else int(page_token) visits: List[OriginVisit] = [] extra_limit = limit + 1 rows = self._cql_runner.origin_visit_get(origin, visit_from, extra_limit, order) for row in rows: visits.append(converters.row_to_visit(row)) assert len(visits) <= extra_limit if len(visits) == extra_limit: visits = visits[:limit] next_page_token = str(visits[-1].visit) return PagedResult(results=visits, next_page_token=next_page_token) def origin_visit_status_get( self, origin: str, visit: int, page_token: Optional[str] = None, order: ListOrder = ListOrder.ASC, limit: int = 10, ) -> PagedResult[OriginVisitStatus]: next_page_token = None date_from = None if page_token is not None: date_from = datetime.datetime.fromisoformat(page_token) # Take one more visit status so we can reuse it as the next page token if any rows = self._cql_runner.origin_visit_status_get_range( origin, visit, date_from, limit + 1, order ) visit_statuses = [converters.row_to_visit_status(row) for row in rows] if len(visit_statuses) > limit: # last visit status date is the next page token next_page_token = str(visit_statuses[-1].date) # excluding that visit status from the result to respect the limit size visit_statuses = visit_statuses[:limit] return PagedResult(results=visit_statuses, next_page_token=next_page_token) def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime ) -> Optional[OriginVisit]: # Iterator over all the visits of the origin # This should be ok for now, as there aren't too many visits # per origin. 
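# The visit whose date is closest to visit_date wins; ties are broken in
# favour of the highest visit id (hence the (abs(dt), -visit.visit) key).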
rows = list(self._cql_runner.origin_visit_get_all(origin)) def key(visit): dt = visit.date.replace(tzinfo=datetime.timezone.utc) - visit_date return (abs(dt), -visit.visit) if rows: return converters.row_to_visit(min(rows, key=key)) return None def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]: row = self._cql_runner.origin_visit_get_one(origin, visit) if row: return converters.row_to_visit(row) return None def origin_visit_get_latest( self, origin: str, type: Optional[str] = None, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ) -> Optional[OriginVisit]: if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES): raise StorageArgumentException( f"Unknown allowed statuses {','.join(allowed_statuses)}, only " f"{','.join(VISIT_STATUSES)} authorized" ) # TODO: Do not fetch all visits rows = self._cql_runner.origin_visit_get_all(origin) latest_visit = None for row in rows: visit = self._format_origin_visit_row(row) for status_row in self._cql_runner.origin_visit_status_get( origin, visit["visit"] ): updated_visit = self._origin_visit_apply_status(visit, status_row) if type is not None and updated_visit["type"] != type: continue if allowed_statuses and updated_visit["status"] not in allowed_statuses: continue if require_snapshot and updated_visit["snapshot"] is None: continue # updated_visit is a candidate if latest_visit is not None: if updated_visit["date"] < latest_visit["date"]: continue if updated_visit["visit"] < latest_visit["visit"]: continue latest_visit = updated_visit if latest_visit is None: return None return OriginVisit( origin=latest_visit["origin"], visit=latest_visit["visit"], date=latest_visit["date"], type=latest_visit["type"], ) def origin_visit_status_get_latest( self, origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ) -> Optional[OriginVisitStatus]: if allowed_statuses and not set(allowed_statuses).intersection(VISIT_STATUSES): raise StorageArgumentException( f"Unknown allowed statuses {','.join(allowed_statuses)}, only " f"{','.join(VISIT_STATUSES)} authorized" ) rows = list(self._cql_runner.origin_visit_status_get(origin_url, visit)) # filtering is done python side as we cannot do it server side if allowed_statuses: rows = [row for row in rows if row.status in allowed_statuses] if require_snapshot: rows = [row for row in rows if row.snapshot is not None] if not rows: return None return converters.row_to_visit_status(rows[0]) def origin_visit_status_get_random(self, type: str) -> Optional[OriginVisitStatus]: back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back # Random position to start iteration at start_token = random.randint(TOKEN_BEGIN, TOKEN_END) # Iterator over all visits, ordered by token(origins) then visit_id rows = self._cql_runner.origin_visit_iter(start_token) for row in rows: visit = converters.row_to_visit(row) visit_status = self._origin_visit_get_latest_status(visit) if visit.date > back_in_the_day and visit_status.status == "full": return visit_status return None def stat_counters(self): rows = self._cql_runner.stat_counters() keys = ( "content", "directory", "origin", "origin_visit", "release", "revision", "skipped_content", "snapshot", ) stats = {key: 0 for key in keys} stats.update({row.object_type: row.count for row in rows}) return stats def refresh_stat_counters(self): pass def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata]) -> None: 
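"""Writes the given metadata objects to the journal, then inserts one
RawExtrinsicMetadataRow per entry, after checking that the referenced
metadata authority and fetcher are already registered (a
StorageArgumentException is raised otherwise)."""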
self.journal_writer.raw_extrinsic_metadata_add(metadata) for metadata_entry in metadata: if not self._cql_runner.metadata_authority_get( metadata_entry.authority.type.value, metadata_entry.authority.url ): raise StorageArgumentException( f"Unknown authority {metadata_entry.authority}" ) if not self._cql_runner.metadata_fetcher_get( metadata_entry.fetcher.name, metadata_entry.fetcher.version ): raise StorageArgumentException( f"Unknown fetcher {metadata_entry.fetcher}" ) try: row = RawExtrinsicMetadataRow( id=metadata_entry.id, type=metadata_entry.target.object_type.name.lower(), target=str(metadata_entry.target), authority_type=metadata_entry.authority.type.value, authority_url=metadata_entry.authority.url, discovery_date=metadata_entry.discovery_date, fetcher_name=metadata_entry.fetcher.name, fetcher_version=metadata_entry.fetcher.version, format=metadata_entry.format, metadata=metadata_entry.metadata, origin=metadata_entry.origin, visit=metadata_entry.visit, snapshot=map_optional(str, metadata_entry.snapshot), release=map_optional(str, metadata_entry.release), revision=map_optional(str, metadata_entry.revision), path=metadata_entry.path, directory=map_optional(str, metadata_entry.directory), ) self._cql_runner.raw_extrinsic_metadata_add(row) except TypeError as e: raise StorageArgumentException(*e.args) def raw_extrinsic_metadata_get( self, target: ExtendedSWHID, authority: MetadataAuthority, after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, ) -> PagedResult[RawExtrinsicMetadata]: if page_token is not None: (after_date, id_) = msgpack_loads(base64.b64decode(page_token)) if after and after_date < after: raise StorageArgumentException( "page_token is inconsistent with the value of 'after'." ) entries = self._cql_runner.raw_extrinsic_metadata_get_after_date_and_id( str(target), authority.type.value, authority.url, after_date, id_, ) elif after is not None: entries = self._cql_runner.raw_extrinsic_metadata_get_after_date( str(target), authority.type.value, authority.url, after ) else: entries = self._cql_runner.raw_extrinsic_metadata_get( str(target), authority.type.value, authority.url ) if limit: entries = itertools.islice(entries, 0, limit + 1) results = [] for entry in entries: discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc) assert str(target) == entry.target result = RawExtrinsicMetadata( target=target, authority=MetadataAuthority( type=MetadataAuthorityType(entry.authority_type), url=entry.authority_url, ), fetcher=MetadataFetcher( name=entry.fetcher_name, version=entry.fetcher_version, ), discovery_date=discovery_date, format=entry.format, metadata=entry.metadata, origin=entry.origin, visit=entry.visit, snapshot=map_optional(CoreSWHID.from_string, entry.snapshot), release=map_optional(CoreSWHID.from_string, entry.release), revision=map_optional(CoreSWHID.from_string, entry.revision), path=entry.path, directory=map_optional(CoreSWHID.from_string, entry.directory), ) results.append(result) if len(results) > limit: results.pop() assert len(results) == limit last_result = results[-1] next_page_token: Optional[str] = base64.b64encode( msgpack_dumps((last_result.discovery_date, last_result.id,)) ).decode() else: next_page_token = None return PagedResult(next_page_token=next_page_token, results=results,) def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None: self.journal_writer.metadata_fetcher_add(fetchers) for fetcher in fetchers: self._cql_runner.metadata_fetcher_add( MetadataFetcherRow( 
name=fetcher.name, version=fetcher.version, metadata=json.dumps(map_optional(dict, fetcher.metadata)), ) ) def metadata_fetcher_get( self, name: str, version: str ) -> Optional[MetadataFetcher]: fetcher = self._cql_runner.metadata_fetcher_get(name, version) if fetcher: return MetadataFetcher( name=fetcher.name, version=fetcher.version, metadata=json.loads(fetcher.metadata), ) else: return None def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None: self.journal_writer.metadata_authority_add(authorities) for authority in authorities: self._cql_runner.metadata_authority_add( MetadataAuthorityRow( url=authority.url, type=authority.type.value, metadata=json.dumps(map_optional(dict, authority.metadata)), ) ) def metadata_authority_get( self, type: MetadataAuthorityType, url: str ) -> Optional[MetadataAuthority]: authority = self._cql_runner.metadata_authority_get(type.value, url) if authority: return MetadataAuthority( type=MetadataAuthorityType(authority.type), url=authority.url, metadata=json.loads(authority.metadata), ) else: return None # ExtID tables def extid_add(self, ids: List[ExtID]) -> Dict[str, int]: extids = [ extid for extid in ids if not self._cql_runner.extid_get_from_pk( extid_type=extid.extid_type, extid=extid.extid, target=extid.target, ) ] self.journal_writer.extid_add(extids) inserted = 0 for extid in extids: + target_type = extid.target.object_type.value + target = extid.target.object_id extidrow = ExtIDRow( extid_type=extid.extid_type, extid=extid.extid, - target_type=extid.target.object_type.value, - target=extid.target.object_id, + target_type=target_type, + target=target, ) (token, insertion_finalizer) = self._cql_runner.extid_add_prepare(extidrow) - self._cql_runner.extid_index_add_one(extidrow, token) + indexrow = ExtIDByTargetRow( + target_type=target_type, target=target, target_token=token, + ) + self._cql_runner.extid_index_add_one(indexrow) insertion_finalizer() inserted += 1 return {"extid:add": inserted} def extid_get_from_extid(self, id_type: str, ids: List[bytes]) -> List[ExtID]: result: List[ExtID] = [] for extid in ids: extidrows = list(self._cql_runner.extid_get_from_extid(id_type, extid)) result.extend( ExtID( extid_type=extidrow.extid_type, extid=extidrow.extid, target=CoreSWHID( object_type=extidrow.target_type, object_id=extidrow.target, ), ) for extidrow in extidrows ) return result def extid_get_from_target( self, target_type: SwhidObjectType, ids: List[Sha1Git] ) -> List[ExtID]: result: List[ExtID] = [] for target in ids: extidrows = list( self._cql_runner.extid_get_from_target(target_type.value, target) ) result.extend( ExtID( extid_type=extidrow.extid_type, extid=extidrow.extid, target=CoreSWHID( object_type=SwhidObjectType(extidrow.target_type), object_id=extidrow.target, ), ) for extidrow in extidrows ) return result # Misc def clear_buffers(self, object_types: Sequence[str] = ()) -> None: """Do nothing """ return None def flush(self, object_types: Sequence[str] = ()) -> Dict[str, int]: return {} diff --git a/swh/storage/cli.py b/swh/storage/cli.py index d21b9071..34fd17e0 100644 --- a/swh/storage/cli.py +++ b/swh/storage/cli.py @@ -1,226 +1,227 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import os from typing 
import Dict, Optional import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group try: from systemd.daemon import notify except ImportError: notify = None @swh_cli_group.group(name="storage", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.option( "--check-config", default=None, type=click.Choice(["no", "read", "write"]), help=( "Check the configuration of the storage at startup for read or write access; " "if set, override the value present in the configuration file if any. " "Defaults to 'read' for the 'backfill' command, and 'write' for 'rpc-server' " "and 'replay' commands." ), ) @click.pass_context def storage(ctx, config_file, check_config): """Software Heritage Storage tools.""" from swh.core import config if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") ctx.ensure_object(dict) ctx.obj["config"] = conf ctx.obj["check_config"] = check_config @storage.command(name="rpc-serve") @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5002, type=click.INT, metavar="PORT", show_default=True, help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, host, port, debug): """Software Heritage Storage RPC server. Do NOT use this in a production environment. """ from swh.storage.api.server import app if "log_level" in ctx.obj: logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"]) ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") app.config.update(ctx.obj["config"]) app.run(host, port=int(port), debug=bool(debug)) @storage.command() @click.argument("object_type") @click.option("--start-object", default=None) @click.option("--end-object", default=None) @click.option("--dry-run", is_flag=True, default=False) @click.pass_context def backfill(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: + - brokers: a list of kafka endpoints (the journal) in which entries will be - added. + added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "read") # for "lazy" loading from swh.storage.backfill import JournalBackfiller try: from systemd.daemon import notify except ImportError: notify = None conf = ctx.obj["config"] backfiller = JournalBackfiller(conf) if notify: notify("READY=1") try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run, ) except KeyboardInterrupt: if notify: notify("STOPPING=1") ctx.exit(0) @storage.command() @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. 
Default is to " "run forever.", ) @click.pass_context def replay(ctx, stop_after_objects): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ import functools from swh.journal.client import get_journal_client from swh.storage import get_storage from swh.storage.replay import process_replay_objects ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write") conf = ctx.obj["config"] storage = get_storage(**conf.pop("storage")) client_cfg = conf.pop("journal_client") if stop_after_objects: client_cfg["stop_after_objects"] = stop_after_objects try: client = get_journal_client(**client_cfg) except ValueError as exc: ctx.fail(exc) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str): """Helper function to inject the setting of check_config option in the storage config dict according to the expected default value (default value depends on the command, eg. backfill can be read-only). """ if check_config is not None: if check_config == "no": storage_cfg.pop("check_config", None) else: storage_cfg["check_config"] = {"check_write": check_config == "write"} else: if "check_config" not in storage_cfg: storage_cfg["check_config"] = {"check_write": default == "write"} def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_STORAGE") if __name__ == "__main__": main() diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py index a719954a..4b478edb 100644 --- a/swh/storage/fixer.py +++ b/swh/storage/fixer.py @@ -1,308 +1,332 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import logging from typing import Any, Dict, List, Optional from swh.model.identifiers import normalize_timestamp +from swh.model.model import Origin logger = logging.getLogger(__name__) def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: """Filters-out invalid 'perms' key that leaked from swh.model.from_disk to the journal. 
>>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) {'sha1_git': b'foo'} >>> _fix_content({'sha1_git': b'bar'}) {'sha1_git': b'bar'} """ content = content.copy() content.pop("perms", None) return content def _fix_revision_pypi_empty_string(rev): """PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772 """ rev = { **rev, "author": rev["author"].copy(), "committer": rev["committer"].copy(), } if rev["author"].get("email") == "": rev["author"]["email"] = b"" if rev["author"].get("name") == "": rev["author"]["name"] = b"" if rev["committer"].get("email") == "": rev["committer"]["email"] = b"" if rev["committer"].get("name") == "": rev["committer"]["name"] = b"" return rev def _fix_revision_transplant_source(rev): if rev.get("metadata") and rev["metadata"].get("extra_headers"): rev = copy.deepcopy(rev) rev["metadata"]["extra_headers"] = [ [key, value.encode("ascii")] if key == "transplant_source" and isinstance(value, str) else [key, value] for (key, value) in rev["metadata"]["extra_headers"] ] return rev def _check_date(date): """Returns whether the date can be represented in backends with sane limits on timestamps and timezones (resp. signed 64-bits and signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). """ if date is None: return True date = normalize_timestamp(date) return ( (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63) and (0 <= date["timestamp"]["microseconds"] < 10 ** 6) and (-(2 ** 15) <= date["offset"] < 2 ** 15) ) def _check_revision_date(rev): """Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339""" return _check_date(rev["date"]) and _check_date(rev["committer_date"]) def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: """Fix various legacy revision issues. Fix author/committer person: >>> from pprint import pprint >>> date = { ... 'timestamp': { ... 'seconds': 1565096932, ... 'microseconds': 0, ... }, ... 'offset': 0, ... } >>> rev0 = _fix_revision({ ... 'id': b'rev-id', ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': date, ... 'type': 'git', ... 'message': '', ... 'directory': b'dir-id', ... 'synthetic': False, ... }) >>> rev0['author'] {'fullname': b'', 'name': b'', 'email': b''} >>> rev0['committer'] {'fullname': b'', 'name': b'', 'email': b''} Fix type of 'transplant_source' extra headers: >>> rev1 = _fix_revision({ ... 'id': b'rev-id', ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': date, ... 'metadata': { ... 'extra_headers': [ ... ['time_offset_seconds', b'-3600'], ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] ... ]}, ... 'type': 'git', ... 'message': '', ... 'directory': b'dir-id', ... 'synthetic': False, ... }) >>> pprint(rev1['metadata']['extra_headers']) [['time_offset_seconds', b'-3600'], ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] Revision with invalid date are filtered: >>> from copy import deepcopy >>> invalid_date1 = deepcopy(date) >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': invalid_date1, ... 'committer_date': date, ... 
}) >>> rev is None True >>> invalid_date2 = deepcopy(date) >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': invalid_date2, ... 'committer_date': date, ... }) >>> rev is None True >>> invalid_date3 = deepcopy(date) >>> invalid_date3['offset'] = 2**20 # > 10^15 >>> rev = _fix_revision({ ... 'author': {'fullname': b'', 'name': '', 'email': ''}, ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, ... 'date': date, ... 'committer_date': invalid_date3, ... }) >>> rev is None True """ # noqa rev = _fix_revision_pypi_empty_string(revision) rev = _fix_revision_transplant_source(rev) if not _check_revision_date(rev): logger.warning( "Invalid revision date detected: %(revision)s", {"revision": rev} ) return None return rev def _fix_origin(origin: Dict) -> Dict: """Fix legacy origin with type which is no longer part of the model. >>> from pprint import pprint >>> pprint(_fix_origin({ ... 'url': 'http://foo', ... })) {'url': 'http://foo'} >>> pprint(_fix_origin({ ... 'url': 'http://bar', ... 'type': 'foo', ... })) {'url': 'http://bar'} """ o = origin.copy() o.pop("type", None) return o def _fix_origin_visit(visit: Dict) -> Dict: """Fix various legacy origin visit issues. `visit['origin']` is a dict instead of an URL: >>> from datetime import datetime, timezone >>> from pprint import pprint >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) >>> pprint(_fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, ... 'date': date, ... 'type': 'git', ... 'status': 'ongoing', ... 'snapshot': None, ... })) {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), 'origin': 'http://foo', 'type': 'git'} `visit['type']` is missing , but `origin['visit']['type']` exists: >>> pprint(_fix_origin_visit( ... {'origin': {'type': 'hg', 'url': 'http://foo'}, ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None, ... })) {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), 'origin': 'http://foo', 'type': 'hg'} >>> pprint(_fix_origin_visit( ... {'origin': {'type': 'hg', 'url': 'http://foo'}, ... 'date': '2020-02-27 14:39:19+00:00', ... 'status': 'ongoing', ... 'snapshot': None, ... })) {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), 'origin': 'http://foo', 'type': 'hg'} Old visit format (origin_visit with no type) raises: >>> _fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None ... }) Traceback (most recent call last): ... ValueError: Old origin visit format detected... >>> _fix_origin_visit({ ... 'origin': 'http://foo', ... 'date': date, ... 'status': 'ongoing', ... 'snapshot': None ... }) Traceback (most recent call last): ... ValueError: Old origin visit format detected... """ # noqa visit = visit.copy() if "type" not in visit: if isinstance(visit["origin"], dict) and "type" in visit["origin"]: # Very old version of the schema: visits did not have a type, # but their 'origin' field was a dict with a 'type' key. visit["type"] = visit["origin"]["type"] else: # Very old schema version: 'type' is missing, stop early # We expect the journal's origin_visit topic to no longer reference # such visits. If it does, the replayer must crash so we can fix # the journal's topic. 
raise ValueError(f"Old origin visit format detected: {visit}") if isinstance(visit["origin"], dict): # Old version of the schema: visit['origin'] was a dict. visit["origin"] = visit["origin"]["url"] date = visit["date"] if isinstance(date, str): visit["date"] = datetime.datetime.fromisoformat(date) # Those are no longer part of the model for key in ["status", "snapshot", "metadata"]: visit.pop(key, None) return visit +def _fix_raw_extrinsic_metadata(obj_dict: Dict) -> Dict: + """Fix legacy RawExtrinsicMetadata with type which is no longer part of the model. + + >>> _fix_raw_extrinsic_metadata({ + ... 'type': 'directory', + ... 'target': 'swh:1:dir:460a586d1c95d120811eaadb398d534e019b5243', + ... }) + {'target': 'swh:1:dir:460a586d1c95d120811eaadb398d534e019b5243'} + >>> _fix_raw_extrinsic_metadata({ + ... 'type': 'origin', + ... 'target': 'https://inria.halpreprod.archives-ouvertes.fr/hal-01667309', + ... }) + {'target': 'swh:1:ori:155291d5b9ada4570672510509f93fcfd9809882'} + + """ + o = obj_dict.copy() + if o.pop("type", None) == "origin": + o["target"] = str(Origin(o["target"]).swhid()) + return o + + def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: """ Fix legacy objects from the journal to bring them up to date with the latest storage schema. """ if object_type == "content": return [_fix_content(v) for v in objects] elif object_type == "revision": revisions = [_fix_revision(v) for v in objects] return [rev for rev in revisions if rev is not None] elif object_type == "origin": return [_fix_origin(v) for v in objects] elif object_type == "origin_visit": return [_fix_origin_visit(v) for v in objects] + elif object_type == "raw_extrinsic_metadata": + return [_fix_raw_extrinsic_metadata(v) for v in objects] else: return objects diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py index d69b7340..b227394b 100644 --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -1,686 +1,687 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime import functools import random from typing import ( Any, Dict, Generic, Iterable, Iterator, List, Optional, Tuple, Type, TypeVar, Union, ) from swh.model.identifiers import ExtendedSWHID from swh.model.model import Content, Sha1Git, SkippedContent from swh.storage.cassandra import CassandraStorage from swh.storage.cassandra.model import ( BaseRow, ContentRow, DirectoryEntryRow, DirectoryRow, + ExtIDByTargetRow, ExtIDRow, MetadataAuthorityRow, MetadataFetcherRow, ObjectCountRow, OriginRow, OriginVisitRow, OriginVisitStatusRow, RawExtrinsicMetadataRow, ReleaseRow, RevisionParentRow, RevisionRow, SkippedContentRow, SnapshotBranchRow, SnapshotRow, ) from swh.storage.interface import ListOrder from swh.storage.objstorage import ObjStorage from .common import origin_url_to_sha1 from .writer import JournalWriter TRow = TypeVar("TRow", bound=BaseRow) class Table(Generic[TRow]): def __init__(self, row_class: Type[TRow]): self.row_class = row_class self.primary_key_cols = row_class.PARTITION_KEY + row_class.CLUSTERING_KEY # Map from tokens to clustering keys to rows # These are not actually partitions (or rather, there is one partition # for each token) and they aren't sorted. # But it is good enough if we don't care about performance; # and makes the code a lot simpler. 
self.data: Dict[int, Dict[Tuple, TRow]] = defaultdict(dict) def __repr__(self): return f"<__module__.Table[{self.row_class.__name__}] object>" def partition_key(self, row: Union[TRow, Dict[str, Any]]) -> Tuple: """Returns the partition key of a row (ie. the cells which get hashed into the token.""" if isinstance(row, dict): row_d = row else: row_d = row.to_dict() return tuple(row_d[col] for col in self.row_class.PARTITION_KEY) def clustering_key(self, row: Union[TRow, Dict[str, Any]]) -> Tuple: """Returns the clustering key of a row (ie. the cells which are used for sorting rows within a partition.""" if isinstance(row, dict): row_d = row else: row_d = row.to_dict() return tuple(row_d[col] for col in self.row_class.CLUSTERING_KEY) def primary_key(self, row): return self.partition_key(row) + self.clustering_key(row) def primary_key_from_dict(self, d: Dict[str, Any]) -> Tuple: """Returns the primary key (ie. concatenation of partition key and clustering key) of the given dictionary interpreted as a row.""" return tuple(d[col] for col in self.primary_key_cols) def token(self, key: Tuple): """Returns the token of a row (ie. the hash of its partition key).""" return hash(key) def get_partition(self, token: int) -> Dict[Tuple, TRow]: """Returns the partition that contains this token.""" return self.data[token] def insert(self, row: TRow): partition = self.data[self.token(self.partition_key(row))] partition[self.clustering_key(row)] = row def split_primary_key(self, key: Tuple) -> Tuple[Tuple, Tuple]: """Returns (partition_key, clustering_key) from a partition key""" assert len(key) == len(self.primary_key_cols) partition_key = key[0 : len(self.row_class.PARTITION_KEY)] clustering_key = key[len(self.row_class.PARTITION_KEY) :] return (partition_key, clustering_key) def get_from_partition_key(self, partition_key: Tuple) -> Iterable[TRow]: """Returns at most one row, from its partition key.""" token = self.token(partition_key) for row in self.get_from_token(token): if self.partition_key(row) == partition_key: yield row def get_from_primary_key(self, primary_key: Tuple) -> Optional[TRow]: """Returns at most one row, from its primary key.""" (partition_key, clustering_key) = self.split_primary_key(primary_key) token = self.token(partition_key) partition = self.get_partition(token) return partition.get(clustering_key) def get_from_token(self, token: int) -> Iterable[TRow]: """Returns all rows whose token (ie. 
non-cryptographic hash of the partition key) is the one passed as argument.""" return (v for (k, v) in sorted(self.get_partition(token).items())) def iter_all(self) -> Iterator[Tuple[Tuple, TRow]]: return ( (self.primary_key(row), row) for (token, partition) in self.data.items() for (clustering_key, row) in partition.items() ) def get_random(self) -> Optional[TRow]: return random.choice([row for (pk, row) in self.iter_all()]) class InMemoryCqlRunner: def __init__(self): self._contents = Table(ContentRow) self._content_indexes = defaultdict(lambda: defaultdict(set)) self._skipped_contents = Table(ContentRow) self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) self._directories = Table(DirectoryRow) self._directory_entries = Table(DirectoryEntryRow) self._revisions = Table(RevisionRow) self._revision_parents = Table(RevisionParentRow) self._releases = Table(ReleaseRow) self._snapshots = Table(SnapshotRow) self._snapshot_branches = Table(SnapshotBranchRow) self._origins = Table(OriginRow) self._origin_visits = Table(OriginVisitRow) self._origin_visit_statuses = Table(OriginVisitStatusRow) self._metadata_authorities = Table(MetadataAuthorityRow) self._metadata_fetchers = Table(MetadataFetcherRow) self._raw_extrinsic_metadata = Table(RawExtrinsicMetadataRow) self._extid = Table(ExtIDRow) self._stat_counters = defaultdict(int) def increment_counter(self, object_type: str, nb: int): self._stat_counters[object_type] += nb def stat_counters(self) -> Iterable[ObjectCountRow]: for (object_type, count) in self._stat_counters.items(): yield ObjectCountRow(partition_key=0, object_type=object_type, count=count) ########################## # 'content' table ########################## def _content_add_finalize(self, content: ContentRow) -> None: self._contents.insert(content) self.increment_counter("content", 1) def content_add_prepare(self, content: ContentRow): finalizer = functools.partial(self._content_add_finalize, content) return (self._contents.token(self._contents.partition_key(content)), finalizer) def content_get_from_pk( self, content_hashes: Dict[str, bytes] ) -> Optional[ContentRow]: primary_key = self._contents.primary_key_from_dict(content_hashes) return self._contents.get_from_primary_key(primary_key) def content_get_from_token(self, token: int) -> Iterable[ContentRow]: return self._contents.get_from_token(token) def content_get_random(self) -> Optional[ContentRow]: return self._contents.get_random() def content_get_token_range( self, start: int, end: int, limit: int, ) -> Iterable[Tuple[int, ContentRow]]: matches = [ (token, row) for (token, partition) in self._contents.data.items() for (clustering_key, row) in partition.items() if start <= token <= end ] matches.sort() return matches[0:limit] ########################## # 'content_by_*' tables ########################## def content_missing_by_sha1_git(self, ids: List[bytes]) -> List[bytes]: missing = [] for id_ in ids: if id_ not in self._content_indexes["sha1_git"]: missing.append(id_) return missing def content_index_add_one(self, algo: str, content: Content, token: int) -> None: self._content_indexes[algo][content.get_hash(algo)].add(token) def content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: return self._content_indexes[algo][hash_] ########################## # 'skipped_content' table ########################## def _skipped_content_add_finalize(self, content: SkippedContentRow) -> None: self._skipped_contents.insert(content) self.increment_counter("skipped_content", 1) def 
skipped_content_add_prepare(self, content: SkippedContentRow): finalizer = functools.partial(self._skipped_content_add_finalize, content) return ( self._skipped_contents.token(self._contents.partition_key(content)), finalizer, ) def skipped_content_get_from_pk( self, content_hashes: Dict[str, bytes] ) -> Optional[SkippedContentRow]: primary_key = self._skipped_contents.primary_key_from_dict(content_hashes) return self._skipped_contents.get_from_primary_key(primary_key) def skipped_content_get_from_token(self, token: int) -> Iterable[SkippedContentRow]: return self._skipped_contents.get_from_token(token) ########################## # 'skipped_content_by_*' tables ########################## def skipped_content_index_add_one( self, algo: str, content: SkippedContent, token: int ) -> None: self._skipped_content_indexes[algo][content.get_hash(algo)].add(token) def skipped_content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: return self._skipped_content_indexes[algo][hash_] ########################## # 'directory' table ########################## def directory_missing(self, ids: List[bytes]) -> List[bytes]: missing = [] for id_ in ids: if self._directories.get_from_primary_key((id_,)) is None: missing.append(id_) return missing def directory_add_one(self, directory: DirectoryRow) -> None: self._directories.insert(directory) self.increment_counter("directory", 1) def directory_get_random(self) -> Optional[DirectoryRow]: return self._directories.get_random() ########################## # 'directory_entry' table ########################## def directory_entry_add_one(self, entry: DirectoryEntryRow) -> None: self._directory_entries.insert(entry) def directory_entry_get( self, directory_ids: List[Sha1Git] ) -> Iterable[DirectoryEntryRow]: for id_ in directory_ids: yield from self._directory_entries.get_from_partition_key((id_,)) ########################## # 'revision' table ########################## def revision_missing(self, ids: List[bytes]) -> Iterable[bytes]: missing = [] for id_ in ids: if self._revisions.get_from_primary_key((id_,)) is None: missing.append(id_) return missing def revision_add_one(self, revision: RevisionRow) -> None: self._revisions.insert(revision) self.increment_counter("revision", 1) def revision_get_ids(self, revision_ids) -> Iterable[int]: for id_ in revision_ids: if self._revisions.get_from_primary_key((id_,)) is not None: yield id_ def revision_get(self, revision_ids: List[Sha1Git]) -> Iterable[RevisionRow]: for id_ in revision_ids: row = self._revisions.get_from_primary_key((id_,)) if row: yield row def revision_get_random(self) -> Optional[RevisionRow]: return self._revisions.get_random() ########################## # 'revision_parent' table ########################## def revision_parent_add_one(self, revision_parent: RevisionParentRow) -> None: self._revision_parents.insert(revision_parent) def revision_parent_get(self, revision_id: Sha1Git) -> Iterable[bytes]: for parent in self._revision_parents.get_from_partition_key((revision_id,)): yield parent.parent_id ########################## # 'release' table ########################## def release_missing(self, ids: List[bytes]) -> List[bytes]: missing = [] for id_ in ids: if self._releases.get_from_primary_key((id_,)) is None: missing.append(id_) return missing def release_add_one(self, release: ReleaseRow) -> None: self._releases.insert(release) self.increment_counter("release", 1) def release_get(self, release_ids: List[str]) -> Iterable[ReleaseRow]: for id_ in release_ids: row = 
self._releases.get_from_primary_key((id_,)) if row: yield row def release_get_random(self) -> Optional[ReleaseRow]: return self._releases.get_random() ########################## # 'snapshot' table ########################## def snapshot_missing(self, ids: List[bytes]) -> List[bytes]: missing = [] for id_ in ids: if self._snapshots.get_from_primary_key((id_,)) is None: missing.append(id_) return missing def snapshot_add_one(self, snapshot: SnapshotRow) -> None: self._snapshots.insert(snapshot) self.increment_counter("snapshot", 1) def snapshot_get_random(self) -> Optional[SnapshotRow]: return self._snapshots.get_random() ########################## # 'snapshot_branch' table ########################## def snapshot_branch_add_one(self, branch: SnapshotBranchRow) -> None: self._snapshot_branches.insert(branch) def snapshot_count_branches( self, snapshot_id: Sha1Git, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Dict[Optional[str], int]: """Returns a dictionary from type names to the number of branches of that type.""" counts: Dict[Optional[str], int] = defaultdict(int) for branch in self._snapshot_branches.get_from_partition_key((snapshot_id,)): if branch_name_exclude_prefix and branch.name.startswith( branch_name_exclude_prefix ): continue if branch.target_type is None: target_type = None else: target_type = branch.target_type counts[target_type] += 1 return counts def snapshot_branch_get( self, snapshot_id: Sha1Git, from_: bytes, limit: int, branch_name_exclude_prefix: Optional[bytes] = None, ) -> Iterable[SnapshotBranchRow]: count = 0 for branch in self._snapshot_branches.get_from_partition_key((snapshot_id,)): prefix = branch_name_exclude_prefix if branch.name >= from_ and ( prefix is None or not branch.name.startswith(prefix) ): count += 1 yield branch if count >= limit: break ########################## # 'origin' table ########################## def origin_add_one(self, origin: OriginRow) -> None: self._origins.insert(origin) self.increment_counter("origin", 1) def origin_get_by_sha1(self, sha1: bytes) -> Iterable[OriginRow]: return self._origins.get_from_partition_key((sha1,)) def origin_get_by_url(self, url: str) -> Iterable[OriginRow]: return self.origin_get_by_sha1(origin_url_to_sha1(url)) def origin_list( self, start_token: int, limit: int ) -> Iterable[Tuple[int, OriginRow]]: """Returns an iterable of (token, origin)""" matches = [ (token, row) for (token, partition) in self._origins.data.items() for (clustering_key, row) in partition.items() if token >= start_token ] matches.sort() return matches[0:limit] def origin_iter_all(self) -> Iterable[OriginRow]: return ( row for (token, partition) in self._origins.data.items() for (clustering_key, row) in partition.items() ) def origin_generate_unique_visit_id(self, origin_url: str) -> int: origin = list(self.origin_get_by_url(origin_url))[0] visit_id = origin.next_visit_id origin.next_visit_id += 1 return visit_id ########################## # 'origin_visit' table ########################## def origin_visit_get( self, origin_url: str, last_visit: Optional[int], limit: int, order: ListOrder, ) -> Iterable[OriginVisitRow]: visits = list(self._origin_visits.get_from_partition_key((origin_url,))) if last_visit is not None: if order == ListOrder.ASC: visits = [v for v in visits if v.visit > last_visit] else: visits = [v for v in visits if v.visit < last_visit] visits.sort(key=lambda v: v.visit, reverse=order == ListOrder.DESC) visits = visits[0:limit] return visits def origin_visit_add_one(self, visit: OriginVisitRow) -> None: 
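Aside on `snapshot_count_branches` above: it tallies branches per target type while skipping any branch whose name starts with the excluded prefix. A compact sketch of the same tallying over plain tuples (the branch list below is hypothetical):

```
from collections import defaultdict

branches = [  # hypothetical (name, target_type) pairs
    (b"refs/heads/master", "revision"),
    (b"refs/pull/1/head", "revision"),
    (b"HEAD", "alias"),
]

def count_branches(branches, branch_name_exclude_prefix=None):
    counts = defaultdict(int)
    for name, target_type in branches:
        if branch_name_exclude_prefix and name.startswith(branch_name_exclude_prefix):
            continue  # excluded, exactly like in snapshot_count_branches
        counts[target_type] += 1
    return dict(counts)

count_branches(branches, branch_name_exclude_prefix=b"refs/pull/")
# -> {'revision': 1, 'alias': 1}
```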
self._origin_visits.insert(visit) self.increment_counter("origin_visit", 1) def origin_visit_get_one( self, origin_url: str, visit_id: int ) -> Optional[OriginVisitRow]: return self._origin_visits.get_from_primary_key((origin_url, visit_id)) def origin_visit_get_all(self, origin_url: str) -> Iterable[OriginVisitRow]: return self._origin_visits.get_from_partition_key((origin_url,)) def origin_visit_iter(self, start_token: int) -> Iterator[OriginVisitRow]: """Returns all origin visits in order from this token, and wraps around the token space.""" return ( row for (token, partition) in self._origin_visits.data.items() for (clustering_key, row) in partition.items() ) ########################## # 'origin_visit_status' table ########################## def origin_visit_status_get_range( self, origin: str, visit: int, date_from: Optional[datetime.datetime], limit: int, order: ListOrder, ) -> Iterable[OriginVisitStatusRow]: statuses = list(self.origin_visit_status_get(origin, visit)) if date_from is not None: if order == ListOrder.ASC: statuses = [s for s in statuses if s.date >= date_from] else: statuses = [s for s in statuses if s.date <= date_from] statuses.sort(key=lambda s: s.date, reverse=order == ListOrder.DESC) return statuses[0:limit] def origin_visit_status_add_one(self, visit_update: OriginVisitStatusRow) -> None: self._origin_visit_statuses.insert(visit_update) self.increment_counter("origin_visit_status", 1) def origin_visit_status_get_latest( self, origin: str, visit: int, ) -> Optional[OriginVisitStatusRow]: """Given an origin visit id, return its latest origin_visit_status """ return next(self.origin_visit_status_get(origin, visit), None) def origin_visit_status_get( self, origin: str, visit: int, ) -> Iterator[OriginVisitStatusRow]: """Return all origin visit statuses for a given visit """ statuses = [ s for s in self._origin_visit_statuses.get_from_partition_key((origin,)) if s.visit == visit ] statuses.sort(key=lambda s: s.date, reverse=True) return iter(statuses) ########################## # 'metadata_authority' table ########################## def metadata_authority_add(self, authority: MetadataAuthorityRow): self._metadata_authorities.insert(authority) self.increment_counter("metadata_authority", 1) def metadata_authority_get(self, type, url) -> Optional[MetadataAuthorityRow]: return self._metadata_authorities.get_from_primary_key((url, type)) ########################## # 'metadata_fetcher' table ########################## def metadata_fetcher_add(self, fetcher: MetadataFetcherRow): self._metadata_fetchers.insert(fetcher) self.increment_counter("metadata_fetcher", 1) def metadata_fetcher_get(self, name, version) -> Optional[MetadataAuthorityRow]: return self._metadata_fetchers.get_from_primary_key((name, version)) ######################### # 'raw_extrinsic_metadata' table ######################### def raw_extrinsic_metadata_add(self, raw_extrinsic_metadata): self._raw_extrinsic_metadata.insert(raw_extrinsic_metadata) self.increment_counter("raw_extrinsic_metadata", 1) def raw_extrinsic_metadata_get_after_date( self, target: str, authority_type: str, authority_url: str, after: datetime.datetime, ) -> Iterable[RawExtrinsicMetadataRow]: metadata = self.raw_extrinsic_metadata_get( target, authority_type, authority_url ) return (m for m in metadata if m.discovery_date > after) def raw_extrinsic_metadata_get_after_date_and_id( self, target: str, authority_type: str, authority_url: str, after_date: datetime.datetime, after_id: bytes, ) -> Iterable[RawExtrinsicMetadataRow]: metadata = 
self._raw_extrinsic_metadata.get_from_partition_key((target,)) after_tuple = (after_date, after_id) return ( m for m in metadata if m.authority_type == authority_type and m.authority_url == authority_url and (m.discovery_date, m.id) > after_tuple ) def raw_extrinsic_metadata_get( self, target: str, authority_type: str, authority_url: str ) -> Iterable[RawExtrinsicMetadataRow]: metadata = self._raw_extrinsic_metadata.get_from_partition_key((target,)) return ( m for m in metadata if m.authority_type == authority_type and m.authority_url == authority_url ) ######################### # 'extid' table ######################### def _extid_add_finalize(self, extid: ExtIDRow) -> None: self._extid.insert(extid) self.increment_counter("extid", 1) def extid_add_prepare(self, extid: ExtIDRow): finalizer = functools.partial(self._extid_add_finalize, extid) return (self._extid.token(self._extid.partition_key(extid)), finalizer) - def extid_index_add_one(self, extid: ExtIDRow, token: int) -> None: + def extid_index_add_one(self, row: ExtIDByTargetRow) -> None: pass def extid_get_from_pk( self, extid_type: str, extid: bytes, target: ExtendedSWHID, ) -> Optional[ExtIDRow]: primary_key = self._extid.primary_key_from_dict( dict( extid_type=extid_type, extid=extid, target_type=target.object_type.value, target=target.object_id, ) ) return self._extid.get_from_primary_key(primary_key) def extid_get_from_extid(self, extid_type: str, extid: bytes) -> Iterable[ExtIDRow]: return ( row for pk, row in self._extid.iter_all() if row.extid_type == extid_type and row.extid == extid ) def extid_get_from_target( self, target_type: str, target: bytes ) -> Iterable[ExtIDRow]: return ( row for pk, row in self._extid.iter_all() if row.target_type == target_type and row.target == target ) class InMemoryStorage(CassandraStorage): _cql_runner: InMemoryCqlRunner # type: ignore def __init__(self, journal_writer=None): self.reset() self.journal_writer = JournalWriter(journal_writer) def reset(self): self._cql_runner = InMemoryCqlRunner() self.objstorage = ObjStorage({"cls": "memory"}) def check_config(self, *, check_write: bool) -> bool: return True diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py index eecb8608..12ad3a38 100644 --- a/swh/storage/tests/storage_data.py +++ b/swh/storage/tests/storage_data.py @@ -1,711 +1,713 @@ # Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Tuple import attr from swh.model import from_disk from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import CoreSWHID, ExtendedObjectType, ExtendedSWHID from swh.model.identifiers import ObjectType as SwhidObjectType from swh.model.model import ( Content, Directory, DirectoryEntry, ExtID, MetadataAuthority, MetadataAuthorityType, MetadataFetcher, ObjectType, Origin, OriginVisit, Person, RawExtrinsicMetadata, Release, Revision, RevisionType, SkippedContent, Snapshot, SnapshotBranch, TargetType, Timestamp, TimestampWithTimezone, ) class StorageData: """Data model objects to use within tests. 
""" content = Content( data=b"42\n", length=3, sha1=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), sha1_git=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), sha256=hash_to_bytes( "084c799cd551dd1d8d5c5f9a5d593b2e931f5e36122ee5c793c1d08a19839cc0" ), blake2s256=hash_to_bytes( "d5fe1939576527e42cfd76a9455a2432fe7f56669564577dd93c4280e76d661d" ), status="visible", ) content2 = Content( data=b"4242\n", length=5, sha1=hash_to_bytes("61c2b3a30496d329e21af70dd2d7e097046d07b7"), sha1_git=hash_to_bytes("36fade77193cb6d2bd826161a0979d64c28ab4fa"), sha256=hash_to_bytes( "859f0b154fdb2d630f45e1ecae4a862915435e663248bb8461d914696fc047cd" ), blake2s256=hash_to_bytes( "849c20fad132b7c2d62c15de310adfe87be94a379941bed295e8141c6219810d" ), status="visible", ) content3 = Content( data=b"424242\n", length=7, sha1=hash_to_bytes("3e21cc4942a4234c9e5edd8a9cacd1670fe59f13"), sha1_git=hash_to_bytes("c932c7649c6dfa4b82327d121215116909eb3bea"), sha256=hash_to_bytes( "92fb72daf8c6818288a35137b72155f507e5de8d892712ab96277aaed8cf8a36" ), blake2s256=hash_to_bytes( "76d0346f44e5a27f6bafdd9c2befd304aff83780f93121d801ab6a1d4769db11" ), status="visible", ctime=datetime.datetime(2019, 12, 1, tzinfo=datetime.timezone.utc), ) contents: Tuple[Content, ...] = (content, content2, content3) skipped_content = SkippedContent( length=1024 * 1024 * 200, sha1_git=hash_to_bytes("33e45d56f88993aae6a0198013efa80716fd8920"), sha1=hash_to_bytes("43e45d56f88993aae6a0198013efa80716fd8920"), sha256=hash_to_bytes( "7bbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a" ), blake2s256=hash_to_bytes( "ade18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b" ), reason="Content too long", status="absent", origin="file:///dev/zero", ) skipped_content2 = SkippedContent( length=1024 * 1024 * 300, sha1_git=hash_to_bytes("44e45d56f88993aae6a0198013efa80716fd8921"), sha1=hash_to_bytes("54e45d56f88993aae6a0198013efa80716fd8920"), sha256=hash_to_bytes( "8cbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a" ), blake2s256=hash_to_bytes( "9ce18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b" ), reason="Content too long", status="absent", ) skipped_contents: Tuple[SkippedContent, ...] 
= (skipped_content, skipped_content2) - directory5 = Directory(entries=()) + directory5 = Directory( + id=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), entries=(), + ) directory = Directory( id=hash_to_bytes("5256e856a0a0898966d6ba14feb4388b8b82d302"), entries=tuple( [ DirectoryEntry( name=b"foo", type="file", target=content.sha1_git, perms=from_disk.DentryPerms.content, ), DirectoryEntry( name=b"bar\xc3", type="dir", target=directory5.id, perms=from_disk.DentryPerms.directory, ), ], ), ) directory2 = Directory( id=hash_to_bytes("8505808532953da7d2581741f01b29c04b1cb9ab"), entries=tuple( [ DirectoryEntry( name=b"oof", type="file", target=content2.sha1_git, perms=from_disk.DentryPerms.content, ) ], ), ) directory3 = Directory( - id=hash_to_bytes("4ea8c6b2f54445e5dd1a9d5bb2afd875d66f3150"), + id=hash_to_bytes("13089e6e544f78df7c9a40a3059050d10dee686a"), entries=tuple( [ DirectoryEntry( name=b"foo", type="file", target=content.sha1_git, perms=from_disk.DentryPerms.content, ), DirectoryEntry( name=b"subdir", type="dir", target=directory.id, perms=from_disk.DentryPerms.directory, ), DirectoryEntry( name=b"hello", type="file", target=content2.sha1_git, perms=from_disk.DentryPerms.content, ), ], ), ) directory4 = Directory( - id=hash_to_bytes("377aa5fcd944fbabf502dbfda55cd14d33c8c3c6"), + id=hash_to_bytes("cd5dfd9c09d9e99ed123bc7937a0d5fddc3cd531"), entries=tuple( [ DirectoryEntry( name=b"subdir1", type="dir", target=directory3.id, perms=from_disk.DentryPerms.directory, ) ], ), ) directories: Tuple[Directory, ...] = ( directory2, directory, directory3, directory4, directory5, ) revision = Revision( id=hash_to_bytes("01a7114f36fddd5ef2511b2cadda237a68adbb12"), message=b"hello", author=Person( name=b"Nicolas Dandrimont", email=b"nicolas@example.com", fullname=b"Nicolas Dandrimont ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567890, microseconds=0), offset=120, negative_utc=False, ), committer=Person( name=b"St\xc3fano Zacchiroli", email=b"stefano@example.com", fullname=b"St\xc3fano Zacchiroli ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1123456789, microseconds=0), offset=120, negative_utc=False, ), parents=(), type=RevisionType.GIT, directory=directory.id, metadata={ "checksums": {"sha1": "tarball-sha1", "sha256": "tarball-sha256",}, "signed-off-by": "some-dude", }, extra_headers=( (b"gpgsig", b"test123"), (b"mergetag", b"foo\\bar"), (b"mergetag", b"\x22\xaf\x89\x80\x01\x00"), ), synthetic=True, ) revision2 = Revision( - id=hash_to_bytes("df7a6f6a99671fb7f7343641aff983a314ef6161"), + id=hash_to_bytes("a646dd94c912829659b22a1e7e143d2fa5ebde1b"), message=b"hello again", author=Person( name=b"Roberto Dicosmo", email=b"roberto@example.com", fullname=b"Roberto Dicosmo ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567843, microseconds=220000,), offset=-720, negative_utc=False, ), committer=Person( name=b"tony", email=b"ar@dumont.fr", fullname=b"tony ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1123456789, microseconds=220000,), offset=0, negative_utc=False, ), parents=tuple([revision.id]), type=RevisionType.GIT, directory=directory2.id, metadata=None, extra_headers=(), synthetic=False, ) revision3 = Revision( - id=hash_to_bytes("2cbd7bb22c653bbb23a29657852a50a01b591d46"), + id=hash_to_bytes("beb2844dff30658e27573cb46eb55980e974b391"), message=b"a simple revision with no parents this time", author=Person( name=b"Roberto Dicosmo", email=b"roberto@example.com", fullname=b"Roberto Dicosmo ", ), 
date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567843, microseconds=220000,), offset=-720, negative_utc=False, ), committer=Person( name=b"tony", email=b"ar@dumont.fr", fullname=b"tony ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1127351742, microseconds=220000,), offset=0, negative_utc=False, ), parents=tuple([revision.id, revision2.id]), type=RevisionType.GIT, directory=directory2.id, metadata=None, extra_headers=(), synthetic=True, ) revision4 = Revision( - id=hash_to_bytes("88cd5126fc958ed70089d5340441a1c2477bcc20"), + id=hash_to_bytes("ae860aec43700c7f5a295e2ef47e2ae41b535dfe"), message=b"parent of self.revision2", author=Person( name=b"me", email=b"me@soft.heri", fullname=b"me ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567843, microseconds=220000,), offset=-720, negative_utc=False, ), committer=Person( name=b"committer-dude", email=b"committer@dude.com", fullname=b"committer-dude ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1244567843, microseconds=220000,), offset=-720, negative_utc=False, ), parents=tuple([revision3.id]), type=RevisionType.GIT, directory=directory.id, metadata=None, extra_headers=(), synthetic=False, ) git_revisions: Tuple[Revision, ...] = (revision, revision2, revision3, revision4) hg_revision = Revision( id=hash_to_bytes("951c9503541e7beaf002d7aebf2abd1629084c68"), message=b"hello", author=Person( name=b"Nicolas Dandrimont", email=b"nicolas@example.com", fullname=b"Nicolas Dandrimont ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567890, microseconds=0), offset=120, negative_utc=False, ), committer=Person( name=b"St\xc3fano Zacchiroli", email=b"stefano@example.com", fullname=b"St\xc3fano Zacchiroli ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1123456789, microseconds=0), offset=120, negative_utc=False, ), parents=(), type=RevisionType.MERCURIAL, directory=directory.id, metadata={ "checksums": {"sha1": "tarball-sha1", "sha256": "tarball-sha256",}, "signed-off-by": "some-dude", "node": "a316dfb434af2b451c1f393496b7eaeda343f543", }, extra_headers=(), synthetic=True, ) hg_revision2 = Revision( id=hash_to_bytes("df4afb063236300eb13b96a0d7fff03f7b7cbbaf"), message=b"hello again", author=Person( name=b"Roberto Dicosmo", email=b"roberto@example.com", fullname=b"Roberto Dicosmo ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567843, microseconds=220000,), offset=-720, negative_utc=False, ), committer=Person( name=b"tony", email=b"ar@dumont.fr", fullname=b"tony ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1123456789, microseconds=220000,), offset=0, negative_utc=False, ), parents=tuple([hg_revision.id]), type=RevisionType.MERCURIAL, directory=directory2.id, metadata=None, extra_headers=( (b"node", hash_to_bytes("fa1b7c84a9b40605b67653700f268349a6d6aca1")), ), synthetic=False, ) hg_revision3 = Revision( id=hash_to_bytes("84d8e7081b47ebb88cad9fa1f25de5f330872a37"), message=b"a simple revision with no parents this time", author=Person( name=b"Roberto Dicosmo", email=b"roberto@example.com", fullname=b"Roberto Dicosmo ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567843, microseconds=220000,), offset=-720, negative_utc=False, ), committer=Person( name=b"tony", email=b"ar@dumont.fr", fullname=b"tony ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1127351742, microseconds=220000,), offset=0, negative_utc=False, ), parents=tuple([hg_revision.id, 
hg_revision2.id]), type=RevisionType.MERCURIAL, directory=directory2.id, metadata=None, extra_headers=( (b"node", hash_to_bytes("7f294a01c49065a90b3fe8b4ad49f08ce9656ef6")), ), synthetic=True, ) hg_revision4 = Revision( - id=hash_to_bytes("42070a39e5387e9b99bb3d83674e3a4a1ff39b69"), + id=hash_to_bytes("4683324ba26dfe941a72cc7552e86eaaf7c27fe3"), message=b"parent of self.revision2", author=Person( name=b"me", email=b"me@soft.heri", fullname=b"me ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567843, microseconds=220000,), offset=-720, negative_utc=False, ), committer=Person( name=b"committer-dude", email=b"committer@dude.com", fullname=b"committer-dude ", ), committer_date=TimestampWithTimezone( timestamp=Timestamp(seconds=1244567843, microseconds=220000,), offset=-720, negative_utc=False, ), parents=tuple([hg_revision3.id]), type=RevisionType.MERCURIAL, directory=directory.id, metadata=None, extra_headers=( (b"node", hash_to_bytes("f4160af0485c85823d9e829bae2c00b00a2e6297")), ), synthetic=False, ) hg_revisions: Tuple[Revision, ...] = ( hg_revision, hg_revision2, hg_revision3, hg_revision4, ) revisions: Tuple[Revision, ...] = git_revisions + hg_revisions origins: Tuple[Origin, ...] = ( Origin(url="https://github.com/user1/repo1"), Origin(url="https://github.com/user2/repo1"), Origin(url="https://github.com/user3/repo1"), Origin(url="https://gitlab.com/user1/repo1"), Origin(url="https://gitlab.com/user2/repo1"), Origin(url="https://forge.softwareheritage.org/source/repo1"), ) origin, origin2 = origins[:2] metadata_authority = MetadataAuthority( type=MetadataAuthorityType.DEPOSIT_CLIENT, url="http://hal.inria.example.com/", metadata={"location": "France"}, ) metadata_authority2 = MetadataAuthority( type=MetadataAuthorityType.REGISTRY, url="http://wikidata.example.com/", metadata={}, ) authorities: Tuple[MetadataAuthority, ...] = ( metadata_authority, metadata_authority2, ) metadata_fetcher = MetadataFetcher( name="swh-deposit", version="0.0.1", metadata={"sword_version": "2"}, ) metadata_fetcher2 = MetadataFetcher( name="swh-example", version="0.0.1", metadata={}, ) fetchers: Tuple[MetadataFetcher, ...] = (metadata_fetcher, metadata_fetcher2) date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) type_visit1 = "git" type_visit2 = "hg" type_visit3 = "deb" origin_visit = OriginVisit( origin=origin.url, visit=1, date=date_visit1, type=type_visit1, ) origin_visit2 = OriginVisit( origin=origin.url, visit=2, date=date_visit2, type=type_visit1, ) origin_visit3 = OriginVisit( origin=origin2.url, visit=1, date=date_visit1, type=type_visit2, ) origin_visits: Tuple[OriginVisit, ...] 
= ( origin_visit, origin_visit2, origin_visit3, ) release = Release( id=hash_to_bytes("f7f222093a18ec60d781070abec4a630c850b837"), name=b"v0.0.1", author=Person( name=b"olasd", email=b"nic@olasd.fr", fullname=b"olasd ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1234567890, microseconds=0), offset=42, negative_utc=False, ), target=revision.id, target_type=ObjectType.REVISION, message=b"synthetic release", synthetic=True, ) release2 = Release( - id=hash_to_bytes("6902bd4c82b7d19a421d224aedab2b74197e420d"), + id=hash_to_bytes("db81a26783a3f4a9db07b4759ffc37621f159bb2"), name=b"v0.0.2", author=Person( name=b"tony", email=b"ar@dumont.fr", fullname=b"tony ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1634366813, microseconds=0), offset=-120, negative_utc=False, ), target=revision2.id, target_type=ObjectType.REVISION, message=b"v0.0.2\nMisc performance improvements + bug fixes", synthetic=False, ) release3 = Release( - id=hash_to_bytes("3e9050196aa288264f2a9d279d6abab8b158448b"), + id=hash_to_bytes("1c5d42e603ce2eea44917fadca76c78bad76aeb9"), name=b"v0.0.2", author=Person( name=b"tony", email=b"tony@ardumont.fr", fullname=b"tony ", ), date=TimestampWithTimezone( timestamp=Timestamp(seconds=1634366813, microseconds=0), offset=-120, negative_utc=False, ), target=revision3.id, target_type=ObjectType.REVISION, message=b"yet another synthetic release", synthetic=True, ) releases: Tuple[Release, ...] = (release, release2, release3) snapshot = Snapshot( id=hash_to_bytes("9b922e6d8d5b803c1582aabe5525b7b91150788e"), branches={ b"master": SnapshotBranch( target=revision.id, target_type=TargetType.REVISION, ), }, ) empty_snapshot = Snapshot( id=hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"), branches={}, ) complete_snapshot = Snapshot( - id=hash_to_bytes("a56ce2d81c190023bb99a3a36279307522cb85f6"), + id=hash_to_bytes("db99fda25b43dc5cd90625ee4b0744751799c917"), branches={ b"directory": SnapshotBranch( target=directory.id, target_type=TargetType.DIRECTORY, ), b"directory2": SnapshotBranch( target=directory2.id, target_type=TargetType.DIRECTORY, ), b"content": SnapshotBranch( target=content.sha1_git, target_type=TargetType.CONTENT, ), b"alias": SnapshotBranch(target=b"revision", target_type=TargetType.ALIAS,), b"revision": SnapshotBranch( target=revision.id, target_type=TargetType.REVISION, ), b"release": SnapshotBranch( target=release.id, target_type=TargetType.RELEASE, ), b"snapshot": SnapshotBranch( target=empty_snapshot.id, target_type=TargetType.SNAPSHOT, ), b"dangling": None, }, ) snapshots: Tuple[Snapshot, ...] 
    content_metadata1 = RawExtrinsicMetadata(
        target=ExtendedSWHID(
            object_type=ExtendedObjectType.CONTENT, object_id=content.sha1_git
        ),
        origin=origin.url,
        discovery_date=datetime.datetime(
            2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc
        ),
        authority=attr.evolve(metadata_authority, metadata=None),
        fetcher=attr.evolve(metadata_fetcher, metadata=None),
        format="json",
        metadata=b'{"foo": "bar"}',
    )
    content_metadata2 = RawExtrinsicMetadata(
        target=ExtendedSWHID(
            object_type=ExtendedObjectType.CONTENT, object_id=content.sha1_git
        ),
        origin=origin2.url,
        discovery_date=datetime.datetime(
            2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
        ),
        authority=attr.evolve(metadata_authority, metadata=None),
        fetcher=attr.evolve(metadata_fetcher, metadata=None),
        format="yaml",
        metadata=b"foo: bar",
    )
    content_metadata3 = RawExtrinsicMetadata(
        target=ExtendedSWHID(
            object_type=ExtendedObjectType.CONTENT, object_id=content.sha1_git
        ),
        discovery_date=datetime.datetime(
            2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
        ),
        authority=attr.evolve(metadata_authority2, metadata=None),
        fetcher=attr.evolve(metadata_fetcher2, metadata=None),
        format="yaml",
        metadata=b"foo: bar",
        origin=origin.url,
        visit=42,
        snapshot=snapshot.swhid(),
        release=release.swhid(),
        revision=revision.swhid(),
        directory=directory.swhid(),
        path=b"/foo/bar",
    )
    content_metadata: Tuple[RawExtrinsicMetadata, ...] = (
        content_metadata1,
        content_metadata2,
        content_metadata3,
    )
    origin_metadata1 = RawExtrinsicMetadata(
        target=Origin(origin.url).swhid(),
        discovery_date=datetime.datetime(
            2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc
        ),
        authority=attr.evolve(metadata_authority, metadata=None),
        fetcher=attr.evolve(metadata_fetcher, metadata=None),
        format="json",
        metadata=b'{"foo": "bar"}',
    )
    origin_metadata2 = RawExtrinsicMetadata(
        target=Origin(origin.url).swhid(),
        discovery_date=datetime.datetime(
            2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
        ),
        authority=attr.evolve(metadata_authority, metadata=None),
        fetcher=attr.evolve(metadata_fetcher, metadata=None),
        format="yaml",
        metadata=b"foo: bar",
    )
    origin_metadata3 = RawExtrinsicMetadata(
        target=Origin(origin.url).swhid(),
        discovery_date=datetime.datetime(
            2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
        ),
        authority=attr.evolve(metadata_authority2, metadata=None),
        fetcher=attr.evolve(metadata_fetcher2, metadata=None),
        format="yaml",
        metadata=b"foo: bar",
    )
    origin_metadata: Tuple[RawExtrinsicMetadata, ...] = (
        origin_metadata1,
        origin_metadata2,
        origin_metadata3,
    )
    extid1 = ExtID(
        target=CoreSWHID(object_type=SwhidObjectType.REVISION, object_id=revision.id),
        extid_type="git",
        extid=revision.id,
    )
    extid2 = ExtID(
        target=CoreSWHID(
            object_type=SwhidObjectType.REVISION, object_id=hg_revision.id
        ),
        extid_type="mercurial",
        extid=hash_to_bytes("a316dfb434af2b451c1f393496b7eaeda343f543"),
    )
    extid3 = ExtID(
        target=CoreSWHID(object_type=SwhidObjectType.DIRECTORY, object_id=directory.id),
        extid_type="directory",
        extid=b"something",
    )
    extids: Tuple[ExtID, ...] = (
        extid1,
        extid2,
        extid3,
    )
diff --git a/swh/storage/tests/test_storage_data.py b/swh/storage/tests/test_storage_data.py
index 33030f1b..821b7f66 100644
--- a/swh/storage/tests/test_storage_data.py
+++ b/swh/storage/tests/test_storage_data.py
@@ -1,28 +1,42 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

+import pytest

from swh.model.model import BaseModel

from swh.storage.tests.storage_data import StorageData


def test_storage_data():
    data = StorageData()

    for attribute_key in [
        "contents",
        "skipped_contents",
        "directories",
        "revisions",
        "releases",
        "snapshots",
        "origins",
        "origin_visits",
        "fetchers",
        "authorities",
        "origin_metadata",
        "content_metadata",
    ]:
        for obj in getattr(data, attribute_key):
            assert isinstance(obj, BaseModel)
+
+
+@pytest.mark.parametrize(
+    "collection",
+    ("directories", "git_revisions", "hg_revisions", "releases", "snapshots"),
+)
+def test_storage_data_hash(collection):
+    data = StorageData()
+
+    for obj in getattr(data, collection):
+        assert (
+            obj.compute_hash() == obj.id
+        ), f"{obj.compute_hash().hex()} != {obj.id.hex()}"
diff --git a/tox.ini b/tox.ini
index 1710b45f..eebc9bfa 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,43 +1,83 @@
[tox]
envlist=black,flake8,mypy,py3

[testenv]
extras =
  testing
deps =
  pytest-cov
  dev: ipdb
passenv =
  SWH_CASSANDRA_BIN
  SWH_CASSANDRA_LOG
  JAVA_HOME
commands =
  pytest \
    !slow: --hypothesis-profile=fast \
    slow: --hypothesis-profile=slow \
    --cov={envsitepackagesdir}/swh/storage \
    {envsitepackagesdir}/swh/storage \
    --doctest-modules \
    --cov-branch {posargs}

[testenv:black]
skip_install = true
deps =
  black==19.10b0
commands =
  {envpython} -m black --check swh

[testenv:flake8]
skip_install = true
deps =
  flake8
commands =
  {envpython} -m flake8

[testenv:mypy]
extras =
  testing
deps =
  mypy
commands =
  mypy swh
+
+# build documentation outside swh-environment using the current
+# git HEAD of swh-docs, is executed on CI for each diff to prevent
+# breaking doc build
+[testenv:sphinx]
+whitelist_externals = make
+usedevelop = true
+extras =
+  testing
+deps =
+  # fetch and install swh-docs in develop mode
+  -e git+https://forge.softwareheritage.org/source/swh-docs#egg=swh.docs
+  pifpaf
+
+setenv =
+  SWH_PACKAGE_DOC_TOX_BUILD = 1
+  # turn warnings into errors
+  SPHINXOPTS = -W
+commands =
+  {envpython} -m pifpaf run postgresql -- make -I ../.tox/sphinx/src/swh-docs/swh/ -C docs
+
+
+# build documentation only inside swh-environment using local state
+# of swh-docs package
+[testenv:sphinx-dev]
+whitelist_externals = make
+usedevelop = true
+extras =
+  testing
+deps =
+  # install swh-docs in develop mode
+  -e ../swh-docs
+  pifpaf
+
+setenv =
+  SWH_PACKAGE_DOC_TOX_BUILD = 1
+  # turn warnings into errors
+  SPHINXOPTS = -W
+commands =
+  {envpython} -m pifpaf run postgresql -- make -I ../.tox/sphinx-dev/src/swh-docs/swh/ -C docs
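
Note: the identifier updates in `storage_data.py` above are exactly what the new `test_storage_data_hash` test enforces: each fixture object's stored `id` must equal the hash recomputed from its other fields via `compute_hash()`. The following is a minimal standalone sketch of that same check (it only assumes `swh.storage` is installed with its testing extras so that `StorageData` is importable, as in the test file above):

```
# Sketch: verify that fixture identifiers match their model-computed hashes,
# mirroring the test_storage_data_hash test added in this diff.
from swh.storage.tests.storage_data import StorageData

data = StorageData()
for collection in ("directories", "git_revisions", "hg_revisions", "releases", "snapshots"):
    for obj in getattr(data, collection):
        # compute_hash() re-derives the intrinsic identifier from the object's fields
        assert obj.compute_hash() == obj.id, f"{obj.compute_hash().hex()} != {obj.id.hex()}"
print("all fixture identifiers match their computed hashes")
```

The same assertion runs as part of the test suite via `tox` or directly with `pytest swh/storage/tests/test_storage_data.py`.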