Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 21 Lines | from swh.model.model import ( | ||||
BaseContent, Content, SkippedContent, Directory, Revision, Release, | BaseContent, Content, SkippedContent, Directory, Revision, Release, | ||||
Snapshot, OriginVisit, Origin, SHA1_SIZE) | Snapshot, OriginVisit, Origin, SHA1_SIZE) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from . import HashCollision | from . import HashCollision | ||||
from .exc import StorageArgumentException | from .exc import StorageArgumentException | ||||
from .storage import get_journal_writer | |||||
from .converters import origin_url_to_sha1 | from .converters import origin_url_to_sha1 | ||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
from .writer import JournalWriter | |||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
def now(): | def now(): | ||||
return datetime.datetime.now(tz=datetime.timezone.utc) | return datetime.datetime.now(tz=datetime.timezone.utc) | ||||
class InMemoryStorage: | class InMemoryStorage: | ||||
def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
self._contents = {} | self._contents = {} | ||||
self._content_indexes = defaultdict(lambda: defaultdict(set)) | self._content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._skipped_contents = {} | self._skipped_contents = {} | ||||
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self.reset() | self.reset() | ||||
self.journal_writer = JournalWriter(journal_writer) | |||||
if journal_writer: | |||||
self.journal_writer = get_journal_writer(**journal_writer) | |||||
else: | |||||
self.journal_writer = None | |||||
def reset(self): | def reset(self): | ||||
self._directories = {} | self._directories = {} | ||||
self._revisions = {} | self._revisions = {} | ||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_id = [] | self._origins_by_id = [] | ||||
Show All 16 Lines | class InMemoryStorage: | ||||
def _content_add(self, contents, with_data): | def _content_add(self, contents, with_data): | ||||
for content in contents: | for content in contents: | ||||
if content.status is None: | if content.status is None: | ||||
content.status = 'visible' | content.status = 'visible' | ||||
if content.status == 'absent': | if content.status == 'absent': | ||||
raise StorageArgumentException('content with status=absent') | raise StorageArgumentException('content with status=absent') | ||||
if content.length is None: | if content.length is None: | ||||
raise StorageArgumentException('content with length=None') | raise StorageArgumentException('content with length=None') | ||||
if self.journal_writer: | |||||
for content in contents: | |||||
try: | try: | ||||
content = attr.evolve(content, data=None) | attr.evolve(content, data=None) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
self.journal_writer.write_addition('content', content) | |||||
self.journal_writer.content_add(c.to_dict() for c in contents) | |||||
vlorentz: is the evolve still needed? | |||||
Done Inline ActionsI don't know, i kept the existing behavior. ardumont: I don't know, i kept the existing behavior. | |||||
Done Inline ActionsAnd i thought to address this in your incoming 'validation' diff as well. ardumont: And i thought to address this in your incoming 'validation' diff as well. | |||||
Done Inline Actionsardumont: after* your incoming 'validation' diff D2640
Also, yes that attr is needed to trigger the… | |||||
summary = { | summary = { | ||||
'content:add': 0, | 'content:add': 0, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary['content:add:bytes'] = 0 | summary['content:add:bytes'] = 0 | ||||
Show All 32 Lines | def content_add(self, content): | ||||
try: | try: | ||||
content = [attr.evolve(Content.from_dict(c), ctime=now) | content = [attr.evolve(Content.from_dict(c), ctime=now) | ||||
for c in content] | for c in content] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
return self._content_add(content, with_data=True) | return self._content_add(content, with_data=True) | ||||
def content_update(self, content, keys=[]): | def content_update(self, content, keys=[]): | ||||
if self.journal_writer: | self.journal_writer.content_update(content, keys=keys) | ||||
raise NotImplementedError( | |||||
'content_update is not yet supported with a journal_writer.') | |||||
for cont_update in content: | for cont_update in content: | ||||
cont_update = cont_update.copy() | cont_update = cont_update.copy() | ||||
sha1 = cont_update.pop('sha1') | sha1 = cont_update.pop('sha1') | ||||
for old_key in self._content_indexes['sha1'][sha1]: | for old_key in self._content_indexes['sha1'][sha1]: | ||||
old_cont = self._contents.pop(old_key) | old_cont = self._contents.pop(old_key) | ||||
for algorithm in DEFAULT_ALGORITHMS: | for algorithm in DEFAULT_ALGORITHMS: | ||||
▲ Show 20 Lines • Show All 136 Lines • ▼ Show 20 Lines | def _skipped_content_add(self, contents): | ||||
if content.status is None: | if content.status is None: | ||||
content = attr.evolve(content, status='absent') | content = attr.evolve(content, status='absent') | ||||
if content.length is None: | if content.length is None: | ||||
content = attr.evolve(content, length=-1) | content = attr.evolve(content, length=-1) | ||||
if content.status != 'absent': | if content.status != 'absent': | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f'Content with status={content.status}') | f'Content with status={content.status}') | ||||
if self.journal_writer: | self.journal_writer.skipped_content_add(contents) | ||||
for content in contents: | |||||
self.journal_writer.write_addition('content', content) | |||||
summary = { | summary = { | ||||
'skipped_content:add': 0 | 'skipped_content:add': 0 | ||||
} | } | ||||
skipped_content_missing = self.skipped_content_missing( | skipped_content_missing = self.skipped_content_missing( | ||||
[c.to_dict() for c in contents]) | [c.to_dict() for c in contents]) | ||||
for content in skipped_content_missing: | for content in skipped_content_missing: | ||||
Show All 27 Lines | def skipped_content_add(self, content): | ||||
content = [attr.evolve(SkippedContent.from_dict(c), ctime=now) | content = [attr.evolve(SkippedContent.from_dict(c), ctime=now) | ||||
for c in content] | for c in content] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
return self._skipped_content_add(content) | return self._skipped_content_add(content) | ||||
def directory_add(self, directories): | def directory_add(self, directories): | ||||
directories = list(directories) | directories = list(directories) | ||||
if self.journal_writer: | self.journal_writer.directory_add( | ||||
self.journal_writer.write_additions( | dir_ for dir_ in directories | ||||
'directory', | if dir_['id'] not in self._directories | ||||
(dir_ for dir_ in directories | ) | ||||
if dir_['id'] not in self._directories)) | |||||
Not Done Inline ActionsFilter before the writer... vlorentz: Filter before the writer... | |||||
try: | try: | ||||
directories = [Directory.from_dict(d) for d in directories] | directories = [Directory.from_dict(d) for d in directories] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
count = 0 | count = 0 | ||||
for directory in directories: | for directory in directories: | ||||
if directory.id not in self._directories: | if directory.id not in self._directories: | ||||
Not Done Inline Actions... and then you can remove this (same for other methods below) vlorentz: ... and then you can remove this
(same for other methods below) | |||||
Done Inline ActionsI was aiming at avoiding conflicts from your other incoming diffs... ardumont: I was aiming at avoiding conflicts from your other incoming diffs... | |||||
count += 1 | count += 1 | ||||
self._directories[directory.id] = directory | self._directories[directory.id] = directory | ||||
self._objects[directory.id].append( | self._objects[directory.id].append( | ||||
('directory', directory.id)) | ('directory', directory.id)) | ||||
return {'directory:add': count} | return {'directory:add': count} | ||||
def directory_missing(self, directories): | def directory_missing(self, directories): | ||||
▲ Show 20 Lines • Show All 66 Lines • ▼ Show 20 Lines | def _directory_entry_get_by_path(self, directory, paths, prefix): | ||||
if not first_item or first_item['type'] != 'dir': | if not first_item or first_item['type'] != 'dir': | ||||
return | return | ||||
return self._directory_entry_get_by_path( | return self._directory_entry_get_by_path( | ||||
first_item['target'], paths[1:], prefix + paths[0] + b'/') | first_item['target'], paths[1:], prefix + paths[0] + b'/') | ||||
def revision_add(self, revisions): | def revision_add(self, revisions): | ||||
revisions = list(revisions) | revisions = list(revisions) | ||||
if self.journal_writer: | self.journal_writer.revision_add( | ||||
self.journal_writer.write_additions( | rev for rev in revisions | ||||
'revision', | if rev['id'] not in self._revisions | ||||
(rev for rev in revisions | ) | ||||
if rev['id'] not in self._revisions)) | |||||
try: | try: | ||||
revisions = [Revision.from_dict(rev) for rev in revisions] | revisions = [Revision.from_dict(rev) for rev in revisions] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
count = 0 | count = 0 | ||||
for revision in revisions: | for revision in revisions: | ||||
Show All 40 Lines | def revision_shortlog(self, revisions, limit=None): | ||||
yield from ((rev['id'], rev['parents']) | yield from ((rev['id'], rev['parents']) | ||||
for rev in self.revision_log(revisions, limit)) | for rev in self.revision_log(revisions, limit)) | ||||
def revision_get_random(self): | def revision_get_random(self): | ||||
return random.choice(list(self._revisions)) | return random.choice(list(self._revisions)) | ||||
def release_add(self, releases): | def release_add(self, releases): | ||||
releases = list(releases) | releases = list(releases) | ||||
if self.journal_writer: | self.journal_writer.release_add( | ||||
self.journal_writer.write_additions( | rel for rel in releases | ||||
'release', | if rel['id'] not in self._releases | ||||
(rel for rel in releases | ) | ||||
if rel['id'] not in self._releases)) | |||||
try: | try: | ||||
releases = [Release.from_dict(rel) for rel in releases] | releases = [Release.from_dict(rel) for rel in releases] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
count = 0 | count = 0 | ||||
for rel in releases: | for rel in releases: | ||||
Show All 24 Lines | def snapshot_add(self, snapshots): | ||||
count = 0 | count = 0 | ||||
try: | try: | ||||
snapshots = [Snapshot.from_dict(d) for d in snapshots] | snapshots = [Snapshot.from_dict(d) for d in snapshots] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
snapshots = (snap for snap in snapshots | snapshots = (snap for snap in snapshots | ||||
if snap.id not in self._snapshots) | if snap.id not in self._snapshots) | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if self.journal_writer: | self.journal_writer.snapshot_add(snapshot.to_dict()) | ||||
self.journal_writer.write_addition('snapshot', snapshot) | |||||
sorted_branch_names = sorted(snapshot.branches) | sorted_branch_names = sorted(snapshot.branches) | ||||
self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | ||||
self._objects[snapshot.id].append(('snapshot', snapshot.id)) | self._objects[snapshot.id].append(('snapshot', snapshot.id)) | ||||
count += 1 | count += 1 | ||||
return {'snapshot:add': count} | return {'snapshot:add': count} | ||||
def snapshot_missing(self, snapshots): | def snapshot_missing(self, snapshots): | ||||
▲ Show 20 Lines • Show All 199 Lines • ▼ Show 20 Lines | def origin_add(self, origins): | ||||
return origins | return origins | ||||
def origin_add_one(self, origin): | def origin_add_one(self, origin): | ||||
try: | try: | ||||
origin = Origin.from_dict(origin) | origin = Origin.from_dict(origin) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
if origin.url not in self._origins: | if origin.url not in self._origins: | ||||
if self.journal_writer: | self.journal_writer.origin_add_one(origin.to_dict()) | ||||
self.journal_writer.write_addition('origin', origin) | |||||
# generate an origin_id because it is needed by origin_get_range. | # generate an origin_id because it is needed by origin_get_range. | ||||
# TODO: remove this when we remove origin_get_range | # TODO: remove this when we remove origin_get_range | ||||
origin_id = len(self._origins) + 1 | origin_id = len(self._origins) + 1 | ||||
self._origins_by_id.append(origin.url) | self._origins_by_id.append(origin.url) | ||||
assert len(self._origins_by_id) == origin_id | assert len(self._origins_by_id) == origin_id | ||||
self._origins[origin.url] = origin | self._origins[origin.url] = origin | ||||
self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | ||||
Show All 33 Lines | def origin_visit_add(self, origin, date, type): | ||||
visit_ret = { | visit_ret = { | ||||
'origin': origin.url, | 'origin': origin.url, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
} | } | ||||
self._objects[(origin_url, visit_id)].append( | self._objects[(origin_url, visit_id)].append( | ||||
('origin_visit', None)) | ('origin_visit', None)) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_add(visit.to_dict()) | ||||
self.journal_writer.write_addition('origin_visit', visit) | |||||
return visit_ret | return visit_ret | ||||
def origin_visit_update(self, origin, visit_id, status=None, | def origin_visit_update(self, origin, visit_id, status=None, | ||||
metadata=None, snapshot=None): | metadata=None, snapshot=None): | ||||
if not isinstance(origin, str): | if not isinstance(origin, str): | ||||
raise TypeError('origin must be a string, not %r' % (origin,)) | raise TypeError('origin must be a string, not %r' % (origin,)) | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
Show All 14 Lines | def origin_visit_update(self, origin, visit_id, status=None, | ||||
if snapshot: | if snapshot: | ||||
updates['snapshot'] = snapshot | updates['snapshot'] = snapshot | ||||
try: | try: | ||||
visit = attr.evolve(visit, **updates) | visit = attr.evolve(visit, **updates) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_update(visit.to_dict()) | ||||
self.journal_writer.write_update('origin_visit', visit) | |||||
self._origin_visits[origin_url][visit_id-1] = visit | self._origin_visits[origin_url][visit_id-1] = visit | ||||
def origin_visit_upsert(self, visits): | def origin_visit_upsert(self, visits): | ||||
for visit in visits: | for visit in visits: | ||||
if not isinstance(visit['origin'], str): | if not isinstance(visit['origin'], str): | ||||
raise TypeError("visit['origin'] must be a string, not %r" | raise TypeError("visit['origin'] must be a string, not %r" | ||||
% (visit['origin'],)) | % (visit['origin'],)) | ||||
try: | try: | ||||
visits = [OriginVisit.from_dict(d) for d in visits] | visits = [OriginVisit.from_dict(d) for d in visits] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_upsert(v.to_dict() for v in visits) | ||||
for visit in visits: | |||||
self.journal_writer.write_addition('origin_visit', visit) | |||||
for visit in visits: | for visit in visits: | ||||
visit_id = visit.visit | visit_id = visit.visit | ||||
origin_url = visit.origin | origin_url = visit.origin | ||||
try: | try: | ||||
visit = attr.evolve(visit, origin=origin_url) | visit = attr.evolve(visit, origin=origin_url) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
▲ Show 20 Lines • Show All 222 Lines • Show Last 20 Lines |
is the evolve still needed?