Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import re | import re | ||||
import bisect | import bisect | ||||
import dateutil | import dateutil | ||||
import collections | import collections | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import random | import random | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from datetime import timedelta | from datetime import timedelta | ||||
from typing import Any, Dict, Iterable, List, Optional, Union | from typing import Any, Dict, Iterable, List, Optional, Tuple, Union | ||||
import attr | import attr | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, Content, SkippedContent, Directory, Revision, | BaseContent, Content, SkippedContent, Directory, Revision, | ||||
Release, Snapshot, OriginVisit, Origin, SHA1_SIZE | Release, Snapshot, OriginVisit, OriginVisitUpdate, Origin, SHA1_SIZE | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from .exc import StorageArgumentException, HashCollision | from .exc import StorageArgumentException, HashCollision | ||||
from .converters import origin_url_to_sha1 | from .converters import origin_url_to_sha1 | ||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
from .validate import convert_validation_exceptions | |||||
from .writer import JournalWriter | from .writer import JournalWriter | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
def now():
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
Show All 13 Lines | def reset(self): | ||||
self._directories = {} | self._directories = {} | ||||
self._revisions = {} | self._revisions = {} | ||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_id = [] | self._origins_by_id = [] | ||||
self._origins_by_sha1 = {} | self._origins_by_sha1 = {} | ||||
self._origin_visits = {} | self._origin_visits = {} | ||||
self._origin_visit_updates: Dict[ | |||||
Tuple[str, int], List[OriginVisitUpdate]] = {} | |||||
self._persons = [] | self._persons = [] | ||||
self._origin_metadata = defaultdict(list) | self._origin_metadata = defaultdict(list) | ||||
self._tools = {} | self._tools = {} | ||||
self._metadata_providers = {} | self._metadata_providers = {} | ||||
self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
# ideally we would want a skip list for both fast inserts and searches | # ideally we would want a skip list for both fast inserts and searches | ||||
self._sorted_sha1s = [] | self._sorted_sha1s = [] | ||||
▲ Show 20 Lines • Show All 426 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
def snapshot_get_by_origin_visit(self, origin, visit):
    """Return the snapshot attached to visit number `visit` of `origin`.

    The visit is resolved through `_origin_visit_get_updated`, so the
    snapshot looked up is the one carried by the latest visit update.

    Returns None when the origin URL cannot be resolved, the origin is
    not in `self._origins`, `visit` is outside `[1, number of visits]`,
    the visit slot is empty, or the visit has no snapshot. Otherwise
    returns the result of `self.snapshot_get`.
    """
    origin_url = self._get_origin_url(origin)
    if not origin_url:
        return None
    if origin_url not in self._origins:
        return None

    # Visit ids start at 1: a lower value would wrap around through
    # negative list indexing instead of being reported as missing.
    if not 1 <= visit <= len(self._origin_visits[origin_url]):
        return None

    updated_visit = self._origin_visit_get_updated(origin_url, visit)
    # A slot can hold None (origin_visit_upsert pads the list with None),
    # in which case the helper returns None as well.
    if updated_visit is None or not updated_visit.snapshot:
        return None
    return self.snapshot_get(updated_visit.snapshot)
def snapshot_get_latest(self, origin, allowed_statuses=None): | def snapshot_get_latest(self, origin, allowed_statuses=None): | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if not origin_url: | if not origin_url: | ||||
def origin_search(self, url_pattern, offset=0, limit=50,
                  regexp=False, with_visit=False):
    """Return the converted origins whose URL matches `url_pattern`.

    Args:
        url_pattern: substring to look for in the origin URL, or a
            regular expression when `regexp` is true.
        offset: number of matching origins to skip.
        limit: maximum number of origins to return.
        regexp: interpret `url_pattern` with `re.search` instead of a
            substring test.
        with_visit: only keep origins having at least one visit whose
            (updated) snapshot is present in `self._snapshots`.

    Returns:
        The slice `[offset:offset+limit]` of the matching origins, as
        produced by `self._convert_origin`.
    """
    origins = map(self._convert_origin, self._origins.values())
    if regexp:
        pat = re.compile(url_pattern)
        origins = [orig for orig in origins if pat.search(orig['url'])]
    else:
        origins = [orig for orig in origins if url_pattern in orig['url']]

    def has_known_snapshot(url):
        # True as soon as one visit of `url` points to a stored snapshot.
        for ov in self._origin_visits[url]:
            if ov is None:
                # Placeholder slot (origin_visit_upsert pads with None).
                continue
            updated = self._origin_visit_get_updated(ov.origin, ov.visit)
            if updated is not None and updated.snapshot \
                    and updated.snapshot in self._snapshots:
                return True
        return False

    if with_visit:
        origins = [orig for orig in origins
                   if has_known_snapshot(orig['url'])]

    return origins[offset:offset+limit]
