Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show All 13 Lines | |||||
import attr | import attr | ||||
import dateutil.parser | import dateutil.parser | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.errors | import psycopg2.errors | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, Directory, Origin, OriginVisit, | Content, Directory, Origin, OriginVisit, OriginVisitUpdate, | ||||
Revision, Release, SkippedContent, Snapshot, SHA1_SIZE | Revision, Release, SkippedContent, Snapshot, SHA1_SIZE | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from . import converters | from . import converters | ||||
from .common import db_transaction_generator, db_transaction | from .common import db_transaction_generator, db_transaction | ||||
from .db import Db | from .db import Db | ||||
▲ Show 20 Lines • Show All 773 Lines • ▼ Show 20 Lines | def origin_visit_add( | ||||
origin = self.origin_get({'url': origin_url}, db=db, cur=cur) | origin = self.origin_get({'url': origin_url}, db=db, cur=cur) | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
'Unknown origin %s', origin_url) | 'Unknown origin %s', origin_url) | ||||
with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) | visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) | ||||
status = 'ongoing' | |||||
# We can write to the journal only after inserting to the | # We can write to the journal only after inserting to the | ||||
# DB, because we want the id of the visit | # DB, because we want the id of the visit | ||||
visit = OriginVisit.from_dict({ | visit = OriginVisit.from_dict({ | ||||
'origin': origin_url, | 'origin': origin_url, | ||||
'date': date, | 'date': date, | ||||
'type': type, | 'type': type, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
'status': 'ongoing', | # TODO: Remove when we remove those fields from the model | ||||
'status': status, | |||||
'metadata': None, | 'metadata': None, | ||||
'snapshot': None | 'snapshot': None | ||||
}) | }) | ||||
with convert_validation_exceptions(): | |||||
visit_update = OriginVisitUpdate( | |||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date, | |||||
status=status, | |||||
snapshot=None, | |||||
metadata=None, | |||||
) | |||||
self._origin_visit_update_add(visit_update, db=db, cur=cur) | |||||
self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
send_metric('origin_visit:add', count=1, method_name='origin_visit') | send_metric('origin_visit:add', count=1, method_name='origin_visit') | ||||
return visit | return visit | ||||
def _origin_visit_update_add(self, origin_visit_update: OriginVisitUpdate, | |||||
db, cur) -> None: | |||||
"""Add an origin visit update""" | |||||
# Inject origin visit update in the schema | |||||
db.origin_visit_update_add(origin_visit_update, cur=cur) | |||||
# write to the journal the origin visit update | |||||
send_metric('origin_visit_update:add', | |||||
count=1, method_name='origin_visit_update') | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_visit_update(self, origin: str, visit_id: int, | def origin_visit_update(self, origin: str, visit_id: int, | ||||
status: str, | status: str, | ||||
metadata: Optional[Dict] = None, | metadata: Optional[Dict] = None, | ||||
snapshot: Optional[bytes] = None, | snapshot: Optional[bytes] = None, | ||||
date: Optional[datetime.datetime] = None, | |||||
db=None, cur=None): | db=None, cur=None): | ||||
if not isinstance(origin, str): | if not isinstance(origin, str): | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
'origin must be a string, not %r' % (origin,)) | 'origin must be a string, not %r' % (origin,)) | ||||
origin_url = origin | origin_url = origin | ||||
visit = db.origin_visit_get(origin_url, visit_id, cur=cur) | visit = db.origin_visit_get(origin_url, visit_id, cur=cur) | ||||
if not visit: | if not visit: | ||||
Show All 11 Lines | def origin_visit_update(self, origin: str, visit_id: int, | ||||
if updates: | if updates: | ||||
updated_visit = {**visit, **updates} | updated_visit = {**visit, **updates} | ||||
self.journal_writer.origin_visit_update(updated_visit) | self.journal_writer.origin_visit_update(updated_visit) | ||||
with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
db.origin_visit_update(origin_url, visit_id, updates, cur) | db.origin_visit_update(origin_url, visit_id, updates, cur) | ||||
last_visit_update = self._origin_visit_get_updated( | |||||
origin, visit_id, db=db, cur=cur) | |||||
assert last_visit_update is not None | |||||
with convert_validation_exceptions(): | |||||
from swh.storage.in_memory import now | |||||
visit_update = OriginVisitUpdate( | |||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date or now(), | |||||
status=status, | |||||
snapshot=snapshot or last_visit_update.snapshot, | |||||
metadata=metadata or last_visit_update.metadata, | |||||
) | |||||
self._origin_visit_update_add(visit_update, db=db, cur=cur) | |||||
def _origin_visit_get_updated( | |||||
self, origin: str, visit_id: int, | |||||
db, cur) -> Optional[OriginVisit]: | |||||
"""Merge origin visit and latest origin visit update | |||||
""" | |||||
row_visit = db.origin_visit_get(origin, visit_id) | |||||
if row_visit is None: | |||||
return None | |||||
visit = dict(zip(db.origin_visit_get_cols, row_visit)) | |||||
row_visit_update = db.origin_visit_update_get_latest(origin, visit_id) | |||||
visit_update = dict(zip(db.origin_visit_update_cols, row_visit_update)) | |||||
return OriginVisit.from_dict({ | |||||
# default to the values in visit | |||||
**visit, | |||||
# override with the last update | |||||
**visit_update, | |||||
# but keep the date of the creation of the origin visit | |||||
'date': visit['date'] | |||||
}) | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_visit_upsert(self, visits: Iterable[OriginVisit], | def origin_visit_upsert(self, visits: Iterable[OriginVisit], | ||||
db=None, cur=None) -> None: | db=None, cur=None) -> None: | ||||
self.journal_writer.origin_visit_upsert(visits) | self.journal_writer.origin_visit_upsert(visits) | ||||
for visit in visits: | for visit in visits: | ||||
# TODO: upsert them all in a single query | # TODO: upsert them all in a single query | ||||
▲ Show 20 Lines • Show All 273 Lines • Show Last 20 Lines |