Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show All 15 Lines | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | |||||
Snapshot, | Snapshot, | ||||
Origin, | Origin, | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | from swh.model.hashutil import DEFAULT_ALGORITHMS | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from swh.storage.validate import convert_validation_exceptions | from swh.storage.validate import convert_validation_exceptions | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
▲ Show 20 Lines • Show All 539 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def snapshot_missing(self, snapshots):
    """Return the CQL runner's ``snapshot_missing`` result for ``snapshots``."""
    runner = self._cql_runner
    return runner.snapshot_missing(snapshots)
def snapshot_get(self, snapshot_id):
    """Return what ``snapshot_get_branches`` yields for ``snapshot_id``.

    Only ``snapshot_id`` is forwarded, so any other parameter of
    ``snapshot_get_branches`` keeps its default value.
    """
    snapshot = self.snapshot_get_branches(snapshot_id)
    return snapshot
def snapshot_get_by_origin_visit(self, origin, visit):
    """Return the snapshot referenced by visit ``visit`` of ``origin``.

    The visit is looked up with ``origin_visit_get_by``; its ``"snapshot"``
    value is then passed to ``snapshot_get``.

    Returns:
        The result of ``snapshot_get``, or None when the visit cannot be
        found.
    """
    try:
        visit_dict = self.origin_visit_get_by(origin, visit)
    except IndexError:
        # Kept from the previous implementation in case the lookup still
        # signals a missing row by raising.
        return None
    if visit_dict is None:
        # origin_visit_get_by reports an unknown visit with None; indexing
        # it would raise TypeError instead of returning None to the caller.
        return None
    return self.snapshot_get(visit_dict["snapshot"])
def snapshot_get_latest(self, origin, allowed_statuses=None): | def snapshot_get_latest(self, origin, allowed_statuses=None): | ||||
visit = self.origin_visit_get_latest( | visit = self.origin_visit_get_latest( | ||||
origin, allowed_statuses=allowed_statuses, require_snapshot=True | origin, allowed_statuses=allowed_statuses, require_snapshot=True | ||||
) | ) | ||||
if visit: | if visit: | ||||
assert visit["snapshot"] | assert visit["snapshot"] | ||||
▲ Show 20 Lines • Show All 209 Lines • ▼ Show 20 Lines | ) -> OriginVisit: | ||||
date = dateutil.parser.parse(date) | date = dateutil.parser.parse(date) | ||||
elif not isinstance(date, datetime.datetime): | elif not isinstance(date, datetime.datetime): | ||||
raise StorageArgumentException("Date must be a datetime or a string") | raise StorageArgumentException("Date must be a datetime or a string") | ||||
if not self.origin_get_one({"url": origin_url}): | if not self.origin_get_one({"url": origin_url}): | ||||
raise StorageArgumentException("Unknown origin %s", origin_url) | raise StorageArgumentException("Unknown origin %s", origin_url) | ||||
visit_id = self._cql_runner.origin_generate_unique_visit_id(origin_url) | visit_id = self._cql_runner.origin_generate_unique_visit_id(origin_url) | ||||
visit_state = "ongoing" | |||||
with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
visit = OriginVisit.from_dict( | visit = OriginVisit.from_dict( | ||||
{ | { | ||||
"origin": origin_url, | "origin": origin_url, | ||||
"date": date, | "date": date, | ||||
"type": type, | "type": type, | ||||
"status": "ongoing", | "status": visit_state, | ||||
"snapshot": None, | "snapshot": None, | ||||
"metadata": None, | "metadata": None, | ||||
"visit": visit_id, | "visit": visit_id, | ||||
} | } | ||||
) | ) | ||||
self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
self._cql_runner.origin_visit_add_one(visit) | self._cql_runner.origin_visit_add_one(visit) | ||||
with convert_validation_exceptions(): | |||||
visit_status = OriginVisitStatus( | |||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date, | |||||
status=visit_state, | |||||
snapshot=None, | |||||
metadata=None, | |||||
) | |||||
self._origin_visit_status_add(visit_status) | |||||
return visit | return visit | ||||
def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None:
    """Hand ``visit_status`` to the CQL runner's ``origin_visit_status_add_one``."""
    runner = self._cql_runner
    runner.origin_visit_status_add_one(visit_status)
