Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show All 19 Lines | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, Content, SkippedContent, Directory, Revision, Release, | BaseContent, Content, SkippedContent, Directory, Revision, Release, | ||||
Snapshot, OriginVisit, Origin, SHA1_SIZE) | Snapshot, OriginVisit, Origin, SHA1_SIZE) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
from . import HashCollision | |||||
from .exc import StorageArgumentException | |||||
from .storage import get_journal_writer | from .storage import get_journal_writer | ||||
from .converters import origin_url_to_sha1 | from .converters import origin_url_to_sha1 | ||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
Show All 38 Lines | class InMemoryStorage: | ||||
def check_config(self, *, check_write):
    """Report whether the storage is ready for use.

    The in-memory backend needs no configuration, so it is always
    ready for both reads and writes.

    Args:
        check_write: accepted for interface compatibility with other
            storage backends; ignored here.
    """
    return True
def _content_add(self, contents, with_data):
    """Shared implementation behind content_add / content_add_metadata.

    Args:
        contents: list of Content model objects to store.
        with_data: when True, the raw bytes are pushed to the
            objstorage and counted in the returned summary.

    Returns:
        dict summary with 'content:add' (and 'content:add:bytes' when
        with_data is True).

    Raises:
        StorageArgumentException: invalid content (absent status,
            missing length, or a model-level error).
        HashCollision: a non-colliding-resistant hash already maps to a
            different content.
    """
    # Validate the whole batch up front so nothing is written for a
    # batch containing an invalid content.
    for cont in contents:
        if cont.status is None:
            # NOTE(review): in-place mutation; assumes the model class
            # is not frozen — confirm against swh.model.
            cont.status = 'visible'
        if cont.status == 'absent':
            raise StorageArgumentException('content with status=absent')
        if cont.length is None:
            raise StorageArgumentException('content with length=None')

    if self.journal_writer:
        for cont in contents:
            try:
                # Never write raw data to the journal.
                journal_cont = attr.evolve(cont, data=None)
            except (KeyError, TypeError, ValueError) as e:
                raise StorageArgumentException(*e.args)
            self.journal_writer.write_addition('content', journal_cont)

    summary = {'content:add': 0}
    if with_data:
        summary['content:add:bytes'] = 0

    for cont in contents:
        key = self._content_key(cont)
        if key in self._contents:
            continue

        # First pass: detect hash collisions before touching any index,
        # so a collision leaves the storage unmodified.
        for algorithm in DEFAULT_ALGORITHMS:
            hash_ = cont.get_hash(algorithm)
            if (hash_ in self._content_indexes[algorithm]
                    and algorithm not in {'blake2s256', 'sha256'}):
                raise HashCollision(algorithm, hash_, key)

        # Second pass: index the content under every algorithm.
        for algorithm in DEFAULT_ALGORITHMS:
            hash_ = cont.get_hash(algorithm)
            self._content_indexes[algorithm][hash_].add(key)

        self._objects[cont.sha1_git].append(('content', cont.sha1))
        self._contents[key] = cont
        bisect.insort(self._sorted_sha1s, cont.sha1)
        summary['content:add'] += 1

        if with_data:
            content_data = self._contents[key].data
            try:
                # Keep only metadata in memory; the bytes live in the
                # objstorage.
                self._contents[key] = attr.evolve(
                    self._contents[key], data=None)
            except (KeyError, TypeError, ValueError) as e:
                raise StorageArgumentException(*e.args)
            summary['content:add:bytes'] += len(content_data)
            self.objstorage.add(content_data, cont.sha1)

    return summary
def content_add(self, content):
    """Add contents (including their raw data) to the storage.

    Args:
        content: iterable of content dicts, each convertible by
            Content.from_dict.

    Returns:
        summary dict from _content_add.

    Raises:
        StorageArgumentException: when a dict is not a valid content.
    """
    ctime = datetime.datetime.now(tz=datetime.timezone.utc)
    try:
        # Stamp every content with the same ingestion time.
        models = []
        for c in content:
            models.append(attr.evolve(Content.from_dict(c), ctime=ctime))
    except (KeyError, TypeError, ValueError) as e:
        raise StorageArgumentException(*e.args)
    return self._content_add(models, with_data=True)