def origin_count(self, url_pattern, regexp=False, with_visit=False):
    """Count the origins matched by `origin_search` for the same filters.

    The search is run with `limit` set to the total number of origins so
    that no match is cut off by the default page size.
    """
    upper_bound = len(self._origins)
    matches = self.origin_search(
        url_pattern, regexp=regexp, with_visit=with_visit,
        limit=upper_bound)
    return len(matches)
def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | ||||
origins = copy.deepcopy(list(origins)) | origins = copy.deepcopy(list(origins)) | ||||
Show All 32 Lines | def origin_visit_add(self, origin_url: str, | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
'Unknown origin %s', origin_url) | 'Unknown origin %s', origin_url) | ||||
if origin_url in self._origins: | if origin_url in self._origins: | ||||
origin = self._origins[origin_url] | origin = self._origins[origin_url] | ||||
# visit ids are in the range [1, +inf[ | # visit ids are in the range [1, +inf[ | ||||
visit_id = len(self._origin_visits[origin_url]) + 1 | visit_id = len(self._origin_visits[origin_url]) + 1 | ||||
status = 'ongoing' | status = 'ongoing' | ||||
with convert_validation_exceptions(): | |||||
visit = OriginVisit( | visit = OriginVisit( | ||||
origin=origin_url, | origin=origin_url, | ||||
date=date, | date=date, | ||||
type=type, | type=type, | ||||
# TODO: Remove when we remove those fields from the model | |||||
status=status, | status=status, | ||||
snapshot=None, | snapshot=None, | ||||
metadata=None, | metadata=None, | ||||
visit=visit_id, | visit=visit_id, | ||||
) | ) | ||||
self._origin_visits[origin_url].append(visit) | self._origin_visits[origin_url].append(visit) | ||||
visit = visit | assert visit.visit is not None | ||||
visit_key = (origin_url, visit.visit) | |||||
with convert_validation_exceptions(): | |||||
visit_update = OriginVisitUpdate( | |||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date, | |||||
status=status, | |||||
snapshot=None, | |||||
metadata=None, | |||||
) | |||||
self._origin_visit_updates[visit_key] = [visit_update] | |||||
self._objects[(origin_url, visit.visit)].append( | self._objects[visit_key].append( | ||||
('origin_visit', None)) | ('origin_visit', None)) | ||||
self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
# return last visit | # return last visit | ||||
return visit | return visit | ||||
def origin_visit_update(
        self, origin: str, visit_id: int, status: str,
        metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None,
        date: Optional[datetime.datetime] = None):
    """Record a new update (status/snapshot/metadata) for an origin visit.

    A new `OriginVisitUpdate` is appended to the updates of the visit;
    `snapshot` and `metadata` fall back to the values of the most recent
    previous update when not provided, and `date` defaults to `now()`.
    The merged visit is then sent to the journal writer.

    Args:
        origin: origin identifier, resolved with `_get_origin_url`.
        visit_id: id of the visit to update (ids start at 1).
        status: new status of the visit.
        metadata: new metadata, if any.
        snapshot: new snapshot id, if any.
        date: date of the update.

    Raises:
        StorageArgumentException: the origin is unknown, or `visit_id`
            does not designate an existing visit of this origin.
    """
    origin_url = self._get_origin_url(origin)
    if origin_url is None:
        raise StorageArgumentException('Unknown origin.')

    visits = self._origin_visits[origin_url]
    # Visit ids start at 1. Without the lower bound, 0 or a negative id
    # would silently pick a visit from the end of the list; a None slot
    # is a placeholder (origin_visit_upsert pads with None), not a visit.
    if not 1 <= visit_id <= len(visits) or visits[visit_id-1] is None:
        raise StorageArgumentException(
            'Unknown visit_id for this origin')

    # Retrieve the previous visit update; same key as the one read by
    # _origin_visit_get_updated below.
    visit_key = (origin_url, visit_id)
    last_visit_update = max(
        self._origin_visit_updates[visit_key], key=lambda v: v.date)

    with convert_validation_exceptions():
        visit_update = OriginVisitUpdate(
            origin=origin_url,
            visit=visit_id,
            date=date or now(),
            status=status,
            snapshot=snapshot or last_visit_update.snapshot,
            metadata=metadata or last_visit_update.metadata,
        )
    self._origin_visit_updates[visit_key].append(visit_update)

    self.journal_writer.origin_visit_update(
        self._origin_visit_get_updated(origin_url, visit_id))
