Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 821 Lines • ▼ Show 20 Lines | def origin_visit_add(self, origin_url: str, | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
self._cql_runner.origin_visit_add_one(visit) | self._cql_runner.origin_visit_add_one(visit) | ||||
return visit | return visit | ||||
def origin_visit_update( | def origin_visit_update( | ||||
self, origin: str, visit_id: int, status: str, | self, origin: str, visit_id: int, status: str, | ||||
metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None): | metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None, | ||||
date: Optional[datetime.datetime] = None): | |||||
origin_url = origin # TODO: rename the argument | origin_url = origin # TODO: rename the argument | ||||
# Get the existing data of the visit | # Get the existing data of the visit | ||||
row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | row = self._cql_runner.origin_visit_get_one(origin_url, visit_id) | ||||
if not row: | if not row: | ||||
raise StorageArgumentException('This origin visit does not exist.') | raise StorageArgumentException('This origin visit does not exist.') | ||||
try: | try: | ||||
visit = OriginVisit.from_dict(self._format_origin_visit_row(row)) | visit = OriginVisit.from_dict(self._format_origin_visit_row(row)) | ||||
Show All 27 Lines | def _format_origin_visit_row(visit): | ||||
return { | return { | ||||
**visit._asdict(), | **visit._asdict(), | ||||
'origin': visit.origin, | 'origin': visit.origin, | ||||
'date': visit.date.replace(tzinfo=datetime.timezone.utc), | 'date': visit.date.replace(tzinfo=datetime.timezone.utc), | ||||
'metadata': (json.loads(visit.metadata) | 'metadata': (json.loads(visit.metadata) | ||||
if visit.metadata else None), | if visit.metadata else None), | ||||
} | } | ||||
def origin_visit_get(self, origin, last_visit=None, limit=None): | def origin_visit_get(self, origin: str, last_visit: Optional[int] = None, | ||||
limit: Optional[int] = None): | |||||
rows = self._cql_runner.origin_visit_get(origin, last_visit, limit) | rows = self._cql_runner.origin_visit_get(origin, last_visit, limit) | ||||
yield from map(self._format_origin_visit_row, rows) | yield from map(self._format_origin_visit_row, rows) | ||||
def origin_visit_find_by_date(self, origin, visit_date): | def origin_visit_find_by_date( | ||||
self, origin: str, | |||||
visit_date: datetime.datetime) -> Optional[Dict[str, Any]]: | |||||
# Iterator over all the visits of the origin | # Iterator over all the visits of the origin | ||||
# This should be ok for now, as there aren't too many visits | # This should be ok for now, as there aren't too many visits | ||||
# per origin. | # per origin. | ||||
visits = list(self._cql_runner.origin_visit_get_all(origin)) | visits = list(self._cql_runner.origin_visit_get_all(origin)) | ||||
def key(visit): | def key(visit): | ||||
dt = visit.date.replace(tzinfo=datetime.timezone.utc) - visit_date | dt = visit.date.replace(tzinfo=datetime.timezone.utc) - visit_date | ||||
return (abs(dt), -visit.visit) | return (abs(dt), -visit.visit) | ||||
if visits: | if visits: | ||||
visit = min(visits, key=key) | visit = min(visits, key=key) | ||||
return visit._asdict() | return visit._asdict() | ||||
return None | |||||
def origin_visit_get_by(self, origin, visit): | def origin_visit_get_by( | ||||
self, origin: str, visit: int) -> Optional[Dict[str, Any]]: | |||||
visit = self._cql_runner.origin_visit_get_one(origin, visit) | visit = self._cql_runner.origin_visit_get_one(origin, visit) | ||||
if visit: | if visit: | ||||
return self._format_origin_visit_row(visit) | return self._format_origin_visit_row(visit) | ||||
return None | |||||
def origin_visit_get_latest( | def origin_visit_get_latest( | ||||
self, origin, allowed_statuses=None, require_snapshot=False): | self, origin: str, allowed_statuses: Optional[List[str]] = None, | ||||
require_snapshot: bool = False) -> Optional[Dict[str, Any]]: | |||||
visit = self._cql_runner.origin_visit_get_latest( | visit = self._cql_runner.origin_visit_get_latest( | ||||
origin, | origin, | ||||
allowed_statuses=allowed_statuses, | allowed_statuses=allowed_statuses, | ||||
require_snapshot=require_snapshot) | require_snapshot=require_snapshot) | ||||
if visit: | if visit: | ||||
return self._format_origin_visit_row(visit) | return self._format_origin_visit_row(visit) | ||||
return None | |||||
def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | ||||
back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back | back_in_the_day = now() - datetime.timedelta(weeks=12) # 3 months back | ||||
# Random position to start iteration at | # Random position to start iteration at | ||||
start_token = random.randint(TOKEN_BEGIN, TOKEN_END) | start_token = random.randint(TOKEN_BEGIN, TOKEN_END) | ||||
# Iterator over all visits, ordered by token(origins) then visit_id | # Iterator over all visits, ordered by token(origins) then visit_id | ||||
▲ Show 20 Lines • Show All 69 Lines • Show Last 20 Lines |