Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show First 20 Lines • Show All 730 Lines • ▼ Show 20 Lines | def snapshot_get_by_origin_visit(self, origin, visit, db=None, cur=None): | ||||
snapshot_id = db.snapshot_get_by_origin_visit(origin, visit, cur) | snapshot_id = db.snapshot_get_by_origin_visit(origin, visit, cur) | ||||
if snapshot_id: | if snapshot_id: | ||||
return self.snapshot_get(snapshot_id, db=db, cur=cur) | return self.snapshot_get(snapshot_id, db=db, cur=cur) | ||||
return None | return None | ||||
@timed | @timed | ||||
@db_transaction(statement_timeout=4000) | |||||
def snapshot_get_latest(self, origin, allowed_statuses=None, db=None, cur=None): | |||||
if isinstance(origin, int): | |||||
origin = self.origin_get({"id": origin}, db=db, cur=cur) | |||||
if not origin: | |||||
return | |||||
origin = origin["url"] | |||||
origin_visit = self.origin_visit_get_latest( | |||||
origin, | |||||
allowed_statuses=allowed_statuses, | |||||
require_snapshot=True, | |||||
db=db, | |||||
cur=cur, | |||||
) | |||||
if origin_visit and origin_visit["snapshot"]: | |||||
snapshot = self.snapshot_get(origin_visit["snapshot"], db=db, cur=cur) | |||||
if not snapshot: | |||||
raise StorageArgumentException( | |||||
"last origin visit references an unknown snapshot" | |||||
) | |||||
return snapshot | |||||
@timed | |||||
@db_transaction(statement_timeout=2000) | @db_transaction(statement_timeout=2000) | ||||
def snapshot_count_branches(self, snapshot_id, db=None, cur=None): | def snapshot_count_branches(self, snapshot_id, db=None, cur=None): | ||||
return dict([bc for bc in db.snapshot_count_branches(snapshot_id, cur)]) | return dict([bc for bc in db.snapshot_count_branches(snapshot_id, cur)]) | ||||
@timed | @timed | ||||
@db_transaction(statement_timeout=2000) | @db_transaction(statement_timeout=2000) | ||||
def snapshot_get_branches( | def snapshot_get_branches( | ||||
self, | self, | ||||
▲ Show 20 Lines • Show All 560 Lines • Show Last 20 Lines |