Changeset View
Standalone View
swh/scanner/db.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
""" | |||||
This module is an interface to interact with the local database | |||||
where the SWHIDs will be saved for the local API. | |||||
SWHIDs can be added directly from an input file. | |||||
""" | |||||
import asyncio | |||||
from io import TextIOWrapper | |||||
import os | |||||
from pathlib import Path | |||||
import sqlite3 | |||||
import sys | |||||
from typing import Any, Dict | |||||
import aiohttp | |||||
from .exceptions import APIError | |||||
from .scanner import swhids_discovery | |||||
class Db: | |||||
"""Local database interface""" | |||||
def __init__(self, db_file: Path): | |||||
self.db_file: Path = db_file | |||||
self.conn: sqlite3.Connection = sqlite3.connect( | |||||
db_file, check_same_thread=False | |||||
) | |||||
def close(self): | |||||
"""Close the connection to the database.""" | |||||
self.conn.close() | |||||
def create_table(self, cur: sqlite3.Cursor): | |||||
zack: Minor nit: I think you want `Iterable[str]` here instead of `List[str]`, as the latter would… | |||||
"""Create the table where the SWHIDs will be stored.""" | |||||
cur.execute("""CREATE TABLE IF NOT EXISTS swhid_db (swhid text)""") | |||||
zackUnsubmitted Not Done Inline ActionsI'd just call this table "swhids", because that is what it contains. Additionally, you kneed here an uniqueness constraint and/or an unique index. That would avoid having to manually check for duplicates in add and also speed up lookups. cc: @seirl in case he has performance tips for the table and/or index. For context: this table of known indexes in the use case we have in mind will end up containing ~30 million SWHIDs. zack: I'd just call this table "swhids", because that is what it contains.
Additionally, you kneed… | |||||
vlorentzUnsubmitted Not Done Inline Actionsor mark swhid as a PRIMARY KEY vlorentz: or mark `swhid` as a `PRIMARY KEY` | |||||
Not Done Inline ActionsThere's a grouper function in swh-core that you can use instead of get_chunk. vlorentz: There's a `grouper` function in swh-core that you can use instead of `get_chunk`. | |||||
def add(self, swhid: str, cur: sqlite3.Cursor): | |||||
zackUnsubmitted Not Done Inline ActionsI wonder if you want a add_one/add_many split here, to be able to insert many SWHIDs at once. In general inserting stuff one-by-one in a table is quite slow, and will get worse with an index. So an option to insert many at once with a single query, and inserting entire "pages" of SWHIDs from the input file at once with it could be interested. cc: @seirl for the same reason as above zack: I wonder if you want a add_one/add_many split here, to be able to insert many SWHIDs at once. | |||||
"""Insert the SWHID inside the database.""" | |||||
cur.execute( | |||||
"""INSERT INTO swhid_db SELECT (?) | |||||
WHERE NOT EXISTS (SELECT 1 FROM swhid_db WHERE swhid=?)""", | |||||
zackUnsubmitted Not Done Inline Actionsthis WHERE predicate can be removed with an index/constraint zack: this WHERE predicate can be removed with an index/constraint | |||||
(swhid, swhid), | |||||
) | |||||
Not Done Inline Actionsthe name doesn't make it clear that this method connects to swh-web and fills the db vlorentz: the name doesn't make it clear that this method connects to swh-web and fills the db | |||||
async def query_swhids_from( | |||||
self, input_file: TextIOWrapper, config: Dict[str, Any], cur: sqlite3.Cursor | |||||
): | |||||
"""Query all the SWHIDs present inside the input file to the Web API and fill | |||||
the local database only with known SWHIDs. | |||||
""" | |||||
swhids = [line.strip() for line in input_file.readlines()] | |||||
api_url = config["web-api"]["url"] | |||||
if config["web-api"]["auth-token"]: | |||||
headers = {"Authorization": f"Bearer {config['web-api']['auth-token']}"} | |||||
else: | |||||
headers = {} | |||||
async with aiohttp.ClientSession(headers=headers) as session: | |||||
Not Done Inline ActionsThat works, but it's not reusable as a library. Could you raise an exception and catch it in the CLI instead? If not, that's not a big deal vlorentz: That works, but it's not reusable as a library. Could you raise an exception and catch it in… | |||||
parsed_swhids = await swhids_discovery(swhids, session, api_url) | |||||
for swhid, attr in parsed_swhids.items(): | |||||
if attr["known"]: | |||||
self.add(swhid, cur) | |||||
zackUnsubmitted Not Done Inline ActionsOh, sorry, maybe we weren't clear on this part of the spec. I don't think this logic of querying the archive to check if SWHIDs are known belongs to the scanner. The spec in T2760 assumed that the SWHIDs in the input .txt file must be considered as known, and hence added to the DB, without further querying the archive. So I think this function can just be removed. (It is tangential to this diff, but FWIW for the ongoing swh-scanner paper, the list of SWHIDs fed to swh-scanner db import has already been filtered to include only known SWHID. See my data/swh-known.py script in the paper repo.) zack: Oh, sorry, maybe we weren't clear on this part of the spec. I don't think this logic of… | |||||
def create_from( | |||||
self, config: Dict[str, Any], input_file: TextIOWrapper, cur: sqlite3.Cursor | |||||
): | |||||
"""Create a new database with the SWHIDs present inside the input file.""" | |||||
self.create_table(cur) | |||||
try: | |||||
loop = asyncio.get_event_loop() | |||||
loop.run_until_complete(self.query_swhids_from(input_file, config, cur)) | |||||
cur.close() | |||||
self.conn.commit() | |||||
zackUnsubmitted Not Done Inline ActionsSo here you can just insert all SWHIDs from the file into the DB. (Ideally paginating with the add_many methods I suggested above, assuming it's more efficient.) And avoid all the API error handling. zack: So here you can just insert //all// SWHIDs from the file into the DB. (Ideally paginating with… | |||||
except APIError: | |||||
print("Error during the api call") | |||||
os.remove(self.db_file) | |||||
Not Done Inline Actionsthese should cause a non-0 status code to be returned vlorentz: these should cause a non-0 status code to be returned | |||||
sys.exit(1) | |||||
except Exception: | |||||
print("Failed to create database") | |||||
os.remove(self.db_file) | |||||
sys.exit(1) | |||||
def check(self, swhid: str, cur: sqlite3.Cursor): | |||||
zackUnsubmitted Not Done Inline Actionsthis should just be called "known", no? zack: this should just be called "known", no? | |||||
"""Check if a given SWHID is present or not inside the local database.""" | |||||
Not Done Inline Actionsnitpick: return res is not None vlorentz: nitpick: `return res is not None` | |||||
cur.execute("""SELECT 1 FROM swhid_db WHERE swhid=?""", (swhid,)) | |||||
res = cur.fetchone() | |||||
cur.close() | |||||
return res is not None |
Minor nit: I think you want Iterable[str] here instead of List[str], as the latter would force client code to actually materialize a potentially long list in memory. (A general rule of thumb for typing is "accept abstract types, return concrete types".)
But I don't know what grouper expects, so please verify that before changing this.