Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/postgresql/provenance.py
import itertools | import itertools | ||||
import logging | import logging | ||||
import operator | import operator | ||||
import os | import os | ||||
from datetime import datetime | |||||
from typing import Any, Dict, Generator, List, Optional, Tuple | |||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.extras | import psycopg2.extras | ||||
from ..model import DirectoryEntry, FileEntry | from ..model import DirectoryEntry, FileEntry | ||||
from ..origin import OriginEntry | from ..origin import OriginEntry | ||||
from .db_utils import connect, execute_sql | |||||
from ..provenance import ProvenanceInterface | from ..provenance import ProvenanceInterface | ||||
from ..revision import RevisionEntry | from ..revision import RevisionEntry | ||||
from .db_utils import connect, execute_sql | |||||
from datetime import datetime | |||||
from typing import Any, Dict, Generator, List, Optional, Tuple | |||||
def normalize(path: bytes) -> bytes: | def normalize(path: bytes) -> bytes: | ||||
return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path | return path[2:] if path.startswith(bytes("." + os.path.sep, "utf-8")) else path | ||||
def create_database(conn: psycopg2.extensions.connection, conninfo: dict, name: str): | def create_database(conn: psycopg2.extensions.connection, conninfo: dict, name: str): | ||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) | ||||
▲ Show 20 Lines • Show All 433 Lines • ▼ Show 20 Lines | def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]: | ||||
self.cursor.execute( | self.cursor.execute( | ||||
"""SELECT date FROM revision WHERE sha1=%s""", (revision.id,) | """SELECT date FROM revision WHERE sha1=%s""", (revision.id,) | ||||
) | ) | ||||
row = self.cursor.fetchone() | row = self.cursor.fetchone() | ||||
date = row[0] if row is not None else None | date = row[0] if row is not None else None | ||||
self.select_cache["revision"][revision.id] = date | self.select_cache["revision"][revision.id] = date | ||||
return date | return date | ||||
def revision_get_prefered_origin(self, revision: RevisionEntry) -> int: | def revision_get_preferred_origin(self, revision: RevisionEntry) -> int: | ||||
# TODO: adapt this method to consider cached values | # TODO: adapt this method to consider cached values | ||||
self.cursor.execute( | self.cursor.execute( | ||||
"""SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) | """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,) | ||||
) | ) | ||||
row = self.cursor.fetchone() | row = self.cursor.fetchone() | ||||
# None means revision is not in database; | # None means revision is not in database; | ||||
# 0 means revision has no prefered origin | # 0 means revision has no preferred origin | ||||
return row[0] if row is not None and row[0] != 0 else None | return row[0] if row is not None and row[0] != 0 else None | ||||
def revision_in_history(self, revision: RevisionEntry) -> bool: | def revision_in_history(self, revision: RevisionEntry) -> bool: | ||||
# TODO: adapt this method to consider cached values | # TODO: adapt this method to consider cached values | ||||
self.cursor.execute( | self.cursor.execute( | ||||
"""SELECT 1 | """SELECT 1 | ||||
FROM revision_before_rev | FROM revision_before_rev | ||||
JOIN revision | JOIN revision | ||||
ON revision.id=revision_before_rev.prev | ON revision.id=revision_before_rev.prev | ||||
WHERE revision.sha1=%s""", | WHERE revision.sha1=%s""", | ||||
(revision.id,), | (revision.id,), | ||||
) | ) | ||||
return self.cursor.fetchone() is not None | return self.cursor.fetchone() is not None | ||||
def revision_set_prefered_origin( | def revision_set_preferred_origin( | ||||
self, origin: OriginEntry, revision: RevisionEntry | self, origin: OriginEntry, revision: RevisionEntry | ||||
): | ): | ||||
# TODO: adapt this method to consider cached values | # TODO: adapt this method to consider cached values | ||||
self.cursor.execute( | self.cursor.execute( | ||||
"""UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) | """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id) | ||||
) | ) | ||||
def revision_visited(self, revision: RevisionEntry) -> bool: | def revision_visited(self, revision: RevisionEntry) -> bool: | ||||
Show All 10 Lines |