Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/postgresql/storage.py
Show First 20 Lines • Show All 204 Lines • ▼ Show 20 Lines | def _content_add_metadata(self, db, cur, content): | ||||
raise HashCollision( | raise HashCollision( | ||||
hash_name, hash_id, collision_contents_hashes | hash_name, hash_id, collision_contents_hashes | ||||
) from None | ) from None | ||||
else: | else: | ||||
raise | raise | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
def content_add(self, content: List[Content]) -> Dict: | def content_add(self, content: List[Content]) -> Dict[str, int]: | ||||
ctime = now() | ctime = now() | ||||
contents = [attr.evolve(c, ctime=ctime) for c in content] | contents = [attr.evolve(c, ctime=ctime) for c in content] | ||||
# Must add to the objstorage before the DB and journal. Otherwise: | # Must add to the objstorage before the DB and journal. Otherwise: | ||||
# 1. in case of a crash the DB may "believe" we have the content, but | # 1. in case of a crash the DB may "believe" we have the content, but | ||||
# we didn't have time to write to the objstorage before the crash | # we didn't have time to write to the objstorage before the crash | ||||
# 2. the objstorage mirroring, which reads from the journal, may attempt to | # 2. the objstorage mirroring, which reads from the journal, may attempt to | ||||
▲ Show 20 Lines • Show All 192 Lines • ▼ Show 20 Lines | def _skipped_content_add_metadata(self, db, cur, content: List[SkippedContent]): | ||||
# move metadata in place | # move metadata in place | ||||
db.skipped_content_add_from_temp(cur) | db.skipped_content_add_from_temp(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def skipped_content_add( | def skipped_content_add( | ||||
self, content: List[SkippedContent], db=None, cur=None | self, content: List[SkippedContent], db=None, cur=None | ||||
) -> Dict: | ) -> Dict[str, int]: | ||||
ctime = now() | ctime = now() | ||||
content = [attr.evolve(c, ctime=ctime) for c in content] | content = [attr.evolve(c, ctime=ctime) for c in content] | ||||
missing_contents = self.skipped_content_missing( | missing_contents = self.skipped_content_missing( | ||||
(c.to_dict() for c in content), db=db, cur=cur, | (c.to_dict() for c in content), db=db, cur=cur, | ||||
) | ) | ||||
content = [ | content = [ | ||||
c | c | ||||
Show All 21 Lines | class Storage: | ||||
) -> Iterable[Dict[str, Any]]: | ) -> Iterable[Dict[str, Any]]: | ||||
contents = list(contents) | contents = list(contents) | ||||
for content in db.skipped_content_missing(contents, cur): | for content in db.skipped_content_missing(contents, cur): | ||||
yield dict(zip(db.content_hash_keys, content)) | yield dict(zip(db.content_hash_keys, content)) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def directory_add(self, directories: List[Directory], db=None, cur=None) -> Dict: | def directory_add( | ||||
self, directories: List[Directory], db=None, cur=None | |||||
) -> Dict[str, int]: | |||||
summary = {"directory:add": 0} | summary = {"directory:add": 0} | ||||
dirs = set() | dirs = set() | ||||
dir_entries: Dict[str, defaultdict] = { | dir_entries: Dict[str, defaultdict] = { | ||||
"file": defaultdict(list), | "file": defaultdict(list), | ||||
"dir": defaultdict(list), | "dir": defaultdict(list), | ||||
"rev": defaultdict(list), | "rev": defaultdict(list), | ||||
} | } | ||||
▲ Show 20 Lines • Show All 74 Lines • ▼ Show 20 Lines | class Storage: | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def directory_get_random(self, db=None, cur=None) -> Sha1Git: | def directory_get_random(self, db=None, cur=None) -> Sha1Git: | ||||
return db.directory_get_random(cur) | return db.directory_get_random(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def revision_add(self, revisions: List[Revision], db=None, cur=None) -> Dict: | def revision_add( | ||||
self, revisions: List[Revision], db=None, cur=None | |||||
) -> Dict[str, int]: | |||||
summary = {"revision:add": 0} | summary = {"revision:add": 0} | ||||
revisions_missing = set( | revisions_missing = set( | ||||
self.revision_missing( | self.revision_missing( | ||||
set(revision.id for revision in revisions), db=db, cur=cur | set(revision.id for revision in revisions), db=db, cur=cur | ||||
) | ) | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 122 Lines • ▼ Show 20 Lines | def extid_add(self, ids: List[ExtID], db=None, cur=None) -> Dict[str, int]: | ||||
# move metadata in place | # move metadata in place | ||||
db.extid_add_from_temp(cur) | db.extid_add_from_temp(cur) | ||||
return {"extid:add": len(extid)} | return {"extid:add": len(extid)} | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def release_add(self, releases: List[Release], db=None, cur=None) -> Dict: | def release_add(self, releases: List[Release], db=None, cur=None) -> Dict[str, int]: | ||||
summary = {"release:add": 0} | summary = {"release:add": 0} | ||||
release_ids = set(release.id for release in releases) | release_ids = set(release.id for release in releases) | ||||
releases_missing = set(self.release_missing(release_ids, db=db, cur=cur)) | releases_missing = set(self.release_missing(release_ids, db=db, cur=cur)) | ||||
if not releases_missing: | if not releases_missing: | ||||
return summary | return summary | ||||
Show All 39 Lines | class Storage: | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def release_get_random(self, db=None, cur=None) -> Sha1Git: | def release_get_random(self, db=None, cur=None) -> Sha1Git: | ||||
return db.release_get_random(cur) | return db.release_get_random(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def snapshot_add(self, snapshots: List[Snapshot], db=None, cur=None) -> Dict: | def snapshot_add( | ||||
self, snapshots: List[Snapshot], db=None, cur=None | |||||
) -> Dict[str, int]: | |||||
created_temp_table = False | created_temp_table = False | ||||
count = 0 | count = 0 | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if not db.snapshot_exists(snapshot.id, cur): | if not db.snapshot_exists(snapshot.id, cur): | ||||
if not created_temp_table: | if not created_temp_table: | ||||
db.mktemp_snapshot_branch(cur) | db.mktemp_snapshot_branch(cur) | ||||
created_temp_table = True | created_temp_table = True | ||||
▲ Show 20 Lines • Show All 720 Lines • Show Last 20 Lines |