Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
| Show All 17 Lines | |||||
| import psycopg2.pool | import psycopg2.pool | ||||
| import psycopg2.errors | import psycopg2.errors | ||||
| from swh.model.model import ( | from swh.model.model import ( | ||||
| Content, | Content, | ||||
| Directory, | Directory, | ||||
| Origin, | Origin, | ||||
| OriginVisit, | OriginVisit, | ||||
| OriginVisitStatus, | |||||
| Revision, | Revision, | ||||
| Release, | Release, | ||||
| SkippedContent, | SkippedContent, | ||||
| Snapshot, | Snapshot, | ||||
| SHA1_SIZE, | SHA1_SIZE, | ||||
| ) | ) | ||||
| from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
| from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
| ▲ Show 20 Lines • Show All 804 Lines • ▼ Show 20 Lines | ) -> OriginVisit: | ||||
| origin = self.origin_get({"url": origin_url}, db=db, cur=cur) | origin = self.origin_get({"url": origin_url}, db=db, cur=cur) | ||||
| if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
| raise StorageArgumentException("Unknown origin %s", origin_url) | raise StorageArgumentException("Unknown origin %s", origin_url) | ||||
| with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
| visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) | visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) | ||||
| status = "ongoing" | |||||
| # We can write to the journal only after inserting to the | # We can write to the journal only after inserting to the | ||||
| # DB, because we want the id of the visit | # DB, because we want the id of the visit | ||||
| visit = OriginVisit.from_dict( | visit = OriginVisit.from_dict( | ||||
| { | { | ||||
| "origin": origin_url, | "origin": origin_url, | ||||
| "date": date, | "date": date, | ||||
| "type": type, | "type": type, | ||||
| "visit": visit_id, | "visit": visit_id, | ||||
| "status": "ongoing", | # TODO: Remove when we remove those fields from the model | ||||
| "status": status, | |||||
| "metadata": None, | "metadata": None, | ||||
| "snapshot": None, | "snapshot": None, | ||||
| } | } | ||||
| ) | ) | ||||
| with convert_validation_exceptions(): | |||||
| visit_status = OriginVisitStatus( | |||||
| origin=origin_url, | |||||
| visit=visit_id, | |||||
| date=date, | |||||
| status=status, | |||||
| snapshot=None, | |||||
| metadata=None, | |||||
| ) | |||||
| self._origin_visit_status_add(visit_status, db=db, cur=cur) | |||||
| self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
| send_metric("origin_visit:add", count=1, method_name="origin_visit") | send_metric("origin_visit:add", count=1, method_name="origin_visit") | ||||
| return visit | return visit | ||||
def _origin_visit_status_add(
    self, origin_visit_status: OriginVisitStatus, db, cur
) -> None:
    """Store one origin visit status and count it in the metrics.

    Args:
        origin_visit_status: the status object handed to
            ``db.origin_visit_status_add``.
        db: database handle exposing ``origin_visit_status_add``.
        cur: cursor forwarded to the database call.

    Note:
        Nothing is written to the journal from this helper; it only inserts
        the row and emits the ``origin_visit_status:add`` metric.
    """
    # Persist the status row first ...
    db.origin_visit_status_add(origin_visit_status, cur=cur)
    # ... then account for it.
    metric_name = "origin_visit_status:add"
    send_metric(metric_name, count=1, method_name="origin_visit_status")
