Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 20 Lines | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, Content, SkippedContent, Directory, Revision, Release, | BaseContent, Content, SkippedContent, Directory, Revision, Release, | ||||
Snapshot, OriginVisit, Origin, SHA1_SIZE) | Snapshot, OriginVisit, Origin, SHA1_SIZE) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from . import HashCollision | from . import HashCollision | ||||
from .exc import StorageArgumentException | from .exc import StorageArgumentException | ||||
from .storage import get_journal_writer | |||||
from .converters import origin_url_to_sha1 | from .converters import origin_url_to_sha1 | ||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
from .writer import JournalWriter | |||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
def now(): | def now(): | ||||
return datetime.datetime.now(tz=datetime.timezone.utc) | return datetime.datetime.now(tz=datetime.timezone.utc) | ||||
class InMemoryStorage: | class InMemoryStorage: | ||||
def __init__(self, journal_writer=None): | def __init__(self, journal_writer=None): | ||||
self._contents = {} | self._contents = {} | ||||
self._content_indexes = defaultdict(lambda: defaultdict(set)) | self._content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self._skipped_contents = {} | self._skipped_contents = {} | ||||
self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | self._skipped_content_indexes = defaultdict(lambda: defaultdict(set)) | ||||
self.reset() | self.reset() | ||||
self.journal_writer = JournalWriter(journal_writer) | |||||
if journal_writer: | |||||
self.journal_writer = get_journal_writer(**journal_writer) | |||||
else: | |||||
self.journal_writer = None | |||||
def reset(self): | def reset(self): | ||||
self._directories = {} | self._directories = {} | ||||
self._revisions = {} | self._revisions = {} | ||||
self._releases = {} | self._releases = {} | ||||
self._snapshots = {} | self._snapshots = {} | ||||
self._origins = {} | self._origins = {} | ||||
self._origins_by_id = [] | self._origins_by_id = [] | ||||
Show All 10 Lines | def reset(self): | ||||
self.objstorage = ObjStorage({'cls': 'memory', 'args': {}}) | self.objstorage = ObjStorage({'cls': 'memory', 'args': {}}) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
return True | return True | ||||
def _content_add( | def _content_add( | ||||
self, contents: Iterable[Content], with_data: bool) -> Dict: | self, contents: Iterable[Content], with_data: bool) -> Dict: | ||||
if self.journal_writer: | self.journal_writer.content_add(contents) | ||||
vlorentz: is the evolve still needed? | |||||
Done Inline ActionsI don't know, i kept the existing behavior. ardumont: I don't know, i kept the existing behavior. | |||||
Done Inline ActionsAnd i thought to address this in your incoming 'validation' diff as well. ardumont: And i thought to address this in your incoming 'validation' diff as well. | |||||
Done Inline Actionsardumont: after* your incoming 'validation' diff D2640
Also, yes that attr is needed to trigger the… | |||||
for content in contents: | |||||
content = attr.evolve(content, data=None) | |||||
self.journal_writer.write_addition('content', content) | |||||
content_add = 0 | content_add = 0 | ||||
content_add_bytes = 0 | content_add_bytes = 0 | ||||
if with_data: | if with_data: | ||||
summary = self.objstorage.content_add( | summary = self.objstorage.content_add( | ||||
c for c in contents | c for c in contents | ||||
if c.status != 'absent') | if c.status != 'absent') | ||||
content_add_bytes = summary['content:add:bytes'] | content_add_bytes = summary['content:add:bytes'] | ||||
Show All 28 Lines | def _content_add( | ||||
return summary | return summary | ||||
def content_add(self, content: Iterable[Content]) -> Dict: | def content_add(self, content: Iterable[Content]) -> Dict: | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
content = [attr.evolve(c, ctime=now) for c in content] | content = [attr.evolve(c, ctime=now) for c in content] | ||||
return self._content_add(content, with_data=True) | return self._content_add(content, with_data=True) | ||||
def content_update(self, content, keys=[]): | def content_update(self, content, keys=[]): | ||||
if self.journal_writer: | self.journal_writer.content_update(content) | ||||
raise NotImplementedError( | |||||
'content_update is not yet supported with a journal_writer.') | |||||
for cont_update in content: | for cont_update in content: | ||||
cont_update = cont_update.copy() | cont_update = cont_update.copy() | ||||
sha1 = cont_update.pop('sha1') | sha1 = cont_update.pop('sha1') | ||||
for old_key in self._content_indexes['sha1'][sha1]: | for old_key in self._content_indexes['sha1'][sha1]: | ||||
old_cont = self._contents.pop(old_key) | old_cont = self._contents.pop(old_key) | ||||
for algorithm in DEFAULT_ALGORITHMS: | for algorithm in DEFAULT_ALGORITHMS: | ||||
▲ Show 20 Lines • Show All 116 Lines • ▼ Show 20 Lines | def content_missing_per_sha1_git(self, contents): | ||||
for content in contents: | for content in contents: | ||||
if content not in self._content_indexes['sha1_git']: | if content not in self._content_indexes['sha1_git']: | ||||
yield content | yield content | ||||
def content_get_random(self): | def content_get_random(self): | ||||
return random.choice(list(self._content_indexes['sha1_git'])) | return random.choice(list(self._content_indexes['sha1_git'])) | ||||
def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict: | def _skipped_content_add(self, contents: Iterable[SkippedContent]) -> Dict: | ||||
if self.journal_writer: | self.journal_writer.skipped_content_add(contents) | ||||
for cont in contents: | |||||
self.journal_writer.write_addition('content', cont) | |||||
summary = { | summary = { | ||||
'skipped_content:add': 0 | 'skipped_content:add': 0 | ||||
} | } | ||||
skipped_content_missing = self.skipped_content_missing( | skipped_content_missing = self.skipped_content_missing( | ||||
[c.to_dict() for c in contents]) | [c.to_dict() for c in contents]) | ||||
for content in skipped_content_missing: | for content in skipped_content_missing: | ||||
Show All 21 Lines | def skipped_content_missing(self, contents): | ||||
break | break | ||||
def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: | def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: | ||||
content = list(content) | content = list(content) | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | now = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
content = [attr.evolve(c, ctime=now) for c in content] | content = [attr.evolve(c, ctime=now) for c in content] | ||||
return self._skipped_content_add(content) | return self._skipped_content_add(content) | ||||
def directory_add(self, directories: Iterable[Directory]) -> Dict: | def directory_add(self, directories: Iterable[Directory]) -> Dict: | ||||
directories = list(directories) | directories = [dir_ for dir_ in directories | ||||
if self.journal_writer: | if dir_.id not in self._directories] | ||||
self.journal_writer.write_additions( | self.journal_writer.directory_add(directories) | ||||
Not Done Inline ActionsFilter before the writer... vlorentz: Filter before the writer... | |||||
'directory', | |||||
(dir_ for dir_ in directories | |||||
if dir_.id not in self._directories)) | |||||
count = 0 | count = 0 | ||||
for directory in directories: | for directory in directories: | ||||
if directory.id not in self._directories: | |||||
count += 1 | count += 1 | ||||
Not Done Inline Actions... and then you can remove this (same for other methods below) vlorentz: ... and then you can remove this
(same for other methods below) | |||||
Done Inline ActionsI was aiming at avoiding conflicts from your other incoming diffs... ardumont: I was aiming at avoiding conflicts from your other incoming diffs... | |||||
self._directories[directory.id] = directory | self._directories[directory.id] = directory | ||||
self._objects[directory.id].append( | self._objects[directory.id].append( | ||||
('directory', directory.id)) | ('directory', directory.id)) | ||||
return {'directory:add': count} | return {'directory:add': count} | ||||
def directory_missing(self, directories): | def directory_missing(self, directories): | ||||
for id in directories: | for id in directories: | ||||
if id not in self._directories: | if id not in self._directories: | ||||
yield id | yield id | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def _directory_entry_get_by_path(self, directory, paths, prefix): | ||||
if not first_item or first_item['type'] != 'dir': | if not first_item or first_item['type'] != 'dir': | ||||
return | return | ||||
return self._directory_entry_get_by_path( | return self._directory_entry_get_by_path( | ||||
first_item['target'], paths[1:], prefix + paths[0] + b'/') | first_item['target'], paths[1:], prefix + paths[0] + b'/') | ||||
def revision_add(self, revisions: Iterable[Revision]) -> Dict: | def revision_add(self, revisions: Iterable[Revision]) -> Dict: | ||||
revisions = list(revisions) | revisions = [rev for rev in revisions | ||||
if self.journal_writer: | if rev.id not in self._revisions] | ||||
self.journal_writer.write_additions( | self.journal_writer.revision_add(revisions) | ||||
'revision', | |||||
(rev for rev in revisions | |||||
if rev.id not in self._revisions)) | |||||
count = 0 | count = 0 | ||||
for revision in revisions: | for revision in revisions: | ||||
if revision.id not in self._revisions: | |||||
revision = attr.evolve( | revision = attr.evolve( | ||||
revision, | revision, | ||||
committer=self._person_add(revision.committer), | committer=self._person_add(revision.committer), | ||||
author=self._person_add(revision.author)) | author=self._person_add(revision.author)) | ||||
self._revisions[revision.id] = revision | self._revisions[revision.id] = revision | ||||
self._objects[revision.id].append( | self._objects[revision.id].append( | ||||
('revision', revision.id)) | ('revision', revision.id)) | ||||
count += 1 | count += 1 | ||||
return {'revision:add': count} | return {'revision:add': count} | ||||
def revision_missing(self, revisions): | def revision_missing(self, revisions): | ||||
for id in revisions: | for id in revisions: | ||||
if id not in self._revisions: | if id not in self._revisions: | ||||
yield id | yield id | ||||
Show All 22 Lines | class InMemoryStorage: | ||||
def revision_shortlog(self, revisions, limit=None): | def revision_shortlog(self, revisions, limit=None): | ||||
yield from ((rev['id'], rev['parents']) | yield from ((rev['id'], rev['parents']) | ||||
for rev in self.revision_log(revisions, limit)) | for rev in self.revision_log(revisions, limit)) | ||||
def revision_get_random(self): | def revision_get_random(self): | ||||
return random.choice(list(self._revisions)) | return random.choice(list(self._revisions)) | ||||
def release_add(self, releases: Iterable[Release]) -> Dict: | def release_add(self, releases: Iterable[Release]) -> Dict: | ||||
releases = list(releases) | releases = [rel for rel in releases | ||||
if self.journal_writer: | if rel.id not in self._releases] | ||||
self.journal_writer.write_additions( | self.journal_writer.release_add(releases) | ||||
'release', | |||||
(rel for rel in releases | |||||
if rel.id not in self._releases)) | |||||
count = 0 | count = 0 | ||||
for rel in releases: | for rel in releases: | ||||
if rel.id not in self._releases: | |||||
if rel.author: | if rel.author: | ||||
self._person_add(rel.author) | self._person_add(rel.author) | ||||
self._objects[rel.id].append( | self._objects[rel.id].append( | ||||
('release', rel.id)) | ('release', rel.id)) | ||||
self._releases[rel.id] = rel | self._releases[rel.id] = rel | ||||
count += 1 | count += 1 | ||||
return {'release:add': count} | return {'release:add': count} | ||||
def release_missing(self, releases): | def release_missing(self, releases): | ||||
yield from (rel for rel in releases if rel not in self._releases) | yield from (rel for rel in releases if rel not in self._releases) | ||||
def release_get(self, releases): | def release_get(self, releases): | ||||
for rel_id in releases: | for rel_id in releases: | ||||
if rel_id in self._releases: | if rel_id in self._releases: | ||||
yield self._releases[rel_id].to_dict() | yield self._releases[rel_id].to_dict() | ||||
else: | else: | ||||
yield None | yield None | ||||
def release_get_random(self): | def release_get_random(self): | ||||
return random.choice(list(self._releases)) | return random.choice(list(self._releases)) | ||||
def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | ||||
count = 0 | count = 0 | ||||
snapshots = (snap for snap in snapshots | snapshots = (snap for snap in snapshots | ||||
if snap.id not in self._snapshots) | if snap.id not in self._snapshots) | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if self.journal_writer: | self.journal_writer.snapshot_add(snapshot) | ||||
self.journal_writer.write_addition('snapshot', snapshot) | |||||
sorted_branch_names = sorted(snapshot.branches) | sorted_branch_names = sorted(snapshot.branches) | ||||
self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | ||||
self._objects[snapshot.id].append(('snapshot', snapshot.id)) | self._objects[snapshot.id].append(('snapshot', snapshot.id)) | ||||
count += 1 | count += 1 | ||||
return {'snapshot:add': count} | return {'snapshot:add': count} | ||||
def snapshot_missing(self, snapshots): | def snapshot_missing(self, snapshots): | ||||
▲ Show 20 Lines • Show All 195 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | def origin_add(self, origins: Iterable[Origin]) -> List[Dict]: | ||||
origins = copy.deepcopy(list(origins)) | origins = copy.deepcopy(list(origins)) | ||||
for origin in origins: | for origin in origins: | ||||
self.origin_add_one(origin) | self.origin_add_one(origin) | ||||
return [origin.to_dict() for origin in origins] | return [origin.to_dict() for origin in origins] | ||||
def origin_add_one(self, origin: Origin) -> str: | def origin_add_one(self, origin: Origin) -> str: | ||||
if origin.url not in self._origins: | if origin.url not in self._origins: | ||||
if self.journal_writer: | self.journal_writer.origin_add_one(origin) | ||||
self.journal_writer.write_addition('origin', origin) | |||||
# generate an origin_id because it is needed by origin_get_range. | # generate an origin_id because it is needed by origin_get_range. | ||||
# TODO: remove this when we remove origin_get_range | # TODO: remove this when we remove origin_get_range | ||||
origin_id = len(self._origins) + 1 | origin_id = len(self._origins) + 1 | ||||
self._origins_by_id.append(origin.url) | self._origins_by_id.append(origin.url) | ||||
assert len(self._origins_by_id) == origin_id | assert len(self._origins_by_id) == origin_id | ||||
self._origins[origin.url] = origin | self._origins[origin.url] = origin | ||||
self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | ||||
Show All 34 Lines | def origin_visit_add( | ||||
visit_ret = { | visit_ret = { | ||||
'origin': origin.url, | 'origin': origin.url, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
} | } | ||||
self._objects[(origin_url, visit_id)].append( | self._objects[(origin_url, visit_id)].append( | ||||
('origin_visit', None)) | ('origin_visit', None)) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_add(visit) | ||||
self.journal_writer.write_addition('origin_visit', visit) | |||||
return visit_ret | return visit_ret | ||||
def origin_visit_update( | def origin_visit_update( | ||||
self, origin: str, visit_id: int, status: Optional[str] = None, | self, origin: str, visit_id: int, status: Optional[str] = None, | ||||
metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None): | metadata: Optional[Dict] = None, snapshot: Optional[bytes] = None): | ||||
origin_url = self._get_origin_url(origin) | origin_url = self._get_origin_url(origin) | ||||
if origin_url is None: | if origin_url is None: | ||||
Show All 13 Lines | def origin_visit_update( | ||||
if snapshot: | if snapshot: | ||||
updates['snapshot'] = snapshot | updates['snapshot'] = snapshot | ||||
try: | try: | ||||
visit = attr.evolve(visit, **updates) | visit = attr.evolve(visit, **updates) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_update(visit) | ||||
self.journal_writer.write_update('origin_visit', visit) | |||||
self._origin_visits[origin_url][visit_id-1] = visit | self._origin_visits[origin_url][visit_id-1] = visit | ||||
def origin_visit_upsert(self, visits): | def origin_visit_upsert(self, visits): | ||||
for visit in visits: | for visit in visits: | ||||
if not isinstance(visit['origin'], str): | if not isinstance(visit['origin'], str): | ||||
raise TypeError("visit['origin'] must be a string, not %r" | raise TypeError("visit['origin'] must be a string, not %r" | ||||
% (visit['origin'],)) | % (visit['origin'],)) | ||||
try: | try: | ||||
visits = [OriginVisit.from_dict(d) for d in visits] | visits = [OriginVisit.from_dict(d) for d in visits] | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
if self.journal_writer: | self.journal_writer.origin_visit_upsert(visits) | ||||
for visit in visits: | |||||
self.journal_writer.write_addition('origin_visit', visit) | |||||
for visit in visits: | for visit in visits: | ||||
visit_id = visit.visit | visit_id = visit.visit | ||||
origin_url = visit.origin | origin_url = visit.origin | ||||
try: | try: | ||||
visit = attr.evolve(visit, origin=origin_url) | visit = attr.evolve(visit, origin=origin_url) | ||||
except (KeyError, TypeError, ValueError) as e: | except (KeyError, TypeError, ValueError) as e: | ||||
▲ Show 20 Lines • Show All 222 Lines • Show Last 20 Lines |
is the evolve still needed?