Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import base64 | import base64 | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Iterable, Optional, Tuple, Union | from typing import Any, Dict, List, Iterable, Optional, Set, Tuple, Union | ||||
import attr | import attr | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.identifiers import parse_swhid, SWHID | from swh.model.identifiers import parse_swhid, SWHID | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, | Revision, | ||||
▲ Show 20 Lines • Show All 436 Lines • ▼ Show 20 Lines | def revision_add(self, revisions: List[Revision]) -> Dict: | ||||
# Then write the main revision row. | # Then write the main revision row. | ||||
# Writing this after all parents were written ensures that | # Writing this after all parents were written ensures that | ||||
# read endpoints don't return a partial view while writing | # read endpoints don't return a partial view while writing | ||||
# the parents | # the parents | ||||
self._cql_runner.revision_add_one(revobject) | self._cql_runner.revision_add_one(revobject) | ||||
return {"revision:add": len(revisions)} | return {"revision:add": len(revisions)} | ||||
def revision_missing(self, revisions): | def revision_missing(self, revisions: List[Sha1Git]) -> Iterable[Sha1Git]: | ||||
return self._cql_runner.revision_missing(revisions) | return self._cql_runner.revision_missing(revisions) | ||||
def revision_get(self, revisions): | def revision_get( | ||||
self, revisions: List[Sha1Git] | |||||
) -> Iterable[Optional[Dict[str, Any]]]: | |||||
rows = self._cql_runner.revision_get(revisions) | rows = self._cql_runner.revision_get(revisions) | ||||
revs = {} | revs = {} | ||||
for row in rows: | for row in rows: | ||||
# TODO: use a single query to get all parents? | # TODO: use a single query to get all parents? | ||||
# (it might have lower latency, but requires more code and more | # (it might have lower latency, but requires more code and more | ||||
# bandwidth, because revision id would be part of each returned | # bandwidth, because revision id would be part of each returned | ||||
# row) | # row) | ||||
parent_rows = self._cql_runner.revision_parent_get(row.id) | parent_rows = self._cql_runner.revision_parent_get(row.id) | ||||
# parent_rank is the clustering key, so results are already | # parent_rank is the clustering key, so results are already | ||||
# sorted by rank. | # sorted by rank. | ||||
parents = tuple(row.parent_id for row in parent_rows) | parents = tuple(row.parent_id for row in parent_rows) | ||||
rev = converters.revision_from_db(row, parents=parents) | rev = converters.revision_from_db(row, parents=parents) | ||||
revs[rev.id] = rev.to_dict() | revs[rev.id] = rev.to_dict() | ||||
for rev_id in revisions: | for rev_id in revisions: | ||||
yield revs.get(rev_id) | yield revs.get(rev_id) | ||||
def _get_parent_revs(self, rev_ids, seen, limit, short): | def _get_parent_revs( | ||||
self, | |||||
rev_ids: Iterable[Sha1Git], | |||||
seen: Set[Sha1Git], | |||||
limit: Optional[int], | |||||
short: bool, | |||||
) -> Union[ | |||||
Iterable[Dict[str, Any]], Iterable[Tuple[Sha1Git, Tuple[Sha1Git, ...]]], | |||||
]: | |||||
if limit and len(seen) >= limit: | if limit and len(seen) >= limit: | ||||
return | return | ||||
rev_ids = [id_ for id_ in rev_ids if id_ not in seen] | rev_ids = [id_ for id_ in rev_ids if id_ not in seen] | ||||
if not rev_ids: | if not rev_ids: | ||||
return | return | ||||
seen |= set(rev_ids) | seen |= set(rev_ids) | ||||
# We need this query, even if short=True, to return consistent | # We need this query, even if short=True, to return consistent | ||||
Show All 17 Lines | ]: | ||||
if short: | if short: | ||||
yield (row.id, parents) | yield (row.id, parents) | ||||
else: | else: | ||||
rev = converters.revision_from_db(row, parents=parents) | rev = converters.revision_from_db(row, parents=parents) | ||||
yield rev.to_dict() | yield rev.to_dict() | ||||
yield from self._get_parent_revs(parents, seen, limit, short) | yield from self._get_parent_revs(parents, seen, limit, short) | ||||
def revision_log(self, revisions, limit=None): | def revision_log( | ||||
seen = set() | self, revisions: List[Sha1Git], limit: Optional[int] = None | ||||
ardumont: I'm a bit confused here.
the pg-storage returns Optional[Dict[str, Any]] but both cass and in… | |||||
Not Done Inline Actions: Yes, indeed. The current type of `_get_parent_revs` is incorrect because it sometimes returns None. vlorentz: Yes, indeed. The current type of `_get_parent_revs` is incorrect because it sometimes returns… | |||||
) -> Iterable[Dict[str, Any]]: | |||||
seen: Set[Sha1Git] = set() | |||||
yield from self._get_parent_revs(revisions, seen, limit, False) | yield from self._get_parent_revs(revisions, seen, limit, False) | ||||
def revision_shortlog(self, revisions, limit=None): | def revision_shortlog( | ||||
seen = set() | self, revisions: List[Sha1Git], limit: Optional[int] = None | ||||
) -> Iterable[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]: | |||||
seen: Set[Sha1Git] = set() | |||||
yield from self._get_parent_revs(revisions, seen, limit, True) | yield from self._get_parent_revs(revisions, seen, limit, True) | ||||
def revision_get_random(self) -> Sha1Git: | def revision_get_random(self) -> Sha1Git: | ||||
return self._cql_runner.revision_get_random().id | return self._cql_runner.revision_get_random().id | ||||
def release_add(self, releases: List[Release]) -> Dict: | def release_add(self, releases: List[Release]) -> Dict: | ||||
to_add = [] | to_add = [] | ||||
for rel in releases: | for rel in releases: | ||||
▲ Show 20 Lines • Show All 714 Lines • Show Last 20 Lines |
I'm a bit confused here.
The pg-storage returns Optional[Dict[str, Any]], but both the Cassandra and in-memory storages don't.
I think those are wrong because they assume the input revisions exist, whereas they could be nonexistent.
I think the pg-storage implementation is correct because of that.