diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -17,6 +17,7 @@
     Optional,
     Tuple,
     TypeVar,
+    Union,
 )
 
 from cassandra import CoordinationFailure
@@ -44,6 +45,7 @@
 
 from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url
 from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS
+from .. import extrinsic_metadata
 
 logger = logging.getLogger(__name__)
 
@@ -795,12 +797,12 @@
     def metadata_fetcher_get(self, name, version, *, statement) -> Optional[Row]:
         return next(iter(self._execute_with_retries(statement, [name, version])), None)
 
-    ##########################
-    # 'origin_metadata' table
-    ##########################
+    #########################
+    # 'object_metadata' table
+    #########################
 
-    _origin_metadata_keys = [
-        "origin",
+    _object_metadata_keys = [
+        "id",
         "authority_type",
         "authority_url",
         "discovery_date",
@@ -808,12 +810,23 @@
         "fetcher_version",
         "format",
         "metadata",
+        "origin",
+        "visit",
+        "snapshot",
+        "release",
+        "revision",
+        "path",
+        "directory",
     ]
 
-    @_prepared_insert_statement("origin_metadata", _origin_metadata_keys)
-    def origin_metadata_add(
+    @_prepared_statement(
+        f"INSERT INTO object_metadata ({', '.join(_object_metadata_keys)}) "
+        f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})"
+    )
+    def object_metadata_add(
         self,
-        origin,
+        object_type: str,
+        id: str,
         authority_type,
         authority_url,
         discovery_date,
@@ -821,43 +834,46 @@
         fetcher_version,
         format,
         metadata,
+        context: Dict[str, Union[str, bytes, int]],
         *,
         statement,
     ):
-        return self._execute_with_retries(
-            statement,
-            [
-                origin,
-                authority_type,
-                authority_url,
-                discovery_date,
-                fetcher_name,
-                fetcher_version,
-                format,
-                metadata,
-            ],
+        params = [
+            id,
+            authority_type,
+            authority_url,
+            discovery_date,
+            fetcher_name,
+            fetcher_version,
+            format,
+            metadata,
+        ]
+
+        params.extend(
+            context.get(key) for key in extrinsic_metadata.CONTEXT_KEYS[object_type]
         )
 
+        return self._execute_with_retries(statement, params,)
+
     @_prepared_statement(
-        "SELECT * from origin_metadata "
-        "WHERE origin=? AND authority_url=? AND discovery_date>? "
-        "AND authority_type=?"
+        "SELECT * from object_metadata "
+        "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?"
     )
-    def origin_metadata_get_after_date(
-        self, origin, authority_type, authority_url, after, *, statement
+    def object_metadata_get_after_date(
+        self, id: str, authority_type, authority_url, after, *, statement
     ):
         return self._execute_with_retries(
-            statement, [origin, authority_url, after, authority_type]
+            statement, [id, authority_url, after, authority_type]
         )
 
     @_prepared_statement(
-        "SELECT * from origin_metadata "
-        "WHERE origin=? AND authority_type=? AND authority_url=? "
+        "SELECT * from object_metadata "
+        "WHERE id=? AND authority_type=? AND authority_url=? "
         "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
     )
-    def origin_metadata_get_after_date_and_fetcher(
+    def object_metadata_get_after_date_and_fetcher(
         self,
-        origin,
+        id,
         authority_type,
         authority_url,
         after_date,
@@ -869,7 +885,7 @@
         return self._execute_with_retries(
             statement,
             [
-                origin,
+                id,
                 authority_type,
                 authority_url,
                 after_date,
@@ -879,14 +895,14 @@
         )
 
     @_prepared_statement(
-        "SELECT * from origin_metadata "
-        "WHERE origin=? AND authority_url=? AND authority_type=?"
+        "SELECT * from object_metadata "
+        "WHERE id=? AND authority_url=? AND authority_type=?"
     )
-    def origin_metadata_get(
-        self, origin, authority_type, authority_url, *, statement
+    def object_metadata_get(
+        self, id, authority_type, authority_url, *, statement
     ) -> Iterable[Row]:
         return self._execute_with_retries(
-            statement, [origin, authority_url, authority_type]
+            statement, [id, authority_url, authority_type]
         )
 
     ##########################
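Note on the cql.py hunk above: the prepared INSERT lists all fifteen `_object_metadata_keys`, while `object_metadata_add` binds only the eight fixed values plus the context keys defined for the given `object_type`. This appears to rely on two properties: each `extrinsic_metadata.CONTEXT_KEYS[object_type]` is a prefix of the context-column order, and the Cassandra driver (protocol v4+) leaves trailing bind parameters unset, so the remaining columns are simply not written. A minimal sketch of that alignment, with a stubbed `CONTEXT_KEYS` standing in for the real module:

    # Sketch only: mirrors how object_metadata_add lines params up with the
    # prepared statement's placeholders. CONTEXT_KEYS is a stub here.
    CONTEXT_KEYS = {"origin": [], "snapshot": ["origin", "visit"]}

    _object_metadata_keys = [
        "id", "authority_type", "authority_url", "discovery_date",
        "fetcher_name", "fetcher_version", "format", "metadata",
        # context columns, in the same order as every CONTEXT_KEYS entry:
        "origin", "visit", "snapshot", "release", "revision", "path", "directory",
    ]

    def build_params(object_type, fixed_values, context):
        params = list(fixed_values)  # the eight fixed columns, in order
        # Only the context keys valid for this object type are bound; the
        # trailing placeholders stay unset, so those columns are skipped.
        params.extend(context.get(key) for key in CONTEXT_KEYS[object_type])
        return params

    assert len(build_params("origin", [None] * 8, {})) == 8  # 7 placeholders unset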
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -30,29 +30,34 @@
 $$
 ;
 
+
 CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii )
     SFUNC ascii_bins_count_sfunc
     STYPE tuple<int, map<ascii, int>>
     INITCOND (0, {})
 ;
 
+
 CREATE TYPE IF NOT EXISTS microtimestamp (
     seconds bigint,
     microseconds int
 );
 
+
 CREATE TYPE IF NOT EXISTS microtimestamp_with_timezone (
     timestamp frozen<microtimestamp>,
     offset smallint,
     negative_utc boolean
 );
 
+
 CREATE TYPE IF NOT EXISTS person (
     fullname blob,
     name blob,
     email blob
 );
 
+
 CREATE TABLE IF NOT EXISTS content (
     sha1 blob,
     sha1_git blob,
@@ -65,6 +70,7 @@
     PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
 );
 
+
 CREATE TABLE IF NOT EXISTS skipped_content (
     sha1 blob,
     sha1_git blob,
@@ -79,6 +85,7 @@
     PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
 );
 
+
 CREATE TABLE IF NOT EXISTS revision (
     id blob PRIMARY KEY,
     date microtimestamp_with_timezone,
@@ -95,6 +102,7 @@
     -- extra commit information, etc...)
 );
 
+
 CREATE TABLE IF NOT EXISTS revision_parent (
     id blob,
     parent_rank int,
@@ -103,6 +111,7 @@
     PRIMARY KEY ((id), parent_rank)
 );
 
+
 CREATE TABLE IF NOT EXISTS release (
     id blob PRIMARY KEY,
@@ -116,10 +125,12 @@
     -- true iff release has been created by Software Heritage
 );
 
+
 CREATE TABLE IF NOT EXISTS directory (
     id blob PRIMARY KEY,
 );
 
+
 CREATE TABLE IF NOT EXISTS directory_entry (
     directory_id blob,
     name blob,  -- path name, relative to containing dir
@@ -129,10 +140,12 @@
     PRIMARY KEY ((directory_id), name)
 );
 
+
 CREATE TABLE IF NOT EXISTS snapshot (
     id blob PRIMARY KEY,
 );
 
+
 -- For a given snapshot_id, branches are sorted by their name,
 -- allowing easy pagination.
 CREATE TABLE IF NOT EXISTS snapshot_branch (
@@ -143,6 +156,7 @@
     PRIMARY KEY ((snapshot_id), name)
 );
 
+
 CREATE TABLE IF NOT EXISTS origin_visit (
     origin text,
     visit bigint,
@@ -154,6 +168,7 @@
     PRIMARY KEY ((origin), visit)
 );
 
+
 CREATE TABLE IF NOT EXISTS origin_visit_status (
     origin text,
     visit bigint,
@@ -164,6 +179,7 @@
     PRIMARY KEY ((origin), visit, date)
 );
 
+
 CREATE TABLE IF NOT EXISTS origin (
     sha1 blob PRIMARY KEY,
     url text,
@@ -191,20 +207,32 @@
 );
 
 
-CREATE TABLE IF NOT EXISTS origin_metadata (
-    origin text,
+CREATE TABLE IF NOT EXISTS object_metadata (
+    -- object identifier
+    id text,
+
+    -- metadata source
     authority_type text,
     authority_url text,
     discovery_date timestamp,
     fetcher_name ascii,
     fetcher_version ascii,
+
+    -- metadata itself
     format ascii,
     metadata blob,
-    PRIMARY KEY ((origin), authority_type, authority_url, discovery_date,
-                 fetcher_name, fetcher_version),
-    -- for now, authority_url could be in the partition key; but leaving
-    -- in the partition key allows listing authorities with metadata on an
-    -- origin if we ever need to do it.
+
+    -- context
+    origin text,
+    visit bigint,
+    snapshot text,
+    release text,
+    revision text,
+    path blob,
+    directory text,
+
+    PRIMARY KEY ((id), authority_type, authority_url, discovery_date,
+                 fetcher_name, fetcher_version)
 );
 
@@ -215,7 +243,7 @@
     PRIMARY KEY ((partition_key), object_type)
 );
 """.split(
-    "\n\n"
+    "\n\n\n"
 )
 
 CONTENT_INDEX_TEMPLATE = """
@@ -236,7 +264,7 @@
 TABLES = (
     "skipped_content content revision revision_parent release "
     "directory directory_entry snapshot snapshot_branch "
-    "origin_visit origin origin_metadata object_count "
+    "origin_visit origin object_metadata object_count "
     "origin_visit_status metadata_authority "
     "metadata_fetcher"
 ).split()
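The lone `+` lines in the schema.py hunks above are not noise: every statement is now separated by two blank lines, because the module splits the script on `"\n\n\n"` instead of `"\n\n"`. That frees single blank lines for use inside a statement, as in the new object_metadata definition. A quick illustration of the parsing assumption:

    # Why the separator changed: with statements delimited by two blank lines,
    # a single blank line may now appear inside one CREATE statement.
    script = (
        "CREATE TABLE IF NOT EXISTS a (\n"
        "    x int,\n"
        "\n"  # blank line inside the statement: fine now
        "    y int\n"
        ");\n"
        "\n\n"
        "CREATE TABLE IF NOT EXISTS b (\n    z int\n);"
    )
    queries = script.split("\n\n\n")
    assert len(queries) == 2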
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -8,7 +8,7 @@
 import json
 import random
 import re
-from typing import Any, Dict, List, Iterable, Optional
+from typing import Any, Dict, List, Iterable, Optional, Union
 
 import attr
 
@@ -31,6 +31,7 @@
 from swh.storage.utils import now
 
 from ..exc import StorageArgumentException, HashCollision
+from ..extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS
 from .common import TOKEN_BEGIN, TOKEN_END
 from .converters import (
     revision_to_db,
@@ -258,6 +259,39 @@
             [{"sha1_git": c} for c in contents], key_hash="sha1_git"
         )
 
+    def content_metadata_add(
+        self,
+        id: str,
+        context: Dict[str, Union[str, bytes, int]],
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+    ):
+        self._object_metadata_add(
+            "content",
+            id,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            context,
+        )
+
+    def content_metadata_get(
+        self,
+        id: str,
+        authority: Dict[str, str],
+        after: Optional[datetime.datetime] = None,
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+    ):
+        return self._object_metadata_get(
+            "content", id, authority, after, page_token, limit,
+        )
+
     def content_get_random(self):
         return self._cql_runner.content_get_random().sha1_git
 
@@ -1011,14 +1045,61 @@
             raise StorageArgumentException(
                 "origin_id must be str, not %r" % (origin_url,)
             )
+
+        context: Dict[str, Union[str, bytes, int]] = {}  # origins have no context
+
+        self._object_metadata_add(
+            "origin",
+            origin_url,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            context,
+        )
+
+    def origin_metadata_get(
+        self,
+        origin_url: str,
+        authority: Dict[str, str],
+        after: Optional[datetime.datetime] = None,
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+    ) -> Dict[str, Any]:
+        if not isinstance(origin_url, str):
+            raise TypeError("origin_url must be str, not %r" % (origin_url,))
+
+        res = self._object_metadata_get(
+            "origin", origin_url, authority, after, page_token, limit
+        )
+        for result in res["results"]:
+            result["origin_url"] = result.pop("id")
+
+        return res
+
+    def _object_metadata_add(
+        self,
+        object_type: str,
+        id: str,
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+        context: Dict[str, Union[str, bytes, int]],
+    ) -> None:
+        check_extrinsic_metadata_context(object_type, context)
+
         if not self._cql_runner.metadata_authority_get(**authority):
             raise StorageArgumentException(f"Unknown authority {authority}")
         if not self._cql_runner.metadata_fetcher_get(**fetcher):
             raise StorageArgumentException(f"Unknown fetcher {fetcher}")
 
         try:
-            self._cql_runner.origin_metadata_add(
-                origin_url,
+            self._cql_runner.object_metadata_add(
+                object_type,
+                id,
                 authority["type"],
                 authority["url"],
                 discovery_date,
@@ -1026,21 +1107,20 @@
                 fetcher["version"],
                 format,
                 metadata,
+                context,
             )
         except TypeError as e:
             raise StorageArgumentException(*e.args)
 
-    def origin_metadata_get(
+    def _object_metadata_get(
         self,
-        origin_url: str,
+        object_type: str,
+        id: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
         page_token: Optional[bytes] = None,
         limit: int = 1000,
     ) -> Dict[str, Any]:
-        if not isinstance(origin_url, str):
-            raise TypeError("origin_url must be str, not %r" % (origin_url,))
-
         if page_token is not None:
             (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads(
                 page_token
@@ -1049,8 +1129,8 @@
                 raise StorageArgumentException(
                     "page_token is inconsistent with the value of 'after'."
                 )
-            entries = self._cql_runner.origin_metadata_get_after_date_and_fetcher(
-                origin_url,
+            entries = self._cql_runner.object_metadata_get_after_date_and_fetcher(
+                id,
                 authority["type"],
                 authority["url"],
                 after_date,
@@ -1058,12 +1138,12 @@
                 after_fetcher_url,
             )
         elif after is not None:
-            entries = self._cql_runner.origin_metadata_get_after_date(
-                origin_url, authority["type"], authority["url"], after
+            entries = self._cql_runner.object_metadata_get_after_date(
+                id, authority["type"], authority["url"], after
             )
         else:
-            entries = self._cql_runner.origin_metadata_get(
-                origin_url, authority["type"], authority["url"]
+            entries = self._cql_runner.object_metadata_get(
+                id, authority["type"], authority["url"]
             )
 
         if limit:
@@ -1072,22 +1152,31 @@
         results = []
         for entry in entries:
             discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc)
-            results.append(
-                {
-                    "origin_url": entry.origin,
-                    "authority": {
-                        "type": entry.authority_type,
-                        "url": entry.authority_url,
-                    },
-                    "fetcher": {
-                        "name": entry.fetcher_name,
-                        "version": entry.fetcher_version,
-                    },
-                    "discovery_date": discovery_date,
-                    "format": entry.format,
-                    "metadata": entry.metadata,
-                }
-            )
+
+            result = {
+                "id": entry.id,
+                "authority": {
+                    "type": entry.authority_type,
+                    "url": entry.authority_url,
+                },
+                "fetcher": {
+                    "name": entry.fetcher_name,
+                    "version": entry.fetcher_version,
+                },
+                "discovery_date": discovery_date,
+                "format": entry.format,
+                "metadata": entry.metadata,
+            }
+
+            if CONTEXT_KEYS[object_type]:
+                context = {}
+                for key in CONTEXT_KEYS[object_type]:
+                    value = getattr(entry, key)
+                    if value is not None:
+                        context[key] = value
+                result["context"] = context
+
+            results.append(result)
 
         if len(results) > limit:
             results.pop()
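For reference, this is the dict shape the new Cassandra `_object_metadata_get` assembles per entry; `origin_metadata_get` then renames `id` to `origin_url`, and since origins have no context keys, no `context` member is ever added for them. The values below are illustrative, not taken from a real run:

    # Illustrative result entry for a content object (sketch, not real output):
    entry = {
        "id": "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd",
        "authority": {"type": "deposit", "url": "https://hal.archives-ouvertes.fr/"},
        "fetcher": {"name": "swh-deposit", "version": "0.0.1"},
        "discovery_date": "...",  # a timezone-aware datetime in the real API
        "format": "json",
        "metadata": b'{"foo": "bar"}',
        # present only when CONTEXT_KEYS[object_type] is non-empty,
        # and only non-NULL columns are included:
        "context": {"origin": "https://example.com/repo.git"},
    }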
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -6,7 +6,7 @@
 import datetime
 import random
 import select
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
 
 from swh.core.db import BaseDb
 from swh.core.db.db_utils import stored_procedure, jsonize
@@ -1085,74 +1085,106 @@
     def release_get_random(self, cur=None):
         return self._get_random_row_from_table("release", ["id"], "id", cur)
 
-    origin_metadata_get_cols = [
-        "origin.url",
+    _object_metadata_context_cols = [
+        "origin",
+        "visit",
+        "snapshot",
+        "release",
+        "revision",
+        "path",
+        "directory",
+    ]
+    """The list of context columns for all artifact types."""
+
+    _object_metadata_insert_cols = [
+        "id",
+        "authority_id",
+        "fetcher_id",
+        "discovery_date",
+        "format",
+        "metadata",
+        *_object_metadata_context_cols,
+    ]
+    """List of columns of the object_metadata table, used when writing
+    metadata."""
+
+    _object_metadata_insert_query = f"""
+        INSERT INTO object_metadata
+            ({', '.join(_object_metadata_insert_cols)})
+        VALUES ({', '.join('%s' for _ in _object_metadata_insert_cols)})
+        ON CONFLICT (id, authority_id, discovery_date, fetcher_id)
+        DO UPDATE SET
+            format=EXCLUDED.format,
+            metadata=EXCLUDED.metadata
+        """
+
+    object_metadata_get_cols = [
+        "id",
         "discovery_date",
         "metadata_authority.type",
         "metadata_authority.url",
         "metadata_fetcher.id",
         "metadata_fetcher.name",
         "metadata_fetcher.version",
+        *_object_metadata_context_cols,
         "format",
         "metadata",
     ]
+    """List of columns of the object_metadata, metadata_authority,
+    and metadata_fetcher tables, used when reading object metadata."""
+
+    _object_metadata_select_query = f"""
+        SELECT
+            object_metadata.id AS id,
+            {', '.join(object_metadata_get_cols[1:-1])},
+            object_metadata.metadata AS metadata
+        FROM object_metadata
+        INNER JOIN metadata_authority
+            ON (metadata_authority.id=authority_id)
+        INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id)
+        WHERE object_metadata.id=%s AND authority_id=%s
+        """
 
-    def origin_metadata_add(
+    def object_metadata_add(
         self,
-        origin: str,
+        object_type: str,
+        id: str,
+        context: Dict[str, Union[str, bytes, int]],
         discovery_date: datetime.datetime,
-        authority: int,
-        fetcher: int,
+        authority_id: int,
+        fetcher_id: int,
         format: str,
         metadata: bytes,
-        cur=None,
-    ) -> None:
-        """ Add an origin_metadata for the origin at ts with provider, tool and
-        metadata.
-
-        Args:
-            origin: the origin's id for which the metadata is added
-            discovery_date: time when the metadata was found
-            authority: the metadata provider identifier
-            fetcher: the tool's identifier used to extract metadata
-            format: the format of the metadata
-            metadata: the metadata retrieved at the time and location
-        """
-        cur = self._cursor(cur)
-        insert = """INSERT INTO origin_metadata (origin_id, discovery_date,
-                    authority_id, fetcher_id, format, metadata)
-                    SELECT id, %s, %s, %s, %s, %s FROM origin WHERE url = %s
-                    ON CONFLICT (origin_id, authority_id, discovery_date, fetcher_id)
-                    DO UPDATE SET
-                        format=EXCLUDED.format,
-                        metadata=EXCLUDED.metadata
-                    """
-        cur.execute(
-            insert, (discovery_date, authority, fetcher, format, metadata, origin),
+        cur,
+    ):
+        query = self._object_metadata_insert_query
+        args: Dict[str, Any] = dict(
+            id=id,
+            authority_id=authority_id,
+            fetcher_id=fetcher_id,
+            discovery_date=discovery_date,
+            format=format,
+            metadata=metadata,
         )
+        for col in self._object_metadata_context_cols:
+            args[col] = context.get(col)
 
-    def origin_metadata_get(
+        params = [args[col] for col in self._object_metadata_insert_cols]
+
+        cur.execute(query, params)
+
+    def object_metadata_get(
         self,
-        origin_url: str,
-        authority: int,
+        object_type: str,
+        id: str,
+        authority_id: int,
         after_time: Optional[datetime.datetime],
         after_fetcher: Optional[int],
-        limit: Optional[int],
-        cur=None,
+        limit: int,
+        cur,
     ):
-        cur = self._cursor(cur)
-        assert self.origin_metadata_get_cols[-1] == "metadata"
-        query_parts = [
-            f"SELECT {', '.join(self.origin_metadata_get_cols[0:-1])}, "
-            f"       origin_metadata.metadata AS metadata "
-            f"FROM origin_metadata "
-            f"INNER JOIN metadata_authority "
-            f"    ON (metadata_authority.id=authority_id) "
-            f"INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) "
-            f"INNER JOIN origin ON (origin.id=origin_metadata.origin_id) "
-            f"WHERE origin.url=%s AND authority_id=%s "
-        ]
-        args = [origin_url, authority]
+        query_parts = [self._object_metadata_select_query]
+        args = [id, authority_id]
 
         if after_fetcher is not None:
             assert after_time
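The f-string class attributes above render to a fixed query at class-definition time. Roughly what `_object_metadata_insert_query` expands to is worth keeping in mind next to the `ON CONFLICT` target: it matches the unique index created in 60-swh-indexes.sql below, and on conflict only `format` and `metadata` are refreshed, so previously written context columns are kept. A sketch of the rendered statement:

    # What _object_metadata_insert_query renders to, roughly (sketch):
    _object_metadata_context_cols = [
        "origin", "visit", "snapshot", "release", "revision", "path", "directory",
    ]
    cols = [
        "id", "authority_id", "fetcher_id", "discovery_date", "format", "metadata",
        *_object_metadata_context_cols,
    ]
    rendered = (
        f"INSERT INTO object_metadata ({', '.join(cols)}) "
        f"VALUES ({', '.join('%s' for _ in cols)}) "
        "ON CONFLICT (id, authority_id, discovery_date, fetcher_id) "
        "DO UPDATE SET format=EXCLUDED.format, metadata=EXCLUDED.metadata"
    )
    assert "origin, visit, snapshot, release, revision, path, directory" in rendered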
diff --git a/swh/storage/extrinsic_metadata.py b/swh/storage/extrinsic_metadata.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/extrinsic_metadata.py
@@ -0,0 +1,57 @@
+# Copyright (C) 2020  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, cast, Dict
+
+from swh.model.identifiers import PersistentId, parse_persistent_identifier
+
+from .exc import StorageArgumentException
+
+CONTEXT_KEYS: Dict[str, Dict[str, type]] = {}
+CONTEXT_KEYS["origin"] = {}
+CONTEXT_KEYS["snapshot"] = {"origin": str, "visit": int}
+CONTEXT_KEYS["release"] = {**CONTEXT_KEYS["snapshot"], "snapshot": PersistentId}
+CONTEXT_KEYS["revision"] = {**CONTEXT_KEYS["release"], "release": PersistentId}
+CONTEXT_KEYS["directory"] = {
+    **CONTEXT_KEYS["revision"],
+    "revision": PersistentId,
+    "path": bytes,
+}
+CONTEXT_KEYS["content"] = {**CONTEXT_KEYS["directory"], "directory": PersistentId}
+
+ALL_CONTEXT_KEYS = set(CONTEXT_KEYS["content"])
+
+
+def check_extrinsic_metadata_context(object_type: str, context: Dict[str, Any]):
+    key_types = CONTEXT_KEYS[object_type]
+
+    extra_keys = set(context) - set(key_types)
+    if extra_keys:
+        raise StorageArgumentException(f"Unknown context keys: {', '.join(extra_keys)}")
+
+    for (key, value) in context.items():
+        expected_type = key_types[key]
+        expected_type_str = str(expected_type)  # for display
+
+        # If an SWHID is expected and a string is given, parse it
+        if expected_type is PersistentId and isinstance(value, str):
+            value = parse_persistent_identifier(value)
+            expected_type_str = "PersistentId or str"
+
+        # Check the type of the context value
+        if not isinstance(value, expected_type):
+            raise StorageArgumentException(
+                f"Context key {key} must have type {expected_type_str}, "
+                f"but is {value!r}"
+            )
+
+        # If it is an SWHID, check it is also a core SWHID.
+        if expected_type is PersistentId:
+            value = cast(PersistentId, value)
+            if value.metadata != {}:
+                raise StorageArgumentException(
+                    f"Context key {key} must be a core SWHID, "
+                    f"but it has qualifiers {', '.join(value.metadata)}."
+                )
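How the nested CONTEXT_KEYS definitions resolve: each object type inherits the context keys of the type above it in the origin, snapshot, release, revision, directory, content chain, so content metadata may reference all of them. A usage sketch of the checker, pytest-style, mirroring the exceptions raised above:

    import pytest

    from swh.storage.exc import StorageArgumentException
    from swh.storage.extrinsic_metadata import (
        CONTEXT_KEYS,
        check_extrinsic_metadata_context,
    )

    # Resolved key sets: origin {}, snapshot {origin, visit}, ...,
    # content {origin, visit, snapshot, release, revision, path, directory}.
    assert set(CONTEXT_KEYS["snapshot"]) == {"origin", "visit"}

    # Valid: a str is accepted where a PersistentId is expected (it is parsed).
    check_extrinsic_metadata_context(
        "release",
        {
            "origin": "https://example.com/repo.git",
            "visit": 42,
            "snapshot": "swh:1:snp:" + "00" * 20,
        },
    )

    # Invalid: origins take no context keys at all.
    with pytest.raises(StorageArgumentException):
        check_extrinsic_metadata_context("origin", {"visit": 42})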
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -37,6 +37,9 @@
 -- a set of UNIX-like access permissions, as manipulated by, e.g., chmod
 create domain file_perms as int;
 
+-- an SWHID
+create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
+
 
 -- Checksums about actual file content. Note that the content itself is not
 -- stored in the DB, but on external (key-value) storage. A single checksum is
@@ -66,6 +69,32 @@
 
 comment on column content.object_id is 'Content identifier';
 
+
+-- Extrinsic metadata on a content object.
+create table content_metadata
+(
+  -- content identifier
+  id             swhid not null,
+
+  -- metadata source
+  authority_id   bigint not null,
+  fetcher_id     bigint not null,
+  discovery_date timestamptz not null,
+
+  -- metadata itself
+  format         text not null,
+  metadata       bytea not null,
+
+  -- context
+  origin         text,
+  visit          bigint,
+  snapshot       swhid,
+  release        swhid,
+  revision       swhid,
+  path           bytea,
+  directory      swhid
+);
+
 
 -- An origin is a place, identified by an URL, where software source code
 -- artifacts can be found. We support different kinds of origins, e.g., git and
 -- other VCS repositories, web pages that list tarballs URLs (e.g.,
@@ -427,27 +456,38 @@
 comment on column metadata_authority.metadata is 'Other metadata about authority';
 
--- Discovery of metadata during a listing, loading, deposit or external_catalog of an origin
--- also provides a translation to a defined json schema using a translation tool (tool_id)
-create table origin_metadata
+-- Extrinsic metadata on DAG objects and origins.
+create table object_metadata
 (
-  id             bigserial  not null,  -- PK internal object identifier
-  origin_id      bigint     not null,  -- references origin(id)
-  discovery_date timestamptz not null, -- when it was extracted
+  -- object identifier
+  id             text not null,
+
+  -- metadata source
   authority_id   bigint not null,
   fetcher_id     bigint not null,
-  format         text   not null default 'sword-v2-atom-codemeta-v2-in-json',
-  metadata       bytea  not null
+  discovery_date timestamptz not null,
+
+  -- metadata itself
+  format         text not null,
+  metadata       bytea not null,
+
+  -- context
+  origin         text,
+  visit          bigint,
+  snapshot       swhid,
+  release        swhid,
+  revision       swhid,
+  path           bytea,
+  directory      swhid
 );
 
-comment on table origin_metadata is 'keeps all metadata found concerning an origin';
-comment on column origin_metadata.id is 'the origin_metadata object''s id';
-comment on column origin_metadata.origin_id is 'the origin id for which the metadata was found';
-comment on column origin_metadata.discovery_date is 'the date of retrieval';
-comment on column origin_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.';
-comment on column origin_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.';
-comment on column origin_metadata.format is 'name of the format of metadata, used by readers to interpret it.';
-comment on column origin_metadata.metadata is 'original metadata in opaque format';
+comment on table object_metadata is 'keeps all metadata found concerning an object';
+comment on column object_metadata.id is 'the SWHID or origin URL for which the metadata was found';
+comment on column object_metadata.discovery_date is 'the date of retrieval';
+comment on column object_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.';
+comment on column object_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.';
+comment on column object_metadata.format is 'name of the format of metadata, used by readers to interpret it.';
+comment on column object_metadata.metadata is 'original metadata in opaque format';
 
 
 -- Keep a cache of object counts
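The new `swhid` domain only sanity-checks the `swh:<version>:` prefix; anything after the second colon passes, and full validation stays on the Python side in `check_extrinsic_metadata_context`. A quick demonstration of what the check constraint's regex does and does not reject:

    import re

    # Same pattern as the `swhid` domain's CHECK constraint.
    swhid_re = re.compile(r"^swh:[0-9]+:.*")

    assert swhid_re.match("swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd")
    assert swhid_re.match("swh:1:whatever")            # still accepted by the domain
    assert not swhid_re.match("https://example.com/")  # rejected: not swh:N:...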
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql
--- a/swh/storage/sql/60-swh-indexes.sql
+++ b/swh/storage/sql/60-swh-indexes.sql
@@ -168,20 +168,14 @@
 create unique index metadata_authority_type_url on metadata_authority(type, url);
 
--- origin_metadata
-create unique index concurrently origin_metadata_pkey on origin_metadata(id);
-alter table origin_metadata add primary key using index origin_metadata_pkey;
+-- object_metadata
+create unique index concurrently object_metadata_content_authority_date_fetcher on object_metadata(id, authority_id, discovery_date, fetcher_id);
 
-create unique index concurrently origin_metadata_origin_authority_date_fetcher on origin_metadata(origin_id, authority_id, discovery_date, fetcher_id);
+alter table object_metadata add constraint object_metadata_authority_fkey foreign key (authority_id) references metadata_authority(id) not valid;
+alter table object_metadata validate constraint object_metadata_authority_fkey;
 
-alter table origin_metadata add constraint origin_metadata_origin_fkey foreign key (origin_id) references origin(id) not valid;
-alter table origin_metadata validate constraint origin_metadata_origin_fkey;
-
-alter table origin_metadata add constraint origin_metadata_authority_fkey foreign key (authority_id) references metadata_authority(id) not valid;
-alter table origin_metadata validate constraint origin_metadata_authority_fkey;
-
-alter table origin_metadata add constraint origin_metadata_fetcher_fkey foreign key (fetcher_id) references metadata_fetcher(id) not valid;
-alter table origin_metadata validate constraint origin_metadata_fetcher_fkey;
+alter table object_metadata add constraint object_metadata_fetcher_fkey foreign key (fetcher_id) references metadata_fetcher(id) not valid;
+alter table object_metadata validate constraint object_metadata_fetcher_fkey;
 
 -- object_counts
 create unique index concurrently object_counts_pkey on object_counts(object_type);
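The foreign keys above follow the usual two-step pattern for large live tables: `add constraint ... not valid` takes only a brief lock and applies to new writes, then `validate constraint` checks existing rows without blocking concurrent writes (similarly, the unique index is built `concurrently`; as an aside, the index name `object_metadata_content_authority_date_fetcher` presumably meant `id` rather than `content`). A sketch of running the two steps from Python, with a hypothetical DSN:

    import psycopg2

    conn = psycopg2.connect("dbname=softwareheritage")  # hypothetical DSN

    with conn, conn.cursor() as cur:
        # Step 1: cheap to take; only rows written from now on are checked.
        cur.execute(
            "alter table object_metadata"
            " add constraint object_metadata_authority_fkey"
            " foreign key (authority_id) references metadata_authority(id)"
            " not valid"
        )
        # Step 2: scans existing rows, without blocking concurrent writes.
        cur.execute(
            "alter table object_metadata"
            " validate constraint object_metadata_authority_fkey"
        )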
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -9,7 +9,7 @@
 
 from collections import defaultdict
 from contextlib import contextmanager
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Union
 
 import attr
 import psycopg2
@@ -35,6 +35,11 @@
 from swh.storage.utils import now
 
 from . import converters
+from .extrinsic_metadata import (
+    check_extrinsic_metadata_context,
+    CONTEXT_KEYS,
+    ALL_CONTEXT_KEYS,
+)
 from .common import db_transaction_generator, db_transaction
 from .db import Db
 from .exc import StorageArgumentException, StorageDBError, HashCollision
@@ -359,6 +364,49 @@
         )
         return [dict(zip(db.content_find_cols, content)) for content in contents]
 
+    @timed
+    @db_transaction()
+    def content_metadata_add(
+        self,
+        id: str,
+        context: Dict[str, Union[str, bytes, int]],
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+        db=None,
+        cur=None,
+    ):
+        self._object_metadata_add(
+            "content",
+            id,
+            context,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            db,
+            cur,
+        )
+
+    @timed
+    @db_transaction()
+    def content_metadata_get(
+        self,
+        id: str,
+        authority: Dict[str, str],
+        after: Optional[datetime.datetime] = None,
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+        db=None,
+        cur=None,
+    ):
+        return self._object_metadata_get(
+            "content", id, authority, after, page_token, limit, db, cur
+        )
+
     @timed
     @db_transaction()
     def content_get_random(self, db=None, cur=None):
@@ -1152,29 +1200,64 @@
         db=None,
         cur=None,
     ) -> None:
-        authority_id = db.metadata_authority_get_id(
-            authority["type"], authority["url"], cur
-        )
-        if not authority_id:
-            raise StorageArgumentException(f"Unknown authority {authority}")
-        fetcher_id = db.metadata_fetcher_get_id(
-            fetcher["name"], fetcher["version"], cur
+        origin_id = next(iter(list(db.origin_id_get_by_url([origin_url], cur))), None)
+        if origin_id is None:
+            raise StorageArgumentException(f"Unknown origin {origin_url}")
+
+        context: Dict[str, Union[str, bytes, int]] = {}  # origins have no context
+
+        self._object_metadata_add(
+            "origin",
+            origin_url,
+            context,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            db,
+            cur,
         )
-        if not fetcher_id:
-            raise StorageArgumentException(f"Unknown fetcher {fetcher}")
-        try:
-            db.origin_metadata_add(
-                origin_url,
-                discovery_date,
-                authority_id,
-                fetcher_id,
-                format,
-                metadata,
-                cur,
+
+    def _object_metadata_add(
+        self,
+        object_type: str,
+        id: str,
+        context: Dict[str, Union[str, bytes, int]],
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+        db,
+        cur,
+    ) -> None:
+        check_extrinsic_metadata_context(object_type, context)
+
+        authority_id = self._get_authority_id(authority, db, cur)
+        fetcher_id = self._get_fetcher_id(fetcher, db, cur)
+        if not isinstance(metadata, bytes):
+            raise StorageArgumentException(
+                "metadata must be bytes, not %r" % (metadata,)
             )
-        except psycopg2.ProgrammingError as e:
-            raise StorageArgumentException(*e.args)
 
-        send_metric("origin_metadata:add", count=1, method_name="origin_metadata_add")
+        db.object_metadata_add(
+            object_type,
+            id,
+            context,
+            discovery_date,
+            authority_id,
+            fetcher_id,
+            format,
+            metadata,
+            cur,
+        )
+
+        send_metric(
+            f"{object_type}_metadata:add",
+            count=1,
+            method_name=f"{object_type}_metadata_add",
+        )
 
     @timed
     @db_transaction(statement_timeout=500)
@@ -1187,6 +1270,31 @@
         limit: int = 1000,
         db=None,
         cur=None,
+    ) -> Dict[str, Any]:
+        origin_id = next(iter(list(db.origin_id_get_by_url([origin_url], cur))), None)
+        if origin_id is None:
+            raise StorageArgumentException(f"Unknown origin {origin_url}")
+
+        result = self._object_metadata_get(
+            "origin", origin_url, authority, after, page_token, limit, db, cur
+        )
+
+        for res in result["results"]:
+            res.pop("id")
+            res["origin_url"] = origin_url
+
+        return result
+
+    def _object_metadata_get(
+        self,
+        object_type: str,
+        id: str,
+        authority: Dict[str, str],
+        after: Optional[datetime.datetime],
+        page_token: Optional[bytes],
+        limit: int,
+        db,
+        cur,
     ) -> Dict[str, Any]:
         if page_token:
             (after_time, after_fetcher) = msgpack_loads(page_token)
@@ -1207,28 +1315,37 @@
                 "results": [],
             }
 
-        rows = db.origin_metadata_get(
-            origin_url, authority_id, after_time, after_fetcher, limit + 1, cur
+        rows = db.object_metadata_get(
+            object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur
         )
-        rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows]
+        rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows]
         results = []
         for row in rows:
             row = row.copy()
             row.pop("metadata_fetcher.id")
-            results.append(
-                {
-                    "origin_url": row.pop("origin.url"),
-                    "authority": {
-                        "type": row.pop("metadata_authority.type"),
-                        "url": row.pop("metadata_authority.url"),
-                    },
-                    "fetcher": {
-                        "name": row.pop("metadata_fetcher.name"),
-                        "version": row.pop("metadata_fetcher.version"),
-                    },
-                    **row,
-                }
-            )
+            context = {}
+            for key in ALL_CONTEXT_KEYS:
+                value = row.pop(key)
+                if key in CONTEXT_KEYS[object_type]:
+                    if value is not None:
+                        context[key] = value
+
+            result = {
+                "authority": {
+                    "type": row.pop("metadata_authority.type"),
+                    "url": row.pop("metadata_authority.url"),
+                },
+                "fetcher": {
+                    "name": row.pop("metadata_fetcher.name"),
+                    "version": row.pop("metadata_fetcher.version"),
+                },
+                **row,
+            }
+
+            if CONTEXT_KEYS[object_type]:
+                result["context"] = context
+
+            results.append(result)
 
         if len(results) > limit:
             results.pop()
@@ -1304,3 +1421,19 @@
 
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         return {}
+
+    def _get_authority_id(self, authority: Dict[str, Any], db, cur):
+        authority_id = db.metadata_authority_get_id(
+            authority["type"], authority["url"], cur
+        )
+        if not authority_id:
+            raise StorageArgumentException(f"Unknown authority {authority}")
+        return authority_id
+
+    def _get_fetcher_id(self, fetcher: Dict[str, Any], db, cur):
+        fetcher_id = db.metadata_fetcher_get_id(
+            fetcher["name"], fetcher["version"], cur
+        )
+        if not fetcher_id:
+            raise StorageArgumentException(f"Unknown fetcher {fetcher}")
+        return fetcher_id
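Pagination in `_object_metadata_get` above uses the common fetch-one-extra-row trick: `limit + 1` rows are requested, the surplus row is dropped, and its presence is what makes the endpoint emit a `next_page_token` (a msgpack-encoded cursor, `(after_time, after_fetcher)` on the postgres side). A condensed sketch of that logic, with `encode_token` standing in for the real msgpack helper:

    def paginate(rows, limit, encode_token):
        # rows were fetched with `limit + 1`; the extra row only signals
        # that another page exists.
        results = list(rows)
        next_page_token = None
        if len(results) > limit:
            results.pop()
            last = results[-1]
            # The real code encodes (discovery_date, fetcher) with msgpack.
            next_page_token = encode_token(last)
        return {"next_page_token": next_page_token, "results": results}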
diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py
--- a/swh/storage/tests/storage_data.py
+++ b/swh/storage/tests/storage_data.py
@@ -4,7 +4,7 @@
 # See top-level LICENSE file for more information
 
 import datetime
-from swh.model.hashutil import hash_to_bytes
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model import from_disk
 
 
@@ -472,6 +472,65 @@
 
 snapshots = (snapshot, empty_snapshot, complete_snapshot)
 
+content_metadata = {
+    "id": f"swh:1:cnt:{cont['sha1_git']}",
+    "context": {"origin": origin["url"]},
+    "discovery_date": datetime.datetime(
+        2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc
+    ),
+    "authority": {
+        "type": metadata_authority["type"],
+        "url": metadata_authority["url"],
+    },
+    "fetcher": {
+        "name": metadata_fetcher["name"],
+        "version": metadata_fetcher["version"],
+    },
+    "format": "json",
+    "metadata": b'{"foo": "bar"}',
+}
+content_metadata2 = {
+    "id": f"swh:1:cnt:{cont['sha1_git']}",
+    "context": {"origin": origin2["url"]},
+    "discovery_date": datetime.datetime(
+        2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
+    ),
+    "authority": {
+        "type": metadata_authority["type"],
+        "url": metadata_authority["url"],
+    },
+    "fetcher": {
+        "name": metadata_fetcher["name"],
+        "version": metadata_fetcher["version"],
+    },
+    "format": "yaml",
+    "metadata": b"foo: bar",
+}
+content_metadata3 = {
+    "id": f"swh:1:cnt:{cont['sha1_git']}",
+    "context": {
+        "origin": origin["url"],
+        "visit": 42,
+        "snapshot": f"swh:1:snp:{hash_to_hex(snapshot['id'])}",
+        "release": f"swh:1:rel:{hash_to_hex(release['id'])}",
+        "revision": f"swh:1:rev:{hash_to_hex(revision['id'])}",
+        "directory": f"swh:1:dir:{hash_to_hex(dir['id'])}",
+        "path": b"/foo/bar",
+    },
+    "discovery_date": datetime.datetime(
+        2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc
+    ),
+    "authority": {
+        "type": metadata_authority2["type"],
+        "url": metadata_authority2["url"],
+    },
+    "fetcher": {
+        "name": metadata_fetcher2["name"],
+        "version": metadata_fetcher2["version"],
+    },
+    "format": "yaml",
+    "metadata": b"foo: bar",
+}
 
 origin_metadata = {
     "origin_url": origin["url"],
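One hedged observation on the fixtures above: elsewhere in storage_data.py, `cont['sha1_git']` is built with `hash_to_bytes`, i.e. it is `bytes`. If that is the case here, f-string interpolation embeds the Python bytes repr rather than the hex digest; `hash_to_hex` (newly imported above, and already used for the snapshot/release/revision/directory SWHIDs in `content_metadata3`) would give the canonical form. The tests below stay self-consistent either way, since they only round-trip the same id:

    from swh.model.hashutil import hash_to_bytes, hash_to_hex

    sha1_git = hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd")

    # bytes interpolate as their repr, not as a hex digest:
    assert f"swh:1:cnt:{sha1_git}".startswith("swh:1:cnt:b'")

    # the canonical SWHID needs the hex form:
    assert (
        f"swh:1:cnt:{hash_to_hex(sha1_git)}"
        == "swh:1:cnt:d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"
    )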
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -3168,6 +3168,190 @@
         with pytest.raises(StorageArgumentException):
             swh_storage.content_find({"unknown-sha1": "something"})  # not the right key
 
+    def test_content_metadata_add(self, swh_storage):
+        content = data.cont
+        fetcher = data.metadata_fetcher
+        authority = data.metadata_authority
+        content_swhid = f"swh:1:cnt:{content['sha1_git']}"
+
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority)
+
+        swh_storage.content_metadata_add(**data.content_metadata)
+        swh_storage.content_metadata_add(**data.content_metadata2)
+
+        result = swh_storage.content_metadata_get(content_swhid, authority)
+        assert result["next_page_token"] is None
+        assert [data.content_metadata, data.content_metadata2] == list(
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
+        )
+
+    def test_content_metadata_add_duplicate(self, swh_storage):
+        """Duplicates should be silently updated."""
+        content = data.cont
+        fetcher = data.metadata_fetcher
+        authority = data.metadata_authority
+        content_swhid = f"swh:1:cnt:{content['sha1_git']}"
+
+        new_content_metadata2 = {
+            **data.content_metadata2,
+            "format": "new-format",
+            "metadata": b"new-metadata",
+        }
+
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority)
+
+        swh_storage.content_metadata_add(**data.content_metadata)
+        swh_storage.content_metadata_add(**data.content_metadata2)
+        swh_storage.content_metadata_add(**new_content_metadata2)
+
+        result = swh_storage.content_metadata_get(content_swhid, authority)
+        assert result["next_page_token"] is None
+        assert [data.content_metadata, new_content_metadata2] == list(
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
+        )
+
+    def test_content_metadata_add_dict(self, swh_storage):
+        fetcher = data.metadata_fetcher
+        authority = data.metadata_authority
+
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority)
+
+        kwargs = data.content_metadata.copy()
+        kwargs["metadata"] = {"foo": "bar"}
+
+        with pytest.raises(StorageArgumentException):
+            swh_storage.content_metadata_add(**kwargs)
+
+    def test_content_metadata_get(self, swh_storage):
+        authority = data.metadata_authority
+        fetcher = data.metadata_fetcher
+        authority2 = data.metadata_authority2
+        fetcher2 = data.metadata_fetcher2
+        content1_swhid = f"swh:1:cnt:{data.cont['sha1_git']}"
+        content2_swhid = f"swh:1:cnt:{data.cont2['sha1_git']}"
+
+        content1_metadata1 = data.content_metadata
+        content1_metadata2 = data.content_metadata2
+        content1_metadata3 = data.content_metadata3
+        content2_metadata = {**data.content_metadata2, "id": content2_swhid}
+
+        swh_storage.metadata_authority_add(**authority)
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority2)
+        swh_storage.metadata_fetcher_add(**fetcher2)
+
+        swh_storage.content_metadata_add(**content1_metadata1)
+        swh_storage.content_metadata_add(**content1_metadata2)
+        swh_storage.content_metadata_add(**content1_metadata3)
+        swh_storage.content_metadata_add(**content2_metadata)
+
+        result = swh_storage.content_metadata_get(content1_swhid, authority)
+        assert result["next_page_token"] is None
+        assert [content1_metadata1, content1_metadata2] == list(
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
+        )
+
+        result = swh_storage.content_metadata_get(content1_swhid, authority2)
+        assert result["next_page_token"] is None
+        assert [content1_metadata3] == list(
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
+        )
+
+        result = swh_storage.content_metadata_get(content2_swhid, authority)
+        assert result["next_page_token"] is None
+        assert [content2_metadata] == list(result["results"],)
+
+    def test_content_metadata_get_after(self, swh_storage):
+        content = data.cont
+        fetcher = data.metadata_fetcher
+        authority = data.metadata_authority
+        content_swhid = f"swh:1:cnt:{content['sha1_git']}"
+
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority)
+
+        swh_storage.content_metadata_add(**data.content_metadata)
+        swh_storage.content_metadata_add(**data.content_metadata2)
+
+        result = swh_storage.content_metadata_get(
+            content_swhid,
+            authority,
+            after=data.content_metadata["discovery_date"] - timedelta(seconds=1),
+        )
+        assert result["next_page_token"] is None
+        assert [data.content_metadata, data.content_metadata2] == list(
+            sorted(result["results"], key=lambda x: x["discovery_date"],)
+        )
+
+        result = swh_storage.content_metadata_get(
+            content_swhid, authority, after=data.content_metadata["discovery_date"]
+        )
+        assert result["next_page_token"] is None
+        assert [data.content_metadata2] == result["results"]
+
+        result = swh_storage.content_metadata_get(
+            content_swhid, authority, after=data.content_metadata2["discovery_date"]
+        )
+        assert result["next_page_token"] is None
+        assert [] == result["results"]
+
+    def test_content_metadata_get_paginate(self, swh_storage):
+        content = data.cont
+        fetcher = data.metadata_fetcher
+        authority = data.metadata_authority
+        content_swhid = f"swh:1:cnt:{content['sha1_git']}"
+
+        swh_storage.metadata_fetcher_add(**fetcher)
+        swh_storage.metadata_authority_add(**authority)
+
+        swh_storage.content_metadata_add(**data.content_metadata)
+        swh_storage.content_metadata_add(**data.content_metadata2)
+
+        swh_storage.content_metadata_get(content_swhid, authority)
+
+        result = swh_storage.content_metadata_get(content_swhid, authority, limit=1)
+        assert result["next_page_token"] is not None
+        assert [data.content_metadata] == result["results"]
+
+        result = swh_storage.content_metadata_get(
+            content_swhid, authority, limit=1, page_token=result["next_page_token"]
+        )
+        assert result["next_page_token"] is None
+        assert [data.content_metadata2] == result["results"]
+
+    def test_content_metadata_get_paginate_same_date(self, swh_storage):
+        content = data.cont
+        fetcher1 = data.metadata_fetcher
+        fetcher2 = data.metadata_fetcher2
+        authority = data.metadata_authority
+        content_swhid = f"swh:1:cnt:{content['sha1_git']}"
+
+        swh_storage.metadata_fetcher_add(**fetcher1)
+        swh_storage.metadata_fetcher_add(**fetcher2)
+        swh_storage.metadata_authority_add(**authority)
+
+        content_metadata2 = {
+            **data.content_metadata2,
+            "discovery_date": data.content_metadata2["discovery_date"],
+            "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],},
+        }
+
+        swh_storage.content_metadata_add(**data.content_metadata)
+        swh_storage.content_metadata_add(**content_metadata2)
+
+        result = swh_storage.content_metadata_get(content_swhid, authority, limit=1)
+        assert result["next_page_token"] is not None
+        assert [data.content_metadata] == result["results"]
+
+        result = swh_storage.content_metadata_get(
+            content_swhid, authority, limit=1, page_token=result["next_page_token"]
+        )
+        assert result["next_page_token"] is None
+        assert [content_metadata2] == result["results"]
+
     def test_object_find_by_sha1_git(self, swh_storage):
         sha1_gits = [b"00000000000000000000"]
         expected = {