Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 13 Lines | |||||
from collections import defaultdict | from collections import defaultdict | ||||
from datetime import timedelta | from datetime import timedelta | ||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Optional | ||||
import attr | import attr | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, Directory, Revision, Release, Snapshot, OriginVisit, Origin, | BaseContent, Content, SkippedContent, Directory, Revision, Release, | ||||
SHA1_SIZE) | Snapshot, OriginVisit, Origin, SHA1_SIZE) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from .storage import get_journal_writer | from .storage import get_journal_writer | ||||
from .converters import origin_url_to_sha1 | from .converters import origin_url_to_sha1 | ||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
Show All 38 Lines | def reset(self): | ||||
self._sorted_sha1s = [] | self._sorted_sha1s = [] | ||||
self.objstorage = get_objstorage('memory', {}) | self.objstorage = get_objstorage('memory', {}) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
return True | return True | ||||
def _content_add(self, contents, with_data): | def _content_add(self, contents, with_data): | ||||
content_with_data = [] | |||||
content_without_data = [] | |||||
for content in contents: | for content in contents: | ||||
if content.status is None: | if content.status is None: | ||||
content.status = 'visible' | content.status = 'visible' | ||||
if content.status == 'absent': | |||||
raise ValueError('content with status=absent') | |||||
if content.length is None: | if content.length is None: | ||||
content.length = -1 | raise ValueError('content with length=None') | ||||
if content.status != 'absent': | |||||
if self._content_key(content) not in self._contents: | |||||
content_with_data.append(content) | |||||
else: | |||||
if self._content_key(content) not in self._skipped_contents: | |||||
content_without_data.append(content) | |||||
if self.journal_writer: | if self.journal_writer: | ||||
for content in content_with_data: | for content in contents: | ||||
content = attr.evolve(content, data=None) | content = attr.evolve(content, data=None) | ||||
self.journal_writer.write_addition('content', content) | self.journal_writer.write_addition('content', content) | ||||
for content in content_without_data: | |||||
self.journal_writer.write_addition('content', content) | |||||
count_content_added, count_content_bytes_added = \ | |||||
self._content_add_present(content_with_data, with_data) | |||||
count_skipped_content_added = self._content_add_absent( | |||||
content_without_data | |||||
) | |||||
summary = { | summary = { | ||||
'content:add': count_content_added, | 'content:add': 0, | ||||
'skipped_content:add': count_skipped_content_added, | |||||
} | } | ||||
if with_data: | if with_data: | ||||
summary['content:add:bytes'] = count_content_bytes_added | summary['content:add:bytes'] = 0 | ||||
return summary | |||||
def _content_add_present(self, contents, with_data): | |||||
count_content_added = 0 | |||||
count_content_bytes_added = 0 | |||||
for content in contents: | for content in contents: | ||||
key = self._content_key(content) | key = self._content_key(content) | ||||
if key in self._contents: | if key in self._contents: | ||||
continue | continue | ||||
for algorithm in DEFAULT_ALGORITHMS: | for algorithm in DEFAULT_ALGORITHMS: | ||||
hash_ = content.get_hash(algorithm) | hash_ = content.get_hash(algorithm) | ||||
if hash_ in self._content_indexes[algorithm]\ | if hash_ in self._content_indexes[algorithm]\ | ||||
and (algorithm not in {'blake2s256', 'sha256'}): | and (algorithm not in {'blake2s256', 'sha256'}): | ||||
from . import HashCollision | from . import HashCollision | ||||
raise HashCollision(algorithm, hash_, key) | raise HashCollision(algorithm, hash_, key) | ||||
for algorithm in DEFAULT_ALGORITHMS: | for algorithm in DEFAULT_ALGORITHMS: | ||||
hash_ = content.get_hash(algorithm) | hash_ = content.get_hash(algorithm) | ||||
self._content_indexes[algorithm][hash_].add(key) | self._content_indexes[algorithm][hash_].add(key) | ||||
self._objects[content.sha1_git].append( | self._objects[content.sha1_git].append( | ||||
('content', content.sha1)) | ('content', content.sha1)) | ||||
self._contents[key] = content | self._contents[key] = content | ||||
bisect.insort(self._sorted_sha1s, content.sha1) | bisect.insort(self._sorted_sha1s, content.sha1) | ||||
count_content_added += 1 | summary['content:add'] += 1 | ||||
if with_data: | if with_data: | ||||
content_data = self._contents[key].data | content_data = self._contents[key].data | ||||
self._contents[key] = attr.evolve( | self._contents[key] = attr.evolve( | ||||
self._contents[key], | self._contents[key], | ||||
data=None) | data=None) | ||||
count_content_bytes_added += len(content_data) | summary['content:add:bytes'] += len(content_data) | ||||
self.objstorage.add(content_data, content.sha1) | self.objstorage.add(content_data, content.sha1) | ||||
return (count_content_added, count_content_bytes_added) | return summary | ||||
def _content_add_absent(self, contents): | |||||
count = 0 | |||||
skipped_content_missing = self.skipped_content_missing(contents) | |||||
for content in skipped_content_missing: | |||||
key = self._content_key(content) | |||||
for algo in DEFAULT_ALGORITHMS: | |||||
self._skipped_content_indexes[algo][content.get_hash(algo)] \ | |||||
.add(key) | |||||
self._skipped_contents[key] = content | |||||
count += 1 | |||||
return count | |||||
def _content_to_model(self, contents): | |||||
for content in contents: | |||||
content = content.copy() | |||||
content.pop('origin', None) | |||||
yield Content.from_dict(content) | |||||
def content_add(self, content): | def content_add(self, content): | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
content = [attr.evolve(c, ctime=now) | content = [attr.evolve(Content.from_dict(c), ctime=now) | ||||
for c in self._content_to_model(content)] | for c in content] | ||||
return self._content_add(content, with_data=True) | return self._content_add(content, with_data=True) | ||||
def content_update(self, content, keys=[]): | def content_update(self, content, keys=[]): | ||||
if self.journal_writer: | if self.journal_writer: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
'content_update is not yet supported with a journal_writer.') | 'content_update is not yet supported with a journal_writer.') | ||||
for cont_update in content: | for cont_update in content: | ||||
Show All 11 Lines | def content_update(self, content, keys=[]): | ||||
self._contents[new_key] = new_cont | self._contents[new_key] = new_cont | ||||
for algorithm in DEFAULT_ALGORITHMS: | for algorithm in DEFAULT_ALGORITHMS: | ||||
hash_ = new_cont.get_hash(algorithm) | hash_ = new_cont.get_hash(algorithm) | ||||
self._content_indexes[algorithm][hash_].add(new_key) | self._content_indexes[algorithm][hash_].add(new_key) | ||||
def content_add_metadata(self, content): | def content_add_metadata(self, content): | ||||
content = list(self._content_to_model(content)) | content = [Content.from_dict(c) for c in content] | ||||
return self._content_add(content, with_data=False) | return self._content_add(content, with_data=False) | ||||
def content_get(self, content): | def content_get(self, content): | ||||
# FIXME: Make this method support slicing the `data`. | # FIXME: Make this method support slicing the `data`. | ||||
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise ValueError( | raise ValueError( | ||||
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | ||||
for obj_id in content: | for obj_id in content: | ||||
▲ Show 20 Lines • Show All 97 Lines • ▼ Show 20 Lines | def content_missing_per_sha1(self, contents): | ||||
if content not in self._content_indexes['sha1']: | if content not in self._content_indexes['sha1']: | ||||
yield content | yield content | ||||
def content_missing_per_sha1_git(self, contents): | def content_missing_per_sha1_git(self, contents): | ||||
for content in contents: | for content in contents: | ||||
if content not in self._content_indexes['sha1_git']: | if content not in self._content_indexes['sha1_git']: | ||||
yield content | yield content | ||||
def content_get_random(self): | |||||
return random.choice(list(self._content_indexes['sha1_git'])) | |||||
def _skipped_content_add(self, contents): | |||||
for content in contents: | |||||
if content.status is None: | |||||
content = attr.evolve(content, status='absent') | |||||
if content.length is None: | |||||
content = attr.evolve(content, length=-1) | |||||
if content.status != 'absent': | |||||
raise ValueError(f'Content with status={content.status}') | |||||
if self.journal_writer: | |||||
for content in contents: | |||||
self.journal_writer.write_addition('content', content) | |||||
summary = { | |||||
'skipped_content:add': 0 | |||||
} | |||||
skipped_content_missing = self.skipped_content_missing( | |||||
[c.to_dict() for c in contents]) | |||||
for content in skipped_content_missing: | |||||
ardumont: why not iterate directly over skipped_content_missing (removing the explicit call to list on the previous line)? | |||||
Done · Inline Actions · vlorentz: probably because I wanted to print it while debugging. | |||||
Not Done · Inline Actions · ardumont: heh, happens to me as well. | |||||
key = self._content_key(content, allow_missing=True) | |||||
for algo in DEFAULT_ALGORITHMS: | |||||
if algo in content: | |||||
self._skipped_content_indexes[algo][content[algo]] \ | |||||
.add(key) | |||||
self._skipped_contents[key] = content | |||||
summary['skipped_content:add'] += 1 | |||||
return summary | |||||
def skipped_content_missing(self, contents): | def skipped_content_missing(self, contents): | ||||
for content in contents: | for content in contents: | ||||
for (key, algorithm) in self._content_key_algorithm(content): | for (key, algorithm) in self._content_key_algorithm(content): | ||||
if algorithm == 'blake2s256': | if algorithm == 'blake2s256': | ||||
continue | continue | ||||
if key not in self._skipped_content_indexes[algorithm]: | if key not in self._skipped_content_indexes[algorithm]: | ||||
# index must contain hashes of algos except blake2s256 | # index must contain hashes of algos except blake2s256 | ||||
# else the content is considered skipped | # else the content is considered skipped | ||||
yield content | yield {algo: content[algo] | ||||
for algo in DEFAULT_ALGORITHMS | |||||
if content[algo] is not None} | |||||
break | break | ||||
def content_get_random(self): | def skipped_content_add(self, content): | ||||
return random.choice(list(self._content_indexes['sha1_git'])) | content = list(content) | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | |||||
content = [attr.evolve(SkippedContent.from_dict(c), ctime=now) | |||||
for c in content] | |||||
return self._skipped_content_add(content) | |||||
def directory_add(self, directories): | def directory_add(self, directories): | ||||
directories = list(directories) | directories = list(directories) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions( | self.journal_writer.write_additions( | ||||
'directory', | 'directory', | ||||
(dir_ for dir_ in directories | (dir_ for dir_ in directories | ||||
if dir_['id'] not in self._directories)) | if dir_['id'] not in self._directories)) | ||||
▲ Show 20 Lines • Show All 684 Lines • ▼ Show 20 Lines | def _person_add(self, person): | ||||
self._persons.append(person) | self._persons.append(person) | ||||
self._objects[key].append(('person', person_id)) | self._objects[key].append(('person', person_id)) | ||||
else: | else: | ||||
person_id = self._objects[key][0][1] | person_id = self._objects[key][0][1] | ||||
person = self._persons[person_id-1] | person = self._persons[person_id-1] | ||||
return person | return person | ||||
@staticmethod | @staticmethod | ||||
def _content_key(content): | def _content_key(content, allow_missing=False): | ||||
"""A stable key for a content""" | """A stable key for a content""" | ||||
return tuple(getattr(content, key) | return tuple(getattr(content, key, None) | ||||
for key in sorted(DEFAULT_ALGORITHMS)) | for key in sorted(DEFAULT_ALGORITHMS)) | ||||
@staticmethod | @staticmethod | ||||
def _content_key_algorithm(content): | def _content_key_algorithm(content): | ||||
""" A stable key and the algorithm for a content""" | """ A stable key and the algorithm for a content""" | ||||
if isinstance(content, Content): | if isinstance(content, BaseContent): | ||||
content = content.to_dict() | content = content.to_dict() | ||||
return tuple((content.get(key), key) | return tuple((content.get(key), key) | ||||
for key in sorted(DEFAULT_ALGORITHMS)) | for key in sorted(DEFAULT_ALGORITHMS)) | ||||
@staticmethod | @staticmethod | ||||
def _tool_key(tool): | def _tool_key(tool): | ||||
return '%r %r %r' % (tool['name'], tool['version'], | return '%r %r %r' % (tool['name'], tool['version'], | ||||
tuple(sorted(tool['configuration'].items()))) | tuple(sorted(tool['configuration'].items()))) | ||||
Show All 13 Lines |
Why not iterate directly over skipped_content_missing (removing the explicit call to list on the previous line)?