Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | import contextlib | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from deprecated import deprecated | from deprecated import deprecated | ||||
from typing import Any, Dict, Iterable, List, Optional | from typing import Any, Dict, Iterable, List, Optional, Union | ||||
import attr | import attr | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.errors | import psycopg2.errors | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Show All 9 Lines | from swh.model.model import ( | ||||
SHA1_SIZE, | SHA1_SIZE, | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.validate import VALIDATION_EXCEPTIONS | from swh.storage.validate import VALIDATION_EXCEPTIONS | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
from . import converters | from . import converters | ||||
from .extrinsic_metadata import ( | |||||
check_extrinsic_metadata_context, | |||||
CONTEXT_KEYS, | |||||
ALL_CONTEXT_KEYS, | |||||
) | |||||
from .common import db_transaction_generator, db_transaction | from .common import db_transaction_generator, db_transaction | ||||
from .db import Db | from .db import Db | ||||
from .exc import StorageArgumentException, StorageDBError, HashCollision | from .exc import StorageArgumentException, StorageDBError, HashCollision | ||||
from .algos import diff | from .algos import diff | ||||
from .metrics import timed, send_metric, process_metrics | from .metrics import timed, send_metric, process_metrics | ||||
from .utils import get_partition_bounds_bytes, extract_collision_hash | from .utils import get_partition_bounds_bytes, extract_collision_hash | ||||
from .writer import JournalWriter | from .writer import JournalWriter | ||||
▲ Show 20 Lines • Show All 1,110 Lines • ▼ Show 20 Lines | def origin_metadata_add( | ||||
discovery_date: datetime.datetime, | discovery_date: datetime.datetime, | ||||
authority: Dict[str, Any], | authority: Dict[str, Any], | ||||
fetcher: Dict[str, Any], | fetcher: Dict[str, Any], | ||||
format: str, | format: str, | ||||
metadata: bytes, | metadata: bytes, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> None: | ) -> None: | ||||
context: Dict[str, Union[str, bytes, int]] = {} # origins have no context | |||||
self._object_metadata_add( | self._object_metadata_add( | ||||
"origin", | "origin", | ||||
origin_url, | origin_url, | ||||
context, | |||||
discovery_date, | discovery_date, | ||||
authority, | authority, | ||||
fetcher, | fetcher, | ||||
format, | format, | ||||
metadata, | metadata, | ||||
db, | db, | ||||
cur, | cur, | ||||
) | ) | ||||
def _object_metadata_add(
    self,
    object_type: str,
    id: str,
    context: Dict[str, Union[str, bytes, int]],
    discovery_date: datetime.datetime,
    authority: Dict[str, Any],
    fetcher: Dict[str, Any],
    format: str,
    metadata: bytes,
    db,
    cur,
) -> None:
    """Shared implementation backing the per-object-type
    ``*_metadata_add`` entry points (e.g. ``origin_metadata_add``).

    Args:
        object_type: kind of object the metadata is about
            (e.g. ``"origin"``).
        id: identifier of that object (for origins, the origin URL).
        context: extra context keys for this object type, checked by
            ``check_extrinsic_metadata_context`` (origins pass ``{}``).
        discovery_date: when the metadata was retrieved.
        authority: dict identifying the metadata authority; resolved to
            its DB id via ``self._get_authority_id``.
        fetcher: dict identifying the metadata fetcher; resolved via
            ``self._get_fetcher_id``.
        format: name of the serialization format of ``metadata``.
        metadata: the raw metadata blob; must be ``bytes``.
        db: database handle supplied by the transaction decorator.
        cur: database cursor supplied by the transaction decorator.

    Raises:
        StorageArgumentException: if ``metadata`` is not ``bytes``
            (the context check presumably raises a validation error for
            a context/object_type mismatch as well — defined in
            ``.extrinsic_metadata``, outside this view).
    """
    # Run the cheap, purely local validations before any database
    # access: the original checked `metadata` only after resolving
    # authority and fetcher ids, so an invalid call still performed
    # DB round-trips (and possibly row insertions). Hoisting the
    # checks keeps a rejected call entirely side-effect-free while
    # raising the same exception type.
    if not isinstance(metadata, bytes):
        raise StorageArgumentException(
            "metadata must be bytes, not %r" % (metadata,)
        )
    check_extrinsic_metadata_context(object_type, context)

    # Resolve the authority/fetcher dicts to their DB primary keys.
    authority_id = self._get_authority_id(authority, db, cur)
    fetcher_id = self._get_fetcher_id(fetcher, db, cur)

    db.object_metadata_add(
        object_type,
        id,
        context,
        discovery_date,
        authority_id,
        fetcher_id,
        format,
        metadata,
        cur,
    )
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | ) -> Dict[str, Any]: | ||||
rows = db.object_metadata_get( | rows = db.object_metadata_get( | ||||
object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur | object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur | ||||
) | ) | ||||
rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows] | rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows] | ||||
results = [] | results = [] | ||||
for row in rows: | for row in rows: | ||||
row = row.copy() | row = row.copy() | ||||
row.pop("metadata_fetcher.id") | row.pop("metadata_fetcher.id") | ||||
context = {} | |||||
for key in ALL_CONTEXT_KEYS: | |||||
value = row.pop(key) | |||||
if key in CONTEXT_KEYS[object_type]: | |||||
if value is not None: | |||||
context[key] = value | |||||
result = { | result = { | ||||
"authority": { | "authority": { | ||||
"type": row.pop("metadata_authority.type"), | "type": row.pop("metadata_authority.type"), | ||||
"url": row.pop("metadata_authority.url"), | "url": row.pop("metadata_authority.url"), | ||||
}, | }, | ||||
"fetcher": { | "fetcher": { | ||||
"name": row.pop("metadata_fetcher.name"), | "name": row.pop("metadata_fetcher.name"), | ||||
"version": row.pop("metadata_fetcher.version"), | "version": row.pop("metadata_fetcher.version"), | ||||
}, | }, | ||||
**row, | **row, | ||||
} | } | ||||
if CONTEXT_KEYS[object_type]: | |||||
result["context"] = context | |||||
results.append(result) | results.append(result) | ||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | ||||
next_page_token: Optional[bytes] = msgpack_dumps( | next_page_token: Optional[bytes] = msgpack_dumps( | ||||
( | ( | ||||
▲ Show 20 Lines • Show All 84 Lines • Show Last 20 Lines |