Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/storage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import contextlib | import contextlib | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
from deprecated import deprecated | from deprecated import deprecated | ||||
from typing import Any, Dict, Iterable, List, Optional | from typing import Any, Dict, Iterable, List, Optional, Union | ||||
import attr | import attr | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.pool | import psycopg2.pool | ||||
import psycopg2.errors | import psycopg2.errors | ||||
from swh.core.api.serializers import msgpack_loads, msgpack_dumps | from swh.core.api.serializers import msgpack_loads, msgpack_dumps | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Show All 9 Lines | from swh.model.model import ( | ||||
SHA1_SIZE, | SHA1_SIZE, | ||||
) | ) | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.validate import VALIDATION_EXCEPTIONS | from swh.storage.validate import VALIDATION_EXCEPTIONS | ||||
from swh.storage.utils import now | from swh.storage.utils import now | ||||
from . import converters | from . import converters | ||||
from .extrinsic_metadata import ( | |||||
check_extrinsic_metadata_context, | |||||
CONTEXT_KEYS, | |||||
ALL_CONTEXT_KEYS, | |||||
) | |||||
from .common import db_transaction_generator, db_transaction | from .common import db_transaction_generator, db_transaction | ||||
from .db import Db | from .db import Db | ||||
from .exc import StorageArgumentException, StorageDBError, HashCollision | from .exc import StorageArgumentException, StorageDBError, HashCollision | ||||
from .algos import diff | from .algos import diff | ||||
from .metrics import timed, send_metric, process_metrics | from .metrics import timed, send_metric, process_metrics | ||||
from .utils import get_partition_bounds_bytes, extract_collision_hash | from .utils import get_partition_bounds_bytes, extract_collision_hash | ||||
from .writer import JournalWriter | from .writer import JournalWriter | ||||
▲ Show 20 Lines • Show All 310 Lines • ▼ Show 20 Lines | def content_find(self, content, db=None, cur=None): | ||||
sha256=content.get("sha256"), | sha256=content.get("sha256"), | ||||
blake2s256=content.get("blake2s256"), | blake2s256=content.get("blake2s256"), | ||||
cur=cur, | cur=cur, | ||||
) | ) | ||||
return [dict(zip(db.content_find_cols, content)) for content in contents] | return [dict(zip(db.content_find_cols, content)) for content in contents] | ||||
@timed
@db_transaction()
def content_metadata_add(
    self,
    id: str,
    context: Dict[str, Union[str, bytes, int]],
    discovery_date: datetime.datetime,
    authority: Dict[str, Any],
    fetcher: Dict[str, Any],
    format: str,
    metadata: bytes,
    db=None,
    cur=None,
) -> None:
    """Add one extrinsic metadata record attached to a content object.

    This is a thin wrapper: all the work is delegated to
    ``_object_metadata_add`` with the object type fixed to ``"content"``.

    Args:
        id: identifier of the content the metadata is about.
        context: context of the metadata; it is validated against the
            ``"content"`` object type by ``_object_metadata_add``.
        discovery_date: when the metadata was discovered.
        authority: dict with ``type`` and ``url`` keys naming the metadata
            authority.
        fetcher: dict with ``name`` and ``version`` keys naming the
            metadata fetcher.
        format: name of the format of ``metadata``.
        metadata: the raw metadata; must be ``bytes``.
        db, cur: database handle and cursor, forwarded untouched
            (presumably injected by ``db_transaction`` -- TODO confirm).

    Raises:
        StorageArgumentException: propagated from ``_object_metadata_add``
            when the authority or fetcher is unknown, or when ``metadata``
            is not ``bytes``.
    """
    # Arguments are forwarded by keyword so that none of the many same-typed
    # parameters can be swapped silently.
    self._object_metadata_add(
        object_type="content",
        id=id,
        context=context,
        discovery_date=discovery_date,
        authority=authority,
        fetcher=fetcher,
        format=format,
        metadata=metadata,
        db=db,
        cur=cur,
    )
