Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show All 13 Lines | |||||
import attr | import attr | ||||
import dateutil.parser | import dateutil.parser | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.errors | import psycopg2.errors | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, Directory, Origin, OriginVisit, | Content, Directory, Origin, OriginVisit, OriginVisitUpdate, | ||||
Revision, Release, SkippedContent, Snapshot, SHA1_SIZE | Revision, Release, SkippedContent, Snapshot, SHA1_SIZE | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from . import converters | from . import converters | ||||
from .common import db_transaction_generator, db_transaction | from .common import db_transaction_generator, db_transaction | ||||
from .db import Db | from .db import Db | ||||
from .exc import StorageArgumentException, StorageDBError, HashCollision | from .exc import StorageArgumentException, StorageDBError, HashCollision | ||||
from .algos import diff | from .algos import diff | ||||
from .metrics import timed, send_metric, process_metrics | from .metrics import timed, send_metric, process_metrics | ||||
from .utils import ( | from .utils import ( | ||||
get_partition_bounds_bytes, extract_collision_hash | get_partition_bounds_bytes, extract_collision_hash | ||||
) | ) | ||||
from .validate import VALIDATION_EXCEPTIONS | |||||
from .writer import JournalWriter | from .writer import JournalWriter | ||||
def now(): | |||||
return datetime.datetime.now(tz=datetime.timezone.utc) | |||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') | EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') | ||||
"""Identifier for the empty snapshot""" | """Identifier for the empty snapshot""" | ||||
VALIDATION_EXCEPTIONS = ( | VALIDATION_EXCEPTIONS = VALIDATION_EXCEPTIONS + [ | ||||
psycopg2.errors.CheckViolation, | psycopg2.errors.CheckViolation, | ||||
psycopg2.errors.IntegrityError, | psycopg2.errors.IntegrityError, | ||||
psycopg2.errors.InvalidTextRepresentation, | psycopg2.errors.InvalidTextRepresentation, | ||||
psycopg2.errors.NotNullViolation, | psycopg2.errors.NotNullViolation, | ||||
psycopg2.errors.NumericValueOutOfRange, | psycopg2.errors.NumericValueOutOfRange, | ||||
psycopg2.errors.UndefinedFunction, # (raised on wrong argument typs) | psycopg2.errors.UndefinedFunction, # (raised on wrong argument typs) | ||||
) | ] | ||||
"""Exceptions raised by postgresql when validation of the arguments | """Exceptions raised by postgresql when validation of the arguments | ||||
failed.""" | failed.""" | ||||
@contextlib.contextmanager | @contextlib.contextmanager | ||||
def convert_validation_exceptions(): | def convert_validation_exceptions(): | ||||
"""Catches postgresql errors related to invalid arguments, and | """Catches postgresql errors related to invalid arguments, and | ||||
re-raises a StorageArgumentException.""" | re-raises a StorageArgumentException.""" | ||||
try: | try: | ||||
yield | yield | ||||
except VALIDATION_EXCEPTIONS as e: | except tuple(VALIDATION_EXCEPTIONS) as e: | ||||
raise StorageArgumentException(*e.args) | raise StorageArgumentException(*e.args) | ||||
class Storage(): | class Storage(): | ||||
"""SWH storage proxy, encompassing DB and object storage | """SWH storage proxy, encompassing DB and object storage | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | def _content_add_metadata(self, db, cur, content): | ||||
) from None | ) from None | ||||
else: | else: | ||||
raise | raise | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
def content_add( | def content_add( | ||||
self, content: Iterable[Content]) -> Dict: | self, content: Iterable[Content]) -> Dict: | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | contents = [attr.evolve(c, ctime=now()) for c in content] | ||||
contents = [attr.evolve(c, ctime=now) for c in content] | |||||
objstorage_summary = self.objstorage.content_add(contents) | objstorage_summary = self.objstorage.content_add(contents) | ||||
with self.db() as db: | with self.db() as db: | ||||
with db.transaction() as cur: | with db.transaction() as cur: | ||||
missing = list(self.content_missing( | missing = list(self.content_missing( | ||||
map(Content.to_dict, contents), key_hash='sha1_git', | map(Content.to_dict, contents), key_hash='sha1_git', | ||||
db=db, cur=cur, | db=db, cur=cur, | ||||
▲ Show 20 Lines • Show All 193 Lines • ▼ Show 20 Lines | def _skipped_content_add_metadata( | ||||
# move metadata in place | # move metadata in place | ||||
db.skipped_content_add_from_temp(cur) | db.skipped_content_add_from_temp(cur) | ||||
@timed | @timed | ||||
@process_metrics | @process_metrics | ||||
@db_transaction() | @db_transaction() | ||||
def skipped_content_add(self, content: Iterable[SkippedContent], | def skipped_content_add(self, content: Iterable[SkippedContent], | ||||
db=None, cur=None) -> Dict: | db=None, cur=None) -> Dict: | ||||
now = datetime.datetime.now(tz=datetime.timezone.utc) | content = [attr.evolve(c, ctime=now()) for c in content] | ||||
content = [attr.evolve(c, ctime=now) for c in content] | |||||
missing_contents = self.skipped_content_missing( | missing_contents = self.skipped_content_missing( | ||||
(c.to_dict() for c in content), | (c.to_dict() for c in content), | ||||
db=db, cur=cur, | db=db, cur=cur, | ||||
) | ) | ||||
content = [c for c in content | content = [c for c in content | ||||
if any(all(c.get_hash(algo) == missing_content.get(algo) | if any(all(c.get_hash(algo) == missing_content.get(algo) | ||||
for algo in DEFAULT_ALGORITHMS) | for algo in DEFAULT_ALGORITHMS) | ||||
▲ Show 20 Lines • Show All 254 Lines • ▼ Show 20 Lines | def snapshot_add( | ||||
count = 0 | count = 0 | ||||
for snapshot in snapshots: | for snapshot in snapshots: | ||||
if not db.snapshot_exists(snapshot.id, cur): | if not db.snapshot_exists(snapshot.id, cur): | ||||
if not created_temp_table: | if not created_temp_table: | ||||
db.mktemp_snapshot_branch(cur) | db.mktemp_snapshot_branch(cur) | ||||
created_temp_table = True | created_temp_table = True | ||||
try: | with convert_validation_exceptions(): | ||||
db.copy_to( | db.copy_to( | ||||
( | ( | ||||
{ | { | ||||
'name': name, | 'name': name, | ||||
'target': info.target if info else None, | 'target': info.target if info else None, | ||||
'target_type': (info.target_type.value | 'target_type': (info.target_type.value | ||||
if info else None), | if info else None), | ||||
} | } | ||||
for name, info in snapshot.branches.items() | for name, info in snapshot.branches.items() | ||||
), | ), | ||||
'tmp_snapshot_branch', | 'tmp_snapshot_branch', | ||||
['name', 'target', 'target_type'], | ['name', 'target', 'target_type'], | ||||
cur, | cur, | ||||
) | ) | ||||
except VALIDATION_EXCEPTIONS + (KeyError,) as e: | |||||
raise StorageArgumentException(*e.args) | |||||
self.journal_writer.snapshot_add(snapshot) | self.journal_writer.snapshot_add(snapshot) | ||||
db.snapshot_add(snapshot.id, cur) | db.snapshot_add(snapshot.id, cur) | ||||
count += 1 | count += 1 | ||||
return {'snapshot:add': count} | return {'snapshot:add': count} | ||||
▲ Show 20 Lines • Show All 107 Lines • ▼ Show 20 Lines | def origin_visit_add( | ||||
origin = self.origin_get({'url': origin_url}, db=db, cur=cur) | origin = self.origin_get({'url': origin_url}, db=db, cur=cur) | ||||
if not origin: # Cannot add a visit without an origin | if not origin: # Cannot add a visit without an origin | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
'Unknown origin %s', origin_url) | 'Unknown origin %s', origin_url) | ||||
with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) | visit_id = db.origin_visit_add(origin_url, date, type, cur=cur) | ||||
status = 'ongoing' | |||||
# We can write to the journal only after inserting to the | # We can write to the journal only after inserting to the | ||||
# DB, because we want the id of the visit | # DB, because we want the id of the visit | ||||
visit = OriginVisit.from_dict({ | visit = OriginVisit.from_dict({ | ||||
'origin': origin_url, | 'origin': origin_url, | ||||
'date': date, | 'date': date, | ||||
'type': type, | 'type': type, | ||||
'visit': visit_id, | 'visit': visit_id, | ||||
'status': 'ongoing', | # TODO: Remove when we remove those fields from the model | ||||
'status': status, | |||||
'metadata': None, | 'metadata': None, | ||||
'snapshot': None | 'snapshot': None | ||||
}) | }) | ||||
with convert_validation_exceptions(): | |||||
visit_update = OriginVisitUpdate( | |||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date, | |||||
status=status, | |||||
snapshot=None, | |||||
metadata=None, | |||||
) | |||||
self._origin_visit_update_add(visit_update, db=db, cur=cur) | |||||
self.journal_writer.origin_visit_add(visit) | self.journal_writer.origin_visit_add(visit) | ||||
send_metric('origin_visit:add', count=1, method_name='origin_visit') | send_metric('origin_visit:add', count=1, method_name='origin_visit') | ||||
return visit | return visit | ||||
def _origin_visit_update_add(self, origin_visit_update: OriginVisitUpdate, | |||||
db, cur) -> None: | |||||
"""Add an origin visit update""" | |||||
# Inject origin visit update in the schema | |||||
db.origin_visit_update_add(origin_visit_update, cur=cur) | |||||
# write to the journal the origin visit update | |||||
send_metric('origin_visit_update:add', | |||||
count=1, method_name='origin_visit_update') | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_visit_update(self, origin: str, visit_id: int, | def origin_visit_update(self, origin: str, visit_id: int, | ||||
status: str, | status: str, | ||||
metadata: Optional[Dict] = None, | metadata: Optional[Dict] = None, | ||||
snapshot: Optional[bytes] = None, | snapshot: Optional[bytes] = None, | ||||
date: Optional[datetime.datetime] = None, | |||||
db=None, cur=None): | db=None, cur=None): | ||||
if not isinstance(origin, str): | if not isinstance(origin, str): | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
'origin must be a string, not %r' % (origin,)) | 'origin must be a string, not %r' % (origin,)) | ||||
origin_url = origin | origin_url = origin | ||||
visit = db.origin_visit_get(origin_url, visit_id, cur=cur) | visit = db.origin_visit_get(origin_url, visit_id, cur=cur) | ||||
if not visit: | if not visit: | ||||
raise StorageArgumentException('Invalid visit_id for this origin.') | raise StorageArgumentException('Invalid visit_id for this origin.') | ||||
visit = dict(zip(db.origin_visit_get_cols, visit)) | visit = dict(zip(db.origin_visit_get_cols, visit)) | ||||
updates: Dict[str, Any] = { | updates: Dict[str, Any] = { | ||||
'status': status, | 'status': status, | ||||
} | } | ||||
if metadata and metadata != visit['metadata']: | if metadata and metadata != visit['metadata']: | ||||
updates['metadata'] = metadata | updates['metadata'] = metadata | ||||
if snapshot and snapshot != visit['snapshot']: | if snapshot and snapshot != visit['snapshot']: | ||||
updates['snapshot'] = snapshot | updates['snapshot'] = snapshot | ||||
if updates: | if updates: | ||||
updated_visit = {**visit, **updates} | updated_visit = {**visit, **updates} | ||||
self.journal_writer.origin_visit_update(updated_visit) | self.journal_writer.origin_visit_update(updated_visit) | ||||
last_visit_update = self._origin_visit_get_updated( | |||||
origin, visit_id, db=db, cur=cur) | |||||
assert last_visit_update is not None | |||||
with convert_validation_exceptions(): | with convert_validation_exceptions(): | ||||
db.origin_visit_update(origin_url, visit_id, updates, cur) | visit_update = OriginVisitUpdate( | ||||
origin=origin_url, | |||||
visit=visit_id, | |||||
date=date or now(), | |||||
status=status, | |||||
snapshot=snapshot or last_visit_update['snapshot'], | |||||
metadata=metadata or last_visit_update['metadata'], | |||||
) | |||||
self._origin_visit_update_add(visit_update, db=db, cur=cur) | |||||
def _origin_visit_get_updated( | |||||
self, origin: str, visit_id: int, | |||||
db, cur) -> Optional[Dict[str, Any]]: | |||||
"""Retrieve origin visit and latest origin visit update and merge them | |||||
into an origin visit. | |||||
""" | |||||
row_visit = db.origin_visit_get(origin, visit_id) | |||||
if row_visit is None: | |||||
return None | |||||
visit = dict(zip(db.origin_visit_get_cols, row_visit)) | |||||
return self._origin_visit_apply_update(visit, db=db, cur=cur) | |||||
def _origin_visit_apply_update( | |||||
self, visit: Dict[str, Any], db, cur=None) -> Dict[str, Any]: | |||||
"""Retrieve the latest visit update information for the origin visit. | |||||
Then merge it with the visit and return it. | |||||
""" | |||||
visit_update = db.origin_visit_update_get_latest( | |||||
visit['origin'], visit['visit']) | |||||
return self._origin_visit_merge(visit, visit_update) | |||||
def _origin_visit_merge( | |||||
self, visit: Dict[str, Any], | |||||
visit_update: Dict[str, Any]) -> Dict[str, Any]: | |||||
"""Merge origin_visit and origin_visit_update together. | |||||
""" | |||||
return OriginVisit.from_dict({ | |||||
# default to the values in visit | |||||
**visit, | |||||
# override with the last update | |||||
**visit_update, | |||||
# visit['origin'] is the URL (via a join), while | |||||
# visit_update['origin'] is only an id. | |||||
'origin': visit['origin'], | |||||
# but keep the date of the creation of the origin visit | |||||
'date': visit['date'] | |||||
}).to_dict() | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_visit_upsert(self, visits: Iterable[OriginVisit], | def origin_visit_upsert(self, visits: Iterable[OriginVisit], | ||||
db=None, cur=None) -> None: | db=None, cur=None) -> None: | ||||
self.journal_writer.origin_visit_upsert(visits) | self.journal_writer.origin_visit_upsert(visits) | ||||
for visit in visits: | for visit in visits: | ||||
# TODO: upsert them all in a single query | # TODO: upsert them all in a single query | ||||
db.origin_visit_upsert(visit, cur=cur) | db.origin_visit_upsert(visit, cur=cur) | ||||
assert visit.visit is not None | |||||
with convert_validation_exceptions(): | |||||
visit_update = OriginVisitUpdate( | |||||
origin=visit.origin, | |||||
visit=visit.visit, | |||||
date=now(), | |||||
status=visit.status, | |||||
snapshot=visit.snapshot, | |||||
metadata=visit.metadata, | |||||
) | |||||
db.origin_visit_update_add(visit_update, cur=cur) | |||||
@timed | @timed | ||||
@db_transaction_generator(statement_timeout=500) | @db_transaction_generator(statement_timeout=500) | ||||
def origin_visit_get(self, origin, last_visit=None, limit=None, db=None, | def origin_visit_get( | ||||
cur=None): | self, origin: str, last_visit: Optional[int] = None, | ||||
for line in db.origin_visit_get_all( | limit: Optional[int] = None, | ||||
origin, last_visit=last_visit, limit=limit, cur=cur): | db=None, cur=None) -> Iterable[Dict[str, Any]]: | ||||
data = dict(zip(db.origin_visit_get_cols, line)) | lines = db.origin_visit_get_all( | ||||
yield data | origin, last_visit=last_visit, limit=limit, cur=cur) | ||||
for line in lines: | |||||
visit = dict(zip(db.origin_visit_get_cols, line)) | |||||
yield self._origin_visit_apply_update(visit, db) | |||||
@timed | @timed | ||||
@db_transaction(statement_timeout=500) | @db_transaction(statement_timeout=500) | ||||
def origin_visit_find_by_date(self, origin, visit_date, db=None, cur=None): | def origin_visit_find_by_date( | ||||
line = db.origin_visit_find_by_date(origin, visit_date, cur=cur) | self, origin: str, visit_date: datetime.datetime, | ||||
if line: | db=None, cur=None) -> Optional[Dict[str, Any]]: | ||||
return dict(zip(db.origin_visit_get_cols, line)) | visit = db.origin_visit_find_by_date(origin, visit_date, cur=cur) | ||||
if visit: | |||||
return self._origin_visit_apply_update(visit, db) | |||||
return None | |||||
@timed | @timed | ||||
@db_transaction(statement_timeout=500) | @db_transaction(statement_timeout=500) | ||||
def origin_visit_get_by(self, origin, visit, db=None, cur=None): | def origin_visit_get_by( | ||||
ori_visit = db.origin_visit_get(origin, visit, cur) | self, origin: str, | ||||
if not ori_visit: | visit: int, db=None, cur=None) -> Optional[Dict[str, Any]]: | ||||
row = db.origin_visit_get(origin, visit, cur) | |||||
if row: | |||||
visit_dict = dict(zip(db.origin_visit_get_cols, row)) | |||||
return self._origin_visit_apply_update(visit_dict, db) | |||||
return None | return None | ||||
return dict(zip(db.origin_visit_get_cols, ori_visit)) | |||||
@timed | @timed | ||||
@db_transaction(statement_timeout=4000) | @db_transaction(statement_timeout=4000) | ||||
def origin_visit_get_latest( | def origin_visit_get_latest( | ||||
self, origin, allowed_statuses=None, require_snapshot=False, | self, origin: str, allowed_statuses: Optional[List[str]] = None, | ||||
db=None, cur=None): | require_snapshot: bool = False, | ||||
origin_visit = db.origin_visit_get_latest( | db=None, cur=None) -> Optional[Dict[str, Any]]: | ||||
row = db.origin_visit_get_latest( | |||||
origin, allowed_statuses=allowed_statuses, | origin, allowed_statuses=allowed_statuses, | ||||
require_snapshot=require_snapshot, cur=cur) | require_snapshot=require_snapshot, cur=cur) | ||||
if origin_visit: | if row: | ||||
return dict(zip(db.origin_visit_get_cols, origin_visit)) | visit = dict(zip(db.origin_visit_get_cols, row)) | ||||
return self._origin_visit_apply_update(visit, db) | |||||
return None | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def origin_visit_get_random( | def origin_visit_get_random( | ||||
self, type: str, db=None, cur=None) -> Optional[Dict[str, Any]]: | self, type: str, db=None, cur=None) -> Optional[Dict[str, Any]]: | ||||
result = db.origin_visit_get_random(type, cur) | row = db.origin_visit_get_random(type, cur) | ||||
if result: | if row: | ||||
return dict(zip(db.origin_visit_get_cols, result)) | visit = dict(zip(db.origin_visit_get_cols, row)) | ||||
else: | return self._origin_visit_apply_update(visit, db) | ||||
return None | return None | ||||
@timed | @timed | ||||
@db_transaction(statement_timeout=2000) | @db_transaction(statement_timeout=2000) | ||||
def object_find_by_sha1_git(self, ids, db=None, cur=None): | def object_find_by_sha1_git(self, ids, db=None, cur=None): | ||||
ret = {id: [] for id in ids} | ret = {id: [] for id in ids} | ||||
for retval in db.object_find_by_sha1_git(ids, cur=cur): | for retval in db.object_find_by_sha1_git(ids, cur=cur): | ||||
if retval[1]: | if retval[1]: | ||||
▲ Show 20 Lines • Show All 218 Lines • Show Last 20 Lines |