Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import re | import re | ||||
import bisect | import bisect | ||||
import dateutil | import dateutil | ||||
import collections | import collections | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import random | import random | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from datetime import timedelta | from datetime import timedelta | ||||
from typing import Any, Dict, Iterable, List, Optional, Union | from typing import Any, Dict, Iterable, List, Optional, Tuple, Union | ||||
import attr | import attr | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
Directory, | Directory, | ||||
Revision, | Revision, | ||||
Release, | Release, | ||||
Snapshot, | Snapshot, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitUpdate, | |||||
Origin, | Origin, | ||||
SHA1_SIZE, | SHA1_SIZE, | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.validate import convert_validation_exceptions | from swh.storage.validate import convert_validation_exceptions | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
Show All 21 Lines | def reset(self): | ||||
self._directories = {} | self._directories = {} | ||||
self._revisions = {} | self._revisions = {} | ||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_id = [] | self._origins_by_id = [] | ||||
self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
self._origin_visits = {} | self._origin_visits = {} | ||||
self._origin_visit_updates: Dict[Tuple[str, int], List[OriginVisitUpdate]] = {} | |||||
self._persons = [] | self._persons = [] | ||||
self._origin_metadata = defaultdict(list) | self._origin_metadata = defaultdict(list) | ||||
self._tools = {} | self._tools = {} | ||||
self._metadata_providers = {} | self._metadata_providers = {} | ||||
self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
# ideally we would want a skip list for both fast inserts and searches | # ideally we would want a skip list for both fast inserts and searches | ||||
self._sorted_sha1s = [] | self._sorted_sha1s = [] | ||||
▲ Show 20 Lines • Show All 419 Lines • ▼ Show 20 Lines | def snapshot_get_by_origin_visit(self, origin, visit): | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if not origin_url: | if not origin_url: | ||||
return | return | ||||
if origin_url not in self._origins or visit > len( | if origin_url not in self._origins or visit > len( | ||||
self._origin_visits[origin_url] | self._origin_visits[origin_url] | ||||
): | ): | ||||
return None | return None | ||||
snapshot_id = self._origin_visits[origin_url][visit - 1].snapshot | |||||
visit = self._origin_visit_get_updated(origin_url, visit) | |||||
snapshot_id = visit.snapshot | |||||
if snapshot_id: | if snapshot_id: | ||||
return self.snapshot_get(snapshot_id) | return self.snapshot_get(snapshot_id) | ||||
else: | else: | ||||
return None | return None | ||||
def snapshot_get_latest(self, origin, allowed_statuses=None): | def snapshot_get_latest(self, origin, allowed_statuses=None): | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if not origin_url: | if not origin_url: | ||||
▲ Show 20 Lines • Show All 150 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
): | ): | ||||
origins = map(self._convert_origin, self._origins.values()) | origins = map(self._convert_origin, self._origins.values()) | ||||
if regexp: | if regexp: | ||||
pat = re.compile(url_pattern) | pat = re.compile(url_pattern) | ||||
origins = [orig for orig in origins if pat.search(orig["url"])] | origins = [orig for orig in origins if pat.search(orig["url"])] | ||||
else: | else: | ||||
origins = [orig for orig in origins if url_pattern in orig["url"]] | origins = [orig for orig in origins if url_pattern in orig["url"]] | ||||
if with_visit: | if with_visit: | ||||
origins = [ | filtered_origins = [] | ||||
orig | for orig in origins: | ||||
for orig in origins | visits = ( | ||||
if len(self._origin_visits[orig["url"]]) > 0 | self._origin_visit_get_updated(ov.origin, ov.visit) | ||||
and set( | |||||
ov.snapshot | |||||
for ov in self._origin_visits[orig["url"]] | for ov in self._origin_visits[orig["url"]] | ||||
if ov.snapshot | |||||
) | ) | ||||
& set(self._snapshots) | for ov in visits: | ||||
] | if ov.snapshot and ov.snapshot in self._snapshots: | ||||
filtered_origins.append(orig) | |||||
break | |||||
else: | |||||
filtered_origins = origins | |||||
return origins[offset : offset + limit] | return filtered_origins[offset : offset + limit] | ||||
def origin_count(self, url_pattern, regexp=False, with_visit=False): | def origin_count(self, url_pattern, regexp=False, with_visit=False): | ||||
return len( | return len( | ||||
self.origin_search( | self.origin_search( | ||||
url_pattern, | url_pattern, | ||||
regexp=regexp, | regexp=regexp, | ||||
with_visit=with_visit, | with_visit=with_visit, | ||||
limit=len(self._origins), | limit=len(self._origins), | ||||
Show All 35 Lines | ) -> OriginVisit: | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException("Unknown origin %s", origin_url) | raise StorageArgumentException("Unknown origin %s", origin_url) | ||||
if origin_url in self._origins: | if origin_url in self._origins: | ||||
origin = self._origins[origin_url] | origin = self._origins[origin_url] | ||||
# visit ids are in the range [1, +inf[ | # visit ids are in the range [1, +inf[ | ||||
visit_id = len(self._origin_visits[origin_url]) + 1 | visit_id = len(self._origin_visits[origin_url]) + 1 | ||||
status = "ongoing" | status = "ongoing" | ||||
with convert_validation_exceptions(): | |||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin_url, | origin=origin_url, | ||||
date=date, | date=date, | ||||
type=type, | type=type, | ||||
# TODO: Remove when we remove those fields from the model | |||||
status=status, | status=status, | ||||
snapshot=None, | snapshot=None, | ||||
metadata=None, | metadata=None, | ||||
visit=visit_id, | visit=visit_id, | ||||
) | ) | ||||
self._origin_visits[origin_url].append(visit) | self._origin_visits[origin_url].append(visit) | ||||
visit = visit | assert visit.visit is not None | ||||
visit_key = (origin_url, visit.visit) | |||||
self._objects[(origin_url, visit.visit)].append(("origin_visit", None)) | with convert_validation_exceptions(): | ||||
visit_update = OriginVisitUpdate( | |||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date, | |||||
status=status, | |||||
snapshot=None, | |||||
metadata=None, | |||||
) | |||||
self._origin_visit_updates[visit_key] = [visit_update] | |||||
self._objects[visit_key].append(("origin_visit", None)) | |||||
self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
# return last visit | # return last visit | ||||
return visit | return visit | ||||
def origin_visit_update( | def origin_visit_update( | ||||
self, | self, | ||||
origin: str, | origin: str, | ||||
visit_id: int, | visit_id: int, | ||||
status: str, | status: str, | ||||
metadata: Optional[Dict] = None, | metadata: Optional[Dict] = None, | ||||
snapshot: Optional[bytes] = None, | snapshot: Optional[bytes] = None, | ||||
date: Optional[datetime.datetime] = None, | date: Optional[datetime.datetime] = None, | ||||
): | ): | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url is None: | if origin_url is None: | ||||
raise StorageArgumentException("Unknown origin.") | raise StorageArgumentException("Unknown origin.") | ||||
try: | try: | ||||
visit = self._origin_visits[origin_url][visit_id - 1] | visit = self._origin_visits[origin_url][visit_id - 1] | ||||
except IndexError: | except IndexError: | ||||
raise StorageArgumentException("Unknown visit_id for this origin") from None | raise StorageArgumentException("Unknown visit_id for this origin") from None | ||||
updates: Dict[str, Any] = {"status": status} | # Retrieve the previous visit update | ||||
if metadata: | assert visit.visit is not None | ||||
updates["metadata"] = metadata | visit_key = (origin_url, visit.visit) | ||||
if snapshot: | |||||
updates["snapshot"] = snapshot | last_visit_update = max( | ||||
self._origin_visit_updates[visit_key], key=lambda v: v.date | |||||
) | |||||
with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
visit = attr.evolve(visit, **updates) | visit_update = OriginVisitUpdate( | ||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date or now(), | |||||
status=status, | |||||
snapshot=snapshot or last_visit_update.snapshot, | |||||
metadata=metadata or last_visit_update.metadata, | |||||
) | |||||
self._origin_visit_updates[visit_key].append(visit_update) | |||||
self.journal_writer.origin_visit_update(visit) | self.journal_writer.origin_visit_update( | ||||
self._origin_visit_get_updated(origin_url, visit_id) | |||||
) | |||||
self._origin_visits[origin_url][visit_id - 1] = visit | self._origin_visits[origin_url][visit_id - 1] = visit | ||||
def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: | def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: | ||||
for visit in visits: | for visit in visits: | ||||
if visit.visit is None: | if visit.visit is None: | ||||
raise StorageArgumentException(f"Missing visit id for visit {visit}") | raise StorageArgumentException(f"Missing visit id for visit {visit}") | ||||
self.journal_writer.origin_visit_upsert(visits) | self.journal_writer.origin_visit_upsert(visits) | ||||
date = now() | |||||
for visit in visits: | for visit in visits: | ||||
visit_id = visit.visit | assert visit.visit is not None | ||||
origin_url = visit.origin | origin_url = visit.origin | ||||
origin = self.origin_get({"url": origin_url}) | |||||
with convert_validation_exceptions(): | if not origin: # Cannot add a visit without an origin | ||||
visit = attr.evolve(visit, origin=origin_url) | raise StorageArgumentException("Unknown origin %s", origin_url) | ||||
self._objects[(origin_url, visit_id)].append(("origin_visit", None)) | if origin_url in self._origins: | ||||
origin = self._origins[origin_url] | |||||
# visit ids are in the range [1, +inf[ | |||||
assert visit.visit is not None | |||||
visit_key = (origin_url, visit.visit) | |||||
if visit_id: | with convert_validation_exceptions(): | ||||
while len(self._origin_visits[origin_url]) <= visit_id: | visit_update = OriginVisitUpdate( | ||||
origin=origin_url, | |||||
visit=visit.visit, | |||||
date=date, | |||||
status=visit.status, | |||||
snapshot=visit.snapshot, | |||||
metadata=visit.metadata, | |||||
) | |||||
self._origin_visit_updates.setdefault(visit_key, []) | |||||
while len(self._origin_visits[origin_url]) <= visit.visit: | |||||
self._origin_visits[origin_url].append(None) | self._origin_visits[origin_url].append(None) | ||||
self._origin_visits[origin_url][visit_id - 1] = visit | self._origin_visits[origin_url][visit.visit - 1] = visit | ||||
self._origin_visit_updates[visit_key].append(visit_update) | |||||
self._objects[visit_key].append(("origin_visit", None)) | |||||
def _origin_visit_get_updated( | |||||
self, origin: str, visit_id: int | |||||
) -> Optional[OriginVisit]: | |||||
"""Merge origin visit and latest origin visit update | |||||
def _convert_visit(self, visit): | """ | ||||
assert visit_id >= 1 | |||||
visit = self._origin_visits[origin][visit_id - 1] | |||||
if visit is None: | if visit is None: | ||||
return | return None | ||||
visit_key = (origin, visit_id) | |||||
visit = visit.to_dict() | visit_update = max(self._origin_visit_updates[visit_key], key=lambda v: v.date) | ||||
return visit | return OriginVisit.from_dict( | ||||
{ | |||||
# default to the values in visit | |||||
**visit.to_dict(), | |||||
# override with the last update | |||||
**visit_update.to_dict(), | |||||
# but keep the date of the creation of the origin visit | |||||
"date": visit.date, | |||||
} | |||||
) | |||||
def origin_visit_get( | def origin_visit_get( | ||||
self, origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None | self, origin: str, last_visit: Optional[int] = None, limit: Optional[int] = None | ||||
) -> Iterable[Dict[str, Any]]: | ) -> Iterable[Dict[str, Any]]: | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url in self._origin_visits: | if origin_url in self._origin_visits: | ||||
visits = self._origin_visits[origin_url] | visits = self._origin_visits[origin_url] | ||||
if last_visit is not None: | if last_visit is not None: | ||||
visits = visits[last_visit:] | visits = visits[last_visit:] | ||||
if limit is not None: | if limit is not None: | ||||
visits = visits[:limit] | visits = visits[:limit] | ||||
for visit in visits: | for visit in visits: | ||||
if not visit: | if not visit: | ||||
continue | continue | ||||
visit_id = visit.visit | visit_id = visit.visit | ||||
yield self._convert_visit(self._origin_visits[origin_url][visit_id - 1]) | visit_update = self._origin_visit_get_updated(origin_url, visit_id) | ||||
assert visit_update is not None | |||||
yield visit_update.to_dict() | |||||
def origin_visit_find_by_date( | def origin_visit_find_by_date( | ||||
self, origin: str, visit_date: datetime.datetime | self, origin: str, visit_date: datetime.datetime | ||||
) -> Optional[Dict[str, Any]]: | ) -> Optional[Dict[str, Any]]: | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url in self._origin_visits: | if origin_url in self._origin_visits: | ||||
visits = self._origin_visits[origin_url] | visits = self._origin_visits[origin_url] | ||||
visit = min(visits, key=lambda v: (abs(v.date - visit_date), -v.visit)) | visit = min(visits, key=lambda v: (abs(v.date - visit_date), -v.visit)) | ||||
return self._convert_visit(visit) | visit_update = self._origin_visit_get_updated(origin, visit.visit) | ||||
assert visit_update is not None | |||||
return visit_update.to_dict() | |||||
return None | return None | ||||
def origin_visit_get_by(self, origin: str, visit: int) -> Optional[Dict[str, Any]]: | def origin_visit_get_by(self, origin: str, visit: int) -> Optional[Dict[str, Any]]: | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url in self._origin_visits and visit <= len( | if origin_url in self._origin_visits and visit <= len( | ||||
self._origin_visits[origin_url] | self._origin_visits[origin_url] | ||||
): | ): | ||||
return self._convert_visit(self._origin_visits[origin_url][visit - 1]) | visit_update = self._origin_visit_get_updated(origin_url, visit) | ||||
assert visit_update is not None | |||||
return visit_update.to_dict() | |||||
return None | return None | ||||
def origin_visit_get_latest( | def origin_visit_get_latest( | ||||
self, | self, | ||||
origin: str, | origin: str, | ||||
allowed_statuses: Optional[List[str]] = None, | allowed_statuses: Optional[List[str]] = None, | ||||
require_snapshot: bool = False, | require_snapshot: bool = False, | ||||
) -> Optional[Dict[str, Any]]: | ) -> Optional[Dict[str, Any]]: | ||||
ori = self._origins.get(origin) | ori = self._origins.get(origin) | ||||
if not ori: | if not ori: | ||||
return None | return None | ||||
visits = self._origin_visits[ori.url] | visits = self._origin_visits[ori.url] | ||||
visits = [ | |||||
self._origin_visit_get_updated(visit.origin, visit.visit) | |||||
for visit in visits | |||||
if visit is not None | |||||
] | |||||
if allowed_statuses is not None: | if allowed_statuses is not None: | ||||
visits = [visit for visit in visits if visit.status in allowed_statuses] | visits = [visit for visit in visits if visit.status in allowed_statuses] | ||||
if require_snapshot: | if require_snapshot: | ||||
visits = [visit for visit in visits if visit.snapshot] | visits = [visit for visit in visits if visit.snapshot] | ||||
visit = max(visits, key=lambda v: (v.date, v.visit), default=None) | visit = max(visits, key=lambda v: (v.date, v.visit), default=None) | ||||
return self._convert_visit(visit) | if visit is None: | ||||
return None | |||||
return visit.to_dict() | |||||
def _select_random_origin_visit_by_type(self, type: str) -> str: | def _select_random_origin_visit_by_type(self, type: str) -> str: | ||||
while True: | while True: | ||||
url = random.choice(list(self._origin_visits.keys())) | url = random.choice(list(self._origin_visits.keys())) | ||||
random_origin_visits = self._origin_visits[url] | random_origin_visits = self._origin_visits[url] | ||||
if random_origin_visits[0].type == type: | if random_origin_visits[0].type == type: | ||||
return url | return url | ||||
def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | ||||
url = self._select_random_origin_visit_by_type(type) | url = self._select_random_origin_visit_by_type(type) | ||||
random_origin_visits = copy.deepcopy(self._origin_visits[url]) | random_origin_visits = copy.deepcopy(self._origin_visits[url]) | ||||
random_origin_visits.reverse() | random_origin_visits.reverse() | ||||
back_in_the_day = now() - timedelta(weeks=12) # 3 months back | back_in_the_day = now() - timedelta(weeks=12) # 3 months back | ||||
# This should be enough for tests | # This should be enough for tests | ||||
for visit in random_origin_visits: | for visit in random_origin_visits: | ||||
if visit.date > back_in_the_day and visit.status == "full": | updated_visit = self._origin_visit_get_updated(url, visit.visit) | ||||
return visit.to_dict() | assert updated_visit is not None | ||||
if updated_visit.date > back_in_the_day and updated_visit.status == "full": | |||||
return updated_visit.to_dict() | |||||
else: | else: | ||||
return None | return None | ||||
def stat_counters(self): | def stat_counters(self): | ||||
keys = ( | keys = ( | ||||
"content", | "content", | ||||
"directory", | "directory", | ||||
"origin", | "origin", | ||||
▲ Show 20 Lines • Show All 135 Lines • Show Last 20 Lines |