swh/storage/storage.py
Show All 19 Lines
import psycopg2.pool
import psycopg2.errors

from swh.model.model import (
    SkippedContent, Content, Directory, Revision, Release,
    Snapshot, Origin, SHA1_SIZE
)
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex

-try:
-    from swh.journal.writer import get_journal_writer
-except ImportError:
-    get_journal_writer = None  # type: ignore
-    # mypy limitation, see https://github.com/python/mypy/issues/1153

from swh.storage.objstorage import ObjStorage

from . import converters, HashCollision
from .common import db_transaction_generator, db_transaction
from .db import Db
from .exc import StorageArgumentException, StorageDBError
from .algos import diff
from .metrics import timed, send_metric, process_metrics
from .utils import get_partition_bounds_bytes
+from .writer import JournalWriter

# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000

EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e')
"""Identifier for the empty snapshot"""
Show All 40 Lines: def __init__(self, db, objstorage, min_pool_conns=1, max_pool_conns=10,

            else:
                self._pool = psycopg2.pool.ThreadedConnectionPool(
                    min_pool_conns, max_pool_conns, db
                )
                self._db = None
        except psycopg2.OperationalError as e:
            raise StorageDBError(e)

-        if journal_writer:
-            if get_journal_writer is None:
-                raise EnvironmentError(
-                    'You need the swh.journal package to use the '
-                    'journal_writer feature')
-            self.journal_writer = get_journal_writer(**journal_writer)
-        else:
-            self.journal_writer = None
+        self.journal_writer = JournalWriter(journal_writer)

        self.objstorage = ObjStorage(objstorage)
    def get_db(self):
        if self._db:
            return self._db
        else:
            return Db.from_pool(self._pool)
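From a caller's perspective the constructor API does not change: journal_writer is still either a configuration dict (previously expanded into get_journal_writer(**journal_writer)) or None to disable journal writing. A hypothetical instantiation, with placeholder configuration values:

# Placeholder values for illustration only; not taken from this changeset.
storage = Storage(
    db='dbname=softwareheritage-dev',
    objstorage={'cls': 'memory', 'args': {}},
    journal_writer={'cls': 'memory'},   # or None: journal writing is then a no-op
)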
Show All 67 Lines: def _content_add_metadata(self, db, cur, content):
            raise

    @timed
    @process_metrics
    @db_transaction()
    def content_add(
            self, content: Iterable[Content], db=None, cur=None) -> Dict:
        now = datetime.datetime.now(tz=datetime.timezone.utc)
-        content = [attr.evolve(c, ctime=now) for c in content]
+        contents = [attr.evolve(c, ctime=now) for c in content]
        missing = list(self.content_missing(
-            map(Content.to_dict, content), key_hash='sha1_git'))
-        content = [c for c in content if c.sha1_git in missing]
+            map(Content.to_dict, contents), key_hash='sha1_git'))
+        contents = [c for c in contents if c.sha1_git in missing]

-        if self.journal_writer:
-            for item in content:
-                if item.data:
-                    item = attr.evolve(item, data=None)
-                self.journal_writer.write_addition('content', item)
+        self.journal_writer.content_add(contents)

        def add_to_objstorage():
            """Add to objstorage the new missing_content

            Returns:
                Sum of all the content's data length pushed to the
                objstorage. Content present twice is only sent once.
            """
-            summary = self.objstorage.content_add(content)
+            summary = self.objstorage.content_add(contents)
            return summary['content:add:bytes']

        with ThreadPoolExecutor(max_workers=1) as executor:
            added_to_objstorage = executor.submit(add_to_objstorage)

-            self._content_add_metadata(db, cur, content)
+            self._content_add_metadata(db, cur, contents)

            # Wait for objstorage addition before returning from the
            # transaction, bubbling up any exception
            content_bytes_added = added_to_objstorage.result()

        return {
-            'content:add': len(content),
+            'content:add': len(contents),
            'content:add:bytes': content_bytes_added,
        }
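A minimal, hypothetical call to the method above, assuming Content.from_data is available in swh.model.model; the summary keys come from the return statement just shown.

from swh.model.model import Content

content = Content.from_data(b'hello world')   # hashes computed from the raw bytes
summary = storage.content_add([content])
# Expected shape, per the return statement above:
# {'content:add': 1, 'content:add:bytes': 11}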
    @timed
    @db_transaction()
    def content_update(self, content, keys=[], db=None, cur=None):
        # TODO: Add a check on input keys. How to properly implement
        # this? We don't know yet the new columns.
-        if self.journal_writer:
-            raise NotImplementedError(
-                'content_update is not yet supported with a journal_writer.')
+        self.journal_writer.content_update(content)

        db.mktemp('content', cur)
        select_keys = list(set(db.content_get_metadata_keys).union(set(keys)))
        with convert_validation_exceptions():
            db.copy_to(content, 'tmp_content', select_keys, cur)
            db.content_update_from_temp(keys_to_update=keys,
                                        cur=cur)

    @timed
    @process_metrics
    @db_transaction()
    def content_add_metadata(self, content: Iterable[Content],
                             db=None, cur=None) -> Dict:
-        content = list(content)
+        contents = list(content)
        missing = self.content_missing(
-            (c.to_dict() for c in content), key_hash='sha1_git')
-        content = [c for c in content if c.sha1_git in missing]
+            (c.to_dict() for c in contents), key_hash='sha1_git')
+        contents = [c for c in contents if c.sha1_git in missing]

-        if self.journal_writer:
-            for item in itertools.chain(content):
-                assert item.data is None
-                self.journal_writer.write_addition('content', item)
-
-        self._content_add_metadata(db, cur, content)
+        self.journal_writer.content_add_metadata(contents)
+        self._content_add_metadata(db, cur, contents)

        return {
-            'content:add': len(content),
+            'content:add': len(contents),
        }
    @timed
    def content_get(self, content):
        # FIXME: Make this method support slicing the `data`.
        if len(content) > BULK_BLOCK_CONTENT_LEN_MAX:
            raise StorageArgumentException(
                "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX)

Show All 154 Lines: def skipped_content_add(self, content: Iterable[SkippedContent],
        missing_contents = self.skipped_content_missing(
            c.to_dict() for c in content)
        content = [c for c in content
                   if any(all(c.get_hash(algo) == missing_content.get(algo)
                              for algo in DEFAULT_ALGORITHMS)
                          for missing_content in missing_contents)]

-        if self.journal_writer:
-            for item in content:
-                self.journal_writer.write_addition('content', item)
+        self.journal_writer.skipped_content_add(content)

        self._skipped_content_add_metadata(db, cur, content)

        return {
            'skipped_content:add': len(content),
        }
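The any(all(...)) filter in skipped_content_add keeps a candidate only if some row returned by skipped_content_missing matches it on every default hash algorithm. A toy illustration with plain dicts standing in for SkippedContent objects (hash values made up):

DEFAULT_ALGORITHMS = {'sha1', 'sha1_git'}   # reduced set, for the example only

candidate = {'sha1': b'\x01', 'sha1_git': b'\x02'}   # stands in for c.get_hash(algo)
missing_contents = [
    {'sha1': b'\x01', 'sha1_git': b'\x02'},   # matches on every algorithm -> keep
    {'sha1': b'\x01', 'sha1_git': b'\xff'},   # partial match is not enough
]

keep = any(
    all(candidate[algo] == row.get(algo) for algo in DEFAULT_ALGORITHMS)
    for row in missing_contents
)
assert keep is True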
    @timed
    @db_transaction_generator()

Show All 24 Lines: def directory_add(self, directories: Iterable[Directory],

                entry = src_entry.to_dict()
                entry['dir_id'] = dir_id
                dir_entries[entry['type']][dir_id].append(entry)

        dirs_missing = set(self.directory_missing(dirs, db=db, cur=cur))
        if not dirs_missing:
            return summary

-        if self.journal_writer:
-            self.journal_writer.write_additions(
-                'directory',
-                (dir_ for dir_ in directories
-                 if dir_.id in dirs_missing))
+        self.journal_writer.directory_add(
+            dir_ for dir_ in directories
+            if dir_.id in dirs_missing
+        )

        # Copy directory ids
        dirs_missing_dict = ({'id': dir} for dir in dirs_missing)
        db.mktemp('directory', cur)
        db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur)

        # Copy entries
        for entry_type, entry_list in dir_entries.items():
Show All 63 Lines: def revision_add(self, revisions: Iterable[Revision],

            return summary

        db.mktemp_revision(cur)

        revisions_filtered = [
            revision for revision in revisions
            if revision.id in revisions_missing]

-        if self.journal_writer:
-            self.journal_writer.write_additions('revision', revisions_filtered)
+        self.journal_writer.revision_add(revisions_filtered)

        revisions_filtered = \
            list(map(converters.revision_to_db, revisions_filtered))

        parents_filtered: List[bytes] = []

        with convert_validation_exceptions():
            db.copy_to(
Show All 69 Lines: def release_add(

        db.mktemp_release(cur)

        releases_filtered = [
            release for release in releases
            if release.id in releases_missing
        ]

-        if self.journal_writer:
-            self.journal_writer.write_additions('release', releases_filtered)
+        self.journal_writer.release_add(releases_filtered)

        releases_filtered = \
            list(map(converters.release_to_db, releases_filtered))

        with convert_validation_exceptions():
            db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols,
                       cur)
Show All 51 Lines: def snapshot_add(

                        ),
                        'tmp_snapshot_branch',
                        ['name', 'target', 'target_type'],
                        cur,
                    )
                except VALIDATION_EXCEPTIONS + (KeyError,) as e:
                    raise StorageArgumentException(*e.args)

-                if self.journal_writer:
-                    self.journal_writer.write_addition('snapshot', snapshot)
+                self.journal_writer.snapshot_add(snapshot)

                db.snapshot_add(snapshot.id, cur)
                count += 1

        return {'snapshot:add': count}
    @timed
    @db_transaction_generator()

Show All 99 Lines: def origin_visit_add(

        if isinstance(date, str):
            # FIXME: Converge on iso8601 at some point
            date = dateutil.parser.parse(date)

        with convert_validation_exceptions():
            visit_id = db.origin_visit_add(origin_url, date, type, cur)

-        if self.journal_writer:
-            # We can write to the journal only after inserting to the
-            # DB, because we want the id of the visit
-            self.journal_writer.write_addition('origin_visit', {
-                'origin': origin_url, 'date': date, 'type': type,
-                'visit': visit_id,
-                'status': 'ongoing', 'metadata': None, 'snapshot': None})
+        # We can write to the journal only after inserting to the
+        # DB, because we want the id of the visit
+        visit = {
+            'origin': origin_url,
+            'date': date,
+            'type': type,
+            'visit': visit_id,
+            'status': 'ongoing',
+            'metadata': None,
+            'snapshot': None
+        }
+        self.journal_writer.origin_visit_add(visit)

        send_metric('origin_visit:add', count=1, method_name='origin_visit')
        return {
            'origin': origin_url,
            'visit': visit_id,
        }
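A hypothetical call matching the return value built above; the method signature itself is elided from this view, so the arguments are passed positionally, and the URL and date are made up.

result = storage.origin_visit_add(
    'https://example.org/some/repo.git',   # origin URL (illustrative)
    '2020-02-01T12:00:00+00:00',           # str dates are parsed with dateutil above
    'git',                                 # visit type
)
# result == {'origin': 'https://example.org/some/repo.git', 'visit': <new visit id>}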
    @timed

Show All 18 Lines: def origin_visit_update(self, origin: str, visit_id: int,

        if status and status != visit['status']:
            updates['status'] = status
        if metadata and metadata != visit['metadata']:
            updates['metadata'] = metadata
        if snapshot and snapshot != visit['snapshot']:
            updates['snapshot'] = snapshot

        if updates:
-            if self.journal_writer:
-                self.journal_writer.write_update('origin_visit', {
-                    **visit, **updates})
+            updated_visit = {**visit, **updates}
+            self.journal_writer.origin_visit_update(updated_visit)

            with convert_validation_exceptions():
                db.origin_visit_update(origin_url, visit_id, updates, cur)
    @timed
    @db_transaction()
    def origin_visit_upsert(self, visits, db=None, cur=None):
        visits = copy.deepcopy(visits)
        for visit in visits:
            if isinstance(visit['date'], str):
                visit['date'] = dateutil.parser.parse(visit['date'])
            if not isinstance(visit['origin'], str):
                raise StorageArgumentException(
                    "visit['origin'] must be a string, not %r"
                    % (visit['origin'],))

-        if self.journal_writer:
-            for visit in visits:
-                self.journal_writer.write_addition('origin_visit', visit)
+        self.journal_writer.origin_visit_upsert(visits)

        for visit in visits:
            # TODO: upsert them all in a single query
            db.origin_visit_upsert(**visit, cur=cur)
    @timed
    @db_transaction_generator(statement_timeout=500)
    def origin_visit_get(self, origin, last_visit=None, limit=None, db=None,

Show All 146 Lines: class Storage():
    @timed
    @db_transaction()
    def origin_add_one(self, origin: Origin, db=None, cur=None) -> str:
        origin_row = list(db.origin_get_by_url([origin.url], cur))[0]
        origin_url = dict(zip(db.origin_cols, origin_row))['url']
        if origin_url:
            return origin_url

-        if self.journal_writer:
-            self.journal_writer.write_addition('origin', origin)
+        self.journal_writer.origin_add_one(origin)

        origins = db.origin_add(origin.url, cur)
        send_metric('origin:add', count=len(origins), method_name='origin_add')
        return origins

    @db_transaction(statement_timeout=500)
    def stat_counters(self, db=None, cur=None):
        return {k: v for (k, v) in db.stat_counters()}

Show All 106 Lines