def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
    """Insert or replace the given origin visits.

    Each visit is stored at position `visit.visit - 1` of its origin's
    visit list (the list is padded with None as needed), and an
    `OriginVisitUpdate` carrying the visit's status, snapshot and
    metadata, dated with the time of the call, is appended to the
    visit's updates.

    Raises:
        StorageArgumentException: `origin_get` finds no origin for a
            visit's origin URL.
    """
    # Materialize once: `visits` may be a one-shot iterator, and it is
    # read both by the journal writer and by the loop below.
    visits = list(visits)
    self.journal_writer.origin_visit_upsert(visits)

    date = now()

    for visit in visits:
        origin_url = visit.origin
        origin = self.origin_get({'url': origin_url})

        if not origin:  # Cannot add a visit without an origin
            raise StorageArgumentException(
                'Unknown origin %s', origin_url)

        if origin_url in self._origins:
            # visit ids are in the range [1, +inf[
            assert visit.visit is not None
            visit_key = (origin_url, visit.visit)

            with convert_validation_exceptions():
                visit_update = OriginVisitUpdate(
                    origin=origin_url,
                    visit=visit.visit,
                    date=date,
                    status=visit.status,
                    snapshot=visit.snapshot,
                    metadata=visit.metadata,
                )

            self._origin_visit_updates.setdefault(visit_key, [])
            while len(self._origin_visits[origin_url]) <= visit.visit:
                self._origin_visits[origin_url].append(None)

            self._origin_visits[origin_url][visit.visit-1] = visit
            self._origin_visit_updates[visit_key].append(visit_update)

            self._objects[visit_key].append(
                ('origin_visit', None))
def _convert_visit(self, visit): | def _convert_visit(self, visit): | ||||
if visit is None: | if visit is None: | ||||
return | return | ||||
visit = visit.to_dict() | visit = visit.to_dict() | ||||
return visit | return visit | ||||
def _origin_visit_get_updated(
        self, origin: str, visit_id: int) -> Optional[OriginVisit]:
    """Merge an origin visit with its latest origin visit update.

    Returns None when the slot for `visit_id` holds None. Otherwise
    returns an `OriginVisit` built from the stored visit, overridden by
    the fields of the update with the greatest date, while keeping the
    date of the stored visit.
    """
    assert visit_id >= 1
    stored_visit = self._origin_visits[origin][visit_id-1]
    if stored_visit is None:
        return None

    updates = self._origin_visit_updates[(origin, visit_id)]
    latest_update = max(updates, key=lambda upd: upd.date)

    # start from the values in the stored visit
    merged = stored_visit.to_dict()
    # override with the last update
    merged.update(latest_update.to_dict())
    # but keep the date of the creation of the origin visit
    merged['date'] = stored_visit.date
    return OriginVisit.from_dict(merged)
def origin_visit_get(self, origin, last_visit=None, limit=None):
    """Yield the visits of `origin` as dicts, merged with their last update.

    Args:
        origin: origin identifier, resolved with `_get_origin_url`.
        last_visit: when not None, the first `last_visit` entries of the
            visit list are skipped.
        limit: when not None, at most `limit` entries of the (possibly
            shortened) list are considered.

    Empty (None) slots are skipped. Nothing is yielded when the origin
    has no entry in `self._origin_visits`.
    """
    url = self._get_origin_url(origin)
    if url not in self._origin_visits:
        return

    window = self._origin_visits[url]
    if last_visit is not None:
        window = window[last_visit:]
    if limit is not None:
        window = window[:limit]

    for entry in window:
        if entry:
            merged = self._origin_visit_get_updated(url, entry.visit)
            yield merged.to_dict()
