Changeset View
Standalone View
swh/storage/postgresql/db.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import logging | import logging | ||||
import random | import random | ||||
import re | |||||
import select | import select | ||||
from typing import Any, Dict, Iterable, List, Optional, Tuple | from typing import Any, Dict, Iterable, List, Optional, Tuple | ||||
import psycopg2 | |||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.db_utils import execute_values_generator | from swh.core.db.db_utils import execute_values_generator | ||||
from swh.core.db.db_utils import jsonize as _jsonize | from swh.core.db.db_utils import jsonize as _jsonize | ||||
from swh.core.db.db_utils import stored_procedure | from swh.core.db.db_utils import stored_procedure | ||||
from swh.model.hashutil import hash_to_bytes | |||||
from swh.model.model import SHA1_SIZE, OriginVisit, OriginVisitStatus | from swh.model.model import SHA1_SIZE, OriginVisit, OriginVisitStatus | ||||
from swh.model.swhid import SWHID | |||||
from swh.model.swhid import _swhid_type_map as swhid_typemap | |||||
from swh.storage.interface import ListOrder | from swh.storage.interface import ListOrder | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def jsonize(d):
    """Prepare ``d`` for storage through ``_jsonize``.

    ``None`` is forwarded unchanged; any other value is first copied into a
    plain ``dict`` before being handed to ``_jsonize``.
    """
    payload = None if d is None else dict(d)
    return _jsonize(payload)
def typecast_swhid(value, cur):
    """Convert the textual form of a ``swhid`` database value into a SWHID.

    The ``(value, cur)`` signature is the one psycopg2 expects from a
    typecaster function (see "Type casting of SQL types into Python objects"
    in the psycopg2 documentation).

    Args:
        value: the string received from the database, or ``None`` for NULL.
        cur: the cursor the value was read from (unused).

    Returns:
        ``None`` when ``value`` is ``None``, otherwise a ``SWHID`` built from
        the three fields of the ``(version,type,"id")`` representation.

    Raises:
        psycopg2.InterfaceError: if ``value`` does not have the expected shape.
    """
    if value is None:
        return None

    # fullmatch (not match): the whole string must be the composite literal,
    # so trailing garbage after the closing parenthesis is rejected instead
    # of being silently ignored.
    m = re.fullmatch(r'\(([^)]+),([^)]+),"([^)]+)"\)', value)
    if m is None:
        raise psycopg2.InterfaceError("bad SWHID representation: %r" % value)

    version, object_type, quoted_id = m.groups()
    return SWHID(
        scheme_version=int(version),
        object_type=swhid_typemap[object_type],
        # The first three characters of the quoted field are dropped before
        # hex-decoding; presumably this is the escaped bytea hex prefix --
        # TODO confirm against the server's composite output format.
        object_id=hash_to_bytes(quoted_id[3:]),
    )
def adapt_swhid(swhid: SWHID):
    """Adapt a SWHID into an SQL ``ROW(...)`` expression for psycopg2.

    Args:
        swhid: the identifier to adapt. Its ``scheme_version`` is formatted
            with ``%d``; ``object_type.value`` and ``object_id`` are both
            ``.encode()``d, so they are expected to be ``str`` here
            (presumably ``object_id`` is a hex string -- TODO confirm).

    Returns:
        A ``psycopg2.extensions.AsIs`` wrapper, i.e. the text is inserted
        into the query verbatim, without any further quoting.
    """
    # Build the literal as bytes, then decode, exactly as before; only the
    # stray debugging print of the result has been removed.
    literal = b"ROW(%d, '%s'::swhid_type, '\\x%s'::bytea)" % (
        swhid.scheme_version,
        swhid.object_type.value.encode(),
        swhid.object_id.encode(),
    )
    return psycopg2.extensions.AsIs(literal.decode())
