Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import base64 | |||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Iterable, Optional, Tuple, Union | from typing import Any, Dict, List, Iterable, Optional, Tuple, Union | ||||
import attr | import attr | ||||
▲ Show 20 Lines • Show All 1,053 Lines • ▼ Show 20 Lines | |||||
def raw_extrinsic_metadata_get( | def raw_extrinsic_metadata_get( | ||||
self, | self, | ||||
object_type: MetadataTargetType, | object_type: MetadataTargetType, | ||||
id: Union[str, SWHID], | id: Union[str, SWHID], | ||||
authority: MetadataAuthority, | authority: MetadataAuthority, | ||||
after: Optional[datetime.datetime] = None, | after: Optional[datetime.datetime] = None, | ||||
page_token: Optional[bytes] = None, | page_token: Optional[bytes] = None, | ||||
limit: int = 1000, | limit: int = 1000, | ||||
) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: | ) -> PagedResult[RawExtrinsicMetadata]: | ||||
if object_type == MetadataTargetType.ORIGIN: | if object_type == MetadataTargetType.ORIGIN: | ||||
if isinstance(id, SWHID): | if isinstance(id, SWHID): | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"raw_extrinsic_metadata_get called with object_type='origin', " | f"raw_extrinsic_metadata_get called with object_type='origin', " | ||||
f"but provided id is an SWHID: {id!r}" | f"but provided id is an SWHID: {id!r}" | ||||
) | ) | ||||
else: | else: | ||||
if not isinstance(id, SWHID): | if not isinstance(id, SWHID): | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"raw_extrinsic_metadata_get called with object_type!='origin', " | f"raw_extrinsic_metadata_get called with object_type!='origin', " | ||||
f"but provided id is not an SWHID: {id!r}" | f"but provided id is not an SWHID: {id!r}" | ||||
) | ) | ||||
if page_token is not None: | if page_token is not None: | ||||
(after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads( | (after_date, after_fetcher_name, after_fetcher_url) = msgpack_loads( | ||||
page_token | base64.b64decode(page_token) | ||||
) | ) | ||||
if after and after_date < after: | if after and after_date < after: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
entries = self._cql_runner.raw_extrinsic_metadata_get_after_date_and_fetcher( # noqa | entries = self._cql_runner.raw_extrinsic_metadata_get_after_date_and_fetcher( # noqa | ||||
str(id), | str(id), | ||||
authority.type.value, | authority.type.value, | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | ) -> PagedResult[RawExtrinsicMetadata]: | ||||
) | ) | ||||
results.append(result) | results.append(result) | ||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_result = results[-1] | last_result = results[-1] | ||||
next_page_token: Optional[bytes] = msgpack_dumps( | next_page_token: Optional[str] = base64.b64encode( | ||||
msgpack_dumps( | |||||
( | ( | ||||
last_result.discovery_date, | last_result.discovery_date, | ||||
last_result.fetcher.name, | last_result.fetcher.name, | ||||
last_result.fetcher.version, | last_result.fetcher.version, | ||||
) | ) | ||||
) | ) | ||||
).decode() | |||||
else: | else: | ||||
next_page_token = None | next_page_token = None | ||||
return { | return PagedResult(next_page_token=next_page_token, results=results,) | ||||
"next_page_token": next_page_token, | |||||
"results": results, | |||||
} | |||||
def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None: | def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None: | ||||
self.journal_writer.metadata_fetcher_add(fetchers) | self.journal_writer.metadata_fetcher_add(fetchers) | ||||
for fetcher in fetchers: | for fetcher in fetchers: | ||||
self._cql_runner.metadata_fetcher_add( | self._cql_runner.metadata_fetcher_add( | ||||
fetcher.name, | fetcher.name, | ||||
fetcher.version, | fetcher.version, | ||||
json.dumps(map_optional(dict, fetcher.metadata)), | json.dumps(map_optional(dict, fetcher.metadata)), | ||||
▲ Show 20 Lines • Show All 45 Lines • Show Last 20 Lines |