@timed
@db_transaction()
def content_metadata_get(
    self,
    id: str,
    authority: Dict[str, str],
    after: Optional[datetime.datetime] = None,
    page_token: Optional[bytes] = None,
    limit: int = 1000,
    db=None,
    cur=None,
) -> Dict[str, Any]:
    """Return one page of extrinsic metadata attached to a content object.

    Delegates to ``_object_metadata_get`` with the object type fixed to
    ``"content"`` and returns its result unchanged.

    Args:
        id: identifier of the content whose metadata is requested.
        authority: dict with ``type`` and ``url`` keys; only metadata from
            this authority is looked up.
        after: optional lower bound on the discovery date.
        page_token: opaque token from a previous call, used to resume the
            listing.
        limit: maximum number of results in the returned page.
        db, cur: database handle and cursor, forwarded untouched
            (presumably injected by ``db_transaction`` -- TODO confirm).

    Returns:
        A dict with the keys ``next_page_token`` and ``results``.

    Raises:
        StorageArgumentException: propagated from ``_object_metadata_get``
            when ``page_token`` is inconsistent with ``after``.
    """
    return self._object_metadata_get(
        object_type="content",
        id=id,
        authority=authority,
        after=after,
        page_token=page_token,
        limit=limit,
        db=db,
        cur=cur,
    )
@timed
@db_transaction()
def content_get_random(self, db=None, cur=None):
    """Return whatever ``db.content_get_random`` yields for this cursor.

    Args:
        db, cur: database handle and cursor (presumably injected by
            ``db_transaction`` -- TODO confirm).
    """
    return db.content_get_random(cur)
@staticmethod | @staticmethod | ||||
def _skipped_content_normalize(d): | def _skipped_content_normalize(d): | ||||
d = d.copy() | d = d.copy() | ||||
if d.get("status") is None: | if d.get("status") is None: | ||||
▲ Show 20 Lines • Show All 782 Lines • ▼ Show 20 Lines | def origin_metadata_add( | ||||
discovery_date: datetime.datetime, | discovery_date: datetime.datetime, | ||||
authority: Dict[str, Any], | authority: Dict[str, Any], | ||||
fetcher: Dict[str, Any], | fetcher: Dict[str, Any], | ||||
format: str, | format: str, | ||||
metadata: bytes, | metadata: bytes, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> None: | ) -> None: | ||||
authority_id = db.metadata_authority_get_id( | origin_id = next(iter(list(db.origin_id_get_by_url([origin_url], cur))), None) | ||||
authority["type"], authority["url"], cur | if origin_id is None: | ||||
raise StorageArgumentException(f"Unknown origin {origin_url}") | |||||
context: Dict[str, Union[str, bytes, int]] = {} # origins have no context | |||||
self._object_metadata_add( | |||||
"origin", | |||||
origin_url, | |||||
context, | |||||
discovery_date, | |||||
authority, | |||||
fetcher, | |||||
format, | |||||
metadata, | |||||
db, | |||||
cur, | |||||
) | ) | ||||
if not authority_id: | |||||
def _object_metadata_add(
    self,
    object_type: str,
    id: str,
    context: Dict[str, Union[str, bytes, int]],
    discovery_date: datetime.datetime,
    authority: Dict[str, Any],
    fetcher: Dict[str, Any],
    format: str,
    metadata: bytes,
    db,
    cur,
) -> None:
    """Validate and store one extrinsic metadata record for any object type.

    Shared implementation behind the public ``<object_type>_metadata_add``
    endpoints.

    Args:
        object_type: kind of object the metadata is about (callers in this
            file pass ``"content"`` or ``"origin"``).
        id: identifier of the object.
        context: context of the metadata, checked against ``object_type``
            by ``check_extrinsic_metadata_context``.
        discovery_date: when the metadata was discovered.
        authority: dict with ``type`` and ``url`` keys.
        fetcher: dict with ``name`` and ``version`` keys.
        format: name of the format of ``metadata``.
        metadata: the raw metadata; must be ``bytes``.
        db: database handle used for the lookups and the insertion.
        cur: cursor passed to every database call.

    Raises:
        StorageArgumentException: if the authority or the fetcher is not
            registered, or if ``metadata`` is not a ``bytes`` instance.
            ``check_extrinsic_metadata_context`` may raise as well; its
            exception type is not visible from here.
    """
    # Validation order is significant: it decides which error a caller sees
    # when several arguments are wrong at once.
    check_extrinsic_metadata_context(object_type, context)

    authority_id = self._get_authority_id(authority, db, cur)
    fetcher_id = self._get_fetcher_id(fetcher, db, cur)

    if not isinstance(metadata, bytes):
        raise StorageArgumentException(
            "metadata must be bytes, not %r" % (metadata,)
        )

    db.object_metadata_add(
        object_type,
        id,
        context,
        discovery_date,
        authority_id,
        fetcher_id,
        format,
        metadata,
        cur,
    )

    # The metric name and method name are derived from the object type, e.g.
    # "origin_metadata:add" / "origin_metadata_add".
    send_metric(
        f"{object_type}_metadata:add",
        count=1,
        method_name=f"{object_type}_metadata_add",
    )
