Page MenuHomeSoftware Heritage

provenance.py
No OneTemporary

provenance.py

import itertools
import logging
import operator
import os
from datetime import datetime
from typing import Any, Dict, Generator, List, Optional, Tuple
import psycopg2
import psycopg2.extras
from ..model import DirectoryEntry, FileEntry
from ..origin import OriginEntry
from ..postgresql.db_utils import connect, execute_sql
from ..provenance import ProvenanceInterface
from ..revision import RevisionEntry
########################################################################################
########################################################################################
########################################################################################
class ProvenancePostgreSQLNoPath(ProvenanceInterface):
    """PostgreSQL-backed provenance storage that does not track file paths.

    All query results that would carry a path return ``b""`` instead.
    Writes are buffered in in-memory caches and flushed to the database by
    ``commit()`` / ``insert_all()``.
    """

    def __init__(self, conn: psycopg2.extensions.connection):
        # TODO: consider adding a mutex for thread safety
        # Autocommit: every executed statement is committed immediately, so
        # commit() below only flushes the in-memory caches — it does not
        # manage database transactions.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.conn = conn
        self.cursor = self.conn.cursor()
        # Caches for pending inserts, pending invalidations, and values
        # already read from the database; initialized by clear_caches().
        self.insert_cache: Dict[str, Any] = {}
        self.remove_cache: Dict[str, Any] = {}
        self.select_cache: Dict[str, Any] = {}
        self.clear_caches()
def clear_caches(self):
    """Reset every pending-write, pending-removal and lookup cache."""
    self.insert_cache = {
        "content": {},
        "content_early_in_rev": set(),
        "content_in_dir": set(),
        "directory": {},
        "directory_in_rev": set(),
        "revision": {},
        "revision_before_rev": [],
        "revision_in_org": [],
    }
    self.remove_cache = {"directory": {}}
    self.select_cache = {"content": {}, "directory": {}, "revision": {}}
def commit(self):
    """Flush all cached additions to the database.

    Returns:
        True if every pending insertion was written and the caches were
        cleared; False if an error occurred.  The connection runs in
        autocommit mode, so already-executed statements are NOT rolled
        back on failure, and the caches are left untouched for a retry.
    """
    try:
        self.insert_all()
        self.clear_caches()
        return True
    except Exception:
        # Log the full traceback (not just the message) so the failing
        # statement can be identified; the original comment claimed a
        # rollback that never happened.
        logging.exception("Unexpected error while flushing caches")
        return False
def content_add_to_directory(
    self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
):
    """Record that ``blob`` occurs inside ``directory``.

    ``prefix`` is accepted for interface compatibility but unused by this
    no-path backend.
    """
    pair = (blob.id, directory.id)
    self.insert_cache["content_in_dir"].add(pair)
def content_add_to_revision(
    self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
):
    """Record that ``blob`` occurs directly in ``revision``.

    ``prefix`` is accepted for interface compatibility but unused by this
    no-path backend.
    """
    pair = (blob.id, revision.id)
    self.insert_cache["content_early_in_rev"].add(pair)
def content_find_first(
    self, blobid: bytes
) -> Optional[Tuple[bytes, bytes, datetime, bytes]]:
    """Find the earliest revision that directly contains blob ``blobid``.

    Returns a (blob sha1, revision sha1, revision date, path) tuple, or
    None if the blob is unknown.  The path is always b"" here.
    """
    query = """SELECT revision.sha1 AS rev,
                      revision.date AS date
                 FROM (SELECT content_early_in_rev.rev
                         FROM content_early_in_rev
                         JOIN content
                           ON content.id=content_early_in_rev.blob
                        WHERE content.sha1=%s
                      ) AS content_in_rev
                 JOIN revision
                   ON revision.id=content_in_rev.rev
                ORDER BY date, rev ASC LIMIT 1"""
    self.cursor.execute(query, (blobid,))
    row = self.cursor.fetchone()
    if row is None:
        return None
    # TODO: query revision from the archive and look for blobid into a
    # recursive directory_ls of the revision's root.
    return blobid, row[0], row[1], b""
