Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 140 Lines • ▼ Show 20 Lines | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
"content:add": content_add, | "content:add": content_add, | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary["content:add:bytes"] = content_add_bytes | summary["content:add:bytes"] = content_add_bytes | ||||
return summary | return summary | ||||
def content_add(self, content: List[Content]) -> Dict:
    """Add contents (including their data) to the storage.

    Every content gets its ``ctime`` stamped with the current time
    before insertion.

    Returns:
        a summary dict of counters (``content:add``, ``content:add:bytes``).
    """
    stamped = []
    for item in content:
        stamped.append(attr.evolve(item, ctime=now()))
    return self._content_add(stamped, with_data=True)
def content_update(self, content, keys=None):
    """Update content blobs in place — not supported by this backend.

    Args:
        content: iterable of content dicts (ignored).
        keys: optional list of hash keys to update (ignored).

    Raises:
        NotImplementedError: always; the Cassandra backend does not
            support in-place content updates.
    """
    # NOTE: the previous signature used a mutable default (``keys=[]``),
    # a Python anti-pattern; ``None`` is behaviorally equivalent here
    # because the method unconditionally raises.
    raise NotImplementedError(
        "content_update is not supported by the Cassandra backend"
    )
def content_add_metadata(self, content: List[Content]) -> Dict:
    """Add content rows without storing the associated data blobs.

    Unlike :meth:`content_add`, the provided ``ctime`` values are kept
    as-is and no bytes are written to the object storage.
    """
    return self._content_add(content, with_data=False)
def content_get(self, content):
    """Yield content data for the given identifiers via the objstorage.

    Raises:
        StorageArgumentException: when more than
            ``BULK_BLOCK_CONTENT_LEN_MAX`` contents are requested at once.
    """
    requested = len(content)
    if requested > BULK_BLOCK_CONTENT_LEN_MAX:
        raise StorageArgumentException(
            "Sending at most %s contents." % BULK_BLOCK_CONTENT_LEN_MAX
        )
    for blob in self.objstorage.content_get(content):
        yield blob
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | def _skipped_content_get_from_hash(self, algo, hash_) -> Iterable: | ||||
# Query the main table ('content'). | # Query the main table ('content'). | ||||
res = self._cql_runner.skipped_content_get_from_token(token) | res = self._cql_runner.skipped_content_get_from_token(token) | ||||
for row in res: | for row in res: | ||||
# re-check the hash (in case of murmur3 collision) | # re-check the hash (in case of murmur3 collision) | ||||
if getattr(row, algo) == hash_: | if getattr(row, algo) == hash_: | ||||
yield row | yield row | ||||
def _skipped_content_add(self, contents: List[SkippedContent]) -> Dict:
    """Insert skipped contents that are not yet in the database.

    The surviving entries are written to the journal first; then, for
    each one, the per-algorithm index tables are populated before the
    row is finalized in the main table.

    Returns:
        a summary dict ``{"skipped_content:add": <number inserted>}``.
    """
    # Filter-out content already in the database.
    contents = [
        c
        for c in contents
        if not self._cql_runner.skipped_content_get_from_pk(c.to_dict())
    ]

    self.journal_writer.skipped_content_add(contents)

    for content in contents:
        # Compute token of the row in the main table
        (token, insertion_finalizer) = self._cql_runner.skipped_content_add_prepare(
            content
        )

        # Then add to index tables; each index entry carries the token,
        # so a lookup by any hash algorithm can locate the main-table row.
        for algo in HASH_ALGORITHMS:
            self._cql_runner.skipped_content_index_add_one(algo, content, token)

        # Then to the main table
        insertion_finalizer()

    return {"skipped_content:add": len(contents)}
def skipped_content_add(self, content: List[SkippedContent]) -> Dict:
    """Stamp each skipped content with the current time and add it."""
    with_ctime = [attr.evolve(item, ctime=now()) for item in content]
    return self._skipped_content_add(with_ctime)