@timed
@db_transaction(statement_timeout=500)
def origin_metadata_get(
    self,
    origin_url: str,
    authority: Dict[str, str],
    after: Optional[datetime.datetime] = None,
    page_token: Optional[bytes] = None,
    limit: int = 1000,
    db=None,
    cur=None,
) -> Dict[str, Any]:
    """Return one page of extrinsic metadata attached to an origin.

    Args:
        origin_url: URL of the origin whose metadata is requested.
        authority: dict with ``type`` and ``url`` keys; only metadata from
            this authority is looked up.
        after: optional lower bound on the discovery date.
        page_token: opaque token from a previous call, used to resume the
            listing.
        limit: maximum number of results in the returned page.
        db, cur: database handle and cursor (presumably injected by
            ``db_transaction`` -- TODO confirm).

    Returns:
        The dict produced by ``_object_metadata_get`` (keys
        ``next_page_token`` and ``results``), where every result has its
        ``id`` entry replaced by an ``origin_url`` entry.

    Raises:
        StorageArgumentException: if no origin is known for ``origin_url``,
            or (from ``_object_metadata_get``) if ``page_token`` is
            inconsistent with ``after``.
    """
    # Only the first id matters; an empty answer or a None id both mean the
    # origin is unknown.
    origin_ids = list(db.origin_id_get_by_url([origin_url], cur))
    if not origin_ids or origin_ids[0] is None:
        raise StorageArgumentException(f"Unknown origin {origin_url}")

    page = self._object_metadata_get(
        "origin", origin_url, authority, after, page_token, limit, db, cur
    )

    # The generic helper reports the object under "id"; this endpoint exposes
    # it under "origin_url" instead.
    for entry in page["results"]:
        entry.pop("id")
        entry["origin_url"] = origin_url

    return page