def content_update(self, content, keys=[]):
    """Update stored contents identified by their sha1.

    Args:
        content: iterable of dicts; each must carry a 'sha1' key
            selecting the contents to update, plus the fields to
            overwrite.
        keys: accepted for interface compatibility; unused here.

    Raises:
        NotImplementedError: when a journal writer is configured.
        StorageArgumentException: when the updated fields do not form
            a valid content.
    """
    if self.journal_writer:
        raise NotImplementedError(
            'content_update is not yet supported with a journal_writer.')
    for cont_update in content:
        cont_update = cont_update.copy()
        sha1 = cont_update.pop('sha1')
        # Snapshot the key set before iterating: the loop body removes
        # old_key from (and may re-add new_key to) this very set, and
        # mutating a set while iterating it raises RuntimeError.
        for old_key in list(self._content_indexes['sha1'][sha1]):
            old_cont = self._contents.pop(old_key)
            # De-index the old content under every algorithm.
            for algorithm in DEFAULT_ALGORITHMS:
                hash_ = old_cont.get_hash(algorithm)
                self._content_indexes[algorithm][hash_].remove(old_key)
            try:
                new_cont = attr.evolve(old_cont, **cont_update)
            except (KeyError, TypeError, ValueError) as e:
                raise StorageArgumentException(*e.args)
            new_key = self._content_key(new_cont)
            self._contents[new_key] = new_cont
            # Re-index the updated content under its (possibly new)
            # hashes.
            for algorithm in DEFAULT_ALGORITHMS:
                hash_ = new_cont.get_hash(algorithm)
                self._content_indexes[algorithm][hash_].add(new_key)
def content_add_metadata(self, content):
    """Add content metadata only (no raw data sent to the objstorage).

    Args:
        content: iterable of content dicts, each convertible by
            Content.from_dict.

    Returns:
        summary dict from _content_add.

    Raises:
        StorageArgumentException: when a dict is not a valid content.
    """
    # Map model validation errors to StorageArgumentException, for
    # consistency with content_add and the other *_add methods.
    try:
        content = [Content.from_dict(c) for c in content]
    except (KeyError, TypeError, ValueError) as e:
        raise StorageArgumentException(*e.args)
    return self._content_add(content, with_data=False)
