Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show First 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | def reset(self): | ||||
self._objects = defaultdict(list) | self._objects = defaultdict(list) | ||||
self._sorted_sha1s = SortedList[bytes, bytes]() | self._sorted_sha1s = SortedList[bytes, bytes]() | ||||
self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | self.objstorage = ObjStorage({"cls": "memory", "args": {}}) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
return True | return True | ||||
def _content_add(self, contents: Iterable[Content], with_data: bool) -> Dict: | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
self.journal_writer.content_add(contents) | self.journal_writer.content_add(contents) | ||||
content_add = 0 | content_add = 0 | ||||
if with_data: | if with_data: | ||||
summary = self.objstorage.content_add( | summary = self.objstorage.content_add( | ||||
c for c in contents if c.status != "absent" | c for c in contents if c.status != "absent" | ||||
) | ) | ||||
content_add_bytes = summary["content:add:bytes"] | content_add_bytes = summary["content:add:bytes"] | ||||
Show All 27 Lines | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
summary = { | summary = { | ||||
"content:add": content_add, | "content:add": content_add, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary["content:add:bytes"] = content_add_bytes | summary["content:add:bytes"] = content_add_bytes | ||||
return summary | return summary | ||||
def content_add(self, content: Iterable[Content]) -> Dict: | def content_add(self, content: List[Content]) -> Dict: | ||||
content = [attr.evolve(c, ctime=now()) for c in content] | content = [attr.evolve(c, ctime=now()) for c in content] | ||||
return self._content_add(content, with_data=True) | return self._content_add(content, with_data=True) | ||||
def content_update(self, content, keys=[]): | def content_update(self, content, keys=[]): | ||||
self.journal_writer.content_update(content) | self.journal_writer.content_update(content) | ||||
for cont_update in content: | for cont_update in content: | ||||
cont_update = cont_update.copy() | cont_update = cont_update.copy() | ||||
Show All 9 Lines | def content_update(self, content, keys=[]): | ||||
new_key = self._content_key(new_cont) | new_key = self._content_key(new_cont) | ||||
self._contents[new_key] = new_cont | self._contents[new_key] = new_cont | ||||
for algorithm in DEFAULT_ALGORITHMS: | for algorithm in DEFAULT_ALGORITHMS: | ||||
hash_ = new_cont.get_hash(algorithm) | hash_ = new_cont.get_hash(algorithm) | ||||
self._content_indexes[algorithm][hash_].add(new_key) | self._content_indexes[algorithm][hash_].add(new_key) | ||||
def content_add_metadata(self, content: Iterable[Content]) -> Dict: | def content_add_metadata(self, content: List[Content]) -> Dict: | ||||
return self._content_add(content, with_data=False) | return self._content_add(content, with_data=False) | ||||
def content_get(self, content): | def content_get(self, content): | ||||
# FIXME: Make this method support slicing the `data`. | # FIXME: Make this method support slicing the `data`. | ||||
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX | "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict: | ||||
for algo in DEFAULT_ALGORITHMS: | for algo in DEFAULT_ALGORITHMS: | ||||
if content.get_hash(algo): | if content.get_hash(algo): | ||||
self._skipped_content_indexes[algo][content.get_hash(algo)].add(key) | self._skipped_content_indexes[algo][content.get_hash(algo)].add(key) | ||||
self._skipped_contents[key] = content | self._skipped_contents[key] = content | ||||
summary["skipped_content:add"] += 1 | summary["skipped_content:add"] += 1 | ||||
return summary | return summary | ||||
def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: | def skipped_content_add(self, content: List[SkippedContent]) -> Dict: | ||||
content = [attr.evolve(c, ctime=now()) for c in content] | content = [attr.evolve(c, ctime=now()) for c in content] | ||||
return self._skipped_content_add(content) | return self._skipped_content_add(content) | ||||
def skipped_content_missing(self, contents): | def skipped_content_missing(self, contents): | ||||
for content in contents: | for content in contents: | ||||
matches = list(self._skipped_contents.values()) | matches = list(self._skipped_contents.values()) | ||||
for (algorithm, key) in self._content_key(content): | for (algorithm, key) in self._content_key(content): | ||||
if algorithm == "blake2s256": | if algorithm == "blake2s256": | ||||
continue | continue | ||||
# Filter out skipped contents with the same hash | # Filter out skipped contents with the same hash | ||||
matches = [ | matches = [ | ||||
match for match in matches if match.get_hash(algorithm) == key | match for match in matches if match.get_hash(algorithm) == key | ||||
] | ] | ||||
# if none of the contents match | # if none of the contents match | ||||
if not matches: | if not matches: | ||||
yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} | yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} | ||||
def directory_add(self, directories: Iterable[Directory]) -> Dict: | def directory_add(self, directories: List[Directory]) -> Dict: | ||||
directories = [dir_ for dir_ in directories if dir_.id not in self._directories] | directories = [dir_ for dir_ in directories if dir_.id not in self._directories] | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
count = 0 | count = 0 | ||||
for directory in directories: | for directory in directories: | ||||
count += 1 | count += 1 | ||||
self._directories[directory.id] = directory | self._directories[directory.id] = directory | ||||
self._objects[directory.id].append(("directory", directory.id)) | self._objects[directory.id].append(("directory", directory.id)) | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | def _directory_entry_get_by_path(self, directory, paths, prefix): | ||||
if not first_item or first_item["type"] != "dir": | if not first_item or first_item["type"] != "dir": | ||||
return | return | ||||
return self._directory_entry_get_by_path( | return self._directory_entry_get_by_path( | ||||
first_item["target"], paths[1:], prefix + paths[0] + b"/" | first_item["target"], paths[1:], prefix + paths[0] + b"/" | ||||
) | ) | ||||
def revision_add(self, revisions: Iterable[Revision]) -> Dict: | def revision_add(self, revisions: List[Revision]) -> Dict: | ||||
revisions = [rev for rev in revisions if rev.id not in self._revisions] | revisions = [rev for rev in revisions if rev.id not in self._revisions] | ||||
self.journal_writer.revision_add(revisions) | self.journal_writer.revision_add(revisions) | ||||
count = 0 | count = 0 | ||||
for revision in revisions: | for revision in revisions: | ||||
revision = attr.evolve( | revision = attr.evolve( | ||||
revision, | revision, | ||||
committer=self._person_add(revision.committer), | committer=self._person_add(revision.committer), | ||||
Show All 35 Lines | class InMemoryStorage: | ||||
def revision_shortlog(self, revisions, limit=None): | def revision_shortlog(self, revisions, limit=None): | ||||
yield from ( | yield from ( | ||||
(rev["id"], rev["parents"]) for rev in self.revision_log(revisions, limit) | (rev["id"], rev["parents"]) for rev in self.revision_log(revisions, limit) | ||||
) | ) | ||||
def revision_get_random(self): | def revision_get_random(self): | ||||
return random.choice(list(self._revisions)) | return random.choice(list(self._revisions)) | ||||
def release_add(self, releases: Iterable[Release]) -> Dict: | def release_add(self, releases: List[Release]) -> Dict: | ||||
to_add = [] | to_add = [] | ||||
for rel in releases: | for rel in releases: | ||||
if rel.id not in self._releases and rel not in to_add: | if rel.id not in self._releases and rel not in to_add: | ||||
to_add.append(rel) | to_add.append(rel) | ||||
self.journal_writer.release_add(to_add) | self.journal_writer.release_add(to_add) | ||||
for rel in to_add: | for rel in to_add: | ||||
if rel.author: | if rel.author: | ||||
Show All 11 Lines | def release_get(self, releases): | ||||
if rel_id in self._releases: | if rel_id in self._releases: | ||||
yield self._releases[rel_id].to_dict() | yield self._releases[rel_id].to_dict() | ||||
else: | else: | ||||
yield None | yield None | ||||
def release_get_random(self): | def release_get_random(self): | ||||
return random.choice(list(self._releases)) | return random.choice(list(self._releases)) | ||||
def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: | ||||
count = 0 | count = 0 | ||||
snapshots = (snap for snap in snapshots if snap.id not in self._snapshots) | snapshots = [snap for snap in snapshots if snap.id not in self._snapshots] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
self.journal_writer.snapshot_add([snapshot]) | self.journal_writer.snapshot_add([snapshot]) | ||||
self._snapshots[snapshot.id] = snapshot | self._snapshots[snapshot.id] = snapshot | ||||
self._objects[snapshot.id].append(("snapshot", snapshot.id)) | self._objects[snapshot.id].append(("snapshot", snapshot.id)) | ||||
count += 1 | count += 1 | ||||
return {"snapshot:add": count} | return {"snapshot:add": count} | ||||
▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | def _convert_origin(self, t): | ||||
if t is None: | if t is None: | ||||
return None | return None | ||||
return t.to_dict() | return t.to_dict() | ||||
def origin_get_one(self, origin_url: str) -> Optional[Origin]: | def origin_get_one(self, origin_url: str) -> Optional[Origin]: | ||||
return self._origins.get(origin_url) | return self._origins.get(origin_url) | ||||
def origin_get(self, origins: Iterable[str]) -> Iterable[Optional[Origin]]: | def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]: | ||||
return [self.origin_get_one(origin_url) for origin_url in origins] | return [self.origin_get_one(origin_url) for origin_url in origins] | ||||
def origin_get_by_sha1(self, sha1s): | def origin_get_by_sha1(self, sha1s): | ||||
return [self._convert_origin(self._origins_by_sha1.get(sha1)) for sha1 in sha1s] | return [self._convert_origin(self._origins_by_sha1.get(sha1)) for sha1 in sha1s] | ||||
def origin_get_range(self, origin_from=1, origin_count=100): | def origin_get_range(self, origin_from=1, origin_count=100): | ||||
origin_from = max(origin_from, 1) | origin_from = max(origin_from, 1) | ||||
if origin_from <= len(self._origins_by_id): | if origin_from <= len(self._origins_by_id): | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def origin_count(self, url_pattern, regexp=False, with_visit=False): | ||||
self.origin_search( | self.origin_search( | ||||
url_pattern, | url_pattern, | ||||
regexp=regexp, | regexp=regexp, | ||||
with_visit=with_visit, | with_visit=with_visit, | ||||
limit=len(self._origins), | limit=len(self._origins), | ||||
) | ) | ||||
) | ) | ||||
def origin_add(self, origins: Iterable[Origin]) -> Dict[str, int]: | def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | ||||
origins = list(origins) | |||||
added = 0 | added = 0 | ||||
for origin in origins: | for origin in origins: | ||||
if origin.url not in self._origins: | if origin.url not in self._origins: | ||||
self.origin_add_one(origin) | self.origin_add_one(origin) | ||||
added += 1 | added += 1 | ||||
return {"origin:add": added} | return {"origin:add": added} | ||||
def origin_add_one(self, origin: Origin) -> str: | def origin_add_one(self, origin: Origin) -> str: | ||||
if origin.url not in self._origins: | if origin.url not in self._origins: | ||||
self.journal_writer.origin_add([origin]) | self.journal_writer.origin_add([origin]) | ||||
# generate an origin_id because it is needed by origin_get_range. | # generate an origin_id because it is needed by origin_get_range. | ||||
# TODO: remove this when we remove origin_get_range | # TODO: remove this when we remove origin_get_range | ||||
origin_id = len(self._origins) + 1 | origin_id = len(self._origins) + 1 | ||||
self._origins_by_id.append(origin.url) | self._origins_by_id.append(origin.url) | ||||
assert len(self._origins_by_id) == origin_id | assert len(self._origins_by_id) == origin_id | ||||
self._origins[origin.url] = origin | self._origins[origin.url] = origin | ||||
self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | self._origins_by_sha1[origin_url_to_sha1(origin.url)] = origin | ||||
self._origin_visits[origin.url] = [] | self._origin_visits[origin.url] = [] | ||||
self._objects[origin.url].append(("origin", origin.url)) | self._objects[origin.url].append(("origin", origin.url)) | ||||
return origin.url | return origin.url | ||||
def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]: | def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: | ||||
for visit in visits: | for visit in visits: | ||||
origin = self.origin_get_one(visit.origin) | origin = self.origin_get_one(visit.origin) | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException("Unknown origin %s", visit.origin) | raise StorageArgumentException("Unknown origin %s", visit.origin) | ||||
all_visits = [] | all_visits = [] | ||||
for visit in visits: | for visit in visits: | ||||
origin_url = visit.origin | origin_url = visit.origin | ||||
Show All 32 Lines | def _origin_visit_status_add_one(self, visit_status: OriginVisitStatus) -> None: | ||||
""" | """ | ||||
self.journal_writer.origin_visit_status_add([visit_status]) | self.journal_writer.origin_visit_status_add([visit_status]) | ||||
visit_key = (visit_status.origin, visit_status.visit) | visit_key = (visit_status.origin, visit_status.visit) | ||||
self._origin_visit_statuses.setdefault(visit_key, []) | self._origin_visit_statuses.setdefault(visit_key, []) | ||||
visit_statuses = self._origin_visit_statuses[visit_key] | visit_statuses = self._origin_visit_statuses[visit_key] | ||||
if visit_status not in visit_statuses: | if visit_status not in visit_statuses: | ||||
visit_statuses.append(visit_status) | visit_statuses.append(visit_status) | ||||
def origin_visit_status_add( | def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus],) -> None: | ||||
self, visit_statuses: Iterable[OriginVisitStatus], | |||||
) -> None: | |||||
# First round to check existence (fail early if any is ko) | # First round to check existence (fail early if any is ko) | ||||
for visit_status in visit_statuses: | for visit_status in visit_statuses: | ||||
origin_url = self.origin_get_one(visit_status.origin) | origin_url = self.origin_get_one(visit_status.origin) | ||||
if not origin_url: | if not origin_url: | ||||
raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | raise StorageArgumentException(f"Unknown origin {visit_status.origin}") | ||||
for visit_status in visit_statuses: | for visit_status in visit_statuses: | ||||
self._origin_visit_status_add_one(visit_status) | self._origin_visit_status_add_one(visit_status) | ||||
▲ Show 20 Lines • Show All 181 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | for (obj_type, obj_id) in itertools.chain(*self._objects.values()) | ||||
) | ) | ||||
) | ) | ||||
return stats | return stats | ||||
def refresh_stat_counters(self): | def refresh_stat_counters(self): | ||||
pass | pass | ||||
def raw_extrinsic_metadata_add( | def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata],) -> None: | ||||
self, metadata: Iterable[RawExtrinsicMetadata], | |||||
) -> None: | |||||
metadata = list(metadata) | |||||
self.journal_writer.raw_extrinsic_metadata_add(metadata) | self.journal_writer.raw_extrinsic_metadata_add(metadata) | ||||
for metadata_entry in metadata: | for metadata_entry in metadata: | ||||
authority_key = self._metadata_authority_key(metadata_entry.authority) | authority_key = self._metadata_authority_key(metadata_entry.authority) | ||||
if authority_key not in self._metadata_authorities: | if authority_key not in self._metadata_authorities: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"Unknown authority {metadata_entry.authority}" | f"Unknown authority {metadata_entry.authority}" | ||||
) | ) | ||||
fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher) | fetcher_key = self._metadata_fetcher_key(metadata_entry.fetcher) | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: | ||||
else: | else: | ||||
next_page_token = None | next_page_token = None | ||||
return { | return { | ||||
"next_page_token": next_page_token, | "next_page_token": next_page_token, | ||||
"results": results, | "results": results, | ||||
} | } | ||||
def metadata_fetcher_add(self, fetchers: Iterable[MetadataFetcher]) -> None: | def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None: | ||||
fetchers = list(fetchers) | |||||
self.journal_writer.metadata_fetcher_add(fetchers) | self.journal_writer.metadata_fetcher_add(fetchers) | ||||
for fetcher in fetchers: | for fetcher in fetchers: | ||||
if fetcher.metadata is None: | if fetcher.metadata is None: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"MetadataFetcher.metadata may not be None in metadata_fetcher_add." | "MetadataFetcher.metadata may not be None in metadata_fetcher_add." | ||||
) | ) | ||||
key = self._metadata_fetcher_key(fetcher) | key = self._metadata_fetcher_key(fetcher) | ||||
if key not in self._metadata_fetchers: | if key not in self._metadata_fetchers: | ||||
self._metadata_fetchers[key] = fetcher | self._metadata_fetchers[key] = fetcher | ||||
def metadata_fetcher_get( | def metadata_fetcher_get( | ||||
self, name: str, version: str | self, name: str, version: str | ||||
) -> Optional[MetadataFetcher]: | ) -> Optional[MetadataFetcher]: | ||||
return self._metadata_fetchers.get( | return self._metadata_fetchers.get( | ||||
self._metadata_fetcher_key(MetadataFetcher(name=name, version=version)) | self._metadata_fetcher_key(MetadataFetcher(name=name, version=version)) | ||||
) | ) | ||||
def metadata_authority_add(self, authorities: Iterable[MetadataAuthority]) -> None: | def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None: | ||||
authorities = list(authorities) | |||||
self.journal_writer.metadata_authority_add(authorities) | self.journal_writer.metadata_authority_add(authorities) | ||||
for authority in authorities: | for authority in authorities: | ||||
if authority.metadata is None: | if authority.metadata is None: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"MetadataAuthority.metadata may not be None in " | "MetadataAuthority.metadata may not be None in " | ||||
"metadata_authority_add." | "metadata_authority_add." | ||||
) | ) | ||||
key = self._metadata_authority_key(authority) | key = self._metadata_authority_key(authority) | ||||
Show All 39 Lines | def diff_directories(self, from_dir, to_dir, track_renaming=False): | ||||
raise NotImplementedError("InMemoryStorage.diff_directories") | raise NotImplementedError("InMemoryStorage.diff_directories") | ||||
def diff_revisions(self, from_rev, to_rev, track_renaming=False): | def diff_revisions(self, from_rev, to_rev, track_renaming=False): | ||||
raise NotImplementedError("InMemoryStorage.diff_revisions") | raise NotImplementedError("InMemoryStorage.diff_revisions") | ||||
def diff_revision(self, revision, track_renaming=False): | def diff_revision(self, revision, track_renaming=False): | ||||
raise NotImplementedError("InMemoryStorage.diff_revision") | raise NotImplementedError("InMemoryStorage.diff_revision") | ||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: | ||||
"""Do nothing | """Do nothing | ||||
""" | """ | ||||
return None | return None | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[List[str]] = None) -> Dict: | ||||
return {} | return {} |