def content_find_all(
    self, blobid: bytes
) -> Generator[Tuple[bytes, bytes, datetime, bytes], None, None]:
    """Yield every known occurrence of blob ``blobid`` as
    (blob sha1, revision sha1, revision date, path) tuples, ordered by date.

    The UNION combines direct occurrences (content_early_in_rev) with
    occurrences through cached directories (content_in_dir joined with
    directory_in_rev).  The yielded path is always b"" in this backend.
    """
    self.cursor.execute(
        """(SELECT revision.sha1 AS rev,
                   revision.date AS date
              FROM (SELECT content_early_in_rev.rev
                      FROM content_early_in_rev
                      JOIN content
                        ON content.id=content_early_in_rev.blob
                     WHERE content.sha1=%s
                   ) AS content_in_rev
              JOIN revision
                ON revision.id=content_in_rev.rev
           )
           UNION
           (SELECT revision.sha1 AS rev,
                   revision.date AS date
              FROM (SELECT directory_in_rev.rev
                      FROM (SELECT content_in_dir.dir
                              FROM content_in_dir
                              JOIN content
                                ON content_in_dir.blob=content.id
                             WHERE content.sha1=%s
                           ) AS content_dir
                      JOIN directory_in_rev
                        ON directory_in_rev.dir=content_dir.dir
                   ) AS content_in_rev
              JOIN revision
                ON revision.id=content_in_rev.rev
           )
           ORDER BY date, rev""",
        (blobid, blobid),
    )
    # TODO: use POSTGRESQL EXPLAIN looking for query optimizations.
    for row in self.cursor.fetchall():
        # TODO: query revision from the archive and look for blobid into a
        # recursive directory_ls of the revision's root.
        yield blobid, row[0], row[1], b""
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
    """Return the earliest known date of ``blob``, consulting caches first."""
    # Pending insertions take precedence over anything already stored.
    cached = self.insert_cache["content"].get(blob.id, None)
    if cached is not None:
        return cached
    cached = self.select_cache["content"].get(blob.id, None)
    if cached is not None:
        return cached
    # Not cached: query the database and remember the (possibly None) answer.
    self.cursor.execute(
        """SELECT date FROM content WHERE sha1=%s""", (blob.id,)
    )
    row = self.cursor.fetchone()
    cached = row[0] if row is not None else None
    self.select_cache["content"][blob.id] = cached
    return cached
def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
    """Return the known early dates of ``blobs``, batching DB lookups."""
    dates: Dict[bytes, datetime] = {}
    pending = []
    for blob in blobs:
        # Pending insertions win, then previously fetched values.
        cached = self.insert_cache["content"].get(blob.id, None)
        if cached is None:
            cached = self.select_cache["content"].get(blob.id, None)
        if cached is not None:
            dates[blob.id] = cached
        else:
            pending.append(blob.id)
    if pending:
        # Resolve all remaining ids with a single query, caching results.
        placeholders = ", ".join(itertools.repeat("%s", len(pending)))
        self.cursor.execute(
            f"""SELECT sha1, date FROM content WHERE sha1 IN ({placeholders})""",
            tuple(pending),
        )
        for sha1, date in self.cursor.fetchall():
            dates[sha1] = date
            self.select_cache["content"][sha1] = date
    return dates
def content_set_early_date(self, blob: FileEntry, date: datetime):
    """Schedule ``date`` as the early date of ``blob``."""
    cache = self.insert_cache["content"]
    cache[blob.id] = date
def directory_add_to_revision(
    self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
):
    """Record the (directory, revision) relation.

    ``path`` is accepted for interface compatibility but unused by this
    no-path backend.
    """
    pair = (directory.id, revision.id)
    self.insert_cache["directory_in_rev"].add(pair)
