Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/db.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | |||||
import random | import random | ||||
import select | import select | ||||
from typing import Any, Dict, Optional, Tuple | from typing import Any, Dict, Optional, Tuple | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.core.db.db_utils import stored_procedure, jsonize | from swh.core.db.db_utils import stored_procedure, jsonize | ||||
from swh.core.db.db_utils import execute_values_generator | from swh.core.db.db_utils import execute_values_generator | ||||
from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE | from swh.model.model import OriginVisit, OriginVisitStatus, SHA1_SIZE | ||||
▲ Show 20 Lines • Show All 1,037 Lines • ▼ Show 20 Lines | def release_get_from_list(self, releases, cur=None): | ||||
""" | """ | ||||
% query_keys, | % query_keys, | ||||
((sortkey, id) for sortkey, id in enumerate(releases)), | ((sortkey, id) for sortkey, id in enumerate(releases)), | ||||
) | ) | ||||
def release_get_random(self, cur=None): | def release_get_random(self, cur=None): | ||||
return self._get_random_row_from_table("release", ["id"], "id", cur) | return self._get_random_row_from_table("release", ["id"], "id", cur) | ||||
def origin_metadata_add(self, origin, ts, provider, tool, metadata, cur=None): | |||||
""" Add an origin_metadata for the origin at ts with provider, tool and | |||||
metadata. | |||||
Args: | |||||
origin (int): the origin's id for which the metadata is added | |||||
ts (datetime): time when the metadata was found | |||||
provider (int): the metadata provider identifier | |||||
tool (int): the tool's identifier used to extract metadata | |||||
metadata (jsonb): the metadata retrieved at the time and location | |||||
Returns: | |||||
id (int): the origin_metadata unique id | |||||
""" | |||||
cur = self._cursor(cur) | |||||
insert = """INSERT INTO origin_metadata (origin_id, discovery_date, | |||||
provider_id, tool_id, metadata) | |||||
SELECT id, %s, %s, %s, %s FROM origin WHERE url = %s""" | |||||
cur.execute(insert, (ts, provider, tool, jsonize(metadata), origin)) | |||||
origin_metadata_get_cols = [ | origin_metadata_get_cols = [ | ||||
"origin_url", | "origin.url", | ||||
"discovery_date", | "discovery_date", | ||||
"tool_id", | "metadata_authority.type", | ||||
"metadata_authority.url", | |||||
"metadata_fetcher.name", | |||||
"metadata_fetcher.version", | |||||
"format", | |||||
"metadata", | "metadata", | ||||
"provider_id", | |||||
"provider_name", | |||||
"provider_type", | |||||
"provider_url", | |||||
] | ] | ||||
def origin_metadata_get_by(self, origin_url, provider_type=None, cur=None): | def origin_metadata_add( | ||||
"""Retrieve all origin_metadata entries for one origin_url | self, | ||||
origin: str, | |||||
discovery_date: datetime.datetime, | |||||
authority: int, | |||||
fetcher: int, | |||||
format: str, | |||||
metadata: bytes, | |||||
cur=None, | |||||
) -> None: | |||||
""" Add an origin_metadata for the origin at ts with provider, tool and | |||||
metadata. | |||||
Args: | |||||
origin: the origin's id for which the metadata is added | |||||
discovery_date: time when the metadata was found | |||||
authority: the metadata provider identifier | |||||
fetcher: the tool's identifier used to extract metadata | |||||
format: the format of the metadata | |||||
metadata: the metadata retrieved at the time and location | |||||
""" | """ | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
if not provider_type: | insert = """INSERT INTO origin_metadata (origin_id, discovery_date, | ||||
query = """SELECT %s | authority_id, fetcher_id, format, metadata) | ||||
FROM swh_origin_metadata_get_by_origin( | SELECT id, %s, %s, %s, %s, %s FROM origin WHERE url = %s""" | ||||
%%s)""" % ( | cur.execute( | ||||
",".join(self.origin_metadata_get_cols) | insert, | ||||
(discovery_date, authority, fetcher, format, jsonize(metadata), origin), | |||||
) | ) | ||||
cur.execute(query, (origin_url,)) | def origin_metadata_get( | ||||
self, | |||||
origin_url: str, | |||||
authority: int, | |||||
after: Optional[datetime.datetime], | |||||
limit: Optional[int], | |||||
cur=None, | |||||
): | |||||
cur = self._cursor(cur) | |||||
assert self.origin_metadata_get_cols[-1] == "metadata" | |||||
query_parts = [ | |||||
f"SELECT {', '.join(self.origin_metadata_get_cols[0:-1])}, " | |||||
f" origin_metadata.metadata AS metadata " | |||||
f"FROM origin_metadata " | |||||
f"INNER JOIN metadata_authority " | |||||
f" ON (metadata_authority.id=authority_id) " | |||||
f"INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) " | |||||
f"INNER JOIN origin ON (origin.id=origin_metadata.origin_id) " | |||||
f"WHERE origin.url=%s AND authority_id=%s" | |||||
] | |||||
args = [origin_url, authority] | |||||
else: | if after: | ||||
query = """SELECT %s | query_parts.append("AND discovery_date >= %s") | ||||
FROM swh_origin_metadata_get_by_provider_type( | args.append(after) | ||||
%%s, %%s)""" % ( | |||||
",".join(self.origin_metadata_get_cols) | |||||
) | |||||
cur.execute(query, (origin_url, provider_type)) | query_parts.append("ORDER BY discovery_date") | ||||
yield from cur | if limit: | ||||
query_parts.append("LIMIT %s") | |||||
args.append(limit) | |||||
tool_cols = ["id", "name", "version", "configuration"] | cur.execute(" ".join(query_parts), args) | ||||
yield from cur | |||||
@stored_procedure("swh_mktemp_tool") | metadata_fetcher_cols = ["name", "version", "metadata"] | ||||
def mktemp_tool(self, cur=None): | |||||
pass | |||||
def tool_add_from_temp(self, cur=None): | def metadata_fetcher_add( | ||||
self, name: str, version: str, metadata: bytes, cur=None | |||||
) -> None: | |||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute("SELECT %s from swh_tool_add()" % (",".join(self.tool_cols),)) | cur.execute( | ||||
yield from cur | "INSERT INTO metadata_fetcher (name, version, metadata) " | ||||
"VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", | |||||
(name, version, jsonize(metadata)), | |||||
) | |||||
def tool_get(self, name, version, configuration, cur=None): | def metadata_fetcher_get(self, name: str, version: str, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute( | cur.execute( | ||||
"""select %s | f"SELECT {', '.join(self.metadata_fetcher_cols)} " | ||||
from tool | f"FROM metadata_fetcher " | ||||
where name=%%s and | f"WHERE name=%s AND version=%s", | ||||
version=%%s and | (name, version), | ||||
configuration=%%s""" | |||||
% (",".join(self.tool_cols)), | |||||
(name, version, configuration), | |||||
) | ) | ||||
return cur.fetchone() | return cur.fetchone() | ||||
metadata_provider_cols = [ | def metadata_fetcher_get_id( | ||||
"id", | self, name: str, version: str, cur=None | ||||
"provider_name", | ) -> Optional[int]: | ||||
"provider_type", | |||||
"provider_url", | |||||
"metadata", | |||||
] | |||||
def metadata_provider_add( | |||||
self, | |||||
provider_name: str, | |||||
provider_type: str, | |||||
provider_url: str, | |||||
metadata: Dict, | |||||
cur=None, | |||||
) -> int: | |||||
"""Insert a new provider and return the new identifier.""" | |||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
insert = """ | |||||
INSERT INTO metadata_provider (provider_name, provider_type, | |||||
provider_url, metadata) values (%s, %s, %s, %s) | |||||
ON CONFLICT(provider_type, provider_url) do nothing | |||||
""" | |||||
cur.execute( | cur.execute( | ||||
insert, (provider_name, provider_type, provider_url, jsonize(metadata)) | "SELECT id FROM metadata_fetcher WHERE name=%s AND version=%s", | ||||
) | (name, version), | ||||
row = self.metadata_provider_get_by_composite_key( | |||||
provider_type, provider_url, cur=cur | |||||
) | ) | ||||
row = cur.fetchone() | |||||
if row: | |||||
return row[0] | return row[0] | ||||
else: | |||||
return None | |||||
def metadata_provider_get_by_composite_key( | metadata_authority_cols = ["type", "url", "metadata"] | ||||
self, provider_type: str, provider_url: str, cur=None | |||||
) -> Tuple: | |||||
"""Retrieve metadata provider by its composite primary key. | |||||
""" | def metadata_authority_add( | ||||
self, type: str, url: str, metadata: bytes, cur=None | |||||
) -> None: | |||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute( | cur.execute( | ||||
"""select %s | "INSERT INTO metadata_authority (type, url, metadata) " | ||||
from metadata_provider | "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", | ||||
where provider_type=%%s and provider_url=%%s""" | (type, url, jsonize(metadata)), | ||||
% (",".join(self.metadata_provider_cols)), | |||||
(provider_type, provider_url,), | |||||
) | ) | ||||
return cur.fetchone() | |||||
def metadata_provider_get(self, provider_id, cur=None): | def metadata_authority_get(self, type: str, url: str, cur=None): | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute( | cur.execute( | ||||
"""select %s | f"SELECT {', '.join(self.metadata_authority_cols)} " | ||||
from metadata_provider | f"FROM metadata_authority " | ||||
where id=%%s """ | f"WHERE type=%s AND url=%s", | ||||
% (",".join(self.metadata_provider_cols)), | (type, url), | ||||
(provider_id,), | |||||
) | ) | ||||
return cur.fetchone() | return cur.fetchone() | ||||
def metadata_provider_get_by(self, provider_name, provider_url, cur=None): | def metadata_authority_get_id(self, type: str, url: str, cur=None) -> Optional[int]: | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
cur.execute( | cur.execute( | ||||
"""select %s | "SELECT id FROM metadata_authority WHERE type=%s AND url=%s", (type, url) | ||||
from metadata_provider | |||||
where provider_name=%%s and | |||||
provider_url=%%s""" | |||||
% (",".join(self.metadata_provider_cols)), | |||||
(provider_name, provider_url), | |||||
) | ) | ||||
row = cur.fetchone() | |||||
return cur.fetchone() | if row: | ||||
return row[0] | |||||
else: | |||||
return None | |||||
def _get_random_row_from_table(self, table_name, cols, id_col, cur=None): | def _get_random_row_from_table(self, table_name, cols, id_col, cur=None): | ||||
random_sha1 = bytes(random.randint(0, 255) for _ in range(SHA1_SIZE)) | random_sha1 = bytes(random.randint(0, 255) for _ in range(SHA1_SIZE)) | ||||
cur = self._cursor(cur) | cur = self._cursor(cur) | ||||
query = """ | query = """ | ||||
(SELECT {cols} FROM {table} WHERE {id_col} >= %s | (SELECT {cols} FROM {table} WHERE {id_col} >= %s | ||||
ORDER BY {id_col} LIMIT 1) | ORDER BY {id_col} LIMIT 1) | ||||
UNION | UNION | ||||
Show All 10 Lines |