def origin_visit_find_by_date(self, origin, visit_date):
    """Return the visit of `origin` whose date is closest to `visit_date`.

    When several visits are equally close, the one with the greatest
    visit id wins. The visit is returned as a dict, merged with its
    latest update through `_origin_visit_get_updated`.

    Returns None when the origin has no entry in `self._origin_visits`
    or has no visit at all.
    """
    origin_url = self._get_origin_url(origin)
    if origin_url not in self._origin_visits:
        return None

    # Ignore placeholder slots (origin_visit_upsert pads with None).
    visits = [v for v in self._origin_visits[origin_url] if v is not None]
    visit = min(
        visits,
        key=lambda v: (abs(v.date - visit_date), -v.visit),
        default=None)
    if visit is None:
        return None

    # Look the visit up by the resolved URL, which is what the visit
    # containers are keyed on, not by the raw `origin` argument.
    return self._origin_visit_get_updated(
        origin_url, visit.visit).to_dict()
def origin_visit_get_by(self, origin, visit):
    """Return visit number `visit` of `origin` as a dict, or None.

    The visit is merged with its latest update through
    `_origin_visit_get_updated`. None is returned when the origin has no
    entry in `self._origin_visits`, when `visit` is outside
    `[1, number of visits]`, or when the slot for that visit is empty.
    """
    origin_url = self._get_origin_url(origin)
    if origin_url not in self._origin_visits:
        return None

    # Visit ids start at 1; reject lower values instead of letting them
    # wrap around through negative indexing.
    if not 1 <= visit <= len(self._origin_visits[origin_url]):
        return None

    updated_visit = self._origin_visit_get_updated(origin_url, visit)
    # The helper returns None for placeholder slots.
    if updated_visit is None:
        return None
    return updated_visit.to_dict()
def origin_visit_get_latest(
        self, origin, allowed_statuses=None, require_snapshot=False):
    """Return the most recent visit of `origin` as a dict, or None.

    Visits are first merged with their latest update. "Most recent" is
    the greatest `(date, visit)` pair among the visits that pass the
    filters.

    Args:
        origin: key of the origin in `self._origins`.
        allowed_statuses: when not None, only visits whose status is in
            this collection are considered.
        require_snapshot: when true, only visits with a snapshot are
            considered.
    """
    known_origin = self._origins.get(origin)
    if not known_origin:
        return

    def is_eligible(candidate):
        if allowed_statuses is not None \
                and candidate.status not in allowed_statuses:
            return False
        if require_snapshot and not candidate.snapshot:
            return False
        return True

    merged_visits = [
        self._origin_visit_get_updated(slot.origin, slot.visit)
        for slot in self._origin_visits[known_origin.url]
        if slot is not None
    ]
    latest = max(
        filter(is_eligible, merged_visits),
        key=lambda v: (v.date, v.visit), default=None)
    return None if latest is None else latest.to_dict()
def _select_random_origin_visit_by_type(self, type: str) -> str: | def _select_random_origin_visit_by_type(self, type: str) -> str: | ||||
while True: | while True: | ||||
url = random.choice(list(self._origin_visits.keys())) | url = random.choice(list(self._origin_visits.keys())) | ||||
random_origin_visits = self._origin_visits[url] | random_origin_visits = self._origin_visits[url] | ||||
if random_origin_visits[0].type == type: | if random_origin_visits[0].type == type: | ||||
return url | return url | ||||
def origin_visit_get_random(self, type: str) -> Optional[Dict[str, Any]]:
    """Return a recent 'full' visit of a random origin of the given type.

    An origin is picked with `_select_random_origin_visit_by_type`; its
    visits are scanned from the last slot to the first, and the first
    one (merged with its latest update) that is dated less than 12 weeks
    ago and has status 'full' is returned as a dict.

    Returns None when the selected origin has no such visit.
    """
    url = self._select_random_origin_visit_by_type(type)
    back_in_the_day = now() - timedelta(weeks=12)  # 3 months back
    # This should be enough for tests
    # The stored visits are only read, so iterate them in place rather
    # than deep-copying the list.
    for visit in reversed(self._origin_visits[url]):
        if visit is None:
            # Placeholder slot (origin_visit_upsert pads with None).
            continue
        updated_visit = self._origin_visit_get_updated(
            url, visit.visit)
        assert updated_visit is not None
        if updated_visit.date > back_in_the_day \
                and updated_visit.status == 'full':
            return updated_visit.to_dict()
    return None
def stat_counters(self): | def stat_counters(self): | ||||
keys = ( | keys = ( | ||||
'content', | 'content', | ||||
'directory', | 'directory', | ||||
'origin', | 'origin', | ||||
▲ Show 20 Lines • Show All 125 Lines • Show Last 20 Lines |