def content_get(self, content):
    """Yield {'sha1', 'data'} for each requested object id, or None
    when the object is missing from the objstorage.

    Args:
        content: sequence of object ids (at most
            BULK_BLOCK_CONTENT_LEN_MAX of them).

    Raises:
        StorageArgumentException: when too many ids are requested.
    """
    # FIXME: Make this method support slicing the `data`.
    if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
        raise StorageArgumentException(
            "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)
    for obj_id in content:
        try:
            data = self.objstorage.get(obj_id)
        except ObjNotFoundError:
            # Unknown object: yield None to keep positional
            # correspondence with the input sequence.
            yield None
        else:
            yield {'sha1': obj_id, 'data': data}
def content_get_range(self, start, end, limit=1000):
    """Return up to `limit` content dicts whose sha1 lies in
    [start, end], in sha1 order.

    Args:
        start: inclusive lower sha1 bound (bytes).
        end: inclusive upper sha1 bound (bytes).
        limit: maximum number of contents returned; must not be None.

    Returns:
        dict with 'contents' (list of content dicts) and 'next' (the
        sha1 to resume from, or None when the range is exhausted).

    Raises:
        StorageArgumentException: when limit is None.
    """
    if limit is None:
        raise StorageArgumentException('limit should not be None')
    # Locate the first stored sha1 >= start in the sorted index.
    first = bisect.bisect_left(self._sorted_sha1s, start)
    candidates = (
        (sha1, key)
        for sha1 in itertools.islice(self._sorted_sha1s, first, None)
        for key in self._content_indexes['sha1'][sha1]
    )
    results = []
    next_sha1 = None
    for sha1, key in candidates:
        if sha1 > end:
            break
        if len(results) >= limit:
            # One more match exists past the limit: report it as the
            # resume cursor.
            next_sha1 = sha1
            break
        results.append(self._contents[key].to_dict())
    return {
        'contents': results,
        'next': next_sha1,
    }
def content_get_partition( | def content_get_partition( | ||||
self, partition_id: int, nb_partitions: int, limit: int = 1000, | self, partition_id: int, nb_partitions: int, limit: int = 1000, | ||||
page_token: str = None): | page_token: str = None): | ||||
if limit is None: | if limit is None: | ||||
raise ValueError('Development error: limit should not be None') | raise StorageArgumentException('limit should not be None') | ||||
(start, end) = get_partition_bounds_bytes( | (start, end) = get_partition_bounds_bytes( | ||||
partition_id, nb_partitions, SHA1_SIZE) | partition_id, nb_partitions, SHA1_SIZE) | ||||
if page_token: | if page_token: | ||||
start = hash_to_bytes(page_token) | start = hash_to_bytes(page_token) | ||||
if end is None: | if end is None: | ||||
end = b'\xff'*SHA1_SIZE | end = b'\xff'*SHA1_SIZE | ||||
result = self.content_get_range(start, end, limit) | result = self.content_get_range(start, end, limit) | ||||
result2 = { | result2 = { | ||||
Show All 17 Lines | def content_get_metadata( | ||||
del d['ctime'] | del d['ctime'] | ||||
if 'data' in d: | if 'data' in d: | ||||
del d['data'] | del d['data'] | ||||
result[sha1].append(d) | result[sha1].append(d) | ||||
return result | return result | ||||
def content_find(self, content): | def content_find(self, content): | ||||
if not set(content).intersection(DEFAULT_ALGORITHMS): | if not set(content).intersection(DEFAULT_ALGORITHMS): | ||||
raise ValueError('content keys must contain at least one of: ' | raise StorageArgumentException( | ||||
'%s' % ', '.join(sorted(DEFAULT_ALGORITHMS))) | 'content keys must contain at least one of: %s' | ||||
% ', '.join(sorted(DEFAULT_ALGORITHMS))) | |||||
found = [] | found = [] | ||||
for algo in DEFAULT_ALGORITHMS: | for algo in DEFAULT_ALGORITHMS: | ||||
hash = content.get(algo) | hash = content.get(algo) | ||||
if hash and hash in self._content_indexes[algo]: | if hash and hash in self._content_indexes[algo]: | ||||
found.append(self._content_indexes[algo][hash]) | found.append(self._content_indexes[algo][hash]) | ||||
if not found: | if not found: | ||||
return [] | return [] | ||||
Show All 29 Lines | class InMemoryStorage: | ||||
def _skipped_content_add(self, contents): | def _skipped_content_add(self, contents): | ||||
for content in contents: | for content in contents: | ||||
if content.status is None: | if content.status is None: | ||||
content = attr.evolve(content, status='absent') | content = attr.evolve(content, status='absent') | ||||
if content.length is None: | if content.length is None: | ||||
content = attr.evolve(content, length=-1) | content = attr.evolve(content, length=-1) | ||||
if content.status != 'absent': | if content.status != 'absent': | ||||
raise ValueError(f'Content with status={content.status}') | raise StorageArgumentException( | ||||
f'Content with status={content.status}') | |||||
if self.journal_writer: | if self.journal_writer: | ||||
for content in contents: | for content in contents: | ||||
self.journal_writer.write_addition('content', content) | self.journal_writer.write_addition('content', content) | ||||
summary = { | summary = { | ||||
'skipped_content:add': 0 | 'skipped_content:add': 0 | ||||
} | } | ||||
Show All 23 Lines | def skipped_content_missing(self, contents): | ||||
yield {algo: content[algo] | yield {algo: content[algo] | ||||
for algo in DEFAULT_ALGORITHMS | for algo in DEFAULT_ALGORITHMS | ||||
if content[algo] is not None} | if content[algo] is not None} | ||||
break | break | ||||
def skipped_content_add(self, content):
    """Record contents that were deliberately not ingested.

    Args:
        content: iterable of dicts, each convertible by
            SkippedContent.from_dict.

    Returns:
        summary dict from _skipped_content_add.

    Raises:
        StorageArgumentException: when a dict is not a valid skipped
            content.
    """
    contents = list(content)
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    try:
        # Stamp every skipped content with the same ingestion time.
        contents = [
            attr.evolve(SkippedContent.from_dict(d), ctime=now)
            for d in contents
        ]
    except (KeyError, TypeError, ValueError) as e:
        raise StorageArgumentException(*e.args)
    return self._skipped_content_add(contents)
def directory_add(self, directories): | def directory_add(self, directories): | ||||
directories = list(directories) | directories = list(directories) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions( | self.journal_writer.write_additions( | ||||
'directory', | 'directory', | ||||
(dir_ for dir_ in directories | (dir_ for dir_ in directories | ||||
if dir_['id'] not in self._directories)) | if dir_['id'] not in self._directories)) | ||||
try: | |||||
directories = [Directory.from_dict(d) for d in directories] | directories = [Directory.from_dict(d) for d in directories] | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
count = 0 | count = 0 | ||||
for directory in directories: | for directory in directories: | ||||
if directory.id not in self._directories: | if directory.id not in self._directories: | ||||
count += 1 | count += 1 | ||||
self._directories[directory.id] = directory | self._directories[directory.id] = directory | ||||
self._objects[directory.id].append( | self._objects[directory.id].append( | ||||
('directory', directory.id)) | ('directory', directory.id)) | ||||
▲ Show 20 Lines • Show All 76 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
def revision_add(self, revisions): | def revision_add(self, revisions): | ||||
revisions = list(revisions) | revisions = list(revisions) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions( | self.journal_writer.write_additions( | ||||
'revision', | 'revision', | ||||
(rev for rev in revisions | (rev for rev in revisions | ||||
if rev['id'] not in self._revisions)) | if rev['id'] not in self._revisions)) | ||||
try: | |||||
revisions = [Revision.from_dict(rev) for rev in revisions] | revisions = [Revision.from_dict(rev) for rev in revisions] | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
count = 0 | count = 0 | ||||
for revision in revisions: | for revision in revisions: | ||||
if revision.id not in self._revisions: | if revision.id not in self._revisions: | ||||
revision = attr.evolve( | revision = attr.evolve( | ||||
revision, | revision, | ||||
committer=self._person_add(revision.committer), | committer=self._person_add(revision.committer), | ||||
author=self._person_add(revision.author)) | author=self._person_add(revision.author)) | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
def release_add(self, releases): | def release_add(self, releases): | ||||
releases = list(releases) | releases = list(releases) | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_additions( | self.journal_writer.write_additions( | ||||
'release', | 'release', | ||||
(rel for rel in releases | (rel for rel in releases | ||||
if rel['id'] not in self._releases)) | if rel['id'] not in self._releases)) | ||||
try: | |||||
releases = [Release.from_dict(rel) for rel in releases] | releases = [Release.from_dict(rel) for rel in releases] | ||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
count = 0 | count = 0 | ||||
for rel in releases: | for rel in releases: | ||||
if rel.id not in self._releases: | if rel.id not in self._releases: | ||||
if rel.author: | if rel.author: | ||||
self._person_add(rel.author) | self._person_add(rel.author) | ||||
self._objects[rel.id].append( | self._objects[rel.id].append( | ||||
('release', rel.id)) | ('release', rel.id)) | ||||
Show All 12 Lines | def release_get(self, releases): | ||||
else: | else: | ||||
yield None | yield None | ||||
def release_get_random(self):
    """Return the id of one stored release, chosen at random."""
    # random.choice needs a sequence, so materialize the dict keys.
    return random.choice(list(self._releases.keys()))
def snapshot_add(self, snapshots): | def snapshot_add(self, snapshots): | ||||
count = 0 | count = 0 | ||||
snapshots = (Snapshot.from_dict(d) for d in snapshots) | try: | ||||
snapshots = [Snapshot.from_dict(d) for d in snapshots] | |||||
except (KeyError, TypeError, ValueError) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
snapshots = (snap for snap in snapshots | snapshots = (snap for snap in snapshots | ||||
if snap.id not in self._snapshots) | if snap.id not in self._snapshots) | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if self.journal_writer: | if self.journal_writer: | ||||
self.journal_writer.write_addition('snapshot', snapshot) | self.journal_writer.write_addition('snapshot', snapshot) | ||||
sorted_branch_names = sorted(snapshot.branches) | sorted_branch_names = sorted(snapshot.branches) | ||||
self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | self._snapshots[snapshot.id] = (snapshot, sorted_branch_names) | ||||
Show All 31 Lines | def snapshot_get_latest(self, origin, allowed_statuses=None): | ||||
visit = self.origin_visit_get_latest( | visit = self.origin_visit_get_latest( | ||||
origin_url, | origin_url, | ||||
allowed_statuses=allowed_statuses, | allowed_statuses=allowed_statuses, | ||||
require_snapshot=True) | require_snapshot=True) | ||||
if visit and visit['snapshot']: | if visit and visit['snapshot']: | ||||
snapshot = self.snapshot_get(visit['snapshot']) | snapshot = self.snapshot_get(visit['snapshot']) | ||||
if not snapshot: | if not snapshot: | ||||
raise ValueError( | raise StorageArgumentException( | ||||
'last origin visit references an unknown snapshot') | 'last origin visit references an unknown snapshot') | ||||
return snapshot | return snapshot | ||||
def snapshot_count_branches(self, snapshot_id):
    """Tally the branches of the given snapshot by target type.

    Args:
        snapshot_id: id of a stored snapshot.

    Returns:
        collections.Counter mapping each branch target-type value (or
        None for dangling branches) to its number of occurrences.
    """
    snapshot, _sorted_names = self._snapshots[snapshot_id]
    counts = collections.Counter()
    for branch in snapshot.branches.values():
        counts[branch.target_type.value if branch else None] += 1
    return counts
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def origin_get(self, origins): | ||||
return_single = True | return_single = True | ||||
origins = [origins] | origins = [origins] | ||||
else: | else: | ||||
return_single = False | return_single = False | ||||
# Sanity check to be error-compatible with the pgsql backend | # Sanity check to be error-compatible with the pgsql backend | ||||
if any('id' in origin for origin in origins) \ | if any('id' in origin for origin in origins) \ | ||||
and not all('id' in origin for origin in origins): | and not all('id' in origin for origin in origins): | ||||
raise ValueError( | raise StorageArgumentException( | ||||
'Either all origins or none at all should have an "id".') | 'Either all origins or none at all should have an "id".') | ||||
if any('url' in origin for origin in origins) \ | if any('url' in origin for origin in origins) \ | ||||
and not all('url' in origin for origin in origins): | and not all('url' in origin for origin in origins): | ||||
raise ValueError( | raise StorageArgumentException( | ||||
'Either all origins or none at all should have ' | 'Either all origins or none at all should have ' | ||||
'an "url" key.') | 'an "url" key.') | ||||
results = [] | results = [] | ||||
for origin in origins: | for origin in origins: | ||||
result = None | result = None | ||||
if 'url' in origin: | if 'url' in origin: | ||||
if origin['url'] in self._origins: | if origin['url'] in self._origins: | ||||
result = self._origins[origin['url']] | result = self._origins[origin['url']] | ||||
else: | else: | ||||
raise ValueError( | raise StorageArgumentException( | ||||
'Origin must have an url.') | 'Origin must have an url.') | ||||
results.append(self._convert_origin(result)) | results.append(self._convert_origin(result)) | ||||
if return_single: | if return_single: | ||||
assert len(results) == 1 | assert len(results) == 1 | ||||
return results[0] | return results[0] | ||||
else: | else: | ||||
return results | return results | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | class InMemoryStorage: | ||||
def origin_add(self, origins):
    """Add each origin in `origins` to the storage.

    Args:
        origins: iterable of origin dicts.

    Returns:
        deep copies of the input origin dicts, independent from the
        caller's objects.
    """
    # Deep-copy the whole list at once so the caller's dicts are never
    # mutated by the per-origin insertion.
    copies = copy.deepcopy(list(origins))
    for origin_dict in copies:
        self.origin_add_one(origin_dict)
    return copies
def origin_add_one(self, origin):
    """Add a single origin; idempotent on the origin url.

    Args:
        origin: origin dict, convertible by Origin.from_dict.

    Returns:
        the origin's url.

    Raises:
        StorageArgumentException: when the dict is not a valid origin.
    """
    try:
        origin = Origin.from_dict(origin)
    except (KeyError, TypeError, ValueError) as e:
        raise StorageArgumentException(*e.args)
    url = origin.url
    # Guard clause: an already-known origin is left untouched.
    if url in self._origins:
        return url
    if self.journal_writer:
        self.journal_writer.write_addition('origin', origin)
    # generate an origin_id because it is needed by origin_get_range.
    # TODO: remove this when we remove origin_get_range
    origin_id = len(self._origins) + 1
    self._origins_by_id.append(url)
    assert len(self._origins_by_id) == origin_id
    self._origins[url] = origin
    self._origins_by_sha1[origin_url_to_sha1(url)] = origin
    self._origin_visits[url] = []
    self._objects[url].append(('origin', url))
    return url
def origin_visit_add(self, origin, date, type): | def origin_visit_add(self, origin, date, type): | ||||
origin_url = origin | origin_url = origin | ||||
if origin_url is None: | if origin_url is None: | ||||
raise ValueError('Unknown origin.') | raise StorageArgumentException('Unknown origin.') | ||||
if isinstance(date, str): | if isinstance(date, str): | ||||
# FIXME: Converge on iso8601 at some point | # FIXME: Converge on iso8601 at some point | ||||
date = dateutil.parser.parse(date) | date = dateutil.parser.parse(date) | ||||
elif not isinstance(date, datetime.datetime): | elif not isinstance(date, datetime.datetime): | ||||
raise TypeError('date must be a datetime or a string.') | raise StorageArgumentException( | ||||
'date must be a datetime or a string.') | |||||
visit_ret = None | visit_ret = None | ||||
if origin_url in self._origins: | if origin_url in self._origins: | ||||
origin = self._origins[origin_url] | origin = self._origins[origin_url] | ||||
# visit ids are in the range [1, +inf[ | # visit ids are in the range [1, +inf[ | ||||
visit_id = len(self._origin_visits[origin_url]) + 1 | visit_id = len(self._origin_visits[origin_url]) + 1 | ||||
status = 'ongoing' | status = 'ongoing' | ||||
visit = OriginVisit( | visit = OriginVisit( | ||||
Show All 20 Lines | def origin_visit_add(self, origin, date, type): | ||||
return visit_ret | return visit_ret | ||||
def origin_visit_update(self, origin, visit_id, status=None,
                        metadata=None, snapshot=None):
    """Update an existing origin visit in place.

    Args:
        origin: origin url (must be a string).
        visit_id: 1-based visit index for that origin.
        status: new visit status, if truthy.
        metadata: new visit metadata, if truthy.
        snapshot: new snapshot id, if truthy.

    Raises:
        TypeError: when origin is not a string.
        StorageArgumentException: unknown origin or visit_id, or an
            update that does not form a valid visit.
    """
    if not isinstance(origin, str):
        raise TypeError('origin must be a string, not %r' % (origin,))
    origin_url = self._get_origin_url(origin)
    if origin_url is None:
        raise StorageArgumentException('Unknown origin.')
    try:
        visit = self._origin_visits[origin_url][visit_id-1]
    except IndexError:
        raise StorageArgumentException(
            'Unknown visit_id for this origin') from None
    # Only truthy fields overwrite the stored visit.
    updates = {
        field: value
        for field, value in (('status', status),
                             ('metadata', metadata),
                             ('snapshot', snapshot))
        if value
    }
    try:
        visit = attr.evolve(visit, **updates)
    except (KeyError, TypeError, ValueError) as e:
        raise StorageArgumentException(*e.args)
    if self.journal_writer:
        self.journal_writer.write_update('origin_visit', visit)
    self._origin_visits[origin_url][visit_id-1] = visit
def origin_visit_upsert(self, visits):
    """Insert or overwrite origin visits at their declared visit ids.

    Args:
        visits: iterable of visit dicts, each convertible by
            OriginVisit.from_dict; each must carry a string 'origin'.

    Raises:
        TypeError: when a visit's origin is not a string.
        StorageArgumentException: when a dict is not a valid visit.
    """
    # Validate origin types first so no visit is written for a bad
    # batch.
    for visit in visits:
        if not isinstance(visit['origin'], str):
            raise TypeError("visit['origin'] must be a string, not %r"
                            % (visit['origin'],))
    try:
        visits = [OriginVisit.from_dict(d) for d in visits]
    except (KeyError, TypeError, ValueError) as e:
        raise StorageArgumentException(*e.args)
    if self.journal_writer:
        for visit in visits:
            self.journal_writer.write_addition('origin_visit', visit)
    for visit in visits:
        visit_id = visit.visit
        origin_url = visit.origin
        try:
            visit = attr.evolve(visit, origin=origin_url)
        except (KeyError, TypeError, ValueError) as e:
            raise StorageArgumentException(*e.args)
        self._objects[(origin_url, visit_id)].append(
            ('origin_visit', None))
        # Grow the per-origin visit list so index visit_id-1 exists;
        # missing slots stay None.
        while len(self._origin_visits[origin_url]) <= visit_id:
            self._origin_visits[origin_url].append(None)
        self._origin_visits[origin_url][visit_id-1] = visit
▲ Show 20 Lines • Show All 213 Lines • Show Last 20 Lines |