Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show First 20 Lines • Show All 892 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
) -> Optional[OriginVisit]: | ) -> Optional[OriginVisit]: | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url in self._origin_visits: | if origin_url in self._origin_visits: | ||||
visits = self._origin_visits[origin_url] | visits = self._origin_visits[origin_url] | ||||
visit = min(visits, key=lambda v: (abs(v.date - visit_date), -v.visit)) | visit = min(visits, key=lambda v: (abs(v.date - visit_date), -v.visit)) | ||||
return visit | return visit | ||||
return None | return None | ||||
def origin_visit_get_by(self, origin: str, visit: int) -> Optional[Dict[str, Any]]: | def origin_visit_get_by(self, origin: str, visit: int) -> Optional[OriginVisit]: | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url in self._origin_visits and visit <= len( | if origin_url in self._origin_visits and visit <= len( | ||||
self._origin_visits[origin_url] | self._origin_visits[origin_url] | ||||
): | ): | ||||
visit_update = self._origin_visit_get_updated(origin_url, visit) | found_visit, _ = self._origin_visit_status_get_latest(origin, visit) | ||||
assert visit_update is not None | return found_visit | ||||
return visit_update | |||||
return None | return None | ||||
def origin_visit_get_latest( | def origin_visit_get_latest( | ||||
self, | self, | ||||
origin: str, | origin: str, | ||||
type: Optional[str] = None, | type: Optional[str] = None, | ||||
allowed_statuses: Optional[List[str]] = None, | allowed_statuses: Optional[List[str]] = None, | ||||
require_snapshot: bool = False, | require_snapshot: bool = False, | ||||
▲ Show 20 Lines • Show All 297 Lines • Show Last 20 Lines |