Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show All 19 Lines | |||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.errors | import psycopg2.errors | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
SkippedContent, Content, Directory, Revision, Release, | SkippedContent, Content, Directory, Revision, Release, | ||||
Snapshot, Origin, SHA1_SIZE | Snapshot, Origin, SHA1_SIZE | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.objstorage import get_objstorage | |||||
from swh.objstorage.exc import ObjNotFoundError | |||||
try: | try: | ||||
from swh.journal.writer import get_journal_writer | from swh.journal.writer import get_journal_writer | ||||
except ImportError: | except ImportError: | ||||
get_journal_writer = None # type: ignore | get_journal_writer = None # type: ignore | ||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | # mypy limitation, see https://github.com/python/mypy/issues/1153 | ||||
from swh.storage.objstorage import ObjStorage | |||||
from . import converters, HashCollision | from . import converters, HashCollision | ||||
from .common import db_transaction_generator, db_transaction | from .common import db_transaction_generator, db_transaction | ||||
from .db import Db | from .db import Db | ||||
from .exc import StorageArgumentException, StorageDBError | from .exc import StorageArgumentException, StorageDBError | ||||
from .algos import diff | from .algos import diff | ||||
from .metrics import timed, send_metric, process_metrics | from .metrics import timed, send_metric, process_metrics | ||||
from .utils import get_partition_bounds_bytes | from .utils import get_partition_bounds_bytes | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def __init__(self, db, objstorage, min_pool_conns=1, max_pool_conns=10, | ||||
else: | else: | ||||
self._pool = psycopg2.pool.ThreadedConnectionPool( | self._pool = psycopg2.pool.ThreadedConnectionPool( | ||||
min_pool_conns, max_pool_conns, db | min_pool_conns, max_pool_conns, db | ||||
) | ) | ||||
self._db = None | self._db = None | ||||
except psycopg2.OperationalError as e: | except psycopg2.OperationalError as e: | ||||
raise StorageDBError(e) | raise StorageDBError(e) | ||||
self.objstorage = get_objstorage(**objstorage) | |||||
if journal_writer: | if journal_writer: | ||||
if get_journal_writer is None: | if get_journal_writer is None: | ||||
raise EnvironmentError( | raise EnvironmentError( | ||||
'You need the swh.journal package to use the ' | 'You need the swh.journal package to use the ' | ||||
'journal_writer feature') | 'journal_writer feature') | ||||
self.journal_writer = get_journal_writer(**journal_writer) | self.journal_writer = get_journal_writer(**journal_writer) | ||||
else: | else: | ||||
self.journal_writer = None | self.journal_writer = None | ||||
self.objstorage = ObjStorage(objstorage) | |||||
def get_db(self): | def get_db(self): | ||||
if self._db: | if self._db: | ||||
return self._db | return self._db | ||||
else: | else: | ||||
return Db.from_pool(self._pool) | return Db.from_pool(self._pool) | ||||
def put_db(self, db): | def put_db(self, db): | ||||
▲ Show 20 Lines • Show All 86 Lines • ▼ Show 20 Lines | def content_add( | ||||
def add_to_objstorage(): | def add_to_objstorage(): | ||||
"""Add to objstorage the new missing_content | """Add to objstorage the new missing_content | ||||
Returns: | Returns: | ||||
Sum of all the content's data length pushed to the | Sum of all the content's data length pushed to the | ||||
objstorage. Content present twice is only sent once. | objstorage. Content present twice is only sent once. | ||||
""" | """ | ||||
content_bytes_added = 0 | summary = self.objstorage.content_add(content) | ||||
data = {} | return summary['content:add:bytes'] | ||||
for cont in content: | |||||
if cont.sha1 not in data: | |||||
data[cont.sha1] = cont.data | |||||
content_bytes_added += max(0, cont.length) | |||||
# FIXME: Since we do the filtering anyway now, we might as | |||||
# well make the objstorage's add_batch call return what we | |||||
# want here (real bytes added)... that'd simplify this... | |||||
self.objstorage.add_batch(data) | |||||
return content_bytes_added | |||||
with ThreadPoolExecutor(max_workers=1) as executor: | with ThreadPoolExecutor(max_workers=1) as executor: | ||||
added_to_objstorage = executor.submit(add_to_objstorage) | added_to_objstorage = executor.submit(add_to_objstorage) | ||||
self._content_add_metadata(db, cur, content) | self._content_add_metadata(db, cur, content) | ||||
# Wait for objstorage addition before returning from the | # Wait for objstorage addition before returning from the | ||||
# transaction, bubbling up any exception | # transaction, bubbling up any exception | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def content_add_metadata(self, content: Iterable[Content], | ||||
} | } | ||||
@timed | @timed | ||||
def content_get(self, content): | def content_get(self, content): | ||||
# FIXME: Make this method support slicing the `data`. | # FIXME: Make this method support slicing the `data`. | ||||
if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | if len(content) > BULK_BLOCK_CONTENT_LEN_MAX: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | "Send at maximum %s contents." % BULK_BLOCK_CONTENT_LEN_MAX) | ||||
yield from self.objstorage.content_get(content) | |||||
for obj_id in content: | |||||
try: | |||||
data = self.objstorage.get(obj_id) | |||||
except ObjNotFoundError: | |||||
yield None | |||||
continue | |||||
yield {'sha1': obj_id, 'data': data} | |||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def content_get_range(self, start, end, limit=1000, db=None, cur=None): | def content_get_range(self, start, end, limit=1000, db=None, cur=None): | ||||
if limit is None: | if limit is None: | ||||
raise StorageArgumentException('limit should not be None') | raise StorageArgumentException('limit should not be None') | ||||
contents = [] | contents = [] | ||||
next_content = None | next_content = None | ||||
▲ Show 20 Lines • Show All 895 Lines • Show Last 20 Lines |