Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show First 20 Lines • Show All 936 Lines • ▼ Show 20 Lines | ) -> Optional[Dict[str, Any]]: | ||||
if require_snapshot: | if require_snapshot: | ||||
visits = [visit for visit in visits if visit.snapshot] | visits = [visit for visit in visits if visit.snapshot] | ||||
visit = max(visits, key=lambda v: (v.date, v.visit), default=None) | visit = max(visits, key=lambda v: (v.date, v.visit), default=None) | ||||
if visit is None: | if visit is None: | ||||
return None | return None | ||||
return visit.to_dict() | return visit.to_dict() | ||||
def origin_visit_status_get_latest( | |||||
self, | |||||
origin_url: str, | |||||
visit: int, | |||||
allowed_statuses: Optional[List[str]] = None, | |||||
require_snapshot: bool = False, | |||||
) -> Optional[OriginVisitStatus]: | |||||
ori = self._origins.get(origin_url) | |||||
if not ori: | |||||
return None | |||||
visit_key = (origin_url, visit) | |||||
visits = self._origin_visit_statuses.get(visit_key) | |||||
if not visits: | |||||
return None | |||||
if allowed_statuses is not None: | |||||
visits = [visit for visit in visits if visit.status in allowed_statuses] | |||||
if require_snapshot: | |||||
visits = [visit for visit in visits if visit.snapshot] | |||||
visit_status = max(visits, key=lambda v: (v.date, v.visit), default=None) | |||||
return visit_status | |||||
def _select_random_origin_visit_by_type(self, type: str) -> str: | def _select_random_origin_visit_by_type(self, type: str) -> str: | ||||
while True: | while True: | ||||
url = random.choice(list(self._origin_visits.keys())) | url = random.choice(list(self._origin_visits.keys())) | ||||
random_origin_visits = self._origin_visits[url] | random_origin_visits = self._origin_visits[url] | ||||
if random_origin_visits[0].type == type: | if random_origin_visits[0].type == type: | ||||
return url | return url | ||||
def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | ||||
▲ Show 20 Lines • Show All 232 Lines • Show Last 20 Lines |