diff --git a/swh/storage/cassandra/cql.py b/swh/storage/cassandra/cql.py --- a/swh/storage/cassandra/cql.py +++ b/swh/storage/cassandra/cql.py @@ -46,6 +46,7 @@ from .common import Row, TOKEN_BEGIN, TOKEN_END, hash_url from .schema import CREATE_TABLES_QUERIES, HASH_ALGORITHMS +from .. import extrinsic_metadata logger = logging.getLogger(__name__) @@ -921,6 +922,10 @@ metadata, ] + params.extend( + context.get(key) for key in extrinsic_metadata.CONTEXT_KEYS[object_type] + ) + return self._execute_with_retries(statement, params,) @_prepared_statement( diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py --- a/swh/storage/cassandra/storage.py +++ b/swh/storage/cassandra/storage.py @@ -260,6 +260,39 @@ [{"sha1_git": c for c in contents}], key_hash="sha1_git" ) + def content_metadata_add( + self, + id: str, + context: Dict[str, Union[str, bytes, int]], + discovery_date: datetime.datetime, + authority: Dict[str, Any], + fetcher: Dict[str, Any], + format: str, + metadata: bytes, + ) -> None: + self._object_metadata_add( + "content", + id, + discovery_date, + authority, + fetcher, + format, + metadata, + context, + ) + + def content_metadata_get( + self, + id: str, + authority: Dict[str, str], + after: Optional[datetime.datetime] = None, + page_token: Optional[bytes] = None, + limit: int = 1000, + ) -> Dict[str, Any]: + return self._object_metadata_get( + "content", id, authority, after, page_token, limit, + ) + def content_get_random(self): return self._cql_runner.content_get_random().sha1_git diff --git a/swh/storage/in_memory.py b/swh/storage/in_memory.py --- a/swh/storage/in_memory.py +++ b/swh/storage/in_memory.py @@ -1112,6 +1112,18 @@ else: object_metadata_list.add(object_metadata) + def content_metadata_get( + self, + id: str, + authority: Dict[str, str], + after: Optional[datetime.datetime] = None, + page_token: Optional[bytes] = None, + limit: int = 1000, + ) -> Dict[str, Any]: + return self._object_metadata_get( + "content", id, 
authority, after, page_token, limit + ) + def origin_metadata_get( self, origin_url: str, diff --git a/swh/storage/interface.py b/swh/storage/interface.py --- a/swh/storage/interface.py +++ b/swh/storage/interface.py @@ -5,7 +5,7 @@ import datetime -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Union from swh.core.api import remote_api_endpoint from swh.model.model import ( @@ -1103,6 +1103,73 @@ """Recomputes the statistics for `stat_counters`.""" ... + @remote_api_endpoint("content/metadata/add") + def content_metadata_add( + self, + id: str, + context: Dict[str, Union[str, bytes, int]], + discovery_date: datetime.datetime, + authority: Dict[str, Any], + fetcher: Dict[str, Any], + format: str, + metadata: bytes, + ) -> None: + """Add a content_metadata for the content at discovery_date, + obtained using the `fetcher` from the `authority`. + + The authority and fetcher must be known to the storage before + using this endpoint. + + If there is already content metadata for the same content, authority, + fetcher, and at the same date, it will be replaced by this one. + + Args: + discovery_date: when the metadata was fetched. + authority: a dict containing keys `type` and `url`. + fetcher: a dict containing keys `name` and `version`. + format: text field indicating the format of the `metadata` blob + metadata: blob of raw metadata + """ + ... + + @remote_api_endpoint("content/metadata/get") + def content_metadata_get( + self, + id: str, + authority: Dict[str, str], + after: Optional[datetime.datetime] = None, + page_token: Optional[bytes] = None, + limit: int = 1000, + ) -> Dict[str, Any]: + """Retrieve list of all content_metadata entries for the id + + Args: + id: the content's SWHID + authority: a dict containing keys `type` and `url`.
+ after: minimum discovery_date for a result to be returned + page_token: opaque token, used to get the next page of results + limit: maximum number of results to be returned + + Returns: + dict with keys `next_page_token` and `results`. + `next_page_token` is an opaque token that is used to get the + next page of results, or `None` if there are no more results. + `results` is a list of dicts in the format: + + .. code-block:: python + + { + 'authority': {'type': ..., 'url': ...}, + 'fetcher': {'name': ..., 'version': ...}, + 'discovery_date': ..., + 'format': '...', + 'metadata': b'...', + 'context': { ... }, + } + + """ + ... + @remote_api_endpoint("origin/metadata/add") def origin_metadata_add( self, diff --git a/swh/storage/storage.py b/swh/storage/storage.py --- a/swh/storage/storage.py +++ b/swh/storage/storage.py @@ -364,6 +364,49 @@ ) return [dict(zip(db.content_find_cols, content)) for content in contents] + @timed + @db_transaction() + def content_metadata_add( + self, + id: str, + context: Dict[str, Union[str, bytes, int]], + discovery_date: datetime.datetime, + authority: Dict[str, Any], + fetcher: Dict[str, Any], + format: str, + metadata: bytes, + db=None, + cur=None, + ) -> None: + self._object_metadata_add( + "content", + id, + context, + discovery_date, + authority, + fetcher, + format, + metadata, + db, + cur, + ) + + @timed + @db_transaction() + def content_metadata_get( + self, + id: str, + authority: Dict[str, str], + after: Optional[datetime.datetime] = None, + page_token: Optional[bytes] = None, + limit: int = 1000, + db=None, + cur=None, + ) -> Dict[str, Any]: + return self._object_metadata_get( + "content", id, authority, after, page_token, limit, db, cur + ) + @timed @db_transaction() def content_get_random(self, db=None, cur=None): diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py --- a/swh/storage/tests/storage_data.py +++ b/swh/storage/tests/storage_data.py @@ -4,7 +4,7 @@ # See top-level LICENSE file for
more information import datetime -from swh.model.hashutil import hash_to_bytes +from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model import from_disk @@ -472,6 +472,65 @@ snapshots = (snapshot, empty_snapshot, complete_snapshot) +content_metadata = { + "id": f"swh:1:cnt:{cont['sha1_git']}", + "context": {"origin": origin["url"]}, + "discovery_date": datetime.datetime( + 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc + ), + "authority": { + "type": metadata_authority["type"], + "url": metadata_authority["url"], + }, + "fetcher": { + "name": metadata_fetcher["name"], + "version": metadata_fetcher["version"], + }, + "format": "json", + "metadata": b'{"foo": "bar"}', +} +content_metadata2 = { + "id": f"swh:1:cnt:{cont['sha1_git']}", + "context": {"origin": origin2["url"]}, + "discovery_date": datetime.datetime( + 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc + ), + "authority": { + "type": metadata_authority["type"], + "url": metadata_authority["url"], + }, + "fetcher": { + "name": metadata_fetcher["name"], + "version": metadata_fetcher["version"], + }, + "format": "yaml", + "metadata": b"foo: bar", +} +content_metadata3 = { + "id": f"swh:1:cnt:{cont['sha1_git']}", + "context": { + "origin": origin["url"], + "visit": 42, + "snapshot": f"swh:1:snp:{hash_to_hex(snapshot['id'])}", + "release": f"swh:1:rel:{hash_to_hex(release['id'])}", + "revision": f"swh:1:rev:{hash_to_hex(revision['id'])}", + "directory": f"swh:1:dir:{hash_to_hex(dir['id'])}", + "path": b"/foo/bar", + }, + "discovery_date": datetime.datetime( + 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc + ), + "authority": { + "type": metadata_authority2["type"], + "url": metadata_authority2["url"], + }, + "fetcher": { + "name": metadata_fetcher2["name"], + "version": metadata_fetcher2["version"], + }, + "format": "yaml", + "metadata": b"foo: bar", +} origin_metadata = { "origin_url": origin["url"], diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py 
--- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -3187,6 +3187,190 @@ with pytest.raises(StorageArgumentException): swh_storage.content_find({"unknown-sha1": "something"}) # not the right key + def test_content_metadata_add(self, swh_storage): + content = data.cont + fetcher = data.metadata_fetcher + authority = data.metadata_authority + content_swhid = f"swh:1:cnt:{content['sha1_git']}" + + swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_authority_add(**authority) + + swh_storage.content_metadata_add(**data.content_metadata) + swh_storage.content_metadata_add(**data.content_metadata2) + + result = swh_storage.content_metadata_get(content_swhid, authority) + assert result["next_page_token"] is None + assert [data.content_metadata, data.content_metadata2] == list( + sorted(result["results"], key=lambda x: x["discovery_date"],) + ) + + def test_content_metadata_add_duplicate(self, swh_storage): + """Duplicates should be silently updated.""" + content = data.cont + fetcher = data.metadata_fetcher + authority = data.metadata_authority + content_swhid = f"swh:1:cnt:{content['sha1_git']}" + + new_content_metadata2 = { + **data.content_metadata2, + "format": "new-format", + "metadata": b"new-metadata", + } + + swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_authority_add(**authority) + + swh_storage.content_metadata_add(**data.content_metadata) + swh_storage.content_metadata_add(**data.content_metadata2) + swh_storage.content_metadata_add(**new_content_metadata2) + + result = swh_storage.content_metadata_get(content_swhid, authority) + assert result["next_page_token"] is None + assert [data.content_metadata, new_content_metadata2] == list( + sorted(result["results"], key=lambda x: x["discovery_date"],) + ) + + def test_content_metadata_add_dict(self, swh_storage): + fetcher = data.metadata_fetcher + authority = data.metadata_authority + + swh_storage.metadata_fetcher_add(**fetcher) + 
swh_storage.metadata_authority_add(**authority) + + kwargs = data.content_metadata.copy() + kwargs["metadata"] = {"foo": "bar"} + + with pytest.raises(StorageArgumentException): + swh_storage.content_metadata_add(**kwargs) + + def test_content_metadata_get(self, swh_storage): + authority = data.metadata_authority + fetcher = data.metadata_fetcher + authority2 = data.metadata_authority2 + fetcher2 = data.metadata_fetcher2 + content1_swhid = f"swh:1:cnt:{data.cont['sha1_git']}" + content2_swhid = f"swh:1:cnt:{data.cont2['sha1_git']}" + + content1_metadata1 = data.content_metadata + content1_metadata2 = data.content_metadata2 + content1_metadata3 = data.content_metadata3 + content2_metadata = {**data.content_metadata2, "id": content2_swhid} + + swh_storage.metadata_authority_add(**authority) + swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_authority_add(**authority2) + swh_storage.metadata_fetcher_add(**fetcher2) + + swh_storage.content_metadata_add(**content1_metadata1) + swh_storage.content_metadata_add(**content1_metadata2) + swh_storage.content_metadata_add(**content1_metadata3) + swh_storage.content_metadata_add(**content2_metadata) + + result = swh_storage.content_metadata_get(content1_swhid, authority) + assert result["next_page_token"] is None + assert [content1_metadata1, content1_metadata2] == list( + sorted(result["results"], key=lambda x: x["discovery_date"],) + ) + + result = swh_storage.content_metadata_get(content1_swhid, authority2) + assert result["next_page_token"] is None + assert [content1_metadata3] == list( + sorted(result["results"], key=lambda x: x["discovery_date"],) + ) + + result = swh_storage.content_metadata_get(content2_swhid, authority) + assert result["next_page_token"] is None + assert [content2_metadata] == list(result["results"],) + + def test_content_metadata_get_after(self, swh_storage): + content = data.cont + fetcher = data.metadata_fetcher + authority = data.metadata_authority + content_swhid = 
f"swh:1:cnt:{content['sha1_git']}" + + swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_authority_add(**authority) + + swh_storage.content_metadata_add(**data.content_metadata) + swh_storage.content_metadata_add(**data.content_metadata2) + + result = swh_storage.content_metadata_get( + content_swhid, + authority, + after=data.content_metadata["discovery_date"] - timedelta(seconds=1), + ) + assert result["next_page_token"] is None + assert [data.content_metadata, data.content_metadata2] == list( + sorted(result["results"], key=lambda x: x["discovery_date"],) + ) + + result = swh_storage.content_metadata_get( + content_swhid, authority, after=data.content_metadata["discovery_date"] + ) + assert result["next_page_token"] is None + assert [data.content_metadata2] == result["results"] + + result = swh_storage.content_metadata_get( + content_swhid, authority, after=data.content_metadata2["discovery_date"] + ) + assert result["next_page_token"] is None + assert [] == result["results"] + + def test_content_metadata_get_paginate(self, swh_storage): + content = data.cont + fetcher = data.metadata_fetcher + authority = data.metadata_authority + content_swhid = f"swh:1:cnt:{content['sha1_git']}" + + swh_storage.metadata_fetcher_add(**fetcher) + swh_storage.metadata_authority_add(**authority) + + swh_storage.content_metadata_add(**data.content_metadata) + swh_storage.content_metadata_add(**data.content_metadata2) + + swh_storage.content_metadata_get(content_swhid, authority) + + result = swh_storage.content_metadata_get(content_swhid, authority, limit=1) + assert result["next_page_token"] is not None + assert [data.content_metadata] == result["results"] + + result = swh_storage.content_metadata_get( + content_swhid, authority, limit=1, page_token=result["next_page_token"] + ) + assert result["next_page_token"] is None + assert [data.content_metadata2] == result["results"] + + def test_content_metadata_get_paginate_same_date(self, swh_storage): + content = 
data.cont + fetcher1 = data.metadata_fetcher + fetcher2 = data.metadata_fetcher2 + authority = data.metadata_authority + content_swhid = f"swh:1:cnt:{content['sha1_git']}" + + swh_storage.metadata_fetcher_add(**fetcher1) + swh_storage.metadata_fetcher_add(**fetcher2) + swh_storage.metadata_authority_add(**authority) + + content_metadata2 = { + **data.content_metadata2, + "discovery_date": data.content_metadata2["discovery_date"], + "fetcher": {"name": fetcher2["name"], "version": fetcher2["version"],}, + } + + swh_storage.content_metadata_add(**data.content_metadata) + swh_storage.content_metadata_add(**content_metadata2) + + result = swh_storage.content_metadata_get(content_swhid, authority, limit=1) + assert result["next_page_token"] is not None + assert [data.content_metadata] == result["results"] + + result = swh_storage.content_metadata_get( + content_swhid, authority, limit=1, page_token=result["next_page_token"] + ) + assert result["next_page_token"] is None + assert [content_metadata2] == result["results"] + def test_object_find_by_sha1_git(self, swh_storage): sha1_gits = [b"00000000000000000000"] expected = {