def skipped_content_missing(self, contents):
    """Yield the default-algorithm hash dict of each skipped content
    that is absent from the database.
    """
    for entry in contents:
        if self._cql_runner.skipped_content_get_from_pk(entry):
            continue
        yield {algo: entry[algo] for algo in DEFAULT_ALGORITHMS}
def directory_add(self, directories: Iterable[Directory]) -> Dict: | def directory_add(self, directories: List[Directory]) -> Dict: | ||||
directories = list(directories) | |||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_.id for dir_ in directories]) | missing = self.directory_missing([dir_.id for dir_ in directories]) | ||||
directories = [dir_ for dir_ in directories if dir_.id in missing] | directories = [dir_ for dir_ in directories if dir_.id in missing] | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
for directory in directories: | for directory in directories: | ||||
# Add directory entries to the 'directory_entry' table | # Add directory entries to the 'directory_entry' table | ||||
▲ Show 20 Lines • Show All 86 Lines • ▼ Show 20 Lines | def _directory_entry_get_by_path(self, directory, paths, prefix): | ||||
) | ) | ||||
def directory_ls(self, directory, recursive=False):
    """Yield the entries of ``directory``; recurse into subdirectories
    when ``recursive`` is true. Delegates to ``_directory_ls``.
    """
    for entry in self._directory_ls(directory, recursive):
        yield entry
def directory_get_random(self):
    """Return the id of a randomly selected directory row."""
    row = self._cql_runner.directory_get_random()
    return row.id
def revision_add(self, revisions: Iterable[Revision]) -> Dict: | def revision_add(self, revisions: List[Revision]) -> Dict: | ||||
revisions = list(revisions) | |||||
# Filter-out revisions already in the database | # Filter-out revisions already in the database | ||||
missing = self.revision_missing([rev.id for rev in revisions]) | missing = self.revision_missing([rev.id for rev in revisions]) | ||||
revisions = [rev for rev in revisions if rev.id in missing] | revisions = [rev for rev in revisions if rev.id in missing] | ||||
self.journal_writer.revision_add(revisions) | self.journal_writer.revision_add(revisions) | ||||
for revision in revisions: | for revision in revisions: | ||||
revobject = converters.revision_to_db(revision) | revobject = converters.revision_to_db(revision) | ||||
if revobject: | if revobject: | ||||
▲ Show 20 Lines • Show All 72 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def revision_shortlog(self, revisions, limit=None):
    """Delegate to ``_get_parent_revs`` with a fresh ``seen`` set and
    the short-log flag enabled.
    """
    visited = set()
    yield from self._get_parent_revs(revisions, visited, limit, True)
def revision_get_random(self):
    """Return the id of a randomly selected revision row."""
    row = self._cql_runner.revision_get_random()
    return row.id
def release_add(self, releases: Iterable[Release]) -> Dict: | def release_add(self, releases: List[Release]) -> Dict: | ||||
to_add = [] | to_add = [] | ||||
for rel in releases: | for rel in releases: | ||||
if rel not in to_add: | if rel not in to_add: | ||||
to_add.append(rel) | to_add.append(rel) | ||||
missing = set(self.release_missing([rel.id for rel in to_add])) | missing = set(self.release_missing([rel.id for rel in to_add])) | ||||
to_add = [rel for rel in to_add if rel.id in missing] | to_add = [rel for rel in to_add if rel.id in missing] | ||||
self.journal_writer.release_add(to_add) | self.journal_writer.release_add(to_add) | ||||
Show All 15 Lines | def release_get(self, releases): | ||||
rels[row.id] = release.to_dict() | rels[row.id] = release.to_dict() | ||||
for rel_id in releases: | for rel_id in releases: | ||||
yield rels.get(rel_id) | yield rels.get(rel_id) | ||||
def release_get_random(self):
    """Return the id of a randomly selected release row."""
    row = self._cql_runner.release_get_random()
    return row.id
