Changeset View
swh/storage/storage.py
 # Copyright (C) 2015-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import contextlib
 import datetime
 import itertools

 from collections import defaultdict
 from contextlib import contextmanager
 from deprecated import deprecated
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import (
+    Any,
+    Counter,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Union,
+)

 import attr
 import psycopg2
 import psycopg2.pool
 import psycopg2.errors

 from swh.core.api.serializers import msgpack_loads, msgpack_dumps
+from swh.model.identifiers import parse_swhid, SWHID
 from swh.model.model import (
     Content,
     Directory,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     Revision,
     Release,
     SkippedContent,
     Snapshot,
     SHA1_SIZE,
+    MetadataAuthority,
+    MetadataAuthorityType,
+    MetadataFetcher,
+    MetadataTargetType,
+    RawExtrinsicMetadata,
 )
 from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex
 from swh.storage.objstorage import ObjStorage
 from swh.storage.validate import VALIDATION_EXCEPTIONS
 from swh.storage.utils import now

 from . import converters
-from .extrinsic_metadata import (
-    check_extrinsic_metadata_context,
-    CONTEXT_KEYS,
-)
 from .common import db_transaction_generator, db_transaction
 from .db import Db
 from .exc import StorageArgumentException, StorageDBError, HashCollision
 from .algos import diff
 from .metrics import timed, send_metric, process_metrics
-from .utils import get_partition_bounds_bytes, extract_collision_hash
+from .utils import get_partition_bounds_bytes, extract_collision_hash, map_optional
 from .writer import JournalWriter

 # Max block size of contents to return
 BULK_BLOCK_CONTENT_LEN_MAX = 10000

 EMPTY_SNAPSHOT_ID = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e")
 """Identifier for the empty snapshot"""
[ ... 1,067 unchanged lines not shown ... ]

@@ ... @@ def refresh_stat_counters(self, db=None, cur=None):
             "revision_history",
             "skipped_content",
             "snapshot",
         ]
         for key in keys:
             cur.execute("select * from swh_update_counter(%s)", (key,))

-    @timed
     @db_transaction()
-    def content_metadata_add(
-        self,
-        id: str,
-        context: Dict[str, Union[str, bytes, int]],
-        discovery_date: datetime.datetime,
-        authority: Dict[str, Any],
-        fetcher: Dict[str, Any],
-        format: str,
-        metadata: bytes,
-        db=None,
-        cur=None,
+    def object_metadata_add(
+        self, metadata: Iterable[RawExtrinsicMetadata], db, cur,
[inline comment by ardumont]: I only realize now: make it return its dict stats like most other add endpoints. Yeah, I know…
     ) -> None:
-        self._object_metadata_add(
-            "content",
-            id,
-            context,
-            discovery_date,
-            authority,
-            fetcher,
-            format,
-            metadata,
-            db,
-            cur,
-        )
-
-    @timed
-    @db_transaction()
-    def content_metadata_get(
-        self,
-        id: str,
-        authority: Dict[str, str],
-        after: Optional[datetime.datetime] = None,
-        page_token: Optional[bytes] = None,
-        limit: int = 1000,
-        db=None,
-        cur=None,
-    ) -> Dict[str, Any]:
-        return self._object_metadata_get(
-            "content", id, authority, after, page_token, limit, db, cur
-        )
-
-    @timed
-    @db_transaction()
-    def origin_metadata_add(
-        self,
-        origin_url: str,
-        discovery_date: datetime.datetime,
-        authority: Dict[str, Any],
-        fetcher: Dict[str, Any],
-        format: str,
-        metadata: bytes,
-        db=None,
-        cur=None,
-    ) -> None:
-        context: Dict[str, Union[str, bytes, int]] = {}  # origins have no context
-        self._object_metadata_add(
-            "origin",
-            origin_url,
-            context,
-            discovery_date,
-            authority,
-            fetcher,
-            format,
-            metadata,
-            db,
-            cur,
-        )
+        counter = Counter[MetadataTargetType]()
+        for metadata_entry in metadata:
+            authority_id = self._get_authority_id(metadata_entry.authority, db, cur)
+            fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur)
+            db.object_metadata_add(
+                object_type=metadata_entry.type.value,
+                id=str(metadata_entry.id),
+                discovery_date=metadata_entry.discovery_date,
+                authority_id=authority_id,
+                fetcher_id=fetcher_id,
+                format=metadata_entry.format,
+                metadata=metadata_entry.metadata,
+                origin=metadata_entry.origin,
+                visit=metadata_entry.visit,
+                snapshot=map_optional(str, metadata_entry.snapshot),
+                release=map_optional(str, metadata_entry.release),
+                revision=map_optional(str, metadata_entry.revision),
+                path=metadata_entry.path,
+                directory=map_optional(str, metadata_entry.directory),
+                cur=cur,
+            )
+            counter[metadata_entry.type] += 1
+        for (object_type, count) in counter.items():
+            send_metric(
+                f"{object_type.value}_metadata:add",
+                count=count,
+                method_name=f"{object_type.value}_metadata_add",
+            )

-    @timed
-    @db_transaction(statement_timeout=500)
-    def origin_metadata_get(
+    @db_transaction()
+    def object_metadata_get(
         self,
-        origin_url: str,
-        authority: Dict[str, str],
+        object_type: MetadataTargetType,
+        id: Union[str, SWHID],
+        authority: MetadataAuthority,
         after: Optional[datetime.datetime] = None,
         page_token: Optional[bytes] = None,
         limit: int = 1000,
         db=None,
         cur=None,
-    ) -> Dict[str, Any]:
-        result = self._object_metadata_get(
-            "origin", origin_url, authority, after, page_token, limit, db, cur
-        )
-        for res in result["results"]:
-            res.pop("id")
-            res["origin_url"] = origin_url
-        return result
-
-    def _object_metadata_add(
-        self,
-        object_type: str,
-        id: str,
-        context: Dict[str, Union[str, bytes, int]],
-        discovery_date: datetime.datetime,
-        authority: Dict[str, Any],
-        fetcher: Dict[str, Any],
-        format: str,
-        metadata: bytes,
-        db,
-        cur,
-    ) -> None:
-        check_extrinsic_metadata_context(object_type, context)
-        authority_id = self._get_authority_id(authority, db, cur)
-        fetcher_id = self._get_fetcher_id(fetcher, db, cur)
-        if not isinstance(metadata, bytes):
-            raise StorageArgumentException(
-                "metadata must be bytes, not %r" % (metadata,)
-            )
-        db.object_metadata_add(
-            object_type,
-            id,
-            context,
-            discovery_date,
-            authority_id,
-            fetcher_id,
-            format,
-            metadata,
-            cur,
-        )
-        send_metric(
-            f"{object_type}_metadata:add",
-            count=1,
-            method_name=f"{object_type}_metadata_add",
-        )
-
-    def _object_metadata_get(
-        self,
-        object_type: str,
-        id: str,
-        authority: Dict[str, str],
-        after: Optional[datetime.datetime],
-        page_token: Optional[bytes],
-        limit: int,
-        db,
-        cur,
-    ) -> Dict[str, Any]:
+    ) -> Dict[str, Union[Optional[bytes], List[RawExtrinsicMetadata]]]:
+        if object_type == MetadataTargetType.ORIGIN:
+            if isinstance(id, SWHID):
+                raise StorageArgumentException(
+                    f"object_metadata_get called with object_type='origin', but "
+                    f"provided id is an SWHID: {id!r}"
+                )
+        else:
+            if not isinstance(id, SWHID):
+                raise StorageArgumentException(
+                    f"object_metadata_get called with object_type!='origin', but "
+                    f"provided id is not an SWHID: {id!r}"
+                )
         if page_token:
             (after_time, after_fetcher) = msgpack_loads(page_token)
             if after and after_time < after:
                 raise StorageArgumentException(
                     "page_token is inconsistent with the value of 'after'."
                 )
         else:
             after_time = after
             after_fetcher = None
-        authority_id = db.metadata_authority_get_id(
-            authority["type"], authority["url"], cur
-        )
+        authority_id = self._get_authority_id(authority, db, cur)
         if not authority_id:
             return {
                 "next_page_token": None,
                 "results": [],
             }
         rows = db.object_metadata_get(
-            object_type, id, authority_id, after_time, after_fetcher, limit + 1, cur
+            object_type,
+            str(id),
+            authority_id,
+            after_time,
+            after_fetcher,
+            limit + 1,
+            cur,
         )
         rows = [dict(zip(db.object_metadata_get_cols, row)) for row in rows]
         results = []
         for row in rows:
             row = row.copy()
             row.pop("metadata_fetcher.id")
-            context = {}
-            for key in CONTEXT_KEYS[object_type]:
-                value = row[key]
-                if value is not None:
-                    context[key] = value
-            result = {
-                "id": row["id"],
-                "authority": {
-                    "type": row.pop("metadata_authority.type"),
-                    "url": row.pop("metadata_authority.url"),
-                },
-                "fetcher": {
-                    "name": row.pop("metadata_fetcher.name"),
-                    "version": row.pop("metadata_fetcher.version"),
-                },
-                "discovery_date": row["discovery_date"],
-                "format": row["format"],
-                "metadata": row["metadata"],
-            }
-            if CONTEXT_KEYS[object_type]:
-                result["context"] = context
+            assert str(id) == row["object_metadata.id"]
+            result = RawExtrinsicMetadata(
+                type=MetadataTargetType(row["object_metadata.type"]),
+                id=id,
+                authority=MetadataAuthority(
+                    type=MetadataAuthorityType(row["metadata_authority.type"]),
+                    url=row["metadata_authority.url"],
+                ),
+                fetcher=MetadataFetcher(
+                    name=row["metadata_fetcher.name"],
+                    version=row["metadata_fetcher.version"],
+                ),
+                discovery_date=row["discovery_date"],
+                format=row["format"],
+                metadata=row["object_metadata.metadata"],
+                origin=row["origin"],
+                visit=row["visit"],
+                snapshot=map_optional(parse_swhid, row["snapshot"]),
+                release=map_optional(parse_swhid, row["release"]),
+                revision=map_optional(parse_swhid, row["revision"]),
+                path=row["path"],
+                directory=map_optional(parse_swhid, row["directory"]),
+            )
             results.append(result)
         if len(results) > limit:
             results.pop()
             assert len(results) == limit
             last_returned_row = rows[-2]  # rows[-1] corresponds to the popped result
             next_page_token: Optional[bytes] = msgpack_dumps(
                 (
                     last_returned_row["discovery_date"],
                     last_returned_row["metadata_fetcher.id"],
                 )
             )
         else:
             next_page_token = None
         return {
             "next_page_token": next_page_token,
             "results": results,
         }

     @timed
     @db_transaction()
     def metadata_fetcher_add(
-        self, name: str, version: str, metadata: Dict[str, Any], db=None, cur=None
+        self, fetchers: Iterable[MetadataFetcher], db=None, cur=None
     ) -> None:
-        db.metadata_fetcher_add(name, version, metadata)
-        send_metric("metadata_fetcher:add", count=1, method_name="metadata_fetcher")
+        for (i, fetcher) in enumerate(fetchers):
+            if fetcher.metadata is None:
+                raise StorageArgumentException(
+                    "MetadataFetcher.metadata may not be None in metadata_fetcher_add."
+                )
+            db.metadata_fetcher_add(
+                fetcher.name, fetcher.version, dict(fetcher.metadata), cur=cur
+            )
[inline comment by ardumont]: same remark about metrics.
send_metric("metadata_fetcher:add", count=i + 1, method_name="metadata_fetcher") | |||||
@timed | @timed | ||||
@db_transaction(statement_timeout=500) | @db_transaction(statement_timeout=500) | ||||
def metadata_fetcher_get( | def metadata_fetcher_get( | ||||
self, name: str, version: str, db=None, cur=None | self, name: str, version: str, db=None, cur=None | ||||
) -> Optional[Dict[str, Any]]: | ) -> Optional[MetadataFetcher]: | ||||
row = db.metadata_fetcher_get(name, version, cur=cur) | row = db.metadata_fetcher_get(name, version, cur=cur) | ||||
if not row: | if not row: | ||||
return None | return None | ||||
return dict(zip(db.metadata_fetcher_cols, row)) | return MetadataFetcher.from_dict(dict(zip(db.metadata_fetcher_cols, row))) | ||||
@timed | @timed | ||||
@db_transaction() | @db_transaction() | ||||
def metadata_authority_add( | def metadata_authority_add( | ||||
self, type: str, url: str, metadata: Dict[str, Any], db=None, cur=None | self, authorities: Iterable[MetadataAuthority], db=None, cur=None | ||||
) -> None: | ) -> None: | ||||
db.metadata_authority_add(type, url, metadata, cur) | for (i, authority) in enumerate(authorities): | ||||
send_metric("metadata_authority:add", count=1, method_name="metadata_authority") | if authority.metadata is None: | ||||
raise StorageArgumentException( | |||||
"MetadataAuthority.metadata may not be None in " | |||||
"metadata_authority_add." | |||||
) | |||||
db.metadata_authority_add( | |||||
authority.type.value, authority.url, dict(authority.metadata), cur=cur | |||||
) | |||||
send_metric( | |||||
"metadata_authority:add", count=i + 1, method_name="metadata_authority" | |||||
[inline comment by ardumont]: same remark about metrics.
+        )

     @timed
     @db_transaction()
     def metadata_authority_get(
-        self, type: str, url: str, db=None, cur=None
-    ) -> Optional[Dict[str, Any]]:
-        row = db.metadata_authority_get(type, url, cur=cur)
+        self, type: MetadataAuthorityType, url: str, db=None, cur=None
+    ) -> Optional[MetadataAuthority]:
+        row = db.metadata_authority_get(type.value, url, cur=cur)
         if not row:
             return None
-        return dict(zip(db.metadata_authority_cols, row))
+        return MetadataAuthority.from_dict(dict(zip(db.metadata_authority_cols, row)))

     @timed
     def diff_directories(self, from_dir, to_dir, track_renaming=False):
         return diff.diff_directories(self, from_dir, to_dir, track_renaming)

     @timed
     def diff_revisions(self, from_rev, to_rev, track_renaming=False):
         return diff.diff_revisions(self, from_rev, to_rev, track_renaming)

     @timed
     def diff_revision(self, revision, track_renaming=False):
         return diff.diff_revision(self, revision, track_renaming)

     def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
         """Do nothing
         """
         return None

     def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict:
         return {}

-    def _get_authority_id(self, authority: Dict[str, Any], db, cur):
+    def _get_authority_id(self, authority: MetadataAuthority, db, cur):
         authority_id = db.metadata_authority_get_id(
-            authority["type"], authority["url"], cur
+            authority.type.value, authority.url, cur
         )
         if not authority_id:
             raise StorageArgumentException(f"Unknown authority {authority}")
         return authority_id

-    def _get_fetcher_id(self, fetcher: Dict[str, Any], db, cur):
-        fetcher_id = db.metadata_fetcher_get_id(
-            fetcher["name"], fetcher["version"], cur
-        )
+    def _get_fetcher_id(self, fetcher: MetadataFetcher, db, cur):
+        fetcher_id = db.metadata_fetcher_get_id(fetcher.name, fetcher.version, cur)
         if not fetcher_id:
             raise StorageArgumentException(f"Unknown fetcher {fetcher}")
         return fetcher_id
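
For reference, a minimal usage sketch of the new endpoints as they appear in this diff. The storage instance, URLs, names and dates below are made up for illustration, and the authority and fetcher must be registered first, since _get_authority_id/_get_fetcher_id raise StorageArgumentException for unknown ones:

    from datetime import datetime, timezone

    from swh.model.model import (
        MetadataAuthority,
        MetadataAuthorityType,
        MetadataFetcher,
        MetadataTargetType,
        RawExtrinsicMetadata,
    )

    # `storage` is assumed to be an existing swh.storage Storage instance.
    authority = MetadataAuthority(
        type=MetadataAuthorityType.FORGE, url="https://forge.example.org/", metadata={},
    )
    fetcher = MetadataFetcher(name="example-loader", version="1.0.0", metadata={})
    storage.metadata_authority_add([authority])
    storage.metadata_fetcher_add([fetcher])

    storage.object_metadata_add(
        [
            RawExtrinsicMetadata(
                type=MetadataTargetType.ORIGIN,
                id="https://example.org/repo.git",  # origins are identified by URL, not SWHID
                discovery_date=datetime.now(tz=timezone.utc),
                authority=authority,
                fetcher=fetcher,
                format="application/json",
                metadata=b'{"license": "GPL-3.0-or-later"}',
            )
        ]
    )

    page = storage.object_metadata_get(
        MetadataTargetType.ORIGIN, "https://example.org/repo.git", authority, limit=10,
    )
    for entry in page["results"]:  # a list of RawExtrinsicMetadata objects
        print(entry.format, entry.metadata)
    next_token = page["next_page_token"]  # None when there is no further page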
ardumont: I only realize now: make it return its dict stats like most other add endpoints.
Yeah, I know it changes the interface as well.
That way, you only need to add the @process_metrics decorator to that method and can drop the internal calls to send_metric.
Let's agree it can be done in another diff though, as this one is quite large already ;)
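
A rough sketch of what that follow-up could look like, assuming @process_metrics (already imported from .metrics in this file) emits metrics from the returned count dict as it does for the other *_add endpoints; the _object_metadata_add_one helper is hypothetical and stands in for the per-entry insertion shown in the diff:

    @timed
    @process_metrics
    @db_transaction()
    def object_metadata_add(
        self, metadata: Iterable[RawExtrinsicMetadata], db=None, cur=None,
    ) -> Dict[str, int]:
        counter = Counter[MetadataTargetType]()
        for metadata_entry in metadata:
            # per-entry insertion as in the diff above, elided here
            self._object_metadata_add_one(metadata_entry, db, cur)  # hypothetical helper
            counter[metadata_entry.type] += 1
        # e.g. {"origin_metadata:add": 2, "content_metadata:add": 1}
        return {
            f"{object_type.value}_metadata:add": count
            for (object_type, count) in counter.items()
        }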