Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
Show All 12 Lines | |||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from typing import Any, Dict, Mapping, Optional | from typing import Any, Dict, Mapping, Optional | ||||
import dateutil.parser | import dateutil.parser | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.pool | import psycopg2.pool | ||||
from swh.core.api import remote_api_endpoint | from swh.core.api import remote_api_endpoint | ||||
from swh.model.hashutil import ALGORITHMS, hash_to_bytes | from swh.model.model import SHA1_SIZE | ||||
from swh.model.hashutil import ALGORITHMS, hash_to_bytes, hash_to_hex | |||||
from swh.objstorage import get_objstorage | from swh.objstorage import get_objstorage | ||||
from swh.objstorage.exc import ObjNotFoundError | from swh.objstorage.exc import ObjNotFoundError | ||||
try: | try: | ||||
from swh.journal.writer import get_journal_writer | from swh.journal.writer import get_journal_writer | ||||
except ImportError: | except ImportError: | ||||
get_journal_writer = None # type: ignore | get_journal_writer = None # type: ignore | ||||
# mypy limitation, see https://github.com/python/mypy/issues/1153 | # mypy limitation, see https://github.com/python/mypy/issues/1153 | ||||
from . import converters | from . import converters | ||||
from .common import db_transaction_generator, db_transaction | from .common import db_transaction_generator, db_transaction | ||||
from .db import Db | from .db import Db | ||||
from .exc import StorageDBError | from .exc import StorageDBError | ||||
from .algos import diff | from .algos import diff | ||||
from .metrics import timed, send_metric, process_metrics | from .metrics import timed, send_metric, process_metrics | ||||
from .utils import get_partition_bounds_bytes | |||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') | EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e') | ||||
"""Identifier for the empty snapshot""" | """Identifier for the empty snapshot""" | ||||
▲ Show 20 Lines • Show All 442 Lines • ▼ Show 20 Lines | def content_get_range(self, start, end, limit=1000, db=None, cur=None): | ||||
next_content = content['sha1'] | next_content = content['sha1'] | ||||
break | break | ||||
contents.append(content) | contents.append(content) | ||||
return { | return { | ||||
'contents': contents, | 'contents': contents, | ||||
'next': next_content, | 'next': next_content, | ||||
} | } | ||||
@remote_api_endpoint('content/partition')
@timed
@db_transaction()
def content_get_partition(
        self, partition_id: int, nb_partitions: int, limit: int = 1000,
        page_token: Optional[str] = None, db=None, cur=None
) -> Dict[str, Any]:
    """Split contents into nb_partitions and return one page of the
    partition selected by partition_id (which must be in
    [0, nb_partitions-1]).

    There is no guarantee on how the partitioning is done, or on the
    result order.

    Args:
        partition_id (int): index of the partition to fetch
        nb_partitions (int): total number of partitions to split into
        limit (int): maximum number of contents to return per page
            (defaults to 1000); must not be None
        page_token (Optional[str]): opaque token used for pagination, as
            returned in `next_page_token` by a previous call. A falsy
            value (None or empty string) starts from the beginning of
            the partition.
        db: database handle of the current transaction
        cur: cursor of the current transaction

    Returns:
        a dict with keys:

        - **contents** (List[dict]): contents of the requested page, as
          returned by :meth:`content_get_range`.
        - **next_page_token** (Optional[str]): opaque token to be used as
          `page_token` for retrieving the next page. It is always
          present, and is None when there are no more pages to gather.

    Raises:
        ValueError: if limit is None.

    """
    if limit is None:
        raise ValueError('Development error: limit should not be None')

    (start, end) = get_partition_bounds_bytes(
        partition_id, nb_partitions, SHA1_SIZE)

    # The page token is the hex form of the sha1 at which the previous
    # page stopped; resume from there instead of the partition start.
    if page_token:
        start = hash_to_bytes(page_token)

    # A None upper bound (presumably returned for the last partition --
    # confirm against get_partition_bounds_bytes) means "no upper limit":
    # use the largest possible sha1 value.
    if end is None:
        end = b'\xff' * SHA1_SIZE

    # Forward db/cur so the nested call can reuse this method's cursor
    # rather than being invoked without one.
    page = self.content_get_range(start, end, limit, db=db, cur=cur)

    next_sha1 = page['next']
    return {
        'contents': page['contents'],
        'next_page_token': hash_to_hex(next_sha1) if next_sha1 else None,
    }
@remote_api_endpoint('content/metadata') | @remote_api_endpoint('content/metadata') | ||||
@timed | @timed | ||||
@db_transaction_generator(statement_timeout=500) | @db_transaction_generator(statement_timeout=500) | ||||
def content_get_metadata(self, content, db=None, cur=None): | def content_get_metadata(self, content, db=None, cur=None): | ||||
"""Retrieve content metadata in bulk | """Retrieve content metadata in bulk | ||||
Args: | Args: | ||||
content: iterable of content identifiers (sha1) | content: iterable of content identifiers (sha1) | ||||
▲ Show 20 Lines • Show All 1,585 Lines • Show Last 20 Lines |