Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenancedb_base.py
from datetime import datetime | from datetime import datetime | ||||
import itertools | import itertools | ||||
import logging | import logging | ||||
from typing import Any, Dict, List, Optional | from typing import Any, Dict, List, Optional, Set | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from ..model import DirectoryEntry, FileEntry | from ..model import DirectoryEntry, FileEntry | ||||
from ..origin import OriginEntry | from ..origin import OriginEntry | ||||
from ..revision import RevisionEntry | from ..revision import RevisionEntry | ||||
class ProvenanceDBBase: | class ProvenanceDBBase: | ||||
def __init__(self, conn: psycopg2.extensions.connection): | def __init__(self, conn: psycopg2.extensions.connection): | ||||
# TODO: consider adding a mutex for thread safety | # TODO: consider adding a mutex for thread safety | ||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | ||||
conn.set_session(autocommit=True) | conn.set_session(autocommit=True) | ||||
self.conn = conn | self.conn = conn | ||||
self.cursor = self.conn.cursor() | self.cursor = self.conn.cursor() | ||||
# XXX: not sure this is the best place to do it! | # XXX: not sure this is the best place to do it! | ||||
self.cursor.execute("SET timezone TO 'UTC'") | self.cursor.execute("SET timezone TO 'UTC'") | ||||
self.insert_cache: Dict[str, Any] = {} | self.insert_cache: Dict[str, Any] = {} | ||||
self.remove_cache: Dict[str, Any] = {} | self.remove_cache: Dict[str, Set[bytes]] = {} | ||||
self.select_cache: Dict[str, Any] = {} | self.select_cache: Dict[str, Any] = {} | ||||
self.clear_caches() | self.clear_caches() | ||||
def clear_caches(self): | def clear_caches(self): | ||||
self.insert_cache = { | self.insert_cache = { | ||||
"content": dict(), | "content": dict(), | ||||
"content_early_in_rev": set(), | "content_early_in_rev": set(), | ||||
"content_in_dir": set(), | "content_in_dir": set(), | ||||
"directory": dict(), | "directory": dict(), | ||||
"directory_in_rev": set(), | "directory_in_rev": set(), | ||||
"revision": dict(), | "revision": dict(), | ||||
"revision_before_rev": list(), | "revision_before_rev": list(), | ||||
"revision_in_org": list(), | "revision_in_org": list(), | ||||
} | } | ||||
self.remove_cache = {"directory": dict()} | self.remove_cache = {"directory": set()} | ||||
self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} | self.select_cache = {"content": dict(), "directory": dict(), "revision": dict()} | ||||
def commit(self): | def commit(self): | ||||
result = False | result = False | ||||
try: | try: | ||||
self.insert_all() | self.insert_all() | ||||
self.clear_caches() | self.clear_caches() | ||||
result = True | result = True | ||||
▲ Show 20 Lines • Show All 93 Lines • ▼ Show 20 Lines | ) -> Dict[bytes, datetime]: | ||||
tuple(pending), | tuple(pending), | ||||
) | ) | ||||
for row in self.cursor.fetchall(): | for row in self.cursor.fetchall(): | ||||
dates[row[0]] = row[1] | dates[row[0]] = row[1] | ||||
self.select_cache["directory"][row[0]] = row[1] | self.select_cache["directory"][row[0]] = row[1] | ||||
return dates | return dates | ||||
def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): | def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry): | ||||
self.remove_cache["directory"][directory.id] = None | self.remove_cache["directory"].add(directory.id) | ||||
self.insert_cache["directory"].pop(directory.id, None) | self.insert_cache["directory"].pop(directory.id, None) | ||||
def directory_set_date_in_isochrone_frontier( | def directory_set_date_in_isochrone_frontier( | ||||
self, directory: DirectoryEntry, date: datetime | self, directory: DirectoryEntry, date: datetime | ||||
): | ): | ||||
self.insert_cache["directory"][directory.id] = date | self.insert_cache["directory"][directory.id] = date | ||||
self.remove_cache["directory"].pop(directory.id, None) | self.remove_cache["directory"].discard(directory.id) | ||||
def insert_all(self): | def insert_all(self): | ||||
# Performe insertions with cached information | # Performe insertions with cached information | ||||
if self.insert_cache["content"]: | if self.insert_cache["content"]: | ||||
psycopg2.extras.execute_values( | psycopg2.extras.execute_values( | ||||
self.cursor, | self.cursor, | ||||
""" | """ | ||||
LOCK TABLE ONLY content; | LOCK TABLE ONLY content; | ||||
▲ Show 20 Lines • Show All 156 Lines • Show Last 20 Lines |