Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import json | import json | ||||
import random | import random | ||||
import re | import re | ||||
from typing import Any, Dict, List, Iterable, Optional, Union | from typing import Any, Dict, List, Iterable, Optional, Tuple, Union | ||||
import attr | import attr | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.identifiers import parse_swhid, SWHID | from swh.model.identifiers import parse_swhid, SWHID | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, | Revision, | ||||
Show All 13 Lines | from swh.model.model import ( | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
) | ) | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from swh.storage.utils import map_optional, now | from swh.storage.utils import map_optional, now | ||||
from ..exc import StorageArgumentException, HashCollision | from ..exc import StorageArgumentException, HashCollision | ||||
from .common import TOKEN_BEGIN, TOKEN_END | from .common import TOKEN_BEGIN, TOKEN_END | ||||
from .converters import ( | from .converters import ( | ||||
revision_to_db, | revision_to_db, | ||||
revision_from_db, | revision_from_db, | ||||
release_to_db, | release_to_db, | ||||
release_from_db, | release_from_db, | ||||
row_to_visit, | |||||
row_to_visit_status, | row_to_visit_status, | ||||
) | ) | ||||
vlorentz: considering how many converters we are going to have, I suggest we use `from . import… | |||||
from .cql import CqlRunner | from .cql import CqlRunner | ||||
from .schema import HASH_ALGORITHMS | from .schema import HASH_ALGORITHMS | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
▲ Show 20 Lines • Show All 796 Lines • ▼ Show 20 Lines | def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]: | ||||
**visit_status.to_dict(), | **visit_status.to_dict(), | ||||
# visit['origin'] is the URL (via a join), while | # visit['origin'] is the URL (via a join), while | ||||
# visit_status['origin'] is only an id. | # visit_status['origin'] is only an id. | ||||
"origin": visit["origin"], | "origin": visit["origin"], | ||||
# but keep the date of the creation of the origin visit | # but keep the date of the creation of the origin visit | ||||
"date": visit["date"], | "date": visit["date"], | ||||
} | } | ||||
def _origin_visit_get_latest_status(self, visit: OriginVisit) -> OriginVisitStatus: | |||||
"""Retrieve the latest visit status information for the origin visit. | |||||
Then merge it with the visit and return it. | |||||
""" | |||||
row = self._cql_runner.origin_visit_status_get_latest(visit.origin, visit.visit) | |||||
assert row is not None | |||||
visit_status = row_to_visit_status(row) | |||||
return attr.evolve(visit_status, origin=visit.origin) | |||||
def _origin_visit_get_updated(self, origin: str, visit_id: int) -> Dict[str, Any]: | def _origin_visit_get_updated(self, origin: str, visit_id: int) -> Dict[str, Any]: | ||||
"""Retrieve origin visit and latest origin visit status and merge them | """Retrieve origin visit and latest origin visit status and merge them | ||||
into an origin visit. | into an origin visit. | ||||
""" | """ | ||||
row_visit = self._cql_runner.origin_visit_get_one(origin, visit_id) | row_visit = self._cql_runner.origin_visit_get_one(origin, visit_id) | ||||
assert row_visit is not None | assert row_visit is not None | ||||
visit = self._format_origin_visit_row(row_visit) | visit = self._format_origin_visit_row(row_visit) | ||||
Done Inline ActionsI initially thought that this missing coverage was a lie but apparently that's dead code now! ardumont: I initially thought that this missing coverage was a lie but apparently that's dead code now! | |||||
Done Inline Actionsdone ardumont: done | |||||
return self._origin_visit_apply_last_status(visit) | return self._origin_visit_apply_last_status(visit) | ||||
@staticmethod | @staticmethod | ||||
def _format_origin_visit_row(visit): | def _format_origin_visit_row(visit): | ||||
return { | return { | ||||
**visit._asdict(), | **visit._asdict(), | ||||
"origin": visit.origin, | "origin": visit.origin, | ||||
"date": visit.date.replace(tzinfo=datetime.timezone.utc), | "date": visit.date.replace(tzinfo=datetime.timezone.utc), | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | ) -> Optional[OriginVisitStatus]: | ||||
if allowed_statuses: | if allowed_statuses: | ||||
rows = [row for row in rows if row.status in allowed_statuses] | rows = [row for row in rows if row.status in allowed_statuses] | ||||
if require_snapshot: | if require_snapshot: | ||||
rows = [row for row in rows if row.snapshot is not None] | rows = [row for row in rows if row.snapshot is not None] | ||||
if not rows: | if not rows: | ||||
return None | return None | ||||
return row_to_visit_status(rows[0]) | return row_to_visit_status(rows[0]) | ||||
def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | def origin_visit_get_random( | ||||
self, type: str | |||||
) -> Optional[Tuple[OriginVisit, OriginVisitStatus]]: | |||||
back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back | back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back | ||||
# Random position to start iteration at | # Random position to start iteration at | ||||
start_token = random.randint(TOKEN_BEGIN, TOKEN_END) | start_token = random.randint(TOKEN_BEGIN, TOKEN_END) | ||||
# Iterator over all visits, ordered by token(origins) then visit_id | # Iterator over all visits, ordered by token(origins) then visit_id | ||||
rows = self._cql_runner.origin_visit_iter(start_token) | rows = self._cql_runner.origin_visit_iter(start_token) | ||||
for row in rows: | for row in rows: | ||||
visit = self._format_origin_visit_row(row) | visit = row_to_visit(row) | ||||
visit_status = self._origin_visit_apply_last_status(visit) | visit_status = self._origin_visit_get_latest_status(visit) | ||||
if ( | if visit.date > back_in_the_day and visit_status.status == "full": | ||||
visit_status["date"] > back_in_the_day | return visit, visit_status | ||||
and visit_status["status"] == "full" | |||||
): | |||||
return visit_status | |||||
else: | |||||
return None | return None | ||||
def stat_counters(self): | def stat_counters(self): | ||||
rows = self._cql_runner.stat_counters() | rows = self._cql_runner.stat_counters() | ||||
keys = ( | keys = ( | ||||
"content", | "content", | ||||
"directory", | "directory", | ||||
"origin", | "origin", | ||||
"origin_visit", | "origin_visit", | ||||
▲ Show 20 Lines • Show All 198 Lines • Show Last 20 Lines |
considering how many converters we are going to have, I suggest we use from . import converters instead.
And converters are not involved in long expressions anyway, so it won't be an issue for readability.