diff --git a/PKG-INFO b/PKG-INFO index fd47373b..d0aedf47 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,216 +1,216 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.9.3 +Version: 0.10.0 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects `/usr/sbin/cassandra` to exist. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local args: db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing args: root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage api at 5002 port. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote args: url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local args: db: service=swh-dev objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: schemata Provides-Extra: journal diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt index 1ab13e77..816f455a 100644 --- a/requirements-swh-journal.txt +++ b/requirements-swh-journal.txt @@ -1 +1 @@ -swh.journal >= 0.3.2 +swh.journal >= 0.4 diff --git a/requirements-swh.txt b/requirements-swh.txt index 8b1d72cd..65e45186 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ -swh.core[db,http] >= 0.0.94 -swh.model >= 0.3.4 +swh.core[db,http] >= 0.1.0 +swh.model >= 0.4.0 swh.objstorage >= 0.0.40 diff --git a/sql/upgrades/158.sql b/sql/upgrades/158.sql new file mode 100644 index 00000000..0c8e0849 --- /dev/null +++ b/sql/upgrades/158.sql @@ -0,0 +1,76 @@ +-- SWH DB schema upgrade +-- from_version: 157 +-- to_version: 158 +-- description: Add the extra_headers column in the revision table + +-- latest schema version +insert into dbversion(version, release, description) + values(158, now(), 'Work Still In Progress'); + +-- Adapt the revision table for the new extra_headers column +alter table revision add column (extra_headers bytea[][]); + +-- Adapt the revision_entry type for the new extra_headers attribute +alter type revision_entry add attribute (extra_headers bytea[][]); + +-- Create entries in revision from tmp_revision +create or replace function swh_revision_add() + returns void + language plpgsql +as $$ +begin + perform swh_person_add_from_revision(); + + insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic, extra_headers) + select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic, t.extra_headers + from tmp_revision t + left join person a on a.fullname = t.author_fullname + left join person c on c.fullname = t.committer_fullname; + return; +end +$$; + +-- "git style" revision log. Similar to swh_revision_list(), but returning all +-- information associated to each revision, and expanding authors/committers +create or replace function swh_revision_log(root_revisions bytea[], num_revs bigint default NULL) + returns setof revision_entry + language sql + stable +as $$ + select t.id, r.date, r.date_offset, r.date_neg_utc_offset, + r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset, + r.type, r.directory, r.message, + a.id, a.fullname, a.name, a.email, + c.id, c.fullname, c.name, c.email, + r.metadata, r.synthetic, r.extra_headers, t.parents, r.object_id + from swh_revision_list(root_revisions, num_revs) as t + left join revision r on t.id = r.id + left join person a on a.id = r.author + left join person c on c.id = r.committer; +$$; + +create or replace function swh_revision_list_by_object_id( + min_excl bigint, + max_incl bigint +) + returns setof revision_entry + language sql + stable +as $$ + with revs as ( + select * from revision + where object_id > min_excl and object_id <= max_incl + ) + select r.id, r.date, r.date_offset, r.date_neg_utc_offset, + r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset, + r.type, r.directory, r.message, + a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic, r.extra_headers, + array(select rh.parent_id::bytea from revision_history rh where rh.id = r.id order by rh.parent_rank) + as parents, r.object_id + from revs r + left join person a on a.id = r.author + left join person c on c.id = r.committer + order by r.object_id; +$$; + +-- TODO: add the migration magic query... diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index fd47373b..d0aedf47 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,216 +1,216 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.9.3 +Version: 0.10.0 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-storage/ Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. ## Quick start ### Dependencies Python tests for this module include tests that cannot be run without a local Postgresql database, so you need the Postgresql server executable on your machine (no need to have a running Postgresql server). They also expect a cassandra server. #### Debian-like host ``` $ sudo apt install libpq-dev postgresql-11 cassandra ``` #### Non Debian-like host The tests expects `/usr/sbin/cassandra` to exist. Optionally, you can avoid running the cassandra tests. ``` (swh) :~/swh-storage$ tox -- -m 'not cassandra' ``` ### Installation It is strongly recommended to use a virtualenv. In the following, we consider you work in a virtualenv named `swh`. See the [developer setup guide](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup) for a more details on how to setup a working environment. You can install the package directly from [pypi](https://pypi.org/p/swh.storage): ``` (swh) :~$ pip install swh.storage [...] ``` Or from sources: ``` (swh) :~$ git clone https://forge.softwareheritage.org/source/swh-storage.git [...] (swh) :~$ cd swh-storage (swh) :~/swh-storage$ pip install . [...] ``` Then you can check it's properly installed: ``` (swh) :~$ swh storage --help Usage: swh storage [OPTIONS] COMMAND [ARGS]... Software Heritage Storage tools. Options: -h, --help Show this message and exit. Commands: rpc-serve Software Heritage Storage RPC server. ``` ## Tests The best way of running Python tests for this module is to use [tox](https://tox.readthedocs.io/). ``` (swh) :~$ pip install tox ``` ### tox From the sources directory, simply use tox: ``` (swh) :~/swh-storage$ tox [...] ========= 315 passed, 6 skipped, 15 warnings in 40.86 seconds ========== _______________________________ summary ________________________________ flake8: commands succeeded py3: commands succeeded congratulations :) ``` ## Development The storage server can be locally started. It requires a configuration file and a running Postgresql database. ### Sample configuration A typical configuration `storage.yml` file is: ``` storage: cls: local args: db: "dbname=softwareheritage-dev user= password=" objstorage: cls: pathslicing args: root: /tmp/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to `softwareheritage-dev` local instance, - the objstorage uses a local objstorage instance whose: - `root` path is /tmp/swh-storage, - slicing scheme is `0:2/2:4/4:6`. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the `root` path should exist on disk before starting the server. ### Starting the storage server If the python package has been properly installed (e.g. in a virtual env), you should be able to use the command: ``` (swh) :~/swh-storage$ swh storage rpc-serve storage.yml ``` This runs a local swh-storage api at 5002 port. ``` (swh) :~/swh-storage$ curl http://127.0.0.1:5002 Software Heritage storage server

You have reached the Software Heritage storage server.
See its documentation and API for more information