def origin_visit_update(
    self,
    origin: str,
    visit_id: int,
    status: str,
    metadata: Optional[Dict] = None,
    snapshot: Optional[bytes] = None,
    date: Optional[datetime.datetime] = None,
):
    """Record a new status for an existing origin visit.

    The current visit is loaded with ``origin_visit_get_by``, evolved with
    the new ``status`` (and with ``metadata`` / ``snapshot`` when they are
    truthy and differ from the stored values), sent to the journal writer,
    and finally a new ``OriginVisitStatus`` is stored through
    ``_origin_visit_status_add``.

    Args:
        origin: origin URL of the visit.
        visit_id: identifier of the visit to update.
        status: new status of the visit.
        metadata: new metadata; when falsy, the latest known metadata is
            reused for the stored status.
        snapshot: new snapshot id; when falsy, the latest known snapshot is
            reused for the stored status.
        date: date of the status; ``now()`` is used when falsy.

    Raises:
        StorageArgumentException: if the visit does not exist.
    """
    origin_url = origin  # TODO: rename the argument

    # Fetch what is currently known about the visit.
    existing = self.origin_visit_get_by(origin_url, visit_id)
    if not existing:
        raise StorageArgumentException("This origin visit does not exist.")

    with convert_validation_exceptions():
        visit = OriginVisit.from_dict(existing)

    # Status is always overwritten; the optional fields only when they are
    # provided and actually change something.
    updates: Dict[str, Any] = {"status": status}
    for field_name, new_value in (("metadata", metadata), ("snapshot", snapshot)):
        if new_value and new_value != getattr(visit, field_name):
            updates[field_name] = new_value

    with convert_validation_exceptions():
        visit = attr.evolve(visit, **updates)

    self.journal_writer.origin_visit_update(visit)

    last_visit_update = self._origin_visit_get_updated(visit.origin, visit.visit)
    assert last_visit_update is not None

    with convert_validation_exceptions():
        visit_status = OriginVisitStatus(
            origin=origin_url,
            visit=visit_id,
            date=date if date else now(),
            status=status,
            snapshot=snapshot if snapshot else last_visit_update["snapshot"],
            metadata=metadata if metadata else last_visit_update["metadata"],
        )
    self._origin_visit_status_add(visit_status)
def _origin_visit_merge(
    self, visit: Dict[str, Any], visit_status: Dict[str, Any]
) -> Dict[str, Any]:
    """Combine an origin visit with one of its statuses.

    Values from ``visit_status`` take precedence over those of ``visit``,
    except for ``"origin"`` and ``"date"``, which always come from ``visit``.
    The merged mapping is round-tripped through ``OriginVisit`` before being
    returned as a dict.
    """
    # Start from the visit and let the status override it.
    merged = {**visit, **visit_status}
    # visit['origin'] is the URL (via a join), while
    # visit_status['origin'] is only an id.
    merged["origin"] = visit["origin"]
    # Keep the date of the creation of the origin visit.
    merged["date"] = visit["date"]
    return OriginVisit.from_dict(merged).to_dict()
def _origin_visit_apply_last_status(self, visit: Dict[str, Any]) -> Dict[str, Any]: | |||||
"""Retrieve the latest visit status information for the origin visit. | |||||
Then merge it with the visit and return it. | |||||
""" | |||||
visit_status = self._cql_runner.origin_visit_status_get_latest( | |||||
visit["origin"], visit["visit"] | |||||
) | |||||
assert visit_status is not None | |||||
return self._origin_visit_merge(visit, visit_status) | |||||
def _origin_visit_get_updated(self, origin: str, visit_id: int) -> Dict[str, Any]: | |||||
"""Retrieve origin visit and latest origin visit status and merge them | |||||
into an origin visit. | |||||
""" | |||||
row_visit = self._cql_runner.origin_visit_get_one(origin, visit_id) | |||||
assert row_visit is not None | |||||
visit = self._format_origin_visit_row(row_visit) | |||||
return self._origin_visit_apply_last_status(visit) | |||||
vlorentz: I think this should be an assert, and the function not return Optional | |||||
def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
    """Upsert ``visits`` and record a visit status for each of them.

    All visits are validated first, then sent to the journal writer, and
    finally each one is upserted through the CQL runner together with an
    ``OriginVisitStatus`` dated ``now()``.

    Raises:
        StorageArgumentException: if any visit has no visit id.
    """
    # ``visits`` is traversed three times below; materialize it so that a
    # one-shot iterator (e.g. a generator) is not exhausted by the
    # validation pass, which would silently store nothing.
    visits = list(visits)

    for visit in visits:
        if visit.visit is None:
            raise StorageArgumentException(f"Missing visit id for visit {visit}")

    self.journal_writer.origin_visit_upsert(visits)

    for visit in visits:
        assert visit.visit is not None
        self._cql_runner.origin_visit_upsert(visit)
        with convert_validation_exceptions():
            visit_status = OriginVisitStatus(
                origin=visit.origin,
                visit=visit.visit,
                date=now(),
                status=visit.status,
                snapshot=visit.snapshot,
                metadata=visit.metadata,
            )
        self._origin_visit_status_add(visit_status)