| @timed | @timed | ||||
| @db_transaction() | @db_transaction() | ||||
| def origin_visit_update( | def origin_visit_update( | ||||
| self, | self, | ||||
| origin: str, | origin: str, | ||||
| visit_id: int, | visit_id: int, | ||||
| status: str, | status: str, | ||||
| metadata: Optional[Dict] = None, | metadata: Optional[Dict] = None, | ||||
| Show All 22 Lines | ): | ||||
| if snapshot and snapshot != visit["snapshot"]: | if snapshot and snapshot != visit["snapshot"]: | ||||
| updates["snapshot"] = snapshot | updates["snapshot"] = snapshot | ||||
| if updates: | if updates: | ||||
| with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
| updated_visit = OriginVisit.from_dict({**visit, **updates}) | updated_visit = OriginVisit.from_dict({**visit, **updates}) | ||||
| self.journal_writer.origin_visit_update(updated_visit) | self.journal_writer.origin_visit_update(updated_visit) | ||||
| last_visit_status = self._origin_visit_get_updated( | |||||
| origin, visit_id, db=db, cur=cur | |||||
| ) | |||||
| assert last_visit_status is not None | |||||
| with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
| db.origin_visit_update(origin_url, visit_id, updates, cur) | visit_status = OriginVisitStatus( | ||||
| origin=origin_url, | |||||
| visit=visit_id, | |||||
| date=date or now(), | |||||
| status=status, | |||||
| snapshot=snapshot or last_visit_status["snapshot"], | |||||
| metadata=metadata or last_visit_status["metadata"], | |||||
| ) | |||||
| self._origin_visit_status_add(visit_status, db=db, cur=cur) | |||||
def _origin_visit_get_updated(
    self, origin: str, visit_id: int, db, cur
) -> Optional[Dict[str, Any]]:
    """Fetch one origin visit and fold its latest visit status into it.

    Args:
        origin: origin identifier, passed through to ``db.origin_visit_get``.
        visit_id: id of the visit to look up.
        db: database handle exposing ``origin_visit_get`` and
            ``origin_visit_get_cols``.
        cur: cursor used for the visit lookup and handed on to
            ``_origin_visit_apply_update``.

    Returns:
        The value returned by ``_origin_visit_apply_update`` for the visit,
        or ``None`` when ``db.origin_visit_get`` returns ``None``.
    """
    # Forward the cursor (as ``origin_visit_get_by`` does) so the lookup is
    # issued on the cursor the caller gave us instead of being dropped.
    row_visit = db.origin_visit_get(origin, visit_id, cur)
    if row_visit is None:
        return None
    # Pair the raw row with its column names to get a visit dict.
    visit = dict(zip(db.origin_visit_get_cols, row_visit))
    return self._origin_visit_apply_update(visit, db=db, cur=cur)
def _origin_visit_apply_update(
    self, visit: Dict[str, Any], db, cur=None
) -> Dict[str, Any]:
    """Look up the latest status of ``visit`` and merge it into the visit.

    Args:
        visit: visit dict; its ``"origin"`` and ``"visit"`` entries select
            the status to fetch.
        db: database handle exposing ``origin_visit_status_get_latest``.
        cur: optional cursor, forwarded to the status lookup. Callers that
            are still iterating over rows held by a cursor should leave this
            unset rather than pass that same cursor.

    Returns:
        Whatever ``_origin_visit_merge`` returns for the visit and its
        latest status.
    """
    # ``cur`` used to be accepted and then ignored; hand it to the query so
    # a caller-supplied cursor is actually honoured.
    visit_status = db.origin_visit_status_get_latest(
        visit["origin"], visit["visit"], cur=cur
    )
    return self._origin_visit_merge(visit, visit_status)
def _origin_visit_merge(
    self, visit: Dict[str, Any], visit_status: Dict[str, Any]
) -> Dict[str, Any]:
    """Overlay an origin_visit_status on its origin_visit.

    Entries of ``visit_status`` win over those of ``visit``, except for
    ``"origin"`` and ``"date"``, which are always taken from ``visit``. The
    combined mapping is round-tripped through
    ``OriginVisit.from_dict(...).to_dict()`` before being returned.
    """
    # Start from the visit, then let the latest status override it.
    merged = dict(visit)
    merged.update(visit_status)
    # visit['origin'] is the URL (via a join), while
    # visit_status['origin'] is only an id.
    merged["origin"] = visit["origin"]
    # Keep the date at which the visit itself was created.
    merged["date"] = visit["date"]
    return OriginVisit.from_dict(merged).to_dict()
@timed
@db_transaction()
def origin_visit_upsert(
    self, visits: Iterable[OriginVisit], db=None, cur=None
) -> None:
    """Upsert the given visits and record one visit status per visit.

    Args:
        visits: visits to write; every one must carry a visit id.
        db: database handle (presumably injected by ``db_transaction`` --
            confirm against the decorator).
        cur: cursor forwarded to every database call.

    Raises:
        StorageArgumentException: if any visit has ``visit`` set to ``None``.
            This check runs over all visits before anything is sent to the
            journal or the database.
    """
    # ``visits`` is walked three times below (validation, journal, DB). A
    # one-shot iterable such as a generator would be exhausted by the first
    # pass and nothing would be written, so materialize it once up front.
    visits = list(visits)

    for visit in visits:
        if visit.visit is None:
            raise StorageArgumentException(f"Missing visit id for visit {visit}")

    self.journal_writer.origin_visit_upsert(visits)

    for visit in visits:
        # TODO: upsert them all in a single query
        assert visit.visit is not None  # already validated above
        db.origin_visit_upsert(visit, cur=cur)
        with convert_validation_exceptions():
            visit_status = OriginVisitStatus(
                origin=visit.origin,
                visit=visit.visit,
                date=now(),
                status=visit.status,
                snapshot=visit.snapshot,
                metadata=visit.metadata,
            )
        db.origin_visit_status_add(visit_status, cur=cur)
@timed
@db_transaction_generator(statement_timeout=500)
def origin_visit_get(
    self,
    origin: str,
    last_visit: Optional[int] = None,
    limit: Optional[int] = None,
    db=None,
    cur=None,
) -> Iterable[Dict[str, Any]]:
    """Yield the visits of ``origin``, each merged with its latest status.

    Args:
        origin: origin identifier, passed to ``db.origin_visit_get_all``.
        last_visit: forwarded unchanged to ``db.origin_visit_get_all``.
        limit: forwarded unchanged to ``db.origin_visit_get_all``.
        db: database handle.
        cur: cursor used for the listing query.

    Yields:
        One dict per row, as returned by ``_origin_visit_apply_update``.
    """
    rows = db.origin_visit_get_all(
        origin, last_visit=last_visit, limit=limit, cur=cur
    )
    for row in rows:
        # NOTE(review): ``cur`` is not handed to the helper here. If
        # ``origin_visit_get_all`` streams its rows from ``cur``, reusing it
        # for the per-visit lookup could discard the rows still pending --
        # confirm before changing.
        yield self._origin_visit_apply_update(
            dict(zip(db.origin_visit_get_cols, row)), db
        )
@timed
@db_transaction(statement_timeout=500)
def origin_visit_find_by_date(
    self, origin: str, visit_date: datetime.datetime, db=None, cur=None
) -> Optional[Dict[str, Any]]:
    """Return the visit ``db.origin_visit_find_by_date`` finds, with its
    latest status merged in, or ``None`` when the lookup is empty.

    Args:
        origin: origin identifier, passed to the db lookup.
        visit_date: date passed to the db lookup.
        db: database handle.
        cur: cursor used for the lookup.
    """
    found = db.origin_visit_find_by_date(origin, visit_date, cur=cur)
    if not found:
        return None
    # NOTE(review): unlike the sibling getters, the db result is passed on
    # without a ``dict(zip(db.origin_visit_get_cols, ...))`` step, so this
    # presumably relies on the db layer already returning a dict -- confirm.
    return self._origin_visit_apply_update(found, db)
@timed
@db_transaction(statement_timeout=500)
def origin_visit_get_by(
    self, origin: str, visit: int, db=None, cur=None
) -> Optional[Dict[str, Any]]:
    """Return one visit of ``origin`` merged with its latest status.

    Args:
        origin: origin identifier, passed to ``db.origin_visit_get``.
        visit: visit id, passed to ``db.origin_visit_get``.
        db: database handle.
        cur: cursor used for the visit lookup.

    Returns:
        The merged visit dict, or ``None`` when the lookup yields nothing.
    """
    found = db.origin_visit_get(origin, visit, cur)
    if not found:
        return None
    as_dict = dict(zip(db.origin_visit_get_cols, found))
    return self._origin_visit_apply_update(as_dict, db)
@timed
@db_transaction(statement_timeout=4000)
def origin_visit_get_latest(
    self,
    origin: str,
    allowed_statuses: Optional[List[str]] = None,
    require_snapshot: bool = False,
    db=None,
    cur=None,
) -> Optional[Dict[str, Any]]:
    """Return the visit selected by ``db.origin_visit_get_latest``, merged
    with its latest status.

    Args:
        origin: origin identifier, passed to the db lookup.
        allowed_statuses: forwarded unchanged to the db lookup.
        require_snapshot: forwarded unchanged to the db lookup.
        db: database handle.
        cur: cursor used for the lookup.

    Returns:
        The merged visit dict, or ``None`` when the lookup yields nothing.
    """
    latest = db.origin_visit_get_latest(
        origin,
        allowed_statuses=allowed_statuses,
        require_snapshot=require_snapshot,
        cur=cur,
    )
    if not latest:
        return None
    as_dict = dict(zip(db.origin_visit_get_cols, latest))
    return self._origin_visit_apply_update(as_dict, db)
@timed
@db_transaction()
def origin_visit_get_random(
    self, type: str, db=None, cur=None
) -> Optional[Dict[str, Any]]:
    """Return the visit picked by ``db.origin_visit_get_random``, merged
    with its latest status.

    Args:
        type: value passed to ``db.origin_visit_get_random``.
        db: database handle.
        cur: cursor used for the lookup.

    Returns:
        The merged visit dict, or ``None`` when the lookup yields nothing.
    """
    picked = db.origin_visit_get_random(type, cur)
    if not picked:
        return None
    as_dict = dict(zip(db.origin_visit_get_cols, picked))
    return self._origin_visit_apply_update(as_dict, db)
| @timed | @timed | ||||
| @db_transaction(statement_timeout=2000) | @db_transaction(statement_timeout=2000) | ||||
| def object_find_by_sha1_git(self, ids, db=None, cur=None): | def object_find_by_sha1_git(self, ids, db=None, cur=None): | ||||
| ret = {id: [] for id in ids} | ret = {id: [] for id in ids} | ||||
| for retval in db.object_find_by_sha1_git(ids, cur=cur): | for retval in db.object_find_by_sha1_git(ids, cur=cur): | ||||
| if retval[1]: | if retval[1]: | ||||
| ▲ Show 20 Lines • Show All 233 Lines • Show Last 20 Lines | |||||