swh/provenance/postgresql/provenancedb_base.py
- This file was moved from swh/provenance/postgresql_nopath/provenance.py; the class it defines was renamed from ProvenancePostgreSQLNoPath to ProvenanceDBBase.
import itertools
import logging
import operator
import os
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional, Tuple

import psycopg2
import psycopg2.extras

from ..model import DirectoryEntry, FileEntry
from ..origin import OriginEntry
from ..postgresql.db_utils import connect, execute_sql
from ..provenance import ProvenanceInterface
from ..revision import RevisionEntry
class ProvenanceDBBase(ProvenanceInterface):
    def __init__(self, conn: psycopg2.extensions.connection):
        # TODO: consider adding a mutex for thread safety
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.conn = conn
        self.cursor = self.conn.cursor()
        self.insert_cache: Dict[str, Any] = {}
        self.remove_cache: Dict[str, Any] = {}
        self.select_cache: Dict[str, Any] = {}
    # [... 21 lines elided (within def commit(self)) ...]
            result = True
        except Exception as error:
            # Unexpected error occurred; roll back all changes and log the message
            logging.error(f"Unexpected error: {error}")
        return result
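A note on the write-back design: mutations are accumulated in insert_cache/remove_cache and only flushed to PostgreSQL when commit() is called, which returns a boolean instead of raising. A minimal sketch of the intended calling pattern; make_provenance, revisions_to_process and process_revision are hypothetical stand-ins, not part of this diff:

import logging

# Hypothetical driver loop showing the deferred-write pattern (illustration only).
provenance = make_provenance()                  # assumed factory for a ProvenanceDBBase
for revision in revisions_to_process:           # assumed iterable of RevisionEntry
    process_revision(provenance, revision)      # only fills the in-memory caches
    if not provenance.commit():                 # single flush per revision
        logging.warning("commit failed for revision %s", revision.id.hex())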
    def content_add_to_directory(
        self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
    ):
        self.insert_cache["content_in_dir"].add((blob.id, directory.id))

    def content_add_to_revision(
        self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
    ):
        self.insert_cache["content_early_in_rev"].add((blob.id, revision.id))
    def content_find_first(
        self, blobid: bytes
    ) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
        self.cursor.execute(
            """SELECT revision.sha1 AS rev,
                      revision.date AS date
               FROM (SELECT content_early_in_rev.rev
                     FROM content_early_in_rev
                     JOIN content
                       ON content.id=content_early_in_rev.blob
                     WHERE content.sha1=%s
                    ) AS content_in_rev
               JOIN revision
                 ON revision.id=content_in_rev.rev
               ORDER BY date, rev ASC LIMIT 1""",
            (blobid,),
        )
        row = self.cursor.fetchone()
        if row is not None:
            # TODO: query revision from the archive and look for blobid in a
            # recursive directory_ls of the revision's root.
            return blobid, row[0], row[1], b""
        return None
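The query selects, among all revisions that directly contain the blob (content_early_in_rev), the one with the smallest (date, sha1) pair; the path component of the result is left empty because this variant does not track file paths. A usage sketch, where the DSN and the sha1 value are placeholders:

import psycopg2

# Illustration only: look up the earliest known occurrence of a blob.
conn = psycopg2.connect("dbname=provenance")  # assumed DSN, not part of this diff
prov = ProvenanceDBBase(conn)
occurrence = prov.content_find_first(bytes.fromhex("34" * 20))  # made-up sha1
if occurrence is not None:
    blob_sha1, rev_sha1, date, path = occurrence
    print(f"first seen in revision {rev_sha1.hex()} at {date} (path={path!r})")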
    def content_find_all(
        self, blobid: bytes
    ) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
        self.cursor.execute(
            """(SELECT revision.sha1 AS rev,
                       revision.date AS date
                FROM (SELECT content_early_in_rev.rev
                      FROM content_early_in_rev
                      JOIN content
                        ON content.id=content_early_in_rev.blob
                      WHERE content.sha1=%s
                     ) AS content_in_rev
                JOIN revision
                  ON revision.id=content_in_rev.rev
               )
               UNION
               (SELECT revision.sha1 AS rev,
                       revision.date AS date
                FROM (SELECT directory_in_rev.rev
                      FROM (SELECT content_in_dir.dir
                            FROM content_in_dir
                            JOIN content
                              ON content_in_dir.blob=content.id
                            WHERE content.sha1=%s
                           ) AS content_dir
                      JOIN directory_in_rev
                        ON directory_in_rev.dir=content_dir.dir
                     ) AS content_in_rev
                JOIN revision
                  ON revision.id=content_in_rev.rev
               )
               ORDER BY date, rev""",
            (blobid, blobid),
        )
        # TODO: use PostgreSQL EXPLAIN to look for query optimizations.
        for row in self.cursor.fetchall():
            # TODO: query revision from the archive and look for blobid in a
            # recursive directory_ls of the revision's root.
            yield blobid, row[0], row[1], b""
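This UNION merges two sources of occurrences: revisions that contain the blob directly (content_early_in_rev) and revisions reached through a directory on the isochrone frontier (content_in_dir joined with directory_in_rev). For the EXPLAIN TODO above, one low-tech way to inspect the plan through the same cursor is to prefix the query text; a sketch, where query and params stand for the UNION query and its (blobid, blobid) arguments:

# Sketch: print PostgreSQL's plan for a parametrized query.
def explain(cursor, query: str, params) -> None:
    cursor.execute("EXPLAIN (ANALYZE, BUFFERS) " + query, params)
    for (line,) in cursor.fetchall():  # EXPLAIN returns one text column per row
        print(line)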
    def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
        # First check if the date is being modified by the current transaction.
        date = self.insert_cache["content"].get(blob.id, None)
        if date is None:
            # If not, check whether it's been queried before
            date = self.select_cache["content"].get(blob.id, None)
            if date is None:
                # Otherwise, query the database and cache the value
    # [... 30 lines elided (within def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]) ...]
        for row in self.cursor.fetchall():
            dates[row[0]] = row[1]
            self.select_cache["content"][row[0]] = row[1]
        return dates
    def content_set_early_date(self, blob: FileEntry, date: datetime):
        self.insert_cache["content"][blob.id] = date
    def directory_add_to_revision(
        self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
    ):
        self.insert_cache["directory_in_rev"].add((directory.id, revision.id))
    def directory_get_date_in_isochrone_frontier(
        self, directory: DirectoryEntry
    ) -> Optional[datetime]:
        # First check if the date is being modified by the current transaction.
        date = self.insert_cache["directory"].get(directory.id, None)
        if date is None and directory.id not in self.remove_cache["directory"]:
            # If not, check whether it's been queried before
            date = self.select_cache["directory"].get(directory.id, None)
    # [... 104 lines elided (within def insert_all(self)) ...]
        # psycopg2.extras.execute_values(
        #     self.cursor,
        #     """INSERT INTO revision_in_org VALUES %s
        #        ON CONFLICT DO NOTHING""",
        #     self.insert_cache["revision_in_org"],
        # )
        # self.insert_cache["revision_in_org"].clear()
    def insert_location(self, src0_table, src1_table, dst_table):
        # Nothing to flush for this table
        if not self.insert_cache[dst_table]:
            return
        # Resolve src0 ids
        src0_values = dict.fromkeys(
            map(operator.itemgetter(0), self.insert_cache[dst_table])
        )
        values = ", ".join(itertools.repeat("%s", len(src0_values)))
        self.cursor.execute(
            f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""",
            tuple(src0_values),
        )
        src0_values = dict(self.cursor.fetchall())
        # Resolve src1 ids
        src1_values = dict.fromkeys(
            map(operator.itemgetter(1), self.insert_cache[dst_table])
        )
        values = ", ".join(itertools.repeat("%s", len(src1_values)))
        self.cursor.execute(
            f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""",
            tuple(src1_values),
        )
        src1_values = dict(self.cursor.fetchall())
        # Insert values in dst_table
        rows = map(
            lambda row: (src0_values[row[0]], src1_values[row[1]]),
            self.insert_cache[dst_table],
        )
        psycopg2.extras.execute_values(
            self.cursor,
            f"""INSERT INTO {dst_table} VALUES %s
                ON CONFLICT DO NOTHING""",
            rows,
        )
        self.insert_cache[dst_table].clear()
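insert_location performs a two-step flush: it first maps the cached sha1 identifiers of both endpoints to their integer primary keys (one SELECT ... IN (...) per side), then bulk-inserts the translated id pairs. Note that the lookups assume every cached sha1 already exists in src0_table/src1_table. Schematically, with made-up values:

# Schematic data flow, e.g. for src0_table="content", src1_table="directory",
# dst_table="content_in_dir":
#   step 1: SELECT sha1, id FROM content   WHERE sha1 IN (blob sha1s)
#   step 2: SELECT sha1, id FROM directory WHERE sha1 IN (dir sha1s)
#   step 3: INSERT INTO content_in_dir VALUES (blob id, dir id), ...
cache = {(b"\xaa" * 20, b"\xbb" * 20)}
content_ids = {b"\xaa" * 20: 1}    # assumed result of step 1
directory_ids = {b"\xbb" * 20: 7}  # assumed result of step 2
rows = [(content_ids[b], directory_ids[d]) for b, d in cache]
assert rows == [(1, 7)]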
    def origin_get_id(self, origin: OriginEntry) -> int:
        if origin.id is None:
            # Insert origin in the DB and return the assigned id
            self.cursor.execute(
                """INSERT INTO origin (url) VALUES (%s)
                   ON CONFLICT DO NOTHING
                   RETURNING id""",
                (origin.url,),
    # [... 73 lines elided ...]
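One PostgreSQL subtlety worth remembering about the INSERT in origin_get_id: with ON CONFLICT DO NOTHING, the RETURNING clause yields no row when the origin already exists, so a caller cannot rely on fetchone() alone. The elided lines may well handle this; a common insert-or-select fallback looks like the following sketch, which is an illustration rather than what this file necessarily does:

# Sketch of the usual insert-or-select fallback for ON CONFLICT DO NOTHING.
def origin_id_fallback(cursor, url: str) -> int:
    cursor.execute(
        """INSERT INTO origin (url) VALUES (%s)
           ON CONFLICT DO NOTHING
           RETURNING id""",
        (url,),
    )
    row = cursor.fetchone()
    if row is None:
        # Origin already existed: RETURNING produced no row, so look it up.
        cursor.execute("SELECT id FROM origin WHERE url=%s", (url,))
        row = cursor.fetchone()
    return row[0]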