Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
import random | import random | ||||
import select | import select | ||||
from typing import Any, Dict, Iterable, List, Optional, Tuple | from typing import Any, Dict, Iterable, List, Optional, Tuple, Union | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.db_utils import stored_procedure, jsonize | from swh.core.db.db_utils import stored_procedure, jsonize | ||||
from swh.core.db.db_utils import execute_values_generator | from swh.core.db.db_utils import execute_values_generator | ||||
from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE | from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE | ||||
class Db(BaseDb): | class Db(BaseDb): | ||||
▲ Show 20 Lines • Show All 1,059 Lines • ▼ Show 20 Lines | def release_get_from_list(self, releases, cur=None): | ||||
""" | """ | ||||
% query_keys, | % query_keys, | ||||
((sortkey, id) for sortkey, id in enumerate(releases)), | ((sortkey, id) for sortkey, id in enumerate(releases)), | ||||
) | ) | ||||
def release_get_random(self, cur=None): | def release_get_random(self, cur=None): | ||||
return self._get_random_row_from_table("release", ["id"], "id", cur) | return self._get_random_row_from_table("release", ["id"], "id", cur) | ||||
origin_metadata_get_cols = [ | _object_metadata_context_cols = [ | ||||
"origin.url", | "origin", | ||||
"visit", | |||||
"snapshot", | |||||
"release", | |||||
"revision", | |||||
"path", | |||||
"directory", | |||||
] | |||||
"""The list of context columns for all artifact types.""" | |||||
_object_metadata_insert_cols = [ | |||||
"type", | |||||
"id", | |||||
"authority_id", | |||||
"fetcher_id", | |||||
"discovery_date", | |||||
"format", | |||||
"metadata", | |||||
*_object_metadata_context_cols, | |||||
] | |||||
"""List of columns of the object_metadata table, used when writing | |||||
metadata.""" | |||||
_object_metadata_insert_query = f""" | |||||
INSERT INTO object_metadata | |||||
({', '.join(_object_metadata_insert_cols)}) | |||||
VALUES ({', '.join('%s' for _ in _object_metadata_insert_cols)}) | |||||
ON CONFLICT (id, authority_id, discovery_date, fetcher_id) | |||||
DO UPDATE SET | |||||
format=EXCLUDED.format, | |||||
metadata=EXCLUDED.metadata | |||||
""" | |||||
object_metadata_get_cols = [ | |||||
"id", | |||||
"discovery_date", | "discovery_date", | ||||
"metadata_authority.type", | "metadata_authority.type", | ||||
"metadata_authority.url", | "metadata_authority.url", | ||||
"metadata_fetcher.id", | "metadata_fetcher.id", | ||||
"metadata_fetcher.name", | "metadata_fetcher.name", | ||||
"metadata_fetcher.version", | "metadata_fetcher.version", | ||||
*_object_metadata_context_cols, | |||||
"format", | "format", | ||||
"metadata", | "metadata", | ||||
] | ] | ||||
"""List of columns of the object_metadata, metadata_authority, | |||||
and metadata_fetcher tables, used when reading object metadata.""" | |||||
_object_metadata_select_query = f""" | |||||
SELECT | |||||
object_metadata.id AS id, | |||||
{', '.join(object_metadata_get_cols[1:-1])}, | |||||
object_metadata.metadata AS metadata | |||||
FROM object_metadata | |||||
INNER JOIN metadata_authority | |||||
ON (metadata_authority.id=authority_id) | |||||
INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) | |||||
WHERE object_metadata.id=%s AND authority_id=%s | |||||
""" | |||||
def origin_metadata_add( | def object_metadata_add( | ||||
self, | self, | ||||
origin: str, | object_type: str, | ||||
id: str, | |||||
context: Dict[str, Union[str, bytes, int]], | |||||
discovery_date: datetime.datetime, | discovery_date: datetime.datetime, | ||||
authority: int, | authority_id: int, | ||||
fetcher: int, | fetcher_id: int, | ||||
format: str, | format: str, | ||||
metadata: bytes, | metadata: bytes, | ||||
cur=None, | cur, | ||||
) -> None: | ): | ||||
""" Add an origin_metadata for the origin at ts with provider, tool and | query = self._object_metadata_insert_query | ||||
metadata. | args: Dict[str, Any] = dict( | ||||
type=object_type, | |||||
Args: | id=id, | ||||
origin: the origin's id for which the metadata is added | authority_id=authority_id, | ||||
discovery_date: time when the metadata was found | fetcher_id=fetcher_id, | ||||
authority: the metadata provider identifier | discovery_date=discovery_date, | ||||
fetcher: the tool's identifier used to extract metadata | format=format, | ||||
format: the format of the metadata | metadata=metadata, | ||||
metadata: the metadata retrieved at the time and location | |||||
""" | |||||
cur = self._cursor(cur) | |||||
insert = """INSERT INTO origin_metadata (origin_id, discovery_date, | |||||
authority_id, fetcher_id, format, metadata) | |||||
SELECT id, %s, %s, %s, %s, %s FROM origin WHERE url = %s | |||||
ON CONFLICT (origin_id, authority_id, discovery_date, fetcher_id) | |||||
DO UPDATE SET | |||||
format=EXCLUDED.format, | |||||
metadata=EXCLUDED.metadata | |||||
""" | |||||
cur.execute( | |||||
insert, (discovery_date, authority, fetcher, format, metadata, origin), | |||||
) | ) | ||||
for col in self._object_metadata_context_cols: | |||||
args[col] = context.get(col) | |||||
params = [args[col] for col in self._object_metadata_insert_cols] | |||||
def origin_metadata_get( | cur.execute(query, params) | ||||
def object_metadata_get( | |||||
self, | self, | ||||
origin_url: str, | object_type: str, | ||||
authority: int, | id: str, | ||||
authority_id: int, | |||||
after_time: Optional[datetime.datetime], | after_time: Optional[datetime.datetime], | ||||
after_fetcher: Optional[int], | after_fetcher: Optional[int], | ||||
limit: Optional[int], | limit: int, | ||||
ardumont: since you're removing id (index 0) and metadata (index -1) from the cols, maybe assert those… | |||||
cur=None, | cur, | ||||
): | ): | ||||
cur = self._cursor(cur) | query_parts = [self._object_metadata_select_query] | ||||
assert self.origin_metadata_get_cols[-1] == "metadata" | args = [id, authority_id] | ||||
query_parts = [ | |||||
f"SELECT {', '.join(self.origin_metadata_get_cols[0:-1])}, " | |||||
f" origin_metadata.metadata AS metadata " | |||||
f"FROM origin_metadata " | |||||
f"INNER JOIN metadata_authority " | |||||
f" ON (metadata_authority.id=authority_id) " | |||||
f"INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) " | |||||
f"INNER JOIN origin ON (origin.id=origin_metadata.origin_id) " | |||||
f"WHERE origin.url=%s AND authority_id=%s " | |||||
] | |||||
args = [origin_url, authority] | |||||
if after_fetcher is not None: | if after_fetcher is not None: | ||||
assert after_time | assert after_time | ||||
query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)") | query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)") | ||||
args.extend([after_time, after_fetcher]) | args.extend([after_time, after_fetcher]) | ||||
elif after_time is not None: | elif after_time is not None: | ||||
query_parts.append("AND discovery_date > %s") | query_parts.append("AND discovery_date > %s") | ||||
args.append(after_time) | args.append(after_time) | ||||
▲ Show 20 Lines • Show All 96 Lines • Show Last 20 Lines |
since you're removing id (index 0) and metadata (index -1) from the cols, maybe assert those are what we think they are (to avoid future breaking change on that).
(We did that for the origin visit status)