Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
| # Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
| # See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
| # License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
| # See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
| import re | import re | ||||
| import bisect | import bisect | ||||
| import dateutil | import dateutil | ||||
| import collections | import collections | ||||
| import copy | import copy | ||||
| import datetime | import datetime | ||||
| import itertools | import itertools | ||||
| import random | import random | ||||
| from collections import defaultdict | from collections import defaultdict | ||||
| from datetime import timedelta | from datetime import timedelta | ||||
| from typing import Any, Dict, Iterable, List, Optional, Union | from typing import Any, Dict, Iterable, List, Optional, Tuple, Union | ||||
| import attr | import attr | ||||
| from swh.model.model import ( | from swh.model.model import ( | ||||
| BaseContent, Content, SkippedContent, Directory, Revision, | BaseContent, Content, SkippedContent, Directory, Revision, | ||||
| Release, Snapshot, OriginVisit, Origin, SHA1_SIZE | Release, Snapshot, OriginVisit, OriginVisitUpdate, Origin, SHA1_SIZE | ||||
| ) | ) | ||||
| from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
| from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
| from swh.storage.validate import convert_validation_exceptions | from swh.storage.validate import convert_validation_exceptions | ||||
| from swh.storage.utils import now | from swh.storage.utils import now | ||||
| from .exc import StorageArgumentException, HashCollision | from .exc import StorageArgumentException, HashCollision | ||||
| Show All 19 Lines | def reset(self): | ||||
| self._directories = {} | self._directories = {} | ||||
| self._revisions = {} | self._revisions = {} | ||||
| self._releases = {} | self._releases = {} | ||||
| self._snapshots = {} | self._snapshots = {} | ||||
| self._origins = {} | self._origins = {} | ||||
| self._origins_by_id = [] | self._origins_by_id = [] | ||||
| self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
| self._origin_visits = {} | self._origin_visits = {} | ||||
| self._origin_visit_updates: Dict[ | |||||
| Tuple[str, int], List[OriginVisitUpdate]] = {} | |||||
| self._persons = [] | self._persons = [] | ||||
| self._origin_metadata = defaultdict(list) | self._origin_metadata = defaultdict(list) | ||||
| self._tools = {} | self._tools = {} | ||||
| self._metadata_providers = {} | self._metadata_providers = {} | ||||
| self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
| # ideally we would want a skip list for both fast inserts and searches | # ideally we would want a skip list for both fast inserts and searches | ||||
| self._sorted_sha1s = [] | self._sorted_sha1s = [] | ||||
| ▲ Show 20 Lines • Show All 424 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
| def snapshot_get_by_origin_visit(self, origin, visit): | def snapshot_get_by_origin_visit(self, origin, visit): | ||||
| origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
| if not origin_url: | if not origin_url: | ||||
| return | return | ||||
| if origin_url not in self._origins or \ | if origin_url not in self._origins or \ | ||||
| visit > len(self._origin_visits[origin_url]): | visit > len(self._origin_visits[origin_url]): | ||||
| return None | return None | ||||
| snapshot_id = self._origin_visits[origin_url][visit-1].snapshot | |||||
| visit = self._origin_visit_get_updated(origin_url, visit) | |||||
| snapshot_id = visit.snapshot | |||||
| if snapshot_id: | if snapshot_id: | ||||
| return self.snapshot_get(snapshot_id) | return self.snapshot_get(snapshot_id) | ||||
| else: | else: | ||||
| return None | return None | ||||
| def snapshot_get_latest(self, origin, allowed_statuses=None): | def snapshot_get_latest(self, origin, allowed_statuses=None): | ||||
| origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
| if not origin_url: | if not origin_url: | ||||
| ▲ Show 20 Lines • Show All 148 Lines • ▼ Show 20 Lines | def origin_search(self, url_pattern, offset=0, limit=50, | ||||
| regexp=False, with_visit=False): | regexp=False, with_visit=False): | ||||
| origins = map(self._convert_origin, self._origins.values()) | origins = map(self._convert_origin, self._origins.values()) | ||||
| if regexp: | if regexp: | ||||
| pat = re.compile(url_pattern) | pat = re.compile(url_pattern) | ||||
| origins = [orig for orig in origins if pat.search(orig['url'])] | origins = [orig for orig in origins if pat.search(orig['url'])] | ||||
| else: | else: | ||||
| origins = [orig for orig in origins if url_pattern in orig['url']] | origins = [orig for orig in origins if url_pattern in orig['url']] | ||||
| if with_visit: | if with_visit: | ||||
| origins = [ | filtered_origins = [] | ||||
| orig for orig in origins | for orig in origins: | ||||
| if len(self._origin_visits[orig['url']]) > 0 and | visits = (self._origin_visit_get_updated(ov.origin, ov.visit) | ||||
| set(ov.snapshot | for ov in self._origin_visits[orig['url']]) | ||||
| for ov in self._origin_visits[orig['url']] | for ov in visits: | ||||
| if ov.snapshot) & | if ov.snapshot and ov.snapshot in self._snapshots: | ||||
| set(self._snapshots)] | filtered_origins.append(orig) | ||||
| break | |||||
| else: | |||||
| filtered_origins = origins | |||||
| return origins[offset:offset+limit] | return filtered_origins[offset:offset+limit] | ||||
| def origin_count(self, url_pattern, regexp=False, with_visit=False): | def origin_count(self, url_pattern, regexp=False, with_visit=False): | ||||
| return len(self.origin_search(url_pattern, regexp=regexp, | return len(self.origin_search(url_pattern, regexp=regexp, | ||||
| with_visit=with_visit, | with_visit=with_visit, | ||||
| limit=len(self._origins))) | limit=len(self._origins))) | ||||
| def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | ||||
| origins = copy.deepcopy(list(origins)) | origins = copy.deepcopy(list(origins)) | ||||
| Show All 32 Lines | def origin_visit_add(self, origin_url: str, | ||||
| raise StorageArgumentException( | raise StorageArgumentException( | ||||
| 'Unknown origin %s', origin_url) | 'Unknown origin %s', origin_url) | ||||
| if origin_url in self._origins: | if origin_url in self._origins: | ||||
| origin = self._origins[origin_url] | origin = self._origins[origin_url] | ||||
| # visit ids are in the range [1, +inf[ | # visit ids are in the range [1, +inf[ | ||||
| visit_id = len(self._origin_visits[origin_url]) + 1 | visit_id = len(self._origin_visits[origin_url]) + 1 | ||||
| status = 'ongoing' | status = 'ongoing' | ||||
| with convert_validation_exceptions(): | |||||
| visit = OriginVisit( | visit = OriginVisit( | ||||
| origin=origin_url, | origin=origin_url, | ||||
| date=date, | date=date, | ||||
| type=type, | type=type, | ||||
| # TODO: Remove when we remove those fields from the model | |||||
| status=status, | status=status, | ||||
| snapshot=None, | snapshot=None, | ||||
| metadata=None, | metadata=None, | ||||
| visit=visit_id, | visit=visit_id, | ||||
| ) | ) | ||||
| self._origin_visits[origin_url].append(visit) | self._origin_visits[origin_url].append(visit) | ||||
| visit = visit | assert visit.visit is not None | ||||
| visit_key = (origin_url, visit.visit) | |||||
| self._objects[(origin_url, visit.visit)].append( | with convert_validation_exceptions(): | ||||
| visit_update = OriginVisitUpdate( | |||||
| origin=origin_url, | |||||
| visit=visit_id, | |||||
| date=date, | |||||
| status=status, | |||||
| snapshot=None, | |||||
| metadata=None, | |||||
| ) | |||||
| self._origin_visit_updates[visit_key] = [visit_update] | |||||
| self._objects[visit_key].append( | |||||
| ('origin_visit', None)) | ('origin_visit', None)) | ||||
| self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
| # return last visit | # return last visit | ||||
| return visit | return visit | ||||
| def origin_visit_update( | def origin_visit_update( | ||||
| self, origin: str, visit_id: int, status: str, | self, origin: str, visit_id: int, status: str, | ||||
| metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None, | metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None, | ||||
| date: Optional[datetime.datetime] = None): | date: Optional[datetime.datetime] = None): | ||||
| origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
| if origin_url is None: | if origin_url is None: | ||||
| raise StorageArgumentException('Unknown origin.') | raise StorageArgumentException('Unknown origin.') | ||||
| try: | try: | ||||
| visit = self._origin_visits[origin_url][visit_id-1] | visit = self._origin_visits[origin_url][visit_id-1] | ||||
| except IndexError: | except IndexError: | ||||
| raise StorageArgumentException( | raise StorageArgumentException( | ||||
| 'Unknown visit_id for this origin') from None | 'Unknown visit_id for this origin') from None | ||||
| updates: Dict[str, Any] = { | # Retrieve the previous visit update | ||||
| 'status': status | assert visit.visit is not None | ||||
| } | visit_key = (origin_url, visit.visit) | ||||
| if metadata: | |||||
| updates['metadata'] = metadata | last_visit_update = max( | ||||
| if snapshot: | self._origin_visit_updates[visit_key], key=lambda v: v.date) | ||||
| updates['snapshot'] = snapshot | |||||
| with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
| visit = attr.evolve(visit, **updates) | visit_update = OriginVisitUpdate( | ||||
| origin=origin_url, | |||||
| visit=visit_id, | |||||
| date=date or now(), | |||||
| status=status, | |||||
| snapshot=snapshot or last_visit_update.snapshot, | |||||
| metadata=metadata or last_visit_update.metadata, | |||||
| ) | |||||
| self._origin_visit_updates[visit_key].append(visit_update) | |||||
| self.journal_writer.origin_visit_update(visit) | self.journal_writer.origin_visit_update( | ||||
| self._origin_visit_get_updated(origin_url, visit_id)) | |||||
| self._origin_visits[origin_url][visit_id-1] = visit | self._origin_visits[origin_url][visit_id-1] = visit | ||||
| def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: | def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None: | ||||
| for visit in visits: | for visit in visits: | ||||
| if visit.visit is None: | if visit.visit is None: | ||||
| raise StorageArgumentException( | raise StorageArgumentException( | ||||
| f'Missing visit id for visit {visit}') | f'Missing visit id for visit {visit}') | ||||
| self.journal_writer.origin_visit_upsert(visits) | self.journal_writer.origin_visit_upsert(visits) | ||||
| date = now() | |||||
| for visit in visits: | for visit in visits: | ||||
| visit_id = visit.visit | assert visit.visit is not None | ||||
| origin_url = visit.origin | origin_url = visit.origin | ||||
| origin = self.origin_get({'url': origin_url}) | |||||
| with convert_validation_exceptions(): | if not origin: # Cannot add a visit without an origin | ||||
| visit = attr.evolve(visit, origin=origin_url) | raise StorageArgumentException( | ||||
| 'Unknown origin %s', origin_url) | |||||
| self._objects[(origin_url, visit_id)].append( | if origin_url in self._origins: | ||||
| ('origin_visit', None)) | origin = self._origins[origin_url] | ||||
| # visit ids are in the range [1, +inf[ | |||||
| assert visit.visit is not None | |||||
| visit_key = (origin_url, visit.visit) | |||||
| with convert_validation_exceptions(): | |||||
| visit_update = OriginVisitUpdate( | |||||
| origin=origin_url, | |||||
| visit=visit.visit, | |||||
| date=date, | |||||
| status=visit.status, | |||||
| snapshot=visit.snapshot, | |||||
| metadata=visit.metadata, | |||||
| ) | |||||
| if visit_id: | self._origin_visit_updates.setdefault(visit_key, []) | ||||
| while len(self._origin_visits[origin_url]) <= visit_id: | while len(self._origin_visits[origin_url]) <= visit.visit: | ||||
| self._origin_visits[origin_url].append(None) | self._origin_visits[origin_url].append(None) | ||||
| self._origin_visits[origin_url][visit_id-1] = visit | self._origin_visits[origin_url][visit.visit-1] = visit | ||||
| self._origin_visit_updates[visit_key].append(visit_update) | |||||
| self._objects[visit_key].append( | |||||
| ('origin_visit', None)) | |||||
| def _convert_visit(self, visit): | def _origin_visit_get_updated( | ||||
| self, origin: str, visit_id: int) -> Optional[OriginVisit]: | |||||
| """Merge origin visit and latest origin visit update | |||||
| """ | |||||
| assert visit_id >= 1 | |||||
| visit = self._origin_visits[origin][visit_id-1] | |||||
| if visit is None: | if visit is None: | ||||
| return | return None | ||||
| visit_key = (origin, visit_id) | |||||
| visit = visit.to_dict() | visit_update = max( | ||||
| self._origin_visit_updates[visit_key], key=lambda v: v.date) | |||||
| return visit | return OriginVisit.from_dict({ | ||||
| # default to the values in visit | |||||
| **visit.to_dict(), | |||||
| # override with the last update | |||||
| **visit_update.to_dict(), | |||||
| # but keep the date of the creation of the origin visit | |||||
| 'date': visit.date | |||||
| }) | |||||
| def origin_visit_get( | def origin_visit_get( | ||||
| self, origin: str, last_visit: Optional[int] = None, | self, origin: str, last_visit: Optional[int] = None, | ||||
| limit: Optional[int] = None) -> Iterable[Dict[str, Any]]: | limit: Optional[int] = None) -> Iterable[Dict[str, Any]]: | ||||
| origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
| if origin_url in self._origin_visits: | if origin_url in self._origin_visits: | ||||
| visits = self._origin_visits[origin_url] | visits = self._origin_visits[origin_url] | ||||
| if last_visit is not None: | if last_visit is not None: | ||||
| visits = visits[last_visit:] | visits = visits[last_visit:] | ||||
| if limit is not None: | if limit is not None: | ||||
| visits = visits[:limit] | visits = visits[:limit] | ||||
| for visit in visits: | for visit in visits: | ||||
| if not visit: | if not visit: | ||||
| continue | continue | ||||
| visit_id = visit.visit | visit_id = visit.visit | ||||
| yield self._convert_visit( | visit_update = self._origin_visit_get_updated( | ||||
| self._origin_visits[origin_url][visit_id-1]) | origin_url, visit_id) | ||||
| assert visit_update is not None | |||||
| yield visit_update.to_dict() | |||||
| def origin_visit_find_by_date( | def origin_visit_find_by_date( | ||||
| self, origin: str, | self, origin: str, | ||||
| visit_date: datetime.datetime) -> Optional[Dict[str, Any]]: | visit_date: datetime.datetime) -> Optional[Dict[str, Any]]: | ||||
| origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
| if origin_url in self._origin_visits: | if origin_url in self._origin_visits: | ||||
| visits = self._origin_visits[origin_url] | visits = self._origin_visits[origin_url] | ||||
| visit = min( | visit = min( | ||||
| visits, | visits, | ||||
| key=lambda v: (abs(v.date - visit_date), -v.visit)) | key=lambda v: (abs(v.date - visit_date), -v.visit)) | ||||
| return self._convert_visit(visit) | visit_update = self._origin_visit_get_updated( | ||||
| origin, visit.visit) | |||||
| assert visit_update is not None | |||||
| return visit_update.to_dict() | |||||
| return None | return None | ||||
| def origin_visit_get_by( | def origin_visit_get_by( | ||||
| self, origin: str, visit: int) -> Optional[Dict[str, Any]]: | self, origin: str, visit: int) -> Optional[Dict[str, Any]]: | ||||
| origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
| if origin_url in self._origin_visits and \ | if origin_url in self._origin_visits and \ | ||||
| visit <= len(self._origin_visits[origin_url]): | visit <= len(self._origin_visits[origin_url]): | ||||
| return self._convert_visit( | visit_update = self._origin_visit_get_updated( | ||||
| self._origin_visits[origin_url][visit-1]) | origin_url, visit) | ||||
| assert visit_update is not None | |||||
| return visit_update.to_dict() | |||||
| return None | return None | ||||
| def origin_visit_get_latest( | def origin_visit_get_latest( | ||||
| self, origin: str, allowed_statuses: Optional[List[str]] = None, | self, origin: str, allowed_statuses: Optional[List[str]] = None, | ||||
| require_snapshot: bool = False) -> Optional[Dict[str, Any]]: | require_snapshot: bool = False) -> Optional[Dict[str, Any]]: | ||||
| ori = self._origins.get(origin) | ori = self._origins.get(origin) | ||||
| if not ori: | if not ori: | ||||
| return None | return None | ||||
| visits = self._origin_visits[ori.url] | visits = self._origin_visits[ori.url] | ||||
| visits = [self._origin_visit_get_updated(visit.origin, visit.visit) | |||||
| for visit in visits | |||||
| if visit is not None] | |||||
| if allowed_statuses is not None: | if allowed_statuses is not None: | ||||
| visits = [visit for visit in visits | visits = [visit for visit in visits | ||||
| if visit.status in allowed_statuses] | if visit.status in allowed_statuses] | ||||
| if require_snapshot: | if require_snapshot: | ||||
| visits = [visit for visit in visits | visits = [visit for visit in visits | ||||
| if visit.snapshot] | if visit.snapshot] | ||||
| visit = max( | visit = max( | ||||
| visits, key=lambda v: (v.date, v.visit), default=None) | visits, key=lambda v: (v.date, v.visit), default=None) | ||||
| return self._convert_visit(visit) | if visit is None: | ||||
| return None | |||||
| return visit.to_dict() | |||||
| def _select_random_origin_visit_by_type(self, type: str) -> str: | def _select_random_origin_visit_by_type(self, type: str) -> str: | ||||
| while True: | while True: | ||||
| url = random.choice(list(self._origin_visits.keys())) | url = random.choice(list(self._origin_visits.keys())) | ||||
| random_origin_visits = self._origin_visits[url] | random_origin_visits = self._origin_visits[url] | ||||
| if random_origin_visits[0].type == type: | if random_origin_visits[0].type == type: | ||||
| return url | return url | ||||
| def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]: | ||||
| url = self._select_random_origin_visit_by_type(type) | url = self._select_random_origin_visit_by_type(type) | ||||
| random_origin_visits = copy.deepcopy(self._origin_visits[url]) | random_origin_visits = copy.deepcopy(self._origin_visits[url]) | ||||
| random_origin_visits.reverse() | random_origin_visits.reverse() | ||||
| back_in_the_day = now() - timedelta(weeks=12) # 3 months back | back_in_the_day = now() - timedelta(weeks=12) # 3 months back | ||||
| # This should be enough for tests | # This should be enough for tests | ||||
| for visit in random_origin_visits: | for visit in random_origin_visits: | ||||
| if visit.date > back_in_the_day and visit.status == 'full': | updated_visit = self._origin_visit_get_updated( | ||||
| return visit.to_dict() | url, visit.visit) | ||||
| assert updated_visit is not None | |||||
| if updated_visit.date > back_in_the_day \ | |||||
| and updated_visit.status == 'full': | |||||
| return updated_visit.to_dict() | |||||
| else: | else: | ||||
| return None | return None | ||||
| def stat_counters(self): | def stat_counters(self): | ||||
| keys = ( | keys = ( | ||||
| 'content', | 'content', | ||||
| 'directory', | 'directory', | ||||
| 'origin', | 'origin', | ||||
| ▲ Show 20 Lines • Show All 125 Lines • Show Last 20 Lines | |||||