diff --git a/.gitignore b/.gitignore index dda3e7a86..78bac0f03 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,13 @@ *.pyc *.sw? *~ .coverage .eggs/ __pycache__ build/ dist/ *.egg-info version.txt .vscode/ .hypothesis/ +/.tox/ diff --git a/PKG-INFO b/PKG-INFO index 1926a6943..270b87200 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,153 +1,153 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.0.108 +Version: 0.0.109 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. Tests ----- Python tests for this module include tests that cannot be run without a local Postgres database. You are not obliged to run those tests though: - `make test`: will run all tests - `make test-nodb`: will run only tests that do not need a local DB - `make test-db`: will run only tests that do need a local DB If you do want to run DB-related tests, you should ensure you have access zith sufficient privileges to a Postgresql database. ### Using your system database You need to ensure that your user is authorized to create and drop DBs, and in particular DBs named "softwareheritage-test" and "softwareheritage-dev" Note: the testdata repository (swh-storage-testdata) is not required any more. ### Using pifpaf [pifpaf](https://github.com/jd/pifpaf) is a suite of fixtures and a command-line tool that allows to start and stop daemons for a quick throw-away usage. It can be used to run tests that need a Postgres database without any other configuration reauired nor the need to have special access to a running database: ```bash $ pifpaf run postgresql make test-db [snip] ---------------------------------------------------------------------- Ran 124 tests in 56.203s OK ``` Note that pifpaf is not yet available as a Debian package, so you may have to install it in a venv. Development ----------- A test server could locally be running for tests. ### Sample configuration In either /etc/softwareheritage/storage/storage.yml, ~/.config/swh/storage.yml or ~/.swh/storage.yml: ``` storage: cls: local args: db: "dbname=softwareheritage-dev user=" objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to softwareheritage-dev local instance - the objstorage uses a local objstorage instance whose: - root path is /home/storage/swh-storage - slicing scheme is 0:2/2:4/4:6. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the 'root' path should exist on disk. 
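As a reading aid only, the sketch below shows the on-disk path implied by such a slicing spec; `sliced_path` is a made-up helper for illustration, not the actual pathslicing code from swh.objstorage:

```python
# Illustrative sketch of the 0:2/2:4/4:6 slicing described above; the real
# layout is implemented by swh.objstorage's pathslicing backend, not here.
import os

def sliced_path(root, hex_sha1, slicing="0:2/2:4/4:6"):
    parts = []
    for bounds in slicing.split("/"):
        start, end = (int(b) for b in bounds.split(":"))
        parts.append(hex_sha1[start:end])  # "00", "06", "2f" for the example sha1
    return os.path.join(os.path.join(root, *parts), hex_sha1)

print(sliced_path("/home/storage/swh-storage",
                  "00062f8bd330715c4f819373653d97b3cd34394c"))
# -> /home/storage/swh-storage/00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c
```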
### Run server Command: ``` python3 -m swh.storage.api.server ~/.config/swh/storage.yml ``` This runs a local swh-storage api at 5002 port. ### And then what? In your upper layer (loader-git, loader-svn, etc...), you can define a remote storage with this snippet of yaml configuration. ``` storage: cls: remote args: url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local args: db: service=swh-dev objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: listener Provides-Extra: schemata Provides-Extra: testing diff --git a/docs/Makefile.local b/docs/Makefile.local index 0c6746349..a07ab9132 100644 --- a/docs/Makefile.local +++ b/docs/Makefile.local @@ -1,21 +1,25 @@ sphinx/html: sql-autodoc images sphinx/clean: clean-sql-autodoc clean-images sql-autodoc: make -C ../sql/ doc images: make -C images/ clean-images: make -C images/ clean clean: clean-sql-autodoc clean-images clean-sql-autodoc: make -C ../sql/ clean +distclean: clean distclean-sql-autodoc +distclean-sql-autodoc: + make -C ../sql/ distclean + .PHONY: sql-autodoc clean-sql-autodoc images clean-images # Local Variables: # mode: makefile # End: diff --git a/docs/archive-copies.rst b/docs/archive-copies.rst index 04f7e5ec3..8359b58e0 100644 --- a/docs/archive-copies.rst +++ b/docs/archive-copies.rst @@ -1,45 +1,47 @@ +:orphan: + .. _archive-copies: Archive copies ============== .. _swh-storage-copies-layout: .. figure:: images/swh-archive-copies.svg :width: 1024px :align: center Layout of Software Heritage archive copies (click to zoom). The Software Heritage archive exists in several copies, to minimize the risk of losing archived source code artifacts. The layout of existing copies, their relationships, as well as their geographical and administrative domains are shown in the layout diagram above. We recall that the archive is conceptually organized as a graph, and specifically a Merkle DAG, see :ref:`data-model` for more information. Ingested source code artifacts land directly on the **primary copy**, which is updated live and also used as reference for deduplication purposes. There, different parts of the Merkle DAG as stored using different backend technologies. The leaves of the graph, i.e., *content objects* (or "blobs"), are stored in a key-value object storage, using their SHA1 identifiers as keys (see :ref:`persistent-identifiers`). SHA1 collision avoidance is enforced by the :mod:`swh.storage` module. The *rest of the graph* is stored in a Postgres database (see :ref:`sql-storage`). At the time of writing, the primary object storage contains about 5 billion blobs with a median size of 3 KB---yes, that is *a lot of very small files*---for a total compressed size of about 200 TB. The Postgres database takes about 8 TB, half of which required by indexes. In terms of graph metrics, the Merkle DAG has about 10 B nodes and 100 B edges. The **secondary copy** is hosted on Microsoft Azure cloud, using its native blob storage for the object storage and a large virtual machine to run a Postgres instance there. The database is kept up-to-date w.r.t. 
the primary copy using Postgres WAL replication. The object storage is kept up-to-date using :mod:`swh.archiver`. Archive copies (as opposed to archive mirrors) are operated by the Software Heritage Team at Inria. The primary archived copy is geographically located at Rocquencourt, France; the secondary copy hosted in the Europe West region of the Azure cloud. diff --git a/docs/images/.gitignore b/docs/images/.gitignore index bceb72061..542dcd328 100644 --- a/docs/images/.gitignore +++ b/docs/images/.gitignore @@ -1,2 +1,2 @@ -archive-copies.pdf -archive-copies.svg +swh-archive-copies.pdf +swh-archive-copies.svg diff --git a/docs/images/swh-archive-copies.pdf b/docs/images/swh-archive-copies.pdf deleted file mode 100644 index ecad80798..000000000 Binary files a/docs/images/swh-archive-copies.pdf and /dev/null differ diff --git a/docs/images/swh-archive-copies.svg b/docs/images/swh-archive-copies.svg deleted file mode 100644 index 57e583935..000000000 --- a/docs/images/swh-archive-copies.svg +++ /dev/null @@ -1,119 +0,0 @@ - - - - - - - - - - - - - - Postgres - - - - Primary copy - - - - - - Object - storage - - - - ~200 TB - ~5 B contents - (i.e., "blobs") - - - ~8 TB - Merkle DAG - ~10 B nodes - ~100 B edges - - - - - - - Postgres - - - - Secondary copy - - - - - - Object - storage - - - - - - - - replication - - - - - - - archiver - - - hosted on - Azure VM - - - hosted on - Azure blob storage - - - located at Rocquencourt, France - - - located in Azure Europe West region - - - managed by Software Heritage team at Inria - - - - - - Backup - - - - - - - - barman - - - - - software - origins - - - - - - - - ingestion - - - - diff --git a/docs/sql-storage.rst b/docs/sql-storage.rst index aa3d834a3..01cc2e610 100644 --- a/docs/sql-storage.rst +++ b/docs/sql-storage.rst @@ -1,14 +1,16 @@ +:orphan: + .. _sql-storage: SQL storage =========== Postgres DB schema ------------------ .. _swh-storage-db-schema: .. figure:: ../sql/doc/sql/db-schema.svg :width: 1024px :align: center Postgres DB schema of high-level Software Heritage storage (click to zoom). 
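Coming back to the object-storage layout described in the archive-copies section above: each content object ("blob") is addressed by its SHA1. Purely as an illustration, where hashlib stands in for the project's own helpers in swh.model.hashutil:

```python
# Illustration only: the object-storage key of a blob is its SHA1 checksum.
import hashlib

blob = b'print("hello world")\n'
print(hashlib.sha1(blob).hexdigest())  # key under which this blob would be stored
```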
diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 000000000..afa4cf37b --- /dev/null +++ b/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +norecursedirs = docs diff --git a/requirements-test.txt b/requirements-test.txt index 4af3b14a4..b5ca69c38 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,2 @@ hypothesis >= 3.11.0 -nose +pytest diff --git a/sql/Makefile b/sql/Makefile index c7caa91da..19cfb435c 100644 --- a/sql/Makefile +++ b/sql/Makefile @@ -1,78 +1,78 @@ # Depends: postgresql-client, postgresql-autodoc DBNAME = softwareheritage-dev DOCDIR = autodoc SQL_INIT = 10-swh-init.sql SQL_ENUMS = 20-swh-enums.sql SQL_SCHEMA = 30-swh-schema.sql SQL_FUNC = 40-swh-func.sql SQL_INDEX = 60-swh-indexes.sql SQL_TRIGGER = 70-swh-triggers.sql SQLS = $(SQL_INIT) $(SQL_ENUMS) $(SQL_SCHEMA) $(SQL_FUNC) $(SQL_INDEX) $(SQL_TRIGGER) SQL_FILES = $(abspath $(addprefix $(CURDIR)/../swh/storage/sql/,$(SQLS))) PSQL_BIN = psql -PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1 +PSQL_FLAGS = --echo-errors -X -v ON_ERROR_STOP=1 PSQL = $(PSQL_BIN) $(PSQL_FLAGS) PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS)) all: createdb: createdb-stamp createdb-stamp: $(SQL_FILES) -ifndef PIFPAF +ifeq ($(PIFPAF),) -dropdb $(DBNAME) endif createdb $(DBNAME) -ifndef PIFPAF +ifeq ($(PIFPAF),) touch $@ else rm -f $@ endif filldb: filldb-stamp filldb-stamp: createdb-stamp cat $(SQL_FILES) | $(PSQL) $(DBNAME) -ifndef PIFPAF +ifeq ($(PIFPAF),) touch $@ else rm -f $@ endif dropdb: -dropdb $(DBNAME) dumpdb: swh.dump swh.dump: filldb-stamp pg_dump -Fc $(DBNAME) > $@ $(DOCDIR): test -d $(DOCDIR)/ || mkdir $(DOCDIR) doc: autodoc-stamp $(DOCDIR)/db-schema.pdf $(DOCDIR)/db-schema.svg autodoc-stamp: filldb-stamp $(DOCDIR) postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/db-schema cp -a $(DOCDIR)/db-schema.dot $(DOCDIR)/db-schema.dot.orig -ifndef PIFPAF +ifeq ($(PIFPAF),) touch $@ else rm -f $@ endif $(DOCDIR)/db-schema.dot: clusters.dot autodoc-stamp $(DOCDIR) bin/dot_add_content $(DOCDIR)/db-schema.dot.orig clusters.dot > $(DOCDIR)/db-schema.dot $(DOCDIR)/db-schema.pdf: $(DOCDIR)/db-schema.dot autodoc-stamp dot -T pdf $< > $@ $(DOCDIR)/db-schema.svg: $(DOCDIR)/db-schema.dot autodoc-stamp dot -T svg $< > $@ clean: rm -rf *-stamp $(DOCDIR)/ distclean: clean dropdb rm -f swh.dump .PHONY: all initdb createdb dropdb doc clean diff --git a/sql/bin/dot_add_content b/sql/bin/dot_add_content index fc24e38e1..b462d8d97 100755 --- a/sql/bin/dot_add_content +++ b/sql/bin/dot_add_content @@ -1,15 +1,20 @@ #!/bin/bash DOT_FILE="$1" DOT_EXTRA="$2" +SQL_SCHEMA="../swh/storage/sql/30-swh-schema.sql" if [ -z "$DOT_FILE" -o -z "$DOT_EXTRA" ] ; then echo "Usage: $0 DOT_FILE DOT_EXTRA" exit 1 fi -schema_version=$(grep -i -A 1 '^insert into dbversion' swh-schema.sql | tail -n 1 \ +if ! [ -f "$SQL_SCHEMA" ] ; then + echo "Cannot find SQL schema ${SQL_SCHEMA}" 1>&2 + exit 2 +fi +schema_version=$(grep -i -A 1 '^insert into dbversion' "$SQL_SCHEMA" | tail -n 1 \ | sed -e 's/.*values(//i' -e 's/,.*//') head -n -1 "$DOT_FILE" # all of $DOT_FILE but last line sed "s/@@VERSION@@/$schema_version/" "$DOT_EXTRA" echo "}" diff --git a/sql/clusters.dot b/sql/clusters.dot index e8f75e48e..2fb7eb7c8 100644 --- a/sql/clusters.dot +++ b/sql/clusters.dot @@ -1,95 +1,85 @@ subgraph "logical_grouping" { style = rounded; bgcolor = gray95; color = gray; subgraph cluster_meta { label = <schema versioning
version: @@VERSION@@>; dbversion; } subgraph cluster_content { label = <content>; content; skipped_content; } subgraph cluster_directory { label = <directories>; directory; directory_entry_dir; directory_entry_file; directory_entry_rev; } subgraph cluster_revision { label = <revisions>; revision; revision_history; person; } subgraph cluster_release { label = <releases>; release; } subgraph cluster_snapshots { label = <snapshots>; - occurrence; - occurrence_history; snapshot; snapshot_branch; snapshot_branches; } subgraph cluster_origins { label = <origins>; origin; fetch_history; origin_visit; } - subgraph cluster_entity { - label = <entities>; - entity; - entity_history; - entity_equivalence; - listable_entity; - list_history; - } - subgraph cluster_metadata { label = <metadata>; metadata_provider; origin_metadata; tool; } subgraph cluster_statistics { label = <statistics>; object_counts; + object_counts_bucketed; } { edge [style = dashed]; - # "rtcolN" identifies the N-th row in a table, as a source - # "ltcolN" identifies the N-th row in a table, as a destination - "directory_entry_dir":rtcol2 -> "directory":ltcol1; + # "rtcolN" identifies the N-th row (1-based) in a table, as a source + # "ltcolN" identifies the N-th row (1-based) in a table, as a destination + "snapshot_branch":rtcol3 -> "release":ltcol1; + "snapshot_branch":rtcol3 -> "revision":ltcol1; + "snapshot_branch":rtcol3 -> "directory":ltcol1; + "snapshot_branch":rtcol3 -> "content":ltcol2; + "directory_entry_dir":ltcol2 -> "directory":rtcol1; "directory_entry_file":rtcol2 -> "content":ltcol2; "directory_entry_file":rtcol2 -> "skipped_content":ltcol2; "directory_entry_rev":rtcol2 -> "revision":ltcol1; "directory":rtcol2 -> "directory_entry_dir":ltcol1; "directory":rtcol3 -> "directory_entry_file":ltcol1; "directory":rtcol4 -> "directory_entry_rev":ltcol1; - "occurrence":rtcol3 -> "revision":ltcol1; - "occurrence_history":rtcol3 -> "revision":ltcol1; "release":rtcol2 -> "revision":ltcol1; - "revision":rtcol9 -> "directory":ltcol1; + "revision":ltcol7 -> "directory":rtcol1; "revision_history":rtcol2 -> "revision":ltcol1; - "entity_history":rtcol3 -> "entity_history":ltcol2; - "entity_history":rtcol10 -> "listable_entity":ltcol1; } } diff --git a/sql/upgrades/128.sql b/sql/upgrades/128.sql new file mode 100644 index 000000000..82c61f442 --- /dev/null +++ b/sql/upgrades/128.sql @@ -0,0 +1,23 @@ +-- SWH DB schema upgrade +-- from_version: 127 +-- to_version: 128 +-- description: Add snapshot trigger event on insertion + +insert into dbversion(version, release, description) + values(128, now(), 'Work In Progress'); + +-- Asynchronous notification of new snapshot insertions +create function notify_new_snapshot() + returns trigger + language plpgsql +as $$ + begin + perform pg_notify('new_snapshot', json_build_object('id', encode(new.id, 'hex'))::text); + return null; + end; +$$; + +create trigger notify_new_snapshot + after insert on snapshot + for each row + execute procedure notify_new_snapshot(); diff --git a/swh.storage.egg-info/PKG-INFO b/swh.storage.egg-info/PKG-INFO index 1926a6943..270b87200 100644 --- a/swh.storage.egg-info/PKG-INFO +++ b/swh.storage.egg-info/PKG-INFO @@ -1,153 +1,153 @@ Metadata-Version: 2.1 Name: swh.storage -Version: 0.0.108 +Version: 0.0.109 Summary: Software Heritage storage manager Home-page: https://forge.softwareheritage.org/diffusion/DSTO/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest 
Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-storage Description: swh-storage =========== Abstraction layer over the archive, allowing to access all stored source code artifacts as well as their metadata. See the [documentation](https://docs.softwareheritage.org/devel/swh-storage/index.html) for more details. Tests ----- Python tests for this module include tests that cannot be run without a local Postgres database. You are not obliged to run those tests though: - `make test`: will run all tests - `make test-nodb`: will run only tests that do not need a local DB - `make test-db`: will run only tests that do need a local DB If you do want to run DB-related tests, you should ensure you have access zith sufficient privileges to a Postgresql database. ### Using your system database You need to ensure that your user is authorized to create and drop DBs, and in particular DBs named "softwareheritage-test" and "softwareheritage-dev" Note: the testdata repository (swh-storage-testdata) is not required any more. ### Using pifpaf [pifpaf](https://github.com/jd/pifpaf) is a suite of fixtures and a command-line tool that allows to start and stop daemons for a quick throw-away usage. It can be used to run tests that need a Postgres database without any other configuration reauired nor the need to have special access to a running database: ```bash $ pifpaf run postgresql make test-db [snip] ---------------------------------------------------------------------- Ran 124 tests in 56.203s OK ``` Note that pifpaf is not yet available as a Debian package, so you may have to install it in a venv. Development ----------- A test server could locally be running for tests. ### Sample configuration In either /etc/softwareheritage/storage/storage.yml, ~/.config/swh/storage.yml or ~/.swh/storage.yml: ``` storage: cls: local args: db: "dbname=softwareheritage-dev user=" objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` which means, this uses: - a local storage instance whose db connection is to softwareheritage-dev local instance - the objstorage uses a local objstorage instance whose: - root path is /home/storage/swh-storage - slicing scheme is 0:2/2:4/4:6. This means that the identifier of the content (sha1) which will be stored on disk at first level with the first 2 hex characters, the second level with the next 2 hex characters and the third level with the next 2 hex characters. And finally the complete hash file holding the raw content. For example: 00062f8bd330715c4f819373653d97b3cd34394c will be stored at 00/06/2f/00062f8bd330715c4f819373653d97b3cd34394c Note that the 'root' path should exist on disk. ### Run server Command: ``` python3 -m swh.storage.api.server ~/.config/swh/storage.yml ``` This runs a local swh-storage api at 5002 port. ### And then what? In your upper layer (loader-git, loader-svn, etc...), you can define a remote storage with this snippet of yaml configuration. 
``` storage: cls: remote args: url: http://localhost:5002/ ``` You could directly define a local storage with the following snippet: ``` storage: cls: local args: db: service=swh-dev objstorage: cls: pathslicing args: root: /home/storage/swh-storage/ slicing: 0:2/2:4/4:6 ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: listener Provides-Extra: schemata Provides-Extra: testing diff --git a/swh.storage.egg-info/SOURCES.txt b/swh.storage.egg-info/SOURCES.txt index 0b23e3c35..ecb7aa53b 100644 --- a/swh.storage.egg-info/SOURCES.txt +++ b/swh.storage.egg-info/SOURCES.txt @@ -1,215 +1,218 @@ .gitignore AUTHORS LICENSE MANIFEST.in Makefile Makefile.local README.md +pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.py +tox.ini version.txt bin/swh-storage-add-dir debian/changelog debian/compat debian/control debian/copyright debian/rules debian/source/format docs/.gitignore docs/Makefile docs/Makefile.local docs/archive-copies.rst docs/conf.py docs/index.rst docs/sql-storage.rst docs/_static/.placeholder docs/_templates/.placeholder docs/images/.gitignore docs/images/Makefile docs/images/swh-archive-copies.dia -docs/images/swh-archive-copies.pdf -docs/images/swh-archive-copies.svg sql/.gitignore sql/Makefile sql/TODO sql/clusters.dot sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/entity.lister_metadata.schema.json sql/doc/json/entity.metadata.schema.json sql/doc/json/entity_history.lister_metadata.schema.json sql/doc/json/entity_history.metadata.schema.json sql/doc/json/fetch_history.result.schema.json sql/doc/json/list_history.result.schema.json sql/doc/json/listable_entity.list_params.schema.json sql/doc/json/origin_visit.metadata.json sql/doc/json/tool.tool_configuration.schema.json sql/json/.gitignore sql/json/Makefile sql/json/entity.lister_metadata.schema.json sql/json/entity.metadata.schema.json sql/json/entity_history.lister_metadata.schema.json sql/json/entity_history.metadata.schema.json sql/json/fetch_history.result.schema.json sql/json/list_history.result.schema.json sql/json/listable_entity.list_params.schema.json sql/json/origin_visit.metadata.json sql/json/tool.tool_configuration.schema.json sql/upgrades/015.sql sql/upgrades/016.sql sql/upgrades/017.sql sql/upgrades/018.sql sql/upgrades/019.sql sql/upgrades/020.sql sql/upgrades/021.sql sql/upgrades/022.sql sql/upgrades/023.sql sql/upgrades/024.sql sql/upgrades/025.sql sql/upgrades/026.sql sql/upgrades/027.sql sql/upgrades/028.sql sql/upgrades/029.sql sql/upgrades/030.sql sql/upgrades/032.sql sql/upgrades/033.sql sql/upgrades/034.sql sql/upgrades/035.sql sql/upgrades/036.sql sql/upgrades/037.sql sql/upgrades/038.sql sql/upgrades/039.sql sql/upgrades/040.sql sql/upgrades/041.sql sql/upgrades/042.sql sql/upgrades/043.sql sql/upgrades/044.sql sql/upgrades/045.sql sql/upgrades/046.sql sql/upgrades/047.sql sql/upgrades/048.sql sql/upgrades/049.sql sql/upgrades/050.sql sql/upgrades/051.sql sql/upgrades/052.sql sql/upgrades/053.sql sql/upgrades/054.sql sql/upgrades/055.sql sql/upgrades/056.sql sql/upgrades/057.sql sql/upgrades/058.sql sql/upgrades/059.sql sql/upgrades/060.sql sql/upgrades/061.sql sql/upgrades/062.sql sql/upgrades/063.sql 
sql/upgrades/064.sql sql/upgrades/065.sql sql/upgrades/066.sql sql/upgrades/067.sql sql/upgrades/068.sql sql/upgrades/069.sql sql/upgrades/070.sql sql/upgrades/071.sql sql/upgrades/072.sql sql/upgrades/073.sql sql/upgrades/074.sql sql/upgrades/075.sql sql/upgrades/076.sql sql/upgrades/077.sql sql/upgrades/078.sql sql/upgrades/079.sql sql/upgrades/080.sql sql/upgrades/081.sql sql/upgrades/082.sql sql/upgrades/083.sql sql/upgrades/084.sql sql/upgrades/085.sql sql/upgrades/086.sql sql/upgrades/087.sql sql/upgrades/088.sql sql/upgrades/089.sql sql/upgrades/090.sql sql/upgrades/091.sql sql/upgrades/092.sql sql/upgrades/093.sql sql/upgrades/094.sql sql/upgrades/095.sql sql/upgrades/096.sql sql/upgrades/097.sql sql/upgrades/098.sql sql/upgrades/099.sql sql/upgrades/100.sql sql/upgrades/101.sql sql/upgrades/102.sql sql/upgrades/103.sql sql/upgrades/104.sql sql/upgrades/105.sql sql/upgrades/106.sql sql/upgrades/107.sql sql/upgrades/108.sql sql/upgrades/109.sql sql/upgrades/110.sql sql/upgrades/111.sql sql/upgrades/112.sql sql/upgrades/113.sql sql/upgrades/114.sql sql/upgrades/115.sql sql/upgrades/116.sql sql/upgrades/117.sql sql/upgrades/118.sql sql/upgrades/119.sql sql/upgrades/120.sql sql/upgrades/121.sql sql/upgrades/122.sql sql/upgrades/123.sql sql/upgrades/124.sql sql/upgrades/125.sql sql/upgrades/126.sql sql/upgrades/127.sql +sql/upgrades/128.sql swh/__init__.py swh.storage.egg-info/PKG-INFO swh.storage.egg-info/SOURCES.txt swh.storage.egg-info/dependency_links.txt swh.storage.egg-info/requires.txt swh.storage.egg-info/top_level.txt swh/storage/__init__.py swh/storage/common.py swh/storage/converters.py swh/storage/db.py swh/storage/db_utils.py swh/storage/exc.py swh/storage/listener.py swh/storage/storage.py swh/storage/algos/__init__.py swh/storage/algos/diff.py swh/storage/algos/dir_iterators.py +swh/storage/algos/revisions_walker.py swh/storage/algos/snapshot.py swh/storage/api/__init__.py swh/storage/api/client.py swh/storage/api/server.py swh/storage/schemata/__init__.py swh/storage/schemata/distribution.py swh/storage/sql/10-swh-init.sql swh/storage/sql/20-swh-enums.sql swh/storage/sql/30-swh-schema.sql swh/storage/sql/40-swh-func.sql swh/storage/sql/60-swh-indexes.sql swh/storage/sql/70-swh-triggers.sql swh/storage/tests/__init__.py swh/storage/tests/storage_testing.py swh/storage/tests/test_api_client.py swh/storage/tests/test_converters.py swh/storage/tests/test_db.py swh/storage/tests/test_storage.py swh/storage/tests/algos/__init__.py swh/storage/tests/algos/test_diff.py swh/storage/tests/algos/test_dir_iterator.py +swh/storage/tests/algos/test_revisions_walker.py swh/storage/tests/algos/test_snapshot.py utils/dump_revisions.py utils/fix_revisions_from_dump.py \ No newline at end of file diff --git a/swh.storage.egg-info/requires.txt b/swh.storage.egg-info/requires.txt index 18155fc81..e94672ef8 100644 --- a/swh.storage.egg-info/requires.txt +++ b/swh.storage.egg-info/requires.txt @@ -1,20 +1,20 @@ aiohttp click flask psycopg2 python-dateutil swh.core>=0.0.44 swh.model>=0.0.27 swh.objstorage>=0.0.17 swh.scheduler>=0.0.14 vcversioner [listener] kafka_python [schemata] SQLAlchemy [testing] hypothesis>=3.11.0 -nose +pytest diff --git a/swh/storage/algos/dir_iterators.py b/swh/storage/algos/dir_iterators.py index e96ee68cf..9f997384c 100644 --- a/swh/storage/algos/dir_iterators.py +++ b/swh/storage/algos/dir_iterators.py @@ -1,374 +1,375 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU 
General Public License version 3, or any later version # See top-level LICENSE file for more information # Utility module to iterate on directory trees. # The implementation is inspired from the work of Alberto Cortés # for the go-git project. For more details, you can refer to: # - this blog post: https://blog.sourced.tech/post/difftree/ # - the reference implementation in go: # https://github.com/src-d/go-git/tree/master/utils/merkletrie from enum import Enum from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import directory_identifier # get the hash identifier for an empty directory _empty_dir_hash = hash_to_bytes(directory_identifier({'entries': []})) def _get_dir(storage, dir_id): """ Return directory data from swh storage. """ return storage.directory_ls(dir_id) if dir_id else [] class DirectoryIterator(object): """ Helper class used to iterate on a directory tree in a depth-first search way with some additional features: - - sibling nodes are iterated in lexicographic order by name - - it is possible to skip the visit of sub-directories nodes - for efficiency reasons when comparing two trees (no need to - go deeper if two directories have the same hash) + + - sibling nodes are iterated in lexicographic order by name + - it is possible to skip the visit of sub-directories nodes + for efficiency reasons when comparing two trees (no need to + go deeper if two directories have the same hash) """ def __init__(self, storage, dir_id, base_path=b''): """ Args: storage (swh.storage.storage.Storage): instance of swh storage (either local or remote) dir_id (bytes): identifier of a root directory base_path (bytes): optional base path used when traversing a sub-directory """ self.storage = storage self.root_dir_id = dir_id self.base_path = base_path self.restart() def restart(self): """ Restart the iteration at the beginning. """ # stack of frames representing currently visited directories: # the root directory is at the bottom while the current one # is at the top self.frames = [] self._push_dir_frame(self.root_dir_id) self.has_started = False def _push_dir_frame(self, dir_id): """ Visit a sub-directory by pushing a new frame to the stack. Each frame is itself a stack of directory entries. Args: dir_id (bytes): identifier of a root directory """ # get directory entries dir_data = _get_dir(self.storage, dir_id) # sort them in lexicographical order and reverse the ordering # in order to unstack the "smallest" entry each time the # iterator advances dir_data = sorted(dir_data, key=lambda e: e['name'], reverse=True) # push the directory frame to the main stack self.frames.append(dir_data) def top(self): """ Returns: list: The top frame of the main directories stack """ if not self.frames: return None return self.frames[-1] def current(self): """ Returns: dict: The current visited directory entry, i.e. 
the top element from the top frame """ top_frame = self.top() if not top_frame: return None return top_frame[-1] def current_hash(self): """ Returns: bytes: The hash value of the currently visited directory entry """ return self.current()['target'] def current_perms(self): """ Returns: int: The permissions value of the currently visited directory entry """ return self.current()['perms'] def current_path(self): """ Returns: str: The absolute path from the root directory of the currently visited directory entry """ top_frame = self.top() if not top_frame: return None path = [] for frame in self.frames: path.append(frame[-1]['name']) return self.base_path + b'/'.join(path) def current_is_dir(self): """ Returns: bool: If the currently visited directory entry is a directory """ return self.current()['type'] == 'dir' def _advance(self, descend): """ Advance in the tree iteration. Args: descend (bool): whether or not to push a new frame if the currently visited element is a sub-directory Returns: dict: The description of the newly visited directory entry """ current = self.current() if not self.has_started or not current: self.has_started = True return current if descend and self.current_is_dir() \ and current['target'] != _empty_dir_hash: self._push_dir_frame(current['target']) else: self.drop() return self.current() def next(self): """ Advance the tree iteration by dropping the current visited directory entry from the top frame. If the top frame ends up empty, the operation is recursively applied to remove all empty frames as the tree is climbed up towards its root. Returns: dict: The description of the newly visited directory entry """ return self._advance(False) def step(self): """ Advance the tree iteration like the next operation with the difference that if the current visited element is a sub-directory a new frame representing its content is pushed to the main stack. Returns: dict: The description of the newly visited directory entry """ return self._advance(True) def drop(self): """ Drop the current visited element from the top frame. If the frame ends up empty, the operation is recursively applied. """ frame = self.top() if not frame: return frame.pop() if not frame: self.frames.pop() self.drop() def __next__(self): entry = self.step() if not entry: raise StopIteration entry['path'] = self.current_path() return entry def __iter__(self): return DirectoryIterator(self.storage, self.root_dir_id, self.base_path) def dir_iterator(storage, dir_id): """ Return an iterator for recursively visiting a directory and its sub-directories. The associated paths are visited in lexicographic depth-first search order. Args: storage (swh.storage.Storage): an instance of a swh storage dir_id (bytes): a directory identifier Returns: swh.storage.algos.dir_iterators.DirectoryIterator: an iterator returning a dict at each iteration step describing a directory entry. A 'path' field is added in that dict to store the absolute path of the entry. """ return DirectoryIterator(storage, dir_id) class Remaining(Enum): """ Enum to represent the current state when iterating on both directory trees at the same time. """ NoMoreFiles = 0 OnlyToFilesRemain = 1 OnlyFromFilesRemain = 2 BothHaveFiles = 3 class DoubleDirectoryIterator(object): """ Helper class to traverse two directory trees at the same time and compare their contents to detect changes between them. 
""" def __init__(self, storage, dir_from, dir_to): """ Args: storage: instance of swh storage dir_from (bytes): hash identifier of the from directory dir_to (bytes): hash identifier of the to directory """ self.storage = storage self.dir_from = dir_from self.dir_to = dir_to self.restart() def restart(self): """ Restart the double iteration at the beginning. """ # initialize custom dfs iterators for the two directories self.it_from = DirectoryIterator(self.storage, self.dir_from) self.it_to = DirectoryIterator(self.storage, self.dir_to) # grab the first element of each iterator self.it_from.next() self.it_to.next() def next_from(self): """ Apply the next operation on the from iterator. """ self.it_from.next() def next_to(self): """ Apply the next operation on the to iterator. """ self.it_to.next() def next_both(self): """ Apply the next operation on both iterators. """ self.next_from() self.next_to() def step_from(self): """ Apply the step operation on the from iterator. """ self.it_from.step() def step_to(self): """ Apply the step operation on the from iterator. """ self.it_to.step() def step_both(self): """ Apply the step operation on the both iterators. """ self.step_from() self.step_to() def remaining(self): """ Returns: Remaining: the current state of the double iteration """ from_current = self.it_from.current() to_current = self.it_to.current() # no more files to iterate in both iterators if not from_current and not to_current: return Remaining.NoMoreFiles # still some files to iterate in the to iterator elif not from_current and to_current: return Remaining.OnlyToFilesRemain # still some files to iterate in the from iterator elif from_current and not to_current: return Remaining.OnlyFromFilesRemain # still files to iterate in the both iterators else: return Remaining.BothHaveFiles def compare(self): """ Compare the current iterated directory entries in both iterators and return the comparison status. 
Returns: dict: The status of the comparison with the following bool values: * *same_hash*: indicates if the two entries have the same hash * *same_perms*: indicates if the two entries have the same permissions * *both_are_dirs*: indicates if the two entries are directories * *both_are_files*: indicates if the two entries are regular files * *file_and_dir*: indicates if one of the entry is a directory and the other a regular file * *from_is_empty_dir*: indicates if the from entry is the empty directory * *from_is_empty_dir*: indicates if the to entry is the empty directory """ from_current_hash = self.it_from.current_hash() to_current_hash = self.it_to.current_hash() from_current_perms = self.it_from.current_perms() to_current_perms = self.it_to.current_perms() from_is_dir = self.it_from.current_is_dir() to_is_dir = self.it_to.current_is_dir() status = {} # compare hash status['same_hash'] = from_current_hash == to_current_hash # compare permissions status['same_perms'] = from_current_perms == to_current_perms # check if both elements are directories status['both_are_dirs'] = from_is_dir and to_is_dir # check if both elements are regular files status['both_are_files'] = not from_is_dir and not to_is_dir # check if one element is a directory, the other a regular file status['file_and_dir'] = (not status['both_are_dirs'] and not status['both_are_files']) # check if the from element is the empty directory status['from_is_empty_dir'] = (from_is_dir and from_current_hash == _empty_dir_hash) # check if the to element is the empty directory status['to_is_empty_dir'] = (to_is_dir and to_current_hash == _empty_dir_hash) return status diff --git a/swh/storage/algos/revisions_walker.py b/swh/storage/algos/revisions_walker.py new file mode 100644 index 000000000..9b683f83c --- /dev/null +++ b/swh/storage/algos/revisions_walker.py @@ -0,0 +1,513 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import heapq + +from abc import ABCMeta, abstractmethod +from collections import deque + +_revs_walker_classes = {} + + +class _RevisionsWalkerMetaClass(ABCMeta): + def __new__(cls, clsname, bases, attrs): + newclass = super().__new__(cls, clsname, bases, attrs) + if 'rw_type' in attrs: + _revs_walker_classes[attrs['rw_type']] = newclass + return newclass + + +class RevisionsWalker(metaclass=_RevisionsWalkerMetaClass): + """ + Abstract base class encapsulating the logic to walk across + a revisions history starting from a given one. + + It defines an iterator returning the revisions according + to a specific ordering implemented in derived classes. + + The iteration step performs the following operations: + + 1) Check if the iteration is finished by calling method + :meth:`is_finished` and raises :exc:`StopIteration` if it + it is the case + + 2) Get the next unseen revision by calling method + :meth:`get_next_rev_id` + + 3) Process parents of that revision by calling method + :meth:`process_parent_revs` for the next iteration + steps + + 4) Check if the revision should be returned by calling + method :meth:`should_return` and returns it if + it is the case + + In order to easily instantiate a specific type of revisions + walker, it is recommended to use the factory function + :func:`get_revisions_walker`. 
+ + Args: + storage (swh.storage.storage.Storage): instance of swh storage + (either local or remote) + rev_start (bytes): a revision identifier + max_revs (Optional[int]): maximum number of revisions to return + state (Optional[dict]): previous state of that revisions walker + """ + + def __init__(self, storage, rev_start, max_revs=None, state=None): + self._revs_to_visit = [] + self._done = set() + self._revs = {} + self._last_rev = None + self._num_revs = 0 + self._max_revs = max_revs + if state: + self._revs_to_visit = state['revs_to_visit'] + self._done = state['done'] + self._last_rev = state['last_rev'] + self._num_revs = state['num_revs'] + self.storage = storage + self.process_rev(rev_start) + + @abstractmethod + def process_rev(self, rev_id): + """ + Abstract method whose purpose is to process a newly visited + revision during the walk. + Derived classes must implement it according to the desired + method to walk across the revisions history (for instance + through a dfs on the revisions DAG). + + Args: + rev_id (bytes): the newly visited revision identifier + """ + pass + + @abstractmethod + def get_next_rev_id(self): + """ + Abstract method whose purpose is to return the next revision + during the iteration. + Derived classes must implement it according to the desired + method to walk across the revisions history. + + Returns: + dict: A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + """ + pass + + def process_parent_revs(self, rev): + """ + Process the parents of a revision when it is iterated. + The default implementation simply calls :meth:`process_rev` + for each parent revision in the order they are declared. + + Args: + rev (dict): A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + """ + for parent_id in rev['parents']: + self.process_rev(parent_id) + + def should_return(self, rev): + """ + Filter out a revision to return if needed. + Default implementation returns all iterated revisions. + + Args: + rev (dict): A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + + Returns: + bool: Whether to return the revision in the iteration + """ + return True + + def is_finished(self): + """ + Determine if the iteration is finished. + This method is called at the beginning of each iteration loop. + + Returns: + bool: Whether the iteration is finished + """ + if self._max_revs is not None and self._num_revs >= self._max_revs: + return True + if not self._revs_to_visit: + return True + return False + + def _get_rev(self, rev_id): + rev = self._revs.get(rev_id, None) + if not rev: + # cache some revisions in advance to avoid sending too much + # requests to storage and thus speedup the revisions walk + for rev in self.storage.revision_log([rev_id], limit=100): + self._revs[rev['id']] = rev + return self._revs[rev_id] + + def export_state(self): + """ + Export the internal state of that revision walker to a dict. + Its purpose is to continue the iteration in a pagination context. 
+ + Returns: + dict: A dict containing the internal state of that revisions walker + """ + return { + 'revs_to_visit': self._revs_to_visit, + 'done': self._done, + 'last_rev': self._last_rev, + 'num_revs': self._num_revs + } + + def __next__(self): + if self.is_finished(): + raise StopIteration + while self._revs_to_visit: + rev_id = self.get_next_rev_id() + if rev_id in self._done: + continue + self._done.add(rev_id) + rev = self._get_rev(rev_id) + self.process_parent_revs(rev) + if self.should_return(rev): + self._num_revs += 1 + self._last_rev = rev + return rev + raise StopIteration + + def __iter__(self): + return self + + +class CommitterDateRevisionsWalker(RevisionsWalker): + """ + Revisions walker that returns revisions in reverse chronological + order according to committer date (same behaviour as ``git log``) + """ + + rw_type = 'committer_date' + + def process_rev(self, rev_id): + """ + Add the revision to a priority queue according to the committer date. + + Args: + rev_id (bytes): the newly visited revision identifier + """ + if rev_id not in self._done: + rev = self._get_rev(rev_id) + commit_time = rev['committer_date']['timestamp']['seconds'] + heapq.heappush(self._revs_to_visit, (-commit_time, rev_id)) + + def get_next_rev_id(self): + """ + Return the smallest revision from the priority queue, i.e. + the one with highest committer date. + + Returns: + dict: A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + """ + _, rev_id = heapq.heappop(self._revs_to_visit) + return rev_id + + +class BFSRevisionsWalker(RevisionsWalker): + """ + Revisions walker that returns revisions in the same order + as when performing a breadth-first search on the revisions + DAG. + """ + + rw_type = 'bfs' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._revs_to_visit = deque(self._revs_to_visit) + + def process_rev(self, rev_id): + """ + Append the revision to a queue. + + Args: + rev_id (bytes): the newly visited revision identifier + """ + if rev_id not in self._done: + self._revs_to_visit.append(rev_id) + + def get_next_rev_id(self): + """ + Return the next revision from the queue. + + Returns: + dict: A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + """ + return self._revs_to_visit.popleft() + + +class DFSPostRevisionsWalker(RevisionsWalker): + """ + Revisions walker that returns revisions in the same order + as when performing a depth-first search in post-order on the + revisions DAG (i.e. after visiting a merge commit, + the merged commit will be visited before the base it was + merged on). + """ + + rw_type = 'dfs_post' + + def process_rev(self, rev_id): + """ + Append the revision to a stack. + + Args: + rev_id (bytes): the newly visited revision identifier + """ + if rev_id not in self._done: + self._revs_to_visit.append(rev_id) + + def get_next_rev_id(self): + """ + Return the next revision from the stack. + + Returns: + dict: A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + """ + return self._revs_to_visit.pop() + + +class DFSRevisionsWalker(DFSPostRevisionsWalker): + """ + Revisions walker that returns revisions in the same order + as when performing a depth-first search in pre-order on the + revisions DAG (i.e. after visiting a merge commit, + the base commit it was merged on will be visited before + the merged commit). 
+ """ + + rw_type = 'dfs' + + def process_parent_revs(self, rev): + """ + Process the parents of a revision when it is iterated in + the reversed order they are declared. + + Args: + rev (dict): A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + """ + for parent_id in reversed(rev['parents']): + self.process_rev(parent_id) + + +class PathRevisionsWalker(CommitterDateRevisionsWalker): + """ + Revisions walker that returns revisions where a specific + path in the source tree has been modified, in other terms + it allows to get the history for a specific file or directory. + + It has a behaviour similar to what ``git log`` offers by default, + meaning the returned history is simplified in order to only + show relevant revisions (see the `History Simplification + `_ + section of the associated manual for more details). + + Please note that to avoid walking the entire history, the iteration + will stop once a revision where the path has been added is found. + + .. warning:: Due to client-side implementation, performances + are not optimal when the total numbers of revisions to walk + is large. This should only be used when the total number of + revisions does not exceed a couple of thousands. + + Args: + storage (swh.storage.storage.Storage): instance of swh storage + (either local or remote) + rev_start (bytes): a revision identifier + path (str): the path in the source tree to retrieve the history + max_revs (Optional[int]): maximum number of revisions to return + state (Optional[dict]): previous state of that revisions walker + """ + + rw_type = 'path' + + def __init__(self, storage, rev_start, path, **kwargs): + super().__init__(storage, rev_start, **kwargs) + paths = path.strip('/').split('/') + self._path = list(map(lambda p: p.encode('utf-8'), paths)) + self._rev_dir_path = {} + + def _get_path_id(self, rev_id): + """ + Return the path checksum identifier in the source tree of the + provided revision. If the path corresponds to a directory, the + value computed by :func:`swh.model.identifiers.directory_identifier` + will be returned. If the path corresponds to a file, its sha1 + checksum will be returned. + + Args: + rev_id (bytes): a revision identifier + + Returns: + bytes: the path identifier + """ + + rev = self._get_rev(rev_id) + + rev_dir_id = rev['directory'] + + if rev_dir_id not in self._rev_dir_path: + try: + dir_info = \ + self.storage.directory_entry_get_by_path(rev_dir_id, + self._path) + self._rev_dir_path[rev_dir_id] = dir_info['target'] + except Exception: + self._rev_dir_path[rev_dir_id] = None + + return self._rev_dir_path[rev_dir_id] + + def is_finished(self): + """ + Check if the revisions iteration is finished. + This checks for the specified path's existence in the last + returned revision's parents' source trees. + If not, the iteration is considered finished. + + Returns: + bool: Whether to return the revision in the iteration + """ + if self._path and self._last_rev: + last_rev_parents = self._last_rev['parents'] + last_rev_parents_path_ids = [self._get_path_id(p_rev) + for p_rev in last_rev_parents] + no_path = all([path_id is None + for path_id in last_rev_parents_path_ids]) + if no_path: + return True + return super().is_finished() + + def process_parent_revs(self, rev): + """ + Process parents when a new revision is iterated. + It enables to get a simplified revisions history in the same + manner as ``git log``. When a revision has multiple parents, + the following process is applied. 
If the revision was a merge, + and has the same path identifier to one parent, follow only that + parent (even if there are several parents with the same path + identifier, follow only one of them.) Otherwise, follow all parents. + + Args: + rev (dict): A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + """ + rev_path_id = self._get_path_id(rev['id']) + + if rev_path_id: + if len(rev['parents']) == 1: + self.process_rev(rev['parents'][0]) + else: + parent_rev_path_ids = [self._get_path_id(p_rev) + for p_rev in rev['parents']] + different_trees = all([path_id != rev_path_id + for path_id in parent_rev_path_ids]) + for i, p_rev in enumerate(rev['parents']): + if different_trees or \ + parent_rev_path_ids[i] == rev_path_id: + self.process_rev(p_rev) + if not different_trees: + break + else: + super().process_parent_revs(rev) + + def should_return(self, rev): + """ + Check if a revision should be returned when iterating. + It verifies that the specified path has been modified + by the revision but also that all parents have a path + identifier different from the revision one in order + to get a simplified history. + + Args: + rev (dict): A dict describing a revision as returned by + :meth:`swh.storage.storage.Storage.revision_get` + + Returns: + bool: Whether to return the revision in the iteration + """ + rev_path_id = self._get_path_id(rev['id']) + + if not rev['parents']: + return rev_path_id is not None + + parent_rev_path_ids = [self._get_path_id(p_rev) + for p_rev in rev['parents']] + different_trees = all([path_id != rev_path_id + for path_id in parent_rev_path_ids]) + + if rev_path_id != parent_rev_path_ids[0] and different_trees: + return True + + return False + + +def get_revisions_walker(rev_walker_type, *args, **kwargs): + """ + Instantiate a revisions walker of a given type. + + The following code snippet demonstrates how to use a revisions + walker for processing a whole revisions history:: + + from swh.storage import get_storage + + storage = get_storage(...) + + revs_walker = get_revisions_walker('committer_date', storage, rev_id) + for rev in revs_walker: + # process revision rev + + It is also possible to walk a revisions history in a paginated + way as illustrated below:: + + def get_revs_history_page(rw_type, storage, rev_id, page_num, + page_size, rw_state): + max_revs = (page_num + 1) * page_size + revs_walker = get_revisions_walker(rw_type, storage, rev_id, + max_revs=max_revs, + state=rw_state) + revs = list(revs_walker) + rw_state = revs_walker.export_state() + return revs + + rev_start = ... 
+ per_page = 50 + rw_state = {} + + for page in range(0, 10): + revs_page = get_revs_history_page('dfs', storage, rev_start, page, + per_page, rw_state) + # process revisions page + + + Args: + rev_walker_type (str): the type of revisions walker to return, + possible values are: *committer_date*, *dfs*, *dfs_post*, + *bfs* and *path* + args (list): position arguments to pass to the revisions walker + constructor + kwargs (dict): keyword arguments to pass to the revisions walker + constructor + + """ + if rev_walker_type not in _revs_walker_classes: + raise Exception('No revisions walker found for type "%s"' + % rev_walker_type) + revs_walker_class = _revs_walker_classes[rev_walker_type] + return revs_walker_class(*args, **kwargs) diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py index 038a3a66f..027f3ba31 100644 --- a/swh/storage/api/client.py +++ b/swh/storage/api/client.py @@ -1,233 +1,243 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import warnings from swh.core.api import SWHRemoteAPI from ..exc import StorageAPIError class RemoteStorage(SWHRemoteAPI): """Proxy to a remote storage API""" def __init__(self, url, timeout=None): super().__init__( api_exception=StorageAPIError, url=url, timeout=timeout) def check_config(self, *, check_write): return self.post('check_config', {'check_write': check_write}) def content_add(self, content): return self.post('content/add', {'content': content}) def content_update(self, content, keys=[]): return self.post('content/update', {'content': content, 'keys': keys}) def content_missing(self, content, key_hash='sha1'): return self.post('content/missing', {'content': content, 'key_hash': key_hash}) def content_missing_per_sha1(self, contents): return self.post('content/missing/sha1', {'contents': contents}) def content_get(self, content): return self.post('content/data', {'content': content}) def content_get_metadata(self, content): return self.post('content/metadata', {'content': content}) def content_find(self, content): return self.post('content/present', {'content': content}) def directory_add(self, directories): return self.post('directory/add', {'directories': directories}) def directory_missing(self, directories): return self.post('directory/missing', {'directories': directories}) def directory_ls(self, directory, recursive=False): return self.get('directory/ls', {'directory': directory, 'recursive': recursive}) def revision_get(self, revisions): return self.post('revision', {'revisions': revisions}) def revision_log(self, revisions, limit=None): return self.post('revision/log', {'revisions': revisions, 'limit': limit}) def revision_shortlog(self, revisions, limit=None): return self.post('revision/shortlog', {'revisions': revisions, 'limit': limit}) def revision_add(self, revisions): return self.post('revision/add', {'revisions': revisions}) def revision_missing(self, revisions): return self.post('revision/missing', {'revisions': revisions}) def release_add(self, releases): return self.post('release/add', {'releases': releases}) def release_get(self, releases): return self.post('release', {'releases': releases}) def release_missing(self, releases): return self.post('release/missing', {'releases': releases}) def object_find_by_sha1_git(self, ids): return self.post('object/find_by_sha1_git', {'ids': ids}) def snapshot_add(self, 
origin, visit, snapshot): return self.post('snapshot/add', { 'origin': origin, 'visit': visit, 'snapshot': snapshot, }) def snapshot_get(self, snapshot_id): return self.post('snapshot', { 'snapshot_id': snapshot_id }) def snapshot_get_by_origin_visit(self, origin, visit): return self.post('snapshot/by_origin_visit', { 'origin': origin, 'visit': visit }) def snapshot_get_latest(self, origin, allowed_statuses=None): return self.post('snapshot/latest', { 'origin': origin, 'allowed_statuses': allowed_statuses }) def snapshot_count_branches(self, snapshot_id): return self.post('snapshot/count_branches', { 'snapshot_id': snapshot_id }) def snapshot_get_branches(self, snapshot_id, branches_from=b'', branches_count=1000, target_types=None): return self.post('snapshot/get_branches', { 'snapshot_id': snapshot_id, 'branches_from': branches_from, 'branches_count': branches_count, 'target_types': target_types }) def origin_get(self, origin): return self.post('origin/get', {'origin': origin}) def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False): return self.post('origin/search', {'url_pattern': url_pattern, 'offset': offset, 'limit': limit, 'regexp': regexp, 'with_visit': with_visit}) def origin_add(self, origins): return self.post('origin/add_multi', {'origins': origins}) def origin_add_one(self, origin): return self.post('origin/add', {'origin': origin}) - def origin_visit_add(self, origin, ts): - return self.post('origin/visit/add', {'origin': origin, 'ts': ts}) + def origin_visit_add(self, origin, date, *, ts=None): + if ts is None: + if date is None: + raise TypeError('origin_visit_add expected 2 arguments.') + else: + assert date is None + warnings.warn("argument 'ts' of origin_visit_add was renamed " + "to 'date' in v0.0.109.", + DeprecationWarning) + date = ts + return self.post('origin/visit/add', {'origin': origin, 'date': date}) def origin_visit_update(self, origin, visit_id, status, metadata=None): return self.post('origin/visit/update', {'origin': origin, 'visit_id': visit_id, 'status': status, 'metadata': metadata}) def origin_visit_get(self, origin, last_visit=None, limit=None): return self.post('origin/visit/get', { 'origin': origin, 'last_visit': last_visit, 'limit': limit}) def origin_visit_get_by(self, origin, visit): return self.post('origin/visit/getby', {'origin': origin, 'visit': visit}) def person_get(self, person): return self.post('person', {'person': person}) def fetch_history_start(self, origin_id): return self.post('fetch_history/start', {'origin_id': origin_id}) def fetch_history_end(self, fetch_history_id, data): return self.post('fetch_history/end', {'fetch_history_id': fetch_history_id, 'data': data}) def fetch_history_get(self, fetch_history_id): return self.get('fetch_history', {'id': fetch_history_id}) def entity_add(self, entities): return self.post('entity/add', {'entities': entities}) def entity_get(self, uuid): return self.post('entity/get', {'uuid': uuid}) def entity_get_one(self, uuid): return self.get('entity', {'uuid': uuid}) def entity_get_from_lister_metadata(self, entities): return self.post('entity/from_lister_metadata', {'entities': entities}) def stat_counters(self): return self.get('stat/counters') def directory_entry_get_by_path(self, directory, paths): return self.post('directory/path', dict(directory=directory, paths=paths)) def tool_add(self, tools): return self.post('tool/add', {'tools': tools}) def tool_get(self, tool): return self.post('tool/data', {'tool': tool}) def origin_metadata_add(self, origin_id, ts, 
provider, tool, metadata): return self.post('origin/metadata/add', {'origin_id': origin_id, 'ts': ts, 'provider': provider, 'tool': tool, 'metadata': metadata}) def origin_metadata_get_by(self, origin_id, provider_type=None): return self.post('origin/metadata/get', { 'origin_id': origin_id, 'provider_type': provider_type }) def metadata_provider_add(self, provider_name, provider_type, provider_url, metadata): return self.post('provider/add', {'provider_name': provider_name, 'provider_type': provider_type, 'provider_url': provider_url, 'metadata': metadata}) def metadata_provider_get(self, provider_id): return self.post('provider/get', {'provider_id': provider_id}) def metadata_provider_get_by(self, provider): return self.post('provider/getby', {'provider': provider}) def diff_directories(self, from_dir, to_dir, track_renaming=False): return self.post('algos/diff_directories', {'from_dir': from_dir, 'to_dir': to_dir, 'track_renaming': track_renaming}) def diff_revisions(self, from_rev, to_rev, track_renaming=False): return self.post('algos/diff_revisions', {'from_rev': from_rev, 'to_rev': to_rev, 'track_renaming': track_renaming}) def diff_revision(self, revision, track_renaming=False): return self.post('algos/diff_revision', {'revision': revision, 'track_renaming': track_renaming}) diff --git a/swh/storage/common.py b/swh/storage/common.py index a1de98b91..11eddf8d5 100644 --- a/swh/storage/common.py +++ b/swh/storage/common.py @@ -1,71 +1,80 @@ # Copyright (C) 2015-2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import inspect import functools def apply_options(cursor, options): """Applies the given postgresql client options to the given cursor. 
Returns a dictionary with the old values if they changed.""" old_options = {} for option, value in options.items(): cursor.execute('SHOW %s' % option) old_value = cursor.fetchall()[0][0] if old_value != value: cursor.execute('SET LOCAL %s TO %%s' % option, (value,)) old_options[option] = old_value return old_options def db_transaction(**client_options): """decorator to execute Storage methods within DB transactions The decorated method must accept a `cur` and `db` keyword argument Client options are passed as `set` options to the postgresql server """ def decorator(meth, __client_options=client_options): + if inspect.isgeneratorfunction(meth): + raise ValueError( + 'Use db_transaction_generator for generator functions.') + @functools.wraps(meth) def _meth(self, *args, **kwargs): if 'cur' in kwargs and kwargs['cur']: cur = kwargs['cur'] old_options = apply_options(cur, __client_options) ret = meth(self, *args, **kwargs) apply_options(cur, old_options) return ret else: db = self.get_db() with db.transaction() as cur: apply_options(cur, __client_options) return meth(self, *args, db=db, cur=cur, **kwargs) return _meth return decorator def db_transaction_generator(**client_options): """decorator to execute Storage methods within DB transactions, while returning a generator The decorated method must accept a `cur` and `db` keyword argument Client options are passed as `set` options to the postgresql server """ def decorator(meth, __client_options=client_options): + if not inspect.isgeneratorfunction(meth): + raise ValueError( + 'Use db_transaction for non-generator functions.') + @functools.wraps(meth) def _meth(self, *args, **kwargs): if 'cur' in kwargs and kwargs['cur']: cur = kwargs['cur'] old_options = apply_options(cur, __client_options) yield from meth(self, *args, **kwargs) apply_options(cur, old_options) else: db = self.get_db() with db.transaction() as cur: apply_options(cur, __client_options) yield from meth(self, *args, db=db, cur=cur, **kwargs) return _meth return decorator diff --git a/swh/storage/db_utils.py b/swh/storage/db_utils.py index 87c1d581b..404f07b78 100644 --- a/swh/storage/db_utils.py +++ b/swh/storage/db_utils.py @@ -1,118 +1,123 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # # This code has been imported from psycopg2, version 2.7.4, # https://github.com/psycopg/psycopg2/tree/5afb2ce803debea9533e293eef73c92ffce95bcd # and modified by Software Heritage. # # Original file: lib/extras.py # # psycopg2 is free software: you can redistribute it and/or modify it under the # terms of the GNU Lesser General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. import re import psycopg2.extensions def _paginate(seq, page_size): """Consume an iterable and return it in chunks. Every chunk is at most `page_size`. Never return an empty chunk. """ page = [] it = iter(seq) while 1: try: for i in range(page_size): page.append(next(it)) yield page page = [] except StopIteration: if page: yield page return def _split_sql(sql): """Split *sql* on a single ``%s`` placeholder. Split on the %s, perform %% replacement and return pre, post lists of snippets. 
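A minimal, self-contained sketch of the new decoration-time guard added to both decorators: `inspect.isgeneratorfunction` makes a mismatched decorator fail loudly when the class is defined, instead of silently producing an unconsumed generator. The connection/transaction machinery is elided here.

```python
import functools
import inspect

def db_transaction(**client_options):
    """Simplified stand-in for swh.storage.common.db_transaction
    (connection/transaction handling elided)."""
    def decorator(meth):
        if inspect.isgeneratorfunction(meth):
            raise ValueError(
                'Use db_transaction_generator for generator functions.')
        @functools.wraps(meth)
        def _meth(self, *args, **kwargs):
            return meth(self, *args, **kwargs)
        return _meth
    return decorator

try:
    @db_transaction()
    def revision_missing(self, revisions, db=None, cur=None):
        yield from ()   # generator function: wrong decorator variant
except ValueError as err:
    print(err)          # Use db_transaction_generator for generator functions.
```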
""" curr = pre = [] post = [] tokens = re.split(br'(%.)', sql) for token in tokens: if len(token) != 2 or token[:1] != b'%': curr.append(token) continue if token[1:] == b's': if curr is pre: curr = post else: raise ValueError( "the query contains more than one '%s' placeholder") elif token[1:] == b'%': curr.append(b'%') else: raise ValueError("unsupported format character: '%s'" % token[1:].decode('ascii', 'replace')) if curr is pre: raise ValueError("the query doesn't contain any '%s' placeholder") return pre, post def execute_values_generator(cur, sql, argslist, template=None, page_size=100): - '''Execute a statement using :sql:`VALUES` with a sequence of parameters. + '''Execute a statement using SQL ``VALUES`` with a sequence of parameters. Rows returned by the query are returned through a generator. You need to consume the generator for the queries to be executed! + :param cur: the cursor to use to execute the query. :param sql: the query to execute. It must contain a single ``%s`` placeholder, which will be replaced by a `VALUES list`__. Example: ``"INSERT INTO mytable (id, f1, f2) VALUES %s"``. :param argslist: sequence of sequences or dictionaries with the arguments to send to the query. The type and content must be consistent with *template*. :param template: the snippet to merge to every item in *argslist* to compose the query. + - If the *argslist* items are sequences it should contain positional placeholders (e.g. ``"(%s, %s, %s)"``, or ``"(%s, %s, 42)``" if there are constants value...). - If the *argslist* items are mappings it should contain named placeholders (e.g. ``"(%(id)s, %(f1)s, 42)"``). + If not specified, assume the arguments are sequence and use a simple positional template (i.e. ``(%s, %s, ...)``), with the number of placeholders sniffed by the first element in *argslist*. :param page_size: maximum number of *argslist* items to include in every statement. If there are more items the function will execute more than one statement. :param yield_from_cur: Whether to yield results from the cursor in this function directly. + .. __: https://www.postgresql.org/docs/current/static/queries-values.html + After the execution of the function the `cursor.rowcount` property will **not** contain a total result. ''' # we can't just use sql % vals because vals is bytes: if sql is bytes # there will be some decoding error because of stupid codec used, and Py3 # doesn't implement % on bytes. 
if not isinstance(sql, bytes): sql = sql.encode( psycopg2.extensions.encodings[cur.connection.encoding] ) pre, post = _split_sql(sql) for page in _paginate(argslist, page_size=page_size): if template is None: template = b'(' + b','.join([b'%s'] * len(page[0])) + b')' parts = pre[:] for args in page: parts.append(cur.mogrify(template, args)) parts.append(b',') parts[-1:] = post cur.execute(b''.join(parts)) yield from cur diff --git a/swh/storage/listener.py b/swh/storage/listener.py index 4ea90a3c3..e86c8d0d4 100644 --- a/swh/storage/listener.py +++ b/swh/storage/listener.py @@ -1,111 +1,112 @@ # Copyright (C) 2016-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import logging import kafka import msgpack import swh.storage.db from swh.core.config import load_named_config CONFIG_BASENAME = 'storage/listener' DEFAULT_CONFIG = { 'database': ('str', 'service=softwareheritage'), 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), 'topic_prefix': ('str', 'swh.tmp_journal.new'), 'poll_timeout': ('int', 10), } def decode_sha(value): """Decode the textual representation of a SHA hash""" if isinstance(value, str): return bytes.fromhex(value) return value def decode_json(value): """Decode a JSON value containing hashes and other types""" value = json.loads(value) return {k: decode_sha(v) for k, v in value.items()} OBJECT_TYPES = { 'content', 'skipped_content', 'directory', 'revision', 'release', + 'snapshot', 'origin_visit', 'origin', } def register_all_notifies(db): """Register to notifications for all object types listed in OBJECT_TYPES""" with db.transaction() as cur: for object_type in OBJECT_TYPES: db.register_listener('new_%s' % object_type, cur) def dispatch_notify(topic_prefix, producer, notify): """Dispatch a notification to the proper topic""" channel = notify.channel if not channel.startswith('new_') or channel[4:] not in OBJECT_TYPES: logging.warn("Got unexpected notify %s" % notify) return object_type = channel[4:] topic = '%s.%s' % (topic_prefix, object_type) data = decode_json(notify.payload) producer.send(topic, value=data) def run_from_config(config): """Run the Software Heritage listener from configuration""" db = swh.storage.db.Db.connect(config['database']) def key_to_kafka(key): """Serialize a key, possibly a dict, in a predictable way. 
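A small sketch of the routing done by `dispatch_notify`: a Postgres notification on channel `new_snapshot` (newly covered by this diff) with a JSON payload of hex hashes becomes a Kafka message on `<topic_prefix>.snapshot`, with the hashes decoded back to bytes. The `FakeNotify` tuple and the printing producer are test doubles, not part of the module.

```python
from collections import namedtuple
from swh.storage.listener import dispatch_notify

FakeNotify = namedtuple('FakeNotify', ['channel', 'payload'])

class PrintingProducer:
    """Test double for kafka.KafkaProducer: records what would be sent."""
    def send(self, topic, value):
        print(topic, value)

notify = FakeNotify(
    channel='new_snapshot',
    payload='{"id": "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e"}',
)
dispatch_notify('swh.tmp_journal.new', PrintingProducer(), notify)
# -> topic 'swh.tmp_journal.new.snapshot', value {'id': b'\x1a\x88\x93...'}
```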
Duplicated from swh.journal to avoid a cyclic dependency.""" p = msgpack.Packer(use_bin_type=True) if isinstance(key, dict): return p.pack_map_pairs(sorted(key.items())) else: return p.pack(key) producer = kafka.KafkaProducer( bootstrap_servers=config['brokers'], value_serializer=key_to_kafka, ) register_all_notifies(db) topic_prefix = config['topic_prefix'] poll_timeout = config['poll_timeout'] try: while True: for notify in db.listen_notifies(poll_timeout): dispatch_notify(topic_prefix, producer, notify) producer.flush() except Exception: logging.exception("Caught exception") producer.flush() if __name__ == '__main__': logging.basicConfig( level=logging.INFO, format='%(asctime)s %(process)d %(levelname)s %(message)s' ) config = load_named_config(CONFIG_BASENAME, DEFAULT_CONFIG) run_from_config(config) diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql index 5c1d00169..461d561e3 100644 --- a/swh/storage/sql/30-swh-schema.sql +++ b/swh/storage/sql/30-swh-schema.sql @@ -1,346 +1,346 @@ --- --- SQL implementation of the Software Heritage data model --- -- schema versions create table dbversion ( version int primary key, release timestamptz, description text ); -- latest schema version insert into dbversion(version, release, description) - values(127, now(), 'Work In Progress'); + values(128, now(), 'Work In Progress'); -- a SHA1 checksum create domain sha1 as bytea check (length(value) = 20); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- a SHA256 checksum create domain sha256 as bytea check (length(value) = 32); -- a blake2 checksum create domain blake2s256 as bytea check (length(value) = 32); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; -- a set of UNIX-like access permissions, as manipulated by, e.g., chmod create domain file_perms as int; -- Checksums about actual file content. Note that the content itself is not -- stored in the DB, but on external (key-value) storage. A single checksum is -- used as key there, but the other can be used to verify that we do not inject -- content collisions not knowingly. create table content ( sha1 sha1 not null, sha1_git sha1_git not null, sha256 sha256 not null, blake2s256 blake2s256, length bigint not null, ctime timestamptz not null default now(), -- creation time, i.e. time of (first) injection into the storage status content_status not null default 'visible', object_id bigserial ); -- An origin is a place, identified by an URL, where software source code -- artifacts can be found. We support different kinds of origins, e.g., git and -- other VCS repositories, web pages that list tarballs URLs (e.g., -- http://www.kernel.org), indirect tarball URLs (e.g., -- http://www.example.org/latest.tar.gz), etc. The key feature of an origin is -- that it can be *fetched* from (wget, git clone, svn checkout, etc.) to -- retrieve all the contained software. create table origin ( id bigserial not null, type text, -- TODO use an enum here (?) url text not null ); -- Content blobs observed somewhere, but not ingested into the archive for -- whatever reason. This table is separate from the content table as we might -- not have the sha1 checksum of skipped contents (for instance when we inject -- git repositories, objects that are too big will be skipped here, and we will -- only know their sha1_git). 'reason' contains the reason the content was -- skipped. 
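For reference, the "Git-style salted SHA1" of the `sha1_git` domain above is the id `git hash-object` computes: the content is prefixed with a `blob <length>\0` header before hashing. A short `hashlib` sketch (swh.model provides the canonical helpers for this):

```python
import hashlib

data = b'hello world\n'
sha1_git = hashlib.sha1(b'blob %d\x00' % len(data) + data).hexdigest()
# Same value as `git hash-object` on a file containing b'hello world\n';
# the plain sha1/sha256/blake2s256 domains hash the raw bytes instead.
print(sha1_git)
```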
origin is a nullable column allowing to find out which origin -- contains that skipped content. create table skipped_content ( sha1 sha1, sha1_git sha1_git, sha256 sha256, blake2s256 blake2s256, length bigint not null, ctime timestamptz not null default now(), status content_status not null default 'absent', reason text not null, origin bigint, object_id bigserial ); -- Log of all origin fetches (i.e., origin crawling) that have been done in the -- past, or are still ongoing. Similar to list_history, but for origins. create table fetch_history ( id bigserial, origin bigint, date timestamptz not null, status boolean, -- true if and only if the fetch has been successful result jsonb, -- more detailed returned values, times, etc... stdout text, stderr text, -- null when status is true, filled otherwise duration interval -- fetch duration of NULL if still ongoing ); -- A file-system directory. A directory is a list of directory entries (see -- tables: directory_entry_{dir,file}). -- -- To list the contents of a directory: -- 1. list the contained directory_entry_dir using array dir_entries -- 2. list the contained directory_entry_file using array file_entries -- 3. list the contained directory_entry_rev using array rev_entries -- 4. UNION -- -- Synonyms/mappings: -- * git: tree create table directory ( id sha1_git, dir_entries bigint[], -- sub-directories, reference directory_entry_dir file_entries bigint[], -- contained files, reference directory_entry_file rev_entries bigint[], -- mounted revisions, reference directory_entry_rev object_id bigserial -- short object identifier ); -- A directory entry pointing to a (sub-)directory. create table directory_entry_dir ( id bigserial, target sha1_git, -- id of target directory name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); -- A directory entry pointing to a file content. create table directory_entry_file ( id bigserial, target sha1_git, -- id of target file name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); -- A directory entry pointing to a revision. create table directory_entry_rev ( id bigserial, target sha1_git, -- id of target revision name unix_path, -- path name, relative to containing dir perms file_perms -- unix-like permissions ); -- A person referenced by some source code artifacts, e.g., a VCS revision or -- release metadata. create table person ( id bigserial, name bytea, -- advisory: not null if we managed to parse a name email bytea, -- advisory: not null if we managed to parse an email fullname bytea not null -- freeform specification; what is actually used in the checksums -- will usually be of the form 'name ' ); -- The state of a source code tree at a specific point in time. -- -- Synonyms/mappings: -- * git / subversion / etc: commit -- * tarball: a specific tarball -- -- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in -- case of merges) parent revisions. Each revision points to a directory, i.e., -- a file-system tree containing files and directories. create table revision ( id sha1_git, date timestamptz, date_offset smallint, committer_date timestamptz, committer_date_offset smallint, type revision_type not null, directory sha1_git, -- source code "root" directory message bytea, author bigint, committer bigint, synthetic boolean not null default false, -- true iff revision has been created by Software Heritage metadata jsonb, -- extra metadata (tarball checksums, extra commit information, etc...) 
object_id bigserial, date_neg_utc_offset boolean, committer_date_neg_utc_offset boolean ); -- either this table or the sha1_git[] column on the revision table create table revision_history ( id sha1_git, parent_id sha1_git, parent_rank int not null default 0 -- parent position in merge commits, 0-based ); -- Crawling history of software origins visited by Software Heritage. Each -- visit is a 3-way mapping between a software origin, a timestamp, and a -- snapshot object capturing the full-state of the origin at visit time. create table origin_visit ( origin bigint not null, visit bigint not null, date timestamptz not null, status origin_visit_status not null, metadata jsonb, snapshot_id bigint ); comment on column origin_visit.origin is 'Visited origin'; comment on column origin_visit.visit is 'Sequential visit number for the origin'; comment on column origin_visit.date is 'Visit timestamp'; comment on column origin_visit.status is 'Visit result'; comment on column origin_visit.metadata is 'Origin metadata at visit time'; comment on column origin_visit.snapshot_id is 'Origin snapshot at visit time'; -- A snapshot represents the entire state of a software origin as crawled by -- Software Heritage. This table is a simple mapping between (public) intrinsic -- snapshot identifiers and (private) numeric sequential identifiers. create table snapshot ( object_id bigserial not null, -- PK internal object identifier id sha1_git -- snapshot intrinsic identifier ); -- Each snapshot associate "branch" names to other objects in the Software -- Heritage Merkle DAG. This table describes branches as mappings between names -- and target typed objects. create table snapshot_branch ( object_id bigserial not null, -- PK internal object identifier name bytea not null, -- branch name, e.g., "master" or "feature/drag-n-drop" target bytea, -- target object identifier, e.g., a revision identifier target_type snapshot_target -- target object type, e.g., "revision" ); -- Mapping between snapshots and their branches. create table snapshot_branches ( snapshot_id bigint not null, -- snapshot identifier, ref. snapshot.object_id branch_id bigint not null -- branch identifier, ref. snapshot_branch.object_id ); -- A "memorable" point in time in the development history of a software -- project. 
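To make the snapshot/branch mapping above concrete, a hedged psycopg2 sketch resolving the branch list of a snapshot from its intrinsic identifier, using only the columns defined in this schema (the connection string is illustrative).

```python
import psycopg2

snapshot_id = bytes.fromhex('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e')

with psycopg2.connect('dbname=softwareheritage-dev') as conn:
    with conn.cursor() as cur:
        cur.execute("""
            select sb.name, sb.target_type, sb.target
            from snapshot s
            join snapshot_branches sbs on sbs.snapshot_id = s.object_id
            join snapshot_branch sb on sb.object_id = sbs.branch_id
            where s.id = %s
            order by sb.name
        """, (snapshot_id,))
        for name, target_type, target in cur.fetchall():
            print(name, target_type, target)
```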
-- -- Synonyms/mappings: -- * git: tag (of the annotated kind, otherwise they are just references) -- * tarball: the release version number create table release ( id sha1_git not null, target sha1_git, date timestamptz, date_offset smallint, name bytea, comment bytea, author bigint, synthetic boolean not null default false, -- true iff release has been created by Software Heritage object_id bigserial, target_type object_type not null, date_neg_utc_offset boolean ); -- Tools create table tool ( id serial not null, name text not null, version text not null, configuration jsonb ); comment on table tool is 'Tool information'; comment on column tool.id is 'Tool identifier'; comment on column tool.version is 'Tool name'; comment on column tool.version is 'Tool version'; comment on column tool.configuration is 'Tool configuration: command line, flags, etc...'; create table metadata_provider ( id serial not null, provider_name text not null, provider_type text not null, provider_url text, metadata jsonb ); comment on table metadata_provider is 'Metadata provider information'; comment on column metadata_provider.id is 'Provider''s identifier'; comment on column metadata_provider.provider_name is 'Provider''s name'; comment on column metadata_provider.provider_url is 'Provider''s url'; comment on column metadata_provider.metadata is 'Other metadata about provider'; -- Discovery of metadata during a listing, loading, deposit or external_catalog of an origin -- also provides a translation to a defined json schema using a translation tool (tool_id) create table origin_metadata ( id bigserial not null, -- PK internal object identifier origin_id bigint not null, -- references origin(id) discovery_date timestamptz not null, -- when it was extracted provider_id bigint not null, -- ex: 'hal', 'lister-github', 'loader-github' tool_id bigint not null, metadata jsonb not null ); comment on table origin_metadata is 'keeps all metadata found concerning an origin'; comment on column origin_metadata.id is 'the origin_metadata object''s id'; comment on column origin_metadata.origin_id is 'the origin id for which the metadata was found'; comment on column origin_metadata.discovery_date is 'the date of retrieval'; comment on column origin_metadata.provider_id is 'the metadata provider: github, openhub, deposit, etc.'; comment on column origin_metadata.tool_id is 'the tool used for extracting metadata: lister-github, etc.'; comment on column origin_metadata.metadata is 'metadata in json format but with original terms'; -- Keep a cache of object counts create table object_counts ( object_type text, -- table for which we're counting objects (PK) value bigint, -- count of objects in the table last_update timestamptz, -- last update for the object count in this table single_update boolean -- whether we update this table standalone (true) or through bucketed counts (false) ); create table object_counts_bucketed ( line serial not null, -- PK object_type text not null, -- table for which we're counting objects identifier text not null, -- identifier across which we're bucketing objects bucket_start bytea, -- lower bound (inclusive) for the bucket bucket_end bytea, -- upper bound (exclusive) for the bucket value bigint, -- count of objects in the bucket last_update timestamptz -- last update for the object count in this bucket ); diff --git a/swh/storage/sql/70-swh-triggers.sql b/swh/storage/sql/70-swh-triggers.sql index 478817839..ce26b25a4 100644 --- a/swh/storage/sql/70-swh-triggers.sql +++ 
b/swh/storage/sql/70-swh-triggers.sql @@ -1,130 +1,147 @@ -- Asynchronous notification of new content insertions create function notify_new_content() returns trigger language plpgsql as $$ begin perform pg_notify('new_content', json_build_object( 'sha1', encode(new.sha1, 'hex'), 'sha1_git', encode(new.sha1_git, 'hex'), 'sha256', encode(new.sha256, 'hex'), 'blake2s256', encode(new.blake2s256, 'hex') )::text); return null; end; $$; create trigger notify_new_content after insert on content for each row execute procedure notify_new_content(); -- Asynchronous notification of new origin insertions create function notify_new_origin() returns trigger language plpgsql as $$ begin perform pg_notify('new_origin', json_build_object('id', new.id)::text); return null; end; $$; create trigger notify_new_origin after insert on origin for each row execute procedure notify_new_origin(); -- Asynchronous notification of new skipped content insertions create function notify_new_skipped_content() returns trigger language plpgsql as $$ begin perform pg_notify('new_skipped_content', json_build_object( 'sha1', encode(new.sha1, 'hex'), 'sha1_git', encode(new.sha1_git, 'hex'), 'sha256', encode(new.sha256, 'hex'), 'blake2s256', encode(new.blake2s256, 'hex') )::text); return null; end; $$; create trigger notify_new_skipped_content after insert on skipped_content for each row execute procedure notify_new_skipped_content(); -- Asynchronous notification of new directory insertions create function notify_new_directory() returns trigger language plpgsql as $$ begin perform pg_notify('new_directory', json_build_object('id', encode(new.id, 'hex'))::text); return null; end; $$; create trigger notify_new_directory after insert on directory for each row execute procedure notify_new_directory(); -- Asynchronous notification of new revision insertions create function notify_new_revision() returns trigger language plpgsql as $$ begin perform pg_notify('new_revision', json_build_object('id', encode(new.id, 'hex'))::text); return null; end; $$; create trigger notify_new_revision after insert on revision for each row execute procedure notify_new_revision(); -- Asynchronous notification of new origin visits create function notify_new_origin_visit() returns trigger language plpgsql as $$ begin perform pg_notify('new_origin_visit', json_build_object( 'origin', new.origin, 'visit', new.visit )::text); return null; end; $$; create trigger notify_new_origin_visit after insert on origin_visit for each row execute procedure notify_new_origin_visit(); -- Asynchronous notification of new release insertions create function notify_new_release() returns trigger language plpgsql as $$ begin perform pg_notify('new_release', json_build_object('id', encode(new.id, 'hex'))::text); return null; end; $$; create trigger notify_new_release after insert on release for each row execute procedure notify_new_release(); + + +-- Asynchronous notification of new snapshot insertions +create function notify_new_snapshot() + returns trigger + language plpgsql +as $$ + begin + perform pg_notify('new_snapshot', json_build_object('id', encode(new.id, 'hex'))::text); + return null; + end; +$$; + +create trigger notify_new_snapshot + after insert on snapshot + for each row + execute procedure notify_new_snapshot(); diff --git a/swh/storage/storage.py b/swh/storage/storage.py index 6d7942f44..ff27c24a1 100644 --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -1,1357 +1,1396 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at 
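A minimal psycopg2 sketch of consuming the `new_snapshot` notifications added by this trigger, outside of the swh listener (the DSN is illustrative); each payload is the JSON `{"id": <hex sha1_git>}` built by `notify_new_snapshot`.

```python
import json
import select
import psycopg2
import psycopg2.extensions

conn = psycopg2.connect('dbname=softwareheritage-dev')
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute('LISTEN new_snapshot;')

while True:
    # wait up to 10 seconds for a notification
    if select.select([conn], [], [], 10) == ([], [], []):
        continue
    conn.poll()
    while conn.notifies:
        notify = conn.notifies.pop(0)
        snapshot_id = bytes.fromhex(json.loads(notify.payload)['id'])
        print('new snapshot', snapshot_id.hex())
```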
the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from concurrent.futures import ThreadPoolExecutor import datetime import itertools import json +import warnings import dateutil.parser import psycopg2 import psycopg2.pool from . import converters from .common import db_transaction_generator, db_transaction from .db import Db from .exc import StorageDBError from .algos import diff from swh.model.hashutil import ALGORITHMS, hash_to_bytes from swh.objstorage import get_objstorage from swh.objstorage.exc import ObjNotFoundError # Max block size of contents to return BULK_BLOCK_CONTENT_LEN_MAX = 10000 EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') """Identifier for the empty snapshot""" class Storage(): """SWH storage proxy, encompassing DB and object storage """ def __init__(self, db, objstorage, min_pool_conns=1, max_pool_conns=10): """ Args: db_conn: either a libpq connection string, or a psycopg2 connection obj_root: path to the root of the object storage """ try: if isinstance(db, psycopg2.extensions.connection): self._pool = None self._db = Db(db) else: self._pool = psycopg2.pool.ThreadedConnectionPool( min_pool_conns, max_pool_conns, db ) self._db = None except psycopg2.OperationalError as e: raise StorageDBError(e) self.objstorage = get_objstorage(**objstorage) def get_db(self): if self._db: return self._db else: return Db.from_pool(self._pool) def check_config(self, *, check_write): """Check that the storage is configured and ready to go.""" if not self.objstorage.check_config(check_write=check_write): return False # Check permissions on one of the tables with self.get_db().transaction() as cur: if check_write: check = 'INSERT' else: check = 'SELECT' cur.execute( "select has_table_privilege(current_user, 'content', %s)", (check,) ) return cur.fetchone()[0] return True def content_add(self, content): """Add content blobs to the storage Note: in case of DB errors, objects might have already been added to the object storage and will not be removed. Since addition to the object storage is idempotent, that should not be a problem. Args: content (iterable): iterable of dictionaries representing individual pieces of content to add. Each dictionary has the following keys: - data (bytes): the actual content - length (int): content length (default: -1) - one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum - status (str): one of visible, hidden, absent - reason (str): if status = absent, the reason why - origin (int): if status = absent, the origin we saw the content in """ db = self.get_db() def _unique_key(hash, keys=db.content_hash_keys): """Given a hash (tuple or dict), return a unique key from the aggregation of keys. 
""" if isinstance(hash, tuple): return hash return tuple([hash[k] for k in keys]) content_by_status = defaultdict(list) for d in content: if 'status' not in d: d['status'] = 'visible' if 'length' not in d: d['length'] = -1 content_by_status[d['status']].append(d) content_with_data = content_by_status['visible'] content_without_data = content_by_status['absent'] missing_content = set(self.content_missing(content_with_data)) missing_skipped = set(_unique_key(hashes) for hashes in self.skipped_content_missing( content_without_data)) def add_to_objstorage(): data = { cont['sha1']: cont['data'] for cont in content_with_data if cont['sha1'] in missing_content } self.objstorage.add_batch(data) with db.transaction() as cur: with ThreadPoolExecutor(max_workers=1) as executor: added_to_objstorage = executor.submit(add_to_objstorage) if missing_content: # create temporary table for metadata injection db.mktemp('content', cur) content_filtered = (cont for cont in content_with_data if cont['sha1'] in missing_content) db.copy_to(content_filtered, 'tmp_content', db.content_get_metadata_keys, cur) # move metadata in place db.content_add_from_temp(cur) if missing_skipped: missing_filtered = ( cont for cont in content_without_data if _unique_key(cont) in missing_skipped ) db.mktemp('skipped_content', cur) db.copy_to(missing_filtered, 'tmp_skipped_content', db.skipped_content_keys, cur) # move metadata in place db.skipped_content_add_from_temp(cur) # Wait for objstorage addition before returning from the # transaction, bubbling up any exception added_to_objstorage.result() @db_transaction() def content_update(self, content, keys=[], db=None, cur=None): """Update content blobs to the storage. Does nothing for unknown contents or skipped ones. Args: content (iterable): iterable of dictionaries representing individual pieces of content to update. Each dictionary has the following keys: - data (bytes): the actual content - length (int): content length (default: -1) - one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum - status (str): one of visible, hidden, absent keys (list): List of keys (str) whose values needs an update, e.g., new hash column """ # TODO: Add a check on input keys. How to properly implement # this? We don't know yet the new columns. db.mktemp('content', cur) select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) db.copy_to(content, 'tmp_content', select_keys, cur) db.content_update_from_temp(keys_to_update=keys, cur=cur) def content_get(self, content): """Retrieve in bulk contents and their data. Args: content: iterables of sha1 Yields: dict: Generates streams of contents as dict with their raw data: - sha1: sha1's content - data: bytes data of the content Raises: ValueError in case of too much contents are required. cf. BULK_BLOCK_CONTENT_LEN_MAX """ # FIXME: Improve on server module to slice the result if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: raise ValueError( "Send at maximum %s contents." 
% BULK_BLOCK_CONTENT_LEN_MAX) for obj_id in content: try: data = self.objstorage.get(obj_id) except ObjNotFoundError: yield None continue yield {'sha1': obj_id, 'data': data} @db_transaction_generator(statement_timeout=500) def content_get_metadata(self, content, db=None, cur=None): """Retrieve content metadata in bulk Args: content: iterable of content identifiers (sha1) Returns: an iterable with content metadata corresponding to the given ids """ for metadata in db.content_get_metadata_from_sha1s(content, cur): yield dict(zip(db.content_get_metadata_keys, metadata)) @db_transaction_generator() def content_missing(self, content, key_hash='sha1', db=None, cur=None): """List content missing from storage Args: content ([dict]): iterable of dictionaries containing one key for each checksum algorithm in :data:`swh.model.hashutil.ALGORITHMS`, mapped to the corresponding checksum, and a length key mapped to the content length. key_hash (str): name of the column to use as hash id result (default: 'sha1') Returns: iterable ([bytes]): missing content ids (as per the key_hash column) Raises: TODO: an exception when we get a hash collision. """ keys = db.content_hash_keys if key_hash not in keys: raise ValueError("key_hash should be one of %s" % keys) key_hash_idx = keys.index(key_hash) if not content: return for obj in db.content_missing_from_list(content, cur): yield obj[key_hash_idx] @db_transaction_generator() def content_missing_per_sha1(self, contents, db=None, cur=None): """List content missing from storage based only on sha1. Args: contents: Iterable of sha1 to check for absence. Returns: iterable: missing ids Raises: TODO: an exception when we get a hash collision. """ for obj in db.content_missing_per_sha1(contents, cur): yield obj[0] @db_transaction_generator() def skipped_content_missing(self, content, db=None, cur=None): """List skipped_content missing from storage Args: content: iterable of dictionaries containing the data for each checksum algorithm. Returns: iterable: missing signatures """ keys = db.content_hash_keys db.mktemp('skipped_content', cur) db.copy_to(content, 'tmp_skipped_content', keys + ['length', 'reason'], cur) yield from db.skipped_content_missing_from_temp(cur) @db_transaction() def content_find(self, content, db=None, cur=None): """Find a content hash in db. Args: content: a dictionary representing one content hash, mapping checksum algorithm names (see swh.model.hashutil.ALGORITHMS) to checksum values Returns: a triplet (sha1, sha1_git, sha256) if the content exist or None otherwise. Raises: ValueError: in case the key of the dictionary is not sha1, sha1_git nor sha256. """ if not set(content).intersection(ALGORITHMS): raise ValueError('content keys must contain at least one of: ' 'sha1, sha1_git, sha256, blake2s256') c = db.content_find(sha1=content.get('sha1'), sha1_git=content.get('sha1_git'), sha256=content.get('sha256'), blake2s256=content.get('blake2s256'), cur=cur) if c: return dict(zip(db.content_find_cols, c)) return None def directory_add(self, directories): """Add directories to the storage Args: directories (iterable): iterable of dictionaries representing the individual directories to add. Each dict has the following keys: - id (sha1_git): the id of the directory to add - entries (list): list of dicts for each entry in the directory. 
Each dict has the following keys: - name (bytes) - type (one of 'file', 'dir', 'rev'): type of the directory entry (file, directory, revision) - target (sha1_git): id of the object pointed at by the directory entry - perms (int): entry permissions """ dirs = set() dir_entries = { 'file': defaultdict(list), 'dir': defaultdict(list), 'rev': defaultdict(list), } for cur_dir in directories: dir_id = cur_dir['id'] dirs.add(dir_id) for src_entry in cur_dir['entries']: entry = src_entry.copy() entry['dir_id'] = dir_id dir_entries[entry['type']][dir_id].append(entry) dirs_missing = set(self.directory_missing(dirs)) if not dirs_missing: return db = self.get_db() with db.transaction() as cur: # Copy directory ids dirs_missing_dict = ({'id': dir} for dir in dirs_missing) db.mktemp('directory', cur) db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) # Copy entries for entry_type, entry_list in dir_entries.items(): entries = itertools.chain.from_iterable( entries_for_dir for dir_id, entries_for_dir in entry_list.items() if dir_id in dirs_missing) db.mktemp_dir_entry(entry_type) db.copy_to( entries, 'tmp_directory_entry_%s' % entry_type, ['target', 'name', 'perms', 'dir_id'], cur, ) # Do the final copy db.directory_add_from_temp(cur) @db_transaction_generator() def directory_missing(self, directories, db=None, cur=None): """List directories missing from storage Args: directories (iterable): an iterable of directory ids Yields: missing directory ids """ for obj in db.directory_missing_from_list(directories, cur): yield obj[0] @db_transaction_generator(statement_timeout=20000) def directory_ls(self, directory, recursive=False, db=None, cur=None): """Get entries for one directory. Args: - directory: the directory to list entries from. - recursive: if flag on, this list recursively from this directory. Returns: List of entries for such directory. """ if recursive: res_gen = db.directory_walk(directory, cur=cur) else: res_gen = db.directory_walk_one(directory, cur=cur) for line in res_gen: yield dict(zip(db.directory_ls_cols, line)) @db_transaction(statement_timeout=2000) def directory_entry_get_by_path(self, directory, paths, db=None, cur=None): """Get the directory entry (either file or dir) from directory with path. Args: - directory: sha1 of the top level directory - paths: path to lookup from the top level directory. From left (top) to right (bottom). Returns: The corresponding directory entry if found, None otherwise. """ res = db.directory_entry_get_by_path(directory, paths, cur) if res: return dict(zip(db.directory_ls_cols, res)) def revision_add(self, revisions): """Add revisions to the storage Args: revisions (iterable): iterable of dictionaries representing the individual revisions to add. 
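A sketch of the structure described above for `directory_add`; the 20-byte identifiers are placeholders (in practice they are intrinsic sha1_git ids computed with swh.model), the permissions use git-style modes, and `storage` is an existing `Storage` instance.

```python
directory = {
    'id': b'\x01' * 20,                 # placeholder directory sha1_git
    'entries': [
        {
            'name': b'README.md',
            'type': 'file',
            'target': b'\x02' * 20,     # placeholder content sha1_git
            'perms': 0o100644,
        },
        {
            'name': b'docs',
            'type': 'dir',
            'target': b'\x03' * 20,     # placeholder sub-directory sha1_git
            'perms': 0o040000,
        },
    ],
}

storage.directory_add([directory])
assert list(storage.directory_missing([directory['id']])) == []
```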
Each dict has the following keys: - id (sha1_git): id of the revision to add - date (datetime.DateTime): date the revision was written - date_offset (int): offset from UTC in minutes the revision was written - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - committer_date (datetime.DateTime): date the revision got added to the origin - committer_date_offset (int): offset from UTC in minutes the revision was added to the origin - committer_date_neg_utc_offset (boolean): whether a null committer_date_offset represents a negative UTC offset - type (one of 'git', 'tar'): type of the revision added - directory (sha1_git): the directory the revision points at - message (bytes): the message associated with the revision - author_name (bytes): the name of the revision author - author_email (bytes): the email of the revision author - committer_name (bytes): the name of the revision committer - committer_email (bytes): the email of the revision committer - metadata (jsonb): extra information as dictionary - synthetic (bool): revision's nature (tarball, directory creates synthetic revision) - parents (list of sha1_git): the parents of this revision """ db = self.get_db() revisions_missing = set(self.revision_missing( set(revision['id'] for revision in revisions))) if not revisions_missing: return with db.transaction() as cur: db.mktemp_revision(cur) revisions_filtered = ( converters.revision_to_db(revision) for revision in revisions if revision['id'] in revisions_missing) parents_filtered = [] db.copy_to( revisions_filtered, 'tmp_revision', db.revision_add_cols, cur, lambda rev: parents_filtered.extend(rev['parents'])) db.revision_add_from_temp(cur) db.copy_to(parents_filtered, 'revision_history', ['id', 'parent_id', 'parent_rank'], cur) @db_transaction_generator() def revision_missing(self, revisions, db=None, cur=None): """List revisions missing from storage Args: revisions (iterable): revision ids Yields: missing revision ids """ if not revisions: return for obj in db.revision_missing_from_list(revisions, cur): yield obj[0] @db_transaction_generator(statement_timeout=500) def revision_get(self, revisions, db=None, cur=None): """Get all revisions from storage Args: revisions: an iterable of revision ids Returns: iterable: an iterable of revisions as dictionaries (or None if the revision doesn't exist) """ for line in db.revision_get_from_list(revisions, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator(statement_timeout=2000) def revision_log(self, revisions, limit=None, db=None, cur=None): """Fetch revision entry from the given root revisions. Args: revisions: array of root revision to lookup limit: limitation on the output result. Default to None. Yields: List of revision log from such revisions root. """ for line in db.revision_log(revisions, limit, cur): data = converters.db_to_revision( dict(zip(db.revision_get_cols, line)) ) if not data['type']: yield None continue yield data @db_transaction_generator(statement_timeout=2000) def revision_shortlog(self, revisions, limit=None, db=None, cur=None): """Fetch the shortlog for the given revisions Args: revisions: list of root revisions to lookup limit: depth limitation for the output Yields: a list of (id, parents) tuples. 
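A short usage sketch for the two log methods documented here; `head` is a placeholder root revision id and `storage` an existing instance. `revision_log` yields full revision dicts, while `revision_shortlog` only yields `(id, parents)` tuples.

```python
head = b'\x04' * 20      # placeholder sha1_git of a root revision

# Full revision metadata, at most 25 entries:
for rev in storage.revision_log([head], limit=25):
    print(rev['id'], rev['message'])

# Graph structure only:
for rev_id, parents in storage.revision_shortlog([head], limit=25):
    print(rev_id, parents)
```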
""" yield from db.revision_shortlog(revisions, limit, cur) def release_add(self, releases): """Add releases to the storage Args: releases (iterable): iterable of dictionaries representing the individual releases to add. Each dict has the following keys: - id (sha1_git): id of the release to add - revision (sha1_git): id of the revision the release points to - date (datetime.DateTime): the date the release was made - date_offset (int): offset from UTC in minutes the release was made - date_neg_utc_offset (boolean): whether a null date_offset represents a negative UTC offset - name (bytes): the name of the release - comment (bytes): the comment associated with the release - author_name (bytes): the name of the release author - author_email (bytes): the email of the release author """ db = self.get_db() release_ids = set(release['id'] for release in releases) releases_missing = set(self.release_missing(release_ids)) if not releases_missing: return with db.transaction() as cur: db.mktemp_release(cur) releases_filtered = ( converters.release_to_db(release) for release in releases if release['id'] in releases_missing ) db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, cur) db.release_add_from_temp(cur) @db_transaction_generator() def release_missing(self, releases, db=None, cur=None): """List releases missing from storage Args: releases: an iterable of release ids Returns: a list of missing release ids """ if not releases: return for obj in db.release_missing_from_list(releases, cur): yield obj[0] @db_transaction_generator(statement_timeout=500) def release_get(self, releases, db=None, cur=None): """Given a list of sha1, return the releases's information Args: releases: list of sha1s Yields: - releases: list of releases as dicts with the following keys: - - - id: origin's id - - revision: origin's type - - url: origin's url + dicts with the same keys as those given to `release_add` Raises: ValueError: if the keys does not match (url and type) nor id. 
""" for release in db.release_get_from_list(releases, cur): yield converters.db_to_release( dict(zip(db.release_get_cols, release)) ) @db_transaction() def snapshot_add(self, origin, visit, snapshot, db=None, cur=None): """Add a snapshot for the given origin/visit couple Args: origin (int): id of the origin visit (int): id of the visit snapshot (dict): the snapshot to add to the visit, containing the following keys: - **id** (:class:`bytes`): id of the snapshot - **branches** (:class:`dict`): branches the snapshot contains, mapping the branch name (:class:`bytes`) to the branch target, itself a :class:`dict` (or ``None`` if the branch points to an unknown object) - **target_type** (:class:`str`): one of ``content``, ``directory``, ``revision``, ``release``, ``snapshot``, ``alias`` - **target** (:class:`bytes`): identifier of the target (currently a ``sha1_git`` for all object kinds, or the name of the target branch for aliases) """ if not db.snapshot_exists(snapshot['id'], cur): db.mktemp_snapshot_branch(cur) db.copy_to( ( { 'name': name, 'target': info['target'] if info else None, 'target_type': info['target_type'] if info else None, } for name, info in snapshot['branches'].items() ), 'tmp_snapshot_branch', ['name', 'target', 'target_type'], cur, ) db.snapshot_add(origin, visit, snapshot['id'], cur) @db_transaction(statement_timeout=2000) def snapshot_get(self, snapshot_id, db=None, cur=None): """Get the content, possibly partial, of a snapshot with the given id The branches of the snapshot are iterated in the lexicographical order of their names. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. In order to browse the whole set of branches, the method :meth:`snapshot_get_branches` should be used instead. Args: snapshot_id (bytes): identifier of the snapshot Returns: dict: a dict with three keys: * **id**: identifier of the snapshot * **branches**: a dict of branches contained in the snapshot whose keys are the branches' names. * **next_branch**: the name of the first branch not returned or :const:`None` if the snapshot has less than 1000 branches. """ return self.snapshot_get_branches(snapshot_id, db=db, cur=cur) @db_transaction(statement_timeout=2000) def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None): """Get the content, possibly partial, of a snapshot for the given origin visit The branches of the snapshot are iterated in the lexicographical order of their names. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. In order to browse the whole set of branches, the method :meth:`snapshot_get_branches` should be used instead. Args: origin (int): the origin identifier visit (int): the visit identifier Returns: - dict: a dict with three keys: + dict: None if the snapshot does not exist; + a dict with three keys otherwise: * **id**: identifier of the snapshot * **branches**: a dict of branches contained in the snapshot whose keys are the branches' names. * **next_branch**: the name of the first branch not returned or :const:`None` if the snapshot has less than 1000 branches. 
""" snapshot_id = db.snapshot_get_by_origin_visit(origin, visit, cur) if snapshot_id: return self.snapshot_get(snapshot_id, db=db, cur=cur) return None @db_transaction(statement_timeout=2000) def snapshot_get_latest(self, origin, allowed_statuses=None, db=None, cur=None): """Get the content, possibly partial, of the latest snapshot for the given origin, optionally only from visits that have one of the given allowed_statuses The branches of the snapshot are iterated in the lexicographical order of their names. .. warning:: At most 1000 branches contained in the snapshot will be returned for performance reasons. In order to browse the whole set of branches, the method :meth:`snapshot_get_branches` should be used instead. Args: origin (int): the origin identifier allowed_statuses (list of str): list of visit statuses considered to find the latest snapshot for the visit. For instance, ``allowed_statuses=['full']`` will only consider visits that have successfully run to completion. Returns: dict: a dict with three keys: * **id**: identifier of the snapshot * **branches**: a dict of branches contained in the snapshot whose keys are the branches' names. * **next_branch**: the name of the first branch not returned or :const:`None` if the snapshot has less than 1000 branches. """ origin_visit = db.origin_visit_get_latest_snapshot( origin, allowed_statuses=allowed_statuses, cur=cur) if origin_visit: origin_visit = dict(zip(db.origin_visit_get_cols, origin_visit)) return self.snapshot_get(origin_visit['snapshot'], db=db, cur=cur) @db_transaction(statement_timeout=2000) def snapshot_count_branches(self, snapshot_id, db=None, cur=None): """Count the number of branches in the snapshot with the given id Args: snapshot_id (bytes): identifier of the snapshot Returns: dict: A dict whose keys are the target types of branches and values their corresponding amount """ return dict([bc for bc in db.snapshot_count_branches(snapshot_id, cur)]) @db_transaction(statement_timeout=2000) def snapshot_get_branches(self, snapshot_id, branches_from=b'', branches_count=1000, target_types=None, db=None, cur=None): """Get the content, possibly partial, of a snapshot with the given id The branches of the snapshot are iterated in the lexicographical order of their names. Args: snapshot_id (bytes): identifier of the snapshot branches_from (bytes): optional parameter used to skip branches whose name is lesser than it before returning them branches_count (int): optional parameter used to restrain the amount of returned branches target_types (list): optional parameter used to filter the target types of branch to return (possible values that can be contained in that list are `'content', 'directory', 'revision', 'release', 'snapshot', 'alias'`) Returns: - dict: a dict with three keys: + dict: None if the snapshot does not exist; + a dict with three keys otherwise: * **id**: identifier of the snapshot * **branches**: a dict of branches contained in the snapshot whose keys are the branches' names. * **next_branch**: the name of the first branch not returned or :const:`None` if the snapshot has less than `branches_count` branches after `branches_from` included. 
""" if snapshot_id == EMPTY_SNAPSHOT_ID: return { 'id': snapshot_id, 'branches': {}, 'next_branch': None, } branches = {} next_branch = None fetched_branches = list(db.snapshot_get_by_id( snapshot_id, branches_from=branches_from, branches_count=branches_count+1, target_types=target_types, cur=cur, )) for branch in fetched_branches[:branches_count]: branch = dict(zip(db.snapshot_get_cols, branch)) del branch['snapshot_id'] name = branch.pop('name') if branch == {'target': None, 'target_type': None}: branch = None branches[name] = branch if len(fetched_branches) > branches_count: branch = dict(zip(db.snapshot_get_cols, fetched_branches[-1])) next_branch = branch['name'] if branches: return { 'id': snapshot_id, 'branches': branches, 'next_branch': next_branch, } return None @db_transaction() - def origin_visit_add(self, origin, ts, db=None, cur=None): + def origin_visit_add(self, origin, date=None, db=None, cur=None, *, + ts=None): """Add an origin_visit for the origin at ts with status 'ongoing'. Args: origin: Visited Origin id - ts: timestamp of such visit + date: timestamp of such visit Returns: dict: dictionary with keys origin and visit where: - origin: origin identifier - visit: the visit identifier for the new visit occurrence - - ts (datetime.DateTime): the visit date """ - if isinstance(ts, str): - ts = dateutil.parser.parse(ts) + if ts is None: + if date is None: + raise TypeError('origin_visit_add expected 2 arguments.') + else: + assert date is None + warnings.warn("argument 'ts' of origin_visit_add was renamed " + "to 'date' in v0.0.109.", + DeprecationWarning) + date = ts + + if isinstance(date, str): + date = dateutil.parser.parse(date) return { 'origin': origin, - 'visit': db.origin_visit_add(origin, ts, cur) + 'visit': db.origin_visit_add(origin, date, cur) } @db_transaction() def origin_visit_update(self, origin, visit_id, status, metadata=None, db=None, cur=None): """Update an origin_visit's status. Args: origin: Visited Origin id visit_id: Visit's id status: Visit's new status metadata: Data associated to the visit Returns: None """ return db.origin_visit_update(origin, visit_id, status, metadata, cur) @db_transaction_generator(statement_timeout=500) def origin_visit_get(self, origin, last_visit=None, limit=None, db=None, cur=None): """Retrieve all the origin's visit's information. Args: origin (int): The occurrence's origin (identifier). - last_visit (int): Starting point from which listing the next visits + last_visit: Starting point from which listing the next visits Default to None limit (int): Number of results to return from the last visit. Default to None Yields: List of visits. """ for line in db.origin_visit_get_all( origin, last_visit=last_visit, limit=limit, cur=cur): data = dict(zip(db.origin_visit_get_cols, line)) yield data @db_transaction(statement_timeout=500) def origin_visit_get_by(self, origin, visit, db=None, cur=None): """Retrieve origin visit's information. Args: origin: The occurrence's origin (identifier). Returns: - The information on that particular (origin, visit) + The information on that particular (origin, visit) or None if + it does not exist """ ori_visit = db.origin_visit_get(origin, visit, cur) if not ori_visit: return None return dict(zip(db.origin_visit_get_cols, ori_visit)) @db_transaction(statement_timeout=2000) def object_find_by_sha1_git(self, ids, db=None, cur=None): """Return the objects found with the given ids. Args: ids: a generator of sha1_gits Returns: dict: a mapping from id to the list of objects found. 
Each object found is itself a dict with keys: - sha1_git: the input id - type: the type of object found - id: the id of the object found - object_id: the numeric id of the object found. """ ret = {id: [] for id in ids} for retval in db.object_find_by_sha1_git(ids, cur=cur): if retval[1]: ret[retval[0]].append(dict(zip(db.object_find_by_sha1_git_cols, retval))) return ret origin_keys = ['id', 'type', 'url'] @db_transaction(statement_timeout=500) def origin_get(self, origin, db=None, cur=None): """Return the origin either identified by its id or its tuple (type, url). Args: origin: dictionary representing the individual origin to find. This dict has either the keys type and url: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) - url (bytes): the url the origin points to or the id: - id: the origin id Returns: dict: the origin dictionary with the keys: - id: origin's id - type: origin's type - url: origin's url Raises: ValueError: if the keys does not match (url and type) nor id. """ origin_id = origin.get('id') if origin_id: # check lookup per id first ori = db.origin_get(origin_id, cur) elif 'type' in origin and 'url' in origin: # or lookup per type, url ori = db.origin_get_with(origin['type'], origin['url'], cur) else: # unsupported lookup raise ValueError('Origin must have either id or (type and url).') if ori: return dict(zip(self.origin_keys, ori)) return None @db_transaction_generator() def origin_search(self, url_pattern, offset=0, limit=50, regexp=False, with_visit=False, db=None, cur=None): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. Args: url_pattern (str): the string pattern to search for in origin urls offset (int): number of found origins to skip before returning results limit (int): the maximum number of found origins to return regexp (bool): if True, consider the provided pattern as a regular expression and return origins whose urls match it with_visit (bool): if True, filter out origins with no visit Returns: An iterable of dict containing origin information as returned by :meth:`swh.storage.storage.Storage.origin_get`. """ for origin in db.origin_search(url_pattern, offset, limit, regexp, with_visit, cur): yield dict(zip(self.origin_keys, origin)) @db_transaction() def _person_add(self, person, db=None, cur=None): """Add a person in storage. Note: Internal function for now, do not use outside of this module. Do not do anything fancy in case a person already exists. Please adapt code if more checks are needed. Args: person: dictionary with keys name and email. Returns: Id of the new person. """ return db.person_add(person) @db_transaction_generator(statement_timeout=500) def person_get(self, person, db=None, cur=None): """Return the persons identified by their ids. Args: person: array of ids. Returns: The array of persons corresponding of the ids. """ for person in db.person_get(person): yield dict(zip(db.person_get_cols, person)) @db_transaction() def origin_add(self, origins, db=None, cur=None): """Add origins to the storage Args: origins: list of dictionaries representing the individual origins, with the following keys: - type: the origin type ('git', 'svn', 'deb', ...) 
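A usage sketch for the two origin lookup paths documented above (the URL and pattern are illustrative): `origin_get` resolves a single origin by id or by (type, url), while `origin_search` matches a substring or regexp over the urls.

```python
# Exact lookup by (type, url); returns the origin dict or None:
origin = storage.origin_get({'type': 'git',
                             'url': 'https://github.com/python/cpython'})

# Case-insensitive substring search, keeping only origins with a visit:
for hit in storage.origin_search('github.com/python', limit=10,
                                 with_visit=True):
    print(hit['id'], hit['type'], hit['url'])
```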
- url (bytes): the url the origin points to Returns: list: given origins as dict updated with their id """ for origin in origins: origin['id'] = self.origin_add_one(origin, db=db, cur=cur) return origins @db_transaction() def origin_add_one(self, origin, db=None, cur=None): """Add origin to the storage Args: origin: dictionary representing the individual origin to add. This dict has the following keys: - type (FIXME: enum TBD): the origin type ('git', 'wget', ...) - url (bytes): the url the origin points to Returns: the id of the added origin, or of the identical one that already exists. """ data = db.origin_get_with(origin['type'], origin['url'], cur) if data: return data[0] return db.origin_add(origin['type'], origin['url'], cur) @db_transaction() def fetch_history_start(self, origin_id, db=None, cur=None): """Add an entry for origin origin_id in fetch_history. Returns the id of the added fetch_history entry """ fetch_history = { 'origin': origin_id, 'date': datetime.datetime.now(tz=datetime.timezone.utc), } return db.create_fetch_history(fetch_history, cur) @db_transaction() def fetch_history_end(self, fetch_history_id, data, db=None, cur=None): """Close the fetch_history entry with id `fetch_history_id`, replacing its data with `data`. """ now = datetime.datetime.now(tz=datetime.timezone.utc) fetch_history = db.get_fetch_history(fetch_history_id, cur) if not fetch_history: raise ValueError('No fetch_history with id %d' % fetch_history_id) fetch_history['duration'] = now - fetch_history['date'] fetch_history.update(data) db.update_fetch_history(fetch_history, cur) @db_transaction() def fetch_history_get(self, fetch_history_id, db=None, cur=None): """Get the fetch_history entry with id `fetch_history_id`. """ return db.get_fetch_history(fetch_history_id, cur) @db_transaction(statement_timeout=500) def stat_counters(self, db=None, cur=None): """compute statistics about the number of tuples in various tables Returns: dict: a dictionary mapping textual labels (e.g., content) to integer values (e.g., the number of tuples in table content) """ return {k: v for (k, v) in db.stat_counters()} @db_transaction() def origin_metadata_add(self, origin_id, ts, provider, tool, metadata, db=None, cur=None): """ Add an origin_metadata for the origin at ts with provenance and metadata. 
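# Minimal sketch of the origin helpers documented above: origin_add_one() is
# idempotent, origin_get() accepts either {'type', 'url'} or {'id'}, and
# origin_search() matches a substring (or a regexp when regexp=True).
# `storage` is an assumed, already-configured Storage instance; the URL is
# illustrative.
origin = {'type': 'git', 'url': 'https://example.org/demo.git'}

origin_id = storage.origin_add_one(origin)
assert storage.origin_add_one(origin) == origin_id  # second add: same id back

by_type_url = storage.origin_get({'type': 'git', 'url': origin['url']})
by_id = storage.origin_get({'id': origin_id})
assert by_type_url['id'] == by_id['id'] == origin_id

hits = list(storage.origin_search('example.org', limit=10))
regexp_hits = list(storage.origin_search(r'.*example\.org.*',
                                         regexp=True, with_visit=True))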
Args: origin_id (int): the origin's id for which the metadata is added ts (datetime): timestamp of the found metadata provider (int): the provider of metadata (ex:'hal') tool (int): tool used to extract metadata metadata (jsonb): the metadata retrieved at the time and location Returns: id (int): the origin_metadata unique id """ if isinstance(ts, str): ts = dateutil.parser.parse(ts) return db.origin_metadata_add(origin_id, ts, provider, tool, metadata, cur) @db_transaction_generator(statement_timeout=500) def origin_metadata_get_by(self, origin_id, provider_type=None, db=None, cur=None): """Retrieve list of all origin_metadata entries for the origin_id Args: origin_id (int): the unique origin identifier provider_type (str): (optional) type of provider Returns: list of dicts: the origin_metadata dictionary with the keys: - - id (int): origin_metadata's id - origin_id (int): origin's id - discovery_date (datetime): timestamp of discovery - tool_id (int): metadata's extracting tool - metadata (jsonb) - provider_id (int): metadata's provider - provider_name (str) - provider_type (str) - provider_url (str) """ for line in db.origin_metadata_get_by(origin_id, provider_type, cur): yield dict(zip(db.origin_metadata_get_cols, line)) @db_transaction_generator() def tool_add(self, tools, db=None, cur=None): """Add new tools to the storage. Args: tools (iterable of :class:`dict`): Tool information to add to storage. Each tool is a :class:`dict` with the following keys: - name (:class:`str`): name of the tool - version (:class:`str`): version of the tool - configuration (:class:`dict`): configuration of the tool, must be json-encodable - Returns: - `iterable` of :class:`dict`: All the tools inserted in storage + Yields: + :class:`dict`: All the tools inserted in storage (including the internal ``id``). The order of the list is not guaranteed to match the order of the initial list. """ db.mktemp_tool(cur) db.copy_to(tools, 'tmp_tool', ['name', 'version', 'configuration'], cur) tools = db.tool_add_from_temp(cur) for line in tools: yield dict(zip(db.tool_cols, line)) @db_transaction(statement_timeout=500) def tool_get(self, tool, db=None, cur=None): """Retrieve tool information. Args: tool (dict): Tool information we want to retrieve from storage. The dicts have the same keys as those used in :func:`tool_add`. Returns: dict: The full tool information if it exists (``id`` included), None otherwise. """ tool_conf = tool['configuration'] if isinstance(tool_conf, dict): tool_conf = json.dumps(tool_conf) idx = db.tool_get(tool['name'], tool['version'], tool_conf) if not idx: return None return dict(zip(db.tool_cols, idx)) @db_transaction() def metadata_provider_add(self, provider_name, provider_type, provider_url, metadata, db=None, cur=None): + """Add a metadata provider. + + Args: + provider_name (str): Its name + provider_type (str): Its type + provider_url (str): Its URL + + Returns: + dict: same as args, plus an 'id' key. + """ return db.metadata_provider_add(provider_name, provider_type, provider_url, metadata, cur) @db_transaction() def metadata_provider_get(self, provider_id, db=None, cur=None): + """Get a metadata provider + + Args: + provider_id: Its identifier, as given by `metadata_provider_add`. + + Returns: + dict: same as `metadata_provider_add`; + or None if it does not exist. 
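# Hedged sketch chaining the tool / metadata-provider / origin-metadata calls
# documented above: register a tool and a provider, then attach a metadata
# record to an origin and read it back. `storage` and `origin_id` are assumed
# placeholders; the names, URLs and metadata payload are illustrative.
tool = list(storage.tool_add([{
    'name': 'swh-deposit',
    'version': '0.0.1',
    'configuration': {'sword_version': '2'},
}]))[0]                    # tool_add yields the inserted rows, ids included

storage.metadata_provider_add('hal', 'deposit-client',
                              'http://hal.example.org', {'location': 'France'})
provider = storage.metadata_provider_get_by({
    'provider_name': 'hal',
    'provider_url': 'http://hal.example.org',
})

# origin_metadata_add accepts either a datetime or an ISO 8601 string for ts.
storage.origin_metadata_add(origin_id, '2018-01-01T00:00:00+00:00',
                            provider['id'], tool['id'],
                            {'name': 'example-metadata', 'version': '0.0.1'})
entries = list(storage.origin_metadata_get_by(origin_id))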
+ """ result = db.metadata_provider_get(provider_id) if not result: return None return dict(zip(db.metadata_provider_cols, result)) @db_transaction() def metadata_provider_get_by(self, provider, db=None, cur=None): + """Get a metadata provider + + Args: + provider (dict): A dictionary with keys: + * provider_name: Its name + * provider_url: Its URL + + Returns: + dict: same as `metadata_provider_add`; + or None if it does not exist. + """ result = db.metadata_provider_get_by(provider['provider_name'], provider['provider_url']) if not result: return None return dict(zip(db.metadata_provider_cols, result)) def diff_directories(self, from_dir, to_dir, track_renaming=False): """Compute the list of file changes introduced between two arbitrary directories (insertion / deletion / modification / renaming of files). Args: from_dir (bytes): identifier of the directory to compare from to_dir (bytes): identifier of the directory to compare to track_renaming (bool): whether or not to track files renaming Returns: A list of dict describing the introduced file changes (see :func:`swh.storage.algos.diff.diff_directories` for more details). """ return diff.diff_directories(self, from_dir, to_dir, track_renaming) def diff_revisions(self, from_rev, to_rev, track_renaming=False): """Compute the list of file changes introduced between two arbitrary revisions (insertion / deletion / modification / renaming of files). Args: from_rev (bytes): identifier of the revision to compare from to_rev (bytes): identifier of the revision to compare to track_renaming (bool): whether or not to track files renaming Returns: A list of dict describing the introduced file changes (see :func:`swh.storage.algos.diff.diff_directories` for more details). """ return diff.diff_revisions(self, from_rev, to_rev, track_renaming) def diff_revision(self, revision, track_renaming=False): """Compute the list of file changes introduced by a specific revision (insertion / deletion / modification / renaming of files) by comparing it against its first parent. Args: revision (bytes): identifier of the revision from which to compute the list of files changes track_renaming (bool): whether or not to track files renaming Returns: A list of dict describing the introduced file changes (see :func:`swh.storage.algos.diff.diff_directories` for more details). 
""" return diff.diff_revision(self, revision, track_renaming) diff --git a/swh/storage/tests/algos/test_revisions_walker.py b/swh/storage/tests/algos/test_revisions_walker.py new file mode 100644 index 000000000..d3154a861 --- /dev/null +++ b/swh/storage/tests/algos/test_revisions_walker.py @@ -0,0 +1,367 @@ +# Copyright (C) 2018 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import unittest + +from unittest.mock import patch + +from swh.model.hashutil import hash_to_bytes +from swh.storage.algos.revisions_walker import get_revisions_walker + +# For those tests, we will walk the following revisions history +# with different orderings: +# +# * commit b364f53155044e5308a0f73abb3b5f01995a5b7d +# |\ Merge: 836d498 b94886c +# | | Author: Adam +# | | AuthorDate: Fri Oct 4 12:50:49 2013 +0200 +# | | Commit: Adam +# | | CommitDate: Fri Oct 4 12:50:49 2013 +0200 +# | | +# | | Merge branch 'release/1.0' +# | | +# | * commit b94886c500c46e32dc3d7ebae8a5409accd592e5 +# | | Author: Adam +# | | AuthorDate: Fri Oct 4 12:50:38 2013 +0200 +# | | Commit: Adam +# | | CommitDate: Fri Oct 4 12:50:38 2013 +0200 +# | | +# | | updating poms for 1.0 release +# | | +# | * commit 0cb6b4611d65bee0f57821dac7f611e2f8a02433 +# | | Author: Adam +# | | AuthorDate: Fri Oct 4 12:50:38 2013 +0200 +# | | Commit: Adam +# | | CommitDate: Fri Oct 4 12:50:38 2013 +0200 +# | | +# | | updating poms for 1.0 release +# | | +# | * commit 2b0240c6d682bad51532eec15b8a7ed6b75c8d31 +# | | Author: Adam Janicki +# | | AuthorDate: Fri Oct 4 12:50:22 2013 +0200 +# | | Commit: Adam Janicki +# | | CommitDate: Fri Oct 4 12:50:22 2013 +0200 +# | | +# | | For 1.0 release. Allow untracked. +# | | +# | * commit b401c50863475db4440c85c10ac0b6423b61554d +# | | Author: Adam +# | | AuthorDate: Fri Oct 4 12:48:12 2013 +0200 +# | | Commit: Adam +# | | CommitDate: Fri Oct 4 12:48:12 2013 +0200 +# | | +# | | updating poms for 1.0 release +# | | +# | * commit 9c5051397e5c2e0c258bb639c3dd34406584ca10 +# |/ Author: Adam Janicki +# | AuthorDate: Fri Oct 4 12:47:48 2013 +0200 +# | Commit: Adam Janicki +# | CommitDate: Fri Oct 4 12:47:48 2013 +0200 +# | +# | For 1.0 release. 
+# | +# * commit 836d498396fb9b5d45c896885f84d8d60a5651dc +# | Author: Adam Janicki +# | AuthorDate: Fri Oct 4 12:08:16 2013 +0200 +# | Commit: Adam Janicki +# | CommitDate: Fri Oct 4 12:08:16 2013 +0200 +# | +# | Add ignores +# | +# * commit ee96c2a2d397b79070d2b6fe3051290963748358 +# | Author: Adam +# | AuthorDate: Fri Oct 4 10:48:16 2013 +0100 +# | Commit: Adam +# | CommitDate: Fri Oct 4 10:48:16 2013 +0100 +# | +# | Reset author +# | +# * commit 8f89dda8e072383cf50d42532ae8f52ad89f8fdf +# Author: Adam +# AuthorDate: Fri Oct 4 02:20:19 2013 -0700 +# Commit: Adam +# CommitDate: Fri Oct 4 02:20:19 2013 -0700 +# +# Initial commit + +# raw dump of the above history in swh format +_revisions_list = \ +[{'author': {'email': b'adam.janicki@roche.com', # noqa + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer_date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883849}}, + 'date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883849}}, + 'directory': b'\xefX\xe7\xa6\\\xda\xdf\xfdH\xdbH\xfbq\x96@{\x98?9\xfe', + 'id': b'\xb3d\xf51U\x04NS\x08\xa0\xf7:\xbb;_\x01\x99Z[}', + 'message': b"Merge branch 'release/1.0'", + 'metadata': None, + 'parents': [b'\x83mI\x83\x96\xfb\x9b]E\xc8\x96\x88_\x84\xd8\xd6\nVQ\xdc', + b'\xb9H\x86\xc5\x00\xc4n2\xdc=~\xba\xe8\xa5@\x9a\xcc\xd5\x92\xe5'], # noqa + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer_date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883838}}, + 'date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883838}}, + 'directory': b'\xefX\xe7\xa6\\\xda\xdf\xfdH\xdbH\xfbq\x96@{\x98?9\xfe', + 'id': b'\xb9H\x86\xc5\x00\xc4n2\xdc=~\xba\xe8\xa5@\x9a\xcc\xd5\x92\xe5', + 'message': b'updating poms for 1.0 release', + 'metadata': None, + 'parents': [b'\x0c\xb6\xb4a\x1de\xbe\xe0\xf5x!\xda\xc7\xf6\x11\xe2\xf8\xa0$3'], # noqa + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer_date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883838}}, + 'date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883838}}, + 'directory': b'\xefX\xe7\xa6\\\xda\xdf\xfdH\xdbH\xfbq\x96@{\x98?9\xfe', + 'id': b'\x0c\xb6\xb4a\x1de\xbe\xe0\xf5x!\xda\xc7\xf6\x11\xe2\xf8\xa0$3', + 'message': b'updating poms for 1.0 release', + 'metadata': None, + 'parents': [b'+\x02@\xc6\xd6\x82\xba\xd5\x152\xee\xc1[\x8a~\xd6\xb7\\\x8d1'], + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': b'janickia', + 'fullname': b'Adam Janicki ', + 'id': 8040906, + 'name': b'Adam Janicki'}, + 'committer': {'email': b'janickia', + 'fullname': b'Adam Janicki ', + 'id': 8040906, + 'name': b'Adam Janicki'}, + 'committer_date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883822}}, + 'date': {'negative_utc': None, + 'offset': 120, + 
'timestamp': {'microseconds': 0, 'seconds': 1380883822}}, + 'directory': b'\xefX\xe7\xa6\\\xda\xdf\xfdH\xdbH\xfbq\x96@{\x98?9\xfe', + 'id': b'+\x02@\xc6\xd6\x82\xba\xd5\x152\xee\xc1[\x8a~\xd6\xb7\\\x8d1', + 'message': b'For 1.0 release. Allow untracked.\n', + 'metadata': None, + 'parents': [b'\xb4\x01\xc5\x08cG]\xb4D\x0c\x85\xc1\n\xc0\xb6B;aUM'], + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer_date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883692}}, + 'date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883692}}, + 'directory': b'd@\xe7\x143w\xcb\xf7\xad\xae\x91\xd5\xec\xd8\x95\x82' + b'\x02\xa6=\x1b', + 'id': b'\xb4\x01\xc5\x08cG]\xb4D\x0c\x85\xc1\n\xc0\xb6B;aUM', + 'message': b'updating poms for 1.0 release', + 'metadata': None, + 'parents': [b'\x9cPQ9~\\.\x0c%\x8b\xb69\xc3\xdd4@e\x84\xca\x10'], + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': b'janickia', + 'fullname': b'Adam Janicki ', + 'id': 8040906, + 'name': b'Adam Janicki'}, + 'committer': {'email': b'janickia', + 'fullname': b'Adam Janicki ', + 'id': 8040906, + 'name': b'Adam Janicki'}, + 'committer_date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883668}}, + 'date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380883668}}, + 'directory': b'\n\x857\x94r\xbe\xcc\x04=\xe9}\xe5\xfd\xdf?nR\xe6\xa7\x9e', + 'id': b'\x9cPQ9~\\.\x0c%\x8b\xb69\xc3\xdd4@e\x84\xca\x10', + 'message': b'For 1.0 release.\n', + 'metadata': None, + 'parents': [b'\x83mI\x83\x96\xfb\x9b]E\xc8\x96\x88_\x84\xd8\xd6\nVQ\xdc'], + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': b'janickia', + 'fullname': b'Adam Janicki ', + 'id': 8040906, + 'name': b'Adam Janicki'}, + 'committer': {'email': b'janickia', + 'fullname': b'Adam Janicki ', + 'id': 8040906, + 'name': b'Adam Janicki'}, + 'committer_date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380881296}}, + 'date': {'negative_utc': None, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1380881296}}, + 'directory': b'.\xf9\xa5\xcb\xb0\xd3\xdc\x9b{\xb8\x81\x03l\xe2P\x16c\x0b|\xe6', # noqa + 'id': b'\x83mI\x83\x96\xfb\x9b]E\xc8\x96\x88_\x84\xd8\xd6\nVQ\xdc', + 'message': b'Add ignores\n', + 'metadata': None, + 'parents': [b'\xee\x96\xc2\xa2\xd3\x97\xb7\x90p\xd2\xb6\xfe0Q)\tct\x83X'], + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer_date': {'negative_utc': None, + 'offset': 60, + 'timestamp': {'microseconds': 0, 'seconds': 1380880096}}, + 'date': {'negative_utc': None, + 'offset': 60, + 'timestamp': {'microseconds': 0, 'seconds': 1380880096}}, + 'directory': b'\xc7r\xc4\x9f\xc0$\xd4\xab\xff\xcb]\xf6<\xcb\x8b~\xec\xc4\xd1)', # noqa + 'id': b'\xee\x96\xc2\xa2\xd3\x97\xb7\x90p\xd2\xb6\xfe0Q)\tct\x83X', + 'message': b'Reset author\n', + 'metadata': None, + 'parents': [b'\x8f\x89\xdd\xa8\xe0r8<\xf5\rBS*\xe8\xf5*\xd8\x9f\x8f\xdf'], + 'synthetic': False, + 'type': 'git'}, + {'author': {'email': 
b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer': {'email': b'adam.janicki@roche.com', + 'fullname': b'Adam ', + 'id': 8040905, + 'name': b'Adam'}, + 'committer_date': {'negative_utc': None, + 'offset': -420, + 'timestamp': {'microseconds': 0, 'seconds': 1380878419}}, + 'date': {'negative_utc': None, + 'offset': -420, + 'timestamp': {'microseconds': 0, 'seconds': 1380878419}}, + 'directory': b'WS\xbaX\xd6x{q\x8f\x020i\xc5\x95\xa01\xf7y\xb2\x80', + 'id': b'\x8f\x89\xdd\xa8\xe0r8<\xf5\rBS*\xe8\xf5*\xd8\x9f\x8f\xdf', + 'message': b'Initial commit\n', + 'metadata': None, + 'parents': [], + 'synthetic': False, + 'type': 'git'}] + + +_rev_start = 'b364f53155044e5308a0f73abb3b5f01995a5b7d' + + +class RevisionsWalkerTest(unittest.TestCase): + + @patch('swh.storage.storage.Storage') + def check_revisions_ordering(self, rev_walker_type, expected_result, + MockStorage): + storage = MockStorage() + storage.revision_log.return_value = _revisions_list + + revs_walker = \ + get_revisions_walker(rev_walker_type, storage, + hash_to_bytes(_rev_start)) + + self.assertEqual(list(map(hash_to_bytes, expected_result)), + [rev['id'] for rev in revs_walker]) + + def test_revisions_walker_committer_date(self): + + # revisions should be returned in reverse chronological order + # of their committer date + expected_result = ['b364f53155044e5308a0f73abb3b5f01995a5b7d', + 'b94886c500c46e32dc3d7ebae8a5409accd592e5', + '0cb6b4611d65bee0f57821dac7f611e2f8a02433', + '2b0240c6d682bad51532eec15b8a7ed6b75c8d31', + 'b401c50863475db4440c85c10ac0b6423b61554d', + '9c5051397e5c2e0c258bb639c3dd34406584ca10', + '836d498396fb9b5d45c896885f84d8d60a5651dc', + 'ee96c2a2d397b79070d2b6fe3051290963748358', + '8f89dda8e072383cf50d42532ae8f52ad89f8fdf'] + + self.check_revisions_ordering('committer_date', expected_result) + + def test_revisions_walker_dfs(self): + + # revisions should be returned in the same order they are + # visited when performing a depth-first search in pre order + # on the revisions DAG + expected_result = ['b364f53155044e5308a0f73abb3b5f01995a5b7d', + '836d498396fb9b5d45c896885f84d8d60a5651dc', + 'ee96c2a2d397b79070d2b6fe3051290963748358', + '8f89dda8e072383cf50d42532ae8f52ad89f8fdf', + 'b94886c500c46e32dc3d7ebae8a5409accd592e5', + '0cb6b4611d65bee0f57821dac7f611e2f8a02433', + '2b0240c6d682bad51532eec15b8a7ed6b75c8d31', + 'b401c50863475db4440c85c10ac0b6423b61554d', + '9c5051397e5c2e0c258bb639c3dd34406584ca10'] + + self.check_revisions_ordering('dfs', expected_result) + + def test_revisions_walker_dfs_post(self): + + # revisions should be returned in the same order they are + # visited when performing a depth-first search in post order + # on the revisions DAG + expected_result = ['b364f53155044e5308a0f73abb3b5f01995a5b7d', + 'b94886c500c46e32dc3d7ebae8a5409accd592e5', + '0cb6b4611d65bee0f57821dac7f611e2f8a02433', + '2b0240c6d682bad51532eec15b8a7ed6b75c8d31', + 'b401c50863475db4440c85c10ac0b6423b61554d', + '9c5051397e5c2e0c258bb639c3dd34406584ca10', + '836d498396fb9b5d45c896885f84d8d60a5651dc', + 'ee96c2a2d397b79070d2b6fe3051290963748358', + '8f89dda8e072383cf50d42532ae8f52ad89f8fdf'] + + self.check_revisions_ordering('dfs_post', expected_result) + + def test_revisions_walker_bfs(self): + + # revisions should be returned in the same order they are + # visited when performing a breadth-first search on the + # revisions DAG + expected_result = ['b364f53155044e5308a0f73abb3b5f01995a5b7d', + '836d498396fb9b5d45c896885f84d8d60a5651dc', + 
'b94886c500c46e32dc3d7ebae8a5409accd592e5', + 'ee96c2a2d397b79070d2b6fe3051290963748358', + '0cb6b4611d65bee0f57821dac7f611e2f8a02433', + '8f89dda8e072383cf50d42532ae8f52ad89f8fdf', + '2b0240c6d682bad51532eec15b8a7ed6b75c8d31', + 'b401c50863475db4440c85c10ac0b6423b61554d', + '9c5051397e5c2e0c258bb639c3dd34406584ca10'] + + self.check_revisions_ordering('bfs', expected_result) diff --git a/swh/storage/tests/algos/test_snapshot.py b/swh/storage/tests/algos/test_snapshot.py index ceff24376..0035e3341 100644 --- a/swh/storage/tests/algos/test_snapshot.py +++ b/swh/storage/tests/algos/test_snapshot.py @@ -1,128 +1,128 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.plugins.attrib import attr +import pytest from hypothesis import given, settings from hypothesis.strategies import (binary, composite, datetimes, dictionaries, from_regex, none, one_of, sampled_from) from swh.model.identifiers import snapshot_identifier, identifier_to_bytes from swh.storage.tests.storage_testing import StorageTestFixture from swh.storage.algos.snapshot import snapshot_get_all_branches def branch_names(): return binary(min_size=5, max_size=10) @composite def branch_targets_object(draw): return { 'target': draw(binary(min_size=20, max_size=20)), 'target_type': draw( sampled_from([ 'content', 'directory', 'revision', 'release', 'snapshot', ]) ), } @composite def branch_targets_alias(draw): return { 'target': draw(branch_names()), 'target_type': 'alias', } def branch_targets(*, only_objects=False): if only_objects: return branch_targets_object() else: return one_of(none(), branch_targets_alias(), branch_targets_object()) @composite def snapshots(draw, *, min_size=0, max_size=100, only_objects=False): branches = draw(dictionaries( keys=branch_names(), values=branch_targets(only_objects=only_objects), min_size=min_size, max_size=max_size, )) if not only_objects: # Make sure aliases point to actual branches unresolved_aliases = { target['target'] for target in branches.values() if (target and target['target_type'] == 'alias' and target['target'] not in branches) } for alias in unresolved_aliases: branches[alias] = draw(branch_targets(only_objects=True)) ret = { 'branches': branches, } ret['id'] = identifier_to_bytes(snapshot_identifier(ret)) return ret @composite def urls(draw): protocol = draw(sampled_from(['git', 'http', 'https', 'deb'])) domain = draw(from_regex(r'\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z')) return '%s://%s' % (protocol, domain) @composite def origins(draw): return { 'type': draw(sampled_from(['git', 'hg', 'svn', 'pypi', 'deb'])), 'url': draw(urls()), } -@attr('db') +@pytest.mark.db class TestSnapshotAllBranches(StorageTestFixture, unittest.TestCase): @given(origins(), datetimes(), snapshots(min_size=0, max_size=10, only_objects=False)) def test_snapshot_small(self, origin, ts, snapshot): origin_id = self.storage.origin_add_one(origin) visit = self.storage.origin_visit_add(origin_id, ts) self.storage.snapshot_add(origin_id, visit['visit'], snapshot) returned_snapshot = snapshot_get_all_branches(self.storage, snapshot['id']) - self.assertEquals(snapshot, returned_snapshot) + self.assertEqual(snapshot, returned_snapshot) - @settings(max_examples=5, deadline=1000) + @settings(max_examples=5, deadline=5000) @given(origins(), datetimes(), branch_names(), branch_targets(only_objects=True)) 
def test_snapshot_large(self, origin, ts, branch_name, branch_target): origin_id = self.storage.origin_add_one(origin) visit = self.storage.origin_visit_add(origin_id, ts) snapshot = { 'branches': { b'%s%05d' % (branch_name, i): branch_target for i in range(10000) } } snapshot['id'] = identifier_to_bytes(snapshot_identifier(snapshot)) self.storage.snapshot_add(origin_id, visit['visit'], snapshot) returned_snapshot = snapshot_get_all_branches(self.storage, snapshot['id']) - self.assertEquals(snapshot, returned_snapshot) + self.assertEqual(snapshot, returned_snapshot) diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py index 6735541e5..bd59f503b 100644 --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/test_api_client.py @@ -1,54 +1,55 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import shutil import tempfile import unittest from swh.core.tests.server_testing import ServerTestFixture from swh.storage.api.client import RemoteStorage from swh.storage.api.server import app -from swh.storage.tests.test_storage import CommonTestStorage +from swh.storage.tests.test_storage import CommonTestStorage, \ + StorageTestDbFixture class TestRemoteStorage(CommonTestStorage, ServerTestFixture, - unittest.TestCase): + StorageTestDbFixture, unittest.TestCase): """Test the remote storage API. This class doesn't define any tests as we want identical functionality between local and remote storage. All the tests are therefore defined in CommonTestStorage. """ def setUp(self): # ServerTestFixture needs to have self.objroot for # setUp() method, but this field is defined in # AbstractTestStorage's setUp() # To avoid confusion, override the self.objroot to a # one chosen in this class. 
self.storage_base = tempfile.mkdtemp() self.config = { 'storage': { 'cls': 'local', 'args': { 'db': 'dbname=%s' % self.TEST_DB_NAME, 'objstorage': { 'cls': 'pathslicing', 'args': { 'root': self.storage_base, 'slicing': '0:2', }, }, } } } self.app = app super().setUp() self.storage = RemoteStorage(self.url()) self.objroot = self.storage_base def tearDown(self): super().tearDown() shutil.rmtree(self.storage_base) diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py index dc8137ae2..7a1bf6223 100644 --- a/swh/storage/tests/test_converters.py +++ b/swh/storage/tests/test_converters.py @@ -1,125 +1,122 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.plugins.attrib import attr - from swh.storage import converters -@attr('!db') class TestConverters(unittest.TestCase): def setUp(self): self.maxDiff = None def test_db_to_author(self): # when actual_author = converters.db_to_author( 1, b'fullname', b'name', b'email') # then - self.assertEquals(actual_author, { + self.assertEqual(actual_author, { 'id': 1, 'fullname': b'fullname', 'name': b'name', 'email': b'email', }) def test_db_to_revision(self): # when actual_revision = converters.db_to_revision({ 'id': 'revision-id', 'date': None, 'date_offset': None, 'date_neg_utc_offset': None, 'committer_date': None, 'committer_date_offset': None, 'committer_date_neg_utc_offset': None, 'type': 'rev', 'directory': b'dir-sha1', 'message': b'commit message', 'author_id': 'auth-id', 'author_fullname': b'auth-fullname', 'author_name': b'auth-name', 'author_email': b'auth-email', 'committer_id': 'comm-id', 'committer_fullname': b'comm-fullname', 'committer_name': b'comm-name', 'committer_email': b'comm-email', 'metadata': {}, 'synthetic': False, 'parents': [123, 456] }) # then - self.assertEquals(actual_revision, { + self.assertEqual(actual_revision, { 'id': 'revision-id', 'author': { 'id': 'auth-id', 'fullname': b'auth-fullname', 'name': b'auth-name', 'email': b'auth-email', }, 'date': None, 'committer': { 'id': 'comm-id', 'fullname': b'comm-fullname', 'name': b'comm-name', 'email': b'comm-email', }, 'committer_date': None, 'type': 'rev', 'directory': b'dir-sha1', 'message': b'commit message', 'metadata': {}, 'synthetic': False, 'parents': [123, 456], }) def test_db_to_release(self): # when actual_release = converters.db_to_release({ 'id': b'release-id', 'target': b'revision-id', 'target_type': 'revision', 'date': None, 'date_offset': None, 'date_neg_utc_offset': None, 'name': b'release-name', 'comment': b'release comment', 'synthetic': True, 'author_id': 'auth-id', 'author_fullname': b'auth-fullname', 'author_name': b'auth-name', 'author_email': b'auth-email', }) # then - self.assertEquals(actual_release, { + self.assertEqual(actual_release, { 'author': { 'id': 'auth-id', 'fullname': b'auth-fullname', 'name': b'auth-name', 'email': b'auth-email', }, 'date': None, 'id': b'release-id', 'name': b'release-name', 'message': b'release comment', 'synthetic': True, 'target': b'revision-id', 'target_type': 'revision' }) def test_db_to_git_headers(self): raw_data = [ ['gpgsig', b'garbage\x89a\x43b\x14'], ['extra', [b'fo\\\\\\o', b'bar\\', b'inval\\\\\x99id']], ] db_data = converters.git_headers_to_db(raw_data) loop = converters.db_to_git_headers(db_data) - self.assertEquals(raw_data, loop) + 
self.assertEqual(raw_data, loop) diff --git a/swh/storage/tests/test_db.py b/swh/storage/tests/test_db.py index 5b3c2ec0e..4e71726d5 100644 --- a/swh/storage/tests/test_db.py +++ b/swh/storage/tests/test_db.py @@ -1,50 +1,50 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest -from nose.plugins.attrib import attr +import pytest from swh.core.tests.db_testing import SingleDbTestFixture from swh.model.hashutil import hash_to_bytes from swh.storage.db import Db from . import SQL_DIR -@attr('db') +@pytest.mark.db class TestDb(SingleDbTestFixture, unittest.TestCase): TEST_DB_NAME = 'softwareheritage-test-storage' TEST_DB_DUMP = os.path.join(SQL_DIR, '*.sql') def setUp(self): super().setUp() self.db = Db(self.conn) def tearDown(self): self.db.conn.close() super().tearDown() def test_add_content(self): cur = self.cursor sha1 = hash_to_bytes('34973274ccef6ab4dfaaf86599792fa9c3fe4689') self.db.mktemp('content', cur) self.db.copy_to([{ 'sha1': sha1, 'sha1_git': hash_to_bytes( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'sha256': hash_to_bytes( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), 'blake2s256': hash_to_bytes('69217a3079908094e11121d042354a7c' '1f55b6482ca1a51e1b250dfd1ed0eef9'), 'length': 3}], 'tmp_content', ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length'], cur) self.db.content_add_from_temp(cur) self.cursor.execute('SELECT sha1 FROM content WHERE sha1 = %s', (sha1,)) self.assertEqual(self.cursor.fetchone()[0].tobytes(), sha1) diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py index e002e0e7c..331d7a499 100644 --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -1,1982 +1,1988 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import unittest from collections import defaultdict from operator import itemgetter from unittest.mock import Mock, patch import psycopg2 -from nose.plugins.attrib import attr +import pytest -from swh.core.tests.db_testing import DbTestFixture from swh.model import from_disk, identifiers from swh.model.hashutil import hash_to_bytes from swh.storage.tests.storage_testing import StorageTestFixture -@attr('db') -class BaseTestStorage(StorageTestFixture): +@pytest.mark.db +class StorageTestDbFixture(StorageTestFixture): def setUp(self): super().setUp() db = self.test_db[self.TEST_DB_NAME] self.conn = db.conn self.cursor = db.cursor self.maxDiff = None + def tearDown(self): + self.reset_storage_tables() + super().tearDown() + + +class TestStorageData: + def setUp(self): + super().setUp() + self.cont = { 'data': b'42\n', 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': hash_to_bytes( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'sha256': hash_to_bytes( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), 'blake2s256': hash_to_bytes('d5fe1939576527e42cfd76a9455a2' '432fe7f56669564577dd93c4280e76d661d'), 'status': 'visible', } self.cont2 = { 'data': b'4242\n', 'length': 5, 'sha1': hash_to_bytes( '61c2b3a30496d329e21af70dd2d7e097046d07b7'), 'sha1_git': hash_to_bytes( 
'36fade77193cb6d2bd826161a0979d64c28ab4fa'), 'sha256': hash_to_bytes( '859f0b154fdb2d630f45e1ecae4a8629' '15435e663248bb8461d914696fc047cd'), 'blake2s256': hash_to_bytes('849c20fad132b7c2d62c15de310adfe87be' '94a379941bed295e8141c6219810d'), 'status': 'visible', } self.cont3 = { 'data': b'424242\n', 'length': 7, 'sha1': hash_to_bytes( '3e21cc4942a4234c9e5edd8a9cacd1670fe59f13'), 'sha1_git': hash_to_bytes( 'c932c7649c6dfa4b82327d121215116909eb3bea'), 'sha256': hash_to_bytes( '92fb72daf8c6818288a35137b72155f5' '07e5de8d892712ab96277aaed8cf8a36'), 'blake2s256': hash_to_bytes('76d0346f44e5a27f6bafdd9c2befd304af' 'f83780f93121d801ab6a1d4769db11'), 'status': 'visible', } self.missing_cont = { 'data': b'missing\n', 'length': 8, 'sha1': hash_to_bytes( 'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'), 'sha1_git': hash_to_bytes( '33e45d56f88993aae6a0198013efa80716fd8919'), 'sha256': hash_to_bytes( '6bbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), 'blake2s256': hash_to_bytes('306856b8fd879edb7b6f1aeaaf8db9bbecc9' '93cd7f776c333ac3a782fa5c6eba'), 'status': 'absent', } self.skipped_cont = { 'length': 1024 * 1024 * 200, 'sha1_git': hash_to_bytes( '33e45d56f88993aae6a0198013efa80716fd8920'), 'sha1': hash_to_bytes( '43e45d56f88993aae6a0198013efa80716fd8920'), 'sha256': hash_to_bytes( '7bbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), 'blake2s256': hash_to_bytes( 'ade18b1adecb33f891ca36664da676e1' '2c772cc193778aac9a137b8dc5834b9b'), 'reason': 'Content too long', 'status': 'absent', } self.skipped_cont2 = { 'length': 1024 * 1024 * 300, 'sha1_git': hash_to_bytes( '44e45d56f88993aae6a0198013efa80716fd8921'), 'sha1': hash_to_bytes( '54e45d56f88993aae6a0198013efa80716fd8920'), 'sha256': hash_to_bytes( '8cbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), 'blake2s256': hash_to_bytes( '9ce18b1adecb33f891ca36664da676e1' '2c772cc193778aac9a137b8dc5834b9b'), 'reason': 'Content too long', 'status': 'absent', } self.dir = { 'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90', 'entries': [ { 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'perms': from_disk.DentryPerms.content, }, { 'name': b'bar\xc3', 'type': 'dir', 'target': b'12345678901234567890', 'perms': from_disk.DentryPerms.directory, }, ], } self.dir2 = { 'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa95', 'entries': [ { 'name': b'oof', 'type': 'file', 'target': self.cont2['sha1_git'], 'perms': from_disk.DentryPerms.content, } ], } self.dir3 = { 'id': hash_to_bytes('33e45d56f88993aae6a0198013efa80716fd8921'), 'entries': [ { 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'perms': from_disk.DentryPerms.content, }, { 'name': b'bar', 'type': 'dir', 'target': b'12345678901234560000', 'perms': from_disk.DentryPerms.directory, }, { 'name': b'hello', 'type': 'file', 'target': b'12345678901234567890', 'perms': from_disk.DentryPerms.content, }, ], } self.minus_offset = datetime.timezone(datetime.timedelta(minutes=-120)) self.plus_offset = datetime.timezone(datetime.timedelta(minutes=120)) self.revision = { 'id': b'56789012345678901234', 'message': b'hello', 'author': { 'name': b'Nicolas Dandrimont', 'email': b'nicolas@example.com', 'fullname': b'Nicolas Dandrimont ', }, 'date': { 'timestamp': 1234567890, 'offset': 120, 'negative_utc': None, }, 'committer': { 'name': b'St\xc3fano Zacchiroli', 'email': b'stefano@example.com', 'fullname': b'St\xc3fano Zacchiroli ' }, 'committer_date': { 'timestamp': 1123456789, 'offset': 0, 'negative_utc': True, }, 'parents': 
[b'01234567890123456789', b'23434512345123456789'], 'type': 'git', 'directory': self.dir['id'], 'metadata': { 'checksums': { 'sha1': 'tarball-sha1', 'sha256': 'tarball-sha256', }, 'signed-off-by': 'some-dude', 'extra_headers': [ ['gpgsig', b'test123'], ['mergetags', [b'foo\\bar', b'\x22\xaf\x89\x80\x01\x00']], ], }, 'synthetic': True } self.revision2 = { 'id': b'87659012345678904321', 'message': b'hello again', 'author': { 'name': b'Roberto Dicosmo', 'email': b'roberto@example.com', 'fullname': b'Roberto Dicosmo ', }, 'date': { 'timestamp': { 'seconds': 1234567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'committer_date': { 'timestamp': 1123456789, 'offset': 0, 'negative_utc': False, }, 'parents': [b'01234567890123456789'], 'type': 'git', 'directory': self.dir2['id'], 'metadata': None, 'synthetic': False } self.revision3 = { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'a simple revision with no parents this time', 'author': { 'name': b'Roberto Dicosmo', 'email': b'roberto@example.com', 'fullname': b'Roberto Dicosmo ', }, 'date': { 'timestamp': { 'seconds': 1234567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'committer_date': { 'timestamp': 1127351742, 'offset': 0, 'negative_utc': False, }, 'parents': [], 'type': 'git', 'directory': self.dir2['id'], 'metadata': None, 'synthetic': True } self.revision4 = { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'parent of self.revision2', 'author': { 'name': b'me', 'email': b'me@soft.heri', 'fullname': b'me ', }, 'date': { 'timestamp': { 'seconds': 1244567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'committer-dude', 'email': b'committer@dude.com', 'fullname': b'committer-dude ', }, 'committer_date': { 'timestamp': { 'seconds': 1244567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'parents': [self.revision3['id']], 'type': 'git', 'directory': self.dir['id'], 'metadata': None, 'synthetic': False } self.origin = { 'url': 'file:///dev/null', 'type': 'git', } self.origin2 = { 'url': 'file:///dev/zero', 'type': 'git', } self.provider = { 'name': 'hal', 'type': 'deposit-client', 'url': 'http:///hal/inria', 'metadata': { 'location': 'France' } } self.metadata_tool = { 'name': 'swh-deposit', 'version': '0.0.1', 'configuration': { 'sword_version': '2' } } self.origin_metadata = { 'origin': self.origin, 'discovery_date': datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc), 'provider': self.provider, 'tool': 'swh-deposit', 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' } } self.origin_metadata2 = { 'origin': self.origin, 'discovery_date': datetime.datetime(2017, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc), 'provider': self.provider, 'tool': 'swh-deposit', 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' } } self.date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) self.date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) self.date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) self.release = { 'id': b'87659012345678901234', 'name': b'v0.0.1', 'author': { 'name': b'olasd', 'email': b'nic@olasd.fr', 'fullname': b'olasd ', }, 'date': { 'timestamp': 1234567890, 
'offset': 42, 'negative_utc': None, }, 'target': b'43210987654321098765', 'target_type': 'revision', 'message': b'synthetic release', 'synthetic': True, } self.release2 = { 'id': b'56789012348765901234', 'name': b'v0.0.2', 'author': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'date': { 'timestamp': 1634366813, 'offset': -120, 'negative_utc': None, }, 'target': b'432109\xa9765432\xc309\x00765', 'target_type': 'revision', 'message': b'v0.0.2\nMisc performance improvements + bug fixes', 'synthetic': False } self.release3 = { 'id': b'87659012345678904321', 'name': b'v0.0.2', 'author': { 'name': b'tony', 'email': b'tony@ardumont.fr', 'fullname': b'tony ', }, 'date': { 'timestamp': 1634336813, 'offset': 0, 'negative_utc': False, }, 'target': self.revision2['id'], 'target_type': 'revision', 'message': b'yet another synthetic release', 'synthetic': True, } self.fetch_history_date = datetime.datetime( 2015, 1, 2, 21, 0, 0, tzinfo=datetime.timezone.utc) self.fetch_history_end = datetime.datetime( 2015, 1, 2, 23, 0, 0, tzinfo=datetime.timezone.utc) self.fetch_history_duration = (self.fetch_history_end - self.fetch_history_date) self.fetch_history_data = { 'status': True, 'result': {'foo': 'bar'}, 'stdout': 'blabla', 'stderr': 'blablabla', } self.snapshot = { 'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'), 'branches': { b'master': { 'target': self.revision['id'], 'target_type': 'revision', }, }, 'next_branch': None } self.empty_snapshot = { 'id': hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'), 'branches': {}, 'next_branch': None } self.complete_snapshot = { 'id': hash_to_bytes('6e65b86363953b780d92b0a928f3e8fcdd10db36'), 'branches': { b'directory': { 'target': hash_to_bytes( '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8'), 'target_type': 'directory', }, b'content': { 'target': hash_to_bytes( 'fe95a46679d128ff167b7c55df5d02356c5a1ae1'), 'target_type': 'content', }, b'alias': { 'target': b'revision', 'target_type': 'alias', }, b'revision': { 'target': hash_to_bytes( 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'), 'target_type': 'revision', }, b'release': { 'target': hash_to_bytes( '7045404f3d1c54e6473c71bbb716529fbad4be24'), 'target_type': 'release', }, b'snapshot': { 'target': hash_to_bytes( '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'), 'target_type': 'snapshot', }, b'dangling': None, }, 'next_branch': None } - def tearDown(self): - self.reset_storage_tables() - super().tearDown() - -class CommonTestStorage(BaseTestStorage): +class CommonTestStorage(TestStorageData): """Base class for Storage testing. This class is used as-is to test local storage (see TestLocalStorage below) and remote storage (see TestRemoteStorage in test_remote_storage.py. We need to have the two classes inherit from this base class separately to avoid nosetests running the tests from the base class twice. 
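# Hedged sketch of how the classes above are intended to be combined: a
# concrete test case mixes the DB fixture back into CommonTestStorage so the
# shared tests run against the local backend, while CommonTestStorage itself
# is never collected directly. The class name below is illustrative, not the
# module's own.
class TestLocalStorageSketch(CommonTestStorage, StorageTestDbFixture,
                             unittest.TestCase):
    """Runs every test defined in CommonTestStorage against the local
    (Postgres-backed) storage provided by StorageTestDbFixture."""
    pass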
""" @staticmethod def normalize_entity(entity): entity = copy.deepcopy(entity) for key in ('date', 'committer_date'): if key in entity: entity[key] = identifiers.normalize_timestamp(entity[key]) return entity def test_check_config(self): self.assertTrue(self.storage.check_config(check_write=True)) self.assertTrue(self.storage.check_config(check_write=False)) def test_content_add(self): cont = self.cont self.storage.content_add([cont]) if hasattr(self.storage, 'objstorage'): self.assertIn(cont['sha1'], self.storage.objstorage) self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = self.cursor.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible')) def test_content_add_collision(self): cont1 = self.cont # create (corrupted) content with same sha1{,_git} but != sha256 cont1b = cont1.copy() sha256_array = bytearray(cont1b['sha256']) sha256_array[0] += 1 cont1b['sha256'] = bytes(sha256_array) with self.assertRaises(psycopg2.IntegrityError): self.storage.content_add([cont1, cont1b]) def test_skipped_content_add(self): cont = self.skipped_cont.copy() cont2 = self.skipped_cont2.copy() cont2['blake2s256'] = None self.storage.content_add([cont, cont, cont2]) self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' 'length, status, reason ' 'FROM skipped_content ORDER BY sha1_git') datums = self.cursor.fetchall() - self.assertEquals(2, len(datums)) + self.assertEqual(2, len(datums)) datum = datums[0] self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3].tobytes(), datum[4], datum[5], datum[6]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['blake2s256'], cont['length'], 'absent', 'Content too long') ) datum2 = datums[1] self.assertEqual( (datum2[0].tobytes(), datum2[1].tobytes(), datum2[2].tobytes(), datum2[3], datum2[4], datum2[5], datum2[6]), (cont2['sha1'], cont2['sha1_git'], cont2['sha256'], cont2['blake2s256'], cont2['length'], 'absent', 'Content too long') ) def test_content_missing(self): cont2 = self.cont2 missing_cont = self.missing_cont self.storage.content_add([cont2]) test_contents = [cont2] missing_per_hash = defaultdict(list) for i in range(256): test_content = missing_cont.copy() for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: test_content[hash] = bytes([i]) + test_content[hash][1:] missing_per_hash[hash].append(test_content[hash]) test_contents.append(test_content) self.assertCountEqual( self.storage.content_missing(test_contents), missing_per_hash['sha1'] ) for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: self.assertCountEqual( self.storage.content_missing(test_contents, key_hash=hash), missing_per_hash[hash] ) def test_content_missing_per_sha1(self): # given cont2 = self.cont2 missing_cont = self.missing_cont self.storage.content_add([cont2]) # when gen = self.storage.content_missing_per_sha1([cont2['sha1'], missing_cont['sha1']]) # then self.assertEqual(list(gen), [missing_cont['sha1']]) def test_content_get_metadata(self): cont1 = self.cont.copy() cont2 = self.cont2.copy() self.storage.content_add([cont1, cont2]) gen = self.storage.content_get_metadata([cont1['sha1'], cont2['sha1']]) # we only retrieve the metadata cont1.pop('data') cont2.pop('data') self.assertCountEqual(list(gen), [cont1, cont2]) def test_content_get_metadata_missing_sha1(self): cont1 = self.cont.copy() cont2 = self.cont2.copy() missing_cont = 
self.missing_cont.copy() self.storage.content_add([cont1, cont2]) gen = self.storage.content_get_metadata([missing_cont['sha1']]) # All the metadata keys are None missing_cont.pop('data') for key in list(missing_cont): if key != 'sha1': missing_cont[key] = None self.assertEqual(list(gen), [missing_cont]) def test_directory_add(self): init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) self.storage.directory_add([self.dir]) stored_data = list(self.storage.directory_ls(self.dir['id'])) data_to_store = [] for ent in sorted(self.dir['entries'], key=itemgetter('name')): data_to_store.append({ 'dir_id': self.dir['id'], 'type': ent['type'], 'target': ent['target'], 'name': ent['name'], 'perms': ent['perms'], 'status': None, 'sha1': None, 'sha1_git': None, 'sha256': None, 'length': None, }) self.assertEqual(data_to_store, stored_data) after_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([], after_missing) def test_directory_entry_get_by_path(self): # given init_missing = list(self.storage.directory_missing([self.dir3['id']])) self.assertEqual([self.dir3['id']], init_missing) self.storage.directory_add([self.dir3]) expected_entries = [ { 'dir_id': self.dir3['id'], 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': from_disk.DentryPerms.content, 'length': None, }, { 'dir_id': self.dir3['id'], 'name': b'bar', 'type': 'dir', 'target': b'12345678901234560000', 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': from_disk.DentryPerms.directory, 'length': None, }, { 'dir_id': self.dir3['id'], 'name': b'hello', 'type': 'file', 'target': b'12345678901234567890', 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': from_disk.DentryPerms.content, 'length': None, }, ] # when (all must be found here) for entry, expected_entry in zip(self.dir3['entries'], expected_entries): actual_entry = self.storage.directory_entry_get_by_path( self.dir3['id'], [entry['name']]) self.assertEqual(actual_entry, expected_entry) # when (nothing should be found here since self.dir is not persisted.) 
for entry in self.dir['entries']: actual_entry = self.storage.directory_entry_get_by_path( self.dir['id'], [entry['name']]) self.assertIsNone(actual_entry) def test_revision_add(self): init_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([self.revision['id']], list(init_missing)) self.storage.revision_add([self.revision]) end_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([], list(end_missing)) def test_revision_log(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) # when actual_results = list(self.storage.revision_log( [self.revision4['id']])) # hack: ids generated for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 - self.assertEquals(actual_results[0], - self.normalize_entity(self.revision4)) - self.assertEquals(actual_results[1], - self.normalize_entity(self.revision3)) + self.assertEqual(actual_results[0], + self.normalize_entity(self.revision4)) + self.assertEqual(actual_results[1], + self.normalize_entity(self.revision3)) def test_revision_log_with_limit(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) actual_results = list(self.storage.revision_log( [self.revision4['id']], 1)) # hack: ids generated for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 1) - self.assertEquals(actual_results[0], self.revision4) + self.assertEqual(actual_results[0], self.revision4) @staticmethod def _short_revision(revision): return [revision['id'], revision['parents']] def test_revision_shortlog(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) # when actual_results = list(self.storage.revision_shortlog( [self.revision4['id']])) self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 - self.assertEquals(list(actual_results[0]), - self._short_revision(self.revision4)) - self.assertEquals(list(actual_results[1]), - self._short_revision(self.revision3)) + self.assertEqual(list(actual_results[0]), + self._short_revision(self.revision4)) + self.assertEqual(list(actual_results[1]), + self._short_revision(self.revision3)) def test_revision_shortlog_with_limit(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) actual_results = list(self.storage.revision_shortlog( [self.revision4['id']], 1)) self.assertEqual(len(actual_results), 1) - self.assertEquals(list(actual_results[0]), - self._short_revision(self.revision4)) + self.assertEqual(list(actual_results[0]), + self._short_revision(self.revision4)) def test_revision_get(self): self.storage.revision_add([self.revision]) actual_revisions = list(self.storage.revision_get( [self.revision['id'], self.revision2['id']])) # when del actual_revisions[0]['author']['id'] # hack: ids are generated del actual_revisions[0]['committer']['id'] self.assertEqual(len(actual_revisions), 2) self.assertEqual(actual_revisions[0], self.normalize_entity(self.revision)) self.assertIsNone(actual_revisions[1]) def test_revision_get_no_parents(self): self.storage.revision_add([self.revision3]) get = list(self.storage.revision_get([self.revision3['id']])) self.assertEqual(len(get), 1) self.assertEqual(get[0]['parents'], []) # no 
parents on this one def test_release_add(self): init_missing = self.storage.release_missing([self.release['id'], self.release2['id']]) self.assertEqual([self.release['id'], self.release2['id']], list(init_missing)) self.storage.release_add([self.release, self.release2]) end_missing = self.storage.release_missing([self.release['id'], self.release2['id']]) self.assertEqual([], list(end_missing)) def test_release_get(self): # given self.storage.release_add([self.release, self.release2]) # when actual_releases = list(self.storage.release_get([self.release['id'], self.release2['id']])) # then for actual_release in actual_releases: del actual_release['author']['id'] # hack: ids are generated - self.assertEquals([self.normalize_entity(self.release), - self.normalize_entity(self.release2)], - [actual_releases[0], actual_releases[1]]) + self.assertEqual([self.normalize_entity(self.release), + self.normalize_entity(self.release2)], + [actual_releases[0], actual_releases[1]]) def test_origin_add_one(self): origin0 = self.storage.origin_get(self.origin) self.assertIsNone(origin0) id = self.storage.origin_add_one(self.origin) actual_origin = self.storage.origin_get({'url': self.origin['url'], 'type': self.origin['type']}) self.assertEqual(actual_origin['id'], id) id2 = self.storage.origin_add_one(self.origin) self.assertEqual(id, id2) def test_origin_add(self): origin0 = self.storage.origin_get(self.origin) self.assertIsNone(origin0) origin1, origin2 = self.storage.origin_add([self.origin, self.origin2]) actual_origin = self.storage.origin_get({ 'url': self.origin['url'], 'type': self.origin['type'], }) self.assertEqual(actual_origin['id'], origin1['id']) actual_origin2 = self.storage.origin_get({ 'url': self.origin2['url'], 'type': self.origin2['type'], }) self.assertEqual(actual_origin2['id'], origin2['id']) def test_origin_add_twice(self): add1 = self.storage.origin_add([self.origin, self.origin2]) add2 = self.storage.origin_add([self.origin, self.origin2]) self.assertEqual(add1, add2) def test_origin_get(self): self.assertIsNone(self.storage.origin_get(self.origin)) id = self.storage.origin_add_one(self.origin) # lookup per type and url (returns id) actual_origin0 = self.storage.origin_get({'url': self.origin['url'], 'type': self.origin['type']}) self.assertEqual(actual_origin0['id'], id) # lookup per id (returns dict) actual_origin1 = self.storage.origin_get({'id': id}) self.assertEqual(actual_origin1, {'id': id, 'type': self.origin['type'], 'url': self.origin['url']}) def test_origin_search(self): found_origins = list(self.storage.origin_search(self.origin['url'])) self.assertEqual(len(found_origins), 0) found_origins = list(self.storage.origin_search(self.origin['url'], regexp=True)) self.assertEqual(len(found_origins), 0) id = self.storage.origin_add_one(self.origin) origin_data = {'id': id, 'type': self.origin['type'], 'url': self.origin['url']} found_origins = list(self.storage.origin_search(self.origin['url'])) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) found_origins = list(self.storage.origin_search( '.' 
+ self.origin['url'][1:-1] + '.', regexp=True)) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) id2 = self.storage.origin_add_one(self.origin2) origin2_data = {'id': id2, 'type': self.origin2['type'], 'url': self.origin2['url']} found_origins = list(self.storage.origin_search(self.origin2['url'])) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) found_origins = list(self.storage.origin_search( '.' + self.origin2['url'][1:-1] + '.', regexp=True)) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) found_origins = list(self.storage.origin_search('/')) self.assertEqual(len(found_origins), 2) found_origins = list(self.storage.origin_search('.*/.*', regexp=True)) self.assertEqual(len(found_origins), 2) found_origins = list(self.storage.origin_search('/', offset=0, limit=1)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) found_origins = list(self.storage.origin_search('.*/.*', offset=0, limit=1, regexp=True)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) def test_origin_visit_add(self): # given self.assertIsNone(self.storage.origin_get(self.origin2)) origin_id = self.storage.origin_add_one(self.origin2) self.assertIsNotNone(origin_id) # when origin_visit1 = self.storage.origin_visit_add( origin_id, - ts=self.date_visit2) + date=self.date_visit2) # then - self.assertEquals(origin_visit1['origin'], origin_id) + self.assertEqual(origin_visit1['origin'], origin_id) self.assertIsNotNone(origin_visit1['visit']) self.assertTrue(origin_visit1['visit'] > 0) actual_origin_visits = list(self.storage.origin_visit_get(origin_id)) - self.assertEquals(actual_origin_visits, - [{ - 'origin': origin_id, - 'date': self.date_visit2, - 'visit': origin_visit1['visit'], - 'status': 'ongoing', - 'metadata': None, - 'snapshot': None, - }]) + self.assertEqual(actual_origin_visits, + [{ + 'origin': origin_id, + 'date': self.date_visit2, + 'visit': origin_visit1['visit'], + 'status': 'ongoing', + 'metadata': None, + 'snapshot': None, + }]) def test_origin_visit_update(self): # given origin_id = self.storage.origin_add_one(self.origin2) origin_id2 = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add( origin_id, - ts=self.date_visit2) + date=self.date_visit2) origin_visit2 = self.storage.origin_visit_add( origin_id, - ts=self.date_visit3) + date=self.date_visit3) origin_visit3 = self.storage.origin_visit_add( origin_id2, - ts=self.date_visit3) + date=self.date_visit3) # when visit1_metadata = { 'contents': 42, 'directories': 22, } self.storage.origin_visit_update( origin_id, origin_visit1['visit'], status='full', metadata=visit1_metadata) self.storage.origin_visit_update(origin_id2, origin_visit3['visit'], status='partial') # then actual_origin_visits = list(self.storage.origin_visit_get(origin_id)) - self.assertEquals(actual_origin_visits, [{ + self.assertEqual(actual_origin_visits, [{ 'origin': origin_visit2['origin'], 'date': self.date_visit2, 'visit': origin_visit1['visit'], 'status': 'full', 'metadata': visit1_metadata, 
'snapshot': None, }, { 'origin': origin_visit2['origin'], 'date': self.date_visit3, 'visit': origin_visit2['visit'], 'status': 'ongoing', 'metadata': None, 'snapshot': None, }]) actual_origin_visits_bis = list(self.storage.origin_visit_get( origin_id, limit=1)) - self.assertEquals(actual_origin_visits_bis, - [{ - 'origin': origin_visit2['origin'], - 'date': self.date_visit2, - 'visit': origin_visit1['visit'], - 'status': 'full', - 'metadata': visit1_metadata, - 'snapshot': None, - }]) + self.assertEqual(actual_origin_visits_bis, + [{ + 'origin': origin_visit2['origin'], + 'date': self.date_visit2, + 'visit': origin_visit1['visit'], + 'status': 'full', + 'metadata': visit1_metadata, + 'snapshot': None, + }]) actual_origin_visits_ter = list(self.storage.origin_visit_get( origin_id, last_visit=origin_visit1['visit'])) - self.assertEquals(actual_origin_visits_ter, - [{ - 'origin': origin_visit2['origin'], - 'date': self.date_visit3, - 'visit': origin_visit2['visit'], - 'status': 'ongoing', - 'metadata': None, - 'snapshot': None, - }]) + self.assertEqual(actual_origin_visits_ter, + [{ + 'origin': origin_visit2['origin'], + 'date': self.date_visit3, + 'visit': origin_visit2['visit'], + 'status': 'ongoing', + 'metadata': None, + 'snapshot': None, + }]) actual_origin_visits2 = list(self.storage.origin_visit_get(origin_id2)) - self.assertEquals(actual_origin_visits2, - [{ - 'origin': origin_visit3['origin'], - 'date': self.date_visit3, - 'visit': origin_visit3['visit'], - 'status': 'partial', - 'metadata': None, - 'snapshot': None, - }]) + self.assertEqual(actual_origin_visits2, + [{ + 'origin': origin_visit3['origin'], + 'date': self.date_visit3, + 'visit': origin_visit3['visit'], + 'status': 'partial', + 'metadata': None, + 'snapshot': None, + }]) def test_origin_visit_get_by(self): origin_id = self.storage.origin_add_one(self.origin2) origin_id2 = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add( origin_id, - ts=self.date_visit2) + date=self.date_visit2) self.storage.snapshot_add(origin_id, origin_visit1['visit'], self.snapshot) # Add some other {origin, visit} entries - self.storage.origin_visit_add(origin_id, ts=self.date_visit3) - self.storage.origin_visit_add(origin_id2, ts=self.date_visit3) + self.storage.origin_visit_add(origin_id, date=self.date_visit3) + self.storage.origin_visit_add(origin_id2, date=self.date_visit3) # when visit1_metadata = { 'contents': 42, 'directories': 22, } self.storage.origin_visit_update( origin_id, origin_visit1['visit'], status='full', metadata=visit1_metadata) expected_origin_visit = origin_visit1.copy() expected_origin_visit.update({ 'origin': origin_id, 'visit': origin_visit1['visit'], 'date': self.date_visit2, 'metadata': visit1_metadata, 'status': 'full', 'snapshot': self.snapshot['id'], }) # when actual_origin_visit1 = self.storage.origin_visit_get_by( origin_visit1['origin'], origin_visit1['visit']) # then - self.assertEquals(actual_origin_visit1, expected_origin_visit) + self.assertEqual(actual_origin_visit1, expected_origin_visit) def test_origin_visit_get_by_no_result(self): # No result actual_origin_visit = self.storage.origin_visit_get_by( 10, 999) self.assertIsNone(actual_origin_visit) def test_snapshot_add_get_empty(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.empty_snapshot) by_id = self.storage.snapshot_get(self.empty_snapshot['id']) 
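# Editorial note -- a minimal sketch of the visit/snapshot write path that the
# snapshot tests here exercise, assuming only a configured `storage` instance
# and an `origin` dict (the names below are for illustration, they are not
# defined in the tests):
#
#     origin_id = storage.origin_add_one(origin)
#     visit = storage.origin_visit_add(origin_id, date=visit_date)
#     storage.snapshot_add(origin_id, visit['visit'], snapshot)
#     storage.origin_visit_update(origin_id, visit['visit'], status='full')
#
# snapshot_get() then resolves the snapshot by its id, while
# snapshot_get_by_origin_visit() resolves it through the (origin, visit) pair.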
self.assertEqual(by_id, self.empty_snapshot) by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) self.assertEqual(by_ov, self.empty_snapshot) def test_snapshot_add_get_complete(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) by_id = self.storage.snapshot_get(self.complete_snapshot['id']) self.assertEqual(by_id, self.complete_snapshot) by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) self.assertEqual(by_ov, self.complete_snapshot) def test_snapshot_add_count_branches(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) snp_id = self.complete_snapshot['id'] snp_size = self.storage.snapshot_count_branches(snp_id) expected_snp_size = { 'alias': 1, 'content': 1, 'directory': 1, 'release': 1, 'revision': 1, 'snapshot': 1, None: 1 } self.assertEqual(snp_size, expected_snp_size) def test_snapshot_add_get_paginated(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) snp_id = self.complete_snapshot['id'] branches = self.complete_snapshot['branches'] branch_names = list(sorted(branches)) snapshot = self.storage.snapshot_get_branches(snp_id, branches_from=b'release') rel_idx = branch_names.index(b'release') expected_snapshot = { 'id': snp_id, 'branches': { name: branches[name] for name in branch_names[rel_idx:] }, 'next_branch': None, } self.assertEqual(snapshot, expected_snapshot) snapshot = self.storage.snapshot_get_branches(snp_id, branches_count=1) expected_snapshot = { 'id': snp_id, 'branches': { branch_names[0]: branches[branch_names[0]], }, 'next_branch': b'content', } self.assertEqual(snapshot, expected_snapshot) snapshot = self.storage.snapshot_get_branches( snp_id, branches_from=b'directory', branches_count=3) dir_idx = branch_names.index(b'directory') expected_snapshot = { 'id': snp_id, 'branches': { name: branches[name] for name in branch_names[dir_idx:dir_idx + 3] }, 'next_branch': branch_names[dir_idx + 3], } self.assertEqual(snapshot, expected_snapshot) def test_snapshot_add_get_filtered(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) snp_id = self.complete_snapshot['id'] branches = self.complete_snapshot['branches'] snapshot = self.storage.snapshot_get_branches( snp_id, target_types=['release', 'revision']) expected_snapshot = { 'id': snp_id, 'branches': { name: tgt for name, tgt in branches.items() if tgt and tgt['target_type'] in ['release', 'revision'] }, 'next_branch': None, } self.assertEqual(snapshot, expected_snapshot) snapshot = self.storage.snapshot_get_branches(snp_id, target_types=['alias']) expected_snapshot = { 'id': snp_id, 'branches': { name: tgt for name, tgt in branches.items() if tgt and tgt['target_type'] == 'alias' }, 'next_branch': None, } self.assertEqual(snapshot, expected_snapshot) def test_snapshot_add_get(self): origin_id = self.storage.origin_add_one(self.origin) 
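# Editorial note on the pagination exercised in the tests above:
# snapshot_get_branches() returns a page of branches plus a 'next_branch'
# key, which names the branch to pass back as branches_from on the next call
# and is None once the listing is exhausted. A minimal sketch of walking all
# branches (assumes a `storage` instance and a snapshot id `snp_id`; the
# branches_count value is arbitrary):
#
#     branches = {}
#     name = b''
#     while name is not None:
#         page = storage.snapshot_get_branches(
#             snp_id, branches_from=name, branches_count=1000)
#         branches.update(page['branches'])
#         name = page['next_branch']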
origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.snapshot) by_id = self.storage.snapshot_get(self.snapshot['id']) self.assertEqual(by_id, self.snapshot) by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) self.assertEqual(by_ov, self.snapshot) origin_visit_info = self.storage.origin_visit_get_by(origin_id, visit_id) self.assertEqual(origin_visit_info['snapshot'], self.snapshot['id']) def test_snapshot_add_twice(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit1_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit1_id, self.snapshot) by_ov1 = self.storage.snapshot_get_by_origin_visit(origin_id, visit1_id) self.assertEqual(by_ov1, self.snapshot) origin_visit2 = self.storage.origin_visit_add(origin_id, self.date_visit2) visit2_id = origin_visit2['visit'] self.storage.snapshot_add(origin_id, visit2_id, self.snapshot) by_ov2 = self.storage.snapshot_get_by_origin_visit(origin_id, visit2_id) self.assertEqual(by_ov2, self.snapshot) def test_snapshot_get_nonexistent(self): bogus_snapshot_id = b'bogus snapshot id 00' bogus_origin_id = 1 bogus_visit_id = 1 by_id = self.storage.snapshot_get(bogus_snapshot_id) self.assertIsNone(by_id) by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id, bogus_visit_id) self.assertIsNone(by_ov) def test_snapshot_get_latest(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit1_id = origin_visit1['visit'] origin_visit2 = self.storage.origin_visit_add(origin_id, self.date_visit2) visit2_id = origin_visit2['visit'] # Two visits, both with no snapshot: latest snapshot is None self.assertIsNone(self.storage.snapshot_get_latest(origin_id)) # Add snapshot to visit1, latest snapshot = visit 1 snapshot self.storage.snapshot_add(origin_id, visit1_id, self.complete_snapshot) - self.assertEquals(self.complete_snapshot, - self.storage.snapshot_get_latest(origin_id)) + self.assertEqual(self.complete_snapshot, + self.storage.snapshot_get_latest(origin_id)) # Status filter: both visits are status=ongoing, so no snapshot # returned self.assertIsNone( self.storage.snapshot_get_latest(origin_id, allowed_statuses=['full']) ) # Mark the first visit as completed and check status filter again self.storage.origin_visit_update(origin_id, visit1_id, status='full') - self.assertEquals( + self.assertEqual( self.complete_snapshot, self.storage.snapshot_get_latest(origin_id, allowed_statuses=['full']), ) # Add snapshot to visit2 and check that the new snapshot is returned self.storage.snapshot_add(origin_id, visit2_id, self.empty_snapshot) - self.assertEquals(self.empty_snapshot, - self.storage.snapshot_get_latest(origin_id)) + self.assertEqual(self.empty_snapshot, + self.storage.snapshot_get_latest(origin_id)) # Check that the status filter is still working - self.assertEquals( + self.assertEqual( self.complete_snapshot, self.storage.snapshot_get_latest(origin_id, allowed_statuses=['full']), ) def test_stat_counters(self): expected_keys = ['content', 'directory', 'directory_entry_dir', 'origin', 'person', 'revision'] for key in expected_keys: self.cursor.execute('select * from swh_update_counter(%s)', (key,)) self.conn.commit() counters = self.storage.stat_counters() self.assertTrue(set(expected_keys) <= set(counters)) 
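# Editorial note -- as exercised above, snapshot_get_latest() returns the
# snapshot of the most recent visit that actually has one; passing
# allowed_statuses (e.g. ['full']) restricts the candidate visits to those
# whose status matches. stat_counters() itself returns a mapping from object
# type to an integer count, so a caller can simply do (assuming a `storage`
# instance):
#
#     counters = storage.stat_counters()
#     print(counters['content'], counters['revision'])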
self.assertIsInstance(counters[expected_keys[0]], int) def test_content_find_with_present_content(self): # 1. with something to find cont = self.cont self.storage.content_add([cont]) actually_present = self.storage.content_find({'sha1': cont['sha1']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) # 2. with something to find actually_present = self.storage.content_find( {'sha1_git': cont['sha1_git']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) # 3. with something to find actually_present = self.storage.content_find( {'sha256': cont['sha256']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) # 4. with something to find actually_present = self.storage.content_find({ 'sha1': cont['sha1'], 'sha1_git': cont['sha1_git'], 'sha256': cont['sha256'], 'blake2s256': cont['blake2s256'], }) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) def test_content_find_with_non_present_content(self): # 1. with something that does not exist missing_cont = self.missing_cont actually_present = self.storage.content_find( {'sha1': missing_cont['sha1']}) self.assertIsNone(actually_present) # 2. with something that does not exist actually_present = self.storage.content_find( {'sha1_git': missing_cont['sha1_git']}) self.assertIsNone(actually_present) # 3. with something that does not exist actually_present = self.storage.content_find( {'sha256': missing_cont['sha256']}) self.assertIsNone(actually_present) def test_content_find_bad_input(self): # 1. with bad input with self.assertRaises(ValueError): self.storage.content_find({}) # empty is bad # 2. 
with bad input with self.assertRaises(ValueError): self.storage.content_find( {'unknown-sha1': 'something'}) # not the right key def test_object_find_by_sha1_git(self): sha1_gits = [b'00000000000000000000'] expected = { b'00000000000000000000': [], } self.storage.content_add([self.cont]) sha1_gits.append(self.cont['sha1_git']) expected[self.cont['sha1_git']] = [{ 'sha1_git': self.cont['sha1_git'], 'type': 'content', 'id': self.cont['sha1'], }] self.storage.directory_add([self.dir]) sha1_gits.append(self.dir['id']) expected[self.dir['id']] = [{ 'sha1_git': self.dir['id'], 'type': 'directory', 'id': self.dir['id'], }] self.storage.revision_add([self.revision]) sha1_gits.append(self.revision['id']) expected[self.revision['id']] = [{ 'sha1_git': self.revision['id'], 'type': 'revision', 'id': self.revision['id'], }] self.storage.release_add([self.release]) sha1_gits.append(self.release['id']) expected[self.release['id']] = [{ 'sha1_git': self.release['id'], 'type': 'release', 'id': self.release['id'], }] ret = self.storage.object_find_by_sha1_git(sha1_gits) for val in ret.values(): for obj in val: del obj['object_id'] self.assertEqual(expected, ret) def test_tool_add(self): tool = { 'name': 'some-unknown-tool', 'version': 'some-version', 'configuration': {"debian-package": "some-package"}, } actual_tool = self.storage.tool_get(tool) self.assertIsNone(actual_tool) # does not exist # add it actual_tools = list(self.storage.tool_add([tool])) - self.assertEquals(len(actual_tools), 1) + self.assertEqual(len(actual_tools), 1) actual_tool = actual_tools[0] self.assertIsNotNone(actual_tool) # now it exists new_id = actual_tool.pop('id') - self.assertEquals(actual_tool, tool) + self.assertEqual(actual_tool, tool) actual_tools2 = list(self.storage.tool_add([tool])) actual_tool2 = actual_tools2[0] self.assertIsNotNone(actual_tool2) # now it exists new_id2 = actual_tool2.pop('id') self.assertEqual(new_id, new_id2) self.assertEqual(actual_tool, actual_tool2) def test_tool_add_multiple(self): tool = { 'name': 'some-unknown-tool', 'version': 'some-version', 'configuration': {"debian-package": "some-package"}, } actual_tools = list(self.storage.tool_add([tool])) self.assertEqual(len(actual_tools), 1) new_tools = [tool, { 'name': 'yet-another-tool', 'version': 'version', 'configuration': {}, }] actual_tools = list(self.storage.tool_add(new_tools)) self.assertEqual(len(actual_tools), 2) # order not guaranteed, so we iterate over results to check for tool in actual_tools: _id = tool.pop('id') self.assertIsNotNone(_id) self.assertIn(tool, new_tools) def test_tool_get_missing(self): tool = { 'name': 'unknown-tool', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': {"command_line": "nomossa "}, } actual_tool = self.storage.tool_get(tool) self.assertIsNone(actual_tool) def test_tool_metadata_get_missing_context(self): tool = { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': {"context": "unknown-context"}, } actual_tool = self.storage.tool_get(tool) self.assertIsNone(actual_tool) def test_tool_metadata_get(self): tool = { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': {"type": "local", "context": "npm"}, } tools = list(self.storage.tool_add([tool])) expected_tool = tools[0] # when actual_tool = self.storage.tool_get(tool) # then self.assertEqual(expected_tool, actual_tool) def test_metadata_provider_get_by(self): # given no_provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) 
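# Editorial note -- the origin-metadata tests below all follow the same flow:
# register (or look up) a provider and a tool, then attach metadata records
# to an origin through them. A minimal sketch, assuming a `storage` instance
# plus `origin`, `provider` and `tool` dicts shaped like the test fixtures,
# and placeholder `discovery_date` / `metadata` values:
#
#     origin_id = storage.origin_add([origin])[0]['id']
#     storage.metadata_provider_add(provider['name'], provider['type'],
#                                   provider['url'], provider['metadata'])
#     prov = storage.metadata_provider_get_by({
#         'provider_name': provider['name'],
#         'provider_url': provider['url'],
#     })
#     tool_row = list(storage.tool_add([tool]))[0]
#     storage.origin_metadata_add(origin_id, discovery_date,
#                                 prov['id'], tool_row['id'], metadata)
#     records = list(storage.origin_metadata_get_by(origin_id))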
self.assertIsNone(no_provider) # when provider_id = self.storage.metadata_provider_add( self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) actual_provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) # then self.assertTrue(provider_id, actual_provider['id']) def test_origin_metadata_add(self): # given origin_id = self.storage.origin_add([self.origin])[0]['id'] origin_metadata0 = list(self.storage.origin_metadata_get_by(origin_id)) self.assertTrue(len(origin_metadata0) == 0) tools = list(self.storage.tool_add([self.metadata_tool])) tool = tools[0] self.storage.metadata_provider_add( self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) tool = self.storage.tool_get(self.metadata_tool) # when adding for the same origin 2 metadatas o_m1 = self.storage.origin_metadata_add( origin_id, self.origin_metadata['discovery_date'], provider['id'], tool['id'], self.origin_metadata['metadata']) actual_om1 = list(self.storage.origin_metadata_get_by(origin_id)) # then self.assertEqual(actual_om1[0]['id'], o_m1) self.assertEqual(len(actual_om1), 1) self.assertEqual(actual_om1[0]['origin_id'], origin_id) def test_origin_metadata_get(self): # given origin_id = self.storage.origin_add([self.origin])[0]['id'] origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] self.storage.metadata_provider_add(self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) tool = self.storage.tool_get(self.metadata_tool) # when adding for the same origin 2 metadatas o_m1 = self.storage.origin_metadata_add( origin_id, self.origin_metadata['discovery_date'], provider['id'], tool['id'], self.origin_metadata['metadata']) o_m2 = self.storage.origin_metadata_add( origin_id2, self.origin_metadata2['discovery_date'], provider['id'], tool['id'], self.origin_metadata2['metadata']) o_m3 = self.storage.origin_metadata_add( origin_id, self.origin_metadata2['discovery_date'], provider['id'], tool['id'], self.origin_metadata2['metadata']) all_metadatas = list(self.storage.origin_metadata_get_by(origin_id)) metadatas_for_origin2 = list(self.storage.origin_metadata_get_by( origin_id2)) expected_results = [{ 'origin_id': origin_id, 'discovery_date': datetime.datetime( 2017, 1, 2, 0, 0, tzinfo=psycopg2.tz.FixedOffsetTimezone( offset=60, name=None)), 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' }, 'id': o_m3, 'provider_id': provider['id'], 'provider_name': 'hal', 'provider_type': 'deposit-client', 'provider_url': 'http:///hal/inria', 'tool_id': tool['id'] }, { 'origin_id': origin_id, 'discovery_date': datetime.datetime( 2015, 1, 2, 0, 0, tzinfo=psycopg2.tz.FixedOffsetTimezone( offset=60, name=None)), 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' }, 'id': o_m1, 'provider_id': provider['id'], 'provider_name': 'hal', 'provider_type': 'deposit-client', 'provider_url': 'http:///hal/inria', 'tool_id': tool['id'] }] # then self.assertEqual(len(all_metadatas), 2) self.assertEqual(len(metadatas_for_origin2), 1) self.assertEqual(metadatas_for_origin2[0]['id'], o_m2) self.assertEqual(all_metadatas, expected_results) def 
test_origin_metadata_get_by_provider_type(self): # given origin_id = self.storage.origin_add([self.origin])[0]['id'] origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] self.storage.metadata_provider_add( self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) provider1 = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) self.storage.metadata_provider_add( 'swMATH', 'registry', 'http://www.swmath.org/', {'email': 'contact@swmath.org', 'license': 'All rights reserved'}) provider2 = self.storage.metadata_provider_get_by({ 'provider_name': 'swMATH', 'provider_url': 'http://www.swmath.org/' }) # using the only tool now inserted in the data.sql, but for this # provider should be a crawler tool (not yet implemented) tool = self.storage.tool_get(self.metadata_tool) # when adding for the same origin 2 metadatas o_m1 = self.storage.origin_metadata_add( origin_id, self.origin_metadata['discovery_date'], provider1['id'], tool['id'], self.origin_metadata['metadata']) o_m2 = self.storage.origin_metadata_add( origin_id2, self.origin_metadata2['discovery_date'], provider2['id'], tool['id'], self.origin_metadata2['metadata']) provider_type = 'registry' m_by_provider = list(self.storage. origin_metadata_get_by( origin_id2, provider_type)) expected_results = [{ 'origin_id': origin_id2, 'discovery_date': datetime.datetime( 2017, 1, 2, 0, 0, tzinfo=psycopg2.tz.FixedOffsetTimezone( offset=60, name=None)), 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' }, 'id': o_m2, 'provider_id': provider2['id'], 'provider_name': 'swMATH', 'provider_type': provider_type, 'provider_url': 'http://www.swmath.org/', 'tool_id': tool['id'] }] # then self.assertEqual(len(m_by_provider), 1) self.assertEqual(m_by_provider, expected_results) self.assertEqual(m_by_provider[0]['id'], o_m2) self.assertIsNotNone(o_m1) -class TestLocalStorage(CommonTestStorage, unittest.TestCase): +class TestLocalStorage(CommonTestStorage, StorageTestDbFixture, + unittest.TestCase): """Test the local storage""" # Can only be tested with local storage as you can't mock # datetimes for the remote server def test_fetch_history(self): origin = self.storage.origin_add_one(self.origin) with patch('datetime.datetime'): datetime.datetime.now.return_value = self.fetch_history_date fetch_history_id = self.storage.fetch_history_start(origin) datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc) with patch('datetime.datetime'): datetime.datetime.now.return_value = self.fetch_history_end self.storage.fetch_history_end(fetch_history_id, self.fetch_history_data) fetch_history = self.storage.fetch_history_get(fetch_history_id) expected_fetch_history = self.fetch_history_data.copy() expected_fetch_history['id'] = fetch_history_id expected_fetch_history['origin'] = origin expected_fetch_history['date'] = self.fetch_history_date expected_fetch_history['duration'] = self.fetch_history_duration self.assertEqual(expected_fetch_history, fetch_history) # The remote API doesn't expose _person_add def test_person_get(self): # given person0 = { 'fullname': b'bob ', 'name': b'bob', 'email': b'alice@bob', } id0 = self.storage._person_add(person0) person1 = { 'fullname': b'tony ', 'name': b'tony', 'email': b'tony@bob', } id1 = self.storage._person_add(person1) # when actual_persons = self.storage.person_get([id0, id1]) # given (person injection through release for example) self.assertEqual( list(actual_persons), [ { 'id': id0, 'fullname': 
person0['fullname'], 'name': person0['name'], 'email': person0['email'], }, { 'id': id1, 'fullname': person1['fullname'], 'name': person1['name'], 'email': person1['email'], }, ]) # This test is only relevant on the local storage, with an actual # objstorage raising an exception def test_content_add_objstorage_exception(self): self.storage.objstorage.add = Mock( side_effect=Exception('mocked broken objstorage') ) with self.assertRaises(Exception) as e: self.storage.content_add([self.cont]) self.assertEqual(e.exception.args, ('mocked broken objstorage',)) missing = list(self.storage.content_missing([self.cont])) self.assertEqual(missing, [self.cont['sha1']]) -class AlteringSchemaTest(BaseTestStorage, unittest.TestCase): +class AlteringSchemaTest(TestStorageData, StorageTestDbFixture, + unittest.TestCase): """This class is dedicated for the rare case where the schema needs to be altered dynamically. Otherwise, the tests could be blocking when ran altogether. """ def test_content_update(self): cont = copy.deepcopy(self.cont) self.storage.content_add([cont]) # alter the sha1_git for example cont['sha1_git'] = hash_to_bytes( '3a60a5275d0333bf13468e8b3dcab90f4046e654') self.storage.content_update([cont], keys=['sha1_git']) with self.storage.get_db().transaction() as cur: cur.execute('SELECT sha1, sha1_git, sha256, length, status' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = cur.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible')) def test_content_update_with_new_cols(self): with self.storage.get_db().transaction() as cur: cur.execute("""alter table content add column test text default null, add column test2 text default null""") cont = copy.deepcopy(self.cont2) self.storage.content_add([cont]) cont['test'] = 'value-1' cont['test2'] = 'value-2' self.storage.content_update([cont], keys=['test', 'test2']) with self.storage.get_db().transaction() as cur: cur.execute( 'SELECT sha1, sha1_git, sha256, length, status, test, test2' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = cur.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4], datum[5], datum[6]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible', cont['test'], cont['test2'])) with self.storage.get_db().transaction() as cur: cur.execute("""alter table content drop column test, drop column test2""") diff --git a/tox.ini b/tox.ini new file mode 100644 index 000000000..70265ee8c --- /dev/null +++ b/tox.ini @@ -0,0 +1,17 @@ +[tox] +envlist=flake8,py3 + +[testenv:py3] +deps = + .[testing] + pytest-cov + pifpaf +commands = + pifpaf run postgresql -- pytest --cov=swh --cov-branch {posargs} + +[testenv:flake8] +skip_install = true +deps = + flake8 +commands = + {envpython} -m flake8 diff --git a/utils/dump_revisions.py b/utils/dump_revisions.py index 45421f34c..5e5d220e9 100755 --- a/utils/dump_revisions.py +++ b/utils/dump_revisions.py @@ -1,62 +1,64 @@ #!/usr/bin/env python3 import os import pickle import psycopg2.extras from swh.storage import converters, db from swh.model import identifiers + QUERY = ''' select r.id, r.date, r.date_offset, r.date_neg_utc_offset, r.committer_date, r.committer_date_offset, r.committer_date_neg_utc_offset, r.type, r.directory, r.message, a.id as author_id, a.name as author_name, a.email as author_email, c.id as committer_id, c.name as committer_name, c.email as committer_email, 
r.metadata, r.synthetic, array(select rh.parent_id::bytea from revision_history rh where rh.id = r.id order by rh.parent_rank) as parents from revision r left join person a on a.id = r.author left join person c on c.id = r.committer where r.id > %s order by r.id limit %s ''' def dump_revision(revision): rev_id = identifiers.identifier_to_str(revision['id']) dirs = 'revs/%s/%s' % (rev_id[0:2], rev_id[2:4]) os.makedirs(dirs, exist_ok=True) with open(os.path.join(dirs, rev_id), 'wb') as f: pickle.dump(revision, f) def check_revision(revision): id = identifiers.identifier_to_str(revision['id']) computed_id = identifiers.revision_identifier(revision) if id != computed_id: dump_revision(revision) + if __name__ == '__main__': swh_db = db.Db.connect('service=swh', cursor_factory=psycopg2.extras.RealDictCursor) last_id = bytes.fromhex('51606a8181f7c6d0aff852106c3ec23ebc186439') while True: with swh_db.transaction() as cur: cur.execute(QUERY, (last_id, 10000)) if not cur.rowcount > 0: break for db_rev in db.cursor_to_bytes(cur): revision = converters.db_to_revision(db_rev) check_revision(revision) last_id = revision['id'] diff --git a/utils/fix_revisions_from_dump.py b/utils/fix_revisions_from_dump.py index 32fd7d792..1e9aea481 100755 --- a/utils/fix_revisions_from_dump.py +++ b/utils/fix_revisions_from_dump.py @@ -1,239 +1,252 @@ #!/usr/bin/env python3 import copy import itertools import os import pickle import sys from swh.model import identifiers def author_date_to_notnegutc(rev): rev['date']['negative_utc'] = False def author_date_to_negutc(rev): rev['date']['negative_utc'] = True + DATE_NEGUTC_FIX = ('set author negutc', [ (None, None), (author_date_to_negutc, 'date_neg_utcoffset = true'), ]) def committer_date_to_notnegutc(rev): rev['committer_date']['negative_utc'] = False def committer_date_to_negutc(rev): rev['committer_date']['negative_utc'] = True + COMMITTER_DATE_NEGUTC_FIX = ('set committer negutc', [ (None, None), (committer_date_to_negutc, 'committer_date_neg_utcoffset = true'), ]) def message_to_empty(rev): rev['message'] = b'' + MESSAGE_EMPTY_FIX = ('empty instead of null message', [ (None, None), (message_to_empty, "message = ''"), ]) def message_to_null(rev): rev['message'] = None + MESSAGE_NULL_FIX = ('null instead of empty message', [ (None, None), (message_to_null, "message = NULL"), ]) def message_add_nl_end(num_nl): def fix(rev, num_nl=num_nl): components = [rev['message'] if rev['message'] else b''] components.extend([b'\n'] * num_nl) rev['message'] = b''.join(components) return fix MESSAGE_ADD_NL_END_FIX = ('add newline to end of message', [ (None, None), (message_add_nl_end(1), "add 1 newline to end of message"), (message_add_nl_end(2), "add 2 newlines to end of message"), (message_add_nl_end(3), "add 3 newlines to end of message"), ]) def message_add_nl_start(num_nl): def fix(rev, num_nl=num_nl): components = [b'\n'] * num_nl components.append(rev['message'] if rev['message'] else b'') rev['message'] = b''.join(components) return fix MESSAGE_ADD_NL_START_FIX = ('add newline to start of message', [ (None, None), (message_add_nl_start(1), "add 1 newline to start of message"), (message_add_nl_start(2), "add 2 newlines to start of message"), (message_add_nl_start(3), "add 3 newlines to start of message"), ]) def author_name_doublespace(rev): rev['author']['name'] = b''.join([rev['author']['name'], b' ']) + AUTHOR_NAME_ADD_SPC_FIX = ('author double space', [ (None, None), (author_name_doublespace, 'trailing space author name') ]) def committer_name_doublespace(rev): 
rev['committer']['name'] = b''.join([rev['committer']['name'], b' ']) + COMMITTER_NAME_ADD_SPC_FIX = ('committer double space', [ (None, None), (committer_name_doublespace, 'trailing space committer name') ]) def author_name_null(rev): rev['author']['name'] = None + AUTHOR_NAME_NULL_FIX = ('author name null', [ (None, None), (author_name_null, 'None author name') ]) def author_email_null(rev): rev['author']['email'] = None + AUTHOR_EMAIL_NULL_FIX = ('author email null', [ (None, None), (author_email_null, 'None author email') ]) def committer_name_null(rev): rev['committer']['name'] = None + COMMITTER_NAME_NULL_FIX = ('committer name null', [ (None, None), (committer_name_null, 'None committer name') ]) def committer_email_null(rev): rev['committer']['email'] = None + COMMITTER_EMAIL_NULL_FIX = ('committer email null', [ (None, None), (committer_email_null, 'None committer email') ]) def author_add_spc(rev): rev['author'] = b''.join([ identifiers.normalize_author(rev['author']), b' ']) + AUTHOR_ADD_SPC_FIX = ('add trailing space to author specification', [ (None, None), (author_add_spc, 'add trailing space to author spec') ]) def committer_add_spc(rev): rev['committer'] = b''.join([ identifiers.normalize_author(rev['committer']), b' ']) + COMMITTER_ADD_SPC_FIX = ('add trailing space to committer specification', [ (None, None), (committer_add_spc, 'add trailing space to committer spec') ]) def fix_revision(revision): data_fixups = [] id = identifiers.identifier_to_str(revision['id']) if revision['message'] is None: data_fixups.append(MESSAGE_EMPTY_FIX) if revision['message'] == b'': data_fixups.append(MESSAGE_NULL_FIX) data_fixups.append(MESSAGE_ADD_NL_END_FIX) data_fixups.append(MESSAGE_ADD_NL_START_FIX) if revision['date']['offset'] == 0 and \ not revision['date']['negative_utc']: data_fixups.append(DATE_NEGUTC_FIX) if revision['committer_date']['offset'] == 0 and \ not revision['committer_date']['negative_utc']: data_fixups.append(COMMITTER_DATE_NEGUTC_FIX) if not data_fixups: computed_id = identifiers.revision_identifier(revision) if id == computed_id: return # Less credible fixups are first in the list, so they run last data_fixups.insert(0, COMMITTER_ADD_SPC_FIX) data_fixups.insert(0, AUTHOR_ADD_SPC_FIX) if revision['author']['name'] == b'': data_fixups.insert(0, AUTHOR_NAME_NULL_FIX) if revision['author']['email'] == b'': data_fixups.insert(0, AUTHOR_EMAIL_NULL_FIX) if revision['committer']['name'] == b'': data_fixups.insert(0, COMMITTER_NAME_NULL_FIX) if revision['committer']['email'] == b'': data_fixups.insert(0, COMMITTER_EMAIL_NULL_FIX) data_fixups.insert(0, COMMITTER_NAME_ADD_SPC_FIX) data_fixups.insert(0, AUTHOR_NAME_ADD_SPC_FIX) data_fixup_functions = [functions for title, functions in data_fixups] for corrections in itertools.product(*data_fixup_functions): sql_fixups = [] new_revision = copy.deepcopy(revision) for fun, sql_fixup in corrections: if fun: fun(new_revision) if sql_fixup: sql_fixups.append(sql_fixup) computed_id = identifiers.revision_identifier(new_revision) if id == computed_id: if not sql_fixups: return return id, sql_fixups else: return id, [] + if __name__ == '__main__': for hash in sys.stdin.readlines(): hash = hash.strip() filename = os.path.join('revs', hash[0:2], hash[2:4], hash) with open(filename, 'rb') as f: revision = pickle.load(f) id, fixes = fix_revision(revision) if not fixes: print(id) else: print(';'.join([id, 'FIXED'] + fixes)) diff --git a/version.txt b/version.txt index 1cea4f661..1c5bfc795 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 
@@
-v0.0.108-0-gd211615
\ No newline at end of file
+v0.0.109-0-g1031dc5
\ No newline at end of file
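Note (sketch, not part of the patch): the fixup search in utils/fix_revisions_from_dump.py reduces to a simple pattern: gather a set of optional transformations (each paired with a do-nothing alternative), apply every combination of them to a copy of the object, and keep the first combination whose recomputed identifier matches the stored one. The snippet below is a minimal, self-contained illustration of that pattern; `compute_id` and the example fixes are stand-ins, not the real `swh.model.identifiers` calls or the script's actual fixup tables.

```python
import copy
import hashlib
import itertools


def compute_id(obj):
    # Stand-in for identifiers.revision_identifier(): any deterministic
    # function of the object's content is enough to illustrate the search.
    return hashlib.sha1(repr(sorted(obj.items())).encode()).hexdigest()


def search_fixups(obj, target_id, candidate_fixes):
    """Try every combination of optional fixes (None = leave the field
    as-is) and return the labels of the first combination that reproduces
    target_id, or None if no combination matches."""
    alternatives = [[(None, None)] + fixes for fixes in candidate_fixes]
    for combination in itertools.product(*alternatives):
        candidate = copy.deepcopy(obj)
        labels = []
        for fix, label in combination:
            if fix:
                fix(candidate)
                labels.append(label)
        if compute_id(candidate) == target_id:
            return labels
    return None


if __name__ == '__main__':
    # Hypothetical example: the stored id was computed from a slightly
    # different serialization (trailing newline, stripped author).
    original = {'message': 'fix bug', 'author': 'alice '}
    target = compute_id({'message': 'fix bug\n', 'author': 'alice'})

    fixes = [
        [(lambda o: o.update(message=o['message'] + '\n'), 'add newline')],
        [(lambda o: o.update(author=o['author'].strip()), 'strip author')],
    ]
    # Prints ['add newline', 'strip author']
    print(search_fixups(original, target, fixes))
```

The real script additionally inserts the less credible fixups at the front of its list so that, with itertools.product, they are only applied after the more plausible combinations have been tried; the sketch leaves that ordering concern out.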