def snapshot_add(self, snapshots: Iterable[Snapshot]) -> Dict: | def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: | ||||
snapshots = list(snapshots) | |||||
missing = self._cql_runner.snapshot_missing([snp.id for snp in snapshots]) | missing = self._cql_runner.snapshot_missing([snp.id for snp in snapshots]) | ||||
snapshots = [snp for snp in snapshots if snp.id in missing] | snapshots = [snp for snp in snapshots if snp.id in missing] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
self.journal_writer.snapshot_add([snapshot]) | self.journal_writer.snapshot_add([snapshot]) | ||||
# Add branches | # Add branches | ||||
for (branch_name, branch) in snapshot.branches.items(): | for (branch_name, branch) in snapshot.branches.items(): | ||||
▲ Show 20 Lines • Show All 125 Lines • ▼ Show 20 Lines | def object_find_by_sha1_git(self, ids): | ||||
missing_ids.remove(sha1_git) | missing_ids.remove(sha1_git) | ||||
if not missing_ids: | if not missing_ids: | ||||
# We found everything, skipping the next queries. | # We found everything, skipping the next queries. | ||||
break | break | ||||
return results | return results | ||||
def origin_get(self, origins: List[str]) -> Iterable[Optional[Origin]]:
    """Look up each origin URL; yields None for URLs not in storage."""
    found = []
    for url in origins:
        found.append(self.origin_get_one(url))
    return found
def origin_get_one(self, origin_url: str) -> Optional[Origin]: | def origin_get_one(self, origin_url: str) -> Optional[Origin]: | ||||
"""Given an origin url, return the origin if it exists, None otherwise | """Given an origin url, return the origin if it exists, None otherwise | ||||
""" | """ | ||||
rows = list(self._cql_runner.origin_get_by_url(origin_url)) | rows = list(self._cql_runner.origin_get_by_url(origin_url)) | ||||
if rows: | if rows: | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | ): | ||||
else: | else: | ||||
origins = [orig for orig in origins if url_pattern in orig.url] | origins = [orig for orig in origins if url_pattern in orig.url] | ||||
if with_visit: | if with_visit: | ||||
origins = [orig for orig in origins if orig.next_visit_id > 1] | origins = [orig for orig in origins if orig.next_visit_id > 1] | ||||
return [{"url": orig.url,} for orig in origins[offset : offset + limit]] | return [{"url": orig.url,} for orig in origins[offset : offset + limit]] | ||||
def origin_add(self, origins: List[Origin]) -> Dict[str, int]:
    """Insert origins that are not already present.

    Already-known origins (matched by URL) are silently skipped.

    Returns:
        a summary dict ``{"origin:add": <number actually added>}``.
    """
    new_origins = []
    for candidate in origins:
        if self.origin_get_one(candidate.url) is None:
            new_origins.append(candidate)
    self.journal_writer.origin_add(new_origins)
    for origin in new_origins:
        self._cql_runner.origin_add_one(origin)
    return {"origin:add": len(new_origins)}
def origin_visit_add(self, visits: Iterable[OriginVisit]) -> Iterable[OriginVisit]: | def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: | ||||
for visit in visits: | for visit in visits: | ||||
origin = self.origin_get_one(visit.origin) | origin = self.origin_get_one(visit.origin) | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException("Unknown origin %s", visit.origin) | raise StorageArgumentException("Unknown origin %s", visit.origin) | ||||
all_visits = [] | all_visits = [] | ||||
nb_visits = 0 | nb_visits = 0 | ||||
for visit in visits: | for visit in visits: | ||||
Show All 19 Lines | def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: | ||||
return all_visits | return all_visits | ||||
def _origin_visit_status_add(self, visit_status: OriginVisitStatus) -> None:
    """Add an origin visit status.

    The journal is written first, then the Cassandra table.
    """
    self.journal_writer.origin_visit_status_add([visit_status])
    self._cql_runner.origin_visit_status_add_one(visit_status)
