Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/postgresql/storage.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import base64 | import base64 | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import contextlib | import contextlib | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
from typing import Any, Counter, Dict, Iterable, List, Optional, Sequence, Tuple, Union | from typing import Any, Counter, Dict, Iterable, List, Optional, Sequence, Tuple | ||||
import attr | import attr | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.errors | import psycopg2.errors | ||||
import psycopg2.pool | import psycopg2.pool | ||||
from swh.core.api.serializers import msgpack_dumps, msgpack_loads | from swh.core.api.serializers import msgpack_dumps, msgpack_loads | ||||
from swh.core.db.common import db_transaction, db_transaction_generator | from swh.core.db.common import db_transaction, db_transaction_generator | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.model.identifiers import SWHID | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
SHA1_SIZE, | SHA1_SIZE, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
MetadataTargetType, | |||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
Sha1, | Sha1, | ||||
Sha1Git, | Sha1Git, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.model.swhid import SWHID, SWHIDObjectType | |||||
from swh.storage.exc import HashCollision, StorageArgumentException, StorageDBError | from swh.storage.exc import HashCollision, StorageArgumentException, StorageDBError | ||||
from swh.storage.interface import ( | from swh.storage.interface import ( | ||||
VISIT_STATUSES, | VISIT_STATUSES, | ||||
ListOrder, | ListOrder, | ||||
PagedResult, | PagedResult, | ||||
PartialBranches, | PartialBranches, | ||||
) | ) | ||||
from swh.storage.metrics import process_metrics, send_metric, timed | from swh.storage.metrics import process_metrics, send_metric, timed | ||||
from swh.storage.objstorage import ObjStorage | from swh.storage.objstorage import ObjStorage | ||||
from swh.storage.utils import ( | from swh.storage.utils import extract_collision_hash, get_partition_bounds_bytes, now | ||||
extract_collision_hash, | |||||
get_partition_bounds_bytes, | |||||
map_optional, | |||||
now, | |||||
) | |||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from . import converters | from . import converters | ||||
from .db import Db | from .db import Db | ||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
▲ Show 20 Lines • Show All 1,178 Lines • ▼ Show 20 Lines | def refresh_stat_counters(self, db=None, cur=None): | ||||
cur.execute("select * from swh_update_counter(%s)", (key,)) | cur.execute("select * from swh_update_counter(%s)", (key,)) | ||||
@db_transaction() | @db_transaction() | ||||
def raw_extrinsic_metadata_add( | def raw_extrinsic_metadata_add( | ||||
self, metadata: List[RawExtrinsicMetadata], db, cur, | self, metadata: List[RawExtrinsicMetadata], db, cur, | ||||
) -> None: | ) -> None: | ||||
metadata = list(metadata) | metadata = list(metadata) | ||||
self.journal_writer.raw_extrinsic_metadata_add(metadata) | self.journal_writer.raw_extrinsic_metadata_add(metadata) | ||||
counter = Counter[MetadataTargetType]() | counter = Counter[SWHIDObjectType]() | ||||
for metadata_entry in metadata: | for metadata_entry in metadata: | ||||
authority_id = self._get_authority_id(metadata_entry.authority, db, cur) | |||||
fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur) | |||||
db.raw_extrinsic_metadata_add( | db.raw_extrinsic_metadata_add( | ||||
type=metadata_entry.type.value, | target=metadata_entry.target, | ||||
target=str(metadata_entry.target), | authority_id=self._get_authority_id(metadata_entry.authority, db, cur), | ||||
discovery_date=metadata_entry.discovery_date, | discovery_date=metadata_entry.discovery_date, | ||||
authority_id=authority_id, | fetcher_id=self._get_fetcher_id(metadata_entry.fetcher, db, cur), | ||||
fetcher_id=fetcher_id, | |||||
format=metadata_entry.format, | format=metadata_entry.format, | ||||
metadata=metadata_entry.metadata, | metadata=metadata_entry.metadata, | ||||
origin=metadata_entry.origin, | origin=metadata_entry.origin, | ||||
visit=metadata_entry.visit, | visit=metadata_entry.visit, | ||||
snapshot=map_optional(str, metadata_entry.snapshot), | snapshot=metadata_entry.snapshot, | ||||
release=map_optional(str, metadata_entry.release), | release=metadata_entry.release, | ||||
revision=map_optional(str, metadata_entry.revision), | revision=metadata_entry.revision, | ||||
path=metadata_entry.path, | path=metadata_entry.path, | ||||
directory=map_optional(str, metadata_entry.directory), | directory=metadata_entry.directory, | ||||
cur=cur, | cur=cur, | ||||
olasd: I think I prefer the previous, better-typed form of this call, even if it's verbose and (silently) breaks when we change the RawExtrinsicMetadata fields. | |||||
) | ) | ||||
counter[metadata_entry.type] += 1 | counter[metadata_entry.type] += 1 | ||||
for (type, count) in counter.items(): | for (type, count) in counter.items(): | ||||
send_metric( | send_metric( | ||||
f"{type.value}_metadata:add", | f"{type.value}_metadata:add", | ||||
count=count, | count=count, | ||||
method_name=f"{type.value}_metadata_add", | method_name=f"{type.value}_metadata_add", | ||||
) | ) | ||||
@db_transaction() | @db_transaction() | ||||
def raw_extrinsic_metadata_get( | def raw_extrinsic_metadata_get( | ||||
self, | self, | ||||
type: MetadataTargetType, | target: SWHID, | ||||
target: Union[str, SWHID], | |||||
authority: MetadataAuthority, | authority: MetadataAuthority, | ||||
after: Optional[datetime.datetime] = None, | after: Optional[datetime.datetime] = None, | ||||
page_token: Optional[bytes] = None, | page_token: Optional[bytes] = None, | ||||
limit: int = 1000, | limit: int = 1000, | ||||
db=None, | db=None, | ||||
cur=None, | cur=None, | ||||
) -> PagedResult[RawExtrinsicMetadata]: | ) -> PagedResult[RawExtrinsicMetadata]: | ||||
if type == MetadataTargetType.ORIGIN: | |||||
if isinstance(target, SWHID): | |||||
raise StorageArgumentException( | |||||
f"raw_extrinsic_metadata_get called with type='origin', " | |||||
f"but provided target is a SWHID: {target!r}" | |||||
) | |||||
else: | |||||
if not isinstance(target, SWHID): | |||||
raise StorageArgumentException( | |||||
f"raw_extrinsic_metadata_get called with type!='origin', " | |||||
f"but provided target is not a SWHID: {target!r}" | |||||
) | |||||
if page_token: | if page_token: | ||||
(after_time, after_fetcher) = msgpack_loads(base64.b64decode(page_token)) | (after_time, after_fetcher) = msgpack_loads(base64.b64decode(page_token)) | ||||
if after and after_time < after: | if after and after_time < after: | ||||
raise StorageArgumentException( | raise StorageArgumentException( | ||||
"page_token is inconsistent with the value of 'after'." | "page_token is inconsistent with the value of 'after'." | ||||
) | ) | ||||
else: | else: | ||||
after_time = after | after_time = after | ||||
after_fetcher = None | after_fetcher = None | ||||
authority_id = self._get_authority_id(authority, db, cur) | authority_id = self._get_authority_id(authority, db, cur) | ||||
if not authority_id: | if not authority_id: | ||||
return PagedResult(next_page_token=None, results=[],) | return PagedResult(next_page_token=None, results=[],) | ||||
rows = db.raw_extrinsic_metadata_get( | rows = db.raw_extrinsic_metadata_get( | ||||
type, str(target), authority_id, after_time, after_fetcher, limit + 1, cur, | target, authority_id, after_time, after_fetcher, limit + 1, cur, | ||||
) | ) | ||||
rows = [dict(zip(db.raw_extrinsic_metadata_get_cols, row)) for row in rows] | rows = [dict(zip(db.raw_extrinsic_metadata_get_cols, row)) for row in rows] | ||||
results = [] | results = [] | ||||
for row in rows: | for row in rows: | ||||
assert str(target) == row["raw_extrinsic_metadata.target"] | assert target == row["raw_extrinsic_metadata.target"] | ||||
results.append(converters.db_to_raw_extrinsic_metadata(row)) | results.append(converters.db_to_raw_extrinsic_metadata(row)) | ||||
if len(results) > limit: | if len(results) > limit: | ||||
results.pop() | results.pop() | ||||
assert len(results) == limit | assert len(results) == limit | ||||
last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | last_returned_row = rows[-2] # rows[-1] corresponds to the popped result | ||||
next_page_token: Optional[str] = base64.b64encode( | next_page_token: Optional[str] = base64.b64encode( | ||||
msgpack_dumps( | msgpack_dumps( | ||||
▲ Show 20 Lines • Show All 94 Lines • Show Last 20 Lines |
I think I prefer the previous, better-typed form of this call, even if it's verbose and (silently) breaks when we change the RawExtrinsicMetadata fields.