swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import itertools
import json
import random
import re
from typing import Any, Dict, List, Iterable, Optional, Union

import attr
from deprecated import deprecated

from swh.core.api.serializers import msgpack_loads, msgpack_dumps
from swh.model.model import (
    Revision,
    Release,
    Directory,
    DirectoryEntry,
    Content,
    SkippedContent,
    OriginVisit,
    OriginVisitStatus,
    Snapshot,
    Origin,
)
from swh.model.hashutil import DEFAULT_ALGORITHMS
from swh.storage.objstorage import ObjStorage
from swh.storage.writer import JournalWriter
from swh.storage.utils import now

from ..exc import StorageArgumentException, HashCollision
from ..extrinsic_metadata import check_extrinsic_metadata_context, CONTEXT_KEYS
from .common import TOKEN_BEGIN, TOKEN_END
from .converters import (
    revision_to_db,
    revision_from_db,
    release_to_db,
    release_from_db,
    row_to_visit_status,
)

# [... 211 lines elided ...]


class CassandraStorage:
    # [...]

    def content_missing_per_sha1(self, contents):
        return self.content_missing([{"sha1": c} for c in contents])

    def content_missing_per_sha1_git(self, contents):
        return self.content_missing(
            [{"sha1_git": c} for c in contents], key_hash="sha1_git"
        )

    def content_metadata_add(
        self,
        id: str,
        context: Dict[str, Union[str, bytes, int]],
        discovery_date: datetime.datetime,
        authority: Dict[str, Any],
        fetcher: Dict[str, Any],
        format: str,
        metadata: bytes,
    ) -> None:
        self._object_metadata_add(
            "content",
            id,
            discovery_date,
            authority,
            fetcher,
            format,
            metadata,
            context,
        )

    def content_metadata_get(
        self,
        id: str,
        authority: Dict[str, str],
        after: Optional[datetime.datetime] = None,
        page_token: Optional[bytes] = None,
        limit: int = 1000,
    ) -> Dict[str, Any]:
        return self._object_metadata_get(
            "content", id, authority, after, page_token, limit,
        )

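    # A minimal usage sketch for the two methods above, which simply delegate
    # to _object_metadata_add / _object_metadata_get (defined further down).
    # The `storage` instance, the id value, the context key and the authority
    # and fetcher dicts are illustrative assumptions, not values taken from
    # this changeset; the authority and fetcher are assumed to have been
    # registered beforehand, and the exact expected id format is not shown in
    # this excerpt.
    #
    #   import datetime
    #
    #   storage.content_metadata_add(
    #       id="swh:1:cnt:...",  # hypothetical content identifier
    #       context={"origin": "https://example.org/repo.git"},
    #       discovery_date=datetime.datetime.now(tz=datetime.timezone.utc),
    #       authority={"type": "forge", "url": "https://example.org/"},
    #       fetcher={"name": "example-loader", "version": "0.1.0"},
    #       format="json",
    #       metadata=b'{"license": "GPL-3.0-or-later"}',
    #   )
    #   page = storage.content_metadata_get(
    #       id="swh:1:cnt:...",
    #       authority={"type": "forge", "url": "https://example.org/"},
    #   )
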
    def content_get_random(self):
        return self._cql_runner.content_get_random().sha1_git

    def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable:
        """From the name of a hash algorithm and a value of that hash,
        looks up the "hash -> token" secondary table
        (skipped_content_by_{algo}) to get tokens.
        Then, looks up the main table (content) to get all contents with
    # [... 746 lines elided ...]

    def origin_metadata_add(
        self,
        origin_url: str,
        discovery_date: datetime.datetime,
        authority: Dict[str, Any],
        fetcher: Dict[str, Any],
        format: str,
        metadata: bytes,
    ) -> None:
        if not isinstance(origin_url, str):
            raise StorageArgumentException(
                "origin_url must be str, not %r" % (origin_url,)
            )

        context: Dict[str, Union[str, bytes, int]] = {}  # origins have no context

        self._object_metadata_add(
            "origin",
            origin_url,
            discovery_date,
            authority,
            fetcher,
            format,
            metadata,
            context,
        )

    def origin_metadata_get(
        self,
        origin_url: str,
        authority: Dict[str, str],
        after: Optional[datetime.datetime] = None,
        page_token: Optional[bytes] = None,
        limit: int = 1000,
    ) -> Dict[str, Any]:
        if not isinstance(origin_url, str):
            raise TypeError("origin_url must be str, not %r" % (origin_url,))

        res = self._object_metadata_get(
            "origin", origin_url, authority, after, page_token, limit
        )
        for result in res["results"]:
            result["origin_url"] = result.pop("id")
        return res

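    # The public origin_metadata_get above is now a thin wrapper: it validates
    # the origin URL, calls the generic _object_metadata_get, then renames the
    # generic "id" key back to "origin_url" so the returned dicts keep their
    # historical shape. A sketch of one returned page, with purely
    # illustrative values; the "next_page_token" key is an assumption taken
    # from the pagination code further down, which is partly elided in this
    # excerpt.
    #
    #   {
    #       "results": [
    #           {
    #               "origin_url": "https://example.org/repo.git",
    #               "authority": {"type": "forge", "url": "https://example.org/"},
    #               "fetcher": {"name": "example-loader", "version": "0.1.0"},
    #               "discovery_date": datetime.datetime(
    #                   2020, 5, 4, tzinfo=datetime.timezone.utc
    #               ),
    #               "format": "json",
    #               "metadata": b'{"license": "GPL-3.0-or-later"}',
    #           },
    #       ],
    #       "next_page_token": None,
    #   }
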
    def _object_metadata_add(
        self,
        object_type: str,
        id: str,
        discovery_date: datetime.datetime,
        authority: Dict[str, Any],
        fetcher: Dict[str, Any],
        format: str,
        metadata: bytes,
        context: Dict[str, Union[str, bytes, int]],
    ) -> None:
        check_extrinsic_metadata_context(object_type, context)

        if not self._cql_runner.metadata_authority_get(**authority):
            raise StorageArgumentException(f"Unknown authority {authority}")
        if not self._cql_runner.metadata_fetcher_get(**fetcher):
            raise StorageArgumentException(f"Unknown fetcher {fetcher}")

        try:
            self._cql_runner.object_metadata_add(
                object_type,
                id,
                authority["type"],
                authority["url"],
                discovery_date,
                fetcher["name"],
                fetcher["version"],
                format,
                metadata,
                context,
            )
        except TypeError as e:
            raise StorageArgumentException(*e.args)

    def _object_metadata_get(
        self,
        object_type: str,
        id: str,
        authority: Dict[str, str],
        after: Optional[datetime.datetime] = None,
        page_token: Optional[bytes] = None,
        limit: int = 1000,
    ) -> Dict[str, Any]:
        if page_token is not None:
            (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads(
                page_token
            )
            if after and after_date < after:
                raise StorageArgumentException(
                    "page_token is inconsistent with the value of 'after'."
                )
            entries = self._cql_runner.object_metadata_get_after_date_and_fetcher(
                id,
                authority["type"],
                authority["url"],
                after_date,
                after_fetcher_name,
                after_fetcher_url,
            )
        elif after is not None:
            entries = self._cql_runner.object_metadata_get_after_date(
                id, authority["type"], authority["url"], after
            )
        else:
            entries = self._cql_runner.object_metadata_get(
                id, authority["type"], authority["url"]
            )

        if limit:
            entries = itertools.islice(entries, 0, limit + 1)

        results = []
        for entry in entries:
            discovery_date = entry.discovery_date.replace(tzinfo=datetime.timezone.utc)

            result = {
                "id": entry.id,
                "authority": {
                    "type": entry.authority_type,
                    "url": entry.authority_url,
                },
                "fetcher": {
                    "name": entry.fetcher_name,
                    "version": entry.fetcher_version,
                },
                "discovery_date": discovery_date,
                "format": entry.format,
                "metadata": entry.metadata,
            }

            if CONTEXT_KEYS[object_type]:
                context = {}
                for key in CONTEXT_KEYS[object_type]:
                    value = getattr(entry, key)
                    if value is not None:
                        context[key] = value
                result["context"] = context

            results.append(result)

        if len(results) > limit:
            results.pop()
            assert len(results) == limit
            last_result = results[-1]

            next_page_token: Optional[bytes] = msgpack_dumps(
                (
                    last_result["discovery_date"],
                    # [... remaining 52 lines elided ...]
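
    # A sketch of how a caller might page through origin metadata using the
    # opaque page token. The "next_page_token" key name and the `storage`
    # instance are assumptions (the code that returns the token is elided
    # above); the token itself is msgpack-encoded internally and should be
    # treated as opaque bytes by callers.
    #
    #   page_token = None
    #   while True:
    #       page = storage.origin_metadata_get(
    #           "https://example.org/repo.git",
    #           {"type": "forge", "url": "https://example.org/"},
    #           page_token=page_token,
    #           limit=100,
    #       )
    #       for entry in page["results"]:
    #           ...  # process one metadata entry
    #       page_token = page.get("next_page_token")
    #       if page_token is None:
    #           break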