@staticmethod | @staticmethod | ||||
def _format_origin_visit_row(visit): | def _format_origin_visit_row(visit): | ||||
return { | return { | ||||
**visit._asdict(), | **visit._asdict(), | ||||
"origin": visit.origin, | "origin": visit.origin, | ||||
"date": visit.date.replace(tzinfo=datetime.timezone.utc), | "date": visit.date.replace(tzinfo=datetime.timezone.utc), | ||||
"metadata": (json.loads(visit.metadata) if visit.metadata else None), | "metadata": (json.loads(visit.metadata) if visit.metadata else None), | ||||
} | } | ||||
def origin_visit_get(
    self, origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None
) -> Iterable[Dict[str, Any]]:
    """Yield the visits of ``origin``, each merged with its latest status.

    ``origin``, ``last_visit`` and ``limit`` are passed unchanged to the CQL
    runner's ``origin_visit_get``.
    """
    rows = self._cql_runner.origin_visit_get(origin, last_visit, limit)
    yield from (
        self._origin_visit_apply_last_status(self._format_origin_visit_row(row))
        for row in rows
    )
def origin_visit_find_by_date(
    self, origin: str, visit_date: datetime.datetime
) -> Optional[Dict[str, Any]]:
    """Return the visit of ``origin`` whose date is closest to ``visit_date``.

    When several visits are equally close, the one with the highest visit
    id wins. The selected visit is merged with its latest status. Returns
    None when the origin has no visit.
    """
    # All the visits of the origin are loaded; this should be ok for now,
    # as there aren't too many visits per origin.
    candidates = list(self._cql_runner.origin_visit_get_all(origin))
    if not candidates:
        return None

    def distance(row):
        delta = row.date.replace(tzinfo=datetime.timezone.utc) - visit_date
        return (abs(delta), -row.visit)

    closest = min(candidates, key=distance)
    return self._origin_visit_apply_last_status(
        self._format_origin_visit_row(closest)
    )
def origin_visit_get_by(self, origin: str, visit: int) -> Optional[Dict[str, Any]]:
    """Return visit ``visit`` of ``origin`` merged with its latest status.

    Returns None when the CQL runner yields no (or a falsy) row.
    """
    row = self._cql_runner.origin_visit_get_one(origin, visit)
    if not row:
        return None
    return self._origin_visit_apply_last_status(self._format_origin_visit_row(row))
def origin_visit_get_latest(
    self,
    origin: str,
    allowed_statuses: Optional[List[str]] = None,
    require_snapshot: bool = False,
) -> Optional[Dict[str, Any]]:
    """Return the latest visit of ``origin`` matching the given filters.

    Every visit is merged with its latest status before being filtered.
    Among the remaining visits, the one with the greatest ``(date, visit)``
    pair is returned.

    Args:
        origin: origin URL.
        allowed_statuses: when non-empty, only visits whose status is in
            this list are considered.
        require_snapshot: when true, visits whose snapshot is None are
            ignored.

    Returns:
        The selected visit as a dict, or None when no visit matches.
    """

    def rank(candidate):
        # Order by date first and use the visit id as a tie-breaker.
        # Checking the two fields independently would reject a visit that
        # is more recent but has a smaller id, making the result depend on
        # the order in which rows are returned.
        return (candidate["date"], candidate["visit"])

    # TODO: Do not fetch all visits
    latest_visit = None
    for row in self._cql_runner.origin_visit_get_all(origin):
        updated_visit = self._origin_visit_apply_last_status(
            self._format_origin_visit_row(row)
        )
        if allowed_statuses and updated_visit["status"] not in allowed_statuses:
            continue
        if require_snapshot and updated_visit["snapshot"] is None:
            continue
        # updated_visit is a candidate
        if latest_visit is None or rank(updated_visit) >= rank(latest_visit):
            latest_visit = updated_visit
    return latest_visit
def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]:
    """Return a recent "full" visit found from a random starting token.

    Visits are iterated from a random token; the first one that, once
    merged with its latest status, has a date within the last 12 weeks and
    status ``"full"`` is returned. Returns None when the iteration ends
    without a match. Note that ``type`` is not used by this implementation.
    """
    cutoff = now() - datetime.timedelta(weeks=12)  # 3 months back

    # Random position to start iteration at
    start_token = random.randint(TOKEN_BEGIN, TOKEN_END)

    # Iterator over all visits, ordered by token(origins) then visit_id
    for row in self._cql_runner.origin_visit_iter(start_token):
        candidate = self._origin_visit_apply_last_status(
            self._format_origin_visit_row(row)
        )
        if not candidate["date"] > cutoff:
            continue
        if candidate["status"] == "full":
            return candidate
    return None
def tool_add(self, tools): | def tool_add(self, tools): | ||||
inserted = [] | inserted = [] | ||||
for tool in tools: | for tool in tools: | ||||
tool = tool.copy() | tool = tool.copy() | ||||
tool_json = tool.copy() | tool_json = tool.copy() | ||||
▲ Show 20 Lines • Show All 75 Lines • Show Last 20 Lines |
I think this should be an assert, and the function not return Optional