def directory_get_date_in_isochrone_frontier(
    self, directory: DirectoryEntry
) -> Optional[datetime]:
    """Return the cached or stored isochrone-frontier date of ``directory``."""
    date = self.insert_cache["directory"].get(directory.id, None)
    if date is not None:
        return date
    if directory.id in self.remove_cache["directory"]:
        # The entry was invalidated in this round: skip the database.
        return None
    date = self.select_cache["directory"].get(directory.id, None)
    if date is None:
        # Not cached: query the database and remember the answer.
        self.cursor.execute(
            """SELECT date FROM directory WHERE sha1=%s""", (directory.id,)
        )
        row = self.cursor.fetchone()
        date = row[0] if row is not None else None
        self.select_cache["directory"][directory.id] = date
    return date
def directory_get_dates_in_isochrone_frontier(
    self, dirs: List[DirectoryEntry]
) -> Dict[bytes, datetime]:
    """Return known isochrone-frontier dates of ``dirs``, batching lookups.

    Directories marked in the removal cache are skipped entirely.
    """
    dates: Dict[bytes, datetime] = {}
    pending = []
    for directory in dirs:
        # Pending insertions take precedence.
        cached = self.insert_cache["directory"].get(directory.id, None)
        if cached is not None:
            dates[directory.id] = cached
        elif directory.id not in self.remove_cache["directory"]:
            cached = self.select_cache["directory"].get(directory.id, None)
            if cached is not None:
                dates[directory.id] = cached
            else:
                pending.append(directory.id)
    if pending:
        # Resolve all remaining ids with a single query, caching results.
        placeholders = ", ".join(itertools.repeat("%s", len(pending)))
        self.cursor.execute(
            f"""SELECT sha1, date FROM directory WHERE sha1 IN ({placeholders})""",
            tuple(pending),
        )
        for sha1, date in self.cursor.fetchall():
            dates[sha1] = date
            self.select_cache["directory"][sha1] = date
    return dates
def directory_invalidate_in_isochrone_frontier(self, directory: DirectoryEntry):
    """Drop any pending date for ``directory`` and mark it for removal."""
    self.insert_cache["directory"].pop(directory.id, None)
    self.remove_cache["directory"][directory.id] = None
def directory_set_date_in_isochrone_frontier(
    self, directory: DirectoryEntry, date: datetime
):
    """Schedule ``date`` as the frontier date of ``directory``, undoing any
    prior invalidation."""
    self.remove_cache["directory"].pop(directory.id, None)
    self.insert_cache["directory"][directory.id] = date
