Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show All 15 Lines | |||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.pool | import psycopg2.pool | ||||
from . import converters | from . import converters | ||||
from .common import db_transaction_generator, db_transaction | from .common import db_transaction_generator, db_transaction | ||||
from .db import Db | from .db import Db | ||||
from .exc import StorageDBError | from .exc import StorageDBError | ||||
from .algos import diff | from .algos import diff | ||||
from .utils import get_partition_bounds_bytes | |||||
from swh.model.hashutil import ALGORITHMS, hash_to_bytes | from swh.model.model import SHA1_SIZE | ||||
from swh.model.hashutil import ALGORITHMS, hash_to_bytes, hash_to_hex | |||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
try: | try: | ||||
from swh.journal.writer import get_journal_writer | from swh.journal.writer import get_journal_writer | ||||
except ImportError: | except ImportError: | ||||
get_journal_writer = None # type: ignore | get_journal_writer = None # type: ignore | ||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | # mypy limitation, see https://github.com/python/mypy/issues/1153 | ||||
▲ Show 20 Lines • Show All 435 Lines • ▼ Show 20 Lines | def content_get_range(self, start, end, limit=1000, db=None, cur=None): | ||||
next_content = content['sha1'] | next_content = content['sha1'] | ||||
break | break | ||||
contents.append(content) | contents.append(content) | ||||
return { | return { | ||||
'contents': contents, | 'contents': contents, | ||||
'next': next_content, | 'next': next_content, | ||||
} | } | ||||
@db_transaction()
def content_get_partition(
        self, partition_id, nb_partitions, limit=1000, page_token=None,
        db=None, cur=None):
    """Divide the content table into nb_partitions buckets and return
    the contents of the bucket selected by partition_id (an integer in
    [0, nb_partitions)).

    No guarantee is made on how contents are assigned to partitions, nor
    on the ordering of results within one.

    Args:
        partition_id (int): index of the partition to fetch
        nb_partitions (int): total number of partitions to split into
        limit (int): Limit result (default to 1000)
        page_token (Optional[str]): opaque token used for pagination.

    Returns:
        a dict with keys:
          - contents (List[dict]): iterable of contents in the partition.
          - **next_page_token** (Optional[str]): opaque token to be used
            as `page_token` for retrieving the next page. If absent,
            there are no more pages to gather.

    Raises:
        ValueError: if limit is None (programming error, not user input).
    """
    if limit is None:
        raise ValueError('Development error: limit should not be None')
    # Map the partition index onto a [start, end] sha1 range.
    start, end = get_partition_bounds_bytes(
        partition_id, nb_partitions, SHA1_SIZE)
    if page_token:
        # Resume from the cursor encoded in the previous page's token.
        start = hash_to_bytes(page_token)
    if end is None:
        # Last partition: scan up to the maximum possible sha1 value.
        end = b'\xff' * SHA1_SIZE
    raw = self.content_get_range(start, end, limit)
    next_token = hash_to_hex(raw['next']) if raw['next'] else None
    return {
        'contents': raw['contents'],
        'next_page_token': next_token,
    }
@db_transaction_generator(statement_timeout=500) | @db_transaction_generator(statement_timeout=500) | ||||
def content_get_metadata(self, content, db=None, cur=None): | def content_get_metadata(self, content, db=None, cur=None): | ||||
"""Retrieve content metadata in bulk | """Retrieve content metadata in bulk | ||||
Args: | Args: | ||||
content: iterable of content identifiers (sha1) | content: iterable of content identifiers (sha1) | ||||
Returns: | Returns: | ||||
▲ Show 20 Lines • Show All 1,342 Lines • Show Last 20 Lines |