Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/in_memory.py
Show First 20 Lines • Show All 355 Lines • ▼ Show 20 Lines | ) -> Iterable[bytes]: | ||||
for content in contents: | for content in contents: | ||||
for (algo, hash_) in content.items(): | for (algo, hash_) in content.items(): | ||||
if algo not in DEFAULT_ALGORITHMS: | if algo not in DEFAULT_ALGORITHMS: | ||||
continue | continue | ||||
if hash_ not in self._content_indexes.get(algo, []): | if hash_ not in self._content_indexes.get(algo, []): | ||||
yield content[key_hash] | yield content[key_hash] | ||||
break | break | ||||
def content_missing_per_sha1(self, contents): | def content_missing_per_sha1(self, contents: List[bytes]) -> Iterable[bytes]: | ||||
for content in contents: | for content in contents: | ||||
if content not in self._content_indexes["sha1"]: | if content not in self._content_indexes["sha1"]: | ||||
yield content | yield content | ||||
def content_missing_per_sha1_git(self, contents): | def content_missing_per_sha1_git( | ||||
self, contents: List[Sha1Git] | |||||
) -> Iterable[Sha1Git]: | |||||
for content in contents: | for content in contents: | ||||
if content not in self._content_indexes["sha1_git"]: | if content not in self._content_indexes["sha1_git"]: | ||||
yield content | yield content | ||||
def content_get_random(self) -> Sha1Git: | def content_get_random(self) -> Sha1Git: | ||||
return random.choice(list(self._content_indexes["sha1_git"])) | return random.choice(list(self._content_indexes["sha1_git"])) | ||||
def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict: | def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict: | ||||
▲ Show 20 Lines • Show All 902 Lines • Show Last 20 Lines |