def insert_all(self):
    """Flush every cache to the database in dependency order.

    Element tables (content, directory, revision) are written first so the
    relation tables can resolve sha1 references to numeric ids via
    insert_location().  Runs in autocommit mode (see __init__).
    """
    # Perform insertions with cached information
    if self.insert_cache["content"]:
        psycopg2.extras.execute_values(
            self.cursor,
            """LOCK TABLE ONLY content;
               INSERT INTO content(sha1, date) VALUES %s
                 ON CONFLICT (sha1) DO
                 UPDATE SET date=LEAST(EXCLUDED.date,content.date)""",
            self.insert_cache["content"].items(),
        )
        self.insert_cache["content"].clear()
    if self.insert_cache["directory"]:
        psycopg2.extras.execute_values(
            self.cursor,
            """LOCK TABLE ONLY directory;
               INSERT INTO directory(sha1, date) VALUES %s
                 ON CONFLICT (sha1) DO
                 UPDATE SET date=LEAST(EXCLUDED.date,directory.date)""",
            self.insert_cache["directory"].items(),
        )
        self.insert_cache["directory"].clear()
    if self.insert_cache["revision"]:
        psycopg2.extras.execute_values(
            self.cursor,
            """LOCK TABLE ONLY revision;
               INSERT INTO revision(sha1, date) VALUES %s
                 ON CONFLICT (sha1) DO
                 UPDATE SET date=LEAST(EXCLUDED.date,revision.date)""",
            self.insert_cache["revision"].items(),
        )
        self.insert_cache["revision"].clear()
    # Relations come after the element ids above have been resolved.
    if self.insert_cache["content_early_in_rev"]:
        self.insert_location("content", "revision", "content_early_in_rev")
    if self.insert_cache["content_in_dir"]:
        self.insert_location("content", "directory", "content_in_dir")
    if self.insert_cache["directory_in_rev"]:
        self.insert_location("directory", "revision", "directory_in_rev")
    # NOTE(review): the two blocks below are disabled, yet the
    # revision_before_rev / revision_in_org caches are still filled by
    # revision_add_before_revision / revision_add_to_origin and never
    # flushed — confirm this is intentional.
    # if self.insert_cache["revision_before_rev"]:
    #     psycopg2.extras.execute_values(
    #         self.cursor,
    #         """INSERT INTO revision_before_rev VALUES %s
    #            ON CONFLICT DO NOTHING""",
    #         self.insert_cache["revision_before_rev"],
    #     )
    #     self.insert_cache["revision_before_rev"].clear()
    # if self.insert_cache["revision_in_org"]:
    #     psycopg2.extras.execute_values(
    #         self.cursor,
    #         """INSERT INTO revision_in_org VALUES %s
    #            ON CONFLICT DO NOTHING""",
    #         self.insert_cache["revision_in_org"],
    #     )
    #     self.insert_cache["revision_in_org"].clear()
def insert_location(self, src0_table, src1_table, dst_table):
    """Flush the cached (src0 sha1, src1 sha1) pairs of ``dst_table``.

    Both sha1 columns are resolved to their numeric ids in ``src0_table``
    and ``src1_table``, then the id pairs are bulk-inserted into
    ``dst_table`` (duplicates ignored) and the cache is cleared.
    """
    pairs = self.insert_cache[dst_table]
    if not pairs:
        # Nothing cached: bail out early instead of emitting an invalid
        # `IN ()` clause below.
        return
    # Resolve src0 ids (dict.fromkeys deduplicates while keeping order).
    src0_values = dict.fromkeys(map(operator.itemgetter(0), pairs))
    values = ", ".join(itertools.repeat("%s", len(src0_values)))
    self.cursor.execute(
        f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({values})""",
        tuple(src0_values),
    )
    src0_values = dict(self.cursor.fetchall())
    # Resolve src1 ids
    src1_values = dict.fromkeys(map(operator.itemgetter(1), pairs))
    values = ", ".join(itertools.repeat("%s", len(src1_values)))
    self.cursor.execute(
        f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""",
        tuple(src1_values),
    )
    src1_values = dict(self.cursor.fetchall())
    # Insert the resolved id pairs into dst_table.
    rows = [(src0_values[src0], src1_values[src1]) for src0, src1 in pairs]
    psycopg2.extras.execute_values(
        self.cursor,
        f"""INSERT INTO {dst_table} VALUES %s
            ON CONFLICT DO NOTHING""",
        rows,
    )
    pairs.clear()
def origin_get_id(self, origin: OriginEntry) -> int:
    """Return the database id of ``origin``, inserting it if needed.

    Fixes a latent crash: with ``ON CONFLICT DO NOTHING``, PostgreSQL
    returns NO row from ``RETURNING`` when the url already exists, so the
    original ``fetchone()[0]`` raised TypeError for known origins.
    """
    if origin.id is not None:
        return origin.id
    # Insert origin in the DB and return the assigned id.
    self.cursor.execute(
        """INSERT INTO origin (url) VALUES (%s)
             ON CONFLICT DO NOTHING
             RETURNING id""",
        (origin.url,),
    )
    row = self.cursor.fetchone()
    if row is None:
        # The url already existed: look its id up explicitly.
        self.cursor.execute(
            """SELECT id FROM origin WHERE url=%s""", (origin.url,)
        )
        row = self.cursor.fetchone()
    return row[0]