def _object_metadata_get( | |||||
self, | |||||
object_type: str, | |||||
id: str, | |||||
authority: Dict[str, str], | |||||
after: Optional[datetime.datetime], | |||||
page_token: Optional[bytes], | |||||
limit: int, | |||||
db, | |||||
cur, | |||||
) -> Dict[str, Any]: | |||||
if page_token: | if page_token: | ||||
(after_time, after_fetcher) = msgpack_loads(page_token) | (after_time, after_fetcher) = msgpack_loads(page_token) | ||||
if after and after_time < after: | if after and after_time < after: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
else: | else: | ||||
after_time = after | after_time = after | ||||
after_fetcher = None | after_fetcher = None | ||||
authority_id = db.metadata_authority_get_id( | authority_id = db.metadata_authority_get_id( | ||||
authority["type"], authority["url"], cur | authority["type"], authority["url"], cur | ||||
) | ) | ||||
if not authority_id: | if not authority_id: | ||||
return { | return { | ||||
"next_page_token": None, | "next_page_token": None, | ||||
"results": [], | "results": [], | ||||
} | } | ||||
rows = db.origin_metadata_get( | rows = db.object_metadata_get( | ||||
origin_url, authority_id, after_time, after_fetcher, limit + 1, cur | object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur | ||||
) | ) | ||||
rows = [dict(zip(db.origin_metadata_get_cols, row)) for row in rows] | rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows] | ||||
results = [] | results = [] | ||||
for row in rows: | for row in rows: | ||||
row = row.copy() | row = row.copy() | ||||
row.pop("metadata_fetcher.id") | row.pop("metadata_fetcher.id") | ||||
results.append( | context = {} | ||||
{ | for key in ALL_CONTEXT_KEYS: | ||||
"origin_url": row.pop("origin.url"), | value = row.pop(key) | ||||
if key in CONTEXT_KEYS[object_type]: | |||||
if value is not None: | |||||
context[key] = value | |||||
result = { | |||||
"authority": { | "authority": { | ||||
"type": row.pop("metadata_authority.type"), | "type": row.pop("metadata_authority.type"), | ||||
"url": row.pop("metadata_authority.url"), | "url": row.pop("metadata_authority.url"), | ||||
}, | }, | ||||
"fetcher": { | "fetcher": { | ||||
"name": row.pop("metadata_fetcher.name"), | "name": row.pop("metadata_fetcher.name"), | ||||
"version": row.pop("metadata_fetcher.version"), | "version": row.pop("metadata_fetcher.version"), | ||||
}, | }, | ||||
**row, | **row, | ||||
} | } | ||||
) | |||||
if CONTEXT_KEYS[object_type]: | |||||
result["context"] = context | |||||
results.append(result) | |||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | ||||
next_page_token: Optional[bytes] = msgpack_dumps( | next_page_token: Optional[bytes] = msgpack_dumps( | ||||
( | ( | ||||
last_returned_row["discovery_date"], | last_returned_row["discovery_date"], | ||||
▲ Show 20 Lines • Show All 59 Lines • ▼ Show 20 Lines | |||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
    """Do nothing.

    Args:
        object_types: accepted but ignored.
    """
    return None
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
    """Return an empty dict; nothing is flushed here.

    Args:
        object_types: accepted but ignored.

    Returns:
        A new, empty dict on every call.
    """
    return {}
def _get_authority_id(self, authority: Dict[str, Any], db, cur):
    """Look up the database id of a metadata authority.

    Args:
        authority: dict with ``type`` and ``url`` keys.
        db: database handle providing ``metadata_authority_get_id``.
        cur: cursor passed to the lookup.

    Returns:
        The id returned by ``db.metadata_authority_get_id``.

    Raises:
        StorageArgumentException: if the lookup returns a falsy value,
            i.e. the authority is not registered.
    """
    authority_id = db.metadata_authority_get_id(
        authority["type"], authority["url"], cur
    )
    if not authority_id:
        raise StorageArgumentException(f"Unknown authority {authority}")
    return authority_id
def _get_fetcher_id(self, fetcher: Dict[str, Any], db, cur):
    """Look up the database id of a metadata fetcher.

    Args:
        fetcher: dict with ``name`` and ``version`` keys.
        db: database handle providing ``metadata_fetcher_get_id``.
        cur: cursor passed to the lookup.

    Returns:
        The id returned by ``db.metadata_fetcher_get_id``.

    Raises:
        StorageArgumentException: if the lookup returns a falsy value,
            i.e. the fetcher is not registered.
    """
    fetcher_id = db.metadata_fetcher_get_id(
        fetcher["name"], fetcher["version"], cur
    )
    if not fetcher_id:
        raise StorageArgumentException(f"Unknown fetcher {fetcher}")
    return fetcher_id