diff --git a/PKG-INFO b/PKG-INFO index 7570110c..dfd9d0a8 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,202 +1,203 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.0.187 +Version: 0.0.188 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). On a Debian-like host: ``` $ sudo apt install libpq-dev postgresql ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local args: db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing args: root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. 
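The following sketch (not part of this package; the actual pathslicing objstorage is provided by `swh.objstorage`) illustrates how such a slicing spec could map a content sha1 to its on-disk path:

```
# Hypothetical helper mirroring the `0:2/2:4/4:6` slicing described above;
# the real implementation lives in swh.objstorage, not in this package.
import os

def sliced_path(root, hex_sha1, slicing="0:2/2:4/4:6"):
    # Each "start:end" component selects a slice of the hex digest
    # and becomes one directory level.
    parts = [hex_sha1[int(s):int(e)]
             for s, e in (bounds.split(":") for bounds in slicing.split("/"))]
    # The full digest is kept as the file name at the deepest level.
    return os.path.join(root, *parts, hex_sha1)

print(sliced_path("/tmp/swh-storage", "00062f8bd330715c4f819373653d97b3cd34394c"))
# /tmp/swh-storage/00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c
```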
For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the Python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage API on port 5002. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote args: url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local args: db: service=swh-dev objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable +Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: schemata Provides-Extra: journal diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt index e43a37f4..6460fde2 100644 --- a/requirements-swh-journal.txt +++ b/requirements-swh-journal.txt @@ -1 +1 @@ -swh.journal >= 0.0.30 +swh.journal >= 0.0.31 diff --git a/setup.py b/setup.py index 309325da..eb79ad2d 100755 --- a/setup.py +++ b/setup.py @@ -1,75 +1,76 @@ #!/usr/bin/env python3 # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from setuptools import setup, find_packages from os import path from io import open here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements setup( name="swh.storage", description="Software Heritage storage manager", long_description=long_description, long_description_content_type="text/markdown", + python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/DSTO/", packages=find_packages(), scripts=["bin/swh-storage-add-dir",], entry_points=""" [console_scripts] swh-storage=swh.storage.cli:main [swh.cli.subcommands] storage=swh.storage.cli:storage """, setup_requires=["vcversioner"], install_requires=parse_requirements() + parse_requirements("swh"), extras_require={ "testing": (parse_requirements("test") + parse_requirements("swh-journal")), "schemata": ["SQLAlchemy"], "journal": parse_requirements("swh-journal"), }, vcversioner={}, include_package_data=True, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 5 - Production/Stable", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-storage", }, ) diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 
7570110c..dfd9d0a8 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,202 +1,203 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.0.187 +Version: 0.0.188 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). On a Debian-like host: ``` $ sudo apt install libpq-dev postgresql ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local args: db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing args: root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. 
For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the Python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage API on port 5002. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote args: url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local args: db: service=swh-dev objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable +Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: schemata Provides-Extra: journal diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt index 18bcfed9..6afaf6ca 100644 --- a/swh.storage.egg-info/SOURCES.txt +++ b/swh.storage.egg-info/SOURCES.txt @@ -1,246 +1,254 @@ MANIFEST.in Makefile Makefile.local README.md pyproject.toml pytest.ini setup.cfg setup.py tox.ini version.txt ./requirements-swh-journal.txt ./requirements-swh.txt ./requirements-test.txt ./requirements.txt bin/swh-storage-add-dir sql/.gitignore sql/Makefile sql/TODO sql/clusters.dot sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/entity.lister_metadata.schema.json sql/doc/json/entity.metadata.schema.json sql/doc/json/entity_history.lister_metadata.schema.json sql/doc/json/entity_history.metadata.schema.json sql/doc/json/fetch_history.result.schema.json sql/doc/json/list_history.result.schema.json sql/doc/json/listable_entity.list_params.schema.json sql/doc/json/origin_visit.metadata.json sql/doc/json/tool.tool_configuration.schema.json sql/json/.gitignore sql/json/Makefile sql/json/entity.lister_metadata.schema.json sql/json/entity.metadata.schema.json sql/json/entity_history.lister_metadata.schema.json sql/json/entity_history.metadata.schema.json sql/json/fetch_history.result.schema.json sql/json/list_history.result.schema.json sql/json/listable_entity.list_params.schema.json sql/json/origin_visit.metadata.json sql/json/tool.tool_configuration.schema.json sql/upgrades/015.sql sql/upgrades/016.sql sql/upgrades/017.sql sql/upgrades/018.sql sql/upgrades/019.sql sql/upgrades/020.sql sql/upgrades/021.sql sql/upgrades/022.sql sql/upgrades/023.sql sql/upgrades/024.sql sql/upgrades/025.sql sql/upgrades/026.sql sql/upgrades/027.sql sql/upgrades/028.sql sql/upgrades/029.sql sql/upgrades/030.sql sql/upgrades/032.sql sql/upgrades/033.sql sql/upgrades/034.sql sql/upgrades/035.sql sql/upgrades/036.sql sql/upgrades/037.sql sql/upgrades/038.sql sql/upgrades/039.sql sql/upgrades/040.sql sql/upgrades/041.sql sql/upgrades/042.sql sql/upgrades/043.sql sql/upgrades/044.sql sql/upgrades/045.sql sql/upgrades/046.sql sql/upgrades/047.sql sql/upgrades/048.sql sql/upgrades/049.sql sql/upgrades/050.sql sql/upgrades/051.sql sql/upgrades/052.sql sql/upgrades/053.sql sql/upgrades/054.sql sql/upgrades/055.sql sql/upgrades/056.sql sql/upgrades/057.sql sql/upgrades/058.sql sql/upgrades/059.sql sql/upgrades/060.sql sql/upgrades/061.sql sql/upgrades/062.sql sql/upgrades/063.sql sql/upgrades/064.sql sql/upgrades/065.sql sql/upgrades/066.sql sql/upgrades/067.sql sql/upgrades/068.sql sql/upgrades/069.sql 
sql/upgrades/070.sql sql/upgrades/071.sql sql/upgrades/072.sql sql/upgrades/073.sql sql/upgrades/074.sql sql/upgrades/075.sql sql/upgrades/076.sql sql/upgrades/077.sql sql/upgrades/078.sql sql/upgrades/079.sql sql/upgrades/080.sql sql/upgrades/081.sql sql/upgrades/082.sql sql/upgrades/083.sql sql/upgrades/084.sql sql/upgrades/085.sql sql/upgrades/086.sql sql/upgrades/087.sql sql/upgrades/088.sql sql/upgrades/089.sql sql/upgrades/090.sql sql/upgrades/091.sql sql/upgrades/092.sql sql/upgrades/093.sql sql/upgrades/094.sql sql/upgrades/095.sql sql/upgrades/096.sql sql/upgrades/097.sql sql/upgrades/098.sql sql/upgrades/099.sql sql/upgrades/100.sql sql/upgrades/101.sql sql/upgrades/102.sql sql/upgrades/103.sql sql/upgrades/104.sql sql/upgrades/105.sql sql/upgrades/106.sql sql/upgrades/107.sql sql/upgrades/108.sql sql/upgrades/109.sql sql/upgrades/110.sql sql/upgrades/111.sql sql/upgrades/112.sql sql/upgrades/113.sql sql/upgrades/114.sql sql/upgrades/115.sql sql/upgrades/116.sql sql/upgrades/117.sql sql/upgrades/118.sql sql/upgrades/119.sql sql/upgrades/120.sql sql/upgrades/121.sql sql/upgrades/122.sql sql/upgrades/123.sql sql/upgrades/124.sql sql/upgrades/125.sql sql/upgrades/126.sql sql/upgrades/127.sql sql/upgrades/128.sql sql/upgrades/129.sql sql/upgrades/130.sql sql/upgrades/131.sql sql/upgrades/132.sql sql/upgrades/133.sql sql/upgrades/134.sql sql/upgrades/135.sql sql/upgrades/136.sql sql/upgrades/137.sql sql/upgrades/138.sql sql/upgrades/139.sql sql/upgrades/140.sql sql/upgrades/141.sql sql/upgrades/142.sql sql/upgrades/143.sql sql/upgrades/144.sql sql/upgrades/145.sql sql/upgrades/146.sql swh/__init__.py swh.storage.egg-info/PKG-INFO swh.storage.egg-info/SOURCES.txt swh.storage.egg-info/dependency_links.txt swh.storage.egg-info/entry_points.txt swh.storage.egg-info/requires.txt swh.storage.egg-info/top_level.txt swh/storage/__init__.py +swh/storage/backfill.py swh/storage/buffer.py swh/storage/cli.py swh/storage/common.py swh/storage/converters.py swh/storage/db.py swh/storage/exc.py swh/storage/filter.py +swh/storage/fixer.py swh/storage/in_memory.py swh/storage/interface.py swh/storage/metrics.py swh/storage/objstorage.py swh/storage/py.typed +swh/storage/replay.py swh/storage/retry.py swh/storage/storage.py swh/storage/utils.py swh/storage/validate.py swh/storage/writer.py swh/storage/algos/__init__.py swh/storage/algos/diff.py swh/storage/algos/dir_iterators.py swh/storage/algos/origin.py swh/storage/algos/revisions_walker.py swh/storage/algos/snapshot.py swh/storage/api/__init__.py swh/storage/api/client.py swh/storage/api/serializers.py swh/storage/api/server.py swh/storage/cassandra/__init__.py swh/storage/cassandra/common.py swh/storage/cassandra/converters.py swh/storage/cassandra/cql.py swh/storage/cassandra/schema.py swh/storage/cassandra/storage.py swh/storage/sql/10-swh-init.sql swh/storage/sql/20-swh-enums.sql swh/storage/sql/30-swh-schema.sql swh/storage/sql/40-swh-func.sql swh/storage/sql/60-swh-indexes.sql swh/storage/tests/__init__.py swh/storage/tests/conftest.py swh/storage/tests/generate_data_test.py swh/storage/tests/storage_data.py swh/storage/tests/test_api_client.py swh/storage/tests/test_api_client_dicts.py +swh/storage/tests/test_backfill.py swh/storage/tests/test_buffer.py swh/storage/tests/test_cassandra.py swh/storage/tests/test_cassandra_converters.py +swh/storage/tests/test_cli.py swh/storage/tests/test_converters.py swh/storage/tests/test_db.py swh/storage/tests/test_exception.py swh/storage/tests/test_filter.py swh/storage/tests/test_in_memory.py 
swh/storage/tests/test_init.py +swh/storage/tests/test_kafka_writer.py swh/storage/tests/test_metrics.py +swh/storage/tests/test_replay.py swh/storage/tests/test_retry.py swh/storage/tests/test_server.py swh/storage/tests/test_storage.py swh/storage/tests/test_utils.py +swh/storage/tests/test_write_replay.py swh/storage/tests/algos/__init__.py swh/storage/tests/algos/test_diff.py swh/storage/tests/algos/test_dir_iterator.py swh/storage/tests/algos/test_origin.py swh/storage/tests/algos/test_revisions_walker.py swh/storage/tests/algos/test_snapshot.py \ No newline at end of file diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index 3130e1dd..61288852 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,28 +1,28 @@ click flask psycopg2 python-dateutil vcversioner aiohttp tenacity cassandra-driver!=3.21.0,>=3.19.0 swh.core[db,http]>=0.0.94 swh.model>=0.0.63 swh.objstorage>=0.0.40 [journal] -swh.journal>=0.0.30 +swh.journal>=0.0.31 [schemata] SQLAlchemy [testing] hypothesis>=3.11.0 pytest pytest-mock pytest-postgresql>=2.1.0 sqlalchemy-stubs swh.model[testing]>=0.0.50 pytz pytest-xdist -swh.journal>=0.0.30 +swh.journal>=0.0.31 diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py new file mode 100644 index 00000000..6993dd00 --- /dev/null +++ b/swh/storage/backfill.py @@ -0,0 +1,490 @@ +# Copyright (C) 2017-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Storage backfiller. + +The backfiller goal is to produce back part or all of the objects +from a storage to the journal topics + +Current implementation consists in the JournalBackfiller class. + +It simply reads the objects from the storage and sends every object identifier back to +the journal. 
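A rough usage sketch for the new backfiller (the broker, DSN, prefix and client id values below are placeholders; a reachable storage database and Kafka brokers are assumed):

```
# Hedged sketch: drive the JournalBackfiller added by this diff from Python.
from swh.storage.backfill import JournalBackfiller

config = {
    "brokers": ["kafka.example.org:9092"],   # journal (Kafka) endpoints
    "storage_dbconn": "service=swh-dev",     # storage database connection string
    "prefix": "swh.journal.objects",         # topic prefix
    "client_id": "backfiller-01",            # Kafka client id
}

backfiller = JournalBackfiller(config)
# Walk every revision whose id falls between the two hex prefixes;
# with dry_run=True the objects are read but nothing is produced to Kafka.
backfiller.run("revision", start_object="00", end_object="01", dry_run=True)
```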
+ +""" + +import logging + + +from swh.core.db import BaseDb +from swh.journal.writer.kafka import KafkaJournalWriter +from swh.storage.converters import db_to_release, db_to_revision + + +logger = logging.getLogger(__name__) + +PARTITION_KEY = { + "content": "sha1", + "skipped_content": "sha1", + "directory": "id", + "revision": "revision.id", + "release": "release.id", + "snapshot": "id", + "origin": "id", + "origin_visit": "origin_visit.origin", +} + +COLUMNS = { + "content": [ + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "status", + "ctime", + ], + "skipped_content": [ + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "ctime", + "status", + "reason", + ], + "directory": ["id", "dir_entries", "file_entries", "rev_entries"], + "revision": [ + ("revision.id", "id"), + "date", + "date_offset", + "committer_date", + "committer_date_offset", + "type", + "directory", + "message", + "synthetic", + "metadata", + "date_neg_utc_offset", + "committer_date_neg_utc_offset", + ( + "array(select parent_id::bytea from revision_history rh " + "where rh.id = revision.id order by rh.parent_rank asc)", + "parents", + ), + ("a.id", "author_id"), + ("a.name", "author_name"), + ("a.email", "author_email"), + ("a.fullname", "author_fullname"), + ("c.id", "committer_id"), + ("c.name", "committer_name"), + ("c.email", "committer_email"), + ("c.fullname", "committer_fullname"), + ], + "release": [ + ("release.id", "id"), + "date", + "date_offset", + "comment", + ("release.name", "name"), + "synthetic", + "date_neg_utc_offset", + "target", + "target_type", + ("a.id", "author_id"), + ("a.name", "author_name"), + ("a.email", "author_email"), + ("a.fullname", "author_fullname"), + ], + "snapshot": ["id", "object_id"], + "origin": ["type", "url"], + "origin_visit": [ + "visit", + "origin.type", + "origin_visit.type", + "url", + "date", + "snapshot", + "status", + "metadata", + ], +} + + +JOINS = { + "release": ["person a on release.author=a.id"], + "revision": [ + "person a on revision.author=a.id", + "person c on revision.committer=c.id", + ], + "origin_visit": ["origin on origin_visit.origin=origin.id"], +} + + +def directory_converter(db, directory): + """Convert directory from the flat representation to swh model + compatible objects. + + """ + columns = ["target", "name", "perms"] + query_template = """ + select %(columns)s + from directory_entry_%(type)s + where id in %%s + """ + + types = ["file", "dir", "rev"] + + entries = [] + with db.cursor() as cur: + for type in types: + ids = directory.pop("%s_entries" % type) + if not ids: + continue + query = query_template % { + "columns": ",".join(columns), + "type": type, + } + cur.execute(query, (tuple(ids),)) + for row in cur: + entry = dict(zip(columns, row)) + entry["type"] = type + entries.append(entry) + + directory["entries"] = entries + return directory + + +def revision_converter(db, revision): + """Convert revision from the flat representation to swh model + compatible objects. + + """ + return db_to_revision(revision) + + +def release_converter(db, release): + """Convert release from the flat representation to swh model + compatible objects. + + """ + release = db_to_release(release) + if "author" in release and release["author"]: + del release["author"]["id"] + return release + + +def snapshot_converter(db, snapshot): + """Convert snapshot from the flat representation to swh model + compatible objects. 
+ + """ + columns = ["name", "target", "target_type"] + query = """ + select %s + from snapshot_branches sbs + inner join snapshot_branch sb on sb.object_id=sbs.branch_id + where sbs.snapshot_id=%%s + """ % ", ".join( + columns + ) + with db.cursor() as cur: + cur.execute(query, (snapshot.pop("object_id"),)) + branches = {} + for name, *row in cur: + branch = dict(zip(columns[1:], row)) + if not branch["target"] and not branch["target_type"]: + branch = None + branches[name] = branch + + snapshot["branches"] = branches + return snapshot + + +def origin_visit_converter(db, origin_visit): + origin = { + "type": origin_visit.pop("origin.type"), + "url": origin_visit.pop("url"), + } + origin_visit["origin"] = origin + origin_visit["type"] = origin_visit.pop("origin_visit.type") + return origin_visit + + +CONVERTERS = { + "directory": directory_converter, + "revision": revision_converter, + "release": release_converter, + "snapshot": snapshot_converter, + "origin_visit": origin_visit_converter, +} + + +def object_to_offset(object_id, numbits): + """Compute the index of the range containing object id, when dividing + space into 2^numbits. + + Args: + object_id (str): The hex representation of object_id + numbits (int): Number of bits in which we divide input space + + Returns: + The index of the range containing object id + + """ + q, r = divmod(numbits, 8) + length = q + (r != 0) + shift_bits = 8 - r if r else 0 + + truncated_id = object_id[: length * 2] + if len(truncated_id) < length * 2: + truncated_id += "0" * (length * 2 - len(truncated_id)) + + truncated_id_bytes = bytes.fromhex(truncated_id) + return int.from_bytes(truncated_id_bytes, byteorder="big") >> shift_bits + + +def byte_ranges(numbits, start_object=None, end_object=None): + """Generate start/end pairs of bytes spanning numbits bits and + constrained by optional start_object and end_object. 
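To make the range computation concrete, here is a small, self-contained check of `object_to_offset`; the expected values were worked out by hand from the code above:

```
# Illustration only: object_to_offset() keeps the first ceil(numbits/8) bytes
# of the hex id and shifts away the bits beyond numbits.
from swh.storage.backfill import object_to_offset

sha1 = "00062f8bd330715c4f819373653d97b3cd34394c"
# With numbits=24, the first 3 bytes 0x00 0x06 0x2f are kept as-is.
assert object_to_offset(sha1, 24) == 0x00062F
# With numbits=16, only the first 2 bytes 0x00 0x06 are kept.
assert object_to_offset(sha1, 16) == 0x0006
# With numbits=12, the low 4 bits of the second byte are shifted out.
assert object_to_offset(sha1, 12) == 0x000
print("object_to_offset checks pass")
```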
+ + Args: + numbits (int): Number of bits in which we divide input space + start_object (str): Hex object id contained in the first range + returned + end_object (str): Hex object id contained in the last range + returned + + Yields: + 2^numbits pairs of bytes + + """ + q, r = divmod(numbits, 8) + length = q + (r != 0) + shift_bits = 8 - r if r else 0 + + def to_bytes(i): + return int.to_bytes(i << shift_bits, length=length, byteorder="big") + + start_offset = 0 + end_offset = 1 << numbits + + if start_object is not None: + start_offset = object_to_offset(start_object, numbits) + if end_object is not None: + end_offset = object_to_offset(end_object, numbits) + 1 + + for start in range(start_offset, end_offset): + end = start + 1 + + if start == 0: + yield None, to_bytes(end) + elif end == 1 << numbits: + yield to_bytes(start), None + else: + yield to_bytes(start), to_bytes(end) + + +def integer_ranges(start, end, block_size=1000): + for start in range(start, end, block_size): + if start == 0: + yield None, block_size + elif start + block_size > end: + yield start, end + else: + yield start, start + block_size + + +RANGE_GENERATORS = { + "content": lambda start, end: byte_ranges(24, start, end), + "skipped_content": lambda start, end: [(None, None)], + "directory": lambda start, end: byte_ranges(24, start, end), + "revision": lambda start, end: byte_ranges(24, start, end), + "release": lambda start, end: byte_ranges(16, start, end), + "snapshot": lambda start, end: byte_ranges(16, start, end), + "origin": integer_ranges, + "origin_visit": integer_ranges, +} + + +def compute_query(obj_type, start, end): + columns = COLUMNS.get(obj_type) + join_specs = JOINS.get(obj_type, []) + join_clause = "\n".join("left join %s" % clause for clause in join_specs) + + where = [] + where_args = [] + if start: + where.append("%(keys)s >= %%s") + where_args.append(start) + if end: + where.append("%(keys)s < %%s") + where_args.append(end) + + where_clause = "" + if where: + where_clause = ("where " + " and ".join(where)) % { + "keys": "(%s)" % PARTITION_KEY[obj_type] + } + + column_specs = [] + column_aliases = [] + for column in columns: + if isinstance(column, str): + column_specs.append(column) + column_aliases.append(column) + else: + column_specs.append("%s as %s" % column) + column_aliases.append(column[1]) + + query = """ +select %(columns)s +from %(table)s +%(join)s +%(where)s + """ % { + "columns": ",".join(column_specs), + "table": obj_type, + "join": join_clause, + "where": where_clause, + } + + return query, where_args, column_aliases + + +def fetch(db, obj_type, start, end): + """Fetch all obj_type's identifiers from db. + + This opens one connection, stream objects and when done, close + the connection. 
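A quick way to see what `byte_ranges` and `compute_query` produce, without touching a database (the shapes described in the comments follow from the code above):

```
# Illustration only: inspect one generated range and the SQL built for it.
from swh.storage.backfill import byte_ranges, compute_query

# byte_ranges(16) yields 2**16 (start, end) pairs; the first one is
# open-ended on the left: (None, b'\x00\x01').
start, end = next(byte_ranges(16))

query, where_args, column_aliases = compute_query("release", start, end)
print(query)           # SELECT with aliased columns and the join on person a
print(where_args)      # only the non-None bounds become query parameters
print(column_aliases)  # e.g. 'id', 'date', ..., 'author_fullname'
```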
+ + Args: + db (BaseDb): Db connection object + obj_type (str): Object type + start (Union[bytes|Tuple]): Range start identifier + end (Union[bytes|Tuple]): Range end identifier + + Raises: + ValueError if obj_type is not supported + + Yields: + Objects in the given range + + """ + query, where_args, column_aliases = compute_query(obj_type, start, end) + converter = CONVERTERS.get(obj_type) + with db.cursor() as cursor: + logger.debug("Fetching data for table %s", obj_type) + logger.debug("query: %s %s", query, where_args) + cursor.execute(query, where_args) + for row in cursor: + record = dict(zip(column_aliases, row)) + if converter: + record = converter(db, record) + + logger.debug("record: %s" % record) + yield record + + +def _format_range_bound(bound): + if isinstance(bound, bytes): + return bound.hex() + else: + return str(bound) + + +MANDATORY_KEYS = ["brokers", "storage_dbconn", "prefix", "client_id"] + + +class JournalBackfiller: + """Class in charge of reading the storage's objects and sends those + back to the journal's topics. + + This is designed to be run periodically. + + """ + + def __init__(self, config=None): + self.config = config + self.check_config(config) + + def check_config(self, config): + missing_keys = [] + for key in MANDATORY_KEYS: + if not config.get(key): + missing_keys.append(key) + + if missing_keys: + raise ValueError( + "Configuration error: The following keys must be" + " provided: %s" % (",".join(missing_keys),) + ) + + def parse_arguments(self, object_type, start_object, end_object): + """Parse arguments + + Raises: + ValueError for unsupported object type + ValueError if object ids are not parseable + + Returns: + Parsed start and end object ids + + """ + if object_type not in COLUMNS: + raise ValueError( + "Object type %s is not supported. " + "The only possible values are %s" + % (object_type, ", ".join(COLUMNS.keys())) + ) + + if object_type in ["origin", "origin_visit"]: + if start_object: + start_object = int(start_object) + else: + start_object = 0 + if end_object: + end_object = int(end_object) + else: + end_object = 100 * 1000 * 1000 # hard-coded limit + + return start_object, end_object + + def run(self, object_type, start_object, end_object, dry_run=False): + """Reads storage's subscribed object types and send them to the + journal's reading topic. 
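Put together, a configuration file for the `swh storage backfill` command added later in this diff (see the cli.py changes below) might look like the following; all values are placeholders:

```
# backfiller.yml -- placeholder values, not taken from this patch
brokers:
  - kafka.example.org:9092
storage_dbconn: service=swh-dev
prefix: swh.journal.objects
client_id: backfiller-01
```

With such a file, a dry run over one object type would be invoked as:

```
(swh) :~$ swh storage -C backfiller.yml backfill revision --start-object 00 --end-object 01 --dry-run
```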
+ + """ + start_object, end_object = self.parse_arguments( + object_type, start_object, end_object + ) + + db = BaseDb.connect(self.config["storage_dbconn"]) + writer = KafkaJournalWriter( + brokers=self.config["brokers"], + prefix=self.config["prefix"], + client_id=self.config["client_id"], + ) + for range_start, range_end in RANGE_GENERATORS[object_type]( + start_object, end_object + ): + logger.info( + "Processing %s range %s to %s", + object_type, + _format_range_bound(range_start), + _format_range_bound(range_end), + ) + + for obj in fetch(db, object_type, start=range_start, end=range_end,): + if dry_run: + continue + writer.write_addition(object_type=object_type, object_=obj) + + writer.producer.flush() + + +if __name__ == "__main__": + print('Please use the "swh-journal backfiller run" command') diff --git a/swh/storage/cli.py b/swh/storage/cli.py index 13b26128..03823387 100644 --- a/swh/storage/cli.py +++ b/swh/storage/cli.py @@ -1,62 +1,199 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import functools import logging +import os +import warnings import click +from swh.core import config from swh.core.cli import CONTEXT_SETTINGS +from swh.journal.client import get_journal_client +from swh.storage import get_storage from swh.storage.api.server import load_and_check_config, app +try: + from systemd.daemon import notify +except ImportError: + notify = None + @click.group(name="storage", context_settings=CONTEXT_SETTINGS) +@click.option( + "--config-file", + "-C", + default=None, + type=click.Path(exists=True, dir_okay=False,), + help="Configuration file.", +) @click.pass_context -def storage(ctx): +def storage(ctx, config_file): """Software Heritage Storage tools.""" - pass + if not config_file: + config_file = os.environ.get("SWH_CONFIG_FILENAME") + + if config_file: + if not os.path.exists(config_file): + raise ValueError("%s does not exist" % config_file) + conf = config.read(config_file) + else: + conf = {} + + ctx.ensure_object(dict) + + ctx.obj["config"] = conf @storage.command(name="rpc-serve") -@click.argument("config-path", required=True) +@click.argument("config-path", default=None, required=False) @click.option( "--host", default="0.0.0.0", metavar="IP", show_default=True, help="Host ip address to bind the server on", ) @click.option( "--port", default=5002, type=click.INT, metavar="PORT", show_default=True, help="Binding port of the server", ) @click.option( "--debug/--no-debug", default=True, help="Indicates if the server should run in debug mode", ) @click.pass_context def serve(ctx, config_path, host, port, debug): """Software Heritage Storage RPC server. Do NOT use this in a production environment. """ if "log_level" in ctx.obj: logging.getLogger("werkzeug").setLevel(ctx.obj["log_level"]) - api_cfg = load_and_check_config(config_path, type="any") - app.config.update(api_cfg) + if config_path: + # for bw compat + warnings.warn( + "The `config_path` argument of the `swh storage rpc-server` is now " + "deprecated. 
Please use the --config option of `swh storage` instead.", + DeprecationWarning, + ) + api_cfg = load_and_check_config(config_path, type="any") + app.config.update(api_cfg) + else: + app.config.update(ctx.obj["config"]) + app.run(host, port=int(port), debug=bool(debug)) +@storage.command() +@click.argument("object_type") +@click.option("--start-object", default=None) +@click.option("--end-object", default=None) +@click.option("--dry-run", is_flag=True, default=False) +@click.pass_context +def backfill(ctx, object_type, start_object, end_object, dry_run): + """Run the backfiller + + The backfiller list objects from a Storage and produce journal entries from + there. + + Typically used to rebuild a journal or compensate for missing objects in a + journal (eg. due to a downtime of this later). + + The configuration file requires the following entries: + - brokers: a list of kafka endpoints (the journal) in which entries will be + added. + - storage_dbconn: URL to connect to the storage DB. + - prefix: the prefix of the topics (topics will be .). + - client_id: the kafka client ID. + + """ + # for "lazy" loading + from swh.storage.backfill import JournalBackfiller + + try: + from systemd.daemon import notify + except ImportError: + notify = None + + conf = ctx.obj["config"] + backfiller = JournalBackfiller(conf) + + if notify: + notify("READY=1") + + try: + backfiller.run( + object_type=object_type, + start_object=start_object, + end_object=end_object, + dry_run=dry_run, + ) + except KeyboardInterrupt: + if notify: + notify("STOPPING=1") + ctx.exit(0) + + +@storage.command() +@click.option( + "--stop-after-objects", + "-n", + default=None, + type=int, + help="Stop after processing this many objects. Default is to " "run forever.", +) +@click.pass_context +def replay(ctx, stop_after_objects): + """Fill a Storage by reading a Journal. + + There can be several 'replayers' filling a Storage as long as they use + the same `group-id`. + """ + from swh.storage.replay import process_replay_objects + + conf = ctx.obj["config"] + try: + storage = get_storage(**conf.pop("storage")) + except KeyError: + ctx.fail("You must have a storage configured in your config file.") + client_cfg = conf.pop("journal_client") + if stop_after_objects: + client_cfg["stop_after_objects"] = stop_after_objects + try: + client = get_journal_client(**client_cfg) + except ValueError as exc: + ctx.fail(exc) + + worker_fn = functools.partial(process_replay_objects, storage=storage) + + if notify: + notify("READY=1") + + try: + client.process(worker_fn) + except KeyboardInterrupt: + ctx.exit(0) + else: + print("Done.") + finally: + if notify: + notify("STOPPING=1") + client.close() + + def main(): logging.basicConfig() return serve(auto_envvar_prefix="SWH_STORAGE") if __name__ == "__main__": main() diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py new file mode 100644 index 00000000..7322b352 --- /dev/null +++ b/swh/storage/fixer.py @@ -0,0 +1,293 @@ +import copy +import logging +from typing import Any, Dict, List, Optional +from swh.model.identifiers import normalize_timestamp + +logger = logging.getLogger(__name__) + + +def _fix_content(content: Dict[str, Any]) -> Dict[str, Any]: + """Filters-out invalid 'perms' key that leaked from swh.model.from_disk + to the journal. 
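For reference, a configuration read by the `swh storage replay` command above could look like the sketch below. The `storage` section follows the README examples; the `journal_client` keys are assumptions about what `get_journal_client` accepts and should be checked against the swh.journal version in use:

```
# replayer.yml -- placeholder values; journal_client keys are assumed
storage:
  cls: local
  args:
    db: service=swh-dev
    objstorage:
      cls: pathslicing
      args:
        root: /srv/softwareheritage/objects
        slicing: 0:2/2:4/4:6
journal_client:
  brokers:
    - kafka.example.org:9092
  group_id: swh-storage-replayer
  prefix: swh.journal.objects
```

It would then be run with `swh storage -C replayer.yml replay`, optionally bounded with `-n`/`--stop-after-objects`.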
+ + >>> _fix_content({'perms': 0o100644, 'sha1_git': b'foo'}) + {'sha1_git': b'foo'} + + >>> _fix_content({'sha1_git': b'bar'}) + {'sha1_git': b'bar'} + + """ + content = content.copy() + content.pop("perms", None) + return content + + +def _fix_revision_pypi_empty_string(rev): + """PyPI loader failed to encode empty strings as bytes, see: + swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 + or https://forge.softwareheritage.org/D1772 + """ + rev = { + **rev, + "author": rev["author"].copy(), + "committer": rev["committer"].copy(), + } + if rev["author"].get("email") == "": + rev["author"]["email"] = b"" + if rev["author"].get("name") == "": + rev["author"]["name"] = b"" + if rev["committer"].get("email") == "": + rev["committer"]["email"] = b"" + if rev["committer"].get("name") == "": + rev["committer"]["name"] = b"" + return rev + + +def _fix_revision_transplant_source(rev): + if rev.get("metadata") and rev["metadata"].get("extra_headers"): + rev = copy.deepcopy(rev) + rev["metadata"]["extra_headers"] = [ + [key, value.encode("ascii")] + if key == "transplant_source" and isinstance(value, str) + else [key, value] + for (key, value) in rev["metadata"]["extra_headers"] + ] + return rev + + +def _check_date(date): + """Returns whether the date can be represented in backends with sane + limits on timestamps and timezones (resp. signed 64-bits and + signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). + """ + if date is None: + return True + date = normalize_timestamp(date) + return ( + (-(2 ** 63) <= date["timestamp"]["seconds"] < 2 ** 63) + and (0 <= date["timestamp"]["microseconds"] < 10 ** 6) + and (-(2 ** 15) <= date["offset"] < 2 ** 15) + ) + + +def _check_revision_date(rev): + """Exclude revisions with invalid dates. + See https://forge.softwareheritage.org/T1339""" + return _check_date(rev["date"]) and _check_date(rev["committer_date"]) + + +def _fix_revision(revision: Dict[str, Any]) -> Optional[Dict]: + """Fix various legacy revision issues. + + Fix author/committer person: + + >>> from pprint import pprint + >>> date = { + ... 'timestamp': { + ... 'seconds': 1565096932, + ... 'microseconds': 0, + ... }, + ... 'offset': 0, + ... } + >>> rev0 = _fix_revision({ + ... 'id': b'rev-id', + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': date, + ... 'type': 'git', + ... 'message': '', + ... 'directory': b'dir-id', + ... 'synthetic': False, + ... }) + >>> rev0['author'] + {'fullname': b'', 'name': b'', 'email': b''} + >>> rev0['committer'] + {'fullname': b'', 'name': b'', 'email': b''} + + Fix type of 'transplant_source' extra headers: + + >>> rev1 = _fix_revision({ + ... 'id': b'rev-id', + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': date, + ... 'metadata': { + ... 'extra_headers': [ + ... ['time_offset_seconds', b'-3600'], + ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] + ... ]}, + ... 'type': 'git', + ... 'message': '', + ... 'directory': b'dir-id', + ... 'synthetic': False, + ... 
}) + >>> pprint(rev1['metadata']['extra_headers']) + [['time_offset_seconds', b'-3600'], + ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] + + Revision with invalid date are filtered: + + >>> from copy import deepcopy + >>> invalid_date1 = deepcopy(date) + >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': invalid_date1, + ... 'committer_date': date, + ... }) + >>> rev is None + True + + >>> invalid_date2 = deepcopy(date) + >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': invalid_date2, + ... 'committer_date': date, + ... }) + >>> rev is None + True + + >>> invalid_date3 = deepcopy(date) + >>> invalid_date3['offset'] = 2**20 # > 10^15 + >>> rev = _fix_revision({ + ... 'author': {'fullname': b'', 'name': '', 'email': ''}, + ... 'committer': {'fullname': b'', 'name': '', 'email': ''}, + ... 'date': date, + ... 'committer_date': invalid_date3, + ... }) + >>> rev is None + True + + """ # noqa + rev = _fix_revision_pypi_empty_string(revision) + rev = _fix_revision_transplant_source(rev) + if not _check_revision_date(rev): + logger.warning( + "Invalid revision date detected: %(revision)s", {"revision": rev} + ) + return None + return rev + + +def _fix_origin(origin: Dict) -> Dict: + """Fix legacy origin with type which is no longer part of the model. + + >>> from pprint import pprint + >>> pprint(_fix_origin({ + ... 'url': 'http://foo', + ... })) + {'url': 'http://foo'} + >>> pprint(_fix_origin({ + ... 'url': 'http://bar', + ... 'type': 'foo', + ... })) + {'url': 'http://bar'} + + """ + o = origin.copy() + o.pop("type", None) + return o + + +def _fix_origin_visit(visit: Dict) -> Dict: + """Fix various legacy origin visit issues. + + `visit['origin']` is a dict instead of an URL: + + >>> from datetime import datetime, timezone + >>> from pprint import pprint + >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) + >>> pprint(_fix_origin_visit({ + ... 'origin': {'url': 'http://foo'}, + ... 'date': date, + ... 'type': 'git', + ... 'status': 'ongoing', + ... 'snapshot': None, + ... })) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'git'} + + `visit['type']` is missing , but `origin['visit']['type']` exists: + + >>> pprint(_fix_origin_visit( + ... {'origin': {'type': 'hg', 'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None, + ... })) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'hg'} + + Old visit format (origin_visit with no type) raises: + + >>> _fix_origin_visit({ + ... 'origin': {'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... + + >>> _fix_origin_visit({ + ... 'origin': 'http://foo', + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None + ... }) + Traceback (most recent call last): + ... + ValueError: Old origin visit format detected... 
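These helpers are tied together by `fix_objects()`, defined at the end of this module; a minimal illustration of its effect on a legacy origin record:

```
# Illustration only: fix_objects() dispatches to the _fix_* helper
# matching the object type.
from swh.storage.fixer import fix_objects

legacy_origins = [{"url": "http://foo", "type": "git"}]  # old schema with 'type'
print(fix_objects("origin", legacy_origins))
# [{'url': 'http://foo'}]
```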
+ + """ # noqa + visit = visit.copy() + if "type" not in visit: + if isinstance(visit["origin"], dict) and "type" in visit["origin"]: + # Very old version of the schema: visits did not have a type, + # but their 'origin' field was a dict with a 'type' key. + visit["type"] = visit["origin"]["type"] + else: + # Very old schema version: 'type' is missing, stop early + + # We expect the journal's origin_visit topic to no longer reference + # such visits. If it does, the replayer must crash so we can fix + # the journal's topic. + raise ValueError(f"Old origin visit format detected: {visit}") + if isinstance(visit["origin"], dict): + # Old version of the schema: visit['origin'] was a dict. + visit["origin"] = visit["origin"]["url"] + if "metadata" not in visit: + visit["metadata"] = None + return visit + + +def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]: + """ + Fix legacy objects from the journal to bring them up to date with the + latest storage schema. + """ + if object_type == "content": + return [_fix_content(v) for v in objects] + elif object_type == "revision": + revisions = [_fix_revision(v) for v in objects] + return [rev for rev in revisions if rev is not None] + elif object_type == "origin": + return [_fix_origin(v) for v in objects] + elif object_type == "origin_visit": + return [_fix_origin_visit(v) for v in objects] + else: + return objects diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py index 2c72ff63..351df410 100644 --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -1,1027 +1,1080 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re import bisect import dateutil import collections import copy import datetime import itertools import random from collections import defaultdict from datetime import timedelta -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import ( + Any, + Callable, + Dict, + Generic, + Iterable, + Iterator, + List, + Optional, + Tuple, + TypeVar, + Union, +) import attr from swh.model.model import ( BaseContent, Content, SkippedContent, Directory, Revision, Release, Snapshot, OriginVisit, Origin, SHA1_SIZE, ) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex from swh.storage.objstorage import ObjStorage from swh.storage.validate import convert_validation_exceptions from swh.storage.utils import now from .exc import StorageArgumentException, HashCollision from .converters import origin_url_to_sha1 from .utils import get_partition_bounds_bytes from .writer import JournalWriter # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 +SortedListItem = TypeVar("SortedListItem") +SortedListKey = TypeVar("SortedListKey") + + +class SortedList(collections.UserList, Generic[SortedListKey, SortedListItem]): + data: List[Tuple[SortedListKey, SortedListItem]] + + # https://github.com/python/mypy/issues/708 + # key: Callable[[SortedListItem], SortedListKey] + + def __init__( + self, + data: List[SortedListItem] = None, + key: Optional[Callable[[SortedListItem], SortedListKey]] = None, + ): + if key is None: + + def key(item): + return item + + assert key is not None # for mypy + super().__init__(sorted((key(x), x) for x in data or [])) + + self.key: Callable[[SortedListItem], SortedListKey] = key + + def add(self, item: SortedListItem): + k = 
self.key(item) + bisect.insort(self.data, (k, item)) + + def __iter__(self) -> Iterator[SortedListItem]: + for (k, item) in self.data: + yield item + + def iter_from(self, start_key: SortedListKey) -> Iterator[SortedListItem]: + """Returns an iterator over all the elements whose key is greater + or equal to `start_key`. + (This is an efficient equivalent to: + `(x for x in L if key(x) >= start_key)`) + """ + from_index = bisect.bisect_left(self.data, (start_key,)) + for (k, item) in itertools.islice(self.data, from_index, None): + yield item + + class InMemoryStorage: def __init__(self, journal_writer=None): self.reset() self.journal_writer = JournalWriter(journal_writer) def reset(self): self._contents = {} self._content_indexes = defaultdict(lambda: defaultdict(set)) self._skipped_contents = {} self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) self._directories = {} self._revisions = {} self._releases = {} self._snapshots = {} self._origins = {} self._origins_by_id = [] self._origins_by_sha1 = {} self._origin_visits = {} self._persons = [] self._origin_metadata = defaultdict(list) self._tools = {} self._metadata_providers = {} self._objects = defaultdict(list) - # ideally we would want a skip list for both fast inserts and searches - self._sorted_sha1s = [] + self._sorted_sha1s = SortedList[bytes, bytes]() self.objstorage = ObjStorage({"cls": "memory", "args": {}}) def check_config(self, *, check_write): return True def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict: self.journal_writer.content_add(contents) content_add = 0 content_add_bytes = 0 if with_data: summary = self.objstorage.content_add( c for c in contents if c.status != "absent" ) content_add_bytes = summary["content:add:bytes"] for content in contents: key = self._content_key(content) if key in self._contents: continue for algorithm in DEFAULT_ALGORITHMS: hash_ = content.get_hash(algorithm) if hash_ in self._content_indexes[algorithm] and ( algorithm not in {"blake2s256", "sha256"} ): colliding_content_hashes = [] # Add the already stored contents for content_hashes_set in self._content_indexes[algorithm][hash_]: hashes = dict(content_hashes_set) colliding_content_hashes.append(hashes) # Add the new colliding content colliding_content_hashes.append(content.hashes()) raise HashCollision(algorithm, hash_, colliding_content_hashes) for algorithm in DEFAULT_ALGORITHMS: hash_ = content.get_hash(algorithm) self._content_indexes[algorithm][hash_].add(key) self._objects[content.sha1_git].append(("content", content.sha1)) self._contents[key] = content - bisect.insort(self._sorted_sha1s, content.sha1) + self._sorted_sha1s.add(content.sha1) self._contents[key] = attr.evolve(self._contents[key], data=None) content_add += 1 summary = { "content:add": content_add, } if with_data: summary["content:add:bytes"] = content_add_bytes return summary def content_add(self, content: Iterable[Content]) -> Dict: content = [attr.evolve(c, ctime=now()) for c in content] return self._content_add(content, with_data=True) def content_update(self, content, keys=[]): self.journal_writer.content_update(content) for cont_update in content: cont_update = cont_update.copy() sha1 = cont_update.pop("sha1") for old_key in self._content_indexes["sha1"][sha1]: old_cont = self._contents.pop(old_key) for algorithm in DEFAULT_ALGORITHMS: hash_ = old_cont.get_hash(algorithm) self._content_indexes[algorithm][hash_].remove(old_key) new_cont = attr.evolve(old_cont, **cont_update) new_key = self._content_key(new_cont) 
self._contents[new_key] = new_cont for algorithm in DEFAULT_ALGORITHMS: hash_ = new_cont.get_hash(algorithm) self._content_indexes[algorithm][hash_].add(new_key) def content_add_metadata(self, content: Iterable[Content]) -> Dict: return self._content_add(content, with_data=False) def content_get(self, content): # FIXME: Make this method support slicing the `data`. if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise StorageArgumentException( "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX ) yield from self.objstorage.content_get(content) def content_get_range(self, start, end, limit=1000): if limit is None: raise StorageArgumentException("limit should not be None") - from_index = bisect.bisect_left(self._sorted_sha1s, start) - sha1s = itertools.islice(self._sorted_sha1s, from_index, None) sha1s = ( (sha1, content_key) - for sha1 in sha1s + for sha1 in self._sorted_sha1s.iter_from(start) for content_key in self._content_indexes["sha1"][sha1] ) matched = [] next_content = None for sha1, key in sha1s: if sha1 > end: break if len(matched) >= limit: next_content = sha1 break matched.append(self._contents[key].to_dict()) return { "contents": matched, "next": next_content, } def content_get_partition( self, partition_id: int, nb_partitions: int, limit: int = 1000, page_token: str = None, ): if limit is None: raise StorageArgumentException("limit should not be None") (start, end) = get_partition_bounds_bytes( partition_id, nb_partitions, SHA1_SIZE ) if page_token: start = hash_to_bytes(page_token) if end is None: end = b"\xff" * SHA1_SIZE result = self.content_get_range(start, end, limit) result2 = { "contents": result["contents"], "next_page_token": None, } if result["next"]: result2["next_page_token"] = hash_to_hex(result["next"]) return result2 def content_get_metadata(self, contents: List[bytes]) -> Dict[bytes, List[Dict]]: result: Dict = {sha1: [] for sha1 in contents} for sha1 in contents: if sha1 in self._content_indexes["sha1"]: objs = self._content_indexes["sha1"][sha1] # only 1 element as content_add_metadata would have raised a # hash collision otherwise for key in objs: d = self._contents[key].to_dict() del d["ctime"] if "data" in d: del d["data"] result[sha1].append(d) return result def content_find(self, content): if not set(content).intersection(DEFAULT_ALGORITHMS): raise StorageArgumentException( "content keys must contain at least one of: %s" % ", ".join(sorted(DEFAULT_ALGORITHMS)) ) found = [] for algo in DEFAULT_ALGORITHMS: hash = content.get(algo) if hash and hash in self._content_indexes[algo]: found.append(self._content_indexes[algo][hash]) if not found: return [] keys = list(set.intersection(*found)) return [self._contents[key].to_dict() for key in keys] def content_missing(self, content, key_hash="sha1"): for cont in content: for (algo, hash_) in cont.items(): if algo not in DEFAULT_ALGORITHMS: continue if hash_ not in self._content_indexes.get(algo, []): yield cont[key_hash] break else: for result in self.content_find(cont): if result["status"] == "missing": yield cont[key_hash] def content_missing_per_sha1(self, contents): for content in contents: if content not in self._content_indexes["sha1"]: yield content def content_missing_per_sha1_git(self, contents): for content in contents: if content not in self._content_indexes["sha1_git"]: yield content def content_get_random(self): return random.choice(list(self._content_indexes["sha1_git"])) def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict: self.journal_writer.skipped_content_add(contents) 
summary = {"skipped_content:add": 0} missing_contents = self.skipped_content_missing([c.hashes() for c in contents]) missing = {self._content_key(c) for c in missing_contents} contents = [c for c in contents if self._content_key(c) in missing] for content in contents: key = self._content_key(content) for algo in DEFAULT_ALGORITHMS: if content.get_hash(algo): self._skipped_content_indexes[algo][content.get_hash(algo)].add(key) self._skipped_contents[key] = content summary["skipped_content:add"] += 1 return summary def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: content = [attr.evolve(c, ctime=now()) for c in content] return self._skipped_content_add(content) def skipped_content_missing(self, contents): for content in contents: matches = list(self._skipped_contents.values()) for (algorithm, key) in self._content_key(content): if algorithm == "blake2s256": continue # Filter out skipped contents with the same hash matches = [ match for match in matches if match.get_hash(algorithm) == key ] # if none of the contents match if not matches: yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} def directory_add(self, directories: Iterable[Directory]) -> Dict: directories = [dir_ for dir_ in directories if dir_.id not in self._directories] self.journal_writer.directory_add(directories) count = 0 for directory in directories: count += 1 self._directories[directory.id] = directory self._objects[directory.id].append(("directory", directory.id)) return {"directory:add": count} def directory_missing(self, directories): for id in directories: if id not in self._directories: yield id def _join_dentry_to_content(self, dentry): keys = ( "status", "sha1", "sha1_git", "sha256", "length", ) ret = dict.fromkeys(keys) ret.update(dentry) if ret["type"] == "file": # TODO: Make it able to handle more than one content content = self.content_find({"sha1_git": ret["target"]}) if content: content = content[0] for key in keys: ret[key] = content[key] return ret def _directory_ls(self, directory_id, recursive, prefix=b""): if directory_id in self._directories: for entry in self._directories[directory_id].entries: ret = self._join_dentry_to_content(entry.to_dict()) ret["name"] = prefix + ret["name"] ret["dir_id"] = directory_id yield ret if recursive and ret["type"] == "dir": yield from self._directory_ls( ret["target"], True, prefix + ret["name"] + b"/" ) def directory_ls(self, directory, recursive=False): yield from self._directory_ls(directory, recursive) def directory_entry_get_by_path(self, directory, paths): return self._directory_entry_get_by_path(directory, paths, b"") def directory_get_random(self): if not self._directories: return None return random.choice(list(self._directories)) def _directory_entry_get_by_path(self, directory, paths, prefix): if not paths: return contents = list(self.directory_ls(directory)) if not contents: return def _get_entry(entries, name): for entry in entries: if entry["name"] == name: entry = entry.copy() entry["name"] = prefix + entry["name"] return entry first_item = _get_entry(contents, paths[0]) if len(paths) == 1: return first_item if not first_item or first_item["type"] != "dir": return return self._directory_entry_get_by_path( first_item["target"], paths[1:], prefix + paths[0] + b"/" ) def revision_add(self, revisions: Iterable[Revision]) -> Dict: revisions = [rev for rev in revisions if rev.id not in self._revisions] self.journal_writer.revision_add(revisions) count = 0 for revision in revisions: revision = attr.evolve( revision, 
committer=self._person_add(revision.committer), author=self._person_add(revision.author), ) self._revisions[revision.id] = revision self._objects[revision.id].append(("revision", revision.id)) count += 1 return {"revision:add": count} def revision_missing(self, revisions): for id in revisions: if id not in self._revisions: yield id def revision_get(self, revisions): for id in revisions: if id in self._revisions: yield self._revisions.get(id).to_dict() else: yield None def _get_parent_revs(self, rev_id, seen, limit): if limit and len(seen) >= limit: return if rev_id in seen or rev_id not in self._revisions: return seen.add(rev_id) yield self._revisions[rev_id].to_dict() for parent in self._revisions[rev_id].parents: yield from self._get_parent_revs(parent, seen, limit) def revision_log(self, revisions, limit=None): seen = set() for rev_id in revisions: yield from self._get_parent_revs(rev_id, seen, limit) def revision_shortlog(self, revisions, limit=None): yield from ( (rev["id"], rev["parents"]) for rev in self.revision_log(revisions, limit) ) def revision_get_random(self): return random.choice(list(self._revisions)) def release_add(self, releases: Iterable[Release]) -> Dict: releases = [rel for rel in releases if rel.id not in self._releases] self.journal_writer.release_add(releases) count = 0 for rel in releases: if rel.author: self._person_add(rel.author) self._objects[rel.id].append(("release", rel.id)) self._releases[rel.id] = rel count += 1 return {"release:add": count} def release_missing(self, releases): yield from (rel for rel in releases if rel not in self._releases) def release_get(self, releases): for rel_id in releases: if rel_id in self._releases: yield self._releases[rel_id].to_dict() else: yield None def release_get_random(self): return random.choice(list(self._releases)) def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: count = 0 snapshots = (snap for snap in snapshots if snap.id not in self._snapshots) for snapshot in snapshots: self.journal_writer.snapshot_add(snapshot) sorted_branch_names = sorted(snapshot.branches) self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) self._objects[snapshot.id].append(("snapshot", snapshot.id)) count += 1 return {"snapshot:add": count} def snapshot_missing(self, snapshots): for id in snapshots: if id not in self._snapshots: yield id def snapshot_get(self, snapshot_id): return self.snapshot_get_branches(snapshot_id) def snapshot_get_by_origin_visit(self, origin, visit): origin_url = self._get_origin_url(origin) if not origin_url: return if origin_url not in self._origins or visit > len( self._origin_visits[origin_url] ): return None snapshot_id = self._origin_visits[origin_url][visit - 1].snapshot if snapshot_id: return self.snapshot_get(snapshot_id) else: return None def snapshot_get_latest(self, origin, allowed_statuses=None): origin_url = self._get_origin_url(origin) if not origin_url: return visit = self.origin_visit_get_latest( origin_url, allowed_statuses=allowed_statuses, require_snapshot=True ) if visit and visit["snapshot"]: snapshot = self.snapshot_get(visit["snapshot"]) if not snapshot: raise StorageArgumentException( "last origin visit references an unknown snapshot" ) return snapshot def snapshot_count_branches(self, snapshot_id): (snapshot, _) = self._snapshots[snapshot_id] return collections.Counter( branch.target_type.value if branch else None for branch in snapshot.branches.values() ) def snapshot_get_branches( self, snapshot_id, branches_from=b"", branches_count=1000, target_types=None ): res = 
self._snapshots.get(snapshot_id) if res is None: return None (snapshot, sorted_branch_names) = res from_index = bisect.bisect_left(sorted_branch_names, branches_from) if target_types: next_branch = None branches = {} for branch_name in sorted_branch_names[from_index:]: branch = snapshot.branches[branch_name] if branch and branch.target_type.value in target_types: if len(branches) < branches_count: branches[branch_name] = branch else: next_branch = branch_name break else: # As there is no 'target_types', we can do that much faster to_index = from_index + branches_count returned_branch_names = sorted_branch_names[from_index:to_index] branches = { branch_name: snapshot.branches[branch_name] for branch_name in returned_branch_names } if to_index >= len(sorted_branch_names): next_branch = None else: next_branch = sorted_branch_names[to_index] branches = { name: branch.to_dict() if branch else None for (name, branch) in branches.items() } return { "id": snapshot_id, "branches": branches, "next_branch": next_branch, } def snapshot_get_random(self): return random.choice(list(self._snapshots)) def object_find_by_sha1_git(self, ids): ret = {} for id_ in ids: objs = self._objects.get(id_, []) ret[id_] = [{"sha1_git": id_, "type": obj[0],} for obj in objs] return ret def _convert_origin(self, t): if t is None: return None return t.to_dict() def origin_get(self, origins): if isinstance(origins, dict): # Old API return_single = True origins = [origins] else: return_single = False # Sanity check to be error-compatible with the pgsql backend if any("id" in origin for origin in origins) and not all( "id" in origin for origin in origins ): raise StorageArgumentException( 'Either all origins or none at all should have an "id".' ) if any("url" in origin for origin in origins) and not all( "url" in origin for origin in origins ): raise StorageArgumentException( "Either all origins or none at all should have " 'an "url" key.' 
) results = [] for origin in origins: result = None if "url" in origin: if origin["url"] in self._origins: result = self._origins[origin["url"]] else: raise StorageArgumentException("Origin must have an url.") results.append(self._convert_origin(result)) if return_single: assert len(results) == 1 return results[0] else: return results def origin_get_by_sha1(self, sha1s): return [self._convert_origin(self._origins_by_sha1.get(sha1)) for sha1 in sha1s] def origin_get_range(self, origin_from=1, origin_count=100): origin_from = max(origin_from, 1) if origin_from <= len(self._origins_by_id): max_idx = origin_from + origin_count - 1 if max_idx > len(self._origins_by_id): max_idx = len(self._origins_by_id) for idx in range(origin_from - 1, max_idx): origin = self._convert_origin(self._origins[self._origins_by_id[idx]]) yield {"id": idx + 1, **origin} def origin_list(self, page_token: Optional[str] = None, limit: int = 100) -> dict: origin_urls = sorted(self._origins) if page_token: from_ = bisect.bisect_left(origin_urls, page_token) else: from_ = 0 result = { "origins": [ {"url": origin_url} for origin_url in origin_urls[from_ : from_ + limit] ] } if from_ + limit < len(origin_urls): result["next_page_token"] = origin_urls[from_ + limit] return result def origin_search( self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False ): origins = map(self._convert_origin, self._origins.values()) if regexp: pat = re.compile(url_pattern) origins = [orig for orig in origins if pat.search(orig["url"])] else: origins = [orig for orig in origins if url_pattern in orig["url"]] if with_visit: origins = [ orig for orig in origins if len(self._origin_visits[orig["url"]]) > 0 and set( ov.snapshot for ov in self._origin_visits[orig["url"]] if ov.snapshot ) & set(self._snapshots) ] return origins[offset : offset + limit] def origin_count(self, url_pattern, regexp=False, with_visit=False): return len( self.origin_search( url_pattern, regexp=regexp, with_visit=with_visit, limit=len(self._origins), ) ) def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: origins = copy.deepcopy(list(origins)) for origin in origins: self.origin_add_one(origin) return [origin.to_dict() for origin in origins] def origin_add_one(self, origin: Origin) -> str: if origin.url not in self._origins: self.journal_writer.origin_add_one(origin) # generate an origin_id because it is needed by origin_get_range. 
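            # (origin ids are 1-based positions in self._origins_by_id,
            # assigned in insertion order)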
# TODO: remove this when we remove origin_get_range origin_id = len(self._origins) + 1 self._origins_by_id.append(origin.url) assert len(self._origins_by_id) == origin_id self._origins[origin.url] = origin self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin self._origin_visits[origin.url] = [] self._objects[origin.url].append(("origin", origin.url)) return origin.url def origin_visit_add( self, origin_url: str, date: Union[str, datetime.datetime], type: str ) -> OriginVisit: if isinstance(date, str): # FIXME: Converge on iso8601 at some point date = dateutil.parser.parse(date) elif not isinstance(date, datetime.datetime): raise StorageArgumentException("Date must be a datetime or a string") origin = self.origin_get({"url": origin_url}) if not origin: # Cannot add a visit without an origin raise StorageArgumentException("Unknown origin %s", origin_url) if origin_url in self._origins: origin = self._origins[origin_url] # visit ids are in the range [1, +inf[ visit_id = len(self._origin_visits[origin_url]) + 1 status = "ongoing" visit = OriginVisit( origin=origin_url, date=date, type=type, status=status, snapshot=None, metadata=None, visit=visit_id, ) self._origin_visits[origin_url].append(visit) visit = visit self._objects[(origin_url, visit.visit)].append(("origin_visit", None)) self.journal_writer.origin_visit_add(visit) # return last visit return visit def origin_visit_update( self, origin: str, visit_id: int, status: str, metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None, date: Optional[datetime.datetime] = None, ): origin_url = self._get_origin_url(origin) if origin_url is None: raise StorageArgumentException("Unknown origin.") try: visit = self._origin_visits[origin_url][visit_id - 1] except IndexError: raise StorageArgumentException("Unknown visit_id for this origin") from None updates: Dict[str, Any] = {"status": status} if metadata: updates["metadata"] = metadata if snapshot: updates["snapshot"] = snapshot with convert_validation_exceptions(): visit = attr.evolve(visit, **updates) self.journal_writer.origin_visit_update(visit) self._origin_visits[origin_url][visit_id - 1] = visit def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: for visit in visits: if visit.visit is None: raise StorageArgumentException(f"Missing visit id for visit {visit}") self.journal_writer.origin_visit_upsert(visits) for visit in visits: visit_id = visit.visit origin_url = visit.origin with convert_validation_exceptions(): visit = attr.evolve(visit, origin=origin_url) self._objects[(origin_url, visit_id)].append(("origin_visit", None)) if visit_id: while len(self._origin_visits[origin_url]) <= visit_id: self._origin_visits[origin_url].append(None) self._origin_visits[origin_url][visit_id - 1] = visit def _convert_visit(self, visit): if visit is None: return visit = visit.to_dict() return visit def origin_visit_get( self, origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None ) -> Iterable[Dict[str, Any]]: origin_url = self._get_origin_url(origin) if origin_url in self._origin_visits: visits = self._origin_visits[origin_url] if last_visit is not None: visits = visits[last_visit:] if limit is not None: visits = visits[:limit] for visit in visits: if not visit: continue visit_id = visit.visit yield self._convert_visit(self._origin_visits[origin_url][visit_id - 1]) def origin_visit_find_by_date( self, origin: str, visit_date: datetime.datetime ) -> Optional[Dict[str, Any]]: origin_url = self._get_origin_url(origin) if origin_url in 
self._origin_visits: visits = self._origin_visits[origin_url] visit = min(visits, key=lambda v: (abs(v.date - visit_date), -v.visit)) return self._convert_visit(visit) return None def origin_visit_get_by(self, origin: str, visit: int) -> Optional[Dict[str, Any]]: origin_url = self._get_origin_url(origin) if origin_url in self._origin_visits and visit <= len( self._origin_visits[origin_url] ): return self._convert_visit(self._origin_visits[origin_url][visit - 1]) return None def origin_visit_get_latest( self, origin: str, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, ) -> Optional[Dict[str, Any]]: ori = self._origins.get(origin) if not ori: return None visits = self._origin_visits[ori.url] if allowed_statuses is not None: visits = [visit for visit in visits if visit.status in allowed_statuses] if require_snapshot: visits = [visit for visit in visits if visit.snapshot] visit = max(visits, key=lambda v: (v.date, v.visit), default=None) return self._convert_visit(visit) def _select_random_origin_visit_by_type(self, type: str) -> str: while True: url = random.choice(list(self._origin_visits.keys())) random_origin_visits = self._origin_visits[url] if random_origin_visits[0].type == type: return url def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: url = self._select_random_origin_visit_by_type(type) random_origin_visits = copy.deepcopy(self._origin_visits[url]) random_origin_visits.reverse() back_in_the_day = now() - timedelta(weeks=12) # 3 months back # This should be enough for tests for visit in random_origin_visits: if visit.date > back_in_the_day and visit.status == "full": return visit.to_dict() else: return None def stat_counters(self): keys = ( "content", "directory", "origin", "origin_visit", "person", "release", "revision", "skipped_content", "snapshot", ) stats = {key: 0 for key in keys} stats.update( collections.Counter( obj_type for (obj_type, obj_id) in itertools.chain(*self._objects.values()) ) ) return stats def refresh_stat_counters(self): pass def origin_metadata_add(self, origin_url, ts, provider, tool, metadata): if not isinstance(origin_url, str): raise TypeError("origin_id must be str, not %r" % (origin_url,)) if isinstance(ts, str): ts = dateutil.parser.parse(ts) origin_metadata = { "origin_url": origin_url, "discovery_date": ts, "tool_id": tool, "metadata": metadata, "provider_id": provider, } self._origin_metadata[origin_url].append(origin_metadata) return None def origin_metadata_get_by(self, origin_url, provider_type=None): if not isinstance(origin_url, str): raise TypeError("origin_url must be str, not %r" % (origin_url,)) metadata = [] for item in self._origin_metadata[origin_url]: item = copy.deepcopy(item) provider = self.metadata_provider_get(item["provider_id"]) for attr_name in ("name", "type", "url"): item["provider_" + attr_name] = provider["provider_" + attr_name] metadata.append(item) return metadata def tool_add(self, tools): inserted = [] for tool in tools: key = self._tool_key(tool) assert "id" not in tool record = copy.deepcopy(tool) record["id"] = key # TODO: remove this if key not in self._tools: self._tools[key] = record inserted.append(copy.deepcopy(self._tools[key])) return inserted def tool_get(self, tool): return self._tools.get(self._tool_key(tool)) def metadata_provider_add( self, provider_name, provider_type, provider_url, metadata ): provider = { "provider_name": provider_name, "provider_type": provider_type, "provider_url": provider_url, "metadata": metadata, } key = 
self._metadata_provider_key(provider) provider["id"] = key self._metadata_providers[key] = provider return key def metadata_provider_get(self, provider_id): return self._metadata_providers.get(provider_id) def metadata_provider_get_by(self, provider): key = self._metadata_provider_key(provider) return self._metadata_providers.get(key) def _get_origin_url(self, origin): if isinstance(origin, str): return origin else: raise TypeError("origin must be a string.") def _person_add(self, person): key = ("person", person.fullname) if key not in self._objects: person_id = len(self._persons) + 1 self._persons.append(person) self._objects[key].append(("person", person_id)) else: person_id = self._objects[key][0][1] person = self._persons[person_id - 1] return person @staticmethod def _content_key(content): """ A stable key and the algorithm for a content""" if isinstance(content, BaseContent): content = content.to_dict() return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) @staticmethod def _tool_key(tool): return "%r %r %r" % ( tool["name"], tool["version"], tuple(sorted(tool["configuration"].items())), ) @staticmethod def _metadata_provider_key(provider): return "%r %r" % (provider["provider_name"], provider["provider_url"]) def diff_directories(self, from_dir, to_dir, track_renaming=False): raise NotImplementedError("InMemoryStorage.diff_directories") def diff_revisions(self, from_rev, to_rev, track_renaming=False): raise NotImplementedError("InMemoryStorage.diff_revisions") def diff_revision(self, revision, track_renaming=False): raise NotImplementedError("InMemoryStorage.diff_revision") def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: """Do nothing """ return None def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: return {} diff --git a/swh/storage/replay.py b/swh/storage/replay.py new file mode 100644 index 00000000..0a15d08d --- /dev/null +++ b/swh/storage/replay.py @@ -0,0 +1,128 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import logging +from typing import Any, Callable, Dict, Iterable, List + +try: + from systemd.daemon import notify +except ImportError: + notify = None + +from swh.core.statsd import statsd +from swh.storage.fixer import fix_objects + +from swh.model.model import ( + BaseContent, + BaseModel, + Content, + Directory, + Origin, + OriginVisit, + Revision, + SkippedContent, + Snapshot, + Release, +) +from swh.storage.exc import HashCollision + +logger = logging.getLogger(__name__) + +GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" +GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" + + +object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { + "origin": Origin.from_dict, + "origin_visit": OriginVisit.from_dict, + "snapshot": Snapshot.from_dict, + "revision": Revision.from_dict, + "release": Release.from_dict, + "directory": Directory.from_dict, + "content": Content.from_dict, + "skipped_content": SkippedContent.from_dict, +} + + +def process_replay_objects(all_objects, *, storage): + for (object_type, objects) in all_objects.items(): + logger.debug("Inserting %s %s objects", len(objects), object_type) + with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): + _insert_objects(object_type, objects, storage) + statsd.increment( + 
GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} + ) + if notify: + notify("WATCHDOG=1") + + +def collision_aware_content_add( + content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent] +) -> None: + """Add contents to storage. If a hash collision is detected, an error is + logged. Then this adds the other non colliding contents to the storage. + + Args: + content_add_fn: Storage content callable + contents: List of contents or skipped contents to add to storage + + """ + if not contents: + return + colliding_content_hashes: List[Dict[str, Any]] = [] + while True: + try: + content_add_fn(contents) + except HashCollision as e: + colliding_content_hashes.append( + { + "algo": e.algo, + "hash": e.hash_id, # hex hash id + "objects": e.colliding_contents, # hex hashes + } + ) + colliding_hashes = e.colliding_content_hashes() + # Drop the colliding contents from the transaction + contents = [c for c in contents if c.hashes() not in colliding_hashes] + else: + # Successfully added contents, we are done + break + if colliding_content_hashes: + for collision in colliding_content_hashes: + logger.error("Collision detected: %(collision)s", {"collision": collision}) + + +def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: + """Insert objects of type object_type in the storage. + + """ + objects = fix_objects(object_type, objects) + + if object_type == "content": + contents: List[BaseContent] = [] + skipped_contents: List[BaseContent] = [] + for content in objects: + c = BaseContent.from_dict(content) + if isinstance(c, SkippedContent): + skipped_contents.append(c) + else: + contents.append(c) + + collision_aware_content_add(storage.skipped_content_add, skipped_contents) + collision_aware_content_add(storage.content_add_metadata, contents) + elif object_type == "origin_visit": + visits: List[OriginVisit] = [] + origins: List[Origin] = [] + for obj in objects: + visit = OriginVisit.from_dict(obj) + visits.append(visit) + origins.append(Origin(url=visit.origin)) + storage.origin_add(origins) + storage.origin_visit_upsert(visits) + elif object_type in ("directory", "revision", "release", "snapshot", "origin"): + method = getattr(storage, object_type + "_add") + method(object_converter_fn[object_type](o) for o in objects) + else: + logger.warning("Received a series of %s, this should not happen", object_type) diff --git a/swh/storage/tests/conftest.py b/swh/storage/tests/conftest.py index 6cc476f0..6e095d13 100644 --- a/swh/storage/tests/conftest.py +++ b/swh/storage/tests/conftest.py @@ -1,254 +1,252 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import glob import pytest from typing import Union from pytest_postgresql import factories from pytest_postgresql.janitor import DatabaseJanitor, psycopg2, Version from os import path, environ from hypothesis import settings from typing import Dict import swh.storage from swh.core.utils import numfile_sortkey as sortkey - from swh.model.tests.generate_testdata import gen_contents, gen_origins from swh.model.model import ( Content, Directory, Origin, OriginVisit, Release, Revision, SkippedContent, Snapshot, ) OBJECT_FACTORY = { "content": Content.from_dict, "directory": Directory.from_dict, "origin": Origin.from_dict, "origin_visit": OriginVisit.from_dict, "release": Release.from_dict, "revision": 
Revision.from_dict, "skipped_content": SkippedContent.from_dict, "snapshot": Snapshot.from_dict, } - SQL_DIR = path.join(path.dirname(swh.storage.__file__), "sql") environ["LC_ALL"] = "C.UTF-8" DUMP_FILES = path.join(SQL_DIR, "*.sql") # define tests profile. Full documentation is at: # https://hypothesis.readthedocs.io/en/latest/settings.html#settings-profiles settings.register_profile("fast", max_examples=5, deadline=5000) settings.register_profile("slow", max_examples=20, deadline=5000) @pytest.fixture def swh_storage_backend_config(postgresql_proc, swh_storage_postgresql): yield { "cls": "local", "db": "postgresql://{user}@{host}:{port}/{dbname}".format( host=postgresql_proc.host, port=postgresql_proc.port, user="postgres", dbname="tests", ), "objstorage": {"cls": "memory", "args": {}}, "journal_writer": {"cls": "memory",}, } @pytest.fixture def swh_storage(swh_storage_backend_config): return swh.storage.get_storage(cls="validate", storage=swh_storage_backend_config) @pytest.fixture def swh_contents(swh_storage): contents = gen_contents(n=20) swh_storage.content_add([c for c in contents if c["status"] != "absent"]) swh_storage.skipped_content_add([c for c in contents if c["status"] == "absent"]) return contents @pytest.fixture def swh_origins(swh_storage): origins = gen_origins(n=100) swh_storage.origin_add(origins) return origins # the postgres_fact factory fixture below is mostly a copy of the code # from pytest-postgresql. We need a custom version here to be able to # specify our version of the DBJanitor we use. def postgresql_fact(process_fixture_name, db_name=None, dump_files=DUMP_FILES): @pytest.fixture def postgresql_factory(request): """ Fixture factory for PostgreSQL. :param FixtureRequest request: fixture request object :rtype: psycopg2.connection :returns: postgresql client """ config = factories.get_config(request) if not psycopg2: raise ImportError("No module named psycopg2. Please install it.") proc_fixture = request.getfixturevalue(process_fixture_name) # _, config = try_import('psycopg2', request) pg_host = proc_fixture.host pg_port = proc_fixture.port pg_user = proc_fixture.user pg_options = proc_fixture.options pg_db = db_name or config["dbname"] with SwhDatabaseJanitor( pg_user, pg_host, pg_port, pg_db, proc_fixture.version, dump_files=dump_files, ): connection = psycopg2.connect( dbname=pg_db, user=pg_user, host=pg_host, port=pg_port, options=pg_options, ) yield connection connection.close() return postgresql_factory swh_storage_postgresql = postgresql_fact("postgresql_proc") # This version of the DatabaseJanitor implement a different setup/teardown # behavior than than the stock one: instead of dropping, creating and # initializing the database for each test, it create and initialize the db only # once, then it truncate the tables. This is needed to have acceptable test # performances. 
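# (Concretely: db_setup() runs the SQL dump files once when the database is
# created, while db_reset() truncates all public tables and restarts their
# sequences between tests.)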
class SwhDatabaseJanitor(DatabaseJanitor): def __init__( self, user: str, host: str, port: str, db_name: str, version: Union[str, float, Version], dump_files: str = DUMP_FILES, ) -> None: super().__init__(user, host, port, db_name, version) self.dump_files = sorted(glob.glob(dump_files), key=sortkey) def db_setup(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: for fname in self.dump_files: with open(fname) as fobj: sql = fobj.read().replace("concurrently", "").strip() if sql: cur.execute(sql) cnx.commit() def db_reset(self): with psycopg2.connect( dbname=self.db_name, user=self.user, host=self.host, port=self.port, ) as cnx: with cnx.cursor() as cur: cur.execute( "SELECT table_name FROM information_schema.tables " "WHERE table_schema = %s", ("public",), ) tables = set(table for (table,) in cur.fetchall()) for table in tables: cur.execute("truncate table %s cascade" % table) cur.execute( "SELECT sequence_name FROM information_schema.sequences " "WHERE sequence_schema = %s", ("public",), ) seqs = set(seq for (seq,) in cur.fetchall()) for seq in seqs: cur.execute("ALTER SEQUENCE %s RESTART;" % seq) cnx.commit() def init(self): with self.cursor() as cur: cur.execute( "SELECT COUNT(1) FROM pg_database WHERE datname=%s;", (self.db_name,) ) db_exists = cur.fetchone()[0] == 1 if db_exists: cur.execute( "UPDATE pg_database SET datallowconn=true " "WHERE datname = %s;", (self.db_name,), ) if db_exists: self.db_reset() else: with self.cursor() as cur: cur.execute('CREATE DATABASE "{}";'.format(self.db_name)) self.db_setup() def drop(self): pid_column = "pid" with self.cursor() as cur: cur.execute( "UPDATE pg_database SET datallowconn=false " "WHERE datname = %s;", (self.db_name,), ) cur.execute( "SELECT pg_terminate_backend(pg_stat_activity.{})" "FROM pg_stat_activity " "WHERE pg_stat_activity.datname = %s;".format(pid_column), (self.db_name,), ) @pytest.fixture def sample_data() -> Dict: """Pre-defined sample storage object data to manipulate Returns: Dict of data (keys: content, directory, revision, release, person, origin) """ from .storage_data import data return { "content": [data.cont, data.cont2], "content_metadata": [data.cont3], "skipped_content": [data.skipped_cont, data.skipped_cont2], "person": [data.person], "directory": [data.dir2, data.dir], "revision": [data.revision, data.revision2, data.revision3], "release": [data.release, data.release2, data.release3], "snapshot": [data.snapshot], "origin": [data.origin, data.origin2], "tool": [data.metadata_tool], "provider": [data.provider], "origin_metadata": [data.origin_metadata, data.origin_metadata2], } diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py new file mode 100644 index 00000000..85f5ec97 --- /dev/null +++ b/swh/storage/tests/test_backfill.py @@ -0,0 +1,160 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.storage.backfill import JournalBackfiller, compute_query, PARTITION_KEY + + +TEST_CONFIG = { + "brokers": ["localhost"], + "prefix": "swh.tmp_journal.new", + "client_id": "swh.journal.client.test", + "storage_dbconn": "service=swh-dev", +} + + +def test_config_ko_missing_mandatory_key(): + """Missing configuration key will make the initialization fail + + """ + for key in 
TEST_CONFIG.keys(): + config = TEST_CONFIG.copy() + config.pop(key) + + with pytest.raises(ValueError) as e: + JournalBackfiller(config) + + error = "Configuration error: The following keys must be" " provided: %s" % ( + ",".join([key]), + ) + assert e.value.args[0] == error + + +def test_config_ko_unknown_object_type(): + """Parse arguments will fail if the object type is unknown + + """ + backfiller = JournalBackfiller(TEST_CONFIG) + with pytest.raises(ValueError) as e: + backfiller.parse_arguments("unknown-object-type", 1, 2) + + error = ( + "Object type unknown-object-type is not supported. " + "The only possible values are %s" % (", ".join(PARTITION_KEY)) + ) + assert e.value.args[0] == error + + +def test_compute_query_content(): + query, where_args, column_aliases = compute_query("content", "\x000000", "\x000001") + + assert where_args == ["\x000000", "\x000001"] + + assert column_aliases == [ + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "status", + "ctime", + ] + + assert ( + query + == """ +select sha1,sha1_git,sha256,blake2s256,length,status,ctime +from content + +where (sha1) >= %s and (sha1) < %s + """ + ) + + +def test_compute_query_skipped_content(): + query, where_args, column_aliases = compute_query("skipped_content", None, None) + + assert where_args == [] + + assert column_aliases == [ + "sha1", + "sha1_git", + "sha256", + "blake2s256", + "length", + "ctime", + "status", + "reason", + ] + + assert ( + query + == """ +select sha1,sha1_git,sha256,blake2s256,length,ctime,status,reason +from skipped_content + + + """ + ) + + +def test_compute_query_origin_visit(): + query, where_args, column_aliases = compute_query("origin_visit", 1, 10) + + assert where_args == [1, 10] + + assert column_aliases == [ + "visit", + "origin.type", + "origin_visit.type", + "url", + "date", + "snapshot", + "status", + "metadata", + ] + + assert ( + query + == """ +select visit,origin.type,origin_visit.type,url,date,snapshot,status,metadata +from origin_visit +left join origin on origin_visit.origin=origin.id +where (origin_visit.origin) >= %s and (origin_visit.origin) < %s + """ + ) + + +def test_compute_query_release(): + query, where_args, column_aliases = compute_query("release", "\x000002", "\x000003") + + assert where_args == ["\x000002", "\x000003"] + + assert column_aliases == [ + "id", + "date", + "date_offset", + "comment", + "name", + "synthetic", + "date_neg_utc_offset", + "target", + "target_type", + "author_id", + "author_name", + "author_email", + "author_fullname", + ] + + assert ( + query + == """ +select release.id as id,date,date_offset,comment,release.name as name,synthetic,date_neg_utc_offset,target,target_type,a.id as author_id,a.name as author_name,a.email as author_email,a.fullname as author_fullname +from release +left join person a on release.author=a.id +where (release.id) >= %s and (release.id) < %s + """ # noqa + ) diff --git a/swh/storage/tests/test_cli.py b/swh/storage/tests/test_cli.py new file mode 100644 index 00000000..5653a51f --- /dev/null +++ b/swh/storage/tests/test_cli.py @@ -0,0 +1,107 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import copy +import logging +import re +import tempfile +import yaml + +from typing import Any, Dict +from unittest.mock import patch + +import pytest + +from click.testing import CliRunner +from 
confluent_kafka import Producer + +from swh.journal.serializers import key_to_kafka, value_to_kafka +from swh.storage import get_storage +from swh.storage.cli import storage as cli + + +logger = logging.getLogger(__name__) + + +CLI_CONFIG = { + "storage": {"cls": "memory",}, +} + + +@pytest.fixture +def storage(): + """An swh-storage object that gets injected into the CLI functions.""" + storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} + storage = get_storage(**storage_config) + with patch("swh.storage.cli.get_storage") as get_storage_mock: + get_storage_mock.return_value = storage + yield storage + + +@pytest.fixture +def monkeypatch_retry_sleep(monkeypatch): + from swh.journal.replay import copy_object, obj_in_objstorage + + monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) + monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) + + +def invoke(*args, env=None, journal_config=None): + config = copy.deepcopy(CLI_CONFIG) + if journal_config: + config["journal_client"] = journal_config.copy() + config["journal_client"]["cls"] = "kafka" + + runner = CliRunner() + with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: + yaml.dump(config, config_fd) + config_fd.seek(0) + args = ["-C" + config_fd.name] + list(args) + ret = runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) + return ret + + +def test_replay( + storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, +): + kafka_prefix += ".swh.journal.objects" + + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test-producer", + "acks": "all", + } + ) + + snapshot = { + "id": b"foo", + "branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, + } # type: Dict[str, Any] + producer.produce( + topic=kafka_prefix + ".snapshot", + key=key_to_kafka(snapshot["id"]), + value=value_to_kafka(snapshot), + ) + producer.flush() + + logger.debug("Flushed producer") + + result = invoke( + "replay", + "--stop-after-objects", + "1", + journal_config={ + "brokers": [kafka_server], + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, + }, + ) + + expected = r"Done.\n" + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} diff --git a/swh/storage/tests/test_in_memory.py b/swh/storage/tests/test_in_memory.py index 0043df21..f226a277 100644 --- a/swh/storage/tests/test_in_memory.py +++ b/swh/storage/tests/test_in_memory.py @@ -1,21 +1,70 @@ # Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest +from swh.storage.in_memory import SortedList from swh.storage.tests.test_storage import TestStorage, TestStorageGeneratedData # noqa # tests are executed using imported classes (TestStorage and # TestStorageGeneratedData) using overloaded swh_storage fixture # below @pytest.fixture def swh_storage_backend_config(): yield { "cls": "memory", "journal_writer": {"cls": "memory",}, } + + +parametrize = pytest.mark.parametrize( + "items", + [ + [1, 2, 3, 4, 5, 6, 10, 100], + [10, 100, 6, 5, 4, 3, 2, 1], + [10, 4, 5, 6, 1, 2, 3, 100], + ], +) + + +@parametrize +def test_sorted_list_iter(items): + list1 = SortedList() + for item in items: + list1.add(item) + assert list(list1) 
== sorted(items) + + list2 = SortedList(items) + assert list(list2) == sorted(items) + + +@parametrize +def test_sorted_list_iter__key(items): + list1 = SortedList(key=lambda item: -item) + for item in items: + list1.add(item) + assert list(list1) == list(reversed(sorted(items))) + + list2 = SortedList(items, key=lambda item: -item) + assert list(list2) == list(reversed(sorted(items))) + + +@parametrize +def test_sorted_list_iter_from(items): + list_ = SortedList(items) + for split in items: + expected = sorted(item for item in items if item >= split) + assert list(list_.iter_from(split)) == expected, f"split: {split}" + + +@parametrize +def test_sorted_list_iter_from__key(items): + list_ = SortedList(items, key=lambda item: -item) + for split in items: + expected = reversed(sorted(item for item in items if item <= split)) + assert list(list_.iter_from(-split)) == list(expected), f"split: {split}" diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py new file mode 100644 index 00000000..e48ab32e --- /dev/null +++ b/swh/storage/tests/test_kafka_writer.py @@ -0,0 +1,60 @@ +# Copyright (C) 2018-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from confluent_kafka import Consumer + +from swh.storage import get_storage +from swh.model.model import Origin, OriginVisit +from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed +from swh.journal.tests.journal_data import TEST_OBJECTS + + +def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): + kafka_prefix += ".swh.journal.objects" + + writer_config = { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + } + storage_config = { + "cls": "pipeline", + "steps": [{"cls": "memory", "journal_writer": writer_config},], + } + + storage = get_storage(**storage_config) + + expected_messages = 0 + + for object_type, objects in TEST_OBJECTS.items(): + method = getattr(storage, object_type + "_add") + if object_type in ( + "content", + "directory", + "revision", + "release", + "snapshot", + "origin", + ): + method(objects) + expected_messages += len(objects) + elif object_type in ("origin_visit",): + for obj in objects: + assert isinstance(obj, OriginVisit) + storage.origin_add_one(Origin(url=obj.origin)) + visit = method(obj.origin, date=obj.date, type=obj.type) + expected_messages += 1 + + obj_d = obj.to_dict() + for k in ("visit", "origin", "date", "type"): + del obj_d[k] + storage.origin_visit_update(obj.origin, visit.visit, **obj_d) + expected_messages += 1 + else: + assert False, object_type + + consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) + assert_all_objects_consumed(consumed_messages) diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py new file mode 100644 index 00000000..48be2397 --- /dev/null +++ b/swh/storage/tests/test_replay.py @@ -0,0 +1,388 @@ +# Copyright (C) 2019-2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import functools +import random +import logging +import dateutil + +from typing import Dict, List + +from confluent_kafka import Producer + 
+import pytest + +from swh.model.hashutil import hash_to_hex +from swh.model.model import Content + +from swh.storage import get_storage +from swh.storage.replay import process_replay_objects + +from swh.journal.serializers import key_to_kafka, value_to_kafka +from swh.journal.client import JournalClient + +from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter +from swh.journal.tests.conftest import ( + TEST_OBJECT_DICTS, + DUPLICATE_CONTENTS, +) + + +storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} + + +def test_storage_play( + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, +): + """Optimal replayer scenario. + + This: + - writes objects to the topic + - replayer consumes objects from the topic and replay them + + """ + kafka_prefix += ".swh.journal.objects" + + storage = get_storage(**storage_config) + + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test producer", + "acks": "all", + } + ) + + now = datetime.datetime.now(tz=datetime.timezone.utc) + + # Fill Kafka + nb_sent = 0 + nb_visits = 0 + for object_type, objects in TEST_OBJECT_DICTS.items(): + topic = f"{kafka_prefix}.{object_type}" + for object_ in objects: + key = bytes(random.randint(0, 255) for _ in range(40)) + object_ = object_.copy() + if object_type == "content": + object_["ctime"] = now + elif object_type == "origin_visit": + nb_visits += 1 + object_["visit"] = nb_visits + producer.produce( + topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), + ) + nb_sent += 1 + + producer.flush() + + caplog.set_level(logging.ERROR, "swh.journal.replay") + # Fill the storage from Kafka + replayer = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=nb_sent, + ) + worker_fn = functools.partial(process_replay_objects, storage=storage) + nb_inserted = 0 + while nb_inserted < nb_sent: + nb_inserted += replayer.process(worker_fn) + assert nb_sent == nb_inserted + + # Check the objects were actually inserted in the storage + assert TEST_OBJECT_DICTS["revision"] == list( + storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) + ) + assert TEST_OBJECT_DICTS["release"] == list( + storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) + ) + + origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) + assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] + for origin in origins: + origin_url = origin["url"] + expected_visits = [ + { + **visit, + "origin": origin_url, + "date": dateutil.parser.parse(visit["date"]), + } + for visit in TEST_OBJECT_DICTS["origin_visit"] + if visit["origin"] == origin["url"] + ] + actual_visits = list(storage.origin_visit_get(origin_url)) + for visit in actual_visits: + del visit["visit"] # opaque identifier + assert expected_visits == actual_visits + + input_contents = TEST_OBJECT_DICTS["content"] + contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) + assert len(contents) == len(input_contents) + assert contents == {cont["sha1"]: [cont] for cont in input_contents} + + collision = 0 + for record in caplog.records: + logtext = record.getMessage() + if "Colliding contents:" in logtext: + collision += 1 + + assert collision == 0, "No collision should be detected" + + +def test_storage_play_with_collision( + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, +): + """Another replayer scenario with 
collisions. + + This: + - writes objects to the topic, including colliding contents + - replayer consumes objects from the topic and replay them + - This drops the colliding contents from the replay when detected + + """ + kafka_prefix += ".swh.journal.objects" + + storage = get_storage(**storage_config) + + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test producer", + "enable.idempotence": "true", + } + ) + + now = datetime.datetime.now(tz=datetime.timezone.utc) + + # Fill Kafka + nb_sent = 0 + nb_visits = 0 + for object_type, objects in TEST_OBJECT_DICTS.items(): + topic = f"{kafka_prefix}.{object_type}" + for object_ in objects: + key = bytes(random.randint(0, 255) for _ in range(40)) + object_ = object_.copy() + if object_type == "content": + object_["ctime"] = now + elif object_type == "origin_visit": + nb_visits += 1 + object_["visit"] = nb_visits + producer.produce( + topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), + ) + nb_sent += 1 + + # Create collision in input data + # They are not written in the destination + for content in DUPLICATE_CONTENTS: + topic = f"{kafka_prefix}.content" + producer.produce( + topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), + ) + + nb_sent += 1 + + producer.flush() + + caplog.set_level(logging.ERROR, "swh.journal.replay") + # Fill the storage from Kafka + replayer = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=nb_sent, + ) + worker_fn = functools.partial(process_replay_objects, storage=storage) + nb_inserted = 0 + while nb_inserted < nb_sent: + nb_inserted += replayer.process(worker_fn) + assert nb_sent == nb_inserted + + # Check the objects were actually inserted in the storage + assert TEST_OBJECT_DICTS["revision"] == list( + storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) + ) + assert TEST_OBJECT_DICTS["release"] == list( + storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) + ) + + origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) + assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] + for origin in origins: + origin_url = origin["url"] + expected_visits = [ + { + **visit, + "origin": origin_url, + "date": dateutil.parser.parse(visit["date"]), + } + for visit in TEST_OBJECT_DICTS["origin_visit"] + if visit["origin"] == origin["url"] + ] + actual_visits = list(storage.origin_visit_get(origin_url)) + for visit in actual_visits: + del visit["visit"] # opaque identifier + assert expected_visits == actual_visits + + input_contents = TEST_OBJECT_DICTS["content"] + contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) + assert len(contents) == len(input_contents) + assert contents == {cont["sha1"]: [cont] for cont in input_contents} + + nb_collisions = 0 + + actual_collision: Dict + for record in caplog.records: + logtext = record.getMessage() + if "Collision detected:" in logtext: + nb_collisions += 1 + actual_collision = record.args["collision"] + + assert nb_collisions == 1, "1 collision should be detected" + + algo = "sha1" + assert actual_collision["algo"] == algo + expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) + assert actual_collision["hash"] == expected_colliding_hash + + actual_colliding_hashes = actual_collision["objects"] + assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) + for content in DUPLICATE_CONTENTS: + expected_content_hashes 
= { + k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items() + } + assert expected_content_hashes in actual_colliding_hashes + + +def _test_write_replay_origin_visit(visits: List[Dict]): + """Helper function to write tests for origin_visit. + + Each visit (a dict) given in the 'visits' argument will be sent to + a (mocked) kafka queue, which a in-memory-storage backed replayer is + listening to. + + Check that corresponding origin visits entities are present in the storage + and have correct values if they are not skipped. + + """ + queue: List = [] + replayer = MockedJournalClient(queue) + writer = MockedKafkaWriter(queue) + + # Note that flipping the order of these two insertions will crash + # the test, because the legacy origin_format does not allow to create + # the origin when needed (type is missing) + writer.send( + "origin", + "foo", + { + "url": "http://example.com/", + "type": "git", # test the legacy origin format is accepted + }, + ) + for visit in visits: + writer.send("origin_visit", "foo", visit) + + queue_size = len(queue) + assert replayer.stop_after_objects is None + replayer.stop_after_objects = queue_size + + storage = get_storage(**storage_config) + worker_fn = functools.partial(process_replay_objects, storage=storage) + + replayer.process(worker_fn) + + actual_visits = list(storage.origin_visit_get("http://example.com/")) + + assert len(actual_visits) == len(visits), actual_visits + + for vin, vout in zip(visits, actual_visits): + vin = vin.copy() + vout = vout.copy() + assert vout.pop("origin") == "http://example.com/" + vin.pop("origin") + vin.setdefault("type", "git") + vin.setdefault("metadata", None) + assert vin == vout + + +def test_write_replay_origin_visit(): + """Test origin_visit when the 'origin' is just a string.""" + now = datetime.datetime.now() + visits = [ + { + "visit": 1, + "origin": "http://example.com/", + "date": now, + "type": "git", + "status": "partial", + "snapshot": None, + } + ] + _test_write_replay_origin_visit(visits) + + +def test_write_replay_legacy_origin_visit1(): + """Origin_visit with no types should make the replayer crash + + We expect the journal's origin_visit topic to no longer reference such + visits. If it does, the replayer must crash so we can fix the journal's + topic. 
+ + """ + now = datetime.datetime.now() + visit = { + "visit": 1, + "origin": "http://example.com/", + "date": now, + "status": "partial", + "snapshot": None, + } + now2 = datetime.datetime.now() + visit2 = { + "visit": 2, + "origin": {"url": "http://example.com/"}, + "date": now2, + "status": "partial", + "snapshot": None, + } + + for origin_visit in [visit, visit2]: + with pytest.raises(ValueError, match="Old origin visit format"): + _test_write_replay_origin_visit([origin_visit]) + + +def test_write_replay_legacy_origin_visit2(): + """Test origin_visit when 'type' is missing from the visit, but not + from the origin.""" + now = datetime.datetime.now() + visits = [ + { + "visit": 1, + "origin": {"url": "http://example.com/", "type": "git",}, + "date": now, + "type": "git", + "status": "partial", + "snapshot": None, + } + ] + _test_write_replay_origin_visit(visits) + + +def test_write_replay_legacy_origin_visit3(): + """Test origin_visit when the origin is a dict""" + now = datetime.datetime.now() + visits = [ + { + "visit": 1, + "origin": {"url": "http://example.com/",}, + "date": now, + "type": "git", + "status": "partial", + "snapshot": None, + } + ] + _test_write_replay_origin_visit(visits) diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py index 188cf9e3..5b3f231f 100644 --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -1,1082 +1,1039 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict from unittest.mock import call import psycopg2 import pytest from swh.model.model import ( Content, Directory, Release, Revision, Snapshot, SkippedContent, Origin, ) from swh.storage import get_storage from swh.storage.exc import HashCollision, StorageArgumentException from .storage_data import date_visit1 +@pytest.fixture +def monkeypatch_sleep(monkeypatch, swh_storage): + """In test context, we don't want to wait, make test faster + + """ + from swh.storage.retry import RetryingProxyStorage + + for method_name, method in RetryingProxyStorage.__dict__.items(): + if "_add" in method_name or "_update" in method_name: + monkeypatch.setattr(method.retry, "sleep", lambda x: None) + + return monkeypatch + + @pytest.fixture def fake_hash_collision(sample_data): return HashCollision("sha1", "38762cf7f55934b34d179ae6a4c80cadccbb7f0a", []) @pytest.fixture def swh_storage(): storage_config = { "cls": "pipeline", "steps": [{"cls": "validate"}, {"cls": "retry"}, {"cls": "memory"},], } return get_storage(**storage_config) def test_retrying_proxy_storage_content_add(swh_storage, sample_data): """Standard content_add works as before """ sample_content = sample_data["content"][0] content = next(swh_storage.content_get([sample_content["sha1"]])) assert not content s = swh_storage.content_add([sample_content]) assert s == { "content:add": 1, "content:add:bytes": sample_content["length"], } content = next(swh_storage.content_get([sample_content["sha1"]])) assert content["sha1"] == sample_content["sha1"] def test_retrying_proxy_storage_content_add_with_retry( - swh_storage, sample_data, mocker, fake_hash_collision + monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision, ): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") 
mock_memory.side_effect = [ # first try goes ko fake_hash_collision, # second try goes ko psycopg2.IntegrityError("content already inserted"), # ok then! {"content:add": 1}, ] - mock_sleep = mocker.patch( - "swh.storage.retry.RetryingProxyStorage" ".content_add.retry.sleep" - ) sample_content = sample_data["content"][0] content = next(swh_storage.content_get([sample_content["sha1"]])) assert not content s = swh_storage.content_add([sample_content]) assert s == {"content:add": 1} mock_memory.assert_has_calls( [ call([Content.from_dict(sample_content)]), call([Content.from_dict(sample_content)]), call([Content.from_dict(sample_content)]), ] ) - assert mock_sleep.call_count == 2 def test_retrying_proxy_swh_storage_content_add_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.content_add") mock_memory.side_effect = StorageArgumentException("Refuse to add content always!") sample_content = sample_data["content"][0] content = next(swh_storage.content_get([sample_content["sha1"]])) assert not content with pytest.raises(StorageArgumentException, match="Refuse to add"): swh_storage.content_add([sample_content]) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_content_add_metadata(swh_storage, sample_data): """Standard content_add_metadata works as before """ sample_content = sample_data["content_metadata"][0] pk = sample_content["sha1"] content_metadata = swh_storage.content_get_metadata([pk]) assert not content_metadata[pk] s = swh_storage.content_add_metadata([sample_content]) assert s == { "content:add": 1, } content_metadata = swh_storage.content_get_metadata([pk]) assert len(content_metadata[pk]) == 1 assert content_metadata[pk][0]["sha1"] == pk def test_retrying_proxy_storage_content_add_metadata_with_retry( - swh_storage, sample_data, mocker, fake_hash_collision + monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision ): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = [ # first try goes ko fake_hash_collision, # second try goes ko psycopg2.IntegrityError("content_metadata already inserted"), # ok then! {"content:add": 1}, ] - mock_sleep = mocker.patch( - "swh.storage.retry.RetryingProxyStorage" ".content_add_metadata.retry.sleep" - ) sample_content = sample_data["content_metadata"][0] s = swh_storage.content_add_metadata([sample_content]) assert s == {"content:add": 1} mock_memory.assert_has_calls( [ call([Content.from_dict(sample_content)]), call([Content.from_dict(sample_content)]), call([Content.from_dict(sample_content)]), ] ) - assert mock_sleep.call_count == 2 def test_retrying_proxy_swh_storage_content_add_metadata_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.content_add_metadata" ) mock_memory.side_effect = StorageArgumentException( "Refuse to add content_metadata!" 
) sample_content = sample_data["content_metadata"][0] pk = sample_content["sha1"] content_metadata = swh_storage.content_get_metadata([pk]) assert not content_metadata[pk] with pytest.raises(StorageArgumentException, match="Refuse to add"): swh_storage.content_add_metadata([sample_content]) assert mock_memory.call_count == 1 def test_retrying_proxy_storage_skipped_content_add(swh_storage, sample_data): """Standard skipped_content_add works as before """ sample_content = sample_data["skipped_content"][0] skipped_contents = list(swh_storage.skipped_content_missing([sample_content])) assert len(skipped_contents) == 1 s = swh_storage.skipped_content_add([sample_content]) assert s == { "skipped_content:add": 1, } skipped_content = list(swh_storage.skipped_content_missing([sample_content])) assert len(skipped_content) == 0 def test_retrying_proxy_storage_skipped_content_add_with_retry( - swh_storage, sample_data, mocker, fake_hash_collision + monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision ): """Multiple retries for hash collision and psycopg2 error but finally ok """ mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.skipped_content_add" ) mock_memory.side_effect = [ # 1st & 2nd try goes ko fake_hash_collision, psycopg2.IntegrityError("skipped_content already inserted"), # ok then! {"skipped_content:add": 1}, ] - mock_sleep = mocker.patch( - "swh.storage.retry.RetryingProxyStorage" ".skipped_content_add.retry.sleep" - ) sample_content = sample_data["skipped_content"][0] s = swh_storage.skipped_content_add([sample_content]) assert s == {"skipped_content:add": 1} mock_memory.assert_has_calls( [ call([SkippedContent.from_dict(sample_content)]), call([SkippedContent.from_dict(sample_content)]), call([SkippedContent.from_dict(sample_content)]), ] ) - assert mock_sleep.call_count == 2 def test_retrying_proxy_swh_storage_skipped_content_add_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( "swh.storage.in_memory.InMemoryStorage.skipped_content_add" ) mock_memory.side_effect = StorageArgumentException( "Refuse to add content_metadata!" ) sample_content = sample_data["skipped_content"][0] skipped_contents = list(swh_storage.skipped_content_missing([sample_content])) assert len(skipped_contents) == 1 with pytest.raises(StorageArgumentException, match="Refuse to add"): swh_storage.skipped_content_add([sample_content]) skipped_contents = list(swh_storage.skipped_content_missing([sample_content])) assert len(skipped_contents) == 1 assert mock_memory.call_count == 1 def test_retrying_proxy_swh_storage_origin_add_one(swh_storage, sample_data): """Standard origin_add_one works as before """ sample_origin = sample_data["origin"][0] origin = swh_storage.origin_get(sample_origin) assert not origin r = swh_storage.origin_add_one(sample_origin) assert r == sample_origin["url"] origin = swh_storage.origin_get(sample_origin) assert origin["url"] == sample_origin["url"] def test_retrying_proxy_swh_storage_origin_add_one_retry( - swh_storage, sample_data, mocker, fake_hash_collision + monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision ): """Multiple retries for hash collision and psycopg2 error but finally ok """ sample_origin = sample_data["origin"][1] mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_add_one") mock_memory.side_effect = [ # first try goes ko fake_hash_collision, # second try goes ko psycopg2.IntegrityError("origin already inserted"), # ok then! 
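        # (origin_add_one returns the origin url on success, which is what the
        # third attempt yields here)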
         sample_origin["url"],
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".origin_add_one.retry.sleep"
-    )
     origin = swh_storage.origin_get(sample_origin)
     assert not origin
     r = swh_storage.origin_add_one(sample_origin)
     assert r == sample_origin["url"]
     mock_memory.assert_has_calls(
         [
             call(Origin.from_dict(sample_origin)),
             call(Origin.from_dict(sample_origin)),
             call(Origin.from_dict(sample_origin)),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_origin_add_one_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_add_one")
     mock_memory.side_effect = StorageArgumentException("Refuse to add origin always!")
     sample_origin = sample_data["origin"][0]
     origin = swh_storage.origin_get(sample_origin)
     assert not origin
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.origin_add_one(sample_origin)
     assert mock_memory.call_count == 1

 def test_retrying_proxy_swh_storage_origin_visit_add(swh_storage, sample_data):
     """Standard origin_visit_add works as before"""
     sample_origin = sample_data["origin"][0]
     origin_url = swh_storage.origin_add_one(sample_origin)
     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin
     origin_visit = swh_storage.origin_visit_add(origin_url, date_visit1, "hg")
     assert origin_visit.origin == origin_url
     assert isinstance(origin_visit.visit, int)
     origin_visit = next(swh_storage.origin_visit_get(origin_url))
     assert origin_visit["origin"] == origin_url
     assert isinstance(origin_visit["visit"], int)

 def test_retrying_proxy_swh_storage_origin_visit_add_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     sample_origin = sample_data["origin"][1]
     origin_url = swh_storage.origin_add_one(sample_origin)
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("origin already inserted"),
         # ok then!
         {"origin": origin_url, "visit": 1},
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".origin_visit_add.retry.sleep"
-    )
     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin
     r = swh_storage.origin_visit_add(origin_url, date_visit1, "git")
     assert r == {"origin": origin_url, "visit": 1}
     mock_memory.assert_has_calls(
         [
             call(origin_url, date_visit1, "git"),
             call(origin_url, date_visit1, "git"),
             call(origin_url, date_visit1, "git"),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_origin_visit_add_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.origin_visit_add")
     mock_memory.side_effect = StorageArgumentException("Refuse to add origin always!")
     origin_url = sample_data["origin"][0]["url"]
     origin = list(swh_storage.origin_visit_get(origin_url))
     assert not origin
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.origin_visit_add(origin_url, date_visit1, "svn")
     mock_memory.assert_has_calls(
         [call(origin_url, date_visit1, "svn"),]
     )

 def test_retrying_proxy_storage_tool_add(swh_storage, sample_data):
     """Standard tool_add works as before"""
     sample_tool = sample_data["tool"][0]
     tool = swh_storage.tool_get(sample_tool)
     assert not tool
     tools = swh_storage.tool_add([sample_tool])
     assert tools
     tool = tools[0]
     tool.pop("id")
     assert tool == sample_tool
     tool = swh_storage.tool_get(sample_tool)
     tool.pop("id")
     assert tool == sample_tool

 def test_retrying_proxy_storage_tool_add_with_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     sample_tool = sample_data["tool"][0]
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.tool_add")
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("tool already inserted"),
         # ok then!
         [sample_tool],
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".tool_add.retry.sleep"
-    )
     tool = swh_storage.tool_get(sample_tool)
     assert not tool
     tools = swh_storage.tool_add([sample_tool])
     assert tools == [sample_tool]
     mock_memory.assert_has_calls(
         [call([sample_tool]), call([sample_tool]), call([sample_tool]),]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_tool_add_failure(swh_storage, sample_data, mocker):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.tool_add")
     mock_memory.side_effect = StorageArgumentException("Refuse to add tool always!")
     sample_tool = sample_data["tool"][0]
     tool = swh_storage.tool_get(sample_tool)
     assert not tool
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.tool_add([sample_tool])
     assert mock_memory.call_count == 1

 def to_provider(provider: Dict) -> Dict:
     return {
         "provider_name": provider["name"],
         "provider_url": provider["url"],
         "provider_type": provider["type"],
         "metadata": provider["metadata"],
     }

 def test_retrying_proxy_storage_metadata_provider_add(swh_storage, sample_data):
     """Standard metadata_provider_add works as before"""
     provider = sample_data["provider"][0]
     provider_get = to_provider(provider)
     provider = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     assert provider_id
     actual_provider = swh_storage.metadata_provider_get(provider_id)
     assert actual_provider
     actual_provider_id = actual_provider.pop("id")
     assert actual_provider_id == provider_id
     assert actual_provider == provider_get

 def test_retrying_proxy_storage_metadata_provider_add_with_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     provider = sample_data["provider"][0]
     provider_get = to_provider(provider)
     mock_memory = mocker.patch(
         "swh.storage.in_memory.InMemoryStorage.metadata_provider_add"
     )
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("provider_id already inserted"),
         # ok then!
         "provider_id",
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".metadata_provider_add.retry.sleep"
-    )
     provider = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     assert provider_id == "provider_id"
     provider_arg_names = ("provider_name", "provider_type", "provider_url", "metadata")
     provider_args = [provider_get[key] for key in provider_arg_names]
     mock_memory.assert_has_calls(
         [call(*provider_args), call(*provider_args), call(*provider_args),]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_metadata_provider_add_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch(
         "swh.storage.in_memory.InMemoryStorage.metadata_provider_add"
     )
     mock_memory.side_effect = StorageArgumentException(
         "Refuse to add provider_id always!"
     )
     provider = sample_data["provider"][0]
     provider_get = to_provider(provider)
     provider_id = swh_storage.metadata_provider_get_by(provider_get)
     assert not provider_id
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.metadata_provider_add(**provider_get)
     assert mock_memory.call_count == 1

 def test_retrying_proxy_storage_origin_metadata_add(swh_storage, sample_data):
     """Standard origin_metadata_add works as before"""
     ori_meta = sample_data["origin_metadata"][0]
     origin = ori_meta["origin"]
     swh_storage.origin_add_one(origin)
     provider_get = to_provider(ori_meta["provider"])
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     origin_metadata = swh_storage.origin_metadata_get_by(origin["url"])
     assert not origin_metadata
     swh_storage.origin_metadata_add(
         origin["url"],
         ori_meta["discovery_date"],
         provider_id,
         ori_meta["tool"],
         ori_meta["metadata"],
     )
     origin_metadata = swh_storage.origin_metadata_get_by(origin["url"])
     assert origin_metadata

 def test_retrying_proxy_storage_origin_metadata_add_with_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     ori_meta = sample_data["origin_metadata"][0]
     origin = ori_meta["origin"]
     swh_storage.origin_add_one(origin)
     provider_get = to_provider(ori_meta["provider"])
     provider_id = swh_storage.metadata_provider_add(**provider_get)
     mock_memory = mocker.patch(
         "swh.storage.in_memory.InMemoryStorage.origin_metadata_add"
     )
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("provider_id already inserted"),
         # ok then!
         None,
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".origin_metadata_add.retry.sleep"
-    )
-
     url = origin["url"]
     ts = ori_meta["discovery_date"]
     tool_id = ori_meta["tool"]
     metadata = ori_meta["metadata"]
     # No exception raised as insertion finally came through
     swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata)
     mock_memory.assert_has_calls(
         [
             # 3 calls, as long as error raised
             call(url, ts, provider_id, tool_id, metadata),
             call(url, ts, provider_id, tool_id, metadata),
             call(url, ts, provider_id, tool_id, metadata),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_origin_metadata_add_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch(
         "swh.storage.in_memory.InMemoryStorage.origin_metadata_add"
     )
     mock_memory.side_effect = StorageArgumentException("Refuse to add always!")
     ori_meta = sample_data["origin_metadata"][0]
     origin = ori_meta["origin"]
     swh_storage.origin_add_one(origin)
     url = origin["url"]
     ts = ori_meta["discovery_date"]
     provider_id = "provider_id"
     tool_id = ori_meta["tool"]
     metadata = ori_meta["metadata"]
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.origin_metadata_add(url, ts, provider_id, tool_id, metadata)
     assert mock_memory.call_count == 1

 def test_retrying_proxy_swh_storage_origin_visit_update(swh_storage, sample_data):
     """Standard origin_visit_update works as before"""
     sample_origin = sample_data["origin"][0]
     origin_url = swh_storage.origin_add_one(sample_origin)
     origin_visit = swh_storage.origin_visit_add(origin_url, date_visit1, "hg")
     ov = next(swh_storage.origin_visit_get(origin_url))
     assert ov["origin"] == origin_url
     assert ov["visit"] == origin_visit.visit
     assert ov["status"] == "ongoing"
     assert ov["snapshot"] is None
     assert ov["metadata"] is None
     swh_storage.origin_visit_update(origin_url, origin_visit.visit, status="full")
     ov = next(swh_storage.origin_visit_get(origin_url))
     assert ov["origin"] == origin_url
     assert ov["visit"] == origin_visit.visit
     assert ov["status"] == "full"
     assert ov["snapshot"] is None
     assert ov["metadata"] is None

 def test_retrying_proxy_swh_storage_origin_visit_update_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     sample_origin = sample_data["origin"][1]
     origin_url = sample_origin["url"]
     mock_memory = mocker.patch(
         "swh.storage.in_memory.InMemoryStorage.origin_visit_update"
     )
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("origin already inserted"),
         # ok then!
         {"origin": origin_url, "visit": 1},
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".origin_visit_update.retry.sleep"
-    )
-
     visit_id = 1
     swh_storage.origin_visit_update(origin_url, visit_id, status="full")
     mock_memory.assert_has_calls(
         [
             call(origin_url, visit_id, metadata=None, snapshot=None, status="full"),
             call(origin_url, visit_id, metadata=None, snapshot=None, status="full"),
             call(origin_url, visit_id, metadata=None, snapshot=None, status="full"),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_origin_visit_update_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch(
         "swh.storage.in_memory.InMemoryStorage.origin_visit_update"
     )
     mock_memory.side_effect = StorageArgumentException("Refuse to add origin always!")
     origin_url = sample_data["origin"][0]["url"]
     visit_id = 9
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.origin_visit_update(origin_url, visit_id, "partial")
     assert mock_memory.call_count == 1

 def test_retrying_proxy_storage_directory_add(swh_storage, sample_data):
     """Standard directory_add works as before"""
     sample_dir = sample_data["directory"][0]
     directory = swh_storage.directory_get_random()  # no directory
     assert not directory
     s = swh_storage.directory_add([sample_dir])
     assert s == {
         "directory:add": 1,
     }
     directory_id = swh_storage.directory_get_random()  # only 1
     assert directory_id == sample_dir["id"]

 def test_retrying_proxy_storage_directory_add_with_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("directory already inserted"),
         # ok then!
         {"directory:add": 1},
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".directory_add.retry.sleep"
-    )
     sample_dir = sample_data["directory"][1]
     directory_id = swh_storage.directory_get_random()  # no directory
     assert not directory_id
     s = swh_storage.directory_add([sample_dir])
     assert s == {
         "directory:add": 1,
     }
     mock_memory.assert_has_calls(
         [
             call([Directory.from_dict(sample_dir)]),
             call([Directory.from_dict(sample_dir)]),
             call([Directory.from_dict(sample_dir)]),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_directory_add_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.directory_add")
     mock_memory.side_effect = StorageArgumentException(
         "Refuse to add directory always!"
     )
     sample_dir = sample_data["directory"][0]
     directory_id = swh_storage.directory_get_random()  # no directory
     assert not directory_id
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.directory_add([sample_dir])
     assert mock_memory.call_count == 1

 def test_retrying_proxy_storage_revision_add(swh_storage, sample_data):
     """Standard revision_add works as before"""
     sample_rev = sample_data["revision"][0]
     revision = next(swh_storage.revision_get([sample_rev["id"]]))
     assert not revision
     s = swh_storage.revision_add([sample_rev])
     assert s == {
         "revision:add": 1,
     }
     revision = next(swh_storage.revision_get([sample_rev["id"]]))
     assert revision["id"] == sample_rev["id"]

 def test_retrying_proxy_storage_revision_add_with_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("revision already inserted"),
         # ok then!
         {"revision:add": 1},
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".revision_add.retry.sleep"
-    )
-
     sample_rev = sample_data["revision"][0]
     revision = next(swh_storage.revision_get([sample_rev["id"]]))
     assert not revision
     s = swh_storage.revision_add([sample_rev])
     assert s == {
         "revision:add": 1,
     }
     mock_memory.assert_has_calls(
         [
             call([Revision.from_dict(sample_rev)]),
             call([Revision.from_dict(sample_rev)]),
             call([Revision.from_dict(sample_rev)]),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_revision_add_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.revision_add")
     mock_memory.side_effect = StorageArgumentException("Refuse to add revision always!")
     sample_rev = sample_data["revision"][0]
     revision = next(swh_storage.revision_get([sample_rev["id"]]))
     assert not revision
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.revision_add([sample_rev])
     assert mock_memory.call_count == 1

 def test_retrying_proxy_storage_release_add(swh_storage, sample_data):
     """Standard release_add works as before"""
     sample_rel = sample_data["release"][0]
     release = next(swh_storage.release_get([sample_rel["id"]]))
     assert not release
     s = swh_storage.release_add([sample_rel])
     assert s == {
         "release:add": 1,
     }
     release = next(swh_storage.release_get([sample_rel["id"]]))
     assert release["id"] == sample_rel["id"]

 def test_retrying_proxy_storage_release_add_with_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("release already inserted"),
         # ok then!
         {"release:add": 1},
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".release_add.retry.sleep"
-    )
-
     sample_rel = sample_data["release"][0]
     release = next(swh_storage.release_get([sample_rel["id"]]))
     assert not release
     s = swh_storage.release_add([sample_rel])
     assert s == {
         "release:add": 1,
     }
     mock_memory.assert_has_calls(
         [
             call([Release.from_dict(sample_rel)]),
             call([Release.from_dict(sample_rel)]),
             call([Release.from_dict(sample_rel)]),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_release_add_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.release_add")
     mock_memory.side_effect = StorageArgumentException("Refuse to add release always!")
     sample_rel = sample_data["release"][0]
     release = next(swh_storage.release_get([sample_rel["id"]]))
     assert not release
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.release_add([sample_rel])
     assert mock_memory.call_count == 1

 def test_retrying_proxy_storage_snapshot_add(swh_storage, sample_data):
     """Standard snapshot_add works as before"""
     sample_snap = sample_data["snapshot"][0]
     snapshot = swh_storage.snapshot_get(sample_snap["id"])
     assert not snapshot
     s = swh_storage.snapshot_add([sample_snap])
     assert s == {
         "snapshot:add": 1,
     }
     snapshot = swh_storage.snapshot_get(sample_snap["id"])
     assert snapshot["id"] == sample_snap["id"]

 def test_retrying_proxy_storage_snapshot_add_with_retry(
-    swh_storage, sample_data, mocker, fake_hash_collision
+    monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision
 ):
     """Multiple retries for hash collision and psycopg2 error but finally ok"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
     mock_memory.side_effect = [
         # first try goes ko
         fake_hash_collision,
         # second try goes ko
         psycopg2.IntegrityError("snapshot already inserted"),
         # ok then!
         {"snapshot:add": 1},
     ]
-    mock_sleep = mocker.patch(
-        "swh.storage.retry.RetryingProxyStorage" ".snapshot_add.retry.sleep"
-    )
-
     sample_snap = sample_data["snapshot"][0]
     snapshot = swh_storage.snapshot_get(sample_snap["id"])
     assert not snapshot
     s = swh_storage.snapshot_add([sample_snap])
     assert s == {
         "snapshot:add": 1,
     }
     mock_memory.assert_has_calls(
         [
             call([Snapshot.from_dict(sample_snap)]),
             call([Snapshot.from_dict(sample_snap)]),
             call([Snapshot.from_dict(sample_snap)]),
         ]
     )
-    assert mock_sleep.call_count == 2

 def test_retrying_proxy_swh_storage_snapshot_add_failure(
     swh_storage, sample_data, mocker
 ):
     """Unfiltered errors are raising without retry"""
     mock_memory = mocker.patch("swh.storage.in_memory.InMemoryStorage.snapshot_add")
     mock_memory.side_effect = StorageArgumentException("Refuse to add snapshot always!")
     sample_snap = sample_data["snapshot"][0]
     snapshot = swh_storage.snapshot_get(sample_snap["id"])
     assert not snapshot
     with pytest.raises(StorageArgumentException, match="Refuse to add"):
         swh_storage.snapshot_add([sample_snap])
     assert mock_memory.call_count == 1
diff --git a/swh/storage/tests/test_write_replay.py b/swh/storage/tests/test_write_replay.py
new file mode 100644
index 00000000..c78efe28
--- /dev/null
+++ b/swh/storage/tests/test_write_replay.py
@@ -0,0 +1,112 @@
+# Copyright (C) 2019-2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import functools
+from unittest.mock import patch
+
+import attr
+from hypothesis import given, settings, HealthCheck
+from hypothesis.strategies import lists
+
+from swh.model.hypothesis_strategies import objects
+from swh.model.model import Origin
+from swh.storage import get_storage
+from swh.storage.exc import HashCollision
+
+from swh.storage.replay import process_replay_objects
+
+from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
+
+
+storage_config = {
+    "cls": "memory",
+    "journal_writer": {"cls": "memory"},
+}
+
+
+def empty_person_name_email(rev_or_rel):
+    """Empties the 'name' and 'email' fields of the author/committer fields
+    of a revision or release; leaving only the fullname."""
+    if getattr(rev_or_rel, "author", None):
+        rev_or_rel = attr.evolve(
+            rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",)
+        )
+
+    if getattr(rev_or_rel, "committer", None):
+        rev_or_rel = attr.evolve(
+            rev_or_rel,
+            committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",),
+        )
+
+    return rev_or_rel
+
+
+@given(lists(objects(blacklist_types=("origin_visit_status",)), min_size=1))
+@settings(suppress_health_check=[HealthCheck.too_slow])
+def test_write_replay_same_order_batches(objects):
+    queue = []
+    replayer = MockedJournalClient(queue)
+
+    with patch(
+        "swh.journal.writer.inmemory.InMemoryJournalWriter",
+        return_value=MockedKafkaWriter(queue),
+    ):
+        storage1 = get_storage(**storage_config)
+
+    # Write objects to storage1
+    for (obj_type, obj) in objects:
+        if obj_type == "content" and obj.status == "absent":
+            obj_type = "skipped_content"
+
+        if obj_type == "origin_visit":
+            storage1.origin_add_one(Origin(url=obj.origin))
+            storage1.origin_visit_upsert([obj])
+        else:
+            method = getattr(storage1, obj_type + "_add")
+            try:
+                method([obj])
+            except HashCollision:
+                pass
+
+    # Bail out early if we didn't insert any relevant objects...
+    queue_size = len(queue)
+    assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
+
+    assert replayer.stop_after_objects is None
+    replayer.stop_after_objects = queue_size
+
+    storage2 = get_storage(**storage_config)
+    worker_fn = functools.partial(process_replay_objects, storage=storage2)
+
+    replayer.process(worker_fn)
+
+    assert replayer.consumer.committed
+
+    for attr_name in (
+        "_contents",
+        "_directories",
+        "_snapshots",
+        "_origin_visits",
+        "_origins",
+    ):
+        assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name
+
+    # When hypothesis generates a revision and a release with same
+    # author (or committer) fullname but different name or email, then
+    # the storage will use the first name/email it sees.
+    # This first one will be either the one from the revision or the release,
+    # and since there is no order guarantees, storage2 has 1/2 chance of
+    # not seeing the same order as storage1, therefore we need to strip
+    # them out before comparing.
+    for attr_name in ("_revisions", "_releases"):
+        items1 = {
+            k: empty_person_name_email(v)
+            for (k, v) in getattr(storage1, attr_name).items()
+        }
+        items2 = {
+            k: empty_person_name_email(v)
+            for (k, v) in getattr(storage2, attr_name).items()
+        }
+        assert items1 == items2, attr_name
diff --git a/version.txt b/version.txt
index e10bad73..cd3a0b39 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-v0.0.187-0-gf66184d
\ No newline at end of file
+v0.0.188-0-g49109d1
\ No newline at end of file
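
Note on the `test_retry.py` hunks above: every `*_with_retry` test drops its per-method `mocker.patch("swh.storage.retry.RetryingProxyStorage.<method>.retry.sleep")` call and the matching `assert mock_sleep.call_count == 2`, and instead takes a shared `monkeypatch_sleep` fixture. The fixture itself is not part of this diff; the following is only a minimal sketch of what such a fixture could look like, assuming the same tenacity-style `.retry.sleep` attribute that the removed patches targeted (names such as `RETRIED_METHODS` are illustrative, not taken from the package):

```
# Hypothetical sketch -- not the fixture shipped in swh.storage.
import pytest

# Methods wrapped with a retry decorator in the retrying proxy (assumed list).
RETRIED_METHODS = (
    "content_add", "content_add_metadata", "skipped_content_add",
    "origin_add_one", "origin_visit_add", "origin_visit_update",
    "tool_add", "metadata_provider_add", "origin_metadata_add",
    "directory_add", "revision_add", "release_add", "snapshot_add",
)


@pytest.fixture
def monkeypatch_sleep(monkeypatch, swh_storage):
    """Neutralize the retry back-off so *_with_retry tests run without sleeping."""
    for name in RETRIED_METHODS:
        method = getattr(swh_storage, name, None)
        retrying = getattr(method, "retry", None)  # tenacity's Retrying object
        if retrying is not None:
            # Replace the real sleep with a no-op; retries become instant.
            monkeypatch.setattr(retrying, "sleep", lambda seconds: None)
    return monkeypatch
```

With the back-off neutralized in one place, each retry test only has to assert on the mocked storage calls (`mock_memory.assert_has_calls(...)`), as the hunks above now do.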
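The closing comment of `test_write_replay_same_order_batches` explains why revisions and releases are passed through `empty_person_name_email()` before comparison: two objects sharing an author `fullname` but differing in `name`/`email` may be deduplicated differently depending on replay order. As a standalone illustration of that normalization, here is a sketch using throwaway attrs classes rather than the real `swh.model.model` types (the `Person`/`Release` classes below are stand-ins, not the library's):

```
# Illustration only: stand-in attrs classes, not swh.model.model.Person/Release.
import attr


@attr.s(frozen=True)
class Person:
    fullname = attr.ib(type=bytes)
    name = attr.ib(type=bytes)
    email = attr.ib(type=bytes)


@attr.s(frozen=True)
class Release:
    author = attr.ib(type=Person)


rel = Release(
    author=Person(
        fullname=b"Jane Doe <jd@example.org>", name=b"Jane Doe", email=b"jd@example.org"
    )
)

# Same transformation as empty_person_name_email(): keep only the fullname,
# so storages that recorded a different name/email first still compare equal.
normalized = attr.evolve(rel, author=attr.evolve(rel.author, name=b"", email=b""))

assert normalized.author == Person(
    fullname=b"Jane Doe <jd@example.org>", name=b"", email=b""
)
```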