diff --git a/swh/storage/api/serializers.py b/swh/storage/api/serializers.py --- a/swh/storage/api/serializers.py +++ b/swh/storage/api/serializers.py @@ -7,6 +7,7 @@ from typing import Callable, Dict, List, Tuple +from swh.model.identifiers import SWHID, parse_swhid import swh.model.model as model @@ -16,11 +17,23 @@ return d +def _encode_model_enum(obj): + return { + "value": obj.value, + "__type__": type(obj).__name__, + } + + ENCODERS: List[Tuple[type, str, Callable]] = [ (model.BaseModel, "model", _encode_model_object), + (SWHID, "swhid", str), + (model.MetadataTargetType, "model_enum", _encode_model_enum), + (model.MetadataAuthorityType, "model_enum", _encode_model_enum), ] DECODERS: Dict[str, Callable] = { - "model": lambda d: getattr(model, d.pop("__type__")).from_dict(d) + "swhid": parse_swhid, + "model": lambda d: getattr(model, d.pop("__type__")).from_dict(d), + "model_enum": lambda d: getattr(model, d.pop("__type__"))(d["value"]), } diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -18,7 +18,6 @@ Optional, Tuple, TypeVar, - Union, ) from cassandra import CoordinationFailure @@ -46,7 +45,6 @@ from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS -from .. import extrinsic_metadata logger = logging.getLogger(__name__) @@ -896,36 +894,10 @@ f"VALUES ({', '.join('?' for _ in _object_metadata_keys)})" ) def object_metadata_add( - self, - object_type: str, - id: str, - authority_type, - authority_url, - discovery_date, - fetcher_name, - fetcher_version, - format, - metadata, - context: Dict[str, Union[str, bytes, int]], - *, - statement, + self, statement, **kwargs, ): - params = [ - object_type, - id, - authority_type, - authority_url, - discovery_date, - fetcher_name, - fetcher_version, - format, - metadata, - ] - - params.extend( - context.get(key) for key in extrinsic_metadata.CONTEXT_KEYS[object_type] - ) - + assert set(kwargs) == set(self._object_metadata_keys), set(kwargs) + params = [kwargs[key] for key in self._object_metadata_keys] return self._execute_with_retries(statement, params,) @_prepared_statement( diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -14,6 +14,8 @@ from deprecated import deprecated from swh.core.api.serializers import msgpack_loads, msgpack_dumps +from swh.model.identifiers import parse_swhid, SWHID +from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import ( Revision, Release, @@ -25,14 +27,17 @@ OriginVisitStatus, Snapshot, Origin, + MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, + MetadataTargetType, + RawExtrinsicMetadata, ) -from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.storage.objstorage import ObjStorage from swh.storage.writer import JournalWriter -from swh.storage.utils import now +from swh.storage.utils import map_optional, now from ..exc import StorageArgumentException, HashCollision -from ..extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS from .common import TOKEN_BEGIN, TOKEN_END from .converters import ( revision_to_db, @@ -1009,124 +1014,48 @@ def refresh_stat_counters(self): pass - def content_metadata_add( - self, - id: str, - context: Dict[str, Union[str, bytes, int]], - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - ) -> None: - self._object_metadata_add( - "content", - id, - discovery_date, - authority, - fetcher, - format, - metadata, - context, - ) - - def content_metadata_get( - self, - id: str, - authority: Dict[str, str], - after: Optional[datetime.datetime] = None, - page_token: Optional[bytes] = None, - limit: int = 1000, - ) -> Dict[str, Any]: - return self._object_metadata_get( - "content", id, authority, after, page_token, limit, - ) - - def origin_metadata_add( - self, - origin_url: str, - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - ) -> None: - if not isinstance(origin_url, str): - raise StorageArgumentException( - "origin_url must be str, not %r" % (origin_url,) - ) - - context: Dict[str, Union[str, bytes, int]] = {} # origins have no context - - self._object_metadata_add( - "origin", - origin_url, - discovery_date, - authority, - fetcher, - format, - metadata, - context, - ) - - def origin_metadata_get( - self, - origin_url: str, - authority: Dict[str, str], - after: Optional[datetime.datetime] = None, - page_token: Optional[bytes] = None, - limit: int = 1000, - ) -> Dict[str, Any]: - if not isinstance(origin_url, str): - raise TypeError("origin_url must be str, not %r" % (origin_url,)) - - res = self._object_metadata_get( - "origin", origin_url, authority, after, page_token, limit - ) - for result in res["results"]: - result["origin_url"] = result.pop("id") - - return res - - def _object_metadata_add( - self, - object_type: str, - id: str, - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - context: Dict[str, Union[str, bytes, int]], - ) -> None: - check_extrinsic_metadata_context(object_type, context) - - if not self._cql_runner.metadata_authority_get(**authority): - raise StorageArgumentException(f"Unknown authority {authority}") - if not self._cql_runner.metadata_fetcher_get(**fetcher): - raise StorageArgumentException(f"Unknown fetcher {fetcher}") + def object_metadata_add(self, metadata: Iterable[RawExtrinsicMetadata]) -> None: + for metadata_entry in metadata: + if not self._cql_runner.metadata_authority_get( + metadata_entry.authority.type.value, metadata_entry.authority.url + ): + raise StorageArgumentException( + f"Unknown authority {metadata_entry.authority}" + ) + if not self._cql_runner.metadata_fetcher_get( + metadata_entry.fetcher.name, metadata_entry.fetcher.version + ): + raise StorageArgumentException( + f"Unknown fetcher {metadata_entry.fetcher}" + ) - try: - self._cql_runner.object_metadata_add( - object_type, - id, - authority["type"], - authority["url"], - discovery_date, - fetcher["name"], - fetcher["version"], - format, - metadata, - context, - ) - except TypeError as e: - raise StorageArgumentException(*e.args) + try: + self._cql_runner.object_metadata_add( + type=metadata_entry.type.value, + id=str(metadata_entry.id), + authority_type=metadata_entry.authority.type.value, + authority_url=metadata_entry.authority.url, + discovery_date=metadata_entry.discovery_date, + fetcher_name=metadata_entry.fetcher.name, + fetcher_version=metadata_entry.fetcher.version, + format=metadata_entry.format, + metadata=metadata_entry.metadata, + origin=metadata_entry.origin, + visit=metadata_entry.visit, + snapshot=map_optional(str, metadata_entry.snapshot), + release=map_optional(str, metadata_entry.release), + revision=map_optional(str, metadata_entry.revision), + path=metadata_entry.path, + directory=map_optional(str, metadata_entry.directory), + ) + except TypeError as e: + raise StorageArgumentException(*e.args) - def _object_metadata_get( + def object_metadata_get( self, - object_type: str, - id: str, - authority: Dict[str, str], + object_type: MetadataTargetType, + id: Union[str, SWHID], + authority: MetadataAuthority, after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, @@ -1140,20 +1069,20 @@ "page_token is inconsistent with the value of 'after'." ) entries = self._cql_runner.object_metadata_get_after_date_and_fetcher( - id, - authority["type"], - authority["url"], + str(id), + authority.type.value, + authority.url, after_date, after_fetcher_name, after_fetcher_url, ) elif after is not None: entries = self._cql_runner.object_metadata_get_after_date( - id, authority["type"], authority["url"], after + str(id), authority.type.value, authority.url, after ) else: entries = self._cql_runner.object_metadata_get( - id, authority["type"], authority["url"] + str(id), authority.type.value, authority.url ) if limit: @@ -1163,28 +1092,29 @@ for entry in entries: discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc) - result = { - "id": entry.id, - "authority": { - "type": entry.authority_type, - "url": entry.authority_url, - }, - "fetcher": { - "name": entry.fetcher_name, - "version": entry.fetcher_version, - }, - "discovery_date": discovery_date, - "format": entry.format, - "metadata": entry.metadata, - } - - if CONTEXT_KEYS[object_type]: - context = {} - for key in CONTEXT_KEYS[object_type]: - value = getattr(entry, key) - if value is not None: - context[key] = value - result["context"] = context + assert str(id) == entry.id + + result = RawExtrinsicMetadata( + type=MetadataTargetType(entry.type), + id=id, + authority=MetadataAuthority( + type=MetadataAuthorityType(entry.authority_type), + url=entry.authority_url, + ), + fetcher=MetadataFetcher( + name=entry.fetcher_name, version=entry.fetcher_version, + ), + discovery_date=discovery_date, + format=entry.format, + metadata=entry.metadata, + origin=entry.origin, + visit=entry.visit, + snapshot=map_optional(parse_swhid, entry.snapshot), + release=map_optional(parse_swhid, entry.release), + revision=map_optional(parse_swhid, entry.revision), + path=entry.path, + directory=map_optional(parse_swhid, entry.directory), + ) results.append(result) @@ -1194,9 +1124,9 @@ last_result = results[-1] next_page_token: Optional[bytes] = msgpack_dumps( ( - last_result["discovery_date"], - last_result["fetcher"]["name"], - last_result["fetcher"]["version"], + last_result.discovery_date, + last_result.fetcher.name, + last_result.fetcher.version, ) ) else: @@ -1207,35 +1137,45 @@ "results": results, } - def metadata_fetcher_add( - self, name: str, version: str, metadata: Dict[str, Any] - ) -> None: - self._cql_runner.metadata_fetcher_add(name, version, json.dumps(metadata)) + def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None: + for fetcher in fetchers: + self._cql_runner.metadata_fetcher_add( + fetcher.name, + fetcher.version, + json.dumps(map_optional(dict, fetcher.metadata)), + ) - def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]: + def metadata_fetcher_get( + self, name: str, version: str + ) -> Optional[MetadataFetcher]: fetcher = self._cql_runner.metadata_fetcher_get(name, version) if fetcher: - return { - "name": fetcher.name, - "version": fetcher.version, - "metadata": json.loads(fetcher.metadata), - } + return MetadataFetcher( + name=fetcher.name, + version=fetcher.version, + metadata=json.loads(fetcher.metadata), + ) else: return None - def metadata_authority_add( - self, type: str, url: str, metadata: Dict[str, Any] - ) -> None: - self._cql_runner.metadata_authority_add(url, type, json.dumps(metadata)) + def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: + for authority in authorities: + self._cql_runner.metadata_authority_add( + authority.url, + authority.type.value, + json.dumps(map_optional(dict, authority.metadata)), + ) - def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]: - authority = self._cql_runner.metadata_authority_get(type, url) + def metadata_authority_get( + self, type: MetadataAuthorityType, url: str + ) -> Optional[MetadataAuthority]: + authority = self._cql_runner.metadata_authority_get(type.value, url) if authority: - return { - "type": authority.type, - "url": authority.url, - "metadata": json.loads(authority.metadata), - } + return MetadataAuthority( + type=MetadataAuthorityType(authority.type), + url=authority.url, + metadata=json.loads(authority.metadata), + ) else: return None diff --git a/swh/storage/db.py b/swh/storage/db.py --- a/swh/storage/db.py +++ b/swh/storage/db.py @@ -6,7 +6,7 @@ import datetime import random import select -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple from swh.core.db import BaseDb from swh.core.db.db_utils import stored_procedure, jsonize @@ -1125,7 +1125,8 @@ """ object_metadata_get_cols = [ - "id", + "object_metadata.id", + "object_metadata.type", "discovery_date", "metadata_authority.type", "metadata_authority.url", @@ -1134,16 +1135,14 @@ "metadata_fetcher.version", *_object_metadata_context_cols, "format", - "metadata", + "object_metadata.metadata", ] """List of columns of the object_metadata, metadata_authority, and metadata_fetcher tables, used when reading object metadata.""" _object_metadata_select_query = f""" SELECT - object_metadata.id AS id, - {', '.join(object_metadata_get_cols[1:-1])}, - object_metadata.metadata AS metadata + {', '.join(object_metadata_get_cols)} FROM object_metadata INNER JOIN metadata_authority ON (metadata_authority.id=authority_id) @@ -1155,12 +1154,18 @@ self, object_type: str, id: str, - context: Dict[str, Union[str, bytes, int]], discovery_date: datetime.datetime, authority_id: int, fetcher_id: int, format: str, metadata: bytes, + origin: Optional[str], + visit: Optional[int], + snapshot: Optional[str], + release: Optional[str], + revision: Optional[str], + path: Optional[bytes], + directory: Optional[str], cur, ): query = self._object_metadata_insert_query @@ -1172,9 +1177,14 @@ discovery_date=discovery_date, format=format, metadata=metadata, + origin=origin, + visit=visit, + snapshot=snapshot, + release=release, + revision=revision, + path=path, + directory=directory, ) - for col in self._object_metadata_context_cols: - args[col] = context.get(col) params = [args[col] for col in self._object_metadata_insert_cols] diff --git a/swh/storage/extrinsic_metadata.py b/swh/storage/extrinsic_metadata.py deleted file mode 100644 --- a/swh/storage/extrinsic_metadata.py +++ /dev/null @@ -1,55 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from typing import Any, cast, Dict - -from swh.model.identifiers import PersistentId, parse_persistent_identifier - -from .exc import StorageArgumentException - -CONTEXT_KEYS: Dict[str, Dict[str, type]] = {} -CONTEXT_KEYS["origin"] = {} -CONTEXT_KEYS["snapshot"] = {"origin": str, "visit": int} -CONTEXT_KEYS["release"] = {**CONTEXT_KEYS["snapshot"], "snapshot": PersistentId} -CONTEXT_KEYS["revision"] = {**CONTEXT_KEYS["release"], "release": PersistentId} -CONTEXT_KEYS["directory"] = { - **CONTEXT_KEYS["revision"], - "revision": PersistentId, - "path": bytes, -} -CONTEXT_KEYS["content"] = {**CONTEXT_KEYS["directory"], "directory": PersistentId} - - -def check_extrinsic_metadata_context(object_type: str, context: Dict[str, Any]): - key_types = CONTEXT_KEYS[object_type] - - extra_keys = set(context) - set(key_types) - if extra_keys: - raise StorageArgumentException(f"Unknown context keys: {', '.join(extra_keys)}") - - for (key, value) in context.items(): - expected_type = key_types[key] - expected_type_str = str(expected_type) # for display - - # If an SWHID is expected and a string is given, parse it - if expected_type is PersistentId and isinstance(value, str): - value = parse_persistent_identifier(value) - expected_type_str = "PersistentId or str" - - # Check the type of the context value - if not isinstance(value, expected_type): - raise StorageArgumentException( - f"Context key {key} must have type {expected_type_str}, " - f"but is {value!r}" - ) - - # If it is an SWHID, check it is also a core SWHID. - if expected_type is PersistentId: - value = cast(PersistentId, value) - if value.metadata != {}: - raise StorageArgumentException( - f"Context key {key} must be a core SWHID, " - f"but it has qualifiers {', '.join(value.metadata)}." - ) diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -33,6 +33,7 @@ from deprecated import deprecated from swh.core.api.serializers import msgpack_loads, msgpack_dumps +from swh.model.identifiers import SWHID from swh.model.model import ( BaseContent, Content, @@ -45,6 +46,11 @@ OriginVisitStatus, Origin, SHA1_SIZE, + MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, + MetadataTargetType, + RawExtrinsicMetadata, ) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex from swh.storage.objstorage import ObjStorage @@ -52,7 +58,6 @@ from .converters import origin_url_to_sha1 from .exc import StorageArgumentException, HashCollision -from .extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS from .utils import get_partition_bounds_bytes from .writer import JournalWriter @@ -138,21 +143,33 @@ self._origin_visit_statuses: Dict[Tuple[str, int], List[OriginVisitStatus]] = {} self._persons = {} - # {origin_url: {authority: [metadata]}} + # {object_type: {id: {authority: [metadata]}}} self._object_metadata: Dict[ - str, + MetadataTargetType, Dict[ - Hashable, - SortedList[Tuple[datetime.datetime, FetcherKey], Dict[str, Any]], + Union[str, SWHID], + Dict[ + Hashable, + SortedList[ + Tuple[datetime.datetime, FetcherKey], RawExtrinsicMetadata + ], + ], ], ] = defaultdict( lambda: defaultdict( - lambda: SortedList(key=lambda x: (x["discovery_date"], x["fetcher"])) + lambda: defaultdict( + lambda: SortedList( + key=lambda x: ( + x.discovery_date, + self._metadata_fetcher_key(x.fetcher), + ) + ) + ) ) ) # noqa - self._metadata_fetchers: Dict[FetcherKey, Dict[str, Any]] = {} - self._metadata_authorities: Dict[Hashable, Dict[str, Any]] = {} + self._metadata_fetchers: Dict[FetcherKey, MetadataFetcher] = {} + self._metadata_authorities: Dict[Hashable, MetadataAuthority] = {} self._objects = defaultdict(list) self._sorted_sha1s = SortedList[bytes, bytes]() @@ -1018,144 +1035,58 @@ def refresh_stat_counters(self): pass - def content_metadata_add( - self, - id: str, - context: Dict[str, Union[str, bytes, int]], - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - ) -> None: - self._object_metadata_add( - "content", - id, - discovery_date, - authority, - fetcher, - format, - metadata, - context, - ) - - def content_metadata_get( - self, - id: str, - authority: Dict[str, str], - after: Optional[datetime.datetime] = None, - page_token: Optional[bytes] = None, - limit: int = 1000, - ) -> Dict[str, Any]: - return self._object_metadata_get( - "content", id, authority, after, page_token, limit - ) - - def origin_metadata_add( - self, - origin_url: str, - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - ) -> None: - if not isinstance(origin_url, str): - raise StorageArgumentException( - "origin_url must be str, not %r" % (origin_url,) - ) + def object_metadata_add(self, metadata: Iterable[RawExtrinsicMetadata],) -> None: + for metadata_entry in metadata: + authority_key = self._metadata_authority_key(metadata_entry.authority) + if authority_key not in self._metadata_authorities: + raise StorageArgumentException( + f"Unknown authority {metadata_entry.authority}" + ) + fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher) + if fetcher_key not in self._metadata_fetchers: + raise StorageArgumentException( + f"Unknown fetcher {metadata_entry.fetcher}" + ) - context: Dict[str, Union[str, bytes, int]] = {} # origins have no context + object_metadata_list = self._object_metadata[metadata_entry.type][ + metadata_entry.id + ][authority_key] - self._object_metadata_add( - "origin", - origin_url, - discovery_date, - authority, - fetcher, - format, - metadata, - context, - ) + for existing_object_metadata in object_metadata_list: + if ( + self._metadata_fetcher_key(existing_object_metadata.fetcher) + == fetcher_key + and existing_object_metadata.discovery_date + == metadata_entry.discovery_date + ): + # Duplicate of an existing one; ignore it. + break + else: + object_metadata_list.add(metadata_entry) - def origin_metadata_get( + def object_metadata_get( self, - origin_url: str, - authority: Dict[str, str], + object_type: MetadataTargetType, + id: Union[str, SWHID], + authority: MetadataAuthority, after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, - ) -> Dict[str, Any]: - if not isinstance(origin_url, str): - raise TypeError("origin_url must be str, not %r" % (origin_url,)) - - res = self._object_metadata_get( - "origin", origin_url, authority, after, page_token, limit - ) - res["results"] = copy.deepcopy(res["results"]) - for result in res["results"]: - result["origin_url"] = result.pop("id") - - return res - - def _object_metadata_add( - self, - object_type: str, - id: str, - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - context: Dict[str, Union[str, bytes, int]], - ) -> None: - check_extrinsic_metadata_context(object_type, context) - if not isinstance(metadata, bytes): - raise StorageArgumentException( - "metadata must be bytes, not %r" % (metadata,) - ) + ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: authority_key = self._metadata_authority_key(authority) - if authority_key not in self._metadata_authorities: - raise StorageArgumentException(f"Unknown authority {authority}") - fetcher_key = self._metadata_fetcher_key(fetcher) - if fetcher_key not in self._metadata_fetchers: - raise StorageArgumentException(f"Unknown fetcher {fetcher}") - - object_metadata_list = self._object_metadata[id][authority_key] - - object_metadata: Dict[str, Any] = { - "id": id, - "discovery_date": discovery_date, - "authority": authority_key, - "fetcher": fetcher_key, - "format": format, - "metadata": metadata, - } - - if CONTEXT_KEYS[object_type]: - object_metadata["context"] = context - for existing_object_metadata in object_metadata_list: - if ( - existing_object_metadata["fetcher"] == fetcher_key - and existing_object_metadata["discovery_date"] == discovery_date - ): - # Duplicate of an existing one; replace it. - existing_object_metadata.update(object_metadata) - break + if object_type == MetadataTargetType.ORIGIN: + if isinstance(id, SWHID): + raise StorageArgumentException( + f"object_metadata_get called with object_type='origin', but " + f"provided id is an SWHID: {id!r}" + ) else: - object_metadata_list.add(object_metadata) - - def _object_metadata_get( - self, - object_type: str, - id: str, - authority: Dict[str, str], - after: Optional[datetime.datetime] = None, - page_token: Optional[bytes] = None, - limit: int = 1000, - ) -> Dict[str, Any]: - authority_key = self._metadata_authority_key(authority) + if not isinstance(id, SWHID): + raise StorageArgumentException( + f"object_metadata_get called with object_type!='origin', but " + f"provided id is not an SWHID: {id!r}" + ) if page_token is not None: (after_time, after_fetcher) = msgpack_loads(page_token) @@ -1164,33 +1095,36 @@ raise StorageArgumentException( "page_token is inconsistent with the value of 'after'." ) - entries = self._object_metadata[id][authority_key].iter_after( + entries = self._object_metadata[object_type][id][authority_key].iter_after( (after_time, after_fetcher) ) elif after is not None: - entries = self._object_metadata[id][authority_key].iter_from((after,)) - entries = (entry for entry in entries if entry["discovery_date"] > after) + entries = self._object_metadata[object_type][id][authority_key].iter_from( + (after,) + ) + entries = (entry for entry in entries if entry.discovery_date > after) else: - entries = iter(self._object_metadata[id][authority_key]) + entries = iter(self._object_metadata[object_type][id][authority_key]) if limit: entries = itertools.islice(entries, 0, limit + 1) results = [] for entry in entries: - authority = self._metadata_authorities[entry["authority"]] - fetcher = self._metadata_fetchers[entry["fetcher"]] + entry_authority = self._metadata_authorities[ + self._metadata_authority_key(entry.authority) + ] + entry_fetcher = self._metadata_fetchers[ + self._metadata_fetcher_key(entry.fetcher) + ] if after: - assert entry["discovery_date"] > after + assert entry.discovery_date > after results.append( - { - **entry, - "authority": {"type": authority["type"], "url": authority["url"],}, - "fetcher": { - "name": fetcher["name"], - "version": fetcher["version"], - }, - } + attr.evolve( + entry, + authority=attr.evolve(entry_authority, metadata=None), + fetcher=attr.evolve(entry_fetcher, metadata=None), + ) ) if len(results) > limit: @@ -1199,8 +1133,8 @@ last_result = results[-1] next_page_token: Optional[bytes] = msgpack_dumps( ( - last_result["discovery_date"], - self._metadata_fetcher_key(last_result["fetcher"]), + last_result.discovery_date, + self._metadata_fetcher_key(last_result.fetcher), ) ) else: @@ -1211,37 +1145,38 @@ "results": results, } - def metadata_fetcher_add( - self, name: str, version: str, metadata: Dict[str, Any] - ) -> None: - fetcher = { - "name": name, - "version": version, - "metadata": metadata, - } - key = self._metadata_fetcher_key(fetcher) - if key not in self._metadata_fetchers: - self._metadata_fetchers[key] = fetcher + def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None: + for fetcher in fetchers: + if fetcher.metadata is None: + raise StorageArgumentException( + "MetadataFetcher.metadata may not be None in metadata_fetcher_add." + ) + key = self._metadata_fetcher_key(fetcher) + if key not in self._metadata_fetchers: + self._metadata_fetchers[key] = fetcher - def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]: + def metadata_fetcher_get( + self, name: str, version: str + ) -> Optional[MetadataFetcher]: return self._metadata_fetchers.get( - self._metadata_fetcher_key({"name": name, "version": version}) + self._metadata_fetcher_key(MetadataFetcher(name=name, version=version)) ) - def metadata_authority_add( - self, type: str, url: str, metadata: Dict[str, Any] - ) -> None: - authority = { - "type": type, - "url": url, - "metadata": metadata, - } - key = self._metadata_authority_key(authority) - self._metadata_authorities[key] = authority + def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: + for authority in authorities: + if authority.metadata is None: + raise StorageArgumentException( + "MetadataAuthority.metadata may not be None in " + "metadata_authority_add." + ) + key = self._metadata_authority_key(authority) + self._metadata_authorities[key] = authority - def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]: + def metadata_authority_get( + self, type: MetadataAuthorityType, url: str + ) -> Optional[MetadataAuthority]: return self._metadata_authorities.get( - self._metadata_authority_key({"type": type, "url": url}) + self._metadata_authority_key(MetadataAuthority(type=type, url=url)) ) def _get_origin_url(self, origin): @@ -1266,12 +1201,12 @@ return tuple((key, content.get(key)) for key in sorted(DEFAULT_ALGORITHMS)) @staticmethod - def _metadata_fetcher_key(fetcher: Dict) -> FetcherKey: - return (fetcher["name"], fetcher["version"]) + def _metadata_fetcher_key(fetcher: MetadataFetcher) -> FetcherKey: + return (fetcher.name, fetcher.version) @staticmethod - def _metadata_authority_key(authority: Dict) -> Hashable: - return (authority["type"], authority["url"]) + def _metadata_authority_key(authority: MetadataAuthority) -> Hashable: + return (authority.type, authority.url) def diff_directories(self, from_dir, to_dir, track_renaming=False): raise NotImplementedError("InMemoryStorage.diff_directories") diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -8,6 +8,7 @@ from typing import Any, Dict, Iterable, List, Optional, Union from swh.core.api import remote_api_endpoint +from swh.model.identifiers import SWHID from swh.model.model import ( Content, Directory, @@ -18,6 +19,11 @@ Release, Snapshot, SkippedContent, + MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, + MetadataTargetType, + RawExtrinsicMetadata, ) @@ -1107,118 +1113,38 @@ """Recomputes the statistics for `stat_counters`.""" ... - @remote_api_endpoint("content/metadata/add") - def content_metadata_add( - self, - id: str, - context: Dict[str, Union[str, bytes, int]], - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - ) -> None: - """Add a content_metadata for the content at discovery_date, - obtained using the `fetcher` from the `authority`. - - The authority and fetcher must be known to the storage before - using this endpoint. - - If there is already content metadata for the same content, authority, - fetcher, and at the same date; the new one will be either dropped or - will replace the existing one - (it is unspecified which one of these two behaviors happens). - - Args: - discovery_date: when the metadata was fetched. - authority: a dict containing keys `type` and `url`. - fetcher: a dict containing keys `name` and `version`. - format: text field indicating the format of the content of the - metadata: blob of raw metadata - """ - ... - - @remote_api_endpoint("content/metadata/get") - def content_metadata_get( - self, - id: str, - authority: Dict[str, str], - after: Optional[datetime.datetime] = None, - page_token: Optional[bytes] = None, - limit: int = 1000, - ) -> Dict[str, Any]: - """Retrieve list of all content_metadata entries for the id - - Args: - id: the content's SWHID - authority: a dict containing keys `type` and `url`. - after: minimum discovery_date for a result to be returned - page_token: opaque token, used to get the next page of results - limit: maximum number of results to be returned - - Returns: - dict with keys `next_page_token` and `results`. - `next_page_token` is an opaque token that is used to get the - next page of results, or `None` if there are no more results. - `results` is a list of dicts in the format: - - .. code-block: python - - { - 'authority': {'type': ..., 'url': ...}, - 'fetcher': {'name': ..., 'version': ...}, - 'discovery_date': ..., - 'format': '...', - 'metadata': b'...', - 'context': { ... }, - } - - """ - ... - - @remote_api_endpoint("origin/metadata/add") - def origin_metadata_add( - self, - origin_url: str, - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - ) -> None: - """Add an origin_metadata for the origin at discovery_date, - obtained using the `fetcher` from the `authority`. + @remote_api_endpoint("object_metadata/add") + def object_metadata_add(self, metadata: Iterable[RawExtrinsicMetadata],) -> None: + """Add extrinsic metadata on objects (contents, directories, ...). The authority and fetcher must be known to the storage before using this endpoint. - If there is already origin metadata for the same origin, authority, + If there is already content metadata for the same object, authority, fetcher, and at the same date; the new one will be either dropped or will replace the existing one (it is unspecified which one of these two behaviors happens). Args: - discovery_date: when the metadata was fetched. - authority: a dict containing keys `type` and `url`. - fetcher: a dict containing keys `name` and `version`. - format: text field indicating the format of the content of the - metadata: blob of raw metadata + metadata: iterable of RawExtrinsicMetadata objects to be inserted. """ ... - @remote_api_endpoint("origin/metadata/get") - def origin_metadata_get( + @remote_api_endpoint("object_metadata/get") + def object_metadata_get( self, - origin_url: str, - authority: Dict[str, str], + object_type: MetadataTargetType, + id: Union[str, SWHID], + authority: MetadataAuthority, after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, - ) -> Dict[str, Any]: - """Retrieve list of all origin_metadata entries for the origin_url + ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: + """Retrieve list of all object_metadata entries for the id Args: - origin_url: the origin's URL + object_type: one of the values of swh.model.model.MetadataTargetType + id: an URL if object_type is 'origin', else a core SWHID authority: a dict containing keys `type` and `url`. after: minimum discovery_date for a result to be returned page_token: opaque token, used to get the next page of results @@ -1228,40 +1154,30 @@ dict with keys `next_page_token` and `results`. `next_page_token` is an opaque token that is used to get the next page of results, or `None` if there are no more results. - `results` is a list of dicts in the format: - - .. code-block: python - - { - 'authority': {'type': ..., 'url': ...}, - 'fetcher': {'name': ..., 'version': ...}, - 'discovery_date': ..., - 'format': '...', - 'metadata': b'...' - } + `results` is a list of RawExtrinsicMetadata objects.: """ ... - @remote_api_endpoint("fetcher/add") - def metadata_fetcher_add( - self, name: str, version: str, metadata: Dict[str, Any] - ) -> None: - """Add a new metadata fetcher to the storage. + @remote_api_endpoint("metadata_fetcher/add") + def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher],) -> None: + """Add new metadata fetchers to the storage. - `name` and `version` together are a unique identifier of this + Their `name` and `version` together are unique identifiers of this fetcher; and `metadata` is an arbitrary dict of JSONable data - with information about this fetcher. + with information about this fetcher, which must not be `None` + (but may be empty). Args: - name: the name of the fetcher - version: version of the fetcher + fetchers: iterable of MetadataFetcher to be inserted """ ... - @remote_api_endpoint("fetcher/get") - def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]: + @remote_api_endpoint("metadata_fetcher/get") + def metadata_fetcher_get( + self, name: str, version: str + ) -> Optional[MetadataFetcher]: """Retrieve information about a fetcher Args: @@ -1269,27 +1185,30 @@ version: version of the fetcher Returns: - dictionary with keys `name`, `version`, and `metadata`; or None - if the fetcher is not known + a MetadataFetcher object (with a non-None metadata field) if it is known, + else None. """ ... - @remote_api_endpoint("authority/add") - def metadata_authority_add( - self, type: str, url: str, metadata: Dict[str, Any] - ) -> None: - """Add a metadata authority + @remote_api_endpoint("metadata_authority/add") + def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: + """Add new metadata authorities to the storage. + + Their `type` and `url` together are unique identifiers of this + authority; and `metadata` is an arbitrary dict of JSONable data + with information about this authority, which must not be `None` + (but may be empty). Args: - type: one of "deposit", "forge", or "registry" - url: unique URI identifying the authority - metadata: JSON-encodable object + authorities: iterable of MetadataAuthority to be inserted """ ... - @remote_api_endpoint("authority/get") - def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]: + @remote_api_endpoint("metadata_authority/get") + def metadata_authority_get( + self, type: MetadataAuthorityType, url: str + ) -> Optional[MetadataAuthority]: """Retrieve information about an authority Args: @@ -1297,8 +1216,8 @@ url: unique URI identifying the authority Returns: - dictionary with keys `type`, `url`, and `metadata`; or None - if the authority is not known + a MetadataAuthority object (with a non-None metadata field) if it is known, + else None. """ ... diff --git a/swh/storage/retry.py b/swh/storage/retry.py --- a/swh/storage/retry.py +++ b/swh/storage/retry.py @@ -6,8 +6,7 @@ import logging import traceback -from datetime import datetime -from typing import Any, Dict, Iterable, Optional +from typing import Dict, Iterable, Optional from tenacity import ( retry, @@ -24,6 +23,9 @@ Snapshot, Origin, OriginVisit, + MetadataAuthority, + MetadataFetcher, + RawExtrinsicMetadata, ) from swh.storage import get_storage @@ -112,30 +114,16 @@ return self.storage.origin_visit_add(visits) @swh_retry - def metadata_fetcher_add( - self, name: str, version: str, metadata: Dict[str, Any] - ) -> None: - return self.storage.metadata_fetcher_add(name, version, metadata) + def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher],) -> None: + return self.storage.metadata_fetcher_add(fetchers) @swh_retry - def metadata_authority_add( - self, type: str, url: str, metadata: Dict[str, Any] - ) -> None: - return self.storage.metadata_authority_add(type, url, metadata) + def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: + return self.storage.metadata_authority_add(authorities) @swh_retry - def origin_metadata_add( - self, - origin_url: str, - discovery_date: datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - ) -> None: - return self.storage.origin_metadata_add( - origin_url, discovery_date, authority, fetcher, format, metadata - ) + def object_metadata_add(self, metadata: Iterable[RawExtrinsicMetadata],) -> None: + return self.storage.object_metadata_add(metadata) @swh_retry def directory_add(self, directories: Iterable[Directory]) -> Dict: diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -10,7 +10,15 @@ from collections import defaultdict from contextlib import contextmanager from deprecated import deprecated -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import ( + Any, + Counter, + Dict, + Iterable, + List, + Optional, + Union, +) import attr import psycopg2 @@ -18,6 +26,7 @@ import psycopg2.errors from swh.core.api.serializers import msgpack_loads, msgpack_dumps +from swh.model.identifiers import parse_swhid, SWHID from swh.model.model import ( Content, Directory, @@ -29,6 +38,11 @@ SkippedContent, Snapshot, SHA1_SIZE, + MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, + MetadataTargetType, + RawExtrinsicMetadata, ) from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex from swh.storage.objstorage import ObjStorage @@ -36,16 +50,12 @@ from swh.storage.utils import now from . import converters -from .extrinsic_metadata import ( - check_extrinsic_metadata_context, - CONTEXT_KEYS, -) from .common import db_transaction_generator, db_transaction from .db import Db from .exc import StorageArgumentException, StorageDBError, HashCollision from .algos import diff from .metrics import timed, send_metric, process_metrics -from .utils import get_partition_bounds_bytes, extract_collision_hash +from .utils import get_partition_bounds_bytes, extract_collision_hash, map_optional from .writer import JournalWriter @@ -1130,150 +1140,53 @@ for key in keys: cur.execute("select * from swh_update_counter(%s)", (key,)) - @timed @db_transaction() - def content_metadata_add( - self, - id: str, - context: Dict[str, Union[str, bytes, int]], - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - db=None, - cur=None, + def object_metadata_add( + self, metadata: Iterable[RawExtrinsicMetadata], db, cur, ) -> None: - self._object_metadata_add( - "content", - id, - context, - discovery_date, - authority, - fetcher, - format, - metadata, - db, - cur, - ) + counter = Counter[MetadataTargetType]() + for metadata_entry in metadata: + authority_id = self._get_authority_id(metadata_entry.authority, db, cur) + fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur) + + db.object_metadata_add( + object_type=metadata_entry.type.value, + id=str(metadata_entry.id), + discovery_date=metadata_entry.discovery_date, + authority_id=authority_id, + fetcher_id=fetcher_id, + format=metadata_entry.format, + metadata=metadata_entry.metadata, + origin=metadata_entry.origin, + visit=metadata_entry.visit, + snapshot=map_optional(str, metadata_entry.snapshot), + release=map_optional(str, metadata_entry.release), + revision=map_optional(str, metadata_entry.revision), + path=metadata_entry.path, + directory=map_optional(str, metadata_entry.directory), + cur=cur, + ) + counter[metadata_entry.type] += 1 - @timed - @db_transaction() - def content_metadata_get( - self, - id: str, - authority: Dict[str, str], - after: Optional[datetime.datetime] = None, - page_token: Optional[bytes] = None, - limit: int = 1000, - db=None, - cur=None, - ) -> Dict[str, Any]: - return self._object_metadata_get( - "content", id, authority, after, page_token, limit, db, cur - ) + for (object_type, count) in counter.items(): + send_metric( + f"{object_type.value}_metadata:add", + count=count, + method_name=f"{object_type.value}_metadata_add", + ) - @timed @db_transaction() - def origin_metadata_add( + def object_metadata_get( self, - origin_url: str, - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - db=None, - cur=None, - ) -> None: - context: Dict[str, Union[str, bytes, int]] = {} # origins have no context - - self._object_metadata_add( - "origin", - origin_url, - context, - discovery_date, - authority, - fetcher, - format, - metadata, - db, - cur, - ) - - @timed - @db_transaction(statement_timeout=500) - def origin_metadata_get( - self, - origin_url: str, - authority: Dict[str, str], + object_type: MetadataTargetType, + id: Union[str, SWHID], + authority: MetadataAuthority, after: Optional[datetime.datetime] = None, page_token: Optional[bytes] = None, limit: int = 1000, db=None, cur=None, - ) -> Dict[str, Any]: - result = self._object_metadata_get( - "origin", origin_url, authority, after, page_token, limit, db, cur - ) - - for res in result["results"]: - res.pop("id") - res["origin_url"] = origin_url - - return result - - def _object_metadata_add( - self, - object_type: str, - id: str, - context: Dict[str, Union[str, bytes, int]], - discovery_date: datetime.datetime, - authority: Dict[str, Any], - fetcher: Dict[str, Any], - format: str, - metadata: bytes, - db, - cur, - ) -> None: - check_extrinsic_metadata_context(object_type, context) - - authority_id = self._get_authority_id(authority, db, cur) - fetcher_id = self._get_fetcher_id(fetcher, db, cur) - if not isinstance(metadata, bytes): - raise StorageArgumentException( - "metadata must be bytes, not %r" % (metadata,) - ) - - db.object_metadata_add( - object_type, - id, - context, - discovery_date, - authority_id, - fetcher_id, - format, - metadata, - cur, - ) - - send_metric( - f"{object_type}_metadata:add", - count=1, - method_name=f"{object_type}_metadata_add", - ) - - def _object_metadata_get( - self, - object_type: str, - id: str, - authority: Dict[str, str], - after: Optional[datetime.datetime], - page_token: Optional[bytes], - limit: int, - db, - cur, - ) -> Dict[str, Any]: + ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: if page_token: (after_time, after_fetcher) = msgpack_loads(page_token) if after and after_time < after: @@ -1284,9 +1197,7 @@ after_time = after after_fetcher = None - authority_id = db.metadata_authority_get_id( - authority["type"], authority["url"], cur - ) + authority_id = self._get_authority_id(authority, db, cur) if not authority_id: return { "next_page_token": None, @@ -1294,36 +1205,44 @@ } rows = db.object_metadata_get( - object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur + object_type, + str(id), + authority_id, + after_time, + after_fetcher, + limit + 1, + cur, ) rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows] results = [] for row in rows: row = row.copy() row.pop("metadata_fetcher.id") - context = {} - for key in CONTEXT_KEYS[object_type]: - value = row[key] - if value is not None: - context[key] = value - - result = { - "id": row["id"], - "authority": { - "type": row.pop("metadata_authority.type"), - "url": row.pop("metadata_authority.url"), - }, - "fetcher": { - "name": row.pop("metadata_fetcher.name"), - "version": row.pop("metadata_fetcher.version"), - }, - "discovery_date": row["discovery_date"], - "format": row["format"], - "metadata": row["metadata"], - } - if CONTEXT_KEYS[object_type]: - result["context"] = context + assert str(id) == row["object_metadata.id"] + + result = RawExtrinsicMetadata( + type=MetadataTargetType(row["object_metadata.type"]), + id=id, + authority=MetadataAuthority( + type=MetadataAuthorityType(row["metadata_authority.type"]), + url=row["metadata_authority.url"], + ), + fetcher=MetadataFetcher( + name=row["metadata_fetcher.name"], + version=row["metadata_fetcher.version"], + ), + discovery_date=row["discovery_date"], + format=row["format"], + metadata=row["object_metadata.metadata"], + origin=row["origin"], + visit=row["visit"], + snapshot=map_optional(parse_swhid, row["snapshot"]), + release=map_optional(parse_swhid, row["release"]), + revision=map_optional(parse_swhid, row["revision"]), + path=row["path"], + directory=map_optional(parse_swhid, row["directory"]), + ) results.append(result) @@ -1348,38 +1267,55 @@ @timed @db_transaction() def metadata_fetcher_add( - self, name: str, version: str, metadata: Dict[str, Any], db=None, cur=None + self, fetchers: Iterable[MetadataFetcher], db=None, cur=None ) -> None: - db.metadata_fetcher_add(name, version, metadata) - send_metric("metadata_fetcher:add", count=1, method_name="metadata_fetcher") + for (i, fetcher) in enumerate(fetchers): + if fetcher.metadata is None: + raise StorageArgumentException( + "MetadataFetcher.metadata may not be None in metadata_fetcher_add." + ) + db.metadata_fetcher_add( + fetcher.name, fetcher.version, dict(fetcher.metadata), cur=cur + ) + send_metric("metadata_fetcher:add", count=i + 1, method_name="metadata_fetcher") @timed @db_transaction(statement_timeout=500) def metadata_fetcher_get( self, name: str, version: str, db=None, cur=None - ) -> Optional[Dict[str, Any]]: + ) -> Optional[MetadataFetcher]: row = db.metadata_fetcher_get(name, version, cur=cur) if not row: return None - return dict(zip(db.metadata_fetcher_cols, row)) + return MetadataFetcher.from_dict(dict(zip(db.metadata_fetcher_cols, row))) @timed @db_transaction() def metadata_authority_add( - self, type: str, url: str, metadata: Dict[str, Any], db=None, cur=None + self, authorities: Iterable[MetadataAuthority], db=None, cur=None ) -> None: - db.metadata_authority_add(type, url, metadata, cur) - send_metric("metadata_authority:add", count=1, method_name="metadata_authority") + for (i, authority) in enumerate(authorities): + if authority.metadata is None: + raise StorageArgumentException( + "MetadataAuthority.metadata may not be None in " + "metadata_authority_add." + ) + db.metadata_authority_add( + authority.type.value, authority.url, dict(authority.metadata), cur=cur + ) + send_metric( + "metadata_authority:add", count=i + 1, method_name="metadata_authority" + ) @timed @db_transaction() def metadata_authority_get( - self, type: str, url: str, db=None, cur=None - ) -> Optional[Dict[str, Any]]: - row = db.metadata_authority_get(type, url, cur=cur) + self, type: MetadataAuthorityType, url: str, db=None, cur=None + ) -> Optional[MetadataAuthority]: + row = db.metadata_authority_get(type.value, url, cur=cur) if not row: return None - return dict(zip(db.metadata_authority_cols, row)) + return MetadataAuthority.from_dict(dict(zip(db.metadata_authority_cols, row))) @timed def diff_directories(self, from_dir, to_dir, track_renaming=False): @@ -1402,18 +1338,16 @@ def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: return {} - def _get_authority_id(self, authority: Dict[str, Any], db, cur): + def _get_authority_id(self, authority: MetadataAuthority, db, cur): authority_id = db.metadata_authority_get_id( - authority["type"], authority["url"], cur + authority.type.value, authority.url, cur ) if not authority_id: raise StorageArgumentException(f"Unknown authority {authority}") return authority_id - def _get_fetcher_id(self, fetcher: Dict[str, Any], db, cur): - fetcher_id = db.metadata_fetcher_get_id( - fetcher["name"], fetcher["version"], cur - ) + def _get_fetcher_id(self, fetcher: MetadataFetcher, db, cur): + fetcher_id = db.metadata_fetcher_get_id(fetcher.name, fetcher.version, cur) if not fetcher_id: raise StorageArgumentException(f"Unknown fetcher {fetcher}") return fetcher_id diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py --- a/swh/storage/tests/storage_data.py +++ b/swh/storage/tests/storage_data.py @@ -4,8 +4,19 @@ # See top-level LICENSE file for more information import datetime + +import attr + from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model import from_disk +from swh.model.identifiers import parse_swhid +from swh.model.model import ( + MetadataAuthority, + MetadataAuthorityType, + MetadataFetcher, + RawExtrinsicMetadata, + MetadataTargetType, +) class StorageData: @@ -332,27 +343,21 @@ origins = (origin, origin2) -metadata_authority = { - "type": "deposit", - "url": "http://hal.inria.example.com/", - "metadata": {"location": "France"}, -} -metadata_authority2 = { - "type": "registry", - "url": "http://wikidata.example.com/", - "metadata": {}, -} +metadata_authority = MetadataAuthority( + type=MetadataAuthorityType.DEPOSIT, + url="http://hal.inria.example.com/", + metadata={"location": "France"}, +) +metadata_authority2 = MetadataAuthority( + type=MetadataAuthorityType.REGISTRY, + url="http://wikidata.example.com/", + metadata={}, +) -metadata_fetcher = { - "name": "swh-deposit", - "version": "0.0.1", - "metadata": {"sword_version": "2"}, -} -metadata_fetcher2 = { - "name": "swh-example", - "version": "0.0.1", - "metadata": {}, -} +metadata_fetcher = MetadataFetcher( + name="swh-deposit", version="0.0.1", metadata={"sword_version": "2"}, +) +metadata_fetcher2 = MetadataFetcher(name="swh-example", version="0.0.1", metadata={},) date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) type_visit1 = "git" @@ -472,114 +477,82 @@ snapshots = (snapshot, empty_snapshot, complete_snapshot) -content_metadata = { - "id": f"swh:1:cnt:{cont['sha1_git']}", - "context": {"origin": origin["url"]}, - "discovery_date": datetime.datetime( +content_metadata = RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=parse_swhid(f"swh:1:cnt:{hash_to_hex(cont['sha1_git'])}"), + origin=origin["url"], + discovery_date=datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc ), - "authority": { - "type": metadata_authority["type"], - "url": metadata_authority["url"], - }, - "fetcher": { - "name": metadata_fetcher["name"], - "version": metadata_fetcher["version"], - }, - "format": "json", - "metadata": b'{"foo": "bar"}', -} -content_metadata2 = { - "id": f"swh:1:cnt:{cont['sha1_git']}", - "context": {"origin": origin2["url"]}, - "discovery_date": datetime.datetime( + authority=attr.evolve(metadata_authority, metadata=None), + fetcher=attr.evolve(metadata_fetcher, metadata=None), + format="json", + metadata=b'{"foo": "bar"}', +) +content_metadata2 = RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=parse_swhid(f"swh:1:cnt:{hash_to_hex(cont['sha1_git'])}"), + origin=origin2["url"], + discovery_date=datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), - "authority": { - "type": metadata_authority["type"], - "url": metadata_authority["url"], - }, - "fetcher": { - "name": metadata_fetcher["name"], - "version": metadata_fetcher["version"], - }, - "format": "yaml", - "metadata": b"foo: bar", -} -content_metadata3 = { - "id": f"swh:1:cnt:{cont['sha1_git']}", - "context": { - "origin": origin["url"], - "visit": 42, - "snapshot": f"swh:1:snp:{hash_to_hex(snapshot['id'])}", - "release": f"swh:1:rel:{hash_to_hex(release['id'])}", - "revision": f"swh:1:rev:{hash_to_hex(revision['id'])}", - "directory": f"swh:1:dir:{hash_to_hex(dir['id'])}", - "path": b"/foo/bar", - }, - "discovery_date": datetime.datetime( + authority=attr.evolve(metadata_authority, metadata=None), + fetcher=attr.evolve(metadata_fetcher, metadata=None), + format="yaml", + metadata=b"foo: bar", +) +content_metadata3 = RawExtrinsicMetadata( + type=MetadataTargetType.CONTENT, + id=parse_swhid(f"swh:1:cnt:{hash_to_hex(cont['sha1_git'])}"), + discovery_date=datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), - "authority": { - "type": metadata_authority2["type"], - "url": metadata_authority2["url"], - }, - "fetcher": { - "name": metadata_fetcher2["name"], - "version": metadata_fetcher2["version"], - }, - "format": "yaml", - "metadata": b"foo: bar", -} - -origin_metadata = { - "origin_url": origin["url"], - "discovery_date": datetime.datetime( + authority=attr.evolve(metadata_authority2, metadata=None), + fetcher=attr.evolve(metadata_fetcher2, metadata=None), + format="yaml", + metadata=b"foo: bar", + origin=origin["url"], + visit=42, + snapshot=parse_swhid(f"swh:1:snp:{hash_to_hex(snapshot['id'])}"), + release=parse_swhid(f"swh:1:rel:{hash_to_hex(release['id'])}"), + revision=parse_swhid(f"swh:1:rev:{hash_to_hex(revision['id'])}"), + directory=parse_swhid(f"swh:1:dir:{hash_to_hex(dir['id'])}"), + path=b"/foo/bar", +) + +origin_metadata = RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=origin["url"], + discovery_date=datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc ), - "authority": { - "type": metadata_authority["type"], - "url": metadata_authority["url"], - }, - "fetcher": { - "name": metadata_fetcher["name"], - "version": metadata_fetcher["version"], - }, - "format": "json", - "metadata": b'{"foo": "bar"}', -} -origin_metadata2 = { - "origin_url": origin["url"], - "discovery_date": datetime.datetime( + authority=attr.evolve(metadata_authority, metadata=None), + fetcher=attr.evolve(metadata_fetcher, metadata=None), + format="json", + metadata=b'{"foo": "bar"}', +) +origin_metadata2 = RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=origin["url"], + discovery_date=datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), - "authority": { - "type": metadata_authority["type"], - "url": metadata_authority["url"], - }, - "fetcher": { - "name": metadata_fetcher["name"], - "version": metadata_fetcher["version"], - }, - "format": "yaml", - "metadata": b"foo: bar", -} -origin_metadata3 = { - "origin_url": origin["url"], - "discovery_date": datetime.datetime( + authority=attr.evolve(metadata_authority, metadata=None), + fetcher=attr.evolve(metadata_fetcher, metadata=None), + format="yaml", + metadata=b"foo: bar", +) +origin_metadata3 = RawExtrinsicMetadata( + type=MetadataTargetType.ORIGIN, + id=origin["url"], + discovery_date=datetime.datetime( 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc ), - "authority": { - "type": metadata_authority2["type"], - "url": metadata_authority2["url"], - }, - "fetcher": { - "name": metadata_fetcher2["name"], - "version": metadata_fetcher2["version"], - }, - "format": "yaml", - "metadata": b"foo: bar", -} + authority=attr.evolve(metadata_authority2, metadata=None), + fetcher=attr.evolve(metadata_fetcher2, metadata=None), + format="yaml", + metadata=b"foo: bar", +) person = { "name": b"John Doe", diff --git a/swh/storage/tests/test_retry.py b/swh/storage/tests/test_retry.py --- a/swh/storage/tests/test_retry.py +++ b/swh/storage/tests/test_retry.py @@ -17,6 +17,7 @@ SkippedContent, Origin, OriginVisit, + MetadataTargetType, ) from swh.storage import get_storage @@ -426,16 +427,12 @@ """ fetcher = sample_data["fetcher"][0] - metadata_fetcher = swh_storage.metadata_fetcher_get( - fetcher["name"], fetcher["version"] - ) + metadata_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) assert not metadata_fetcher - swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_fetcher_add([fetcher]) - actual_fetcher = swh_storage.metadata_fetcher_get( - fetcher["name"], fetcher["version"] - ) + actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) assert actual_fetcher == fetcher @@ -458,19 +455,13 @@ [fetcher], ] - actual_fetcher = swh_storage.metadata_fetcher_get( - fetcher["name"], fetcher["version"] - ) + actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) assert not actual_fetcher - swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_fetcher_add([fetcher]) mock_memory.assert_has_calls( - [ - call(fetcher["name"], fetcher["version"], fetcher["metadata"]), - call(fetcher["name"], fetcher["version"], fetcher["metadata"]), - call(fetcher["name"], fetcher["version"], fetcher["metadata"]), - ] + [call([fetcher]), call([fetcher]), call([fetcher]),] ) @@ -489,13 +480,11 @@ fetcher = sample_data["fetcher"][0] - actual_fetcher = swh_storage.metadata_fetcher_get( - fetcher["name"], fetcher["version"] - ) + actual_fetcher = swh_storage.metadata_fetcher_get(fetcher.name, fetcher.version) assert not actual_fetcher with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_fetcher_add([fetcher]) assert mock_memory.call_count == 1 @@ -506,13 +495,11 @@ """ authority = sample_data["authority"][0] - assert not swh_storage.metadata_authority_get(authority["type"], authority["url"]) + assert not swh_storage.metadata_authority_get(authority.type, authority.url) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_authority_add([authority]) - actual_authority = swh_storage.metadata_authority_get( - authority["type"], authority["url"] - ) + actual_authority = swh_storage.metadata_authority_get(authority.type, authority.url) assert actual_authority == authority @@ -536,14 +523,12 @@ None, ] - assert not swh_storage.metadata_authority_get(authority["type"], authority["url"]) + assert not swh_storage.metadata_authority_get(authority.type, authority.url) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_authority_add([authority]) - authority_arg_names = ("type", "url", "metadata") - authority_args = [authority[key] for key in authority_arg_names] mock_memory.assert_has_calls( - [call(*authority_args), call(*authority_args), call(*authority_args),] + [call([authority]), call([authority]), call([authority])] ) @@ -562,49 +547,49 @@ authority = sample_data["authority"][0] - swh_storage.metadata_authority_get(authority["type"], authority["url"]) + swh_storage.metadata_authority_get(authority.type, authority.url) with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_authority_add([authority]) assert mock_memory.call_count == 1 -def test_retrying_proxy_storage_origin_metadata_add(swh_storage, sample_data): - """Standard origin_metadata_add works as before +def test_retrying_proxy_storage_object_metadata_add(swh_storage, sample_data): + """Standard object_metadata_add works as before """ ori_meta = sample_data["origin_metadata"][0] - swh_storage.origin_add_one({"url": ori_meta["origin_url"]}) - swh_storage.metadata_authority_add(**sample_data["authority"][0]) - swh_storage.metadata_fetcher_add(**sample_data["fetcher"][0]) + swh_storage.origin_add_one({"url": ori_meta.id}) + swh_storage.metadata_authority_add([sample_data["authority"][0]]) + swh_storage.metadata_fetcher_add([sample_data["fetcher"][0]]) - origin_metadata = swh_storage.origin_metadata_get( - ori_meta["origin_url"], ori_meta["authority"] + origin_metadata = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, ori_meta.id, ori_meta.authority ) assert origin_metadata["next_page_token"] is None assert not origin_metadata["results"] - swh_storage.origin_metadata_add(**ori_meta) + swh_storage.object_metadata_add([ori_meta]) - origin_metadata = swh_storage.origin_metadata_get( - ori_meta["origin_url"], ori_meta["authority"] + origin_metadata = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, ori_meta.id, ori_meta.authority ) assert origin_metadata -def test_retrying_proxy_storage_origin_metadata_add_with_retry( +def test_retrying_proxy_storage_object_metadata_add_with_retry( monkeypatch_sleep, swh_storage, sample_data, mocker, fake_hash_collision ): """Multiple retries for hash collision and psycopg2 error but finally ok """ ori_meta = sample_data["origin_metadata"][0] - swh_storage.origin_add_one({"url": ori_meta["origin_url"]}) - swh_storage.metadata_authority_add(**sample_data["authority"][0]) - swh_storage.metadata_fetcher_add(**sample_data["fetcher"][0]) + swh_storage.origin_add_one({"url": ori_meta.id}) + swh_storage.metadata_authority_add([sample_data["authority"][0]]) + swh_storage.metadata_fetcher_add([sample_data["fetcher"][0]]) mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.origin_metadata_add" + "swh.storage.in_memory.InMemoryStorage.object_metadata_add" ) mock_memory.side_effect = [ @@ -617,54 +602,33 @@ ] # No exception raised as insertion finally came through - swh_storage.origin_metadata_add(**ori_meta) + swh_storage.object_metadata_add([ori_meta]) mock_memory.assert_has_calls( [ # 3 calls, as long as error raised - call( - ori_meta["origin_url"], - ori_meta["discovery_date"], - ori_meta["authority"], - ori_meta["fetcher"], - ori_meta["format"], - ori_meta["metadata"], - ), - call( - ori_meta["origin_url"], - ori_meta["discovery_date"], - ori_meta["authority"], - ori_meta["fetcher"], - ori_meta["format"], - ori_meta["metadata"], - ), - call( - ori_meta["origin_url"], - ori_meta["discovery_date"], - ori_meta["authority"], - ori_meta["fetcher"], - ori_meta["format"], - ori_meta["metadata"], - ), + call([ori_meta]), + call([ori_meta]), + call([ori_meta]), ] ) -def test_retrying_proxy_swh_storage_origin_metadata_add_failure( +def test_retrying_proxy_swh_storage_object_metadata_add_failure( swh_storage, sample_data, mocker ): """Unfiltered errors are raising without retry """ mock_memory = mocker.patch( - "swh.storage.in_memory.InMemoryStorage.origin_metadata_add" + "swh.storage.in_memory.InMemoryStorage.object_metadata_add" ) mock_memory.side_effect = StorageArgumentException("Refuse to add always!") ori_meta = sample_data["origin_metadata"][0] - swh_storage.origin_add_one({"url": ori_meta["origin_url"]}) + swh_storage.origin_add_one({"url": ori_meta.id}) with pytest.raises(StorageArgumentException, match="Refuse to add"): - swh_storage.origin_metadata_add(**ori_meta) + swh_storage.object_metadata_add([ori_meta]) assert mock_memory.call_count == 1 diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -17,6 +17,7 @@ from datetime import timedelta from unittest.mock import Mock +import attr import psycopg2 import pytest @@ -26,6 +27,7 @@ from swh.model import from_disk, identifiers from swh.model.hashutil import hash_to_bytes +from swh.model.identifiers import SWHID from swh.model.model import ( Content, Directory, @@ -35,6 +37,7 @@ Release, Revision, Snapshot, + MetadataTargetType, ) from swh.model.hypothesis_strategies import objects from swh.model.hashutil import hash_to_hex @@ -3221,50 +3224,51 @@ def test_metadata_fetcher_add_get(self, swh_storage): actual_fetcher = swh_storage.metadata_fetcher_get( - data.metadata_fetcher["name"], data.metadata_fetcher["version"] + data.metadata_fetcher.name, data.metadata_fetcher.version ) assert actual_fetcher is None # does not exist - swh_storage.metadata_fetcher_add(**data.metadata_fetcher) + swh_storage.metadata_fetcher_add([data.metadata_fetcher]) res = swh_storage.metadata_fetcher_get( - data.metadata_fetcher["name"], data.metadata_fetcher["version"] + data.metadata_fetcher.name, data.metadata_fetcher.version ) - assert res is not data.metadata_fetcher assert res == data.metadata_fetcher def test_metadata_authority_add_get(self, swh_storage): actual_authority = swh_storage.metadata_authority_get( - data.metadata_authority["type"], data.metadata_authority["url"] + data.metadata_authority.type, data.metadata_authority.url ) assert actual_authority is None # does not exist - swh_storage.metadata_authority_add(**data.metadata_authority) + swh_storage.metadata_authority_add([data.metadata_authority]) res = swh_storage.metadata_authority_get( - data.metadata_authority["type"], data.metadata_authority["url"] + data.metadata_authority.type, data.metadata_authority.url ) - assert res is not data.metadata_authority assert res == data.metadata_authority def test_content_metadata_add(self, swh_storage): content = data.cont fetcher = data.metadata_fetcher authority = data.metadata_authority - content_swhid = f"swh:1:cnt:{content['sha1_git']}" + content_swhid = SWHID( + object_type="content", object_id=hash_to_bytes(content["sha1_git"]) + ) - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.content_metadata_add(**data.content_metadata) - swh_storage.content_metadata_add(**data.content_metadata2) + swh_storage.object_metadata_add([data.content_metadata, data.content_metadata2]) - result = swh_storage.content_metadata_get(content_swhid, authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content_swhid, authority + ) assert result["next_page_token"] is None assert [data.content_metadata, data.content_metadata2] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date,) ) def test_content_metadata_add_duplicate(self, swh_storage): @@ -3272,81 +3276,81 @@ content = data.cont fetcher = data.metadata_fetcher authority = data.metadata_authority - content_swhid = f"swh:1:cnt:{content['sha1_git']}" + content_swhid = SWHID( + object_type="content", object_id=hash_to_bytes(content["sha1_git"]) + ) - new_content_metadata2 = { - **data.content_metadata2, - "format": "new-format", - "metadata": b"new-metadata", - } + new_content_metadata2 = attr.evolve( + data.content_metadata2, format="new-format", metadata=b"new-metadata", + ) - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.content_metadata_add(**data.content_metadata) - swh_storage.content_metadata_add(**data.content_metadata2) - swh_storage.content_metadata_add(**new_content_metadata2) + swh_storage.object_metadata_add([data.content_metadata, data.content_metadata2]) + swh_storage.object_metadata_add([new_content_metadata2]) - result = swh_storage.content_metadata_get(content_swhid, authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content_swhid, authority + ) assert result["next_page_token"] is None expected_results1 = (data.content_metadata, new_content_metadata2) expected_results2 = (data.content_metadata, data.content_metadata2) - assert tuple(sorted(result["results"], key=lambda x: x["discovery_date"],)) in ( + assert tuple(sorted(result["results"], key=lambda x: x.discovery_date,)) in ( expected_results1, # cassandra expected_results2, # postgresql ) - def test_content_metadata_add_dict(self, swh_storage): - fetcher = data.metadata_fetcher - authority = data.metadata_authority - - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) - - kwargs = data.content_metadata.copy() - kwargs["metadata"] = {"foo": "bar"} - - with pytest.raises(StorageArgumentException): - swh_storage.content_metadata_add(**kwargs) - def test_content_metadata_get(self, swh_storage): authority = data.metadata_authority fetcher = data.metadata_fetcher authority2 = data.metadata_authority2 fetcher2 = data.metadata_fetcher2 - content1_swhid = f"swh:1:cnt:{data.cont['sha1_git']}" - content2_swhid = f"swh:1:cnt:{data.cont2['sha1_git']}" + content1_swhid = SWHID( + object_type="content", object_id=hash_to_bytes(data.cont["sha1_git"]) + ) + content2_swhid = SWHID( + object_type="content", object_id=hash_to_bytes(data.cont2["sha1_git"]) + ) content1_metadata1 = data.content_metadata content1_metadata2 = data.content_metadata2 content1_metadata3 = data.content_metadata3 - content2_metadata = {**data.content_metadata2, "id": content2_swhid} + content2_metadata = attr.evolve(data.content_metadata2, id=content2_swhid) - swh_storage.metadata_authority_add(**authority) - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority2) - swh_storage.metadata_fetcher_add(**fetcher2) + swh_storage.metadata_authority_add([authority, authority2]) + swh_storage.metadata_fetcher_add([fetcher, fetcher2]) - swh_storage.content_metadata_add(**content1_metadata1) - swh_storage.content_metadata_add(**content1_metadata2) - swh_storage.content_metadata_add(**content1_metadata3) - swh_storage.content_metadata_add(**content2_metadata) + swh_storage.object_metadata_add( + [ + content1_metadata1, + content1_metadata2, + content1_metadata3, + content2_metadata, + ] + ) - result = swh_storage.content_metadata_get(content1_swhid, authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content1_swhid, authority + ) assert result["next_page_token"] is None assert [content1_metadata1, content1_metadata2] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date,) ) - result = swh_storage.content_metadata_get(content1_swhid, authority2) + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content1_swhid, authority2 + ) assert result["next_page_token"] is None assert [content1_metadata3] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date,) ) - result = swh_storage.content_metadata_get(content2_swhid, authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content2_swhid, authority + ) assert result["next_page_token"] is None assert [content2_metadata] == list(result["results"],) @@ -3354,32 +3358,40 @@ content = data.cont fetcher = data.metadata_fetcher authority = data.metadata_authority - content_swhid = f"swh:1:cnt:{content['sha1_git']}" + content_swhid = SWHID( + object_type="content", object_id=hash_to_bytes(content["sha1_git"]) + ) - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.content_metadata_add(**data.content_metadata) - swh_storage.content_metadata_add(**data.content_metadata2) + swh_storage.object_metadata_add([data.content_metadata, data.content_metadata2]) - result = swh_storage.content_metadata_get( + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content_swhid, authority, - after=data.content_metadata["discovery_date"] - timedelta(seconds=1), + after=data.content_metadata.discovery_date - timedelta(seconds=1), ) assert result["next_page_token"] is None assert [data.content_metadata, data.content_metadata2] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date,) ) - result = swh_storage.content_metadata_get( - content_swhid, authority, after=data.content_metadata["discovery_date"] + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, + content_swhid, + authority, + after=data.content_metadata.discovery_date, ) assert result["next_page_token"] is None assert [data.content_metadata2] == result["results"] - result = swh_storage.content_metadata_get( - content_swhid, authority, after=data.content_metadata2["discovery_date"] + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, + content_swhid, + authority, + after=data.content_metadata2.discovery_date, ) assert result["next_page_token"] is None assert [] == result["results"] @@ -3388,22 +3400,31 @@ content = data.cont fetcher = data.metadata_fetcher authority = data.metadata_authority - content_swhid = f"swh:1:cnt:{content['sha1_git']}" + content_swhid = SWHID( + object_type="content", object_id=hash_to_bytes(content["sha1_git"]) + ) - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.content_metadata_add(**data.content_metadata) - swh_storage.content_metadata_add(**data.content_metadata2) + swh_storage.object_metadata_add([data.content_metadata, data.content_metadata2]) - swh_storage.content_metadata_get(content_swhid, authority) + swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content_swhid, authority + ) - result = swh_storage.content_metadata_get(content_swhid, authority, limit=1) + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content_swhid, authority, limit=1 + ) assert result["next_page_token"] is not None assert [data.content_metadata] == result["results"] - result = swh_storage.content_metadata_get( - content_swhid, authority, limit=1, page_token=result["next_page_token"] + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, + content_swhid, + authority, + limit=1, + page_token=result["next_page_token"], ) assert result["next_page_token"] is None assert [data.content_metadata2] == result["results"] @@ -3413,27 +3434,33 @@ fetcher1 = data.metadata_fetcher fetcher2 = data.metadata_fetcher2 authority = data.metadata_authority - content_swhid = f"swh:1:cnt:{content['sha1_git']}" + content_swhid = SWHID( + object_type="content", object_id=hash_to_bytes(content["sha1_git"]) + ) - swh_storage.metadata_fetcher_add(**fetcher1) - swh_storage.metadata_fetcher_add(**fetcher2) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher1, fetcher2]) + swh_storage.metadata_authority_add([authority]) - content_metadata2 = { - **data.content_metadata2, - "discovery_date": data.content_metadata2["discovery_date"], - "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],}, - } + content_metadata2 = attr.evolve( + data.content_metadata2, + discovery_date=data.content_metadata2.discovery_date, + fetcher=attr.evolve(fetcher2, metadata=None), + ) - swh_storage.content_metadata_add(**data.content_metadata) - swh_storage.content_metadata_add(**content_metadata2) + swh_storage.object_metadata_add([data.content_metadata, content_metadata2]) - result = swh_storage.content_metadata_get(content_swhid, authority, limit=1) + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, content_swhid, authority, limit=1 + ) assert result["next_page_token"] is not None assert [data.content_metadata] == result["results"] - result = swh_storage.content_metadata_get( - content_swhid, authority, limit=1, page_token=result["next_page_token"] + result = swh_storage.object_metadata_get( + MetadataTargetType.CONTENT, + content_swhid, + authority, + limit=1, + page_token=result["next_page_token"], ) assert result["next_page_token"] is None assert [content_metadata2] == result["results"] @@ -3444,16 +3471,17 @@ authority = data.metadata_authority assert swh_storage.origin_add([origin]) == {"origin:add": 1} - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.origin_metadata_add(**data.origin_metadata) - swh_storage.origin_metadata_add(**data.origin_metadata2) + swh_storage.object_metadata_add([data.origin_metadata, data.origin_metadata2]) - result = swh_storage.origin_metadata_get(origin["url"], authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin["url"], authority + ) assert result["next_page_token"] is None assert [data.origin_metadata, data.origin_metadata2] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date) ) def test_origin_metadata_add_duplicate(self, swh_storage): @@ -3463,46 +3491,30 @@ authority = data.metadata_authority assert swh_storage.origin_add([origin]) == {"origin:add": 1} - new_origin_metadata2 = { - **data.origin_metadata2, - "format": "new-format", - "metadata": b"new-metadata", - } + new_origin_metadata2 = attr.evolve( + data.origin_metadata2, format="new-format", metadata=b"new-metadata", + ) - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.origin_metadata_add(**data.origin_metadata) - swh_storage.origin_metadata_add(**data.origin_metadata2) - swh_storage.origin_metadata_add(**new_origin_metadata2) + swh_storage.object_metadata_add([data.origin_metadata, data.origin_metadata2]) + swh_storage.object_metadata_add([new_origin_metadata2]) - result = swh_storage.origin_metadata_get(origin["url"], authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin["url"], authority + ) assert result["next_page_token"] is None # which of the two behavior happens is backend-specific. expected_results1 = (data.origin_metadata, new_origin_metadata2) expected_results2 = (data.origin_metadata, data.origin_metadata2) - assert tuple(sorted(result["results"], key=lambda x: x["discovery_date"],)) in ( + assert tuple(sorted(result["results"], key=lambda x: x.discovery_date,)) in ( expected_results1, # cassandra expected_results2, # postgresql ) - def test_origin_metadata_add_dict(self, swh_storage): - origin = data.origin - fetcher = data.metadata_fetcher - authority = data.metadata_authority - assert swh_storage.origin_add([origin]) == {"origin:add": 1} - - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) - - kwargs = data.origin_metadata.copy() - kwargs["metadata"] = {"foo": "bar"} - - with pytest.raises(StorageArgumentException): - swh_storage.origin_metadata_add(**kwargs) - def test_origin_metadata_get(self, swh_storage): authority = data.metadata_authority fetcher = data.metadata_fetcher @@ -3515,31 +3527,34 @@ origin1_metadata1 = data.origin_metadata origin1_metadata2 = data.origin_metadata2 origin1_metadata3 = data.origin_metadata3 - origin2_metadata = {**data.origin_metadata2, "origin_url": origin_url2} + origin2_metadata = attr.evolve(data.origin_metadata2, id=origin_url2) - swh_storage.metadata_authority_add(**authority) - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority2) - swh_storage.metadata_fetcher_add(**fetcher2) + swh_storage.metadata_authority_add([authority, authority2]) + swh_storage.metadata_fetcher_add([fetcher, fetcher2]) - swh_storage.origin_metadata_add(**origin1_metadata1) - swh_storage.origin_metadata_add(**origin1_metadata2) - swh_storage.origin_metadata_add(**origin1_metadata3) - swh_storage.origin_metadata_add(**origin2_metadata) + swh_storage.object_metadata_add( + [origin1_metadata1, origin1_metadata2, origin1_metadata3, origin2_metadata] + ) - result = swh_storage.origin_metadata_get(origin_url1, authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin_url1, authority + ) assert result["next_page_token"] is None assert [origin1_metadata1, origin1_metadata2] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date,) ) - result = swh_storage.origin_metadata_get(origin_url1, authority2) + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin_url1, authority2 + ) assert result["next_page_token"] is None assert [origin1_metadata3] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date,) ) - result = swh_storage.origin_metadata_get(origin_url2, authority) + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin_url2, authority + ) assert result["next_page_token"] is None assert [origin2_metadata] == list(result["results"],) @@ -3549,30 +3564,36 @@ authority = data.metadata_authority assert swh_storage.origin_add([origin]) == {"origin:add": 1} - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.origin_metadata_add(**data.origin_metadata) - swh_storage.origin_metadata_add(**data.origin_metadata2) + swh_storage.object_metadata_add([data.origin_metadata, data.origin_metadata2]) - result = swh_storage.origin_metadata_get( + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin["url"], authority, - after=data.origin_metadata["discovery_date"] - timedelta(seconds=1), + after=data.origin_metadata.discovery_date - timedelta(seconds=1), ) assert result["next_page_token"] is None assert [data.origin_metadata, data.origin_metadata2] == list( - sorted(result["results"], key=lambda x: x["discovery_date"],) + sorted(result["results"], key=lambda x: x.discovery_date,) ) - result = swh_storage.origin_metadata_get( - origin["url"], authority, after=data.origin_metadata["discovery_date"] + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, + origin["url"], + authority, + after=data.origin_metadata.discovery_date, ) assert result["next_page_token"] is None assert [data.origin_metadata2] == result["results"] - result = swh_storage.origin_metadata_get( - origin["url"], authority, after=data.origin_metadata2["discovery_date"] + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, + origin["url"], + authority, + after=data.origin_metadata2.discovery_date, ) assert result["next_page_token"] is None assert [] == result["results"] @@ -3583,20 +3604,27 @@ authority = data.metadata_authority assert swh_storage.origin_add([origin]) == {"origin:add": 1} - swh_storage.metadata_fetcher_add(**fetcher) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher]) + swh_storage.metadata_authority_add([authority]) - swh_storage.origin_metadata_add(**data.origin_metadata) - swh_storage.origin_metadata_add(**data.origin_metadata2) + swh_storage.object_metadata_add([data.origin_metadata, data.origin_metadata2]) - swh_storage.origin_metadata_get(origin["url"], authority) + swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin["url"], authority + ) - result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1) + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin["url"], authority, limit=1 + ) assert result["next_page_token"] is not None assert [data.origin_metadata] == result["results"] - result = swh_storage.origin_metadata_get( - origin["url"], authority, limit=1, page_token=result["next_page_token"] + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, + origin["url"], + authority, + limit=1, + page_token=result["next_page_token"], ) assert result["next_page_token"] is None assert [data.origin_metadata2] == result["results"] @@ -3608,25 +3636,30 @@ authority = data.metadata_authority assert swh_storage.origin_add([origin]) == {"origin:add": 1} - swh_storage.metadata_fetcher_add(**fetcher1) - swh_storage.metadata_fetcher_add(**fetcher2) - swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add([fetcher1]) + swh_storage.metadata_fetcher_add([fetcher2]) + swh_storage.metadata_authority_add([authority]) - origin_metadata2 = { - **data.origin_metadata2, - "discovery_date": data.origin_metadata2["discovery_date"], - "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],}, - } + origin_metadata2 = attr.evolve( + data.origin_metadata2, + discovery_date=data.origin_metadata2.discovery_date, + fetcher=attr.evolve(fetcher2, metadata=None), + ) - swh_storage.origin_metadata_add(**data.origin_metadata) - swh_storage.origin_metadata_add(**origin_metadata2) + swh_storage.object_metadata_add([data.origin_metadata, origin_metadata2]) - result = swh_storage.origin_metadata_get(origin["url"], authority, limit=1) + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, origin["url"], authority, limit=1 + ) assert result["next_page_token"] is not None assert [data.origin_metadata] == result["results"] - result = swh_storage.origin_metadata_get( - origin["url"], authority, limit=1, page_token=result["next_page_token"] + result = swh_storage.object_metadata_get( + MetadataTargetType.ORIGIN, + origin["url"], + authority, + limit=1, + page_token=result["next_page_token"], ) assert result["next_page_token"] is None assert [origin_metadata2] == result["results"] diff --git a/swh/storage/utils.py b/swh/storage/utils.py --- a/swh/storage/utils.py +++ b/swh/storage/utils.py @@ -6,7 +6,7 @@ import re from datetime import datetime, timezone -from typing import Dict, Optional, Tuple +from typing import Callable, Dict, Optional, Tuple, TypeVar from swh.model.hashutil import hash_to_bytes, hash_to_hex, DEFAULT_ALGORITHMS @@ -15,6 +15,17 @@ return datetime.now(tz=timezone.utc) +T1 = TypeVar("T1") +T2 = TypeVar("T2") + + +def map_optional(f: Callable[[T1], T2], x: Optional[T1]) -> Optional[T2]: + if x is None: + return None + else: + return f(x) + + def _is_power_of_two(n: int) -> bool: return n > 0 and n & (n - 1) == 0