diff --git a/sql/upgrades/157.sql b/sql/upgrades/157.sql
new file mode
--- /dev/null
+++ b/sql/upgrades/157.sql
@@ -0,0 +1,46 @@
+-- SWH DB schema upgrade
+-- from_version: 156
+-- to_version: 157
+-- description: Add extrinsic artifact metadata
+
+-- latest schema version
+insert into dbversion(version, release, description)
+      values(157, now(), 'Work In Progress');
+
+create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
+
+alter table origin_metadata
+  rename to object_metadata;
+
+
+-- Use the origin URL as identifier, instead of the origin id
+alter table object_metadata
+  add column type text;
+comment on column object_metadata.type is 'the type of object (content/directory/revision/release/snapshot/origin) the metadata is on';
+
+alter table object_metadata
+  add column origin_url text;
+
+update object_metadata
+  set
+    type = 'origin',
+    origin_url = origin.url
+  from origin
+  where object_metadata.origin_id = origin.id;
+
+alter table object_metadata
+  alter column type set not null;
+alter table object_metadata
+  alter column origin_url set not null;
+
+alter table object_metadata
+  drop column id;
+alter table object_metadata
+  drop column origin_id;
+
+alter table object_metadata
+  rename column origin_url to id;
+comment on column object_metadata.id is 'the SWHID or origin URL for which the metadata was found';
+
+create unique index object_metadata_content_authority_date_fetcher
+  on object_metadata(id, authority_id, discovery_date, fetcher_id);
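Note on the migration above: the table is renamed and re-keyed in place — the surrogate bigserial id and origin_id columns are dropped, and rows are identified by the origin URL (now stored in id) together with authority, date and fetcher. One subtlety worth flagging: the new swhid domain is created but object_metadata.id stays plain text, since that column must also hold origin URLs, which do not match the SWHID pattern. A minimal illustration of the domain's check expression, as a Python sketch (not part of the patch; the digest is a placeholder):

    import re

    # same pattern as the swhid domain's CHECK constraint
    SWHID_RE = re.compile(r"^swh:[0-9]+:.*")

    assert SWHID_RE.match("swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2")
    assert not SWHID_RE.match("https://example.org/repo.git")  # origin URLs don't match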
diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py
--- a/swh/storage/cassandra/cql.py
+++ b/swh/storage/cassandra/cql.py
@@ -3,6 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import datetime
 import functools
 import json
 import logging
@@ -865,12 +866,13 @@
     def metadata_fetcher_get(self, name, version, *, statement) -> Optional[Row]:
         return next(iter(self._execute_with_retries(statement, [name, version])), None)
 
-    ##########################
-    # 'origin_metadata' table
-    ##########################
+    #########################
+    # 'object_metadata' table
+    #########################
 
-    _origin_metadata_keys = [
-        "origin",
+    _object_metadata_keys = [
+        "type",
+        "id",
         "authority_type",
         "authority_url",
         "discovery_date",
@@ -880,10 +882,14 @@
         "metadata",
     ]
 
-    @_prepared_insert_statement("origin_metadata", _origin_metadata_keys)
-    def origin_metadata_add(
+    @_prepared_statement(
+        f"INSERT INTO object_metadata ({', '.join(_object_metadata_keys)}) "
+        f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})"
+    )
+    def object_metadata_add(
         self,
-        origin,
+        object_type: str,
+        id: str,
         authority_type,
         authority_url,
         discovery_date,
@@ -894,52 +900,57 @@
         *,
         statement,
     ):
-        return self._execute_with_retries(
-            statement,
-            [
-                origin,
-                authority_type,
-                authority_url,
-                discovery_date,
-                fetcher_name,
-                fetcher_version,
-                format,
-                metadata,
-            ],
-        )
+        params = [
+            object_type,
+            id,
+            authority_type,
+            authority_url,
+            discovery_date,
+            fetcher_name,
+            fetcher_version,
+            format,
+            metadata,
+        ]
+
+        return self._execute_with_retries(statement, params,)
 
     @_prepared_statement(
-        "SELECT * from origin_metadata "
-        "WHERE origin=? AND authority_url=? AND discovery_date>? "
-        "AND authority_type=?"
+        "SELECT * from object_metadata "
+        "WHERE id=? AND authority_url=? AND discovery_date>? AND authority_type=?"
     )
-    def origin_metadata_get_after_date(
-        self, origin, authority_type, authority_url, after, *, statement
+    def object_metadata_get_after_date(
+        self,
+        id: str,
+        authority_type: str,
+        authority_url: str,
+        after: datetime.datetime,
+        *,
+        statement,
     ):
         return self._execute_with_retries(
-            statement, [origin, authority_url, after, authority_type]
+            statement, [id, authority_url, after, authority_type]
         )
 
     @_prepared_statement(
-        "SELECT * from origin_metadata "
-        "WHERE origin=? AND authority_type=? AND authority_url=? "
+        "SELECT * from object_metadata "
+        "WHERE id=? AND authority_type=? AND authority_url=? "
         "AND (discovery_date, fetcher_name, fetcher_version) > (?, ?, ?)"
     )
-    def origin_metadata_get_after_date_and_fetcher(
+    def object_metadata_get_after_date_and_fetcher(
         self,
-        origin,
-        authority_type,
-        authority_url,
-        after_date,
-        after_fetcher_name,
-        after_fetcher_version,
+        id: str,
+        authority_type: str,
+        authority_url: str,
+        after_date: datetime.datetime,
+        after_fetcher_name: str,
+        after_fetcher_version: str,
         *,
         statement,
     ):
         return self._execute_with_retries(
             statement,
             [
-                origin,
+                id,
                 authority_type,
                 authority_url,
                 after_date,
@@ -949,14 +960,14 @@
         )
 
     @_prepared_statement(
-        "SELECT * from origin_metadata "
-        "WHERE origin=? AND authority_url=? AND authority_type=?"
+        "SELECT * from object_metadata "
+        "WHERE id=? AND authority_url=? AND authority_type=?"
    )
-    def origin_metadata_get(
-        self, origin, authority_type, authority_url, *, statement
+    def object_metadata_get(
+        self, id: str, authority_type: str, authority_url: str, *, statement
     ) -> Iterable[Row]:
         return self._execute_with_retries(
-            statement, [origin, authority_url, authority_type]
+            statement, [id, authority_url, authority_type]
         )
 
     ##########################
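Note on cql.py: the INSERT is now built with f-strings over _object_metadata_keys, and the params list is assembled in the same key order, so the bind values cannot drift from the column list. For review convenience, the decorator argument expands mechanically to this CQL:

    INSERT INTO object_metadata (type, id, authority_type, authority_url,
        discovery_date, fetcher_name, fetcher_version, format, metadata)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)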
diff --git a/swh/storage/cassandra/schema.py b/swh/storage/cassandra/schema.py
--- a/swh/storage/cassandra/schema.py
+++ b/swh/storage/cassandra/schema.py
@@ -30,29 +30,34 @@
 $$
 ;
 
+
 CREATE OR REPLACE AGGREGATE ascii_bins_count ( ascii )
     SFUNC ascii_bins_count_sfunc
    STYPE tuple<int, map<ascii, int>>
     INITCOND (0, {})
 ;
 
+
 CREATE TYPE IF NOT EXISTS microtimestamp (
     seconds       bigint,
     microseconds  int
 );
 
+
 CREATE TYPE IF NOT EXISTS microtimestamp_with_timezone (
     timestamp     frozen<microtimestamp>,
     offset        smallint,
     negative_utc  boolean
 );
 
+
 CREATE TYPE IF NOT EXISTS person (
     fullname  blob,
     name      blob,
     email     blob
 );
 
+
 CREATE TABLE IF NOT EXISTS content (
     sha1          blob,
     sha1_git      blob,
@@ -65,6 +70,7 @@
     PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
 );
 
+
 CREATE TABLE IF NOT EXISTS skipped_content (
     sha1          blob,
     sha1_git      blob,
@@ -79,6 +85,7 @@
     PRIMARY KEY ((sha1, sha1_git, sha256, blake2s256))
 );
 
+
 CREATE TABLE IF NOT EXISTS revision (
     id                     blob PRIMARY KEY,
     date                   microtimestamp_with_timezone,
@@ -95,6 +102,7 @@
                            -- extra commit information, etc...)
 );
 
+
 CREATE TABLE IF NOT EXISTS revision_parent (
     id           blob,
     parent_rank  int,
@@ -103,6 +111,7 @@
     PRIMARY KEY ((id), parent_rank)
 );
 
+
 CREATE TABLE IF NOT EXISTS release
 (
     id  blob PRIMARY KEY,
@@ -116,10 +125,12 @@
         -- true iff release has been created by Software Heritage
 );
 
+
 CREATE TABLE IF NOT EXISTS directory (
     id  blob PRIMARY KEY,
 );
 
+
 CREATE TABLE IF NOT EXISTS directory_entry (
     directory_id  blob,
     name          blob,  -- path name, relative to containing dir
@@ -129,10 +140,12 @@
     PRIMARY KEY ((directory_id), name)
 );
 
+
 CREATE TABLE IF NOT EXISTS snapshot (
     id  blob PRIMARY KEY,
 );
 
+
 -- For a given snapshot_id, branches are sorted by their name,
 -- allowing easy pagination.
 CREATE TABLE IF NOT EXISTS snapshot_branch (
@@ -143,6 +156,7 @@
     PRIMARY KEY ((snapshot_id), name)
 );
 
+
 CREATE TABLE IF NOT EXISTS origin_visit (
     origin  text,
     visit   bigint,
@@ -151,6 +165,7 @@
     PRIMARY KEY ((origin), visit)
 );
 
+
 CREATE TABLE IF NOT EXISTS origin_visit_status (
     origin  text,
     visit   bigint,
@@ -161,6 +176,7 @@
     PRIMARY KEY ((origin), visit, date)
 );
 
+
 CREATE TABLE IF NOT EXISTS origin (
     sha1  blob PRIMARY KEY,
     url   text,
@@ -188,20 +204,23 @@
 );
 
-CREATE TABLE IF NOT EXISTS origin_metadata (
-    origin           text,
+CREATE TABLE IF NOT EXISTS object_metadata (
+    type             text,
+    id               text,
+
+    -- metadata source
     authority_type   text,
     authority_url    text,
     discovery_date   timestamp,
     fetcher_name     ascii,
     fetcher_version  ascii,
+
+    -- metadata itself
     format           ascii,
     metadata         blob,
-    PRIMARY KEY ((origin), authority_type, authority_url, discovery_date,
-                 fetcher_name, fetcher_version),
-    -- for now, authority_url could be in the partition key; but leaving
-    -- in the partition key allows listing authorities with metadata on an
-    -- origin if we ever need to do it.
+
+    PRIMARY KEY ((id), authority_type, authority_url, discovery_date,
+                 fetcher_name, fetcher_version)
 );
 
@@ -212,7 +231,7 @@
     PRIMARY KEY ((partition_key), object_type)
 );
 """.split(
-    "\n\n"
+    "\n\n\n"
 )
 
 CONTENT_INDEX_TEMPLATE = """
@@ -233,7 +252,7 @@
 TABLES = (
     "skipped_content content revision revision_parent release "
     "directory directory_entry snapshot snapshot_branch "
-    "origin_visit origin origin_metadata object_count "
+    "origin_visit origin object_metadata object_count "
     "origin_visit_status metadata_authority "
     "metadata_fetcher"
 ).split()
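Note on schema.py: the many blank-line additions are not cosmetic. The module splits this big string into individual CQL statements, and the new object_metadata definition contains blank lines inside a single CREATE TABLE, so the separator between statements becomes a double blank line and the split delimiter moves from "\n\n" to "\n\n\n". A quick sketch of the difference, with illustrative strings rather than the real schema:

    schema = "CREATE TABLE a (\n    x int,\n\n    y int\n);\n\n\nCREATE TABLE b (\n    z int\n);"
    assert len(schema.split("\n\n")) == 3    # would cut table a in half
    assert len(schema.split("\n\n\n")) == 2  # one chunk per statement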
diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -1028,16 +1028,50 @@
     ) -> None:
         if not isinstance(origin_url, str):
             raise StorageArgumentException(
-                "origin_id must be str, not %r" % (origin_url,)
+                "origin_url must be str, not %r" % (origin_url,)
             )
 
+        self._object_metadata_add(
+            "origin", origin_url, discovery_date, authority, fetcher, format, metadata,
+        )
+
+    def origin_metadata_get(
+        self,
+        origin_url: str,
+        authority: Dict[str, str],
+        after: Optional[datetime.datetime] = None,
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+    ) -> Dict[str, Any]:
+        if not isinstance(origin_url, str):
+            raise TypeError("origin_url must be str, not %r" % (origin_url,))
+
+        res = self._object_metadata_get(
+            "origin", origin_url, authority, after, page_token, limit
+        )
+        for result in res["results"]:
+            result["origin_url"] = result.pop("id")
+
+        return res
+
+    def _object_metadata_add(
+        self,
+        object_type: str,
+        id: str,
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+    ) -> None:
         if not self._cql_runner.metadata_authority_get(**authority):
             raise StorageArgumentException(f"Unknown authority {authority}")
         if not self._cql_runner.metadata_fetcher_get(**fetcher):
             raise StorageArgumentException(f"Unknown fetcher {fetcher}")
 
         try:
-            self._cql_runner.origin_metadata_add(
-                origin_url,
+            self._cql_runner.object_metadata_add(
+                object_type,
+                id,
                 authority["type"],
                 authority["url"],
                 discovery_date,
@@ -1049,17 +1083,15 @@
         except TypeError as e:
             raise StorageArgumentException(*e.args)
 
-    def origin_metadata_get(
+    def _object_metadata_get(
         self,
-        origin_url: str,
+        object_type: str,
+        id: str,
         authority: Dict[str, str],
         after: Optional[datetime.datetime] = None,
         page_token: Optional[bytes] = None,
         limit: int = 1000,
     ) -> Dict[str, Any]:
-        if not isinstance(origin_url, str):
-            raise TypeError("origin_url must be str, not %r" % (origin_url,))
-
         if page_token is not None:
             (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads(
                 page_token
@@ -1068,8 +1100,8 @@
                 raise StorageArgumentException(
                     "page_token is inconsistent with the value of 'after'."
                 )
-            entries = self._cql_runner.origin_metadata_get_after_date_and_fetcher(
-                origin_url,
+            entries = self._cql_runner.object_metadata_get_after_date_and_fetcher(
+                id,
                 authority["type"],
                 authority["url"],
                 after_date,
@@ -1077,12 +1109,12 @@
                 after_fetcher_url,
             )
         elif after is not None:
-            entries = self._cql_runner.origin_metadata_get_after_date(
-                origin_url, authority["type"], authority["url"], after
+            entries = self._cql_runner.object_metadata_get_after_date(
+                id, authority["type"], authority["url"], after
             )
         else:
-            entries = self._cql_runner.origin_metadata_get(
-                origin_url, authority["type"], authority["url"]
+            entries = self._cql_runner.object_metadata_get(
+                id, authority["type"], authority["url"]
             )
 
         if limit:
@@ -1091,22 +1123,23 @@
         results = []
         for entry in entries:
             discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc)
-            results.append(
-                {
-                    "origin_url": entry.origin,
-                    "authority": {
-                        "type": entry.authority_type,
-                        "url": entry.authority_url,
-                    },
-                    "fetcher": {
-                        "name": entry.fetcher_name,
-                        "version": entry.fetcher_version,
-                    },
-                    "discovery_date": discovery_date,
-                    "format": entry.format,
-                    "metadata": entry.metadata,
-                }
-            )
+
+            result = {
+                "id": entry.id,
+                "authority": {
+                    "type": entry.authority_type,
+                    "url": entry.authority_url,
+                },
+                "fetcher": {
+                    "name": entry.fetcher_name,
+                    "version": entry.fetcher_version,
+                },
+                "discovery_date": discovery_date,
+                "format": entry.format,
+                "metadata": entry.metadata,
+            }
+
+            results.append(result)
 
         if len(results) > limit:
             results.pop()
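Note on the Cassandra backend: pagination state travels in an opaque msgpack token of (discovery_date, fetcher_name, fetcher_version), and _object_metadata_get picks one of three CQL entry points depending on whether a token or an 'after' date is present. A hedged usage sketch, assuming the {'next_page_token', 'results'} envelope swh.storage list endpoints usually return ('storage' and 'authority' are placeholders, not defined in this patch):

    page_token = None
    while True:
        page = storage.origin_metadata_get(
            "https://example.org/repo.git", authority, page_token=page_token
        )
        for entry in page["results"]:
            print(entry["discovery_date"], entry["format"])
        page_token = page.get("next_page_token")
        if page_token is None:
            break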
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -1092,8 +1092,30 @@
     def release_get_random(self, cur=None):
         return self._get_random_row_from_table("release", ["id"], "id", cur)
 
-    origin_metadata_get_cols = [
-        "origin.url",
+    _object_metadata_insert_cols = [
+        "type",
+        "id",
+        "authority_id",
+        "fetcher_id",
+        "discovery_date",
+        "format",
+        "metadata",
+    ]
+    """List of columns of the object_metadata table, used when writing
+    metadata."""
+
+    _object_metadata_insert_query = f"""
+        INSERT INTO object_metadata
+            ({', '.join(_object_metadata_insert_cols)})
+        VALUES ({', '.join('%s' for _ in _object_metadata_insert_cols)})
+        ON CONFLICT (id, authority_id, discovery_date, fetcher_id)
+        DO UPDATE SET
+            format=EXCLUDED.format,
+            metadata=EXCLUDED.metadata
+        """
+
+    object_metadata_get_cols = [
+        "id",
         "discovery_date",
         "metadata_authority.type",
         "metadata_authority.url",
@@ -1103,63 +1125,58 @@
         "format",
         "metadata",
     ]
+    """List of columns of the object_metadata, metadata_authority,
+    and metadata_fetcher tables, used when reading object metadata."""
+
+    _object_metadata_select_query = f"""
+        SELECT
+            object_metadata.id AS id,
+            {', '.join(object_metadata_get_cols[1:-1])},
+            object_metadata.metadata AS metadata
+        FROM object_metadata
+        INNER JOIN metadata_authority
+            ON (metadata_authority.id=authority_id)
+        INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id)
+        WHERE object_metadata.id=%s AND authority_id=%s
+        """
 
-    def origin_metadata_add(
+    def object_metadata_add(
         self,
-        origin: str,
+        object_type: str,
+        id: str,
         discovery_date: datetime.datetime,
-        authority: int,
-        fetcher: int,
+        authority_id: int,
+        fetcher_id: int,
         format: str,
         metadata: bytes,
-        cur=None,
-    ) -> None:
-        """ Add an origin_metadata for the origin at ts with provider, tool and
-        metadata.
-
-        Args:
-            origin: the origin's id for which the metadata is added
-            discovery_date: time when the metadata was found
-            authority: the metadata provider identifier
-            fetcher: the tool's identifier used to extract metadata
-            format: the format of the metadata
-            metadata: the metadata retrieved at the time and location
-        """
-        cur = self._cursor(cur)
-        insert = """INSERT INTO origin_metadata (origin_id, discovery_date,
-                    authority_id, fetcher_id, format, metadata)
-                    SELECT id, %s, %s, %s, %s, %s FROM origin WHERE url = %s
-                    ON CONFLICT (origin_id, authority_id, discovery_date, fetcher_id)
-                    DO UPDATE SET
-                      format=EXCLUDED.format,
-                      metadata=EXCLUDED.metadata
-                    """
-        cur.execute(
-            insert, (discovery_date, authority, fetcher, format, metadata, origin),
+        cur,
+    ):
+        query = self._object_metadata_insert_query
+        args: Dict[str, Any] = dict(
+            type=object_type,
+            id=id,
+            authority_id=authority_id,
+            fetcher_id=fetcher_id,
+            discovery_date=discovery_date,
+            format=format,
+            metadata=metadata,
         )
+        params = [args[col] for col in self._object_metadata_insert_cols]
 
-    def origin_metadata_get(
+        cur.execute(query, params)
+
+    def object_metadata_get(
         self,
-        origin_url: str,
-        authority: int,
+        object_type: str,
+        id: str,
+        authority_id: int,
         after_time: Optional[datetime.datetime],
         after_fetcher: Optional[int],
-        limit: Optional[int],
-        cur=None,
+        limit: int,
+        cur,
     ):
-        cur = self._cursor(cur)
-        assert self.origin_metadata_get_cols[-1] == "metadata"
-        query_parts = [
-            f"SELECT {', '.join(self.origin_metadata_get_cols[0:-1])}, "
-            f"  origin_metadata.metadata AS metadata "
-            f"FROM origin_metadata "
-            f"INNER JOIN metadata_authority "
-            f"  ON (metadata_authority.id=authority_id) "
-            f"INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) "
-            f"INNER JOIN origin ON (origin.id=origin_metadata.origin_id) "
-            f"WHERE origin.url=%s AND authority_id=%s "
-        ]
-        args = [origin_url, authority]
+        query_parts = [self._object_metadata_select_query]
+        args = [id, authority_id]
 
         if after_fetcher is not None:
             assert after_time
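Note on db.py: the handwritten INSERT that joined against origin to resolve origin_id is replaced by a template over _object_metadata_insert_cols. Rendered out (whitespace aside), the new statement is a last-write-wins upsert on the natural key:

    INSERT INTO object_metadata
        (type, id, authority_id, fetcher_id, discovery_date, format, metadata)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (id, authority_id, discovery_date, fetcher_id)
    DO UPDATE SET
        format=EXCLUDED.format,
        metadata=EXCLUDED.metadata

Building params as [args[col] for col in self._object_metadata_insert_cols] keeps the bind order from ever drifting out of sync with the column list, at the cost of an intermediate dict.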
diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py
--- a/swh/storage/in_memory.py
+++ b/swh/storage/in_memory.py
@@ -138,7 +138,7 @@
         self._persons = {}
 
         # {origin_url: {authority: [metadata]}}
-        self._origin_metadata: Dict[
+        self._object_metadata: Dict[
             str,
             Dict[
                 Hashable,
@@ -1015,6 +1015,19 @@
     def refresh_stat_counters(self):
         pass
 
+    def content_metadata_add(
+        self,
+        id: str,
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+    ) -> None:
+        self._object_metadata_add(
+            "content", id, discovery_date, authority, fetcher, format, metadata,
+        )
+
     def origin_metadata_add(
         self,
         origin_url: str,
@@ -1026,8 +1039,22 @@
     ) -> None:
         if not isinstance(origin_url, str):
             raise StorageArgumentException(
-                "origin_id must be str, not %r" % (origin_url,)
+                "origin_url must be str, not %r" % (origin_url,)
             )
+        self._object_metadata_add(
+            "origin", origin_url, discovery_date, authority, fetcher, format, metadata,
+        )
+
+    def _object_metadata_add(
+        self,
+        object_type: str,
+        id: str,
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+    ) -> None:
         if not isinstance(metadata, bytes):
             raise StorageArgumentException(
                 "metadata must be bytes, not %r" % (metadata,)
@@ -1039,10 +1066,10 @@
         if fetcher_key not in self._metadata_fetchers:
             raise StorageArgumentException(f"Unknown fetcher {fetcher}")
 
-        origin_metadata_list = self._origin_metadata[origin_url][authority_key]
+        object_metadata_list = self._object_metadata[id][authority_key]
 
-        origin_metadata = {
-            "origin_url": origin_url,
+        object_metadata: Dict[str, Any] = {
+            "id": id,
             "discovery_date": discovery_date,
             "authority": authority_key,
             "fetcher": fetcher_key,
@@ -1050,17 +1077,16 @@
             "metadata": metadata,
         }
 
-        for existing_origin_metadata in origin_metadata_list:
+        for existing_object_metadata in object_metadata_list:
             if (
-                existing_origin_metadata["fetcher"] == fetcher_key
-                and existing_origin_metadata["discovery_date"] == discovery_date
+                existing_object_metadata["fetcher"] == fetcher_key
+                and existing_object_metadata["discovery_date"] == discovery_date
             ):
                 # Duplicate of an existing one; replace it.
-                existing_origin_metadata.update(origin_metadata)
+                existing_object_metadata.update(object_metadata)
                 break
         else:
-            origin_metadata_list.add(origin_metadata)
-        return None
+            object_metadata_list.add(object_metadata)
 
     def origin_metadata_get(
         self,
@@ -1073,6 +1099,24 @@
         if not isinstance(origin_url, str):
             raise TypeError("origin_url must be str, not %r" % (origin_url,))
 
+        res = self._object_metadata_get(
+            "origin", origin_url, authority, after, page_token, limit
+        )
+        res["results"] = copy.deepcopy(res["results"])
+        for result in res["results"]:
+            result["origin_url"] = result.pop("id")
+
+        return res
+
+    def _object_metadata_get(
+        self,
+        object_type: str,
+        id: str,
+        authority: Dict[str, str],
+        after: Optional[datetime.datetime] = None,
+        page_token: Optional[bytes] = None,
+        limit: int = 1000,
+    ) -> Dict[str, Any]:
         authority_key = self._metadata_authority_key(authority)
 
         if page_token is not None:
@@ -1082,16 +1126,14 @@
                 raise StorageArgumentException(
                     "page_token is inconsistent with the value of 'after'."
                 )
-            entries = self._origin_metadata[origin_url][authority_key].iter_after(
+            entries = self._object_metadata[id][authority_key].iter_after(
                 (after_time, after_fetcher)
             )
         elif after is not None:
-            entries = self._origin_metadata[origin_url][authority_key].iter_from(
-                (after,)
-            )
+            entries = self._object_metadata[id][authority_key].iter_from((after,))
             entries = (entry for entry in entries if entry["discovery_date"] > after)
         else:
-            entries = iter(self._origin_metadata[origin_url][authority_key])
+            entries = iter(self._object_metadata[id][authority_key])
 
         if limit:
             entries = itertools.islice(entries, 0, limit + 1)
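Note on the in-memory backend: deduplication keeps the existing for/else idiom — the else branch of a Python for loop runs only when the loop completed without break, i.e. when no entry with the same (fetcher, discovery_date) was found. Also note that origin_metadata_get deep-copies the results before renaming 'id' to 'origin_url', so callers cannot mutate the dicts held in self._object_metadata. A self-contained sketch of the idiom with placeholder data (the real code stores entries in a sorted structure with .add, not a list):

    entries = [{"fetcher": ("f", "1"), "discovery_date": 1, "metadata": b"old"}]
    new_entry = {"fetcher": ("f", "1"), "discovery_date": 1, "metadata": b"new"}

    for existing in entries:
        if (
            existing["fetcher"] == new_entry["fetcher"]
            and existing["discovery_date"] == new_entry["discovery_date"]
        ):
            existing.update(new_entry)  # duplicate: replace in place
            break
    else:
        entries.append(new_entry)  # loop ended without break: genuinely new

    assert entries == [new_entry]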
diff --git a/swh/storage/interface.py b/swh/storage/interface.py
--- a/swh/storage/interface.py
+++ b/swh/storage/interface.py
@@ -1140,7 +1140,7 @@
         page_token: Optional[bytes] = None,
         limit: int = 1000,
     ) -> Dict[str, Any]:
-        """Retrieve list of all origin_metadata entries for the origin_id
+        """Retrieve list of all origin_metadata entries for the origin_url
 
         Args:
             origin_url: the origin's URL
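Note on the public interface: only the docstring changes — origin_metadata_add/origin_metadata_get keep their signatures, and every backend renames the internal 'id' field back to 'origin_url' before returning. An illustrative result entry (placeholder values; the shape is taken from this diff):

    import datetime

    example_entry = {
        "origin_url": "https://example.org/repo.git",
        "authority": {"type": "deposit", "url": "https://deposit.example.org/"},
        "fetcher": {"name": "swh-deposit", "version": "0.0.1"},
        "discovery_date": datetime.datetime(2020, 5, 4, tzinfo=datetime.timezone.utc),
        "format": "sword-v2-atom-codemeta-v2-in-json",
        "metadata": b"...",
    }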
diff --git a/swh/storage/sql/30-swh-schema.sql b/swh/storage/sql/30-swh-schema.sql
--- a/swh/storage/sql/30-swh-schema.sql
+++ b/swh/storage/sql/30-swh-schema.sql
@@ -17,7 +17,7 @@
 -- latest schema version
 insert into dbversion(version, release, description)
-      values(156, now(), 'Work In Progress');
+      values(157, now(), 'Work In Progress');
 
 -- a SHA1 checksum
 create domain sha1 as bytea check (length(value) = 20);
@@ -37,6 +37,9 @@
 -- a set of UNIX-like access permissions, as manipulated by, e.g., chmod
 create domain file_perms as int;
 
+-- an SWHID
+create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
+
 
 -- Checksums about actual file content. Note that the content itself is not
 -- stored in the DB, but on external (key-value) storage. A single checksum is
@@ -420,27 +423,30 @@
 comment on column metadata_authority.metadata is 'Other metadata about authority';
 
 
--- Discovery of metadata during a listing, loading, deposit or external_catalog of an origin
--- also provides a translation to a defined json schema using a translation tool (tool_id)
-create table origin_metadata
+-- Extrinsic metadata on DAG objects and origins.
+create table object_metadata
 (
-  id             bigserial    not null,  -- PK internal object identifier
-  origin_id      bigint       not null,  -- references origin(id)
-  discovery_date timestamptz  not null,  -- when it was extracted
+  type           text         not null,
+  id             text         not null,
+
+  -- metadata source
   authority_id   bigint       not null,
   fetcher_id     bigint       not null,
-  format         text         not null default 'sword-v2-atom-codemeta-v2-in-json',
+  discovery_date timestamptz  not null,
+
+  -- metadata itself
+  format         text         not null,
   metadata       bytea        not null
 );
 
-comment on table origin_metadata is 'keeps all metadata found concerning an origin';
-comment on column origin_metadata.id is 'the origin_metadata object''s id';
-comment on column origin_metadata.origin_id is 'the origin id for which the metadata was found';
-comment on column origin_metadata.discovery_date is 'the date of retrieval';
-comment on column origin_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.';
-comment on column origin_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.';
-comment on column origin_metadata.format is 'name of the format of metadata, used by readers to interpret it.';
-comment on column origin_metadata.metadata is 'original metadata in opaque format';
+comment on table object_metadata is 'keeps all metadata found concerning an object';
+comment on column object_metadata.type is 'the type of object (content/directory/revision/release/snapshot/origin) the metadata is on';
+comment on column object_metadata.id is 'the SWHID or origin URL for which the metadata was found';
+comment on column object_metadata.discovery_date is 'the date of retrieval';
+comment on column object_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.';
+comment on column object_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.';
+comment on column object_metadata.format is 'name of the format of metadata, used by readers to interpret it.';
+comment on column object_metadata.metadata is 'original metadata in opaque format';
 
 
 -- Keep a cache of object counts
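Note on the main schema: object_metadata drops the bigserial surrogate key entirely — rows are identified by (id, authority_id, discovery_date, fetcher_id), matching the unique index created in 60-swh-indexes.sql — and the 'sword-v2-atom-codemeta-v2-in-json' default on format is gone, so writers must always pass a format explicitly. An illustrative read under the new layout (not part of the patch; 42 stands in for a real authority id):

    select discovery_date, format, metadata
      from object_metadata
     where id = 'https://example.org/repo.git'
       and authority_id = 42
     order by discovery_date, fetcher_id;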
diff --git a/swh/storage/sql/60-swh-indexes.sql b/swh/storage/sql/60-swh-indexes.sql
--- a/swh/storage/sql/60-swh-indexes.sql
+++ b/swh/storage/sql/60-swh-indexes.sql
@@ -168,20 +168,14 @@
 create unique index metadata_authority_type_url on metadata_authority(type, url);
 
--- origin_metadata
-create unique index concurrently origin_metadata_pkey on origin_metadata(id);
-alter table origin_metadata add primary key using index origin_metadata_pkey;
+-- object_metadata
+create unique index concurrently object_metadata_content_authority_date_fetcher on object_metadata(id, authority_id, discovery_date, fetcher_id);
 
-create unique index concurrently origin_metadata_origin_authority_date_fetcher on origin_metadata(origin_id, authority_id, discovery_date, fetcher_id);
+alter table object_metadata add constraint object_metadata_authority_fkey foreign key (authority_id) references metadata_authority(id) not valid;
+alter table object_metadata validate constraint object_metadata_authority_fkey;
 
-alter table origin_metadata add constraint origin_metadata_origin_fkey foreign key (origin_id) references origin(id) not valid;
-alter table origin_metadata validate constraint origin_metadata_origin_fkey;
-
-alter table origin_metadata add constraint origin_metadata_authority_fkey foreign key (authority_id) references metadata_authority(id) not valid;
-alter table origin_metadata validate constraint origin_metadata_authority_fkey;
-
-alter table origin_metadata add constraint origin_metadata_fetcher_fkey foreign key (fetcher_id) references metadata_fetcher(id) not valid;
-alter table origin_metadata validate constraint origin_metadata_fetcher_fkey;
+alter table object_metadata add constraint object_metadata_fetcher_fkey foreign key (fetcher_id) references metadata_fetcher(id) not valid;
+alter table object_metadata validate constraint object_metadata_fetcher_fkey;
 
 -- object_counts
 create unique index concurrently object_counts_pkey on object_counts(object_type);
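Note on the index file: this keeps the standard low-lock pattern — 'create unique index concurrently' builds the natural-key index without blocking writes, and each foreign key is added 'not valid' (skipping the scan of existing rows) and then checked with 'validate constraint', which takes a weaker lock than a plain ADD CONSTRAINT. The old primary key on the dropped surrogate id and the foreign key into origin(id) disappear along with their columns; rows for non-origin objects will carry SWHIDs, which have no referential target table to constrain against.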
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -1162,29 +1162,53 @@
         db=None,
         cur=None,
     ) -> None:
-        authority_id = db.metadata_authority_get_id(
-            authority["type"], authority["url"], cur
-        )
-        if not authority_id:
-            raise StorageArgumentException(f"Unknown authority {authority}")
-        fetcher_id = db.metadata_fetcher_get_id(
-            fetcher["name"], fetcher["version"], cur
+        self._object_metadata_add(
+            "origin",
+            origin_url,
+            discovery_date,
+            authority,
+            fetcher,
+            format,
+            metadata,
+            db,
+            cur,
         )
-        if not fetcher_id:
-            raise StorageArgumentException(f"Unknown fetcher {fetcher}")
-        try:
-            db.origin_metadata_add(
-                origin_url,
-                discovery_date,
-                authority_id,
-                fetcher_id,
-                format,
-                metadata,
-                cur,
+
+    def _object_metadata_add(
+        self,
+        object_type: str,
+        id: str,
+        discovery_date: datetime.datetime,
+        authority: Dict[str, Any],
+        fetcher: Dict[str, Any],
+        format: str,
+        metadata: bytes,
+        db,
+        cur,
+    ) -> None:
+        authority_id = self._get_authority_id(authority, db, cur)
+        fetcher_id = self._get_fetcher_id(fetcher, db, cur)
+        if not isinstance(metadata, bytes):
+            raise StorageArgumentException(
+                "metadata must be bytes, not %r" % (metadata,)
             )
-        except psycopg2.ProgrammingError as e:
-            raise StorageArgumentException(*e.args)
 
-        send_metric("origin_metadata:add", count=1, method_name="origin_metadata_add")
+        db.object_metadata_add(
+            object_type,
+            id,
+            discovery_date,
+            authority_id,
+            fetcher_id,
+            format,
+            metadata,
+            cur,
+        )
+
+        send_metric(
+            f"{object_type}_metadata:add",
+            count=1,
+            method_name=f"{object_type}_metadata_add",
+        )
 
     @timed
     @db_transaction(statement_timeout=500)
@@ -1197,6 +1221,27 @@
         limit: int = 1000,
         db=None,
         cur=None,
+    ) -> Dict[str, Any]:
+        result = self._object_metadata_get(
+            "origin", origin_url, authority, after, page_token, limit, db, cur
+        )
+
+        for res in result["results"]:
+            res.pop("id")
+            res["origin_url"] = origin_url
+
+        return result
+
+    def _object_metadata_get(
+        self,
+        object_type: str,
+        id: str,
+        authority: Dict[str, str],
+        after: Optional[datetime.datetime],
+        page_token: Optional[bytes],
+        limit: int,
+        db,
+        cur,
     ) -> Dict[str, Any]:
         if page_token:
             (after_time, after_fetcher) = msgpack_loads(page_token)
@@ -1217,28 +1262,27 @@
                 "results": [],
             }
 
-        rows = db.origin_metadata_get(
-            origin_url, authority_id, after_time, after_fetcher, limit + 1, cur
+        rows = db.object_metadata_get(
+            object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur
         )
-        rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows]
+        rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows]
         results = []
         for row in rows:
             row = row.copy()
             row.pop("metadata_fetcher.id")
-            results.append(
-                {
-                    "origin_url": row.pop("origin.url"),
-                    "authority": {
-                        "type": row.pop("metadata_authority.type"),
-                        "url": row.pop("metadata_authority.url"),
-                    },
-                    "fetcher": {
-                        "name": row.pop("metadata_fetcher.name"),
-                        "version": row.pop("metadata_fetcher.version"),
-                    },
-                    **row,
-                }
-            )
+            result = {
+                "authority": {
+                    "type": row.pop("metadata_authority.type"),
+                    "url": row.pop("metadata_authority.url"),
+                },
+                "fetcher": {
+                    "name": row.pop("metadata_fetcher.name"),
+                    "version": row.pop("metadata_fetcher.version"),
+                },
+                **row,
+            }
+
+            results.append(result)
 
         if len(results) > limit:
             results.pop()
@@ -1314,3 +1358,19 @@
 
     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         return {}
+
+    def _get_authority_id(self, authority: Dict[str, Any], db, cur):
+        authority_id = db.metadata_authority_get_id(
+            authority["type"], authority["url"], cur
+        )
+        if not authority_id:
+            raise StorageArgumentException(f"Unknown authority {authority}")
+        return authority_id
+
+    def _get_fetcher_id(self, fetcher: Dict[str, Any], db, cur):
+        fetcher_id = db.metadata_fetcher_get_id(
+            fetcher["name"], fetcher["version"], cur
+        )
+        if not fetcher_id:
+            raise StorageArgumentException(f"Unknown fetcher {fetcher}")
+        return fetcher_id
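Note on the PostgreSQL backend: authority/fetcher resolution moves into the shared _get_authority_id/_get_fetcher_id helpers, and the metric name is now derived from the object type, so the origin path keeps emitting the pre-existing metric — for object_type == "origin" the f-strings reduce to:

    send_metric("origin_metadata:add", count=1, method_name="origin_metadata_add")

One behavioral note: the broad 'except psycopg2.ProgrammingError' around the insert is dropped in favor of an explicit isinstance check on metadata before the query runs.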