def revision_add(self, revision: RevisionEntry):
    """Schedule ``revision`` (sha1 -> date) for insertion into the DB."""
    cache = self.insert_cache["revision"]
    cache[revision.id] = revision.date
def revision_add_before_revision(
    self, relative: RevisionEntry, revision: RevisionEntry
):
    """Record that ``revision`` comes before ``relative`` in its history.

    NOTE(review): this cache is currently never flushed by insert_all().
    """
    pair = (revision.id, relative.id)
    self.insert_cache["revision_before_rev"].append(pair)
def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
    """Record that ``revision`` was found in ``origin``.

    NOTE(review): this cache is currently never flushed by insert_all().
    """
    pair = (revision.id, origin.id)
    self.insert_cache["revision_in_org"].append(pair)
def revision_get_early_date(self, revision: RevisionEntry) -> Optional[datetime]:
    """Return the known date of ``revision``, consulting caches first."""
    cached = self.insert_cache["revision"].get(revision.id, None)
    if cached is not None:
        return cached
    cached = self.select_cache["revision"].get(revision.id, None)
    if cached is not None:
        return cached
    # Not cached: query the database and remember the (possibly None) answer.
    self.cursor.execute(
        """SELECT date FROM revision WHERE sha1=%s""", (revision.id,)
    )
    row = self.cursor.fetchone()
    cached = row[0] if row is not None else None
    self.select_cache["revision"][revision.id] = cached
    return cached
def revision_get_preferred_origin(self, revision: RevisionEntry) -> Optional[int]:
    """Return the preferred origin id of ``revision``, or None.

    None covers both cases: the revision is not in the database, or it has
    no preferred origin (org is NULL, coalesced to 0 by the query).
    """
    # TODO: adapt this method to consider cached values
    self.cursor.execute(
        """SELECT COALESCE(org,0) FROM revision WHERE sha1=%s""", (revision.id,)
    )
    row = self.cursor.fetchone()
    # None means revision is not in database;
    # 0 means revision has no preferred origin
    return row[0] if row is not None and row[0] != 0 else None
def revision_in_history(self, revision: RevisionEntry) -> bool:
    """Tell whether ``revision`` appears in the ``prev`` column of
    revision_before_rev, i.e. is already recorded inside some history."""
    # TODO: adapt this method to consider cached values
    self.cursor.execute(
        """SELECT 1
             FROM revision_before_rev
             JOIN revision
               ON revision.id=revision_before_rev.prev
            WHERE revision.sha1=%s""",
        (revision.id,),
    )
    row = self.cursor.fetchone()
    return row is not None
def revision_set_preferred_origin(
    self, origin: OriginEntry, revision: RevisionEntry
):
    """Set ``origin`` as the preferred origin of ``revision``.

    Executes an immediate UPDATE (autocommit), bypassing the write caches.
    """
    # TODO: adapt this method to consider cached values
    self.cursor.execute(
        """UPDATE revision SET org=%s WHERE sha1=%s""", (origin.id, revision.id)
    )
def revision_visited(self, revision: RevisionEntry) -> bool:
    """Tell whether ``revision`` appears in the ``rev`` column of
    revision_in_org, i.e. has already been associated with an origin."""
    # TODO: adapt this method to consider cached values
    self.cursor.execute(
        """SELECT 1
             FROM revision_in_org
             JOIN revision
               ON revision.id=revision_in_org.rev
            WHERE revision.sha1=%s""",
        (revision.id,),
    )
    row = self.cursor.fetchone()
    return row is not None

File Metadata

Mime Type
text/x-python
Expires
Fri, Jul 4, 1:25 PM (5 d, 10 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3348853

Event Timeline