Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import re | import re | ||||
import bisect | import bisect | ||||
import dateutil | import dateutil | ||||
import collections | import collections | ||||
import copy | import copy | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import random | import random | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from datetime import timedelta | from datetime import timedelta | ||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, Iterable, List, Optional, Union | ||||
import attr | import attr | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, Content, SkippedContent, Directory, Revision, Release, | BaseContent, Content, SkippedContent, Directory, Revision, Release, | ||||
Snapshot, OriginVisit, Origin, SHA1_SIZE) | Snapshot, OriginVisit, Origin, SHA1_SIZE) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | def reset(self): | ||||
# ideally we would want a skip list for both fast inserts and searches | # ideally we would want a skip list for both fast inserts and searches | ||||
self._sorted_sha1s = [] | self._sorted_sha1s = [] | ||||
self.objstorage = get_objstorage('memory', {}) | self.objstorage = get_objstorage('memory', {}) | ||||
def check_config(self, *, check_write):
    """Check that the storage is configured and ready to use.

    The in-memory backend requires no external setup, so this always
    reports success regardless of `check_write`.
    """
    return True
def _content_add(self, contents, with_data): | def _content_add( | ||||
for content in contents: | self, contents: Iterable[Content], with_data: bool) -> Dict: | ||||
if content.status is None: | |||||
content.status = 'visible' | |||||
if content.status == 'absent': | |||||
raise StorageArgumentException('content with status=absent') | |||||
if content.length is None: | |||||
raise StorageArgumentException('content with length=None') | |||||
if self.journal_writer: | if self.journal_writer: | ||||
for content in contents: | for content in contents: | ||||
try: | |||||
content = attr.evolve(content, data=None) | content = attr.evolve(content, data=None) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
self.journal_writer.write_addition('content', content) | self.journal_writer.write_addition('content', content) | ||||
summary = { | summary = { | ||||
'content:add': 0, | 'content:add': 0, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary['content:add:bytes'] = 0 | summary['content:add:bytes'] = 0 | ||||
Show All 12 Lines | def _content_add( | ||||
self._content_indexes[algorithm][hash_].add(key) | self._content_indexes[algorithm][hash_].add(key) | ||||
self._objects[content.sha1_git].append( | self._objects[content.sha1_git].append( | ||||
('content', content.sha1)) | ('content', content.sha1)) | ||||
self._contents[key] = content | self._contents[key] = content | ||||
bisect.insort(self._sorted_sha1s, content.sha1) | bisect.insort(self._sorted_sha1s, content.sha1) | ||||
summary['content:add'] += 1 | summary['content:add'] += 1 | ||||
if with_data: | if with_data: | ||||
content_data = self._contents[key].data | content_data = self._contents[key].data | ||||
try: | |||||
self._contents[key] = attr.evolve( | self._contents[key] = attr.evolve( | ||||
self._contents[key], | self._contents[key], | ||||
data=None) | data=None) | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
summary['content:add:bytes'] += len(content_data) | summary['content:add:bytes'] += len(content_data) | ||||
self.objstorage.add(content_data, content.sha1) | self.objstorage.add(content_data, content.sha1) | ||||
return summary | return summary | ||||
def content_add(self, content: Iterable["Content"]) -> Dict:
    """Add content objects (with their data) to the storage.

    Every content is stamped with the current UTC time as its
    ingestion time (``ctime``) before being inserted.

    Returns:
        a summary dict as produced by ``_content_add``.
    """
    ctime = datetime.datetime.now(tz=datetime.timezone.utc)
    stamped = [attr.evolve(item, ctime=ctime) for item in content]
    return self._content_add(stamped, with_data=True)
def content_update(self, content, keys=None):
    """Update stored contents identified by their sha1.

    Args:
        content: iterable of dicts, each carrying a ``sha1`` key plus
            the fields to update on the matching contents.
        keys: kept for interface compatibility with the other storage
            backends; unused by the in-memory implementation.
            (Was a mutable default ``[]``; ``None`` avoids the shared
            mutable-default pitfall without changing behavior.)

    Raises:
        NotImplementedError: when a journal writer is configured,
            since updates cannot currently be journaled.
    """
    if self.journal_writer:
        raise NotImplementedError(
            'content_update is not yet supported with a journal_writer.')
    for cont_update in content:
        cont_update = cont_update.copy()
        sha1 = cont_update.pop('sha1')
        for old_key in self._content_indexes['sha1'][sha1]:
            old_cont = self._contents.pop(old_key)
            # Drop the old hashes from every per-algorithm index
            # before re-inserting under the updated hashes.
            for algorithm in DEFAULT_ALGORITHMS:
                hash_ = old_cont.get_hash(algorithm)
                self._content_indexes[algorithm][hash_].remove(old_key)
            new_cont = attr.evolve(old_cont, **cont_update)
            new_key = self._content_key(new_cont)
            self._contents[new_key] = new_cont
            for algorithm in DEFAULT_ALGORITHMS:
                hash_ = new_cont.get_hash(algorithm)
                self._content_indexes[algorithm][hash_].add(new_key)
def content_add_metadata(self, content: Iterable["Content"]) -> Dict:
    """Add content metadata only, without storing the data blobs.

    Returns:
        a summary dict as produced by ``_content_add``.
    """
    return self._content_add(content, with_data=False)
def content_get(self, content): | def content_get(self, content): | ||||
# FIXME: Make this method support slicing the `data`. | # FIXME: Make this method support slicing the `data`. | ||||
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | ||||
for obj_id in content: | for obj_id in content: | ||||
▲ Show 20 Lines • Show All 101 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
def content_missing_per_sha1_git(self, contents):
    """Yield the sha1_git hashes from `contents` that are not known
    to the storage."""
    known = self._content_indexes['sha1_git']
    for sha1_git in contents:
        if sha1_git not in known:
            yield sha1_git
def content_get_random(self):
    """Return the sha1_git of a randomly chosen stored content."""
    sha1_gits = list(self._content_indexes['sha1_git'])
    return random.choice(sha1_gits)
def _skipped_content_add(self, contents: Iterable["SkippedContent"]) -> Dict:
    """Register skipped contents (contents whose data was not archived).

    Returns:
        a summary dict with the number of newly registered skipped
        contents under the key ``skipped_content:add``.
    """
    # `contents` is declared Iterable but consumed twice below (once
    # for journaling, once for the missing-content lookup); a generator
    # argument would be silently exhausted after the first pass, so
    # materialize it exactly once.
    contents = list(contents)
    if self.journal_writer:
        for cont in contents:
            self.journal_writer.write_addition('content', cont)
    summary = {
        'skipped_content:add': 0
    }
    skipped_content_missing = self.skipped_content_missing(
        [c.to_dict() for c in contents])
    for content in skipped_content_missing:
        key = self._content_key(content, allow_missing=True)
        for algo in DEFAULT_ALGORITHMS:
            # Only index hashes that are actually present (skipped
            # contents may miss some algorithms).
            if content.get(algo):
                self._skipped_content_indexes[algo][
                    content.get(algo)].add(key)
        self._skipped_contents[key] = content
        summary['skipped_content:add'] += 1
    return summary
def skipped_content_missing(self, contents):
    """Yield, for each content of `contents` not yet recorded as
    skipped, a dict of its non-None hashes.

    A content counts as already-skipped when every one of its hash
    keys (blake2s256 excluded) appears in the skipped-content indexes.
    """
    for content in contents:
        for key, algorithm in self._content_key_algorithm(content):
            if algorithm == 'blake2s256':
                # blake2s256 is not required to be present in the index.
                continue
            if key in self._skipped_content_indexes[algorithm]:
                continue
            # At least one indexed hash is missing: report the content.
            yield {algo: content[algo]
                   for algo in DEFAULT_ALGORITHMS
                   if content[algo] is not None}
            break
def skipped_content_add(self, content: Iterable["SkippedContent"]) -> Dict:
    """Record contents whose data was deliberately not archived.

    Each entry is stamped with the current UTC time (``ctime``)
    before insertion.

    Returns:
        a summary dict as produced by ``_skipped_content_add``.
    """
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    stamped = [attr.evolve(item, ctime=now) for item in content]
    return self._skipped_content_add(stamped)
def directory_add(self, directories): | def directory_add(self, directories: Iterable[Directory]) -> Dict: | ||||
directories = list(directories) | directories = list(directories) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions( | self.journal_writer.write_additions( | ||||
'directory', | 'directory', | ||||
(dir_ for dir_ in directories | (dir_ for dir_ in directories | ||||
if dir_['id'] not in self._directories)) | if dir_.id not in self._directories)) | ||||
try: | |||||
directories = [Directory.from_dict(d) for d in directories] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
count = 0 | count = 0 | ||||
for directory in directories: | for directory in directories: | ||||
if directory.id not in self._directories: | if directory.id not in self._directories: | ||||
count += 1 | count += 1 | ||||
self._directories[directory.id] = directory | self._directories[directory.id] = directory | ||||
self._objects[directory.id].append( | self._objects[directory.id].append( | ||||
('directory', directory.id)) | ('directory', directory.id)) | ||||
▲ Show 20 Lines • Show All 68 Lines • ▼ Show 20 Lines | def _directory_entry_get_by_path(self, directory, paths, prefix): | ||||
return first_item | return first_item | ||||
if not first_item or first_item['type'] != 'dir': | if not first_item or first_item['type'] != 'dir': | ||||
return | return | ||||
return self._directory_entry_get_by_path( | return self._directory_entry_get_by_path( | ||||
first_item['target'], paths[1:], prefix + paths[0] + b'/') | first_item['target'], paths[1:], prefix + paths[0] + b'/') | ||||
def revision_add(self, revisions): | def revision_add(self, revisions: Iterable[Revision]) -> Dict: | ||||
revisions = list(revisions) | revisions = list(revisions) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions( | self.journal_writer.write_additions( | ||||
'revision', | 'revision', | ||||
(rev for rev in revisions | (rev for rev in revisions | ||||
if rev['id'] not in self._revisions)) | if rev.id not in self._revisions)) | ||||
try: | |||||
revisions = [Revision.from_dict(rev) for rev in revisions] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
count = 0 | count = 0 | ||||
for revision in revisions: | for revision in revisions: | ||||
if revision.id not in self._revisions: | if revision.id not in self._revisions: | ||||
revision = attr.evolve( | revision = attr.evolve( | ||||
revision, | revision, | ||||
committer=self._person_add(revision.committer), | committer=self._person_add(revision.committer), | ||||
author=self._person_add(revision.author)) | author=self._person_add(revision.author)) | ||||
Show All 33 Lines | class InMemoryStorage: | ||||
def revision_shortlog(self, revisions, limit=None):
    """Yield ``(id, parents)`` tuples for the log of `revisions`,
    optionally truncated to `limit` entries."""
    for rev in self.revision_log(revisions, limit):
        yield rev['id'], rev['parents']
def revision_get_random(self):
    """Return the id of a randomly chosen stored revision."""
    revision_ids = list(self._revisions)
    return random.choice(revision_ids)
def release_add(self, releases): | def release_add(self, releases: Iterable[Release]) -> Dict: | ||||
releases = list(releases) | releases = list(releases) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions( | self.journal_writer.write_additions( | ||||
'release', | 'release', | ||||
(rel for rel in releases | (rel for rel in releases | ||||
if rel['id'] not in self._releases)) | if rel.id not in self._releases)) | ||||
try: | |||||
releases = [Release.from_dict(rel) for rel in releases] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
count = 0 | count = 0 | ||||
for rel in releases: | for rel in releases: | ||||
if rel.id not in self._releases: | if rel.id not in self._releases: | ||||
if rel.author: | if rel.author: | ||||
self._person_add(rel.author) | self._person_add(rel.author) | ||||
self._objects[rel.id].append( | self._objects[rel.id].append( | ||||
('release', rel.id)) | ('release', rel.id)) | ||||
Show All 10 Lines | def release_get(self, releases): | ||||
if rel_id in self._releases: | if rel_id in self._releases: | ||||
yield self._releases[rel_id].to_dict() | yield self._releases[rel_id].to_dict() | ||||
else: | else: | ||||
yield None | yield None | ||||
def release_get_random(self):
    """Return the id of a randomly chosen stored release."""
    release_ids = list(self._releases)
    return random.choice(release_ids)
def snapshot_add(self, snapshots): | def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | ||||
count = 0 | count = 0 | ||||
try: | |||||
snapshots = [Snapshot.from_dict(d) for d in snapshots] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
snapshots = (snap for snap in snapshots | snapshots = (snap for snap in snapshots | ||||
if snap.id not in self._snapshots) | if snap.id not in self._snapshots) | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('snapshot', snapshot) | self.journal_writer.write_addition('snapshot', snapshot) | ||||
sorted_branch_names = sorted(snapshot.branches) | sorted_branch_names = sorted(snapshot.branches) | ||||
self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | ||||
▲ Show 20 Lines • Show All 193 Lines • ▼ Show 20 Lines | def origin_search(self, url_pattern, offset=0, limit=50, | ||||
return origins[offset:offset+limit] | return origins[offset:offset+limit] | ||||
def origin_count(self, url_pattern, regexp=False, with_visit=False):
    """Count the origins matching `url_pattern`.

    Delegates to ``origin_search`` with a limit large enough to cover
    every stored origin, and returns the size of the result.
    """
    matches = self.origin_search(
        url_pattern, regexp=regexp, with_visit=with_visit,
        limit=len(self._origins))
    return len(matches)
def origin_add(self, origins: Iterable["Origin"]) -> List[Dict]:
    """Insert the given origins and return them as dicts.

    The inputs are deep-copied first so the caller's objects are
    never mutated by the insertion.
    """
    added = copy.deepcopy(list(origins))
    for origin in added:
        self.origin_add_one(origin)
    return [origin.to_dict() for origin in added]
def origin_add_one(self, origin: "Origin") -> str:
    """Insert a single origin if not already known.

    Returns:
        the origin's url (whether it was newly inserted or not).
    """
    url = origin.url
    if url in self._origins:
        # Already known: nothing to do.
        return url
    if self.journal_writer:
        self.journal_writer.write_addition('origin', origin)
    # An integer origin_id is still generated because origin_get_range
    # relies on it.
    # TODO: remove this when we remove origin_get_range
    origin_id = len(self._origins) + 1
    self._origins_by_id.append(url)
    assert len(self._origins_by_id) == origin_id
    self._origins[url] = origin
    self._origins_by_sha1[origin_url_to_sha1(url)] = origin
    self._origin_visits[url] = []
    self._objects[url].append(('origin', url))
    return url
def origin_visit_add(self, origin, date, type): | def origin_visit_add( | ||||
self, origin, date, type) -> Optional[Dict[str, Union[str, int]]]: | |||||
origin_url = origin | origin_url = origin | ||||
if origin_url is None: | if origin_url is None: | ||||
raise StorageArgumentException('Unknown origin.') | raise StorageArgumentException('Unknown origin.') | ||||
if isinstance(date, str): | if isinstance(date, str): | ||||
# FIXME: Converge on iso8601 at some point | # FIXME: Converge on iso8601 at some point | ||||
date = dateutil.parser.parse(date) | date = dateutil.parser.parse(date) | ||||
elif not isinstance(date, datetime.datetime): | elif not isinstance(date, datetime.datetime): | ||||
Show All 24 Lines | def origin_visit_add( | ||||
self._objects[(origin_url, visit_id)].append( | self._objects[(origin_url, visit_id)].append( | ||||
('origin_visit', None)) | ('origin_visit', None)) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('origin_visit', visit) | self.journal_writer.write_addition('origin_visit', visit) | ||||
return visit_ret | return visit_ret | ||||
def origin_visit_update(self, origin, visit_id, status=None, | def origin_visit_update( | ||||
metadata=None, snapshot=None): | self, origin: str, visit_id: int, status: Optional[str] = None, | ||||
if not isinstance(origin, str): | metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None): | ||||
raise TypeError('origin must be a string, not %r' % (origin,)) | |||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url is None: | if origin_url is None: | ||||
raise StorageArgumentException('Unknown origin.') | raise StorageArgumentException('Unknown origin.') | ||||
try: | try: | ||||
visit = self._origin_visits[origin_url][visit_id-1] | visit = self._origin_visits[origin_url][visit_id-1] | ||||
except IndexError: | except IndexError: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
'Unknown visit_id for this origin') from None | 'Unknown visit_id for this origin') from None | ||||
updates = {} | updates: Dict[str, Any] = {} | ||||
if status: | if status: | ||||
updates['status'] = status | updates['status'] = status | ||||
if metadata: | if metadata: | ||||
updates['metadata'] = metadata | updates['metadata'] = metadata | ||||
if snapshot: | if snapshot: | ||||
updates['snapshot'] = snapshot | updates['snapshot'] = snapshot | ||||
try: | try: | ||||
▲ Show 20 Lines • Show All 252 Lines • Show Last 20 Lines |