Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/cassandra/storage.py
Show First 20 Lines • Show All 186 Lines • ▼ Show 20 Lines | def _content_add(self, contents: List[Content], with_data: bool) -> Dict: | ||||
} | } | ||||
if with_data: | if with_data: | ||||
summary["content:add:bytes"] = content_add_bytes | summary["content:add:bytes"] = content_add_bytes | ||||
return summary | return summary | ||||
def content_add(self, content: List[Content]) -> Dict: | def content_add(self, content: List[Content]) -> Dict: | ||||
contents = [attr.evolve(c, ctime=now()) for c in content] | to_add = { | ||||
(c.sha1, c.sha1_git, c.sha256, c.blake2s256): c for c in content | |||||
}.values() | |||||
contents = [attr.evolve(c, ctime=now()) for c in to_add] | |||||
return self._content_add(list(contents), with_data=True) | return self._content_add(list(contents), with_data=True) | ||||
vlorentz: use a tuple | |||||
Not Done Inline Actions — vlorentz: you don't need to convert to strings | |||||
def content_update( | def content_update( | ||||
self, contents: List[Dict[str, Any]], keys: List[str] = [] | self, contents: List[Dict[str, Any]], keys: List[str] = [] | ||||
) -> None: | ) -> None: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
Done Inline Actions — KShivendu: Not covered. Should I do something about this? | |||||
Not Done Inline Actions — vlorentz: it's intentional, see `test_cassandra.py` | |||||
"content_update is not supported by the Cassandra backend" | "content_update is not supported by the Cassandra backend" | ||||
) | ) | ||||
def content_add_metadata(self, content: List[Content]) -> Dict: | def content_add_metadata(self, content: List[Content]) -> Dict: | ||||
return self._content_add(content, with_data=False) | return self._content_add(content, with_data=False) | ||||
def content_get_data(self, content: Sha1) -> Optional[bytes]: | def content_get_data(self, content: Sha1) -> Optional[bytes]: | ||||
# FIXME: Make this method support slicing the `data` | # FIXME: Make this method support slicing the `data` | ||||
▲ Show 20 Lines • Show All 138 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def skipped_content_missing( | def skipped_content_missing( | ||||
self, contents: List[Dict[str, Any]] | self, contents: List[Dict[str, Any]] | ||||
) -> Iterable[Dict[str, Any]]: | ) -> Iterable[Dict[str, Any]]: | ||||
for content in contents: | for content in contents: | ||||
if not self._cql_runner.skipped_content_get_from_pk(content): | if not self._cql_runner.skipped_content_get_from_pk(content): | ||||
yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} | yield {algo: content[algo] for algo in DEFAULT_ALGORITHMS} | ||||
def directory_add(self, directories: List[Directory]) -> Dict: | def directory_add(self, directories: List[Directory]) -> Dict: | ||||
Not Done Inline Actions — vlorentz: `d.id` (`swhid()` is formatted based on the id, so this just adds useless computation) | |||||
to_add = {d.id: d for d in directories}.values() | |||||
# Filter out directories that are already inserted. | # Filter out directories that are already inserted. | ||||
missing = self.directory_missing([dir_.id for dir_ in directories]) | missing = self.directory_missing([dir_.id for dir_ in to_add]) | ||||
directories = [dir_ for dir_ in directories if dir_.id in missing] | directories = [dir_ for dir_ in directories if dir_.id in missing] | ||||
self.journal_writer.directory_add(directories) | self.journal_writer.directory_add(directories) | ||||
for directory in directories: | for directory in directories: | ||||
# Add directory entries to the 'directory_entry' table | # Add directory entries to the 'directory_entry' table | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
self._cql_runner.directory_entry_add_one( | self._cql_runner.directory_entry_add_one( | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def directory_get_random(self) -> Sha1Git: | def directory_get_random(self) -> Sha1Git: | ||||
directory = self._cql_runner.directory_get_random() | directory = self._cql_runner.directory_get_random() | ||||
assert directory, "Could not find any directory" | assert directory, "Could not find any directory" | ||||
return directory.id | return directory.id | ||||
def revision_add(self, revisions: List[Revision]) -> Dict: | def revision_add(self, revisions: List[Revision]) -> Dict: | ||||
# Filter-out revisions already in the database | # Filter-out revisions already in the database | ||||
missing = self.revision_missing([rev.id for rev in revisions]) | to_add = {r.id: r for r in revisions}.values() | ||||
missing = self.revision_missing([rev.id for rev in to_add]) | |||||
Not Done Inline Actions — vlorentz: `r.id` | |||||
revisions = [rev for rev in revisions if rev.id in missing] | revisions = [rev for rev in revisions if rev.id in missing] | ||||
self.journal_writer.revision_add(revisions) | self.journal_writer.revision_add(revisions) | ||||
for revision in revisions: | for revision in revisions: | ||||
revobject = converters.revision_to_db(revision) | revobject = converters.revision_to_db(revision) | ||||
if revobject: | if revobject: | ||||
# Add parents first | # Add parents first | ||||
for (rank, parent) in enumerate(revision.parents): | for (rank, parent) in enumerate(revision.parents): | ||||
▲ Show 20 Lines • Show All 93 Lines • ▼ Show 20 Lines | ) -> Iterable[Optional[Tuple[Sha1Git, Tuple[Sha1Git, ...]]]]: | ||||
yield from self._get_parent_revs(revisions, seen, limit, True) | yield from self._get_parent_revs(revisions, seen, limit, True) | ||||
def revision_get_random(self) -> Sha1Git: | def revision_get_random(self) -> Sha1Git: | ||||
revision = self._cql_runner.revision_get_random() | revision = self._cql_runner.revision_get_random() | ||||
assert revision, "Could not find any revision" | assert revision, "Could not find any revision" | ||||
return revision.id | return revision.id | ||||
def release_add(self, releases: List[Release]) -> Dict: | def release_add(self, releases: List[Release]) -> Dict: | ||||
to_add = [] | to_add = {r.id: r for r in releases}.values() | ||||
for rel in releases: | |||||
if rel not in to_add: | |||||
to_add.append(rel) | |||||
missing = set(self.release_missing([rel.id for rel in to_add])) | missing = set(self.release_missing([rel.id for rel in to_add])) | ||||
to_add = [rel for rel in to_add if rel.id in missing] | releases = [rel for rel in to_add if rel.id in missing] | ||||
self.journal_writer.release_add(releases) | |||||
self.journal_writer.release_add(to_add) | |||||
for release in to_add: | for release in releases: | ||||
if release: | if release: | ||||
self._cql_runner.release_add_one(converters.release_to_db(release)) | self._cql_runner.release_add_one(converters.release_to_db(release)) | ||||
return {"release:add": len(to_add)} | return {"release:add": len(releases)} | ||||
def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]: | def release_missing(self, releases: List[Sha1Git]) -> Iterable[Sha1Git]: | ||||
return self._cql_runner.release_missing(releases) | return self._cql_runner.release_missing(releases) | ||||
def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]: | def release_get(self, releases: List[Sha1Git]) -> List[Optional[Release]]: | ||||
rows = self._cql_runner.release_get(releases) | rows = self._cql_runner.release_get(releases) | ||||
rels: Dict[Sha1Git, Release] = {} | rels: Dict[Sha1Git, Release] = {} | ||||
for row in rows: | for row in rows: | ||||
release = converters.release_from_db(row) | release = converters.release_from_db(row) | ||||
rels[row.id] = release | rels[row.id] = release | ||||
return [rels.get(rel_id) for rel_id in releases] | return [rels.get(rel_id) for rel_id in releases] | ||||
def release_get_random(self) -> Sha1Git: | def release_get_random(self) -> Sha1Git: | ||||
release = self._cql_runner.release_get_random() | release = self._cql_runner.release_get_random() | ||||
assert release, "Could not find any release" | assert release, "Could not find any release" | ||||
return release.id | return release.id | ||||
def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: | def snapshot_add(self, snapshots: List[Snapshot]) -> Dict: | ||||
missing = self._cql_runner.snapshot_missing([snp.id for snp in snapshots]) | to_add = {s.id: s for s in snapshots}.values() | ||||
missing = self._cql_runner.snapshot_missing([snp.id for snp in to_add]) | |||||
Not Done Inline Actions — vlorentz: `s.id` | |||||
snapshots = [snp for snp in snapshots if snp.id in missing] | snapshots = [snp for snp in snapshots if snp.id in missing] | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
self.journal_writer.snapshot_add([snapshot]) | self.journal_writer.snapshot_add([snapshot]) | ||||
# Add branches | # Add branches | ||||
for (branch_name, branch) in snapshot.branches.items(): | for (branch_name, branch) in snapshot.branches.items(): | ||||
if branch is None: | if branch is None: | ||||
▲ Show 20 Lines • Show All 252 Lines • ▼ Show 20 Lines | class CassandraStorage: | ||||
def origin_count( | def origin_count( | ||||
self, url_pattern: str, regexp: bool = False, with_visit: bool = False | self, url_pattern: str, regexp: bool = False, with_visit: bool = False | ||||
) -> int: | ) -> int: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
"The Cassandra backend does not implement origin_count" | "The Cassandra backend does not implement origin_count" | ||||
) | ) | ||||
def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | ||||
to_add = [ori for ori in origins if self.origin_get_one(ori.url) is None] | to_add = {o.url: o for o in origins}.values() | ||||
Not Done Inline Actions — vlorentz: `o.url` | |||||
# keep only one occurrence of each given origin while keeping the list | origins = [ori for ori in to_add if self.origin_get_one(ori.url) is None] | ||||
# sorted as originally given | |||||
to_add = sorted(set(to_add), key=to_add.index) | self.journal_writer.origin_add(origins) | ||||
self.journal_writer.origin_add(to_add) | for origin in origins: | ||||
Not Done Inline Actions — vlorentz: no need for `set()` now | |||||
Not Done Inline Actions — vlorentz: The comment is outdated, and this line actually does nothing now (it sorts a list using the list's own order...) | |||||
for origin in to_add: | |||||
self._cql_runner.origin_add_one( | self._cql_runner.origin_add_one( | ||||
OriginRow(sha1=hash_url(origin.url), url=origin.url, next_visit_id=1) | OriginRow(sha1=hash_url(origin.url), url=origin.url, next_visit_id=1) | ||||
) | ) | ||||
return {"origin:add": len(to_add)} | return {"origin:add": len(origins)} | ||||
def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: | def origin_visit_add(self, visits: List[OriginVisit]) -> Iterable[OriginVisit]: | ||||
for visit in visits: | for visit in visits: | ||||
origin = self.origin_get_one(visit.origin) | origin = self.origin_get_one(visit.origin) | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException("Unknown origin %s", visit.origin) | raise StorageArgumentException("Unknown origin %s", visit.origin) | ||||
all_visits = [] | all_visits = [] | ||||
▲ Show 20 Lines • Show All 501 Lines • Show Last 20 Lines |
use a tuple