def register_swhid_type(conn):
    """Register the SWHID typecaster on ``conn`` and the SWHID adapter.

    Looks up the oid of the ``public.swhid`` type in the system catalogs,
    registers ``typecast_swhid`` for that oid on ``conn``, and registers
    ``adapt_swhid`` as the adapter for ``SWHID`` objects (this last call is
    made without a connection argument).

    NOTE(review): every call issues one catalog query, so callers should make
    sure this runs only once per connection, including when connections come
    from a pool. The oid is looked up rather than hard-coded.

    Args:
        conn: an open psycopg2 connection.

    Raises:
        psycopg2.ProgrammingError: if the database has no ``public.swhid``
            type (previously this surfaced as an opaque ``TypeError``).
    """
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT pg_type.oid
            FROM pg_type
            JOIN pg_namespace
            ON typnamespace = pg_namespace.oid
            WHERE typname = %(typename)s
            AND nspname = %(namespace)s""",
            {"typename": "swhid", "namespace": "public"},
        )
        row = cur.fetchone()

    # fetchone() returns None when the query matched no row.
    if row is None:
        raise psycopg2.ProgrammingError("PostgreSQL type 'public.swhid' not found")
    oid = row[0]

    t_SWHID = psycopg2.extensions.new_type((oid,), "SWHID", typecast_swhid)
    psycopg2.extensions.register_type(t_SWHID, conn)
    psycopg2.extensions.register_adapter(SWHID, adapt_swhid)
class Db(BaseDb): | class Db(BaseDb): | ||||
"""Proxy to the SWH DB, with wrappers around stored procedures | """Proxy to the SWH DB, with wrappers around stored procedures | ||||
""" | """ | ||||
current_version = 166 | current_version = 166 | ||||
def __init__(
    self,
    conn: psycopg2.extensions.connection,
    pool: Optional[psycopg2.pool.AbstractConnectionPool] = None,
):
    """Set up the proxy and register the SWHID type handlers on ``conn``.

    Args:
        conn: the psycopg2 connection, forwarded to the parent constructor.
        pool: optional connection pool, forwarded to the parent constructor.
    """
    super().__init__(conn, pool)
    # Runs a catalog query on every instantiation (see register_swhid_type).
    register_swhid_type(conn)
def mktemp_dir_entry(self, entry_type, cur=None):
    """Call ``swh_mktemp_dir_entry`` for the given kind of directory entry.

    The procedure receives the name ``directory_entry_<entry_type>`` as its
    single, driver-quoted argument.
    """
    cursor = self._cursor(cur)
    table_name = "directory_entry_%s" % entry_type
    cursor.execute("SELECT swh_mktemp_dir_entry(%s)", (table_name,))
@stored_procedure("swh_mktemp_revision")
def mktemp_revision(self, cur=None):
    # The body is intentionally empty: the behaviour comes from the
    # decorator. NOTE(review): presumably it runs the ``swh_mktemp_revision``
    # stored procedure -- confirm against
    # swh.core.db.db_utils.stored_procedure.
    pass
@stored_procedure("swh_mktemp_release")
def mktemp_release(self, cur=None):
    # The body is intentionally empty: the behaviour comes from the
    # decorator. NOTE(review): presumably it runs the ``swh_mktemp_release``
    # stored procedure -- confirm against
    # swh.core.db.db_utils.stored_procedure.
    pass
@stored_procedure("swh_mktemp_snapshot_branch") | @stored_procedure("swh_mktemp_snapshot_branch") | ||||
def mktemp_snapshot_branch(self, cur=None): | def mktemp_snapshot_branch(self, cur=None): | ||||
[Not Done] olasd: When we used to do this kind of query for `bytea` and `bytea[]`s (well, we really only did a `select NULL::bytea, NULL::bytea[]`), we spent a substantial amount of time querying for the oid of these types instead of doing actual database queries. We really need to make sure this only happens once per connection instance, even in pooled situations. Unfortunately, unlike for byteas, we won't be able to hardcode the oid.
[Done] douardda: OK, something to keep in mind...
pass | pass | ||||
def register_listener(self, notify_queue, cur=None):
    """Register a listener for NOTIFY queue `notify_queue`"""
    cursor = self._cursor(cur)
    # NOTE(review): the queue name is interpolated into the statement as-is,
    # so callers must only pass trusted identifiers.
    listen_statement = "LISTEN %s" % notify_queue
    cursor.execute(listen_statement)
def listen_notifies(self, timeout): | def listen_notifies(self, timeout): | ||||
"""Listen to notifications for `timeout` seconds""" | """Listen to notifications for `timeout` seconds""" | ||||
▲ Show 20 Lines • Show All 1,075 Lines • ▼ Show 20 Lines | _raw_extrinsic_metadata_context_cols = [ | ||||
"release", | "release", | ||||
"revision", | "revision", | ||||
"path", | "path", | ||||
"directory", | "directory", | ||||
] | ] | ||||
"""The list of context columns for all artifact types.""" | """The list of context columns for all artifact types.""" | ||||
_raw_extrinsic_metadata_insert_cols = [ | _raw_extrinsic_metadata_insert_cols = [ | ||||
"type", | |||||
"target", | "target", | ||||
"authority_id", | "authority_id", | ||||
"fetcher_id", | "fetcher_id", | ||||
"discovery_date", | "discovery_date", | ||||
"format", | "format", | ||||
"metadata", | "metadata", | ||||
*_raw_extrinsic_metadata_context_cols, | *_raw_extrinsic_metadata_context_cols, | ||||
] | ] | ||||
"""List of columns of the raw_extrinsic_metadata table, used when writing | """List of columns of the raw_extrinsic_metadata table, used when writing | ||||
metadata.""" | metadata.""" | ||||
_raw_extrinsic_metadata_insert_query = f""" | _raw_extrinsic_metadata_insert_query = f""" | ||||
INSERT INTO raw_extrinsic_metadata | INSERT INTO raw_extrinsic_metadata | ||||
({', '.join(_raw_extrinsic_metadata_insert_cols)}) | ({', '.join(_raw_extrinsic_metadata_insert_cols)}) | ||||
VALUES ({', '.join('%s' for _ in _raw_extrinsic_metadata_insert_cols)}) | VALUES ({', '.join('%s' for _ in _raw_extrinsic_metadata_insert_cols)}) | ||||
ON CONFLICT (target, authority_id, discovery_date, fetcher_id) | ON CONFLICT (target, authority_id, discovery_date, fetcher_id) | ||||
DO NOTHING | DO NOTHING | ||||
""" | """ | ||||
raw_extrinsic_metadata_get_cols = [ | raw_extrinsic_metadata_get_cols = [ | ||||
"raw_extrinsic_metadata.target", | "raw_extrinsic_metadata.target", | ||||
"raw_extrinsic_metadata.type", | |||||
"discovery_date", | "discovery_date", | ||||
"metadata_authority.type", | "metadata_authority.type", | ||||
"metadata_authority.url", | "metadata_authority.url", | ||||
"metadata_fetcher.id", | "metadata_fetcher.id", | ||||
"metadata_fetcher.name", | "metadata_fetcher.name", | ||||
"metadata_fetcher.version", | "metadata_fetcher.version", | ||||
*_raw_extrinsic_metadata_context_cols, | *_raw_extrinsic_metadata_context_cols, | ||||
"format", | "format", | ||||
"raw_extrinsic_metadata.metadata", | "raw_extrinsic_metadata.metadata", | ||||
] | ] | ||||
"""List of columns of the raw_extrinsic_metadata, metadata_authority, | """List of columns of the raw_extrinsic_metadata, metadata_authority, | ||||
and metadata_fetcher tables, used when reading object metadata.""" | and metadata_fetcher tables, used when reading object metadata.""" | ||||
_raw_extrinsic_metadata_select_query = f""" | _raw_extrinsic_metadata_select_query = f""" | ||||
SELECT | SELECT | ||||
{', '.join(raw_extrinsic_metadata_get_cols)} | {', '.join(raw_extrinsic_metadata_get_cols)} | ||||
FROM raw_extrinsic_metadata | FROM raw_extrinsic_metadata | ||||
INNER JOIN metadata_authority | INNER JOIN metadata_authority | ||||
ON (metadata_authority.id=authority_id) | ON (metadata_authority.id=authority_id) | ||||
INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) | INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) | ||||
WHERE raw_extrinsic_metadata.target=%s AND authority_id=%s | WHERE (raw_extrinsic_metadata.target)=%s | ||||
AND authority_id=%s | |||||
[Not Done] olasd: Remembering to add these parentheses is going to be annoying. Are they really necessary?
[Done] douardda: I thought they were, reading https://www.postgresql.org/docs/12/rowtypes.html#ROWTYPES-ACCESSING
Actually, for comparing the whole composite object they might not be required. This seems to work fine:
    postgres=# select * from on_hand where on_hand.item=ROW('fuzzy dice', 42, 1.99)::inventory_item;
              item          | count
    ------------------------+-------
     ("fuzzy dice",42,1.99) |  1000
I'll give it a try without them.
""" | """ | ||||
def raw_extrinsic_metadata_add(
    self,
    target: SWHID,
    discovery_date: datetime.datetime,
    authority_id: int,
    fetcher_id: int,
    format: str,
    metadata: bytes,
    origin: Optional[SWHID],
    visit: Optional[int],
    snapshot: Optional[SWHID],
    release: Optional[SWHID],
    revision: Optional[SWHID],
    path: Optional[bytes],
    directory: Optional[SWHID],
    cur,
):
    """Insert one row into ``raw_extrinsic_metadata`` using ``cur``.

    The arguments are gathered by name and then laid out in the order given
    by ``_raw_extrinsic_metadata_insert_cols``, which is the column order
    used by ``_raw_extrinsic_metadata_insert_query``. Per that query, a row
    conflicting on (target, authority_id, discovery_date, fetcher_id) is
    left untouched.
    """
    values_by_column: Dict[str, Any] = {
        "target": target,
        "authority_id": authority_id,
        "fetcher_id": fetcher_id,
        "discovery_date": discovery_date,
        "format": format,
        "metadata": metadata,
        "origin": origin,
        "visit": visit,
        "snapshot": snapshot,
        "release": release,
        "revision": revision,
        "path": path,
        "directory": directory,
    }
    # Order the values to line up with the placeholders of the INSERT query.
    ordered_values = [
        values_by_column[column]
        for column in self._raw_extrinsic_metadata_insert_cols
    ]
    cur.execute(self._raw_extrinsic_metadata_insert_query, ordered_values)
def raw_extrinsic_metadata_get( | def raw_extrinsic_metadata_get( | ||||
self, | self, | ||||
type: str, | target: SWHID, | ||||
target: str, | |||||
authority_id: int, | authority_id: int, | ||||
after_time: Optional[datetime.datetime], | after_time: Optional[datetime.datetime], | ||||
after_fetcher: Optional[int], | after_fetcher: Optional[int], | ||||
limit: int, | limit: int, | ||||
cur, | cur, | ||||
): | ): | ||||
query_parts = [self._raw_extrinsic_metadata_select_query] | query_parts = [self._raw_extrinsic_metadata_select_query] | ||||
args = [target, authority_id] | args = [target, authority_id] | ||||
▲ Show 20 Lines • Show All 122 Lines • Show Last 20 Lines |
Eeeeek.
If we go for a regexp approach, this should be a re.fullmatch. But of course it'd be much, much better to avoid using a regexp altogether.