def origin_visit_status_add(self, visit_statuses: List[OriginVisitStatus]) -> None:
    """Add origin visit statuses, failing fast if any origin is unknown.

    Raises:
        StorageArgumentException: if any status references an origin
            that does not exist in the storage; in that case nothing
            is written.
    """
    # First round to check existence (fail early if any is ko)
    for visit_status in visit_statuses:
        origin_url = self.origin_get_one(visit_status.origin)
        if not origin_url:
            raise StorageArgumentException(f"Unknown origin {visit_status.origin}")

    # Second round: every origin exists, perform the writes.
    for visit_status in visit_statuses:
        self._origin_visit_status_add(visit_status)
▲ Show 20 Lines • Show All 160 Lines • ▼ Show 20 Lines | def stat_counters(self): | ||||
) | ) | ||||
stats = {key: 0 for key in keys} | stats = {key: 0 for key in keys} | ||||
stats.update({row.object_type: row.count for row in rows}) | stats.update({row.object_type: row.count for row in rows}) | ||||
return stats | return stats | ||||
def refresh_stat_counters(self):
    """Intentionally a no-op for this backend."""
    return None
def raw_extrinsic_metadata_add( | def raw_extrinsic_metadata_add(self, metadata: List[RawExtrinsicMetadata]) -> None: | ||||
self, metadata: Iterable[RawExtrinsicMetadata] | |||||
) -> None: | |||||
metadata = list(metadata) | |||||
self.journal_writer.raw_extrinsic_metadata_add(metadata) | self.journal_writer.raw_extrinsic_metadata_add(metadata) | ||||
for metadata_entry in metadata: | for metadata_entry in metadata: | ||||
if not self._cql_runner.metadata_authority_get( | if not self._cql_runner.metadata_authority_get( | ||||
metadata_entry.authority.type.value, metadata_entry.authority.url | metadata_entry.authority.type.value, metadata_entry.authority.url | ||||
): | ): | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
f"Unknown authority {metadata_entry.authority}" | f"Unknown authority {metadata_entry.authority}" | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 120 Lines • ▼ Show 20 Lines | ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]: | ||||
else: | else: | ||||
next_page_token = None | next_page_token = None | ||||
return { | return { | ||||
"next_page_token": next_page_token, | "next_page_token": next_page_token, | ||||
"results": results, | "results": results, | ||||
} | } | ||||
def metadata_fetcher_add(self, fetchers: List[MetadataFetcher]) -> None:
    """Record metadata fetchers in the journal and in Cassandra.

    Each fetcher's metadata dict is JSON-serialized for storage.
    """
    self.journal_writer.metadata_fetcher_add(fetchers)
    for entry in fetchers:
        serialized = json.dumps(map_optional(dict, entry.metadata))
        self._cql_runner.metadata_fetcher_add(entry.name, entry.version, serialized)
def metadata_fetcher_get(
    self, name: str, version: str
) -> Optional[MetadataFetcher]:
    """Return the MetadataFetcher with this name/version, or None."""
    row = self._cql_runner.metadata_fetcher_get(name, version)
    if not row:
        return None
    return MetadataFetcher(
        name=row.name,
        version=row.version,
        metadata=json.loads(row.metadata),
    )
def metadata_authority_add(self, authorities: List[MetadataAuthority]) -> None:
    """Record metadata authorities in the journal and in Cassandra.

    Each authority's metadata dict is JSON-serialized for storage.
    """
    self.journal_writer.metadata_authority_add(authorities)
    for entry in authorities:
        serialized = json.dumps(map_optional(dict, entry.metadata))
        self._cql_runner.metadata_authority_add(
            entry.url, entry.type.value, serialized
        )
def metadata_authority_get(
    self, type: MetadataAuthorityType, url: str
) -> Optional[MetadataAuthority]:
    """Return the MetadataAuthority with this type/url, or None."""
    row = self._cql_runner.metadata_authority_get(type.value, url)
    if not row:
        return None
    return MetadataAuthority(
        type=MetadataAuthorityType(row.type),
        url=row.url,
        metadata=json.loads(row.metadata),
    )
def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
    """Do nothing: this backend keeps no write buffers to clear."""
    return None
def flush(self, object_types: Optional[List[str]] = None) -> Dict:
    """No-op flush; always returns an empty summary dict."""
    return {}