diff --git a/contents.py b/contents.py
new file mode 100644
index 0000000..299a602
--- /dev/null
+++ b/contents.py
@@ -0,0 +1,51 @@
+import os
+import psycopg2
+
+from swh.model.hashutil import (hash_to_bytes, hash_to_hex)
+from swh.provenance.provenance import get_provenance
+
+
+if __name__ == "__main__":
+ conninfo = {
+ "host": "localhost",
+ "database": "test2",
+ "user": "postgres",
+ "password": "postgres"
+ }
+ provenance = get_provenance(conninfo)
+
+ print('content(id, date): ################################################')
+ provenance.cursor.execute('''SELECT id, date FROM content''')
+ for row in provenance.cursor.fetchall():
+ print(f'{hash_to_hex(row[0])}, {row[1]}')
+ print('###################################################################')
+
+ print('content_early_in_rev(blob, rev, path): ############################')
+ provenance.cursor.execute('''SELECT blob, rev, path FROM content_early_in_rev''')
+ for row in provenance.cursor.fetchall():
+ print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}')
+ print('###################################################################')
+
+ print('content_in_dir(blob, dir, path): ##################################')
+ provenance.cursor.execute('''SELECT blob, dir, path FROM content_in_dir''')
+ for row in provenance.cursor.fetchall():
+ print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}')
+ print('###################################################################')
+
+ print('directory(id, date): ##############################################')
+ provenance.cursor.execute('''SELECT id, date FROM directory''')
+ for row in provenance.cursor.fetchall():
+ print(f'{hash_to_hex(row[0])}, {row[1]}')
+ print('###################################################################')
+
+ print('directory_in_rev(dir, rev, path): #################################')
+ provenance.cursor.execute('''SELECT dir, rev, path FROM directory_in_rev''')
+ for row in provenance.cursor.fetchall():
+ print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {os.fsdecode(row[2])}')
+ print('###################################################################')
+
+ print('revision(id, date): ###############################################')
+ provenance.cursor.execute('''SELECT id, date FROM revision''')
+ for row in provenance.cursor.fetchall():
+ print(f'{hash_to_hex(row[0])}, {row[1]}')
+ print('###################################################################')
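
Note: the ids stored in these tables are raw 20-byte sha1_git values, which is why the script converts them with hash_to_hex before printing. A minimal sketch of that round trip (the id below is illustrative only):

    from swh.model.hashutil import hash_to_bytes, hash_to_hex

    hex_id = "02f95c0a1868cbef82ff73fc1b903183a579c7de"  # illustrative sha1_git
    raw = hash_to_bytes(hex_id)   # 20-byte value, as stored in the provenance tables
    assert hash_to_hex(raw) == hex_id
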
diff --git a/revisions.py b/revisions.py
new file mode 100644
index 0000000..eff0fcf
--- /dev/null
+++ b/revisions.py
@@ -0,0 +1,62 @@
+import io
+import random
+import pytz
+
+from datetime import datetime
+
+from swh.model.hashutil import (hash_to_bytes, hash_to_hex)
+from swh.storage import get_storage
+from swh.provenance.revision import RevisionEntry
+
+
+def rev_to_csv(revision: RevisionEntry):
+ return ','.join([
+ hash_to_hex(revision.id),
+ str(pytz.utc.localize(revision.date)),
+ hash_to_hex(revision.root)
+ ]) + '\n'
+
+
+if __name__ == "__main__":
+ conninfo = {
+ "cls": "remote",
+ "url": "http://uffizi.internal.softwareheritage.org:5002"
+ }
+ storage = get_storage(**conninfo)
+
+ revisions = [
+ # '6eec5815ef8fc88d9fc5bcc91c6465a8899c1445',
+ # 'd1468bb5f06ca44cc42c43fbd011c5dcbdc262c6',
+ # '6a45ebb887d87ee53f359aaeba8a9840576c907b'
+ '02f95c0a1868cbef82ff73fc1b903183a579c7de',
+ 'da061f1caf293a5da00bff6a45abcf4d7ae54c50',
+ 'e3bfd73a9fd8ef3dd4c5b05a927de485f9871323'
+ ]
+ print(revisions)
+
+ revisions = list(map(hash_to_bytes, revisions))
+ print(revisions)
+
+ entries = []
+ for revision in storage.revision_get(revisions):
+ if revision is not None:
+ print(revision)
+ entries.append(RevisionEntry(
+ storage,
+ revision.id,
+ datetime.fromtimestamp(revision.date.timestamp.seconds),
+ revision.directory
+ ))
+
+ random.shuffle(entries)
+ with io.open('random.csv', 'w') as outfile:
+ for revision in entries:
+ outfile.write(rev_to_csv(revision))
+
+ with io.open('ordered.csv', 'w') as outfile:
+ for revision in sorted(entries, key=lambda rev: rev.date):
+ outfile.write(rev_to_csv(revision))
+
+ with io.open('reverse.csv', 'w') as outfile:
+ for revision in sorted(entries, key=lambda rev: rev.date, reverse=True):
+ outfile.write(rev_to_csv(revision))
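
The three CSV files written above feed FileRevisionIterator (see swh/provenance/revision.py below), which splits each line on commas and parses the date back with datetime.fromisoformat. A minimal sketch of that round trip, with made-up values:

    from datetime import datetime, timezone

    # One line in the format produced by rev_to_csv (values are illustrative):
    line = "02f95c0a1868cbef82ff73fc1b903183a579c7de,2017-01-01 12:00:00+00:00,da061f1caf293a5da00bff6a45abcf4d7ae54c50"

    rev_id, date, root = line.strip().split(',')
    parsed = datetime.fromisoformat(date)  # same call FileRevisionIterator uses
    assert parsed == datetime(2017, 1, 1, 12, tzinfo=timezone.utc)
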
diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py
new file mode 100644
index 0000000..4bc725e
--- /dev/null
+++ b/swh/provenance/archive.py
@@ -0,0 +1,82 @@
+import psycopg2
+
+from .db_utils import connect
+
+from typing import List
+
+from swh.storage import get_storage
+
+
+class ArchiveInterface:
+ def __init__(self):
+ raise NotImplementedError
+
+ def directory_ls(self, id: bytes):
+ raise NotImplementedError
+
+ def revision_get(self, ids: List[bytes]):
+ raise NotImplementedError
+
+
+class ArchiveStorage(ArchiveInterface):
+ def __init__(self, cls: str, **kwargs):
+ self.storage = get_storage(cls, **kwargs)
+
+ def directory_ls(self, id: bytes):
+ # TODO: filter unused fields
+ yield from self.storage.directory_ls(id)
+
+ def revision_get(self, ids: List[bytes]):
+ # TODO: filter unused fields
+ yield from self.storage.revision_get(ids)
+
+
+class Archive(ArchiveInterface):
+ def __init__(self, conn: psycopg2.extensions.connection):
+ self.conn = conn
+ self.cursor = conn.cursor()
+
+ def directory_ls(self, id: bytes):
+ self.cursor.execute('''WITH
+ dir AS (SELECT id AS dir_id, dir_entries, file_entries, rev_entries
+ FROM directory WHERE id=%s),
+ ls_d AS (SELECT dir_id, unnest(dir_entries) AS entry_id from dir),
+ ls_f AS (SELECT dir_id, unnest(file_entries) AS entry_id from dir),
+ ls_r AS (SELECT dir_id, unnest(rev_entries) AS entry_id from dir)
+ (SELECT 'dir'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git
+ FROM ls_d
+ LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id)
+ UNION
+ (WITH known_contents AS
+ (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git
+ FROM ls_f
+ LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
+ INNER JOIN content c ON e.target=c.sha1_git)
+ SELECT * FROM known_contents
+ UNION
+ (SELECT 'file'::directory_entry_type AS type, e.target, e.name, c.sha1_git
+ FROM ls_f
+ LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id
+ LEFT JOIN skipped_content c ON e.target=c.sha1_git
+ WHERE NOT EXISTS (SELECT 1 FROM known_contents WHERE known_contents.sha1_git=e.target)))
+ UNION
+ (SELECT 'rev'::directory_entry_type AS type, e.target, e.name, NULL::sha1_git
+ FROM ls_r
+ LEFT JOIN directory_entry_rev e ON ls_r.entry_id=e.id)
+ ORDER BY name
+ ''', (id,))
+ for row in self.cursor.fetchall():
+ yield {'type': row[0], 'target': row[1], 'name': row[2]}
+
+ def revision_get(self, ids: List[bytes]):
+ raise NotImplementedError
+
+
+def get_archive(cls: str, **kwargs) -> ArchiveInterface:
+ if cls == "api":
+ return ArchiveStorage(**kwargs["storage"])
+ elif cls == "ps":
+ conn = connect(kwargs["db"])
+ return Archive(conn)
+ else:
+ raise NotImplementedError
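
A minimal sketch of how get_archive is intended to be called for the two backends, mirroring the configuration used in cli.py below (the directory id is a made-up placeholder, and the hosts are only reachable from inside the Software Heritage network):

    from swh.provenance.archive import get_archive

    # "api": wrap a swh.storage instance (here the remote storage API).
    archive = get_archive(
        cls="api",
        storage={"cls": "remote", "url": "http://uffizi.internal.softwareheritage.org:5002"},
    )

    # "ps": query a read-only mirror of the archive database directly.
    # archive = get_archive(
    #     cls="ps",
    #     db={"host": "db.internal.softwareheritage.org",
    #         "database": "softwareheritage", "user": "guest"},
    # )

    directory_id = bytes.fromhex("0b6959356d30f1a4e9b7f6bca59b9a336464c03d")  # placeholder
    for entry in archive.directory_ls(directory_id):
        print(entry["type"], entry["name"], entry["target"])
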
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 959c041..c013181 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,195 +1,236 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
import os
from typing import Any, Dict, Optional
import click
import yaml
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.model.hashutil import (hash_to_bytes, hash_to_hex)
-from swh.storage import get_storage
# All generic config code should reside in swh.core.config
CONFIG_ENVVAR = "SWH_CONFIG_FILE"
DEFAULT_CONFIG_PATH = os.path.join(click.get_app_dir("swh"), "global.yml")
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, DEFAULT_CONFIG_PATH)
DEFAULT_CONFIG: Dict[str, Any] = {
- "storage": {
- "cls": "remote",
- "url": "http://uffizi.internal.softwareheritage.org:5002"
+ "archive": {
+ # "cls": "api",
+ # "storage": {
+ # "cls": "remote",
+ # "url": "http://uffizi.internal.softwareheritage.org:5002"
+ # }
+ "cls": "ps",
+ "db": {
+ "host": "db.internal.softwareheritage.org",
+ "database": "softwareheritage",
+ "user": "guest"
+ }
},
"db": {
"host": "localhost",
- "database": "provenance",
+ "database": "test3",
"user": "postgres",
"password": "postgres"
}
}
CONFIG_FILE_HELP = f"""Configuration file:
\b
The CLI option or the environment variable will fail if invalid.
CLI option is checked first.
Then, environment variable {CONFIG_ENVVAR} is checked.
Then, if cannot load the default path, a set of default values are used.
Default config path is {DEFAULT_CONFIG_PATH}.
Default config values are:
\b
{yaml.dump(DEFAULT_CONFIG)}"""
PROVENANCE_HELP = f"""Software Heritage Scanner tools.
{CONFIG_FILE_HELP}"""
@swh_cli_group.group(
name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP,
)
@click.option(
"-C",
"--config-file",
default=None,
type=click.Path(exists=False, dir_okay=False, path_type=str),
help="""YAML configuration file""",
)
-@click.option("--profile", is_flag=True)
+@click.option("--profile", default=None)
@click.pass_context
-def cli(ctx, config_file: Optional[str], profile: bool):
+def cli(ctx, config_file: Optional[str], profile: str):
if config_file is None and config.config_exists(DEFAULT_PATH):
config_file = DEFAULT_PATH
if config_file is None:
conf = DEFAULT_CONFIG
else:
# read_raw_config do not fail on ENOENT
if not config.config_exists(config_file):
raise FileNotFoundError(config_file)
conf = config.read_raw_config(config.config_basepath(config_file))
conf = config.merge_configs(DEFAULT_CONFIG, conf)
ctx.ensure_object(dict)
ctx.obj["config"] = conf
-
- if profile:
- import cProfile
- import pstats
- import io
- import atexit
-
- print("Profiling...")
- pr = cProfile.Profile()
- pr.enable()
-
- def exit():
- pr.disable()
- print("Profiling completed")
- s = io.StringIO()
- pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats()
- print(s.getvalue())
-
- atexit.register(exit)
+ ctx.obj["profile"] = profile
+
+ # if profile:
+ # import cProfile
+ # import pstats
+ # import io
+ # import atexit
+ #
+ # print("Profiling...")
+ # pr = cProfile.Profile()
+ # pr.enable()
+ #
+ # def exit():
+ # pr.disable()
+ # print("Profiling completed")
+ # s = io.StringIO()
+ # pstats.Stats(pr, stream=s).sort_stats("cumulative").print_stats()
+ # print(s.getvalue())
+ #
+ # atexit.register(exit)
@cli.command(name="create")
@click.option("--name", default=None)
@click.pass_context
def create(ctx, name):
"""Create new provenance database."""
from .db_utils import connect
from .provenance import create_database
# Connect to server without selecting a database
conninfo = ctx.obj["config"]["db"]
database = conninfo.pop('database', None)
conn = connect(conninfo)
if name is None:
name = database
create_database(conn, conninfo, name)
@cli.command(name="iter-revisions")
@click.argument("filename")
@click.option('-l', '--limit', type=int)
@click.option('-t', '--threads', type=int, default=1)
@click.pass_context
def iter_revisions(ctx, filename, limit, threads):
"""Iterate over provided list of revisions and add them to the provenance database."""
+ from .archive import get_archive
from .revision import FileRevisionIterator
- from .revision import RevisionWorker
+ # from .revision import RevisionWorker
+
+ # conninfo = ctx.obj["config"]["db"]
+ # archive = get_archive(**ctx.obj["config"]["archive"])
+ # revisions = FileRevisionIterator(filename, archive, limit=limit)
+ # workers = []
+ #
+ # for id in range(threads):
+ # worker = RevisionWorker(id, conninfo, archive, revisions)
+ # worker.start()
+ # workers.append(worker)
+ #
+ # for worker in workers:
+ # worker.join()
+
+ ############################################################################
+ archive = get_archive(**ctx.obj["config"]["archive"])
+ revisions = FileRevisionIterator(filename, archive, limit=limit)
+
+ if ctx.obj["profile"]:
+ from .provenance import get_provenance, revision_add
+ import cProfile
- conninfo = ctx.obj["config"]["db"]
- storage = get_storage(**ctx.obj["config"]["storage"])
- revisions = FileRevisionIterator(filename, storage, limit=limit)
- workers = []
+ provenance = get_provenance(ctx.obj["config"]["db"])
+ command = """
+while True:
+ revision = revisions.next()
+ if revision is None: break
+ revision_add(provenance, archive, revision)
+ """
- for id in range(threads):
- worker = RevisionWorker(id, conninfo, storage, revisions)
- worker.start()
- workers.append(worker)
+ cProfile.runctx(command, globals(), locals(), filename=ctx.obj["profile"])
- for worker in workers:
- worker.join()
+ else:
+ from .revision import RevisionWorker
+
+ conninfo = ctx.obj["config"]["db"]
+ workers = []
+
+ for id in range(threads):
+ worker = RevisionWorker(id, conninfo, archive, revisions)
+ worker.start()
+ workers.append(worker)
+
+ for worker in workers:
+ worker.join()
+ ############################################################################
@cli.command(name="iter-origins")
@click.argument("filename")
@click.option('-l', '--limit', type=int)
#@click.option('-t', '--threads', type=int, default=1)
@click.pass_context
#def iter_revisions(ctx, filename, limit, threads):
def iter_origins(ctx, filename, limit):
"""Iterate over provided list of revisions and add them to the provenance database."""
- from .db_utils import connect
+ from .archive import get_archive
from .origin import FileOriginIterator
- from .provenance import origin_add
+ from .provenance import get_provenance, origin_add
- conn = connect(ctx.obj["config"]["db"])
- storage = get_storage(**ctx.obj["config"]["storage"])
+ provenance = get_provenance(ctx.obj["config"]["db"])
+ archive = get_archive(**ctx.obj["config"]["archive"])
- for origin in FileOriginIterator(filename, storage, limit=limit):
+ for origin in FileOriginIterator(filename, archive, limit=limit):
# TODO: consider using threads and a OriginWorker class
- origin_add(conn, storage, origin)
+ origin_add(provenance, origin)
@cli.command(name="find-first")
@click.argument("swhid")
@click.pass_context
def find_first(ctx, swhid):
"""Find first occurrence of the requested blob."""
- from .db_utils import connect
- from .provenance import content_find_first
+ from .provenance import get_provenance
- with connect(ctx.obj["config"]["db"]).cursor() as cursor:
- # TODO: return a dictionary with proper keys for each field
- row = content_find_first(cursor, hash_to_bytes(swhid))
- if row is not None:
- print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')
- else:
- print(f'Cannot find a content with the id {swhid}')
+ provenance = get_provenance(ctx.obj["config"]["db"])
+ # TODO: return a dictionary with proper keys for each field
+ row = provenance.content_find_first(hash_to_bytes(swhid))
+ if row is not None:
+ print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')
+ else:
+ print(f'Cannot find a content with the id {swhid}')
@cli.command(name="find-all")
@click.argument("swhid")
@click.pass_context
def find_all(ctx, swhid):
"""Find all occurrences of the requested blob."""
- from .db_utils import connect
- from .provenance import content_find_all
+ from .provenance import get_provenance
- with connect(ctx.obj["config"]["db"]).cursor() as cursor:
- # TODO: return a dictionary with proper keys for each field
- for row in content_find_all(cursor, hash_to_bytes(swhid)):
- print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')
+ provenance = get_provenance(ctx.obj["config"]["db"])
+ # TODO: return a dictionary with proper keys for each field
+ for row in provenance.content_find_all(hash_to_bytes(swhid)):
+ print(f'{hash_to_hex(row[0])}, {hash_to_hex(row[1])}, {row[2]}, {os.fsdecode(row[3])}')
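
With this change --profile takes an output file name rather than acting as a flag; cProfile.runctx writes its statistics there, and they can be examined afterwards with the standard pstats module. A minimal sketch (the file name is a placeholder):

    import pstats

    # e.g. after: swh provenance --profile revisions.prof iter-revisions random.csv
    stats = pstats.Stats("revisions.prof")
    stats.sort_stats("cumulative").print_stats(20)  # top 20 entries by cumulative time
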
diff --git a/swh/provenance/model.py b/swh/provenance/model.py
index 4b06320..9858af4 100644
--- a/swh/provenance/model.py
+++ b/swh/provenance/model.py
@@ -1,45 +1,46 @@
import os
+from .archive import ArchiveInterface
+
from pathlib import PosixPath
-from swh.storage.interface import StorageInterface
class Tree:
- def __init__(self, storage: StorageInterface, id: bytes):
- self.root = DirectoryEntry(storage, id, PosixPath('.'))
+ def __init__(self, archive: ArchiveInterface, id: bytes):
+ self.root = DirectoryEntry(archive, id, PosixPath('.'))
class TreeEntry:
def __init__(self, id: bytes, name: PosixPath):
self.id = id
self.name = name
class DirectoryEntry(TreeEntry):
- def __init__(self, storage: StorageInterface, id: bytes, name: PosixPath):
+ def __init__(self, archive: ArchiveInterface, id: bytes, name: PosixPath):
super().__init__(id, name)
- self.storage = storage
+ self.archive = archive
self.children = None
def __iter__(self):
if self.children is None:
self.children = []
- for child in self.storage.directory_ls(self.id):
+ for child in self.archive.directory_ls(self.id):
if child['type'] == 'dir':
self.children.append(DirectoryEntry(
- self.storage,
+ self.archive,
child['target'],
PosixPath(os.fsdecode(child['name']))
))
elif child['type'] == 'file':
self.children.append(FileEntry(
child['target'],
PosixPath(os.fsdecode(child['name']))
))
return iter(self.children)
class FileEntry(TreeEntry):
pass
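
A minimal sketch of walking a Tree with these classes (walk() is a hypothetical helper; the traversal mirrors directory_process_content in provenance.py below, and archive is any ArchiveInterface obtained from get_archive):

    from pathlib import PosixPath

    from swh.provenance.model import FileEntry, Tree

    def walk(archive, root_id: bytes):
        # Iterative depth-first traversal; children are fetched lazily
        # through archive.directory_ls the first time a directory is iterated.
        stack = [(Tree(archive, root_id).root, PosixPath('.'))]
        while stack:
            directory, path = stack.pop()
            for child in directory:
                if isinstance(child, FileEntry):
                    print('blob', path / child.name)
                else:
                    stack.append((child, path / child.name))
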
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index e82f528..5b607ac 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,511 +1,600 @@
+import itertools
import logging
import os
import psycopg2
import psycopg2.extras
+from .archive import ArchiveInterface
from .db_utils import connect, execute_sql
from .model import DirectoryEntry, FileEntry, Tree
from .origin import OriginEntry
from .revision import RevisionEntry
from datetime import datetime
from pathlib import PosixPath
+from typing import Dict, List
from swh.model.hashutil import hash_to_hex
-from swh.storage.interface import StorageInterface
def normalize(path: PosixPath) -> PosixPath:
spath = str(path)
if spath.startswith('./'):
return PosixPath(spath[2:])
return path
def create_database(
conn: psycopg2.extensions.connection,
conninfo: dict,
name: str
):
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
# Create new database dropping previous one if exists
cursor = conn.cursor();
cursor.execute(f'''DROP DATABASE IF EXISTS {name}''')
cursor.execute(f'''CREATE DATABASE {name}''');
conn.close()
# Reconnect to server selecting newly created database to add tables
conninfo['database'] = name
conn = connect(conninfo)
sqldir = os.path.dirname(os.path.realpath(__file__))
execute_sql(conn, os.path.join(sqldir, 'db/provenance.sql'))
################################################################################
################################################################################
################################################################################
-def content_add_to_dir(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
- directory: DirectoryEntry,
- blob: FileEntry,
- prefix: PosixPath
-):
- logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})')
- # cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s)',
- # (blob.id, directory.id, bytes(normalize(prefix / blob.name))))
- cache['content_in_dir'].append(
- (blob.id, directory.id, bytes(normalize(prefix / blob.name)))
- )
+class ProvenanceInterface:
+ # TODO: turn this into a real interface and move PostgreSQL implementation
+ # to a separate file
+ def __init__(self, conn: psycopg2.extensions.connection):
+ # TODO: consider adding a mutex for thread safety
+ self.conn = conn
+ self.cursor = self.conn.cursor()
+ self.insert_cache = None
+ self.select_cache = None
+ self.clear_caches()
+
+
+ def clear_caches(self):
+ self.insert_cache = {
+ "content": dict(),
+ "content_early_in_rev": list(),
+ "content_in_dir": list(),
+ "directory": dict(),
+ "directory_in_rev": list(),
+ "revision": dict()
+ }
+ self.select_cache = {
+ "content": dict(),
+ "directory": dict(),
+ "revision": dict()
+ }
+
+
+ def commit(self):
+ result = False
+ try:
+ self.insert_all()
+ self.conn.commit()
+ result = True
+
+ except psycopg2.DatabaseError:
+ # Database error occurred, rollback all changes
+ self.conn.rollback()
+ # TODO: maybe serialize and auto-merge transactions.
+ # The only conflicts are on:
+ # - content: we keep the earliest date
+ # - directory: we keep the earliest date
+ # - content_in_dir: there should be just duplicated entries.
+
+ except Exception as error:
+ # Unexpected error occurred, rollback all changes and log message
+ logging.warning(f'Unexpected error: {error}')
+ self.conn.rollback()
+
+ finally:
+ self.clear_caches()
+
+ return result
+
+
+ def content_add_to_directory(
+ self,
+ directory: DirectoryEntry,
+ blob: FileEntry,
+ prefix: PosixPath
+ ):
+ # logging.debug(f'NEW occurrence of content {hash_to_hex(blob.id)} in directory {hash_to_hex(directory.id)} (path: {prefix / blob.name})')
+ # self.cursor.execute('''INSERT INTO content_in_dir VALUES (%s,%s,%s)''',
+ # (blob.id, directory.id, bytes(normalize(prefix / blob.name))))
+ self.insert_cache['content_in_dir'].append(
+ (blob.id, directory.id, bytes(normalize(prefix / blob.name)))
+ )
+
+
+ def content_add_to_revision(
+ self,
+ revision: RevisionEntry,
+ blob: FileEntry,
+ prefix: PosixPath
+ ):
+ # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})')
+ # self.cursor.execute('''INSERT INTO content_early_in_rev VALUES (%s,%s,%s)''',
+ # (blob.id, revision.id, bytes(normalize(prefix / blob.name))))
+ self.insert_cache['content_early_in_rev'].append(
+ (blob.id, revision.id, bytes(normalize(prefix / blob.name)))
+ )
+
+
+ def content_find_first(self, blobid: str):
+ logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}')
+ self.cursor.execute('''SELECT blob, rev, date, path
+ FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev
+ WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,))
+ return self.cursor.fetchone()
+
+
+ def content_find_all(self, blobid: str):
+ logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}')
+ self.cursor.execute('''(SELECT blob, rev, date, path
+ FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev
+ WHERE content_early_in_rev.blob=%s)
+ UNION
+ (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path
+ FROM (SELECT content_in_dir.blob, directory_in_rev.rev,
+ CASE directory_in_rev.path
+ WHEN '.' THEN content_in_dir.path
+ ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path
+ END AS path
+ FROM content_in_dir
+ JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir
+ WHERE content_in_dir.blob=%s) AS content_in_rev
+ JOIN revision ON revision.id=content_in_rev.rev)
+ ORDER BY date, rev, path''', (blobid, blobid))
+ # POSTGRESQL EXPLAIN
+ yield from self.cursor.fetchall()
+
+
+ # def content_get_early_date(self, blob: FileEntry) -> datetime:
+ # logging.debug(f'Getting content {hash_to_hex(blob.id)} early date')
+ # # First check if the date is being modified by current transaction.
+ # date = self.insert_cache['content'].get(blob.id, None)
+ # if date is None:
+ # # If not, check whether it's been queried before
+ # date = self.select_cache['content'].get(blob.id, None)
+ # if date is None:
+ # # Otherwise, query the database and cache the value
+ # self.cursor.execute('''SELECT date FROM content WHERE id=%s''',
+ # (blob.id,))
+ # row = self.cursor.fetchone()
+ # date = row[0] if row is not None else None
+ # self.select_cache['content'][blob.id] = date
+ # return date
+
+
+ def content_get_early_dates(self, blobs: List[FileEntry]) -> Dict[bytes, datetime]:
+ dates = {}
+ pending = []
+ for blob in blobs:
+ # First check if the date is being modified by current transaction.
+ date = self.insert_cache['content'].get(blob.id, None)
+ if date is not None:
+ dates[blob.id] = date
+ else:
+ # If not, check whether it's been queried before
+ date = self.select_cache['content'].get(blob.id, None)
+ if date is not None:
+ dates[blob.id] = date
+ else:
+ pending.append(blob.id)
+ if pending:
+ # Otherwise, query the database and cache the values
+ values = ', '.join(itertools.repeat('%s', len(pending)))
+ self.cursor.execute(f'''SELECT id, date FROM content WHERE id IN ({values})''',
+ tuple(pending))
+ for row in self.cursor.fetchall():
+ dates[row[0]] = row[1]
+ self.select_cache['content'][row[0]] = row[1]
+ return dates
+
+
+ def content_set_early_date(self, blob: FileEntry, date: datetime):
+ # logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})')
+ # self.cursor.execute('''INSERT INTO content VALUES (%s,%s)
+ # ON CONFLICT (id) DO UPDATE SET date=%s''',
+ # (blob.id, date, date))
+ self.insert_cache['content'][blob.id] = date
+
+
+ def directory_add_to_revision(
+ self,
+ revision: RevisionEntry,
+ directory: DirectoryEntry,
+ path: PosixPath
+ ):
+ # logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})')
+ # self.cursor.execute('''INSERT INTO directory_in_rev VALUES (%s,%s,%s)''',
+ # (directory.id, revision.id, bytes(normalize(path))))
+ self.insert_cache['directory_in_rev'].append(
+ (directory.id, revision.id, bytes(normalize(path)))
+ )
+
+
+ def directory_date_in_isochrone_frontier(self, directory: DirectoryEntry) -> datetime:
+ # logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date')
+ # First check if the date is being modified by current transaction.
+ date = self.insert_cache['directory'].get(directory.id, None)
+ if date is None:
+ # If not, check whether it's been queried before
+ date = self.select_cache['directory'].get(directory.id, None)
+ if date is None:
+ # Otherwise, query the database and cache the value
+ self.cursor.execute('''SELECT date FROM directory WHERE id=%s''',
+ (directory.id,))
+ row = self.cursor.fetchone()
+ date = row[0] if row is not None else None
+ self.select_cache['directory'][directory.id] = date
+ return date
+
+
+ def directory_get_early_dates(self, dirs: List[DirectoryEntry]) -> Dict[bytes, datetime]:
+ dates = {}
+ pending = []
+ for dir in dirs:
+ # First check if the date is being modified by current transaction.
+ date = self.insert_cache['directory'].get(dir.id, None)
+ if date is not None:
+ dates[dir.id] = date
+ else:
+ # If not, check whether it's been queried before
+ date = self.select_cache['directory'].get(dir.id, None)
+ if date is not None:
+ dates[dir.id] = date
+ else:
+ pending.append(dir.id)
+ if pending:
+ # Otherwise, query the database and cache the values
+ values = ', '.join(itertools.repeat('%s', len(pending)))
+ self.cursor.execute(f'''SELECT id, date FROM directory WHERE id IN ({values})''',
+ tuple(pending))
+ for row in self.cursor.fetchall():
+ dates[row[0]] = row[1]
+ self.select_cache['directory'][row[0]] = row[1]
+ return dates
+
+
+ def directory_add_to_isochrone_frontier(self, directory: DirectoryEntry, date: datetime):
+ # logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})')
+ # self.cursor.execute('''INSERT INTO directory VALUES (%s,%s)
+ # ON CONFLICT (id) DO UPDATE SET date=%s''',
+ # (directory.id, date, date))
+ self.insert_cache['directory'][directory.id] = date
+
+
+ def insert_all(self):
+ # Perform insertions with cached information
+ psycopg2.extras.execute_values(
+ self.cursor,
+ '''INSERT INTO content(id, date) VALUES %s
+ ON CONFLICT (id) DO UPDATE SET date=excluded.date''', # TODO: keep earliest date on conflict
+ self.insert_cache['content'].items()
+ )
+
+ psycopg2.extras.execute_values(
+ self.cursor,
+ '''INSERT INTO content_early_in_rev VALUES %s
+ ON CONFLICT DO NOTHING''',
+ self.insert_cache['content_early_in_rev']
+ )
+
+ psycopg2.extras.execute_values(
+ self.cursor,
+ '''INSERT INTO content_in_dir VALUES %s
+ ON CONFLICT DO NOTHING''',
+ self.insert_cache['content_in_dir']
+ )
+
+ psycopg2.extras.execute_values(
+ self.cursor,
+ '''INSERT INTO directory(id, date) VALUES %s
+ ON CONFLICT (id) DO UPDATE SET date=excluded.date''', # TODO: keep earliest date on conflict
+ self.insert_cache['directory'].items()
+ )
+
+ psycopg2.extras.execute_values(
+ self.cursor,
+ '''INSERT INTO directory_in_rev VALUES %s
+ ON CONFLICT DO NOTHING''',
+ self.insert_cache['directory_in_rev']
+ )
+
+ psycopg2.extras.execute_values(
+ self.cursor,
+ '''INSERT INTO revision(id, date) VALUES %s
+ ON CONFLICT (id) DO UPDATE SET date=excluded.date''', # TODO: keep earliest date on conflict
+ self.insert_cache['revision'].items()
+ )
+
+
+ def origin_get_id(self, origin: OriginEntry) -> int:
+ if origin.id is None:
+ # Check if current origin is already known and retrieve its internal id.
+ self.cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,))
+ row = self.cursor.fetchone()
+
+ if row is None:
+ # If the origin is seen for the first time, current revision is
+ # the prefered one.
+ self.cursor.execute('''INSERT INTO origin (url) VALUES (%s) RETURNING id''',
+ (origin.url,))
+ return self.cursor.fetchone()[0]
+ else:
+ return row[0]
+ else:
+ return origin.id
-def content_add_to_rev(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
- revision: RevisionEntry,
- blob: FileEntry,
- prefix: PosixPath
-):
- logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} in revision {hash_to_hex(revision.id)} (path: {prefix / blob.name})')
- # cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)',
- # (blob.id, revision.id, bytes(normalize(prefix / blob.name))))
- cache['content_early_in_rev'].append(
- (blob.id, revision.id, bytes(normalize(prefix / blob.name)))
- )
+ def revision_add(self, revision: RevisionEntry):
+ # Add current revision to the compact DB
+ self.insert_cache['revision'][revision.id] = revision.date
-def content_find_first(
- cursor: psycopg2.extensions.cursor,
- blobid: str
-):
- logging.info(f'Retrieving first occurrence of content {hash_to_hex(blobid)}')
- cursor.execute('''SELECT blob, rev, date, path
- FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev
- WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1''', (blobid,))
- return cursor.fetchone()
+ def revision_add_before_revision(self, relative: RevisionEntry, revision: RevisionEntry):
+ self.cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''',
+ (revision.id, relative.id))
-def content_find_all(
- cursor: psycopg2.extensions.cursor,
- blobid: str
-):
- logging.info(f'Retrieving all occurrences of content {hash_to_hex(blobid)}')
- cursor.execute('''(SELECT blob, rev, date, path
- FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev
- WHERE content_early_in_rev.blob=%s)
- UNION
- (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path
- FROM (SELECT content_in_dir.blob, directory_in_rev.rev,
- CASE directory_in_rev.path
- WHEN '.' THEN content_in_dir.path
- ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path
- END AS path
- FROM content_in_dir
- JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir
- WHERE content_in_dir.blob=%s) AS content_in_rev
- JOIN revision ON revision.id=content_in_rev.rev)
- ORDER BY date, rev, path''', (blobid, blobid))
- # POSTGRESQL EXPLAIN
- yield from cursor.fetchall()
-
-
-def content_get_early_date(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
- blob: FileEntry
-) -> datetime:
- logging.debug(f'Getting content {hash_to_hex(blob.id)} early date')
- if blob.id in cache['content'].keys():
- return cache['content'][blob.id]
- else:
- cursor.execute('SELECT date FROM content WHERE id=%s',
- (blob.id,))
- row = cursor.fetchone()
- return row[0] if row is not None else None
-
-
-def content_set_early_date(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
- blob: FileEntry,
- date: datetime
-):
- logging.debug(f'EARLY occurrence of blob {hash_to_hex(blob.id)} (timestamp: {date})')
- # cursor.execute('''INSERT INTO content VALUES (%s,%s)
- # ON CONFLICT (id) DO UPDATE SET date=%s''',
- # (blob.id, date, date))
- cache['content'][blob.id] = date
+ def revision_add_to_origin(self, origin: OriginEntry, revision: RevisionEntry):
+ self.cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s)
+ ON CONFLICT DO NOTHING''',
+ (revision.id, origin.id))
-def directory_add_to_rev(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
- revision: RevisionEntry,
- directory: DirectoryEntry,
- path: PosixPath
-):
- logging.debug(f'NEW occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER of revision {hash_to_hex(revision.id)} (path: {path})')
- # cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)',
- # (directory.id, revision.id, bytes(normalize(path))))
- cache['directory_in_rev'].append(
- (directory.id, revision.id, bytes(normalize(path)))
- )
+ def revision_get_early_date(self, revision: RevisionEntry) -> datetime:
+ # logging.debug(f'Getting revision {hash_to_hex(revision.id)} early date')
+ # First check if the date is being modified by current transaction.
+ date = self.insert_cache['revision'].get(revision.id, None)
+ if date is None:
+ # If not, check whether it's been queried before
+ date = self.select_cache['revision'].get(revision.id, None)
+ if date is None:
+ # Otherwise, query the database and cache the value
+ self.cursor.execute('''SELECT date FROM revision WHERE id=%s''',
+ (revision.id,))
+ row = self.cursor.fetchone()
+ date = row[0] if row is not None else None
+ self.select_cache['revision'][revision.id] = date
+ return date
-def directory_get_early_date(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
- directory: DirectoryEntry
-) -> datetime:
- logging.debug(f'Getting directory {hash_to_hex(directory.id)} early date')
- if directory.id in cache['directory'].keys():
- return cache['directory'][directory.id]
- else:
- cursor.execute('SELECT date FROM directory WHERE id=%s',
- (directory.id,))
- row = cursor.fetchone()
- return row[0] if row is not None else None
+ def revision_get_prefered_origin(self, revision: RevisionEntry) -> int:
+ self.cursor.execute('''SELECT COALESCE(org,0) FROM revision WHERE id=%s''',
+ (revision.id,))
+ row = self.cursor.fetchone()
+ # None means revision is not in database
+ # 0 means revision has no prefered origin
+ return row[0] if row is not None and row[0] != 0 else None
+
+
+ def revision_in_history(self, revision: RevisionEntry) -> bool:
+ self.cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''',
+ (revision.id,))
+ return self.cursor.fetchone() is not None
+
+
+ def revision_set_prefered_origin(self, origin: OriginEntry, revision: RevisionEntry):
+ self.cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''',
+ (origin.id, revision.id))
+
+
+ def revision_visited(self, revision: RevisionEntry) -> bool:
+ self.cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''',
+ (revision.id,))
+ return self.cursor.fetchone() is not None
+################################################################################
+################################################################################
+################################################################################
+
def directory_process_content(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
+ provenance: ProvenanceInterface,
directory: DirectoryEntry,
relative: DirectoryEntry,
prefix: PosixPath
):
stack = [(directory, prefix)]
while stack:
dir, path = stack.pop()
for child in iter(dir):
if isinstance(child, FileEntry):
# Add content to the relative directory with the computed path.
- content_add_to_dir(cursor, cache, relative, child, path)
+ provenance.content_add_to_directory(relative, child, path)
else:
# Recursively walk the child directory.
- # directory_process_content(cursor, child, relative, path / child.name)
stack.append((child, path / child.name))
-def directory_set_early_date(
- cursor: psycopg2.extensions.cursor,
- cache: dict,
- directory: DirectoryEntry,
- date: datetime
-):
- logging.debug(f'EARLY occurrence of directory {hash_to_hex(directory.id)} on the ISOCHRONE FRONTIER (timestamp: {date})')
- # cursor.execute('''INSERT INTO directory VALUES (%s,%s)
- # ON CONFLICT (id) DO UPDATE SET date=%s''',
- # (directory.id, date, date))
- cache['directory'][directory.id] = date
-
-
def origin_add(
- conn: psycopg2.extensions.connection,
- storage: StorageInterface,
+ provenance: ProvenanceInterface,
origin: OriginEntry
):
- with conn.cursor() as cursor:
- origin.id = origin_get_id(cursor, origin)
+ origin.id = provenance.origin_get_id(origin)
- for revision in origin.revisions:
- logging.info(f'Processing revision {hash_to_hex(revision.id)} from origin {origin.url}')
- origin_add_revision(cursor, storage, origin, revision)
+ for revision in origin.revisions:
+ # logging.info(f'Processing revision {hash_to_hex(revision.id)} from origin {origin.url}')
+ origin_add_revision(provenance, origin, revision)
- # Commit after each revision
- conn.commit()
+ # Commit after each revision
+ provenance.commit() # TODO: verify this!
def origin_add_revision(
- cursor: psycopg2.extensions.cursor,
- storage: StorageInterface,
+ provenance: ProvenanceInterface,
origin: OriginEntry,
revision: RevisionEntry
):
stack = [(None, revision)]
while stack:
relative, rev = stack.pop()
# Check if current revision has no prefered origin and update if necessary.
- prefered = revision_get_prefered_org(cursor, rev)
- logging.debug(f'Prefered origin for revision {hash_to_hex(rev.id)}: {prefered}')
+ prefered = provenance.revision_get_prefered_origin(rev)
+ # logging.debug(f'Prefered origin for revision {hash_to_hex(rev.id)}: {prefered}')
if prefered is None:
- revision_set_prefered_org(cursor, origin, rev)
+ provenance.revision_set_prefered_origin(origin, rev)
########################################################################
if relative is None:
# This revision is pointed directly by the origin.
- visited = revision_visited(cursor, rev)
+ visited = provenance.revision_visited(rev)
logging.debug(f'Revision {hash_to_hex(rev.id)} in origin {origin.id}: {visited}')
logging.debug(f'Adding revision {hash_to_hex(rev.id)} to origin {origin.id}')
- revision_add_to_org(cursor, origin, rev)
+ provenance.revision_add_to_origin(origin, rev)
if not visited:
- # revision_walk_history(cursor, origin, rev.id, rev)
stack.append((rev, rev))
else:
# This revision is a parent of another one in the history of the
# relative revision.
for parent in iter(rev):
- visited = revision_visited(cursor, parent)
+ visited = provenance.revision_visited(parent)
logging.debug(f'Parent {hash_to_hex(parent.id)} in some origin: {visited}')
if not visited:
# The parent revision has never been seen before pointing
# directly to an origin.
- known = revision_in_history(cursor, parent)
+ known = provenance.revision_in_history(parent)
logging.debug(f'Revision {hash_to_hex(parent.id)} before revision: {known}')
if known:
# The parent revision is already known in some other
# revision's history. We should point it directly to
# the origin and (eventually) walk its history.
logging.debug(f'Adding revision {hash_to_hex(parent.id)} directly to origin {origin.id}')
- # origin_add_revision(cursor, origin, parent)
stack.append((None, parent))
else:
# The parent revision was never seen before. We should
# walk its history and associate it with the same
# relative revision.
logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to revision {hash_to_hex(relative.id)}')
- revision_add_before_rev(cursor, relative, parent)
- # revision_walk_history(cursor, origin, relative, parent)
+ provenance.revision_add_before_revision(relative, parent)
stack.append((relative, parent))
else:
# The parent revision already points to an origin, so its
# history was properly processed before. We just need to
# make sure it points to the current origin as well.
logging.debug(f'Adding parent revision {hash_to_hex(parent.id)} to origin {origin.id}')
- revision_add_to_org(cursor, origin, parent)
-
-
-def origin_get_id(
- cursor: psycopg2.extensions.cursor,
- origin: OriginEntry
-) -> int:
- if origin.id is None:
- # Check if current origin is already known and retrieve its internal id.
- cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,))
- row = cursor.fetchone()
-
- if row is None:
- # If the origin is seen for the first time, current revision is
- # the prefered one.
- cursor.execute('''INSERT INTO origin (url) VALUES (%s) RETURNING id''',
- (origin.url,))
- return cursor.fetchone()[0]
- else:
- return row[0]
- else:
- return origin.id
+ provenance.revision_add_to_origin(origin, parent)
def revision_add(
- conn: psycopg2.extensions.connection,
- storage: StorageInterface,
- revision: RevisionEntry
-):
- try:
- with conn.cursor() as cursor:
- # Processed content starting from the revision's root directory
- directory = Tree(storage, revision.root).root
- revision_process_dir(cursor, revision, directory)
-
- # Add current revision to the compact DB
- cursor.execute('INSERT INTO revision VALUES (%s,%s,NULL)',
- (revision.id, revision.date))
-
- # Commit changes (one transaction per revision)
- conn.commit()
- return True
-
- except psycopg2.DatabaseError:
- # Database error occurred, rollback all changes
- conn.rollback()
- # TODO: maybe serialize and auto-merge transations.
- # The only conflicts are on:
- # - content: we keep the earliest date
- # - directory: we keep the earliest date
- # - content_in_dir: there should be just duplicated entries.
- return False
-
- except Exception as error:
- # Unexpected error occurred, rollback all changes and log message
- logging.warning(f'Unexpected error: {error}')
- conn.rollback()
- return False
-
-
-def revision_add_before_rev(
- cursor: psycopg2.extensions.cursor,
- relative: RevisionEntry,
+ provenance: ProvenanceInterface,
+ archive: ArchiveInterface,
revision: RevisionEntry
):
- cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''',
- (revision.id, relative.id))
-
-
-def revision_add_to_org(
- cursor: psycopg2.extensions.cursor,
- origin: OriginEntry,
- revision: RevisionEntry
-):
- cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s)
- ON CONFLICT DO NOTHING''',
- (revision.id, origin.id))
-
-
-def revision_get_prefered_org(
- cursor: psycopg2.extensions.cursor,
- revision: RevisionEntry
-) -> int:
- cursor.execute('''SELECT COALESCE(org,0) FROM revision WHERE id=%s''',
- (revision.id,))
- row = cursor.fetchone()
- # None means revision is not in database
- # 0 means revision has no prefered origin
- return row[0] if row is not None and row[0] != 0 else None
-
-
-def revision_in_history(
- cursor: psycopg2.extensions.cursor,
- revision: RevisionEntry
-) -> bool:
- cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''',
- (revision.id,))
- return cursor.fetchone() is not None
+ # Process content starting from the revision's root directory
+ directory = Tree(archive, revision.root).root
+ date = provenance.revision_get_early_date(revision)
+ if date is None or revision.date < date:
+ provenance.revision_add(revision)
+ revision_process_content(provenance, revision, directory)
+ return provenance.commit()
-def revision_process_dir(
- cursor: psycopg2.extensions.cursor,
+def revision_process_content(
+ provenance: ProvenanceInterface,
revision: RevisionEntry,
directory: DirectoryEntry
):
- stack = [(directory, directory.name)]
- cache = {
- "content": dict(),
- "content_early_in_rev": list(),
- "content_in_dir": list(),
- "directory": dict(),
- "directory_in_rev": list()
- }
+ stack = [(directory, provenance.directory_date_in_isochrone_frontier(directory), directory.name)]
while stack:
- dir, path = stack.pop()
-
- date = directory_get_early_date(cursor, cache, dir)
+ dir, date, path = stack.pop()
if date is None:
# The directory has never been seen on the isochrone graph of a
# revision. Its children should be checked.
- children = []
- for child in iter(dir):
- if isinstance(child, FileEntry):
- children.append((child, content_get_early_date(cursor, cache, child)))
+ blobs = [child for child in iter(dir) if isinstance(child, FileEntry)]
+ dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)]
+
+ blobdates = provenance.content_get_early_dates(blobs)
+ dirdates = provenance.directory_get_early_dates(dirs)
+
+ if blobs + dirs:
+ dates = list(blobdates.values()) + list(dirdates.values())
+
+ if len(dates) == len(blobs) + len(dirs) and max(dates) <= revision.date:
+ # The directory belongs to the isochrone frontier of the
+ # current revision, and this is the first time it appears
+ # as such.
+ provenance.directory_add_to_isochrone_frontier(dir, max(dates))
+ provenance.directory_add_to_revision(revision, dir, path)
+ directory_process_content(
+ provenance,
+ directory=dir,
+ relative=dir,
+ prefix=PosixPath('.')
+ )
+
else:
- children.append((child, directory_get_early_date(cursor, cache, child)))
- dates = [child[1] for child in children]
-
- if dates != [] and None not in dates and max(dates) <= revision.date:
- # The directory belongs to the isochrone frontier of the current
- # revision, and this is the first time it appears as such.
- directory_set_early_date(cursor, cache, dir, max(dates))
- directory_add_to_rev(cursor, cache, revision, dir, path)
- directory_process_content(cursor, cache, directory=dir, relative=dir, prefix=PosixPath('.'))
- else:
- # The directory is not on the isochrone frontier of the current
- # revision. Its child nodes should be analyzed.
- # revision_process_content(cursor, revision, dir, path)
- ################################################################
- for child, date in children:
- if isinstance(child, FileEntry):
+ # The directory is not on the isochrone frontier of the
+ # current revision. Its child nodes should be analyzed.
+ ############################################################
+ for child in blobs:
+ date = blobdates.get(child.id, None)
if date is None or revision.date < date:
- content_set_early_date(cursor, cache, child, revision.date)
- content_add_to_rev(cursor, cache, revision, child, path)
- else:
- # revision_process_dir(cursor, revision, child, path / child.name)
- stack.append((child, path / child.name))
- ################################################################
+ provenance.content_set_early_date(child, revision.date)
+ provenance.content_add_to_revision(revision, child, path)
+
+ for child in dirs:
+ date = dirdates.get(child.id, None)
+ stack.append((child, date, path / child.name))
+ ############################################################
elif revision.date < date:
- # The directory has already been seen on the isochrone frontier of a
- # revision, but current revision is earlier. Its children should be
- # updated.
- # revision_process_content(cursor, revision, dir, path)
+ # The directory has already been seen on the isochrone frontier of
+ # a revision, but current revision is earlier. Its children should
+ # be updated.
+ blobs = [child for child in iter(dir) if isinstance(child, FileEntry)]
+ dirs = [child for child in iter(dir) if isinstance(child, DirectoryEntry)]
+
+ blobdates = provenance.content_get_early_dates(blobs)
+ dirdates = provenance.directory_get_early_dates(dirs)
+
####################################################################
- for child in iter(dir):
- if isinstance(child, FileEntry):
- date = content_get_early_date(cursor, cache, child)
- if date is None or revision.date < date:
- content_set_early_date(cursor, cache, child, revision.date)
- content_add_to_rev(cursor, cache, revision, child, path)
- else:
- # revision_process_dir(cursor, revision, child, path / child.name)
- stack.append((child, path / child.name))
+ for child in blobs:
+ date = blobdates.get(child.id, None)
+ if date is None or revision.date < date:
+ provenance.content_set_early_date(child, revision.date)
+ provenance.content_add_to_revision(revision, child, path)
+
+ for child in dirs:
+ date = dirdates.get(child.id, None)
+ stack.append((child, date, path / child.name))
####################################################################
- directory_set_early_date(cursor, cache, dir, revision.date)
+
+ provenance.directory_add_to_isochrone_frontier(dir, revision.date)
else:
- # The directory has already been seen on the isochrone frontier of an
- # earlier revision. Just add it to the current revision.
- directory_add_to_rev(cursor, cache, revision, dir, path)
-
- # Performe insertions with cached information
- psycopg2.extras.execute_values(
- cursor,
- '''INSERT INTO content(id, date) VALUES %s
- ON CONFLICT (id) DO UPDATE SET date=excluded.date''',
- cache['content'].items()
- )
-
- psycopg2.extras.execute_values(
- cursor,
- '''INSERT INTO content_early_in_rev VALUES %s''',
- cache['content_early_in_rev']
- )
-
- psycopg2.extras.execute_values(
- cursor,
- '''INSERT INTO content_in_dir VALUES %s''',
- cache['content_in_dir']
- )
-
- psycopg2.extras.execute_values(
- cursor,
- '''INSERT INTO directory(id, date) VALUES %s
- ON CONFLICT (id) DO UPDATE SET date=excluded.date''',
- cache['directory'].items()
- )
-
- psycopg2.extras.execute_values(
- cursor,
- '''INSERT INTO directory_in_rev VALUES %s''',
- cache['directory_in_rev']
- )
-
-
-def revision_set_prefered_org(
- cursor: psycopg2.extensions.cursor,
- origin: OriginEntry,
- revision: RevisionEntry
-):
- cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''',
- (origin.id, revision.id))
+ # The directory has already been seen on the isochrone frontier of
+ # an earlier revision. Just add it to the current revision.
+ provenance.directory_add_to_revision(revision, dir, path)
-def revision_visited(
- cursor: psycopg2.extensions.cursor,
- revision: RevisionEntry
-) -> bool:
- cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''',
- (revision.id,))
- return cursor.fetchone() is not None
+def get_provenance(conninfo: dict) -> ProvenanceInterface:
+ # TODO: improve this method to allow backend selection
+ conn = connect(conninfo)
+ return ProvenanceInterface(conn)
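
Taken together, a minimal end-to-end sketch of the new interface: build a ProvenanceInterface with get_provenance, feed it one revision through revision_add, then query back the first occurrence of a blob. Connection parameters and all ids below are placeholders.

    from datetime import datetime, timezone

    from swh.model.hashutil import hash_to_bytes, hash_to_hex
    from swh.provenance.archive import get_archive
    from swh.provenance.provenance import get_provenance, revision_add
    from swh.provenance.revision import RevisionEntry

    provenance = get_provenance({"host": "localhost", "database": "test3",
                                 "user": "postgres", "password": "postgres"})
    archive = get_archive(cls="api", storage={"cls": "remote",
                          "url": "http://uffizi.internal.softwareheritage.org:5002"})

    revision = RevisionEntry(
        archive,
        hash_to_bytes("02f95c0a1868cbef82ff73fc1b903183a579c7de"),      # placeholder id
        date=datetime(2017, 1, 1, 12, tzinfo=timezone.utc),
        root=hash_to_bytes("0b6959356d30f1a4e9b7f6bca59b9a336464c03d"),  # placeholder id
    )
    revision_add(provenance, archive, revision)  # fills the caches and commits one transaction

    row = provenance.content_find_first(hash_to_bytes("da061f1caf293a5da00bff6a45abcf4d7ae54c50"))
    if row is not None:
        print(hash_to_hex(row[0]), hash_to_hex(row[1]), row[2], row[3])
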
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index b33763a..0344a93 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,169 +1,168 @@
import logging
import threading
+from .archive import ArchiveInterface
from .db_utils import connect
from datetime import datetime
from swh.model.hashutil import hash_to_bytes, hash_to_hex
-from swh.storage.interface import StorageInterface
class RevisionEntry:
def __init__(
self,
- storage: StorageInterface,
+ archive: ArchiveInterface,
id: bytes,
date: datetime=None,
root: bytes=None,
parents: list=None
):
+ self.archive = archive
self.id = id
self.date = date
self.parents = parents
self.root = root
- self.storage = storage
def __iter__(self):
if self.parents is None:
self.parents = []
- for parent in self.storage.revision_get([self.id]):
+ for parent in self.archive.revision_get([self.id]):
if parent is not None:
self.parents.append(
RevisionEntry(
- self.storage,
+ self.archive,
parent.id,
- parents=[RevisionEntry(self.storage, id) for id in parent.parents]
+ parents=[RevisionEntry(self.archive, id) for id in parent.parents]
)
)
return iter(self.parents)
################################################################################
################################################################################
class RevisionIterator:
"""Iterator interface."""
def __iter__(self):
pass
def __next__(self):
pass
class FileRevisionIterator(RevisionIterator):
"""Iterator over revisions present in the given CSV file."""
- def __init__(self, filename: str, storage: StorageInterface, limit: int=None):
+ def __init__(self, filename: str, archive: ArchiveInterface, limit: int=None):
self.file = open(filename)
self.idx = 0
self.limit = limit
self.mutex = threading.Lock()
- self.storage = storage
+ self.archive = archive
def next(self):
self.mutex.acquire()
line = self.file.readline().strip()
if line and (self.limit is None or self.idx < self.limit):
self.idx = self.idx + 1
id, date, root = line.strip().split(',')
self.mutex.release()
return RevisionEntry(
- self.storage,
+ self.archive,
hash_to_bytes(id),
- date=datetime.strptime(date, '%Y-%m-%d %H:%M:%S%z'),
+ date=datetime.fromisoformat(date),
root=hash_to_bytes(root)
)
else:
self.mutex.release()
return None
# class ArchiveRevisionIterator(RevisionIterator):
# """Iterator over revisions present in the given database."""
#
# def __init__(self, conn, limit=None, chunksize=100):
# self.cur = conn.cursor()
# self.chunksize = chunksize
# self.records = []
# if limit is None:
# self.cur.execute('''SELECT id, date, committer_date, directory
# FROM revision''')
# else:
# self.cur.execute('''SELECT id, date, committer_date, directory
# FROM revision
# LIMIT %s''', (limit,))
# for row in self.cur.fetchmany(self.chunksize):
# record = self.make_record(row)
# if record is not None:
# self.records.append(record)
# self.mutex = threading.Lock()
#
# def __del__(self):
# self.cur.close()
#
# def next(self):
# self.mutex.acquire()
# if not self.records:
# self.records.clear()
# for row in self.cur.fetchmany(self.chunksize):
# record = self.make_record(row)
# if record is not None:
# self.records.append(record)
#
# if self.records:
# revision, *self.records = self.records
# self.mutex.release()
# return revision
# else:
# self.mutex.release()
# return None
#
# def make_record(self, row):
# # Only revision with author or commiter date are considered
# if row[1] is not None:
# # If the revision has author date, it takes precedence
# return RevisionEntry(row[0], row[1], row[3])
# elif row[2] is not None:
# # If not, we use the commiter date
# return RevisionEntry(row[0], row[2], row[3])
################################################################################
################################################################################
class RevisionWorker(threading.Thread):
def __init__(
self,
id: int,
conninfo: dict,
- storage: StorageInterface,
+ archive: ArchiveInterface,
revisions: RevisionIterator
):
+ from .provenance import get_provenance
+
super().__init__()
+ self.archive = archive
self.id = id
- self.conninfo = conninfo
+ self.provenance = get_provenance(conninfo)
self.revisions = revisions
- self.storage = storage
def run(self):
from .provenance import revision_add
- conn = connect(self.conninfo)
while True:
revision = self.revisions.next()
if revision is None: break
processed = False
while not processed:
logging.info(f'Thread {self.id} - Processing revision {hash_to_hex(revision.id)} (timestamp: {revision.date})')
- processed = revision_add(conn, self.storage, revision)
+ processed = revision_add(self.provenance, self.archive, revision)
if not processed:
logging.warning(f'Thread {self.id} - Failed to process revision {hash_to_hex(revision.id)} (timestamp: {revision.date})')
-
- conn.close()
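
A minimal sketch of driving RevisionWorker directly, equivalent to what the iter-revisions command assembles (connection info, CSV path, and worker count are placeholders):

    from swh.provenance.archive import get_archive
    from swh.provenance.revision import FileRevisionIterator, RevisionWorker

    archive = get_archive(cls="api", storage={"cls": "remote",
                          "url": "http://uffizi.internal.softwareheritage.org:5002"})
    conninfo = {"host": "localhost", "database": "test3",
                "user": "postgres", "password": "postgres"}

    revisions = FileRevisionIterator("ordered.csv", archive, limit=1000)
    workers = [RevisionWorker(i, conninfo, archive, revisions) for i in range(4)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
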
