Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show All 31 Lines | |||||
from ..exc import StorageArgumentException, HashCollision | from ..exc import StorageArgumentException, HashCollision | ||||
from .common import TOKEN_BEGIN, TOKEN_END | from .common import TOKEN_BEGIN, TOKEN_END | ||||
from .converters import ( | from .converters import ( | ||||
revision_to_db, | revision_to_db, | ||||
revision_from_db, | revision_from_db, | ||||
release_to_db, | release_to_db, | ||||
release_from_db, | release_from_db, | ||||
row_to_visit_status, | |||||
) | ) | ||||
from .cql import CqlRunner | from .cql import CqlRunner | ||||
from .schema import HASH_ALGORITHMS | from .schema import HASH_ALGORITHMS | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
▲ Show 20 Lines • Show All 785 Lines • ▼ Show 20 Lines | ) -> None: | ||||
origin_url = self.origin_get({"url": visit_status.origin}) | origin_url = self.origin_get({"url": visit_status.origin}) | ||||
if not origin_url: | if not origin_url: | ||||
raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | ||||
for visit_status in visit_statuses: | for visit_status in visit_statuses: | ||||
self._origin_visit_status_add(visit_status) | self._origin_visit_status_add(visit_status) | ||||
def _origin_visit_merge( | def _origin_visit_merge( | ||||
self, visit: Dict[str, Any], visit_status: Dict[str, Any] | self, visit: Dict[str, Any], visit_status: OriginVisitStatus, | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
"""Merge origin_visit and visit_status together. | """Merge origin_visit and visit_status together. | ||||
""" | """ | ||||
return OriginVisit.from_dict( | return OriginVisit.from_dict( | ||||
{ | { | ||||
# default to the values in visit | # default to the values in visit | ||||
**visit, | **visit, | ||||
# override with the last update | # override with the last update | ||||
**visit_status, | **visit_status.to_dict(), | ||||
# visit['origin'] is the URL (via a join), while | # visit['origin'] is the URL (via a join), while | ||||
# visit_status['origin'] is only an id. | # visit_status['origin'] is only an id. | ||||
"origin": visit["origin"], | "origin": visit["origin"], | ||||
# but keep the date of the creation of the origin visit | # but keep the date of the creation of the origin visit | ||||
"date": visit["date"], | "date": visit["date"], | ||||
} | } | ||||
).to_dict() | ).to_dict() | ||||
def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]: | def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]: | ||||
"""Retrieve the latest visit status information for the origin visit. | """Retrieve the latest visit status information for the origin visit. | ||||
Then merge it with the visit and return it. | Then merge it with the visit and return it. | ||||
""" | """ | ||||
visit_status = self._cql_runner.origin_visit_status_get_latest( | row = self._cql_runner.origin_visit_status_get_latest( | ||||
visit["origin"], visit["visit"] | visit["origin"], visit["visit"] | ||||
) | ) | ||||
assert visit_status is not None | assert row is not None | ||||
return self._origin_visit_merge(visit, visit_status) | return self._origin_visit_merge(visit, row_to_visit_status(row)) | ||||
def _origin_visit_get_updated(self, origin: str, visit_id: int) -> Dict[str, Any]: | def _origin_visit_get_updated(self, origin: str, visit_id: int) -> Dict[str, Any]: | ||||
"""Retrieve origin visit and latest origin visit status and merge them | """Retrieve origin visit and latest origin visit status and merge them | ||||
into an origin visit. | into an origin visit. | ||||
""" | """ | ||||
row_visit = self._cql_runner.origin_visit_get_one(origin, visit_id) | row_visit = self._cql_runner.origin_visit_get_one(origin, visit_id) | ||||
assert row_visit is not None | assert row_visit is not None | ||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | ) -> Optional[Dict[str, Any]]: | ||||
continue | continue | ||||
if updated_visit["visit"] < latest_visit["visit"]: | if updated_visit["visit"] < latest_visit["visit"]: | ||||
continue | continue | ||||
latest_visit = updated_visit | latest_visit = updated_visit | ||||
return latest_visit | return latest_visit | ||||
def origin_visit_status_get_latest( | |||||
self, | |||||
origin_url: str, | |||||
visit: int, | |||||
allowed_statuses: Optional[List[str]] = None, | |||||
require_snapshot: bool = False, | |||||
) -> Optional[OriginVisitStatus]: | |||||
rows = self._cql_runner.origin_visit_status_get( | |||||
origin_url, visit, allowed_statuses, require_snapshot | |||||
) | |||||
# filtering is done python side as we cannot do it server side | |||||
if allowed_statuses: | |||||
rows = [row for row in rows if row.status in allowed_statuses] | |||||
if require_snapshot: | |||||
rows = [row for row in rows if row.snapshot is not None] | |||||
if not rows: | |||||
return None | |||||
return row_to_visit_status(rows[0]) | |||||
def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | ||||
back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back | back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back | ||||
# Random position to start iteration at | # Random position to start iteration at | ||||
start_token = random.randint(TOKEN_BEGIN, TOKEN_END) | start_token = random.randint(TOKEN_BEGIN, TOKEN_END) | ||||
# Iterator over all visits, ordered by token(origins) then visit_id | # Iterator over all visits, ordered by token(origins) then visit_id | ||||
rows = self._cql_runner.origin_visit_iter(start_token) | rows = self._cql_runner.origin_visit_iter(start_token) | ||||
▲ Show 20 Lines • Show All 180 Lines • Show Last 20 Lines |