``` ### And then what? In your upper layer ([loader-git](https://forge.softwareheritage.org/source/swh-loader-git/), [loader-svn](https://forge.softwareheritage.org/source/swh-loader-svn/), etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote args: url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local args: db: service=swh-dev objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing Provides-Extra: schemata Provides-Extra: journal diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt index 8d3e0c5a..623fcdc1 100644 --- a/swh.storage.egg-info/SOURCES.txt +++ b/swh.storage.egg-info/SOURCES.txt @@ -1,264 +1,266 @@ MANIFEST.in Makefile Makefile.local README.md pyproject.toml pytest.ini setup.cfg setup.py tox.ini version.txt ./requirements-swh-journal.txt ./requirements-swh.txt ./requirements-test.txt ./requirements.txt bin/swh-storage-add-dir sql/.gitignore sql/Makefile sql/TODO sql/clusters.dot sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/entity.lister_metadata.schema.json sql/doc/json/entity.metadata.schema.json sql/doc/json/entity_history.lister_metadata.schema.json sql/doc/json/entity_history.metadata.schema.json sql/doc/json/fetch_history.result.schema.json sql/doc/json/list_history.result.schema.json sql/doc/json/listable_entity.list_params.schema.json sql/doc/json/origin_visit.metadata.json sql/doc/json/tool.tool_configuration.schema.json sql/json/.gitignore sql/json/Makefile sql/json/entity.lister_metadata.schema.json sql/json/entity.metadata.schema.json sql/json/entity_history.lister_metadata.schema.json sql/json/entity_history.metadata.schema.json sql/json/fetch_history.result.schema.json sql/json/list_history.result.schema.json sql/json/listable_entity.list_params.schema.json sql/json/origin_visit.metadata.json sql/json/tool.tool_configuration.schema.json sql/upgrades/015.sql sql/upgrades/016.sql sql/upgrades/017.sql sql/upgrades/018.sql sql/upgrades/019.sql sql/upgrades/020.sql sql/upgrades/021.sql sql/upgrades/022.sql sql/upgrades/023.sql sql/upgrades/024.sql sql/upgrades/025.sql sql/upgrades/026.sql sql/upgrades/027.sql sql/upgrades/028.sql sql/upgrades/029.sql sql/upgrades/030.sql sql/upgrades/032.sql sql/upgrades/033.sql sql/upgrades/034.sql sql/upgrades/035.sql sql/upgrades/036.sql sql/upgrades/037.sql sql/upgrades/038.sql sql/upgrades/039.sql sql/upgrades/040.sql sql/upgrades/041.sql sql/upgrades/042.sql sql/upgrades/043.sql sql/upgrades/044.sql sql/upgrades/045.sql sql/upgrades/046.sql sql/upgrades/047.sql sql/upgrades/048.sql sql/upgrades/049.sql sql/upgrades/050.sql sql/upgrades/051.sql sql/upgrades/052.sql sql/upgrades/053.sql sql/upgrades/054.sql sql/upgrades/055.sql sql/upgrades/056.sql sql/upgrades/057.sql sql/upgrades/058.sql sql/upgrades/059.sql sql/upgrades/060.sql sql/upgrades/061.sql sql/upgrades/062.sql sql/upgrades/063.sql sql/upgrades/064.sql sql/upgrades/065.sql sql/upgrades/066.sql sql/upgrades/067.sql sql/upgrades/068.sql sql/upgrades/069.sql sql/upgrades/070.sql sql/upgrades/071.sql sql/upgrades/072.sql sql/upgrades/073.sql sql/upgrades/074.sql sql/upgrades/075.sql sql/upgrades/076.sql sql/upgrades/077.sql sql/upgrades/078.sql sql/upgrades/079.sql sql/upgrades/080.sql sql/upgrades/081.sql sql/upgrades/082.sql sql/upgrades/083.sql sql/upgrades/084.sql sql/upgrades/085.sql sql/upgrades/086.sql sql/upgrades/087.sql sql/upgrades/088.sql sql/upgrades/089.sql sql/upgrades/090.sql sql/upgrades/091.sql sql/upgrades/092.sql sql/upgrades/093.sql sql/upgrades/094.sql sql/upgrades/095.sql sql/upgrades/096.sql sql/upgrades/097.sql sql/upgrades/098.sql sql/upgrades/099.sql sql/upgrades/100.sql sql/upgrades/101.sql sql/upgrades/102.sql sql/upgrades/103.sql sql/upgrades/104.sql sql/upgrades/105.sql sql/upgrades/106.sql sql/upgrades/107.sql sql/upgrades/108.sql sql/upgrades/109.sql sql/upgrades/110.sql sql/upgrades/111.sql sql/upgrades/112.sql sql/upgrades/113.sql sql/upgrades/114.sql sql/upgrades/115.sql sql/upgrades/116.sql sql/upgrades/117.sql sql/upgrades/118.sql sql/upgrades/119.sql sql/upgrades/120.sql sql/upgrades/121.sql sql/upgrades/122.sql sql/upgrades/123.sql sql/upgrades/124.sql sql/upgrades/125.sql sql/upgrades/126.sql sql/upgrades/127.sql sql/upgrades/128.sql sql/upgrades/129.sql sql/upgrades/130.sql sql/upgrades/131.sql sql/upgrades/132.sql sql/upgrades/133.sql sql/upgrades/134.sql sql/upgrades/135.sql sql/upgrades/136.sql sql/upgrades/137.sql sql/upgrades/138.sql sql/upgrades/139.sql sql/upgrades/140.sql sql/upgrades/141.sql sql/upgrades/142.sql sql/upgrades/143.sql sql/upgrades/144.sql sql/upgrades/145.sql sql/upgrades/146.sql sql/upgrades/147.sql sql/upgrades/148.sql sql/upgrades/149.sql sql/upgrades/150.sql sql/upgrades/151.sql sql/upgrades/152.sql sql/upgrades/153.sql sql/upgrades/154.sql sql/upgrades/155.sql sql/upgrades/156.sql sql/upgrades/157.sql +sql/upgrades/158.sql swh/__init__.py swh.storage.egg-info/PKG-INFO swh.storage.egg-info/SOURCES.txt swh.storage.egg-info/dependency_links.txt swh.storage.egg-info/entry_points.txt swh.storage.egg-info/requires.txt swh.storage.egg-info/top_level.txt swh/storage/__init__.py swh/storage/backfill.py swh/storage/buffer.py swh/storage/cli.py swh/storage/common.py swh/storage/converters.py swh/storage/db.py swh/storage/exc.py swh/storage/extrinsic_metadata.py swh/storage/filter.py swh/storage/fixer.py swh/storage/in_memory.py swh/storage/interface.py swh/storage/metrics.py swh/storage/objstorage.py swh/storage/py.typed swh/storage/replay.py swh/storage/retry.py swh/storage/storage.py swh/storage/utils.py swh/storage/validate.py swh/storage/writer.py swh/storage/algos/__init__.py swh/storage/algos/diff.py swh/storage/algos/dir_iterators.py swh/storage/algos/origin.py swh/storage/algos/revisions_walker.py swh/storage/algos/snapshot.py swh/storage/api/__init__.py swh/storage/api/client.py swh/storage/api/serializers.py swh/storage/api/server.py swh/storage/cassandra/__init__.py swh/storage/cassandra/common.py swh/storage/cassandra/converters.py swh/storage/cassandra/cql.py swh/storage/cassandra/schema.py swh/storage/cassandra/storage.py swh/storage/sql/10-swh-init.sql swh/storage/sql/20-swh-enums.sql swh/storage/sql/30-swh-schema.sql swh/storage/sql/40-swh-func.sql swh/storage/sql/60-swh-indexes.sql swh/storage/tests/__init__.py swh/storage/tests/conftest.py swh/storage/tests/generate_data_test.py swh/storage/tests/storage_data.py swh/storage/tests/test_api_client.py swh/storage/tests/test_backfill.py swh/storage/tests/test_buffer.py swh/storage/tests/test_cassandra.py swh/storage/tests/test_cassandra_converters.py swh/storage/tests/test_cli.py swh/storage/tests/test_converters.py swh/storage/tests/test_db.py swh/storage/tests/test_exception.py swh/storage/tests/test_filter.py swh/storage/tests/test_in_memory.py swh/storage/tests/test_init.py swh/storage/tests/test_kafka_writer.py swh/storage/tests/test_metrics.py swh/storage/tests/test_replay.py swh/storage/tests/test_retry.py +swh/storage/tests/test_revision_bw_compat.py swh/storage/tests/test_server.py swh/storage/tests/test_storage.py swh/storage/tests/test_utils.py swh/storage/tests/algos/__init__.py swh/storage/tests/algos/test_diff.py swh/storage/tests/algos/test_dir_iterator.py swh/storage/tests/algos/test_origin.py swh/storage/tests/algos/test_revisions_walker.py swh/storage/tests/algos/test_snapshot.py \ No newline at end of file diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index e2fdb0d1..7c9f0d77 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,28 +1,28 @@ click flask psycopg2 vcversioner aiohttp tenacity cassandra-driver!=3.21.0,>=3.19.0 deprecated -swh.core[db,http]>=0.0.94 -swh.model>=0.3.4 +swh.core[db,http]>=0.1.0 +swh.model>=0.4.0 swh.objstorage>=0.0.40 [journal] -swh.journal>=0.3.2 +swh.journal>=0.4 [schemata] SQLAlchemy [testing] hypothesis>=3.11.0 pytest pytest-mock pytest-postgresql>=2.1.0 sqlalchemy-stubs swh.model[testing]>=0.0.50 pytz pytest-xdist -swh.journal>=0.3.2 +swh.journal>=0.4 diff --git a/swh/storage/cassandra/converters.py b/swh/storage/cassandra/converters.py index b018f05d..a1943011 100644 --- a/swh/storage/cassandra/converters.py +++ b/swh/storage/cassandra/converters.py @@ -1,92 +1,94 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json import attr from copy import deepcopy from typing import Any, Dict, Tuple from cassandra.cluster import ResultSet from swh.model.model import ( ObjectType, OriginVisitStatus, Revision, RevisionType, Release, Sha1Git, ) from swh.model.hashutil import DEFAULT_ALGORITHMS -from ..converters import git_headers_to_db, db_to_git_headers from .common import Row def revision_to_db(revision: Revision) -> Dict[str, Any]: # we use a deepcopy of the dict because we do not want to recurse the # Model->dict conversion (to keep Timestamp & al. entities), BUT we do not # want to modify original metadata (embedded in the Model entity), so we # non-recursively convert it as a dict but make a deep copy. db_revision = deepcopy(attr.asdict(revision, recurse=False)) metadata = revision.metadata - if metadata and "extra_headers" in metadata: - db_revision["metadata"]["extra_headers"] = git_headers_to_db( - metadata["extra_headers"] - ) + extra_headers = revision.extra_headers + if not extra_headers and metadata and "extra_headers" in metadata: + extra_headers = db_revision["metadata"].pop("extra_headers") db_revision["metadata"] = json.dumps(db_revision["metadata"]) + db_revision["extra_headers"] = extra_headers db_revision["type"] = db_revision["type"].value return db_revision def revision_from_db(db_revision: Row, parents: Tuple[Sha1Git]) -> Revision: revision = db_revision._asdict() # type: ignore metadata = json.loads(revision.pop("metadata", None)) - if metadata and "extra_headers" in metadata: - extra_headers = db_to_git_headers(metadata["extra_headers"]) - metadata["extra_headers"] = extra_headers + extra_headers = revision.pop("extra_headers", ()) + if not extra_headers and metadata and "extra_headers" in metadata: + extra_headers = metadata.pop("extra_headers") + if extra_headers is None: + extra_headers = () return Revision( parents=parents, type=RevisionType(revision.pop("type")), metadata=metadata, + extra_headers=extra_headers, **revision, ) def release_to_db(release: Release) -> Dict[str, Any]: db_release = attr.asdict(release, recurse=False) db_release["target_type"] = db_release["target_type"].value return db_release def release_from_db(db_release: Row) -> Release: release = db_release._asdict() # type: ignore return Release(target_type=ObjectType(release.pop("target_type")), **release,) def row_to_content_hashes(row: Row) -> Dict[str, bytes]: """Convert cassandra row to a content hashes """ hashes = {} for algo in DEFAULT_ALGORITHMS: hashes[algo] = getattr(row, algo) return hashes def row_to_visit_status(row: ResultSet) -> OriginVisitStatus: """Format a row representing a visit_status to an actual dict representing an OriginVisitStatus. """ return OriginVisitStatus.from_dict( { **row._asdict(), "origin": row.origin, "date": row.date.replace(tzinfo=datetime.timezone.utc), "metadata": (json.loads(row.metadata) if row.metadata else None), } ) diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py index db65629e..6a2997f8 100644 --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -1,999 +1,1000 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import json import logging import random from typing import ( Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union, ) from cassandra import CoordinationFailure from cassandra.cluster import Cluster, EXEC_PROFILE_DEFAULT, ExecutionProfile, ResultSet from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy from cassandra.query import PreparedStatement, BoundStatement from tenacity import ( retry, stop_after_attempt, wait_random_exponential, retry_if_exception_type, ) from swh.model.model import ( Sha1Git, TimestampWithTimezone, Timestamp, Person, Content, SkippedContent, OriginVisit, OriginVisitStatus, Origin, ) from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS from .. import extrinsic_metadata logger = logging.getLogger(__name__) _execution_profiles = { EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()) ), } # Configuration for cassandra-driver's access to servers: # * hit the right server directly when sending a query (TokenAwarePolicy), # * if there's more than one, then pick one at random that's in the same # datacenter as the client (DCAwareRoundRobinPolicy) def create_keyspace( hosts: List[str], keyspace: str, port: int = 9042, *, durable_writes=True ): cluster = Cluster(hosts, port=port, execution_profiles=_execution_profiles) session = cluster.connect() extra_params = "" if not durable_writes: extra_params = "AND durable_writes = false" session.execute( """CREATE KEYSPACE IF NOT EXISTS "%s" WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 } %s; """ % (keyspace, extra_params) ) session.execute('USE "%s"' % keyspace) for query in CREATE_TABLES_QUERIES: session.execute(query) T = TypeVar("T") def _prepared_statement(query: str) -> Callable[[Callable[..., T]], Callable[..., T]]: """Returns a decorator usable on methods of CqlRunner, to inject them with a 'statement' argument, that is a prepared statement corresponding to the query. This only works on methods of CqlRunner, as preparing a statement requires a connection to a Cassandra server.""" def decorator(f): @functools.wraps(f) def newf(self, *args, **kwargs) -> T: if f.__name__ not in self._prepared_statements: statement: PreparedStatement = self._session.prepare(query) self._prepared_statements[f.__name__] = statement return f( self, *args, **kwargs, statement=self._prepared_statements[f.__name__] ) return newf return decorator def _prepared_insert_statement(table_name: str, columns: List[str]): """Shorthand for using `_prepared_statement` for `INSERT INTO` statements.""" return _prepared_statement( "INSERT INTO %s (%s) VALUES (%s)" % (table_name, ", ".join(columns), ", ".join("?" for _ in columns),) ) def _prepared_exists_statement(table_name: str): """Shorthand for using `_prepared_statement` for queries that only check which ids in a list exist in the table.""" return _prepared_statement(f"SELECT id FROM {table_name} WHERE id IN ?") class CqlRunner: """Class managing prepared statements and building queries to be sent to Cassandra.""" def __init__(self, hosts: List[str], keyspace: str, port: int): self._cluster = Cluster( hosts, port=port, execution_profiles=_execution_profiles ) self._session = self._cluster.connect(keyspace) self._cluster.register_user_type( keyspace, "microtimestamp_with_timezone", TimestampWithTimezone ) self._cluster.register_user_type(keyspace, "microtimestamp", Timestamp) self._cluster.register_user_type(keyspace, "person", Person) self._prepared_statements: Dict[str, PreparedStatement] = {} ########################## # Common utility functions ########################## MAX_RETRIES = 3 @retry( wait=wait_random_exponential(multiplier=1, max=10), stop=stop_after_attempt(MAX_RETRIES), retry=retry_if_exception_type(CoordinationFailure), ) def _execute_with_retries(self, statement, args) -> ResultSet: return self._session.execute(statement, args, timeout=1000.0) @_prepared_statement( "UPDATE object_count SET count = count + ? " "WHERE partition_key = 0 AND object_type = ?" ) def _increment_counter( self, object_type: str, nb: int, *, statement: PreparedStatement ) -> None: self._execute_with_retries(statement, [nb, object_type]) def _add_one(self, statement, object_type: str, obj, keys: List[str]) -> None: self._increment_counter(object_type, 1) self._execute_with_retries(statement, [getattr(obj, key) for key in keys]) def _get_random_row(self, statement) -> Optional[Row]: """Takes a prepared statement of the form "SELECT * FROM WHERE token() > ? LIMIT 1" and uses it to return a random row""" token = random.randint(TOKEN_BEGIN, TOKEN_END) rows = self._execute_with_retries(statement, [token]) if not rows: # There are no row with a greater token; wrap around to get # the row with the smallest token rows = self._execute_with_retries(statement, [TOKEN_BEGIN]) if rows: return rows.one() else: return None def _missing(self, statement, ids): res = self._execute_with_retries(statement, [ids]) found_ids = {id_ for (id_,) in res} return [id_ for id_ in ids if id_ not in found_ids] ########################## # 'content' table ########################## _content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"] _content_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", ] def _content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("content", 1) @_prepared_insert_statement("content", _content_keys) def content_add_prepare( self, content, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" statement = statement.bind( [getattr(content, key) for key in self._content_keys] ) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(self._content_pk)}) FROM content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._content_add_finalize, statement) return (token, finalizer) @_prepared_statement( "SELECT * FROM content WHERE " + " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS)) ) def content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[Row]: rows = list( self._execute_with_retries( statement, [content_hashes[algo] for algo in HASH_ALGORITHMS] ) ) assert len(rows) <= 1 if rows: return rows[0] else: return None @_prepared_statement( "SELECT * FROM content WHERE token(" + ", ".join(_content_pk) + ") = ?" ) def content_get_from_token(self, token, *, statement) -> Iterable[Row]: return self._execute_with_retries(statement, [token]) @_prepared_statement( "SELECT * FROM content WHERE token(%s) > ? LIMIT 1" % ", ".join(_content_pk) ) def content_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) @_prepared_statement( ( "SELECT token({0}) AS tok, {1} FROM content " "WHERE token({0}) >= ? AND token({0}) <= ? LIMIT ?" ).format(", ".join(_content_pk), ", ".join(_content_keys)) ) def content_get_token_range( self, start: int, end: int, limit: int, *, statement ) -> Iterable[Row]: return self._execute_with_retries(statement, [start, end, limit]) ########################## # 'content_by_*' tables ########################## @_prepared_statement("SELECT sha1_git FROM content_by_sha1_git WHERE sha1_git IN ?") def content_missing_by_sha1_git( self, ids: List[bytes], *, statement ) -> List[bytes]: return self._missing(statement, ids) def content_index_add_one(self, algo: str, content: Content, token: int) -> None: """Adds a row mapping content[algo] to the token of the Content in the main 'content' table.""" query = ( f"INSERT INTO content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" ) self._execute_with_retries(query, [content.get_hash(algo), token]) def content_get_tokens_from_single_hash( self, algo: str, hash_: bytes ) -> Iterable[int]: assert algo in HASH_ALGORITHMS query = f"SELECT target_token FROM content_by_{algo} WHERE {algo} = %s" return (tok for (tok,) in self._execute_with_retries(query, [hash_])) ########################## # 'skipped_content' table ########################## _skipped_content_pk = ["sha1", "sha1_git", "sha256", "blake2s256"] _skipped_content_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", "reason", "origin", ] _magic_null_pk = b"" """ NULLs (or all-empty blobs) are not allowed in primary keys; instead use a special value that can't possibly be a valid hash. """ def _skipped_content_add_finalize(self, statement: BoundStatement) -> None: """Returned currified by skipped_content_add_prepare, to be called when the content row should be added to the primary table.""" self._execute_with_retries(statement, None) self._increment_counter("skipped_content", 1) @_prepared_insert_statement("skipped_content", _skipped_content_keys) def skipped_content_add_prepare( self, content, *, statement ) -> Tuple[int, Callable[[], None]]: """Prepares insertion of a Content to the main 'skipped_content' table. Returns a token (to be used in secondary tables), and a function to be called to perform the insertion in the main table.""" # Replace NULLs (which are not allowed in the partition key) with # an empty byte string content = content.to_dict() for key in self._skipped_content_pk: if content[key] is None: content[key] = self._magic_null_pk statement = statement.bind( [content.get(key) for key in self._skipped_content_keys] ) # Type used for hashing keys (usually, it will be # cassandra.metadata.Murmur3Token) token_class = self._cluster.metadata.token_map.token_class # Token of the row when it will be inserted. This is equivalent to # "SELECT token({', '.join(self._content_pk)}) # FROM skipped_content WHERE ..." # after the row is inserted; but we need the token to insert in the # index tables *before* inserting to the main 'skipped_content' table token = token_class.from_key(statement.routing_key).value assert TOKEN_BEGIN <= token <= TOKEN_END # Function to be called after the indexes contain their respective # row finalizer = functools.partial(self._skipped_content_add_finalize, statement) return (token, finalizer) @_prepared_statement( "SELECT * FROM skipped_content WHERE " + " AND ".join(map("%s = ?".__mod__, HASH_ALGORITHMS)) ) def skipped_content_get_from_pk( self, content_hashes: Dict[str, bytes], *, statement ) -> Optional[Row]: rows = list( self._execute_with_retries( statement, [ content_hashes[algo] or self._magic_null_pk for algo in HASH_ALGORITHMS ], ) ) assert len(rows) <= 1 if rows: # TODO: convert _magic_null_pk back to None? return rows[0] else: return None ########################## # 'skipped_content_by_*' tables ########################## def skipped_content_index_add_one( self, algo: str, content: SkippedContent, token: int ) -> None: """Adds a row mapping content[algo] to the token of the SkippedContent in the main 'skipped_content' table.""" query = ( f"INSERT INTO skipped_content_by_{algo} ({algo}, target_token) " f"VALUES (%s, %s)" ) self._execute_with_retries( query, [content.get_hash(algo) or self._magic_null_pk, token] ) ########################## # 'revision' table ########################## _revision_keys = [ "id", "date", "committer_date", "type", "directory", "message", "author", "committer", "synthetic", "metadata", + "extra_headers", ] @_prepared_exists_statement("revision") def revision_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement("revision", _revision_keys) def revision_add_one(self, revision: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [revision[key] for key in self._revision_keys] ) self._increment_counter("revision", 1) @_prepared_statement("SELECT id FROM revision WHERE id IN ?") def revision_get_ids(self, revision_ids, *, statement) -> ResultSet: return self._execute_with_retries(statement, [revision_ids]) @_prepared_statement("SELECT * FROM revision WHERE id IN ?") def revision_get(self, revision_ids, *, statement) -> ResultSet: return self._execute_with_retries(statement, [revision_ids]) @_prepared_statement("SELECT * FROM revision WHERE token(id) > ? LIMIT 1") def revision_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'revision_parent' table ########################## _revision_parent_keys = ["id", "parent_rank", "parent_id"] @_prepared_insert_statement("revision_parent", _revision_parent_keys) def revision_parent_add_one( self, id_: Sha1Git, parent_rank: int, parent_id: Sha1Git, *, statement ) -> None: self._execute_with_retries(statement, [id_, parent_rank, parent_id]) @_prepared_statement("SELECT parent_id FROM revision_parent WHERE id = ?") def revision_parent_get(self, revision_id: Sha1Git, *, statement) -> ResultSet: return self._execute_with_retries(statement, [revision_id]) ########################## # 'release' table ########################## _release_keys = [ "id", "target", "target_type", "date", "name", "message", "author", "synthetic", ] @_prepared_exists_statement("release") def release_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement("release", _release_keys) def release_add_one(self, release: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [release[key] for key in self._release_keys] ) self._increment_counter("release", 1) @_prepared_statement("SELECT * FROM release WHERE id in ?") def release_get(self, release_ids: List[str], *, statement) -> None: return self._execute_with_retries(statement, [release_ids]) @_prepared_statement("SELECT * FROM release WHERE token(id) > ? LIMIT 1") def release_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'directory' table ########################## _directory_keys = ["id"] @_prepared_exists_statement("directory") def directory_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement("directory", _directory_keys) def directory_add_one(self, directory_id: Sha1Git, *, statement) -> None: """Called after all calls to directory_entry_add_one, to commit/finalize the directory.""" self._execute_with_retries(statement, [directory_id]) self._increment_counter("directory", 1) @_prepared_statement("SELECT * FROM directory WHERE token(id) > ? LIMIT 1") def directory_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'directory_entry' table ########################## _directory_entry_keys = ["directory_id", "name", "type", "target", "perms"] @_prepared_insert_statement("directory_entry", _directory_entry_keys) def directory_entry_add_one(self, entry: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [entry[key] for key in self._directory_entry_keys] ) @_prepared_statement("SELECT * FROM directory_entry WHERE directory_id IN ?") def directory_entry_get(self, directory_ids, *, statement) -> ResultSet: return self._execute_with_retries(statement, [directory_ids]) ########################## # 'snapshot' table ########################## _snapshot_keys = ["id"] @_prepared_exists_statement("snapshot") def snapshot_missing(self, ids: List[bytes], *, statement) -> List[bytes]: return self._missing(statement, ids) @_prepared_insert_statement("snapshot", _snapshot_keys) def snapshot_add_one(self, snapshot_id: Sha1Git, *, statement) -> None: self._execute_with_retries(statement, [snapshot_id]) self._increment_counter("snapshot", 1) @_prepared_statement("SELECT * FROM snapshot WHERE id = ?") def snapshot_get(self, snapshot_id: Sha1Git, *, statement) -> ResultSet: return self._execute_with_retries(statement, [snapshot_id]) @_prepared_statement("SELECT * FROM snapshot WHERE token(id) > ? LIMIT 1") def snapshot_get_random(self, *, statement) -> Optional[Row]: return self._get_random_row(statement) ########################## # 'snapshot_branch' table ########################## _snapshot_branch_keys = ["snapshot_id", "name", "target_type", "target"] @_prepared_insert_statement("snapshot_branch", _snapshot_branch_keys) def snapshot_branch_add_one(self, branch: Dict[str, Any], *, statement) -> None: self._execute_with_retries( statement, [branch[key] for key in self._snapshot_branch_keys] ) @_prepared_statement( "SELECT ascii_bins_count(target_type) AS counts " "FROM snapshot_branch " "WHERE snapshot_id = ? " ) def snapshot_count_branches(self, snapshot_id: Sha1Git, *, statement) -> ResultSet: return self._execute_with_retries(statement, [snapshot_id]) @_prepared_statement( "SELECT * FROM snapshot_branch WHERE snapshot_id = ? AND name >= ? LIMIT ?" ) def snapshot_branch_get( self, snapshot_id: Sha1Git, from_: bytes, limit: int, *, statement ) -> None: return self._execute_with_retries(statement, [snapshot_id, from_, limit]) ########################## # 'origin' table ########################## origin_keys = ["sha1", "url", "type", "next_visit_id"] @_prepared_statement( "INSERT INTO origin (sha1, url, next_visit_id) " "VALUES (?, ?, 1) IF NOT EXISTS" ) def origin_add_one(self, origin: Origin, *, statement) -> None: self._execute_with_retries(statement, [hash_url(origin.url), origin.url]) self._increment_counter("origin", 1) @_prepared_statement("SELECT * FROM origin WHERE sha1 = ?") def origin_get_by_sha1(self, sha1: bytes, *, statement) -> ResultSet: return self._execute_with_retries(statement, [sha1]) def origin_get_by_url(self, url: str) -> ResultSet: return self.origin_get_by_sha1(hash_url(url)) @_prepared_statement( f'SELECT token(sha1) AS tok, {", ".join(origin_keys)} ' f"FROM origin WHERE token(sha1) >= ? LIMIT ?" ) def origin_list(self, start_token: int, limit: int, *, statement) -> ResultSet: return self._execute_with_retries(statement, [start_token, limit]) @_prepared_statement("SELECT * FROM origin") def origin_iter_all(self, *, statement) -> ResultSet: return self._execute_with_retries(statement, []) @_prepared_statement("SELECT next_visit_id FROM origin WHERE sha1 = ?") def _origin_get_next_visit_id(self, origin_sha1: bytes, *, statement) -> int: rows = list(self._execute_with_retries(statement, [origin_sha1])) assert len(rows) == 1 # TODO: error handling return rows[0].next_visit_id @_prepared_statement( "UPDATE origin SET next_visit_id=? WHERE sha1 = ? IF next_visit_id=?" ) def origin_generate_unique_visit_id(self, origin_url: str, *, statement) -> int: origin_sha1 = hash_url(origin_url) next_id = self._origin_get_next_visit_id(origin_sha1) while True: res = list( self._execute_with_retries( statement, [next_id + 1, origin_sha1, next_id] ) ) assert len(res) == 1 if res[0].applied: # No data race return next_id else: # Someone else updated it before we did, let's try again next_id = res[0].next_visit_id # TODO: abort after too many attempts return next_id ########################## # 'origin_visit' table ########################## _origin_visit_keys = [ "origin", "visit", "type", "date", ] @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? AND visit > ? " "ORDER BY visit ASC" ) def _origin_visit_get_pagination_asc_no_limit( self, origin_url: str, last_visit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit]) @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? AND visit > ? " "ORDER BY visit ASC " "LIMIT ?" ) def _origin_visit_get_pagination_asc_limit( self, origin_url: str, last_visit: int, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit, limit]) @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? AND visit < ? " "ORDER BY visit DESC" ) def _origin_visit_get_pagination_desc_no_limit( self, origin_url: str, last_visit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit]) @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? AND visit < ? " "ORDER BY visit DESC " "LIMIT ?" ) def _origin_visit_get_pagination_desc_limit( self, origin_url: str, last_visit: int, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, last_visit, limit]) @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit ASC LIMIT ?" ) def _origin_visit_get_no_pagination_asc_limit( self, origin_url: str, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, limit]) @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit ASC " ) def _origin_visit_get_no_pagination_asc_no_limit( self, origin_url: str, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url]) @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit DESC" ) def _origin_visit_get_no_pagination_desc_no_limit( self, origin_url: str, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url]) @_prepared_statement( "SELECT * FROM origin_visit WHERE origin = ? ORDER BY visit DESC LIMIT ?" ) def _origin_visit_get_no_pagination_desc_limit( self, origin_url: str, limit: int, *, statement ) -> ResultSet: return self._execute_with_retries(statement, [origin_url, limit]) def origin_visit_get( self, origin_url: str, last_visit: Optional[int], limit: Optional[int], order: str = "asc", ) -> ResultSet: order = order.lower() assert order in ["asc", "desc"] args: List[Any] = [origin_url] if last_visit is not None: page_name = "pagination" args.append(last_visit) else: page_name = "no_pagination" if limit is not None: limit_name = "limit" args.append(limit) else: limit_name = "no_limit" method_name = f"_origin_visit_get_{page_name}_{order}_{limit_name}" origin_visit_get_method = getattr(self, method_name) return origin_visit_get_method(*args) @_prepared_insert_statement("origin_visit", _origin_visit_keys) def origin_visit_add_one(self, visit: OriginVisit, *, statement) -> None: self._add_one(statement, "origin_visit", visit, self._origin_visit_keys) _origin_visit_status_keys = [ "origin", "visit", "date", "status", "snapshot", "metadata", ] @_prepared_insert_statement("origin_visit_status", _origin_visit_status_keys) def origin_visit_status_add_one( self, visit_update: OriginVisitStatus, *, statement ) -> None: assert self._origin_visit_status_keys[-1] == "metadata" keys = self._origin_visit_status_keys metadata = json.dumps(visit_update.metadata) self._execute_with_retries( statement, [getattr(visit_update, key) for key in keys[:-1]] + [metadata] ) def origin_visit_status_get_latest(self, origin: str, visit: int,) -> Optional[Row]: """Given an origin visit id, return its latest origin_visit_status """ rows = self.origin_visit_status_get(origin, visit) return rows[0] if rows else None @_prepared_statement( "SELECT * FROM origin_visit_status " "WHERE origin = ? AND visit = ? " "ORDER BY date DESC" ) def origin_visit_status_get( self, origin: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, *, statement, ) -> List[Row]: """Return all origin visit statuses for a given visit """ return list(self._execute_with_retries(statement, [origin, visit])) @_prepared_statement("SELECT * FROM origin_visit WHERE origin = ? AND visit = ?") def origin_visit_get_one( self, origin_url: str, visit_id: int, *, statement ) -> Optional[Row]: # TODO: error handling rows = list(self._execute_with_retries(statement, [origin_url, visit_id])) if rows: return rows[0] else: return None @_prepared_statement("SELECT * FROM origin_visit WHERE origin = ?") def origin_visit_get_all(self, origin_url: str, *, statement) -> ResultSet: return self._execute_with_retries(statement, [origin_url]) @_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) >= ?") def _origin_visit_iter_from(self, min_token: int, *, statement) -> Iterator[Row]: yield from self._execute_with_retries(statement, [min_token]) @_prepared_statement("SELECT * FROM origin_visit WHERE token(origin) < ?") def _origin_visit_iter_to(self, max_token: int, *, statement) -> Iterator[Row]: yield from self._execute_with_retries(statement, [max_token]) def origin_visit_iter(self, start_token: int) -> Iterator[Row]: """Returns all origin visits in order from this token, and wraps around the token space.""" yield from self._origin_visit_iter_from(start_token) yield from self._origin_visit_iter_to(start_token) ########################## # 'metadata_authority' table ########################## _metadata_authority_keys = ["url", "type", "metadata"] @_prepared_insert_statement("metadata_authority", _metadata_authority_keys) def metadata_authority_add(self, url, type, metadata, *, statement): return self._execute_with_retries(statement, [url, type, metadata]) @_prepared_statement("SELECT * from metadata_authority WHERE type = ? AND url = ?") def metadata_authority_get(self, type, url, *, statement) -> Optional[Row]: return next(iter(self._execute_with_retries(statement, [type, url])), None) ########################## # 'metadata_fetcher' table ########################## _metadata_fetcher_keys = ["name", "version", "metadata"] @_prepared_insert_statement("metadata_fetcher", _metadata_fetcher_keys) def metadata_fetcher_add(self, name, version, metadata, *, statement): return self._execute_with_retries(statement, [name, version, metadata]) @_prepared_statement( "SELECT * from metadata_fetcher WHERE name = ? AND version = ?" ) def metadata_fetcher_get(self, name, version, *, statement) -> Optional[Row]: return next(iter(self._execute_with_retries(statement, [name, version])), None) ######################### # 'object_metadata' table ######################### _object_metadata_keys = [ "type", "id", "authority_type", "authority_url", "discovery_date", "fetcher_name", "fetcher_version", "format", "metadata", "origin", "visit", "snapshot", "release", "revision", "path", "directory", ] @_prepared_statement( f"INSERT INTO object_metadata ({', '.join(_object_metadata_keys)}) " f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})" ) def object_metadata_add( self, object_type: str, id: str, authority_type, authority_url, discovery_date, fetcher_name, fetcher_version, format, metadata, context: Dict[str, Union[str, bytes, int]], *, statement, ): params = [ object_type, id, authority_type, authority_url, discovery_date, fetcher_name, fetcher_version, format, metadata, ] params.extend( context.get(key) for key in extrinsic_metadata.CONTEXT_KEYS[object_type] ) return self._execute_with_retries(statement, params,) @_prepared_statement( "SELECT * from object_metadata " "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?" ) def object_metadata_get_after_date( self, id: str, authority_type: str, authority_url: str, after: datetime.datetime, *, statement, ): return self._execute_with_retries( statement, [id, authority_url, after, authority_type] ) @_prepared_statement( "SELECT * from object_metadata " "WHERE id=? AND authority_type=? AND authority_url=? " "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)" ) def object_metadata_get_after_date_and_fetcher( self, id: str, authority_type: str, authority_url: str, after_date: datetime.datetime, after_fetcher_name: str, after_fetcher_version: str, *, statement, ): return self._execute_with_retries( statement, [ id, authority_type, authority_url, after_date, after_fetcher_name, after_fetcher_version, ], ) @_prepared_statement( "SELECT * from object_metadata " "WHERE id=? AND authority_url=? AND authority_type=?" ) def object_metadata_get( self, id: str, authority_type: str, authority_url: str, *, statement ) -> Iterable[Row]: return self._execute_with_retries( statement, [id, authority_url, authority_type] ) ########################## # Miscellaneous ########################## @_prepared_statement("SELECT uuid() FROM revision LIMIT 1;") def check_read(self, *, statement): self._execute_with_retries(statement, []) @_prepared_statement( "SELECT object_type, count FROM object_count WHERE partition_key=0" ) def stat_counters(self, *, statement) -> ResultSet: return self._execute_with_retries(statement, []) diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py index 01bdecec..945842e3 100644 --- a/swh/storage/cassandra/schema.py +++ b/swh/storage/cassandra/schema.py @@ -1,282 +1,283 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information CREATE_TABLES_QUERIES = """ CREATE OR REPLACE FUNCTION ascii_bins_count_sfunc ( state tuple>, -- (nb_none, map) bin_name ascii ) CALLED ON NULL INPUT RETURNS tuple> LANGUAGE java AS $$ if (bin_name == null) { state.setInt(0, state.getInt(0) + 1); } else { Map counters = state.getMap( 1, String.class, Integer.class); Integer nb = counters.get(bin_name); if (nb == null) { nb = 0; } counters.put(bin_name, nb + 1); state.setMap(1, counters, String.class, Integer.class); } return state; $$ ; CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii ) SFUNC ascii_bins_count_sfunc STYPE tuple> INITCOND (0, {}) ; CREATE TYPE IF NOT EXISTS microtimestamp ( seconds bigint, microseconds int ); CREATE TYPE IF NOT EXISTS microtimestamp_with_timezone ( timestamp frozen, offset smallint, negative_utc boolean ); CREATE TYPE IF NOT EXISTS person ( fullname blob, name blob, email blob ); CREATE TABLE IF NOT EXISTS content ( sha1 blob, sha1_git blob, sha256 blob, blake2s256 blob, length bigint, ctime timestamp, -- creation time, i.e. time of (first) injection into the storage status ascii, PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256)) ); CREATE TABLE IF NOT EXISTS skipped_content ( sha1 blob, sha1_git blob, sha256 blob, blake2s256 blob, length bigint, ctime timestamp, -- creation time, i.e. time of (first) injection into the storage status ascii, reason text, origin text, PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256)) ); CREATE TABLE IF NOT EXISTS revision ( id blob PRIMARY KEY, date microtimestamp_with_timezone, committer_date microtimestamp_with_timezone, type ascii, directory blob, -- source code "root" directory message blob, author person, committer person, synthetic boolean, -- true iff revision has been created by Software Heritage - metadata text - -- extra metadata as JSON(tarball checksums, - -- extra commit information, etc...) + metadata text, + -- extra metadata as JSON(tarball checksums, etc...) + extra_headers frozen> > + -- extra commit information as (tuple(key, value), ...) ); CREATE TABLE IF NOT EXISTS revision_parent ( id blob, parent_rank int, -- parent position in merge commits, 0-based parent_id blob, PRIMARY KEY ((id), parent_rank) ); CREATE TABLE IF NOT EXISTS release ( id blob PRIMARY KEY, target_type ascii, target blob, date microtimestamp_with_timezone, name blob, message blob, author person, synthetic boolean, -- true iff release has been created by Software Heritage ); CREATE TABLE IF NOT EXISTS directory ( id blob PRIMARY KEY, ); CREATE TABLE IF NOT EXISTS directory_entry ( directory_id blob, name blob, -- path name, relative to containing dir target blob, perms int, -- unix-like permissions type ascii, -- target type PRIMARY KEY ((directory_id), name) ); CREATE TABLE IF NOT EXISTS snapshot ( id blob PRIMARY KEY, ); -- For a given snapshot_id, branches are sorted by their name, -- allowing easy pagination. CREATE TABLE IF NOT EXISTS snapshot_branch ( snapshot_id blob, name blob, target_type ascii, target blob, PRIMARY KEY ((snapshot_id), name) ); CREATE TABLE IF NOT EXISTS origin_visit ( origin text, visit bigint, date timestamp, type text, PRIMARY KEY ((origin), visit) ); CREATE TABLE IF NOT EXISTS origin_visit_status ( origin text, visit bigint, date timestamp, status ascii, metadata text, snapshot blob, PRIMARY KEY ((origin), visit, date) ); CREATE TABLE IF NOT EXISTS origin ( sha1 blob PRIMARY KEY, url text, type text, next_visit_id int, -- We need integer visit ids for compatibility with the pgsql -- storage, so we're using lightweight transactions with this trick: -- https://stackoverflow.com/a/29391877/539465 ); CREATE TABLE IF NOT EXISTS metadata_authority ( url text, type ascii, metadata text, PRIMARY KEY ((url), type) ); CREATE TABLE IF NOT EXISTS metadata_fetcher ( name ascii, version ascii, metadata text, PRIMARY KEY ((name), version) ); CREATE TABLE IF NOT EXISTS object_metadata ( type text, id text, -- metadata source authority_type text, authority_url text, discovery_date timestamp, fetcher_name ascii, fetcher_version ascii, -- metadata itself format ascii, metadata blob, -- context origin text, visit bigint, snapshot text, release text, revision text, path blob, directory text, PRIMARY KEY ((id), authority_type, authority_url, discovery_date, fetcher_name, fetcher_version) ); CREATE TABLE IF NOT EXISTS object_count ( partition_key smallint, -- Constant, must always be 0 object_type ascii, count counter, PRIMARY KEY ((partition_key), object_type) ); """.split( "\n\n\n" ) CONTENT_INDEX_TEMPLATE = """ -- Secondary table, used for looking up "content" from a single hash CREATE TABLE IF NOT EXISTS content_by_{main_algo} ( {main_algo} blob, target_token bigint, -- value of token(pk) on the "primary" table PRIMARY KEY (({main_algo}), target_token) ); CREATE TABLE IF NOT EXISTS skipped_content_by_{main_algo} ( {main_algo} blob, target_token bigint, -- value of token(pk) on the "primary" table PRIMARY KEY (({main_algo}), target_token) ); """ TABLES = ( "skipped_content content revision revision_parent release " "directory directory_entry snapshot snapshot_branch " "origin_visit origin object_metadata object_count " "origin_visit_status metadata_authority " "metadata_fetcher" ).split() HASH_ALGORITHMS = ["sha1", "sha1_git", "sha256", "blake2s256"] for main_algo in HASH_ALGORITHMS: CREATE_TABLES_QUERIES.extend( CONTENT_INDEX_TEMPLATE.format( main_algo=main_algo, other_algos=", ".join( [algo for algo in HASH_ALGORITHMS if algo != main_algo] ), ).split("\n\n") ) TABLES.append("content_by_%s" % main_algo) TABLES.append("skipped_content_by_%s" % main_algo) diff --git a/swh/storage/converters.py b/swh/storage/converters.py index 541d5aa2..6c4e6286 100644 --- a/swh/storage/converters.py +++ b/swh/storage/converters.py @@ -1,320 +1,293 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from typing import Optional, Dict -from swh.core.utils import decode_with_escape, encode_with_unescape +from swh.core.utils import encode_with_unescape from swh.model import identifiers from swh.model.hashutil import MultiHash DEFAULT_AUTHOR = { "fullname": None, "name": None, "email": None, } DEFAULT_DATE = { "timestamp": None, "offset": 0, "neg_utc_offset": None, } def author_to_db(author): """Convert a swh-model author to its DB representation. Args: author: a :mod:`swh.model` compatible author Returns: dict: a dictionary with three keys: author, fullname and email """ if author is None: return DEFAULT_AUTHOR return author def db_to_author( fullname: Optional[bytes], name: Optional[bytes], email: Optional[bytes] ) -> Optional[Dict[str, Optional[bytes]]]: """Convert the DB representation of an author to a swh-model author. Args: fullname (bytes): the author's fullname name (bytes): the author's name email (bytes): the author's email Returns: a dictionary with three keys (fullname, name and email), or None if all the arguments are None. """ if (fullname, name, email) == (None, None, None): return None return { "fullname": fullname, "name": name, "email": email, } -def git_headers_to_db(git_headers): - """Convert git headers to their database representation. - - We convert the bytes to unicode by decoding them into utf-8 and replacing - invalid utf-8 sequences with backslash escapes. - - """ - ret = [] - for key, values in git_headers: - if isinstance(values, list): - ret.append([key, [decode_with_escape(value) for value in values]]) - else: - ret.append([key, decode_with_escape(values)]) - - return ret - - def db_to_git_headers(db_git_headers): ret = [] - for key, values in db_git_headers: - if isinstance(values, list): - ret.append([key, [encode_with_unescape(value) for value in values]]) - else: - ret.append([key, encode_with_unescape(values)]) - + for key, value in db_git_headers: + ret.append([key.encode("utf-8"), encode_with_unescape(value)]) return ret def db_to_date(date, offset, neg_utc_offset): """Convert the DB representation of a date to a swh-model compatible date. Args: date (datetime.datetime): a date pulled out of the database offset (int): an integer number of minutes representing an UTC offset neg_utc_offset (boolean): whether an utc offset is negative Returns: dict: a dict with three keys: - timestamp: a timestamp from UTC - offset: the number of minutes since UTC - negative_utc: whether a null UTC offset is negative """ if date is None: return None return { "timestamp": { "seconds": int(date.timestamp()), "microseconds": date.microsecond, }, "offset": offset, "negative_utc": neg_utc_offset, } def date_to_db(date_offset): """Convert a swh-model date_offset to its DB representation. Args: date_offset: a :mod:`swh.model` compatible date_offset Returns: dict: a dictionary with three keys: - timestamp: a date in ISO format - offset: the UTC offset in minutes - neg_utc_offset: a boolean indicating whether a null offset is negative or positive. """ if date_offset is None: return DEFAULT_DATE normalized = identifiers.normalize_timestamp(date_offset) ts = normalized["timestamp"] seconds = ts.get("seconds", 0) microseconds = ts.get("microseconds", 0) timestamp = datetime.datetime.fromtimestamp(seconds, datetime.timezone.utc) timestamp = timestamp.replace(microsecond=microseconds) return { # PostgreSQL supports isoformatted timestamps "timestamp": timestamp.isoformat(), "offset": normalized["offset"], "neg_utc_offset": normalized["negative_utc"], } def revision_to_db(rev): """Convert a swh-model revision to its database representation. """ revision = rev.to_dict() author = author_to_db(revision["author"]) date = date_to_db(revision["date"]) committer = author_to_db(revision["committer"]) committer_date = date_to_db(revision["committer_date"]) - metadata = revision["metadata"] - - if metadata and "extra_headers" in metadata: - metadata = metadata.copy() - extra_headers = git_headers_to_db(metadata["extra_headers"]) - metadata["extra_headers"] = extra_headers - return { "id": revision["id"], "author_fullname": author["fullname"], "author_name": author["name"], "author_email": author["email"], "date": date["timestamp"], "date_offset": date["offset"], "date_neg_utc_offset": date["neg_utc_offset"], "committer_fullname": committer["fullname"], "committer_name": committer["name"], "committer_email": committer["email"], "committer_date": committer_date["timestamp"], "committer_date_offset": committer_date["offset"], "committer_date_neg_utc_offset": committer_date["neg_utc_offset"], "type": revision["type"], "directory": revision["directory"], "message": revision["message"], - "metadata": metadata, + "metadata": revision["metadata"], "synthetic": revision["synthetic"], + "extra_headers": revision["extra_headers"], "parents": [ {"id": revision["id"], "parent_id": parent, "parent_rank": i,} for i, parent in enumerate(revision["parents"]) ], } def db_to_revision(db_revision): """Convert a database representation of a revision to its swh-model representation.""" author = db_to_author( db_revision["author_fullname"], db_revision["author_name"], db_revision["author_email"], ) date = db_to_date( db_revision["date"], db_revision["date_offset"], db_revision["date_neg_utc_offset"], ) committer = db_to_author( db_revision["committer_fullname"], db_revision["committer_name"], db_revision["committer_email"], ) committer_date = db_to_date( db_revision["committer_date"], db_revision["committer_date_offset"], db_revision["committer_date_neg_utc_offset"], ) - metadata = db_revision["metadata"] - - if metadata and "extra_headers" in metadata: - extra_headers = db_to_git_headers(metadata["extra_headers"]) - metadata["extra_headers"] = extra_headers - parents = [] if "parents" in db_revision: for parent in db_revision["parents"]: if parent: parents.append(parent) + metadata = db_revision["metadata"] + extra_headers = db_revision.get("extra_headers", ()) + if not extra_headers and metadata and "extra_headers" in metadata: + extra_headers = db_to_git_headers(metadata.pop("extra_headers")) + ret = { "id": db_revision["id"], "author": author, "date": date, "committer": committer, "committer_date": committer_date, "type": db_revision["type"], "directory": db_revision["directory"], "message": db_revision["message"], "metadata": metadata, "synthetic": db_revision["synthetic"], + "extra_headers": extra_headers, "parents": parents, } if "object_id" in db_revision: ret["object_id"] = db_revision["object_id"] return ret def release_to_db(rel): """Convert a swh-model release to its database representation. """ release = rel.to_dict() author = author_to_db(release["author"]) date = date_to_db(release["date"]) return { "id": release["id"], "author_fullname": author["fullname"], "author_name": author["name"], "author_email": author["email"], "date": date["timestamp"], "date_offset": date["offset"], "date_neg_utc_offset": date["neg_utc_offset"], "name": release["name"], "target": release["target"], "target_type": release["target_type"], "comment": release["message"], "synthetic": release["synthetic"], } def db_to_release(db_release): """Convert a database representation of a release to its swh-model representation. """ author = db_to_author( db_release["author_fullname"], db_release["author_name"], db_release["author_email"], ) date = db_to_date( db_release["date"], db_release["date_offset"], db_release["date_neg_utc_offset"] ) ret = { "author": author, "date": date, "id": db_release["id"], "name": db_release["name"], "message": db_release["comment"], "synthetic": db_release["synthetic"], "target": db_release["target"], "target_type": db_release["target_type"], } if "object_id" in db_release: ret["object_id"] = db_release["object_id"] return ret def origin_url_to_sha1(origin_url): """Convert an origin URL to a sha1. Encodes URL to utf-8.""" return MultiHash.from_data(origin_url.encode("utf-8"), {"sha1"}).digest()["sha1"] diff --git a/swh/storage/db.py b/swh/storage/db.py index 91935b46..41e83863 100644 --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -1,1287 +1,1288 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import random import select from typing import Any, Dict, Iterable, List, Optional, Tuple, Union from swh.core.db import BaseDb from swh.core.db.db_utils import stored_procedure, jsonize from swh.core.db.db_utils import execute_values_generator from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE class Db(BaseDb): """Proxy to the SWH DB, with wrappers around stored procedures """ def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute( "SELECT swh_mktemp_dir_entry(%s)", (("directory_entry_%s" % entry_type),) ) @stored_procedure("swh_mktemp_revision") def mktemp_revision(self, cur=None): pass @stored_procedure("swh_mktemp_release") def mktemp_release(self, cur=None): pass @stored_procedure("swh_mktemp_snapshot_branch") def mktemp_snapshot_branch(self, cur=None): pass def register_listener(self, notify_queue, cur=None): """Register a listener for NOTIFY queue `notify_queue`""" self._cursor(cur).execute("LISTEN %s" % notify_queue) def listen_notifies(self, timeout): """Listen to notifications for `timeout` seconds""" if select.select([self.conn], [], [], timeout) == ([], [], []): return else: self.conn.poll() while self.conn.notifies: yield self.conn.notifies.pop(0) @stored_procedure("swh_content_add") def content_add_from_temp(self, cur=None): pass @stored_procedure("swh_directory_add") def directory_add_from_temp(self, cur=None): pass @stored_procedure("swh_skipped_content_add") def skipped_content_add_from_temp(self, cur=None): pass @stored_procedure("swh_revision_add") def revision_add_from_temp(self, cur=None): pass @stored_procedure("swh_release_add") def release_add_from_temp(self, cur=None): pass def content_update_from_temp(self, keys_to_update, cur=None): cur = self._cursor(cur) cur.execute( """select swh_content_update(ARRAY[%s] :: text[])""" % keys_to_update ) content_get_metadata_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", ] content_add_keys = content_get_metadata_keys + ["ctime"] skipped_content_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "reason", "status", "origin", ] def content_get_metadata_from_sha1s(self, sha1s, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ select t.sha1, %s from (values %%s) as t (sha1) inner join content using (sha1) """ % ", ".join(self.content_get_metadata_keys[1:]), ((sha1,) for sha1 in sha1s), ) def content_get_range(self, start, end, limit=None, cur=None): """Retrieve contents within range [start, end]. """ cur = self._cursor(cur) query = """select %s from content where %%s <= sha1 and sha1 <= %%s order by sha1 limit %%s""" % ", ".join( self.content_get_metadata_keys ) cur.execute(query, (start, end, limit)) yield from cur content_hash_keys = ["sha1", "sha1_git", "sha256", "blake2s256"] def content_missing_from_list(self, contents, cur=None): cur = self._cursor(cur) keys = ", ".join(self.content_hash_keys) equality = " AND ".join( ("t.%s = c.%s" % (key, key)) for key in self.content_hash_keys ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(%s) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE %s ) """ % (keys, keys, equality), (tuple(c[key] for key in self.content_hash_keys) for c in contents), ) def content_missing_per_sha1(self, sha1s, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT t.sha1 FROM (VALUES %s) AS t(sha1) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE c.sha1 = t.sha1 )""", ((sha1,) for sha1 in sha1s), ) def content_missing_per_sha1_git(self, contents, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT t.sha1_git FROM (VALUES %s) AS t(sha1_git) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE c.sha1_git = t.sha1_git )""", ((sha1,) for sha1 in contents), ) def skipped_content_missing(self, contents, cur=None): if not contents: return [] cur = self._cursor(cur) query = """SELECT * FROM (VALUES %s) AS t (%s) WHERE not exists (SELECT 1 FROM skipped_content s WHERE s.sha1 is not distinct from t.sha1::sha1 and s.sha1_git is not distinct from t.sha1_git::sha1 and s.sha256 is not distinct from t.sha256::bytea);""" % ( (", ".join("%s" for _ in contents)), ", ".join(self.content_hash_keys), ) cur.execute( query, [tuple(cont[key] for key in self.content_hash_keys) for cont in contents], ) yield from cur def snapshot_exists(self, snapshot_id, cur=None): """Check whether a snapshot with the given id exists""" cur = self._cursor(cur) cur.execute("""SELECT 1 FROM snapshot where id=%s""", (snapshot_id,)) return bool(cur.fetchone()) def snapshot_missing_from_list(self, snapshots, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM snapshot d WHERE d.id = t.id ) """, ((id,) for id in snapshots), ) def snapshot_add(self, snapshot_id, cur=None): """Add a snapshot from the temporary table""" cur = self._cursor(cur) cur.execute("""SELECT swh_snapshot_add(%s)""", (snapshot_id,)) snapshot_count_cols = ["target_type", "count"] def snapshot_count_branches(self, snapshot_id, cur=None): cur = self._cursor(cur) query = """\ SELECT %s FROM swh_snapshot_count_branches(%%s) """ % ", ".join( self.snapshot_count_cols ) cur.execute(query, (snapshot_id,)) yield from cur snapshot_get_cols = ["snapshot_id", "name", "target", "target_type"] def snapshot_get_by_id( self, snapshot_id, branches_from=b"", branches_count=None, target_types=None, cur=None, ): cur = self._cursor(cur) query = """\ SELECT %s FROM swh_snapshot_get_by_id(%%s, %%s, %%s, %%s :: snapshot_target[]) """ % ", ".join( self.snapshot_get_cols ) cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) yield from cur def snapshot_get_by_origin_visit(self, origin_url, visit_id, cur=None): cur = self._cursor(cur) query = """\ SELECT ovs.snapshot FROM origin_visit ov INNER JOIN origin o ON o.id = ov.origin INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit WHERE o.url=%s AND ov.visit=%s ORDER BY ovs.date DESC LIMIT 1 """ cur.execute(query, (origin_url, visit_id)) ret = cur.fetchone() if ret: return ret[0] def snapshot_get_random(self, cur=None): return self._get_random_row_from_table("snapshot", ["id"], "id", cur) content_find_cols = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", ] def content_find( self, sha1=None, sha1_git=None, sha256=None, blake2s256=None, cur=None ): """Find the content optionally on a combination of the following checksums sha1, sha1_git, sha256 or blake2s256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content blake2s256: blake2s256 content Returns: The tuple (sha1, sha1_git, sha256, blake2s256) if found or None. """ cur = self._cursor(cur) checksum_dict = { "sha1": sha1, "sha1_git": sha1_git, "sha256": sha256, "blake2s256": blake2s256, } where_parts = [] args = [] # Adds only those keys which have value other than None for algorithm in checksum_dict: if checksum_dict[algorithm] is not None: args.append(checksum_dict[algorithm]) where_parts.append(algorithm + "= %s") query = " AND ".join(where_parts) cur.execute( """SELECT %s FROM content WHERE %s """ % (",".join(self.content_find_cols), query), args, ) content = cur.fetchall() return content def content_get_random(self, cur=None): return self._get_random_row_from_table("content", ["sha1_git"], "sha1_git", cur) def directory_missing_from_list(self, directories, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM directory d WHERE d.id = t.id ) """, ((id,) for id in directories), ) directory_ls_cols = [ "dir_id", "type", "target", "name", "perms", "status", "sha1", "sha1_git", "sha256", "length", ] def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_directory_walk_one(%%s)" % cols cur.execute(query, (directory,)) yield from cur def directory_walk(self, directory, cur=None): cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_directory_walk(%%s)" % cols cur.execute(query, (directory,)) yield from cur def directory_entry_get_by_path(self, directory, paths, cur=None): """Retrieve a directory entry by path. """ cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)" % cols cur.execute(query, (directory, paths)) data = cur.fetchone() if set(data) == {None}: return None return data def directory_get_random(self, cur=None): return self._get_random_row_from_table("directory", ["id"], "id", cur) def revision_missing_from_list(self, revisions, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM revision r WHERE r.id = t.id ) """, ((id,) for id in revisions), ) revision_add_cols = [ "id", "date", "date_offset", "date_neg_utc_offset", "committer_date", "committer_date_offset", "committer_date_neg_utc_offset", "type", "directory", "message", "author_fullname", "author_name", "author_email", "committer_fullname", "committer_name", "committer_email", "metadata", "synthetic", + "extra_headers", ] revision_get_cols = revision_add_cols + ["parents"] def origin_visit_add(self, origin, ts, type, cur=None): """Add a new origin_visit for origin origin at timestamp ts. Args: origin: origin concerned by the visit ts: the date of the visit type: type of loader for the visit Returns: The new visit index step for that origin """ cur = self._cursor(cur) self._cursor(cur).execute( "SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type) ) return cur.fetchone()[0] origin_visit_status_cols = [ "origin", "visit", "date", "status", "snapshot", "metadata", ] def origin_visit_status_add( self, visit_status: OriginVisitStatus, cur=None ) -> None: """Add new origin visit status """ assert self.origin_visit_status_cols[0] == "origin" assert self.origin_visit_status_cols[-1] == "metadata" cols = self.origin_visit_status_cols[1:-1] cur = self._cursor(cur) cur.execute( f"WITH origin_id as (select id from origin where url=%s) " f"INSERT INTO origin_visit_status " f"(origin, {', '.join(cols)}, metadata) " f"VALUES ((select id from origin_id), " f"{', '.join(['%s']*len(cols))}, %s) " f"ON CONFLICT (origin, visit, date) do nothing", [visit_status.origin] + [getattr(visit_status, key) for key in cols] + [jsonize(visit_status.metadata)], ) def origin_visit_add_with_id(self, origin_visit: OriginVisit, cur=None) -> None: """Insert origin visit when id are already set """ ov = origin_visit assert ov.visit is not None cur = self._cursor(cur) origin_visit_cols = ["origin", "visit", "date", "type"] query = """INSERT INTO origin_visit ({cols}) VALUES ((select id from origin where url=%s), {values}) ON CONFLICT (origin, visit) DO NOTHING""".format( cols=", ".join(origin_visit_cols), values=", ".join("%s" for col in origin_visit_cols[1:]), ) cur.execute(query, (ov.origin, ov.visit, ov.date, ov.type)) origin_visit_get_cols = [ "origin", "visit", "date", "type", "status", "metadata", "snapshot", ] origin_visit_select_cols = [ "o.url AS origin", "ov.visit", "ov.date", "ov.type AS type", "ovs.status", "ovs.metadata", "ovs.snapshot", ] origin_visit_status_select_cols = [ "o.url AS origin", "ovs.visit", "ovs.date", "ovs.status", "ovs.snapshot", "ovs.metadata", ] def _make_origin_visit_status( self, row: Optional[Tuple[Any]] ) -> Optional[Dict[str, Any]]: """Make an origin_visit_status dict out of a row """ if not row: return None return dict(zip(self.origin_visit_status_cols, row)) def origin_visit_status_get_latest( self, origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, cur=None, ) -> Optional[Dict[str, Any]]: """Given an origin visit id, return its latest origin_visit_status """ cur = self._cursor(cur) query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_status_select_cols), "FROM origin_visit_status ovs ", "INNER JOIN origin o ON o.id = ovs.origin", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_url] query_parts.append("AND ovs.visit = %s") query_params.append(visit) if require_snapshot: query_parts.append("AND ovs.snapshot is not null") if allowed_statuses: query_parts.append("AND ovs.status IN %s") query_params.append(tuple(allowed_statuses)) query_parts.append("ORDER BY ovs.date DESC LIMIT 1") query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) row = cur.fetchone() return self._make_origin_visit_status(row) def origin_visit_get_all( self, origin_id, last_visit=None, order="asc", limit=None, cur=None ): """Retrieve all visits for origin with id origin_id. Args: origin_id: The occurrence's origin Yields: The visits for that origin """ cur = self._cursor(cur) assert order.lower() in ["asc", "desc"] query_parts = [ "SELECT DISTINCT ON (ov.visit) %s " % ", ".join(self.origin_visit_select_cols), "FROM origin_visit ov", "INNER JOIN origin o ON o.id = ov.origin", "INNER JOIN origin_visit_status ovs", "ON ov.origin = ovs.origin AND ov.visit = ovs.visit", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_id] if last_visit is not None: op_comparison = ">" if order == "asc" else "<" query_parts.append(f"and ov.visit {op_comparison} %s") query_params.append(last_visit) if order == "asc": query_parts.append("ORDER BY ov.visit ASC, ovs.date DESC") elif order == "desc": query_parts.append("ORDER BY ov.visit DESC, ovs.date DESC") else: assert False if limit is not None: query_parts.append("LIMIT %s") query_params.append(limit) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) yield from cur def origin_visit_get(self, origin_id, visit_id, cur=None): """Retrieve information on visit visit_id of origin origin_id. Args: origin_id: the origin concerned visit_id: The visit step for that origin Returns: The origin_visit information """ cur = self._cursor(cur) query = """\ SELECT %s FROM origin_visit ov INNER JOIN origin o ON o.id = ov.origin INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit WHERE o.url = %%s AND ov.visit = %%s ORDER BY ovs.date DESC LIMIT 1 """ % ( ", ".join(self.origin_visit_select_cols) ) cur.execute(query, (origin_id, visit_id)) r = cur.fetchall() if not r: return None return r[0] def origin_visit_find_by_date(self, origin, visit_date, cur=None): cur = self._cursor(cur) cur.execute( "SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date) ) rows = cur.fetchall() if rows: visit = dict(zip(self.origin_visit_get_cols, rows[0])) visit["origin"] = origin return visit def origin_visit_exists(self, origin_id, visit_id, cur=None): """Check whether an origin visit with the given ids exists""" cur = self._cursor(cur) query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" cur.execute(query, (origin_id, visit_id)) return bool(cur.fetchone()) def origin_visit_get_latest( self, origin_id: str, type: Optional[str], allowed_statuses: Optional[Iterable[str]], require_snapshot: bool, cur=None, ): """Retrieve the most recent origin_visit of the given origin, with optional filters. Args: origin_id: the origin concerned type: Optional visit type to filter on allowed_statuses: the visit statuses allowed for the returned visit require_snapshot (bool): If True, only a visit with a known snapshot will be returned. Returns: The origin_visit information, or None if no visit matches. """ cur = self._cursor(cur) query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_select_cols), "FROM origin_visit ov ", "INNER JOIN origin o ON o.id = ov.origin", "INNER JOIN origin_visit_status ovs ", "ON o.id = ovs.origin AND ov.visit = ovs.visit ", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_id] if type is not None: query_parts.append("AND ov.type = %s") query_params.append(type) if require_snapshot: query_parts.append("AND ovs.snapshot is not null") if allowed_statuses: query_parts.append("AND ovs.status IN %s") query_params.append(tuple(allowed_statuses)) query_parts.append( "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" ) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) r = cur.fetchone() if not r: return None return r def origin_visit_get_random(self, type, cur=None): """Randomly select one origin visit that was full and in the last 3 months """ cur = self._cursor(cur) columns = ",".join(self.origin_visit_select_cols) query = f"""select {columns} from origin_visit ov inner join origin o on ov.origin=o.id inner join origin_visit_status ovs on ov.origin = ovs.origin and ov.visit = ovs.visit where ovs.status='full' and ov.type=%s and ov.date > now() - '3 months'::interval and random() < 0.1 limit 1 """ cur.execute(query, (type,)) return cur.fetchone() @staticmethod def mangle_query_key(key, main_table): if key == "id": return "t.id" if key == "parents": return """ ARRAY( SELECT rh.parent_id::bytea FROM revision_history rh WHERE rh.id = t.id ORDER BY rh.parent_rank )""" if "_" not in key: return "%s.%s" % (main_table, key) head, tail = key.split("_", 1) if head in ("author", "committer") and tail in ( "name", "email", "id", "fullname", ): return "%s.%s" % (head, tail) return "%s.%s" % (main_table, key) def revision_get_from_list(self, revisions, cur=None): cur = self._cursor(cur) query_keys = ", ".join( self.mangle_query_key(k, "revision") for k in self.revision_get_cols ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(sortkey, id) LEFT JOIN revision ON t.id = revision.id LEFT JOIN person author ON revision.author = author.id LEFT JOIN person committer ON revision.committer = committer.id ORDER BY sortkey """ % query_keys, ((sortkey, id) for sortkey, id in enumerate(revisions)), ) def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_log(%%s, %%s) """ % ", ".join( self.revision_get_cols ) cur.execute(query, (root_revisions, limit)) yield from cur revision_shortlog_cols = ["id", "parents"] def revision_shortlog(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_list(%%s, %%s) """ % ", ".join( self.revision_shortlog_cols ) cur.execute(query, (root_revisions, limit)) yield from cur def revision_get_random(self, cur=None): return self._get_random_row_from_table("revision", ["id"], "id", cur) def release_missing_from_list(self, releases, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM release r WHERE r.id = t.id ) """, ((id,) for id in releases), ) object_find_by_sha1_git_cols = ["sha1_git", "type"] def object_find_by_sha1_git(self, ids, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ WITH t (sha1_git) AS (VALUES %s), known_objects as (( select id as sha1_git, 'release'::object_type as type, object_id from release r where exists (select 1 from t where t.sha1_git = r.id) ) union all ( select id as sha1_git, 'revision'::object_type as type, object_id from revision r where exists (select 1 from t where t.sha1_git = r.id) ) union all ( select id as sha1_git, 'directory'::object_type as type, object_id from directory d where exists (select 1 from t where t.sha1_git = d.id) ) union all ( select sha1_git as sha1_git, 'content'::object_type as type, object_id from content c where exists (select 1 from t where t.sha1_git = c.sha1_git) )) select t.sha1_git as sha1_git, k.type from t left join known_objects k on t.sha1_git = k.sha1_git """, ((id,) for id in ids), ) def stat_counters(self, cur=None): cur = self._cursor(cur) cur.execute("SELECT * FROM swh_stat_counters()") yield from cur def origin_add(self, url, cur=None): """Insert a new origin and return the new identifier.""" insert = """INSERT INTO origin (url) values (%s) RETURNING url""" cur.execute(insert, (url,)) return cur.fetchone()[0] origin_cols = ["url"] def origin_get_by_url(self, origins, cur=None): """Retrieve origin `(type, url)` from urls if found.""" cur = self._cursor(cur) query = """SELECT %s FROM (VALUES %%s) as t(url) LEFT JOIN origin ON t.url = origin.url """ % ",".join( "origin." + col for col in self.origin_cols ) yield from execute_values_generator(cur, query, ((url,) for url in origins)) def origin_get_by_sha1(self, sha1s, cur=None): """Retrieve origin urls from sha1s if found.""" cur = self._cursor(cur) query = """SELECT %s FROM (VALUES %%s) as t(sha1) LEFT JOIN origin ON t.sha1 = digest(origin.url, 'sha1') """ % ",".join( "origin." + col for col in self.origin_cols ) yield from execute_values_generator(cur, query, ((sha1,) for sha1 in sha1s)) def origin_id_get_by_url(self, origins, cur=None): """Retrieve origin `(type, url)` from urls if found.""" cur = self._cursor(cur) query = """SELECT id FROM (VALUES %s) as t(url) LEFT JOIN origin ON t.url = origin.url """ for row in execute_values_generator(cur, query, ((url,) for url in origins)): yield row[0] origin_get_range_cols = ["id", "url"] def origin_get_range(self, origin_from=1, origin_count=100, cur=None): """Retrieve ``origin_count`` origins whose ids are greater or equal than ``origin_from``. Origins are sorted by id before retrieving them. Args: origin_from (int): the minimum id of origins to retrieve origin_count (int): the maximum number of origins to retrieve """ cur = self._cursor(cur) query = """SELECT %s FROM origin WHERE id >= %%s ORDER BY id LIMIT %%s """ % ",".join( self.origin_get_range_cols ) cur.execute(query, (origin_from, origin_count)) yield from cur def _origin_query( self, url_pattern, count=False, offset=0, limit=50, regexp=False, with_visit=False, cur=None, ): """ Method factorizing query creation for searching and counting origins. """ cur = self._cursor(cur) if count: origin_cols = "COUNT(*)" else: origin_cols = ",".join(self.origin_cols) query = """SELECT %s FROM origin o WHERE """ if with_visit: query += """ EXISTS ( SELECT 1 FROM origin_visit ov INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit INNER JOIN snapshot ON ovs.snapshot=snapshot.id WHERE ov.origin=o.id ) AND """ query += "url %s %%s " if not count: query += "ORDER BY id OFFSET %%s LIMIT %%s" if not regexp: query = query % (origin_cols, "ILIKE") query_params = ("%" + url_pattern + "%", offset, limit) else: query = query % (origin_cols, "~*") query_params = (url_pattern, offset, limit) if count: query_params = (query_params[0],) cur.execute(query, query_params) def origin_search( self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, cur=None ): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. Args: url_pattern (str): the string pattern to search for in origin urls offset (int): number of found origins to skip before returning results limit (int): the maximum number of found origins to return regexp (bool): if True, consider the provided pattern as a regular expression and returns origins whose urls match it with_visit (bool): if True, filter out origins with no visit """ self._origin_query( url_pattern, offset=offset, limit=limit, regexp=regexp, with_visit=with_visit, cur=cur, ) yield from cur def origin_count(self, url_pattern, regexp=False, with_visit=False, cur=None): """Count origins whose urls contain a provided string pattern or match a provided regular expression. The pattern search in origin urls is performed in a case insensitive way. Args: url_pattern (str): the string pattern to search for in origin urls regexp (bool): if True, consider the provided pattern as a regular expression and returns origins whose urls match it with_visit (bool): if True, filter out origins with no visit """ self._origin_query( url_pattern, count=True, regexp=regexp, with_visit=with_visit, cur=cur ) return cur.fetchone()[0] release_add_cols = [ "id", "target", "target_type", "date", "date_offset", "date_neg_utc_offset", "name", "comment", "synthetic", "author_fullname", "author_name", "author_email", ] release_get_cols = release_add_cols def release_get_from_list(self, releases, cur=None): cur = self._cursor(cur) query_keys = ", ".join( self.mangle_query_key(k, "release") for k in self.release_get_cols ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(sortkey, id) LEFT JOIN release ON t.id = release.id LEFT JOIN person author ON release.author = author.id ORDER BY sortkey """ % query_keys, ((sortkey, id) for sortkey, id in enumerate(releases)), ) def release_get_random(self, cur=None): return self._get_random_row_from_table("release", ["id"], "id", cur) _object_metadata_context_cols = [ "origin", "visit", "snapshot", "release", "revision", "path", "directory", ] """The list of context columns for all artifact types.""" _object_metadata_insert_cols = [ "type", "id", "authority_id", "fetcher_id", "discovery_date", "format", "metadata", *_object_metadata_context_cols, ] """List of columns of the object_metadata table, used when writing metadata.""" _object_metadata_insert_query = f""" INSERT INTO object_metadata ({', '.join(_object_metadata_insert_cols)}) VALUES ({', '.join('%s' for _ in _object_metadata_insert_cols)}) ON CONFLICT (id, authority_id, discovery_date, fetcher_id) DO NOTHING """ object_metadata_get_cols = [ "id", "discovery_date", "metadata_authority.type", "metadata_authority.url", "metadata_fetcher.id", "metadata_fetcher.name", "metadata_fetcher.version", *_object_metadata_context_cols, "format", "metadata", ] """List of columns of the object_metadata, metadata_authority, and metadata_fetcher tables, used when reading object metadata.""" _object_metadata_select_query = f""" SELECT object_metadata.id AS id, {', '.join(object_metadata_get_cols[1:-1])}, object_metadata.metadata AS metadata FROM object_metadata INNER JOIN metadata_authority ON (metadata_authority.id=authority_id) INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) WHERE object_metadata.id=%s AND authority_id=%s """ def object_metadata_add( self, object_type: str, id: str, context: Dict[str, Union[str, bytes, int]], discovery_date: datetime.datetime, authority_id: int, fetcher_id: int, format: str, metadata: bytes, cur, ): query = self._object_metadata_insert_query args: Dict[str, Any] = dict( type=object_type, id=id, authority_id=authority_id, fetcher_id=fetcher_id, discovery_date=discovery_date, format=format, metadata=metadata, ) for col in self._object_metadata_context_cols: args[col] = context.get(col) params = [args[col] for col in self._object_metadata_insert_cols] cur.execute(query, params) def object_metadata_get( self, object_type: str, id: str, authority_id: int, after_time: Optional[datetime.datetime], after_fetcher: Optional[int], limit: int, cur, ): query_parts = [self._object_metadata_select_query] args = [id, authority_id] if after_fetcher is not None: assert after_time query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)") args.extend([after_time, after_fetcher]) elif after_time is not None: query_parts.append("AND discovery_date > %s") args.append(after_time) query_parts.append("ORDER BY discovery_date, fetcher_id") if limit: query_parts.append("LIMIT %s") args.append(limit) cur.execute(" ".join(query_parts), args) yield from cur metadata_fetcher_cols = ["name", "version", "metadata"] def metadata_fetcher_add( self, name: str, version: str, metadata: bytes, cur=None ) -> None: cur = self._cursor(cur) cur.execute( "INSERT INTO metadata_fetcher (name, version, metadata) " "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", (name, version, jsonize(metadata)), ) def metadata_fetcher_get(self, name: str, version: str, cur=None): cur = self._cursor(cur) cur.execute( f"SELECT {', '.join(self.metadata_fetcher_cols)} " f"FROM metadata_fetcher " f"WHERE name=%s AND version=%s", (name, version), ) return cur.fetchone() def metadata_fetcher_get_id( self, name: str, version: str, cur=None ) -> Optional[int]: cur = self._cursor(cur) cur.execute( "SELECT id FROM metadata_fetcher WHERE name=%s AND version=%s", (name, version), ) row = cur.fetchone() if row: return row[0] else: return None metadata_authority_cols = ["type", "url", "metadata"] def metadata_authority_add( self, type: str, url: str, metadata: bytes, cur=None ) -> None: cur = self._cursor(cur) cur.execute( "INSERT INTO metadata_authority (type, url, metadata) " "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", (type, url, jsonize(metadata)), ) def metadata_authority_get(self, type: str, url: str, cur=None): cur = self._cursor(cur) cur.execute( f"SELECT {', '.join(self.metadata_authority_cols)} " f"FROM metadata_authority " f"WHERE type=%s AND url=%s", (type, url), ) return cur.fetchone() def metadata_authority_get_id(self, type: str, url: str, cur=None) -> Optional[int]: cur = self._cursor(cur) cur.execute( "SELECT id FROM metadata_authority WHERE type=%s AND url=%s", (type, url) ) row = cur.fetchone() if row: return row[0] else: return None def _get_random_row_from_table(self, table_name, cols, id_col, cur=None): random_sha1 = bytes(random.randint(0, 255) for _ in range(SHA1_SIZE)) cur = self._cursor(cur) query = """ (SELECT {cols} FROM {table} WHERE {id_col} >= %s ORDER BY {id_col} LIMIT 1) UNION (SELECT {cols} FROM {table} WHERE {id_col} < %s ORDER BY {id_col} DESC LIMIT 1) LIMIT 1 """.format( cols=", ".join(cols), table=table_name, id_col=id_col ) cur.execute(query, (random_sha1, random_sha1)) row = cur.fetchone() if row: return row[0] diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql index 02b3b1ab..d267d380 100644 --- a/swh/storage/sql/30-swh-schema.sql +++ b/swh/storage/sql/30-swh-schema.sql @@ -1,497 +1,499 @@ --- --- SQL implementation of the Software Heritage data model --- -- schema versions create table dbversion ( version int primary key, release timestamptz, description text ); comment on table dbversion is 'Details of current db version'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Release description'; -- latest schema version insert into dbversion(version, release, description) - values(157, now(), 'Work In Progress'); + values(158, now(), 'Work In Progress'); -- a SHA1 checksum create domain sha1 as bytea check (length(value) = 20); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- a SHA256 checksum create domain sha256 as bytea check (length(value) = 32); -- a blake2 checksum create domain blake2s256 as bytea check (length(value) = 32); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; -- a set of UNIX-like access permissions, as manipulated by, e.g., chmod create domain file_perms as int; -- an SWHID create domain swhid as text check (value ~ '^swh:[0-9]+:.*'); -- Checksums about actual file content. Note that the content itself is not -- stored in the DB, but on external (key-value) storage. A single checksum is -- used as key there, but the other can be used to verify that we do not inject -- content collisions not knowingly. create table content ( sha1 sha1 not null, sha1_git sha1_git not null, sha256 sha256 not null, blake2s256 blake2s256 not null, length bigint not null, ctime timestamptz not null default now(), -- creation time, i.e. time of (first) injection into the storage status content_status not null default 'visible', object_id bigserial ); comment on table content is 'Checksums of file content which is actually stored externally'; comment on column content.sha1 is 'Content sha1 hash'; comment on column content.sha1_git is 'Git object sha1 hash'; comment on column content.sha256 is 'Content Sha256 hash'; comment on column content.blake2s256 is 'Content blake2s hash'; comment on column content.length is 'Content length'; comment on column content.ctime is 'First seen time'; comment on column content.status is 'Content status (absent, visible, hidden)'; comment on column content.object_id is 'Content identifier'; -- An origin is a place, identified by an URL, where software source code -- artifacts can be found. We support different kinds of origins, e.g., git and -- other VCS repositories, web pages that list tarballs URLs (e.g., -- http://www.kernel.org), indirect tarball URLs (e.g., -- http://www.example.org/latest.tar.gz), etc. The key feature of an origin is -- that it can be *fetched* from (wget, git clone, svn checkout, etc.) to -- retrieve all the contained software. create table origin ( id bigserial not null, url text not null ); comment on column origin.id is 'Artifact origin id'; comment on column origin.url is 'URL of origin'; -- Content blobs observed somewhere, but not ingested into the archive for -- whatever reason. This table is separate from the content table as we might -- not have the sha1 checksum of skipped contents (for instance when we inject -- git repositories, objects that are too big will be skipped here, and we will -- only know their sha1_git). 'reason' contains the reason the content was -- skipped. origin is a nullable column allowing to find out which origin -- contains that skipped content. create table skipped_content ( sha1 sha1, sha1_git sha1_git, sha256 sha256, blake2s256 blake2s256, length bigint not null, ctime timestamptz not null default now(), status content_status not null default 'absent', reason text not null, origin bigint, object_id bigserial ); comment on table skipped_content is 'Content blobs observed, but not ingested in the archive'; comment on column skipped_content.sha1 is 'Skipped content sha1 hash'; comment on column skipped_content.sha1_git is 'Git object sha1 hash'; comment on column skipped_content.sha256 is 'Skipped content sha256 hash'; comment on column skipped_content.blake2s256 is 'Skipped content blake2s hash'; comment on column skipped_content.length is 'Skipped content length'; comment on column skipped_content.ctime is 'First seen time'; comment on column skipped_content.status is 'Skipped content status (absent, visible, hidden)'; comment on column skipped_content.reason is 'Reason for skipping'; comment on column skipped_content.origin is 'Origin table identifier'; comment on column skipped_content.object_id is 'Skipped content identifier'; -- A file-system directory. A directory is a list of directory entries (see -- tables: directory_entry_{dir,file}). -- -- To list the contents of a directory: -- 1. list the contained directory_entry_dir using array dir_entries -- 2. list the contained directory_entry_file using array file_entries -- 3. list the contained directory_entry_rev using array rev_entries -- 4. UNION -- -- Synonyms/mappings: -- * git: tree create table directory ( id sha1_git not null, dir_entries bigint[], -- sub-directories, reference directory_entry_dir file_entries bigint[], -- contained files, reference directory_entry_file rev_entries bigint[], -- mounted revisions, reference directory_entry_rev object_id bigserial -- short object identifier ); comment on table directory is 'Contents of a directory, synonymous to tree (git)'; comment on column directory.id is 'Git object sha1 hash'; comment on column directory.dir_entries is 'Sub-directories, reference directory_entry_dir'; comment on column directory.file_entries is 'Contained files, reference directory_entry_file'; comment on column directory.rev_entries is 'Mounted revisions, reference directory_entry_rev'; comment on column directory.object_id is 'Short object identifier'; -- A directory entry pointing to a (sub-)directory. create table directory_entry_dir ( id bigserial, target sha1_git not null, -- id of target directory name unix_path not null, -- path name, relative to containing dir perms file_perms not null -- unix-like permissions ); comment on table directory_entry_dir is 'Directory entry for directory'; comment on column directory_entry_dir.id is 'Directory identifier'; comment on column directory_entry_dir.target is 'Target directory identifier'; comment on column directory_entry_dir.name is 'Path name, relative to containing directory'; comment on column directory_entry_dir.perms is 'Unix-like permissions'; -- A directory entry pointing to a file content. create table directory_entry_file ( id bigserial, target sha1_git not null, -- id of target file name unix_path not null, -- path name, relative to containing dir perms file_perms not null -- unix-like permissions ); comment on table directory_entry_file is 'Directory entry for file'; comment on column directory_entry_file.id is 'File identifier'; comment on column directory_entry_file.target is 'Target file identifier'; comment on column directory_entry_file.name is 'Path name, relative to containing directory'; comment on column directory_entry_file.perms is 'Unix-like permissions'; -- A directory entry pointing to a revision. create table directory_entry_rev ( id bigserial, target sha1_git not null, -- id of target revision name unix_path not null, -- path name, relative to containing dir perms file_perms not null -- unix-like permissions ); comment on table directory_entry_rev is 'Directory entry for revision'; comment on column directory_entry_dir.id is 'Revision identifier'; comment on column directory_entry_dir.target is 'Target revision in identifier'; comment on column directory_entry_dir.name is 'Path name, relative to containing directory'; comment on column directory_entry_dir.perms is 'Unix-like permissions'; -- A person referenced by some source code artifacts, e.g., a VCS revision or -- release metadata. create table person ( id bigserial, name bytea, -- advisory: not null if we managed to parse a name email bytea, -- advisory: not null if we managed to parse an email fullname bytea not null -- freeform specification; what is actually used in the checksums -- will usually be of the form 'name ' ); comment on table person is 'Person referenced in code artifact release metadata'; comment on column person.id is 'Person identifier'; comment on column person.name is 'Name'; comment on column person.email is 'Email'; comment on column person.fullname is 'Full name (raw name)'; -- The state of a source code tree at a specific point in time. -- -- Synonyms/mappings: -- * git / subversion / etc: commit -- * tarball: a specific tarball -- -- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in -- case of merges) parent revisions. Each revision points to a directory, i.e., -- a file-system tree containing files and directories. create table revision ( id sha1_git not null, date timestamptz, date_offset smallint, committer_date timestamptz, committer_date_offset smallint, type revision_type not null, directory sha1_git, -- source code 'root' directory message bytea, author bigint, committer bigint, synthetic boolean not null default false, -- true iff revision has been created by Software Heritage metadata jsonb, -- extra metadata (tarball checksums, extra commit information, etc...) object_id bigserial, date_neg_utc_offset boolean, - committer_date_neg_utc_offset boolean + committer_date_neg_utc_offset boolean, + extra_headers bytea[][] -- extra headers (used in hash computation) ); comment on table revision is 'A revision represents the state of a source code tree at a specific point in time'; comment on column revision.id is 'Git-style SHA1 commit identifier'; comment on column revision.date is 'Author timestamp as UNIX epoch'; comment on column revision.date_offset is 'Author timestamp timezone, as minute offsets from UTC'; comment on column revision.date_neg_utc_offset is 'True indicates a -0 UTC offset on author timestamp'; comment on column revision.committer_date is 'Committer timestamp as UNIX epoch'; comment on column revision.committer_date_offset is 'Committer timestamp timezone, as minute offsets from UTC'; comment on column revision.committer_date_neg_utc_offset is 'True indicates a -0 UTC offset on committer timestamp'; comment on column revision.type is 'Type of revision'; comment on column revision.directory is 'Directory identifier'; comment on column revision.message is 'Commit message'; comment on column revision.author is 'Author identity'; comment on column revision.committer is 'Committer identity'; comment on column revision.synthetic is 'True iff revision has been synthesized by Software Heritage'; comment on column revision.metadata is 'Extra revision metadata'; comment on column revision.object_id is 'Non-intrinsic, sequential object identifier'; +comment on column revision.extra_headers is 'Extra revision headers; used in revision hash computation'; -- either this table or the sha1_git[] column on the revision table create table revision_history ( id sha1_git not null, parent_id sha1_git not null, parent_rank int not null default 0 -- parent position in merge commits, 0-based ); comment on table revision_history is 'Sequence of revision history with parent and position in history'; comment on column revision_history.id is 'Revision history git object sha1 checksum'; comment on column revision_history.parent_id is 'Parent revision git object identifier'; comment on column revision_history.parent_rank is 'Parent position in merge commits, 0-based'; -- Crawling history of software origins visited by Software Heritage. Each -- visit is a 3-way mapping between a software origin, a timestamp, and a -- snapshot object capturing the full-state of the origin at visit time. create table origin_visit ( origin bigint not null, visit bigint not null, date timestamptz not null, type text not null ); comment on column origin_visit.origin is 'Visited origin'; comment on column origin_visit.visit is 'Sequential visit number for the origin'; comment on column origin_visit.date is 'Visit timestamp'; comment on column origin_visit.type is 'Type of loader that did the visit (hg, git, ...)'; -- Crawling history of software origin visits by Software Heritage. Each -- visit see its history change through new origin visit status updates create table origin_visit_status ( origin bigint not null, visit bigint not null, date timestamptz not null, status origin_visit_state not null, metadata jsonb, snapshot sha1_git ); comment on column origin_visit_status.origin is 'Origin concerned by the visit update'; comment on column origin_visit_status.visit is 'Visit concerned by the visit update'; comment on column origin_visit_status.date is 'Visit update timestamp'; comment on column origin_visit_status.status is 'Visit status (ongoing, failed, full)'; comment on column origin_visit_status.metadata is 'Optional origin visit metadata'; comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit. It can be partial.'; -- A snapshot represents the entire state of a software origin as crawled by -- Software Heritage. This table is a simple mapping between (public) intrinsic -- snapshot identifiers and (private) numeric sequential identifiers. create table snapshot ( object_id bigserial not null, -- PK internal object identifier id sha1_git not null -- snapshot intrinsic identifier ); comment on table snapshot is 'State of a software origin as crawled by Software Heritage'; comment on column snapshot.object_id is 'Internal object identifier'; comment on column snapshot.id is 'Intrinsic snapshot identifier'; -- Each snapshot associate "branch" names to other objects in the Software -- Heritage Merkle DAG. This table describes branches as mappings between names -- and target typed objects. create table snapshot_branch ( object_id bigserial not null, -- PK internal object identifier name bytea not null, -- branch name, e.g., "master" or "feature/drag-n-drop" target bytea, -- target object identifier, e.g., a revision identifier target_type snapshot_target -- target object type, e.g., "revision" ); comment on table snapshot_branch is 'Associates branches with objects in Heritage Merkle DAG'; comment on column snapshot_branch.object_id is 'Internal object identifier'; comment on column snapshot_branch.name is 'Branch name'; comment on column snapshot_branch.target is 'Target object identifier'; comment on column snapshot_branch.target_type is 'Target object type'; -- Mapping between snapshots and their branches. create table snapshot_branches ( snapshot_id bigint not null, -- snapshot identifier, ref. snapshot.object_id branch_id bigint not null -- branch identifier, ref. snapshot_branch.object_id ); comment on table snapshot_branches is 'Mapping between snapshot and their branches'; comment on column snapshot_branches.snapshot_id is 'Snapshot identifier'; comment on column snapshot_branches.branch_id is 'Branch identifier'; -- A "memorable" point in time in the development history of a software -- project. -- -- Synonyms/mappings: -- * git: tag (of the annotated kind, otherwise they are just references) -- * tarball: the release version number create table release ( id sha1_git not null, target sha1_git, date timestamptz, date_offset smallint, name bytea, comment bytea, author bigint, synthetic boolean not null default false, -- true iff release has been created by Software Heritage object_id bigserial, target_type object_type not null, date_neg_utc_offset boolean ); comment on table release is 'Details of a software release, synonymous with a tag (git) or version number (tarball)'; comment on column release.id is 'Release git identifier'; comment on column release.target is 'Target git identifier'; comment on column release.date is 'Release timestamp'; comment on column release.date_offset is 'Timestamp offset from UTC'; comment on column release.name is 'Name'; comment on column release.comment is 'Comment'; comment on column release.author is 'Author'; comment on column release.synthetic is 'Indicates if created by Software Heritage'; comment on column release.object_id is 'Object identifier'; comment on column release.target_type is 'Object type (''content'', ''directory'', ''revision'', ''release'', ''snapshot'')'; comment on column release.date_neg_utc_offset is 'True indicates -0 UTC offset for release timestamp'; -- Tools create table metadata_fetcher ( id serial not null, name text not null, version text not null, metadata jsonb not null ); comment on table metadata_fetcher is 'Tools used to retrieve metadata'; comment on column metadata_fetcher.id is 'Internal identifier of the fetcher'; comment on column metadata_fetcher.name is 'Fetcher name'; comment on column metadata_fetcher.version is 'Fetcher version'; comment on column metadata_fetcher.metadata is 'Extra information about the fetcher'; create table metadata_authority ( id serial not null, type text not null, url text not null, metadata jsonb not null ); comment on table metadata_authority is 'Metadata authority information'; comment on column metadata_authority.id is 'Internal identifier of the authority'; comment on column metadata_authority.type is 'Type of authority (deposit/forge/registry)'; comment on column metadata_authority.url is 'Authority''s uri'; comment on column metadata_authority.metadata is 'Other metadata about authority'; -- Extrinsic metadata on a DAG objects and origins. create table object_metadata ( type text not null, id text not null, -- metadata source authority_id bigint not null, fetcher_id bigint not null, discovery_date timestamptz not null, -- metadata itself format text not null, metadata bytea not null, -- context origin text, visit bigint, snapshot swhid, release swhid, revision swhid, path bytea, directory swhid ); comment on table object_metadata is 'keeps all metadata found concerning an object'; comment on column object_metadata.type is 'the type of object (content/directory/revision/release/snapshot/origin) the metadata is on'; comment on column object_metadata.id is 'the SWHID or origin URL for which the metadata was found'; comment on column object_metadata.discovery_date is 'the date of retrieval'; comment on column object_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.'; comment on column object_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.'; comment on column object_metadata.format is 'name of the format of metadata, used by readers to interpret it.'; comment on column object_metadata.metadata is 'original metadata in opaque format'; -- Keep a cache of object counts create table object_counts ( object_type text, -- table for which we're counting objects (PK) value bigint, -- count of objects in the table last_update timestamptz, -- last update for the object count in this table single_update boolean -- whether we update this table standalone (true) or through bucketed counts (false) ); comment on table object_counts is 'Cache of object counts'; comment on column object_counts.object_type is 'Object type (''content'', ''directory'', ''revision'', ''release'', ''snapshot'')'; comment on column object_counts.value is 'Count of objects in the table'; comment on column object_counts.last_update is 'Last update for object count'; comment on column object_counts.single_update is 'standalone (true) or bucketed counts (false)'; create table object_counts_bucketed ( line serial not null, -- PK object_type text not null, -- table for which we're counting objects identifier text not null, -- identifier across which we're bucketing objects bucket_start bytea, -- lower bound (inclusive) for the bucket bucket_end bytea, -- upper bound (exclusive) for the bucket value bigint, -- count of objects in the bucket last_update timestamptz -- last update for the object count in this bucket ); comment on table object_counts_bucketed is 'Bucketed count for objects ordered by type'; comment on column object_counts_bucketed.line is 'Auto incremented idenitfier value'; comment on column object_counts_bucketed.object_type is 'Object type (''content'', ''directory'', ''revision'', ''release'', ''snapshot'')'; comment on column object_counts_bucketed.identifier is 'Common identifier for bucketed objects'; comment on column object_counts_bucketed.bucket_start is 'Lower bound (inclusive) for the bucket'; comment on column object_counts_bucketed.bucket_end is 'Upper bound (exclusive) for the bucket'; comment on column object_counts_bucketed.value is 'Count of objects in the bucket'; comment on column object_counts_bucketed.last_update is 'Last update for the object count in this bucket'; diff --git a/swh/storage/sql/40-swh-func.sql b/swh/storage/sql/40-swh-func.sql index 0a8a0a7f..e244ebf1 100644 --- a/swh/storage/sql/40-swh-func.sql +++ b/swh/storage/sql/40-swh-func.sql @@ -1,949 +1,950 @@ create or replace function hash_sha1(text) returns text as $$ select encode(digest($1, 'sha1'), 'hex') $$ language sql strict immutable; comment on function hash_sha1(text) is 'Compute SHA1 hash as text'; -- create a temporary table called tmp_TBLNAME, mimicking existing table -- TBLNAME -- -- Args: -- tblname: name of the table to mimic create or replace function swh_mktemp(tblname regclass) returns void language plpgsql as $$ begin execute format(' create temporary table if not exists tmp_%1$I (like %1$I including defaults) on commit delete rows; alter table tmp_%1$I drop column if exists object_id; ', tblname); return; end $$; -- create a temporary table for directory entries called tmp_TBLNAME, -- mimicking existing table TBLNAME with an extra dir_id (sha1_git) -- column, and dropping the id column. -- -- This is used to create the tmp_directory_entry_ tables. -- -- Args: -- tblname: name of the table to mimic create or replace function swh_mktemp_dir_entry(tblname regclass) returns void language plpgsql as $$ begin execute format(' create temporary table if not exists tmp_%1$I (like %1$I including defaults, dir_id sha1_git) on commit delete rows; alter table tmp_%1$I drop column if exists id; ', tblname); return; end $$; -- create a temporary table for revisions called tmp_revisions, -- mimicking existing table revision, replacing the foreign keys to -- people with an email and name field -- create or replace function swh_mktemp_revision() returns void language sql as $$ create temporary table if not exists tmp_revision ( like revision including defaults, author_fullname bytea, author_name bytea, author_email bytea, committer_fullname bytea, committer_name bytea, committer_email bytea ) on commit delete rows; alter table tmp_revision drop column if exists author; alter table tmp_revision drop column if exists committer; alter table tmp_revision drop column if exists object_id; $$; -- create a temporary table for releases called tmp_release, -- mimicking existing table release, replacing the foreign keys to -- people with an email and name field -- create or replace function swh_mktemp_release() returns void language sql as $$ create temporary table if not exists tmp_release ( like release including defaults, author_fullname bytea, author_name bytea, author_email bytea ) on commit delete rows; alter table tmp_release drop column if exists author; alter table tmp_release drop column if exists object_id; $$; -- create a temporary table for the branches of a snapshot create or replace function swh_mktemp_snapshot_branch() returns void language sql as $$ create temporary table if not exists tmp_snapshot_branch ( name bytea not null, target bytea, target_type snapshot_target ) on commit delete rows; $$; -- a content signature is a set of cryptographic checksums that we use to -- uniquely identify content, for the purpose of verifying if we already have -- some content or not during content injection create type content_signature as ( sha1 sha1, sha1_git sha1_git, sha256 sha256, blake2s256 blake2s256 ); -- check which entries of tmp_skipped_content are missing from skipped_content -- -- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content, -- 2. call this function create or replace function swh_skipped_content_missing() returns setof content_signature language plpgsql as $$ begin return query select sha1, sha1_git, sha256, blake2s256 from tmp_skipped_content t where not exists (select 1 from skipped_content s where s.sha1 is not distinct from t.sha1 and s.sha1_git is not distinct from t.sha1_git and s.sha256 is not distinct from t.sha256); return; end $$; -- add tmp_content entries to content, skipping duplicates -- -- operates in bulk: 0. swh_mktemp(content), 1. COPY to tmp_content, -- 2. call this function create or replace function swh_content_add() returns void language plpgsql as $$ begin insert into content (sha1, sha1_git, sha256, blake2s256, length, status, ctime) select distinct sha1, sha1_git, sha256, blake2s256, length, status, ctime from tmp_content; return; end $$; -- add tmp_skipped_content entries to skipped_content, skipping duplicates -- -- operates in bulk: 0. swh_mktemp(skipped_content), 1. COPY to tmp_skipped_content, -- 2. call this function create or replace function swh_skipped_content_add() returns void language plpgsql as $$ begin insert into skipped_content (sha1, sha1_git, sha256, blake2s256, length, status, reason, origin) select distinct sha1, sha1_git, sha256, blake2s256, length, status, reason, origin from tmp_skipped_content where (coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '')) in ( select coalesce(sha1, ''), coalesce(sha1_git, ''), coalesce(sha256, '') from swh_skipped_content_missing() ); -- TODO XXX use postgres 9.5 "UPSERT" support here, when available. -- Specifically, using "INSERT .. ON CONFLICT IGNORE" we can avoid -- the extra swh_skipped_content_missing() query here. return; end $$; -- Update content entries from temporary table. -- (columns are potential new columns added to the schema, this cannot be empty) -- create or replace function swh_content_update(columns_update text[]) returns void language plpgsql as $$ declare query text; tmp_array text[]; begin if array_length(columns_update, 1) = 0 then raise exception 'Please, provide the list of column names to update.'; end if; tmp_array := array(select format('%1$s=t.%1$s', unnest) from unnest(columns_update)); query = format('update content set %s from tmp_content t where t.sha1 = content.sha1', array_to_string(tmp_array, ', ')); execute query; return; end $$; comment on function swh_content_update(text[]) IS 'Update existing content''s columns'; create type directory_entry_type as enum('file', 'dir', 'rev'); -- Add tmp_directory_entry_* entries to directory_entry_* and directory, -- skipping duplicates in directory_entry_*. This is a generic function that -- works on all kind of directory entries. -- -- operates in bulk: 0. swh_mktemp_dir_entry('directory_entry_*'), 1 COPY to -- tmp_directory_entry_*, 2. call this function -- -- Assumption: this function is used in the same transaction that inserts the -- context directory in table "directory". create or replace function swh_directory_entry_add(typ directory_entry_type) returns void language plpgsql as $$ begin execute format(' insert into directory_entry_%1$s (target, name, perms) select distinct t.target, t.name, t.perms from tmp_directory_entry_%1$s t where not exists ( select 1 from directory_entry_%1$s i where t.target = i.target and t.name = i.name and t.perms = i.perms) ', typ); execute format(' with new_entries as ( select t.dir_id, array_agg(i.id) as entries from tmp_directory_entry_%1$s t inner join directory_entry_%1$s i using (target, name, perms) group by t.dir_id ) update tmp_directory as d set %1$s_entries = new_entries.entries from new_entries where d.id = new_entries.dir_id ', typ); return; end $$; -- Insert the data from tmp_directory, tmp_directory_entry_file, -- tmp_directory_entry_dir, tmp_directory_entry_rev into their final -- tables. -- -- Prerequisites: -- directory ids in tmp_directory -- entries in tmp_directory_entry_{file,dir,rev} -- create or replace function swh_directory_add() returns void language plpgsql as $$ begin perform swh_directory_entry_add('file'); perform swh_directory_entry_add('dir'); perform swh_directory_entry_add('rev'); insert into directory select * from tmp_directory t where not exists ( select 1 from directory d where d.id = t.id); return; end $$; -- a directory listing entry with all the metadata -- -- can be used to list a directory, and retrieve all the data in one go. create type directory_entry as ( dir_id sha1_git, -- id of the parent directory type directory_entry_type, -- type of entry target sha1_git, -- id of target name unix_path, -- path name, relative to containing dir perms file_perms, -- unix-like permissions status content_status, -- visible or absent sha1 sha1, -- content if sha1 if type is not dir sha1_git sha1_git, -- content's sha1 git if type is not dir sha256 sha256, -- content's sha256 if type is not dir length bigint -- content length if type is not dir ); -- List a single level of directory walked_dir_id -- FIXME: order by name is not correct. For git, we need to order by -- lexicographic order but as if a trailing / is present in directory -- name create or replace function swh_directory_walk_one(walked_dir_id sha1_git) returns setof directory_entry language sql stable as $$ with dir as ( select id as dir_id, dir_entries, file_entries, rev_entries from directory where id = walked_dir_id), ls_d as (select dir_id, unnest(dir_entries) as entry_id from dir), ls_f as (select dir_id, unnest(file_entries) as entry_id from dir), ls_r as (select dir_id, unnest(rev_entries) as entry_id from dir) (select dir_id, 'dir'::directory_entry_type as type, e.target, e.name, e.perms, NULL::content_status, NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint from ls_d left join directory_entry_dir e on ls_d.entry_id = e.id) union (select dir_id, 'file'::directory_entry_type as type, e.target, e.name, e.perms, c.status, c.sha1, c.sha1_git, c.sha256, c.length from ls_f left join directory_entry_file e on ls_f.entry_id = e.id left join content c on e.target = c.sha1_git) union (select dir_id, 'rev'::directory_entry_type as type, e.target, e.name, e.perms, NULL::content_status, NULL::sha1, NULL::sha1_git, NULL::sha256, NULL::bigint from ls_r left join directory_entry_rev e on ls_r.entry_id = e.id) order by name; $$; -- List recursively the revision directory arborescence create or replace function swh_directory_walk(walked_dir_id sha1_git) returns setof directory_entry language sql stable as $$ with recursive entries as ( select dir_id, type, target, name, perms, status, sha1, sha1_git, sha256, length from swh_directory_walk_one(walked_dir_id) union all select dir_id, type, target, (dirname || '/' || name)::unix_path as name, perms, status, sha1, sha1_git, sha256, length from (select (swh_directory_walk_one(dirs.target)).*, dirs.name as dirname from (select target, name from entries where type = 'dir') as dirs) as with_parent ) select dir_id, type, target, name, perms, status, sha1, sha1_git, sha256, length from entries $$; -- Find a directory entry by its path create or replace function swh_find_directory_entry_by_path( walked_dir_id sha1_git, dir_or_content_path bytea[]) returns directory_entry language plpgsql as $$ declare end_index integer; paths bytea default ''; path bytea; res bytea[]; r record; begin end_index := array_upper(dir_or_content_path, 1); res[1] := walked_dir_id; for i in 1..end_index loop path := dir_or_content_path[i]; -- concatenate path for patching the name in the result record (if we found it) if i = 1 then paths = path; else paths := paths || '/' || path; -- concatenate paths end if; if i <> end_index then select * from swh_directory_walk_one(res[i] :: sha1_git) where name=path and type = 'dir' limit 1 into r; else select * from swh_directory_walk_one(res[i] :: sha1_git) where name=path limit 1 into r; end if; -- find the path if r is null then return null; else -- store the next dir to lookup the next local path from res[i+1] := r.target; end if; end loop; -- at this moment, r is the result. Patch its 'name' with the full path before returning it. r.name := paths; return r; end $$; -- List all revision IDs starting from a given revision, going back in time -- -- TODO ordering: should be breadth-first right now (what do we want?) -- TODO ordering: ORDER BY parent_rank somewhere? create or replace function swh_revision_list(root_revisions bytea[], num_revs bigint default NULL) returns table (id sha1_git, parents bytea[]) language sql stable as $$ with recursive full_rev_list(id) as ( (select id from revision where id = ANY(root_revisions)) union (select h.parent_id from revision_history as h join full_rev_list on h.id = full_rev_list.id) ), rev_list as (select id from full_rev_list limit num_revs) select rev_list.id as id, array(select rh.parent_id::bytea from revision_history rh where rh.id = rev_list.id order by rh.parent_rank ) as parent from rev_list; $$; -- Detailed entry for a revision create type revision_entry as ( id sha1_git, date timestamptz, date_offset smallint, date_neg_utc_offset boolean, committer_date timestamptz, committer_date_offset smallint, committer_date_neg_utc_offset boolean, type revision_type, directory sha1_git, message bytea, author_id bigint, author_fullname bytea, author_name bytea, author_email bytea, committer_id bigint, committer_fullname bytea, committer_name bytea, committer_email bytea, metadata jsonb, synthetic boolean, + extra_headers bytea[][], parents bytea[], object_id bigint ); -- "git style" revision log. Similar to swh_revision_list(), but returning all -- information associated to each revision, and expanding authors/committers create or replace function swh_revision_log(root_revisions bytea[], num_revs bigint default NULL) returns setof revision_entry language sql stable as $$ select t.id, r.date, r.date_offset, r.date_neg_utc_offset, r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset, r.type, r.directory, r.message, a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, - r.metadata, r.synthetic, t.parents, r.object_id + r.metadata, r.synthetic, r.extra_headers, t.parents, r.object_id from swh_revision_list(root_revisions, num_revs) as t left join revision r on t.id = r.id left join person a on a.id = r.author left join person c on c.id = r.committer; $$; -- Detailed entry for a release create type release_entry as ( id sha1_git, target sha1_git, target_type object_type, date timestamptz, date_offset smallint, date_neg_utc_offset boolean, name bytea, comment bytea, synthetic boolean, author_id bigint, author_fullname bytea, author_name bytea, author_email bytea, object_id bigint ); -- Create entries in person from tmp_revision create or replace function swh_person_add_from_revision() returns void language plpgsql as $$ begin with t as ( select author_fullname as fullname, author_name as name, author_email as email from tmp_revision union select committer_fullname as fullname, committer_name as name, committer_email as email from tmp_revision ) insert into person (fullname, name, email) select distinct on (fullname) fullname, name, email from t where not exists ( select 1 from person p where t.fullname = p.fullname ); return; end $$; -- Create entries in revision from tmp_revision create or replace function swh_revision_add() returns void language plpgsql as $$ begin perform swh_person_add_from_revision(); - insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic) - select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic + insert into revision (id, date, date_offset, date_neg_utc_offset, committer_date, committer_date_offset, committer_date_neg_utc_offset, type, directory, message, author, committer, metadata, synthetic, extra_headers) + select t.id, t.date, t.date_offset, t.date_neg_utc_offset, t.committer_date, t.committer_date_offset, t.committer_date_neg_utc_offset, t.type, t.directory, t.message, a.id, c.id, t.metadata, t.synthetic, t.extra_headers from tmp_revision t left join person a on a.fullname = t.author_fullname left join person c on c.fullname = t.committer_fullname; return; end $$; -- Create entries in person from tmp_release create or replace function swh_person_add_from_release() returns void language plpgsql as $$ begin with t as ( select distinct author_fullname as fullname, author_name as name, author_email as email from tmp_release where author_fullname is not null ) insert into person (fullname, name, email) select distinct on (fullname) fullname, name, email from t where not exists ( select 1 from person p where t.fullname = p.fullname ); return; end $$; -- Create entries in release from tmp_release create or replace function swh_release_add() returns void language plpgsql as $$ begin perform swh_person_add_from_release(); insert into release (id, target, target_type, date, date_offset, date_neg_utc_offset, name, comment, author, synthetic) select distinct t.id, t.target, t.target_type, t.date, t.date_offset, t.date_neg_utc_offset, t.name, t.comment, a.id, t.synthetic from tmp_release t left join person a on a.fullname = t.author_fullname where not exists (select 1 from release where t.id = release.id); return; end $$; -- add a new origin_visit for origin origin_id at date. -- -- Returns the new visit id. create or replace function swh_origin_visit_add(origin_url text, date timestamptz, type text) returns bigint language sql as $$ with origin_id as ( select id from origin where url = origin_url ), last_known_visit as ( select coalesce(max(visit), 0) as visit from origin_visit where origin = (select id from origin_id) ) insert into origin_visit (origin, date, type, visit) values ((select id from origin_id), date, type, (select visit from last_known_visit) + 1) returning visit; $$; create or replace function swh_snapshot_add(snapshot_id sha1_git) returns void language plpgsql as $$ declare snapshot_object_id snapshot.object_id%type; begin select object_id from snapshot where id = snapshot_id into snapshot_object_id; if snapshot_object_id is null then insert into snapshot (id) values (snapshot_id) returning object_id into snapshot_object_id; insert into snapshot_branch (name, target_type, target) select name, target_type, target from tmp_snapshot_branch tmp where not exists ( select 1 from snapshot_branch sb where sb.name = tmp.name and sb.target = tmp.target and sb.target_type = tmp.target_type ) on conflict do nothing; insert into snapshot_branches (snapshot_id, branch_id) select snapshot_object_id, sb.object_id as branch_id from tmp_snapshot_branch tmp join snapshot_branch sb using (name, target, target_type) where tmp.target is not null and tmp.target_type is not null union select snapshot_object_id, sb.object_id as branch_id from tmp_snapshot_branch tmp join snapshot_branch sb using (name) where tmp.target is null and tmp.target_type is null and sb.target is null and sb.target_type is null; end if; truncate table tmp_snapshot_branch; end; $$; create type snapshot_result as ( snapshot_id sha1_git, name bytea, target bytea, target_type snapshot_target ); create or replace function swh_snapshot_get_by_id(id sha1_git, branches_from bytea default '', branches_count bigint default null, target_types snapshot_target[] default NULL) returns setof snapshot_result language sql stable as $$ -- with small limits, the "naive" version of this query can degenerate into -- using the deduplication index on snapshot_branch (name, target, -- target_type); The planner happily scans several hundred million rows. -- Do the query in two steps: first pull the relevant branches for the given -- snapshot (filtering them by type), then do the limiting. This two-step -- process guides the planner into using the proper index. with filtered_snapshot_branches as ( select swh_snapshot_get_by_id.id as snapshot_id, name, target, target_type from snapshot_branches inner join snapshot_branch on snapshot_branches.branch_id = snapshot_branch.object_id where snapshot_id = (select object_id from snapshot where snapshot.id = swh_snapshot_get_by_id.id) and (target_types is null or target_type = any(target_types)) order by name ) select snapshot_id, name, target, target_type from filtered_snapshot_branches where name >= branches_from order by name limit branches_count; $$; create type snapshot_size as ( target_type snapshot_target, count bigint ); create or replace function swh_snapshot_count_branches(id sha1_git) returns setof snapshot_size language sql stable as $$ SELECT target_type, count(name) from swh_snapshot_get_by_id(swh_snapshot_count_branches.id) group by target_type; $$; -- Absolute path: directory reference + complete path relative to it create type content_dir as ( directory sha1_git, path unix_path ); -- Find the containing directory of a given content, specified by sha1 -- (note: *not* sha1_git). -- -- Return a pair (dir_it, path) where path is a UNIX path that, from the -- directory root, reach down to a file with the desired content. Return NULL -- if no match is found. -- -- In case of multiple paths (i.e., pretty much always), an arbitrary one is -- chosen. create or replace function swh_content_find_directory(content_id sha1) returns content_dir language sql stable as $$ with recursive path as ( -- Recursively build a path from the requested content to a root -- directory. Each iteration returns a pair (dir_id, filename) where -- filename is relative to dir_id. Stops when no parent directory can -- be found. (select dir.id as dir_id, dir_entry_f.name as name, 0 as depth from directory_entry_file as dir_entry_f join content on content.sha1_git = dir_entry_f.target join directory as dir on dir.file_entries @> array[dir_entry_f.id] where content.sha1 = content_id limit 1) union all (select dir.id as dir_id, (dir_entry_d.name || '/' || path.name)::unix_path as name, path.depth + 1 from path join directory_entry_dir as dir_entry_d on dir_entry_d.target = path.dir_id join directory as dir on dir.dir_entries @> array[dir_entry_d.id] limit 1) ) select dir_id, name from path order by depth desc limit 1; $$; -- Find the visit of origin closest to date visit_date -- Breaks ties by selecting the largest visit id create or replace function swh_visit_find_by_date(origin_url text, visit_date timestamptz default NOW()) returns setof origin_visit language plpgsql stable as $$ declare origin_id bigint; begin select id into origin_id from origin where url=origin_url; return query with closest_two_visits as (( select ov, (date - visit_date), visit as interval from origin_visit ov where ov.origin = origin_id and ov.date >= visit_date order by ov.date asc, ov.visit desc limit 1 ) union ( select ov, (visit_date - date), visit as interval from origin_visit ov where ov.origin = origin_id and ov.date < visit_date order by ov.date desc, ov.visit desc limit 1 )) select (ov).* from closest_two_visits order by interval, visit limit 1; end $$; -- Object listing by object_id create or replace function swh_content_list_by_object_id( min_excl bigint, max_incl bigint ) returns setof content language sql stable as $$ select * from content where object_id > min_excl and object_id <= max_incl order by object_id; $$; create or replace function swh_revision_list_by_object_id( min_excl bigint, max_incl bigint ) returns setof revision_entry language sql stable as $$ with revs as ( select * from revision where object_id > min_excl and object_id <= max_incl ) select r.id, r.date, r.date_offset, r.date_neg_utc_offset, r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset, r.type, r.directory, r.message, - a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic, + a.id, a.fullname, a.name, a.email, c.id, c.fullname, c.name, c.email, r.metadata, r.synthetic, r.extra_headers, array(select rh.parent_id::bytea from revision_history rh where rh.id = r.id order by rh.parent_rank) as parents, r.object_id from revs r left join person a on a.id = r.author left join person c on c.id = r.committer order by r.object_id; $$; create or replace function swh_release_list_by_object_id( min_excl bigint, max_incl bigint ) returns setof release_entry language sql stable as $$ with rels as ( select * from release where object_id > min_excl and object_id <= max_incl ) select r.id, r.target, r.target_type, r.date, r.date_offset, r.date_neg_utc_offset, r.name, r.comment, r.synthetic, p.id as author_id, p.fullname as author_fullname, p.name as author_name, p.email as author_email, r.object_id from rels r left join person p on p.id = r.author order by r.object_id; $$; -- simple counter mapping a textual label to an integer value create type counter as ( label text, value bigint ); -- return statistics about the number of tuples in various SWH tables -- -- Note: the returned values are based on postgres internal statistics -- (pg_class table), which are only updated daily (by autovacuum) or so create or replace function swh_stat_counters() returns setof counter language sql stable as $$ select object_type as label, value as value from object_counts where object_type in ( 'content', 'directory', 'directory_entry_dir', 'directory_entry_file', 'directory_entry_rev', 'origin', 'origin_visit', 'person', 'release', 'revision', 'revision_history', 'skipped_content', 'snapshot' ); $$; create or replace function swh_update_counter(object_type text) returns void language plpgsql as $$ begin execute format(' insert into object_counts (value, last_update, object_type) values ((select count(*) from %1$I), NOW(), %1$L) on conflict (object_type) do update set value = excluded.value, last_update = excluded.last_update', object_type); return; end; $$; create or replace function swh_update_counter_bucketed() returns void language plpgsql as $$ declare query text; line_to_update int; new_value bigint; begin select object_counts_bucketed.line, format( 'select count(%I) from %I where %s', coalesce(identifier, '*'), object_type, coalesce( concat_ws( ' and ', case when bucket_start is not null then format('%I >= %L', identifier, bucket_start) -- lower bound condition, inclusive end, case when bucket_end is not null then format('%I < %L', identifier, bucket_end) -- upper bound condition, exclusive end ), 'true' ) ) from object_counts_bucketed order by coalesce(last_update, now() - '1 month'::interval) asc limit 1 into line_to_update, query; execute query into new_value; update object_counts_bucketed set value = new_value, last_update = now() where object_counts_bucketed.line = line_to_update; END $$; create or replace function swh_update_counters_from_buckets() returns trigger language plpgsql as $$ begin with to_update as ( select object_type, sum(value) as value, max(last_update) as last_update from object_counts_bucketed ob1 where not exists ( select 1 from object_counts_bucketed ob2 where ob1.object_type = ob2.object_type and value is null ) group by object_type ) update object_counts set value = to_update.value, last_update = to_update.last_update from to_update where object_counts.object_type = to_update.object_type and object_counts.value != to_update.value; return null; end $$; create trigger update_counts_from_bucketed after insert or update on object_counts_bucketed for each row when (NEW.line % 256 = 0) execute procedure swh_update_counters_from_buckets(); diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py index 11e94c86..842e7af7 100644 --- a/swh/storage/tests/storage_data.py +++ b/swh/storage/tests/storage_data.py @@ -1,598 +1,601 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model import from_disk class StorageData: def __getattr__(self, key): try: v = globals()[key] except KeyError as e: raise AttributeError(e.args[0]) if hasattr(v, "copy"): return v.copy() return v data = StorageData() cont = { "data": b"42\n", "length": 3, "sha1": hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), "sha1_git": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), "sha256": hash_to_bytes( "673650f936cb3b0a2f93ce09d81be10748b1b203c19e8176b4eefc1964a0cf3a" ), "blake2s256": hash_to_bytes( "d5fe1939576527e42cfd76a9455a2432fe7f56669564577dd93c4280e76d661d" ), "status": "visible", } cont2 = { "data": b"4242\n", "length": 5, "sha1": hash_to_bytes("61c2b3a30496d329e21af70dd2d7e097046d07b7"), "sha1_git": hash_to_bytes("36fade77193cb6d2bd826161a0979d64c28ab4fa"), "sha256": hash_to_bytes( "859f0b154fdb2d630f45e1ecae4a862915435e663248bb8461d914696fc047cd" ), "blake2s256": hash_to_bytes( "849c20fad132b7c2d62c15de310adfe87be94a379941bed295e8141c6219810d" ), "status": "visible", } cont3 = { "data": b"424242\n", "length": 7, "sha1": hash_to_bytes("3e21cc4942a4234c9e5edd8a9cacd1670fe59f13"), "sha1_git": hash_to_bytes("c932c7649c6dfa4b82327d121215116909eb3bea"), "sha256": hash_to_bytes( "92fb72daf8c6818288a35137b72155f507e5de8d892712ab96277aaed8cf8a36" ), "blake2s256": hash_to_bytes( "76d0346f44e5a27f6bafdd9c2befd304aff83780f93121d801ab6a1d4769db11" ), "status": "visible", "ctime": "2019-12-01 00:00:00Z", } contents = (cont, cont2, cont3) missing_cont = { "data": b"missing\n", "length": 8, "sha1": hash_to_bytes("f9c24e2abb82063a3ba2c44efd2d3c797f28ac90"), "sha1_git": hash_to_bytes("33e45d56f88993aae6a0198013efa80716fd8919"), "sha256": hash_to_bytes( "6bbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a" ), "blake2s256": hash_to_bytes( "306856b8fd879edb7b6f1aeaaf8db9bbecc993cd7f776c333ac3a782fa5c6eba" ), "status": "absent", } skipped_cont = { "length": 1024 * 1024 * 200, "sha1_git": hash_to_bytes("33e45d56f88993aae6a0198013efa80716fd8920"), "sha1": hash_to_bytes("43e45d56f88993aae6a0198013efa80716fd8920"), "sha256": hash_to_bytes( "7bbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a" ), "blake2s256": hash_to_bytes( "ade18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b" ), "reason": "Content too long", "status": "absent", "origin": "file:///dev/zero", } skipped_cont2 = { "length": 1024 * 1024 * 300, "sha1_git": hash_to_bytes("44e45d56f88993aae6a0198013efa80716fd8921"), "sha1": hash_to_bytes("54e45d56f88993aae6a0198013efa80716fd8920"), "sha256": hash_to_bytes( "8cbd052ab054ef222c1c87be60cd191addedd24cc882d1f5f7f7be61dc61bb3a" ), "blake2s256": hash_to_bytes( "9ce18b1adecb33f891ca36664da676e12c772cc193778aac9a137b8dc5834b9b" ), "reason": "Content too long", "status": "absent", } skipped_contents = (skipped_cont, skipped_cont2) dir = { "id": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), "entries": ( { "name": b"foo", "type": "file", "target": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), # cont "perms": from_disk.DentryPerms.content, }, { "name": b"bar\xc3", "type": "dir", "target": b"12345678901234567890", "perms": from_disk.DentryPerms.directory, }, ), } dir2 = { "id": hash_to_bytes("8505808532953da7d2581741f01b29c04b1cb9ab"), "entries": ( { "name": b"oof", "type": "file", "target": hash_to_bytes( # cont2 "36fade77193cb6d2bd826161a0979d64c28ab4fa" ), "perms": from_disk.DentryPerms.content, }, ), } dir3 = { "id": hash_to_bytes("4ea8c6b2f54445e5dd1a9d5bb2afd875d66f3150"), "entries": ( { "name": b"foo", "type": "file", "target": hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), # cont "perms": from_disk.DentryPerms.content, }, { "name": b"subdir", "type": "dir", "target": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), # dir "perms": from_disk.DentryPerms.directory, }, { "name": b"hello", "type": "file", "target": b"12345678901234567890", "perms": from_disk.DentryPerms.content, }, ), } dir4 = { "id": hash_to_bytes("377aa5fcd944fbabf502dbfda55cd14d33c8c3c6"), "entries": ( { "name": b"subdir1", "type": "dir", "target": hash_to_bytes("4ea8c6b2f54445e5dd1a9d5bb2afd875d66f3150"), # dir3 "perms": from_disk.DentryPerms.directory, }, ), } directories = (dir, dir2, dir3, dir4) minus_offset = datetime.timezone(datetime.timedelta(minutes=-120)) plus_offset = datetime.timezone(datetime.timedelta(minutes=120)) revision = { "id": hash_to_bytes("066b1b62dbfa033362092af468bf6cfabec230e7"), "message": b"hello", "author": { "name": b"Nicolas Dandrimont", "email": b"nicolas@example.com", "fullname": b"Nicolas Dandrimont ", }, "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0}, "offset": 120, "negative_utc": False, }, "committer": { "name": b"St\xc3fano Zacchiroli", "email": b"stefano@example.com", "fullname": b"St\xc3fano Zacchiroli ", }, "committer_date": { "timestamp": {"seconds": 1123456789, "microseconds": 0}, "offset": 0, "negative_utc": True, }, "parents": (b"01234567890123456789", b"23434512345123456789"), "type": "git", "directory": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), # dir "metadata": { "checksums": {"sha1": "tarball-sha1", "sha256": "tarball-sha256",}, "signed-off-by": "some-dude", - "extra_headers": [ - ["gpgsig", b"test123"], - ["mergetag", b"foo\\bar"], - ["mergetag", b"\x22\xaf\x89\x80\x01\x00"], - ], }, + "extra_headers": ( + (b"gpgsig", b"test123"), + (b"mergetag", b"foo\\bar"), + (b"mergetag", b"\x22\xaf\x89\x80\x01\x00"), + ), "synthetic": True, } revision2 = { "id": hash_to_bytes("df7a6f6a99671fb7f7343641aff983a314ef6161"), "message": b"hello again", "author": { "name": b"Roberto Dicosmo", "email": b"roberto@example.com", "fullname": b"Roberto Dicosmo ", }, "date": { "timestamp": {"seconds": 1234567843, "microseconds": 220000,}, "offset": -720, "negative_utc": False, }, "committer": { "name": b"tony", "email": b"ar@dumont.fr", "fullname": b"tony ", }, "committer_date": { "timestamp": {"seconds": 1123456789, "microseconds": 0}, "offset": 0, "negative_utc": False, }, "parents": (b"01234567890123456789",), "type": "git", "directory": hash_to_bytes("8505808532953da7d2581741f01b29c04b1cb9ab"), # dir2 "metadata": None, + "extra_headers": (), "synthetic": False, } revision3 = { "id": hash_to_bytes("2cbd7bb22c653bbb23a29657852a50a01b591d46"), "message": b"a simple revision with no parents this time", "author": { "name": b"Roberto Dicosmo", "email": b"roberto@example.com", "fullname": b"Roberto Dicosmo ", }, "date": { "timestamp": {"seconds": 1234567843, "microseconds": 220000,}, "offset": -720, "negative_utc": False, }, "committer": { "name": b"tony", "email": b"ar@dumont.fr", "fullname": b"tony ", }, "committer_date": { "timestamp": {"seconds": 1127351742, "microseconds": 0}, "offset": 0, "negative_utc": False, }, "parents": (), "type": "git", "directory": hash_to_bytes("8505808532953da7d2581741f01b29c04b1cb9ab"), # dir2 "metadata": None, + "extra_headers": (), "synthetic": True, } revision4 = { "id": hash_to_bytes("88cd5126fc958ed70089d5340441a1c2477bcc20"), "message": b"parent of self.revision2", "author": { "name": b"me", "email": b"me@soft.heri", "fullname": b"me ", }, "date": { "timestamp": {"seconds": 1244567843, "microseconds": 220000,}, "offset": -720, "negative_utc": False, }, "committer": { "name": b"committer-dude", "email": b"committer@dude.com", "fullname": b"committer-dude ", }, "committer_date": { "timestamp": {"seconds": 1244567843, "microseconds": 220000,}, "offset": -720, "negative_utc": False, }, "parents": ( hash_to_bytes("2cbd7bb22c653bbb23a29657852a50a01b591d46"), ), # revision3 "type": "git", "directory": hash_to_bytes("34f335a750111ca0a8b64d8034faec9eedc396be"), # dir "metadata": None, + "extra_headers": (), "synthetic": False, } revisions = (revision, revision2, revision3, revision4) origin = { "url": "file:///dev/null", } origin2 = { "url": "file:///dev/zero", } origins = (origin, origin2) metadata_authority = { "type": "deposit", "url": "http://hal.inria.example.com/", "metadata": {"location": "France"}, } metadata_authority2 = { "type": "registry", "url": "http://wikidata.example.com/", "metadata": {}, } metadata_fetcher = { "name": "swh-deposit", "version": "0.0.1", "metadata": {"sword_version": "2"}, } metadata_fetcher2 = { "name": "swh-example", "version": "0.0.1", "metadata": {}, } date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) type_visit1 = "git" date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) type_visit2 = "hg" date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) type_visit3 = "deb" release = { "id": hash_to_bytes("a673e617fcc6234e29b2cad06b8245f96c415c61"), "name": b"v0.0.1", "author": { "name": b"olasd", "email": b"nic@olasd.fr", "fullname": b"olasd ", }, "date": { "timestamp": {"seconds": 1234567890, "microseconds": 0}, "offset": 42, "negative_utc": False, }, "target": b"43210987654321098765", "target_type": "revision", "message": b"synthetic release", "synthetic": True, } release2 = { "id": hash_to_bytes("6902bd4c82b7d19a421d224aedab2b74197e420d"), "name": b"v0.0.2", "author": { "name": b"tony", "email": b"ar@dumont.fr", "fullname": b"tony ", }, "date": { "timestamp": {"seconds": 1634366813, "microseconds": 0}, "offset": -120, "negative_utc": False, }, "target": b"432109\xa9765432\xc309\x00765", "target_type": "revision", "message": b"v0.0.2\nMisc performance improvements + bug fixes", "synthetic": False, } release3 = { "id": hash_to_bytes("3e9050196aa288264f2a9d279d6abab8b158448b"), "name": b"v0.0.2", "author": { "name": b"tony", "email": b"tony@ardumont.fr", "fullname": b"tony ", }, "date": { "timestamp": {"seconds": 1634336813, "microseconds": 0}, "offset": 0, "negative_utc": False, }, "target": hash_to_bytes("df7a6f6a99671fb7f7343641aff983a314ef6161"), "target_type": "revision", "message": b"yet another synthetic release", "synthetic": True, } releases = (release, release2, release3) snapshot = { "id": hash_to_bytes("409ee1ff3f10d166714bc90581debfd0446dda57"), "branches": { b"master": { "target": hash_to_bytes("066b1b62dbfa033362092af468bf6cfabec230e7"), "target_type": "revision", }, }, } empty_snapshot = { "id": hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"), "branches": {}, } complete_snapshot = { "id": hash_to_bytes("a56ce2d81c190023bb99a3a36279307522cb85f6"), "branches": { b"directory": { "target": hash_to_bytes("1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8"), "target_type": "directory", }, b"directory2": { "target": hash_to_bytes("1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8"), "target_type": "directory", }, b"content": { "target": hash_to_bytes("fe95a46679d128ff167b7c55df5d02356c5a1ae1"), "target_type": "content", }, b"alias": {"target": b"revision", "target_type": "alias",}, b"revision": { "target": hash_to_bytes("aafb16d69fd30ff58afdd69036a26047f3aebdc6"), "target_type": "revision", }, b"release": { "target": hash_to_bytes("7045404f3d1c54e6473c71bbb716529fbad4be24"), "target_type": "release", }, b"snapshot": { "target": hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"), "target_type": "snapshot", }, b"dangling": None, }, } snapshots = (snapshot, empty_snapshot, complete_snapshot) content_metadata = { "id": f"swh:1:cnt:{cont['sha1_git']}", "context": {"origin": origin["url"]}, "discovery_date": datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc ), "authority": { "type": metadata_authority["type"], "url": metadata_authority["url"], }, "fetcher": { "name": metadata_fetcher["name"], "version": metadata_fetcher["version"], }, "format": "json", "metadata": b'{"foo": "bar"}', } content_metadata2 = { "id": f"swh:1:cnt:{cont['sha1_git']}", "context": {"origin": origin2["url"]}, "discovery_date": datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), "authority": { "type": metadata_authority["type"], "url": metadata_authority["url"], }, "fetcher": { "name": metadata_fetcher["name"], "version": metadata_fetcher["version"], }, "format": "yaml", "metadata": b"foo: bar", } content_metadata3 = { "id": f"swh:1:cnt:{cont['sha1_git']}", "context": { "origin": origin["url"], "visit": 42, "snapshot": f"swh:1:snp:{hash_to_hex(snapshot['id'])}", "release": f"swh:1:rel:{hash_to_hex(release['id'])}", "revision": f"swh:1:rev:{hash_to_hex(revision['id'])}", "directory": f"swh:1:dir:{hash_to_hex(dir['id'])}", "path": b"/foo/bar", }, "discovery_date": datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), "authority": { "type": metadata_authority2["type"], "url": metadata_authority2["url"], }, "fetcher": { "name": metadata_fetcher2["name"], "version": metadata_fetcher2["version"], }, "format": "yaml", "metadata": b"foo: bar", } origin_metadata = { "origin_url": origin["url"], "discovery_date": datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc ), "authority": { "type": metadata_authority["type"], "url": metadata_authority["url"], }, "fetcher": { "name": metadata_fetcher["name"], "version": metadata_fetcher["version"], }, "format": "json", "metadata": b'{"foo": "bar"}', } origin_metadata2 = { "origin_url": origin["url"], "discovery_date": datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), "authority": { "type": metadata_authority["type"], "url": metadata_authority["url"], }, "fetcher": { "name": metadata_fetcher["name"], "version": metadata_fetcher["version"], }, "format": "yaml", "metadata": b"foo: bar", } origin_metadata3 = { "origin_url": origin["url"], "discovery_date": datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), "authority": { "type": metadata_authority2["type"], "url": metadata_authority2["url"], }, "fetcher": { "name": metadata_fetcher2["name"], "version": metadata_fetcher2["version"], }, "format": "yaml", "metadata": b"foo: bar", } person = { "name": b"John Doe", "email": b"john.doe@institute.org", "fullname": b"John Doe ", } objects = { "content": contents, "skipped_content": skipped_contents, "directory": directories, "revision": revisions, "origin": origins, "release": releases, "snapshot": snapshots, } diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py index 83f63e11..bd60590d 100644 --- a/swh/storage/tests/test_converters.py +++ b/swh/storage/tests/test_converters.py @@ -1,160 +1,151 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.storage import converters def test_date_to_db(): date_to_db = converters.date_to_db assert date_to_db(None) == {"timestamp": None, "offset": 0, "neg_utc_offset": None} assert date_to_db( {"timestamp": 1234567890, "offset": 120, "negative_utc": False,} ) == { "timestamp": "2009-02-13T23:31:30+00:00", "offset": 120, "neg_utc_offset": False, } assert date_to_db( {"timestamp": 1123456789, "offset": 0, "negative_utc": True,} ) == { "timestamp": "2005-08-07T23:19:49+00:00", "offset": 0, "neg_utc_offset": True, } assert date_to_db( {"timestamp": 1234567890, "offset": 42, "negative_utc": False,} ) == { "timestamp": "2009-02-13T23:31:30+00:00", "offset": 42, "neg_utc_offset": False, } assert date_to_db( {"timestamp": 1634366813, "offset": -120, "negative_utc": False,} ) == { "timestamp": "2021-10-16T06:46:53+00:00", "offset": -120, "neg_utc_offset": False, } def test_db_to_author(): # when actual_author = converters.db_to_author(b"fullname", b"name", b"email") # then assert actual_author == { "fullname": b"fullname", "name": b"name", "email": b"email", } def test_db_to_author_none(): # when actual_author = converters.db_to_author(None, None, None) # then assert actual_author is None def test_db_to_revision(): # when actual_revision = converters.db_to_revision( { "id": "revision-id", "date": None, "date_offset": None, "date_neg_utc_offset": None, "committer_date": None, "committer_date_offset": None, "committer_date_neg_utc_offset": None, "type": "rev", "directory": b"dir-sha1", "message": b"commit message", "author_fullname": b"auth-fullname", "author_name": b"auth-name", "author_email": b"auth-email", "committer_fullname": b"comm-fullname", "committer_name": b"comm-name", "committer_email": b"comm-email", "metadata": {}, "synthetic": False, + "extra_headers": (), "parents": [123, 456], } ) # then assert actual_revision == { "id": "revision-id", "author": { "fullname": b"auth-fullname", "name": b"auth-name", "email": b"auth-email", }, "date": None, "committer": { "fullname": b"comm-fullname", "name": b"comm-name", "email": b"comm-email", }, "committer_date": None, "type": "rev", "directory": b"dir-sha1", "message": b"commit message", "metadata": {}, "synthetic": False, + "extra_headers": (), "parents": [123, 456], } def test_db_to_release(): # when actual_release = converters.db_to_release( { "id": b"release-id", "target": b"revision-id", "target_type": "revision", "date": None, "date_offset": None, "date_neg_utc_offset": None, "name": b"release-name", "comment": b"release comment", "synthetic": True, "author_fullname": b"auth-fullname", "author_name": b"auth-name", "author_email": b"auth-email", } ) # then assert actual_release == { "author": { "fullname": b"auth-fullname", "name": b"auth-name", "email": b"auth-email", }, "date": None, "id": b"release-id", "name": b"release-name", "message": b"release comment", "synthetic": True, "target": b"revision-id", "target_type": "revision", } - - -def test_db_to_git_headers(): - raw_data = [ - ["gpgsig", b"garbage\x89a\x43b\x14"], - ["extra", [b"foo\\\\\\o", b"bar\\", b"inval\\\\\x99id"]], - ] - - db_data = converters.git_headers_to_db(raw_data) - loop = converters.db_to_git_headers(db_data) - assert raw_data == loop diff --git a/swh/storage/tests/test_revision_bw_compat.py b/swh/storage/tests/test_revision_bw_compat.py new file mode 100644 index 00000000..1acfe638 --- /dev/null +++ b/swh/storage/tests/test_revision_bw_compat.py @@ -0,0 +1,48 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import attr + +from swh.core.utils import decode_with_escape +from swh.model.model import Revision +from swh.storage import get_storage +from swh.storage.tests.storage_data import data +from swh.storage.tests.test_storage import db_transaction + + +def headers_to_db(git_headers): + return [[key, decode_with_escape(value)] for key, value in git_headers] + + +def test_revision_extra_header_in_metadata(swh_storage_backend_config): + storage = get_storage(**swh_storage_backend_config) + rev = Revision.from_dict(data.revision) + + md_w_extra = dict( + rev.metadata.items(), + extra_headers=headers_to_db( + [ + ["gpgsig", b"test123"], + ["mergetag", b"foo\\bar"], + ["mergetag", b"\x22\xaf\x89\x80\x01\x00"], + ] + ), + ) + + bw_rev = attr.evolve(rev, extra_headers=()) + object.__setattr__(bw_rev, "metadata", md_w_extra) + assert bw_rev.extra_headers == () + + assert storage.revision_add([bw_rev]) == {"revision:add": 1} + + # check data in the db are old format + with db_transaction(storage) as (_, cur): + cur.execute("SELECT metadata, extra_headers FROM revision") + metadata, extra_headers = cur.fetchone() + assert extra_headers == [] + assert metadata == bw_rev.metadata + + # check the Revision build from revision_get is the original, "new style", Revision + assert [Revision.from_dict(x) for x in storage.revision_get([rev.id])] == [rev] diff --git a/version.txt b/version.txt index f01b6532..2998efce 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.9.3-0-g8010848 \ No newline at end of file +v0.10.0-0-g5ab7023 \ No newline at end of file