Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Iterable, Optional, Union | from typing import Any, Dict, List, Iterable, Optional, Union | ||||
import attr | import attr | ||||
from deprecated import deprecated | from deprecated import deprecated | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.identifiers import parse_swhid, SWHID | |||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Snapshot, | Snapshot, | ||||
Origin, | Origin, | ||||
MetadataAuthority, | |||||
MetadataAuthorityType, | |||||
MetadataFetcher, | |||||
MetadataTargetType, | |||||
RawExtrinsicMetadata, | |||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | |||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from swh.storage.utils import now | from swh.storage.utils import map_optional, now | ||||
from ..exc import StorageArgumentException, HashCollision | from ..exc import StorageArgumentException, HashCollision | ||||
from ..extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS | |||||
from .common import TOKEN_BEGIN, TOKEN_END | from .common import TOKEN_BEGIN, TOKEN_END | ||||
from .converters import ( | from .converters import ( | ||||
revision_to_db, | revision_to_db, | ||||
revision_from_db, | revision_from_db, | ||||
release_to_db, | release_to_db, | ||||
release_from_db, | release_from_db, | ||||
row_to_visit_status, | row_to_visit_status, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 960 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
) | ) | ||||
stats = {key: 0 for key in keys} | stats = {key: 0 for key in keys} | ||||
stats.update({row.object_type: row.count for row in rows}) | stats.update({row.object_type: row.count for row in rows}) | ||||
return stats | return stats | ||||
def refresh_stat_counters(self): | def refresh_stat_counters(self): | ||||
pass | pass | ||||
def content_metadata_add( | def object_metadata_add(self, metadata: Iterable[RawExtrinsicMetadata]) -> None: | ||||
self, | for metadata_entry in metadata: | ||||
id: str, | if not self._cql_runner.metadata_authority_get( | ||||
context: Dict[str, Union[str, bytes, int]], | metadata_entry.authority.type.value, metadata_entry.authority.url | ||||
discovery_date: datetime.datetime, | ): | ||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
) -> None: | |||||
self._object_metadata_add( | |||||
"content", | |||||
id, | |||||
discovery_date, | |||||
authority, | |||||
fetcher, | |||||
format, | |||||
metadata, | |||||
context, | |||||
) | |||||
def content_metadata_get( | |||||
self, | |||||
id: str, | |||||
authority: Dict[str, str], | |||||
after: Optional[datetime.datetime] = None, | |||||
page_token: Optional[bytes] = None, | |||||
limit: int = 1000, | |||||
) -> Dict[str, Any]: | |||||
return self._object_metadata_get( | |||||
"content", id, authority, after, page_token, limit, | |||||
) | |||||
def origin_metadata_add( | |||||
self, | |||||
origin_url: str, | |||||
discovery_date: datetime.datetime, | |||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
) -> None: | |||||
if not isinstance(origin_url, str): | |||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"origin_url must be str, not %r" % (origin_url,) | f"Unknown authority {metadata_entry.authority}" | ||||
) | |||||
context: Dict[str, Union[str, bytes, int]] = {} # origins have no context | |||||
self._object_metadata_add( | |||||
"origin", | |||||
origin_url, | |||||
discovery_date, | |||||
authority, | |||||
fetcher, | |||||
format, | |||||
metadata, | |||||
context, | |||||
) | ) | ||||
if not self._cql_runner.metadata_fetcher_get( | |||||
def origin_metadata_get( | metadata_entry.fetcher.name, metadata_entry.fetcher.version | ||||
self, | ): | ||||
origin_url: str, | raise StorageArgumentException( | ||||
authority: Dict[str, str], | f"Unknown fetcher {metadata_entry.fetcher}" | ||||
after: Optional[datetime.datetime] = None, | |||||
page_token: Optional[bytes] = None, | |||||
limit: int = 1000, | |||||
) -> Dict[str, Any]: | |||||
if not isinstance(origin_url, str): | |||||
raise TypeError("origin_url must be str, not %r" % (origin_url,)) | |||||
res = self._object_metadata_get( | |||||
"origin", origin_url, authority, after, page_token, limit | |||||
) | ) | ||||
for result in res["results"]: | |||||
result["origin_url"] = result.pop("id") | |||||
return res | |||||
def _object_metadata_add( | |||||
self, | |||||
object_type: str, | |||||
id: str, | |||||
discovery_date: datetime.datetime, | |||||
authority: Dict[str, Any], | |||||
fetcher: Dict[str, Any], | |||||
format: str, | |||||
metadata: bytes, | |||||
context: Dict[str, Union[str, bytes, int]], | |||||
) -> None: | |||||
check_extrinsic_metadata_context(object_type, context) | |||||
if not self._cql_runner.metadata_authority_get(**authority): | |||||
raise StorageArgumentException(f"Unknown authority {authority}") | |||||
if not self._cql_runner.metadata_fetcher_get(**fetcher): | |||||
raise StorageArgumentException(f"Unknown fetcher {fetcher}") | |||||
try: | try: | ||||
self._cql_runner.object_metadata_add( | self._cql_runner.object_metadata_add( | ||||
object_type, | type=metadata_entry.type.value, | ||||
id, | id=str(metadata_entry.id), | ||||
authority["type"], | authority_type=metadata_entry.authority.type.value, | ||||
authority["url"], | authority_url=metadata_entry.authority.url, | ||||
discovery_date, | discovery_date=metadata_entry.discovery_date, | ||||
fetcher["name"], | fetcher_name=metadata_entry.fetcher.name, | ||||
fetcher["version"], | fetcher_version=metadata_entry.fetcher.version, | ||||
format, | format=metadata_entry.format, | ||||
metadata, | metadata=metadata_entry.metadata, | ||||
context, | origin=metadata_entry.origin, | ||||
visit=metadata_entry.visit, | |||||
snapshot=map_optional(str, metadata_entry.snapshot), | |||||
release=map_optional(str, metadata_entry.release), | |||||
revision=map_optional(str, metadata_entry.revision), | |||||
path=metadata_entry.path, | |||||
directory=map_optional(str, metadata_entry.directory), | |||||
) | ) | ||||
except TypeError as e: | except TypeError as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
def _object_metadata_get( | def object_metadata_get( | ||||
self, | self, | ||||
object_type: str, | object_type: MetadataTargetType, | ||||
id: str, | id: Union[str, SWHID], | ||||
authority: Dict[str, str], | authority: MetadataAuthority, | ||||
after: Optional[datetime.datetime] = None, | after: Optional[datetime.datetime] = None, | ||||
page_token: Optional[bytes] = None, | page_token: Optional[bytes] = None, | ||||
limit: int = 1000, | limit: int = 1000, | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: | ||||
if object_type == MetadataTargetType.ORIGIN: | |||||
if isinstance(id, SWHID): | |||||
raise StorageArgumentException( | |||||
f"object_metadata_get called with object_type='origin', but " | |||||
ardumont: missing test case.
(Those are interesting to test nonetheless for at least can ensuring the… | |||||
Done Inline Actions... for at least we can ensure* ... ardumont: ... for at least we can ensure* ...
| |||||
f"provided id is an SWHID: {id!r}" | |||||
) | |||||
else: | |||||
if not isinstance(id, SWHID): | |||||
raise StorageArgumentException( | |||||
Done Inline Actions same. ardumont: same. | |||||
f"object_metadata_get called with object_type!='origin', but " | |||||
f"provided id is not an SWHID: {id!r}" | |||||
) | |||||
if page_token is not None: | if page_token is not None: | ||||
(after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads( | (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads( | ||||
page_token | page_token | ||||
) | ) | ||||
if after and after_date < after: | if after and after_date < after: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
Not Done Inline Actions Is that possible? Won't that be caught earlier in client code when they try to instantiate the model objects? And if it is possible, can you please add tests for it? ;) ardumont: Is that possible?
Won't that be caught earlier in client code when they try to instantiate… | |||||
Done Inline Actions I don't think it's possible. vlorentz: I don't think it's possible. | |||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
entries = self._cql_runner.object_metadata_get_after_date_and_fetcher( | entries = self._cql_runner.object_metadata_get_after_date_and_fetcher( | ||||
id, | str(id), | ||||
authority["type"], | authority.type.value, | ||||
authority["url"], | authority.url, | ||||
after_date, | after_date, | ||||
after_fetcher_name, | after_fetcher_name, | ||||
after_fetcher_url, | after_fetcher_url, | ||||
) | ) | ||||
elif after is not None: | elif after is not None: | ||||
entries = self._cql_runner.object_metadata_get_after_date( | entries = self._cql_runner.object_metadata_get_after_date( | ||||
id, authority["type"], authority["url"], after | str(id), authority.type.value, authority.url, after | ||||
) | ) | ||||
else: | else: | ||||
entries = self._cql_runner.object_metadata_get( | entries = self._cql_runner.object_metadata_get( | ||||
id, authority["type"], authority["url"] | str(id), authority.type.value, authority.url | ||||
) | ) | ||||
if limit: | if limit: | ||||
entries = itertools.islice(entries, 0, limit + 1) | entries = itertools.islice(entries, 0, limit + 1) | ||||
results = [] | results = [] | ||||
for entry in entries: | for entry in entries: | ||||
discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc) | discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc) | ||||
result = { | assert str(id) == entry.id | ||||
"id": entry.id, | |||||
"authority": { | |||||
"type": entry.authority_type, | |||||
"url": entry.authority_url, | |||||
}, | |||||
"fetcher": { | |||||
"name": entry.fetcher_name, | |||||
"version": entry.fetcher_version, | |||||
}, | |||||
"discovery_date": discovery_date, | |||||
"format": entry.format, | |||||
"metadata": entry.metadata, | |||||
} | |||||
if CONTEXT_KEYS[object_type]: | result = RawExtrinsicMetadata( | ||||
context = {} | type=MetadataTargetType(entry.type), | ||||
for key in CONTEXT_KEYS[object_type]: | id=id, | ||||
value = getattr(entry, key) | authority=MetadataAuthority( | ||||
if value is not None: | type=MetadataAuthorityType(entry.authority_type), | ||||
context[key] = value | url=entry.authority_url, | ||||
result["context"] = context | ), | ||||
fetcher=MetadataFetcher( | |||||
name=entry.fetcher_name, version=entry.fetcher_version, | |||||
), | |||||
discovery_date=discovery_date, | |||||
format=entry.format, | |||||
metadata=entry.metadata, | |||||
origin=entry.origin, | |||||
visit=entry.visit, | |||||
snapshot=map_optional(parse_swhid, entry.snapshot), | |||||
release=map_optional(parse_swhid, entry.release), | |||||
revision=map_optional(parse_swhid, entry.revision), | |||||
path=entry.path, | |||||
directory=map_optional(parse_swhid, entry.directory), | |||||
) | |||||
results.append(result) | results.append(result) | ||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_result = results[-1] | last_result = results[-1] | ||||
next_page_token: Optional[bytes] = msgpack_dumps( | next_page_token: Optional[bytes] = msgpack_dumps( | ||||
( | ( | ||||
last_result["discovery_date"], | last_result.discovery_date, | ||||
last_result["fetcher"]["name"], | last_result.fetcher.name, | ||||
last_result["fetcher"]["version"], | last_result.fetcher.version, | ||||
) | ) | ||||
) | ) | ||||
else: | else: | ||||
next_page_token = None | next_page_token = None | ||||
return { | return { | ||||
"next_page_token": next_page_token, | "next_page_token": next_page_token, | ||||
"results": results, | "results": results, | ||||
} | } | ||||
def metadata_fetcher_add( | def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None: | ||||
self, name: str, version: str, metadata: Dict[str, Any] | for fetcher in fetchers: | ||||
) -> None: | self._cql_runner.metadata_fetcher_add( | ||||
self._cql_runner.metadata_fetcher_add(name, version, json.dumps(metadata)) | fetcher.name, | ||||
fetcher.version, | |||||
json.dumps(map_optional(dict, fetcher.metadata)), | |||||
) | |||||
def metadata_fetcher_get(self, name: str, version: str) -> Optional[Dict[str, Any]]: | def metadata_fetcher_get( | ||||
self, name: str, version: str | |||||
) -> Optional[MetadataFetcher]: | |||||
fetcher = self._cql_runner.metadata_fetcher_get(name, version) | fetcher = self._cql_runner.metadata_fetcher_get(name, version) | ||||
if fetcher: | if fetcher: | ||||
return { | return MetadataFetcher( | ||||
"name": fetcher.name, | name=fetcher.name, | ||||
"version": fetcher.version, | version=fetcher.version, | ||||
"metadata": json.loads(fetcher.metadata), | metadata=json.loads(fetcher.metadata), | ||||
} | ) | ||||
else: | else: | ||||
return None | return None | ||||
def metadata_authority_add( | def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: | ||||
self, type: str, url: str, metadata: Dict[str, Any] | for authority in authorities: | ||||
) -> None: | self._cql_runner.metadata_authority_add( | ||||
self._cql_runner.metadata_authority_add(url, type, json.dumps(metadata)) | authority.url, | ||||
authority.type.value, | |||||
json.dumps(map_optional(dict, authority.metadata)), | |||||
) | |||||
def metadata_authority_get(self, type: str, url: str) -> Optional[Dict[str, Any]]: | def metadata_authority_get( | ||||
authority = self._cql_runner.metadata_authority_get(type, url) | self, type: MetadataAuthorityType, url: str | ||||
) -> Optional[MetadataAuthority]: | |||||
authority = self._cql_runner.metadata_authority_get(type.value, url) | |||||
if authority: | if authority: | ||||
return { | return MetadataAuthority( | ||||
"type": authority.type, | type=MetadataAuthorityType(authority.type), | ||||
"url": authority.url, | url=authority.url, | ||||
"metadata": json.loads(authority.metadata), | metadata=json.loads(authority.metadata), | ||||
} | ) | ||||
else: | else: | ||||
return None | return None | ||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | ||||
"""Do nothing | """Do nothing | ||||
""" | """ | ||||
return None | return None | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | ||||
return {} | return {} |
missing test case.
(Those are interesting to test nonetheless, since at least we can ensure the message is correctly readable)