diff --git a/compact.py b/compact.py
index c951e10..e0b71a2 100644
--- a/compact.py
+++ b/compact.py
@@ -1,383 +1,374 @@
 import click
-import io
 import logging
 import psycopg2
 import sys
 import time
 import threading
 import utils

 from datetime import datetime
 from pathlib import PosixPath

 from iterator import (
     RevisionEntry,
     RevisionIterator,
     ArchiveRevisionIterator,
     FileRevisionIterator
 )

 from model import (
     DirectoryEntry,
     FileEntry,
     TreeEntry,
     Tree
 )

 from swh.model.identifiers import identifier_to_str


-def create_tables(conn: psycopg2.extensions.cursor, filename: PosixPath='compact.sql'):
-    with io.open(filename) as file:
-        cur = conn.cursor()
-        cur.execute(file.read())
-        cur.close()
-        conn.commit()
-
-
 def revision_add(
     cursor: psycopg2.extensions.cursor,
     archive: psycopg2.extensions.connection,
     revision: RevisionEntry,
     id : int
 ):
     logging.info(f'Thread {id} - Processing revision {identifier_to_str(revision.swhid)} (timestamp: {revision.timestamp})')
     # Processed content starting from the revision's root directory
     directory = Tree(archive, revision.directory).root
     revision_process_directory(cursor, revision, directory, directory.name)
     # Add current revision to the compact DB
     cursor.execute('INSERT INTO revision VALUES (%s,%s)', (revision.swhid, revision.timestamp))


 def content_find_first(
     cursor: psycopg2.extensions.cursor,
     swhid: str
 ):
     logging.info(f'Retrieving first occurrence of content {identifier_to_str(swhid)}')
     cursor.execute('''SELECT blob, rev, date, path
                       FROM content_early_in_rev
                       JOIN revision ON revision.id=content_early_in_rev.rev
                       WHERE content_early_in_rev.blob=%s
                       ORDER BY date, rev, path ASC LIMIT 1''', (swhid,))
     return cursor.fetchone()


 def content_find_all(
     cursor: psycopg2.extensions.cursor,
     swhid: str
 ):
     logging.info(f'Retrieving all occurrences of content {identifier_to_str(swhid)}')
     cursor.execute('''(SELECT blob, rev, date, path
                        FROM content_early_in_rev
                        JOIN revision ON revision.id=content_early_in_rev.rev
                        WHERE content_early_in_rev.blob=%s)
                       UNION
                       (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path
                        FROM (SELECT content_in_dir.blob, directory_in_rev.rev,
                                     CASE directory_in_rev.path
                                       WHEN '.' THEN content_in_dir.path
                                       ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path
                                     END AS path
                              FROM content_in_dir
                              JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir
                              WHERE content_in_dir.blob=%s) AS content_in_rev
                        JOIN revision ON revision.id=content_in_rev.rev)
                       ORDER BY date, rev, path''', (swhid, swhid))
     yield from cursor.fetchall()


 ################################################################################
 ################################################################################
 ################################################################################


 def normalize(path: PosixPath) -> PosixPath:
     spath = str(path)
     if spath.startswith('./'):
         return PosixPath(spath[2:])
     return path


 def content_get_early_timestamp(
     cursor: psycopg2.extensions.cursor,
     blob: FileEntry,
     depth: int
 ):
     logging.debug(f'{" "*depth}Getting content {identifier_to_str(blob.swhid)} early timestamp')
     start = time.perf_counter_ns()
     cursor.execute('SELECT date FROM content WHERE id=%s', (blob.swhid,))
     row = cursor.fetchone()
     stop = time.perf_counter_ns()
     logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
     return row[0] if row is not None else None


 def content_set_early_timestamp(
     cursor: psycopg2.extensions.cursor,
     blob: FileEntry,
     timestamp: datetime,
     depth: int
 ):
     logging.debug(f'{" "*depth}EARLY occurrence of blob {identifier_to_str(blob.swhid)} (timestamp: {timestamp})')
     start = time.perf_counter_ns()
     cursor.execute('''INSERT INTO content VALUES (%s,%s)
                       ON CONFLICT (id) DO UPDATE SET date=%s''',
                    (blob.swhid, timestamp, timestamp))
     stop = time.perf_counter_ns()
     logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')


 def content_add_to_directory(
     cursor: psycopg2.extensions.cursor,
     directory: DirectoryEntry,
     blob: FileEntry,
     prefix: PosixPath,
     depth: int
 ):
     logging.debug(f'{" "*depth}NEW occurrence of content {identifier_to_str(blob.swhid)} in directory {identifier_to_str(directory.swhid)} (path: {prefix / blob.name})')
     start = time.perf_counter_ns()
     cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s)',
                    (blob.swhid, directory.swhid, bytes(normalize(prefix / blob.name))))
     stop = time.perf_counter_ns()
     logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')


 def content_add_to_revision(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     blob: FileEntry,
     prefix: PosixPath,
     depth: int
 ):
     logging.debug(f'{" "*depth}EARLY occurrence of blob {identifier_to_str(blob.swhid)} in revision {identifier_to_str(revision.swhid)} (path: {prefix / blob.name})')
     start = time.perf_counter_ns()
     cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)',
                    (blob.swhid, revision.swhid, bytes(normalize(prefix / blob.name))))
     stop = time.perf_counter_ns()
     logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')


 def directory_get_early_timestamp(
     cursor: psycopg2.extensions.cursor,
     directory: DirectoryEntry,
     depth: int
 ):
     logging.debug(f'{" "*depth}Getting directory {identifier_to_str(directory.swhid)} early timestamp')
     start = time.perf_counter_ns()
     cursor.execute('SELECT date FROM directory WHERE id=%s', (directory.swhid,))
     row = cursor.fetchone()
     stop = time.perf_counter_ns()
     logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
     return row[0] if row is not None else None


 def directory_set_early_timestamp(
     cursor: psycopg2.extensions.cursor,
     directory: DirectoryEntry,
     timestamp: datetime,
     depth: int
 ):
     logging.debug(f'{" "*depth}EARLY occurrence of directory {identifier_to_str(directory.swhid)} on the ISOCHRONE FRONTIER (timestamp: {timestamp})')
     start = time.perf_counter_ns()
     cursor.execute('''INSERT INTO directory VALUES (%s,%s)
                       ON CONFLICT (id) DO UPDATE SET date=%s''',
                    (directory.swhid, timestamp, timestamp))
     stop = time.perf_counter_ns()
     logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')


 def directory_add_to_revision(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     directory: DirectoryEntry,
     path: PosixPath,
     depth: int
 ):
     logging.debug(f'{" "*depth}NEW occurrence of directory {identifier_to_str(directory.swhid)} on the ISOCHRONE FRONTIER of revision {identifier_to_str(revision.swhid)} (path: {path})')
     start = time.perf_counter_ns()
     cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)',
                    (directory.swhid, revision.swhid, bytes(normalize(path))))
     stop = time.perf_counter_ns()
     logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')


 def directory_process_content(
     cursor: psycopg2.extensions.cursor,
     directory: DirectoryEntry,
     relative: DirectoryEntry,
     prefix: PosixPath,
     depth: int
 ):
     for child in iter(directory):
         if isinstance(child, FileEntry):
             # Add content to the relative directory with the computed prefix.
             content_add_to_directory(cursor, relative, child, prefix, depth)
         else:
             # Recursively walk the child directory.
             directory_process_content(cursor, child, relative, prefix / child.name, depth)


 def revision_process_content(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     directory: DirectoryEntry,
     path: PosixPath,
     depth: int
 ):
     for child in iter(directory):
         if isinstance(child, FileEntry):
             timestamp = content_get_early_timestamp(cursor, child, depth)
             if timestamp is None or revision.timestamp < timestamp:
                 content_set_early_timestamp(cursor, child, revision.timestamp, depth)
             content_add_to_revision(cursor, revision, child, path, depth)
         else:
             revision_process_directory(cursor, revision, child, path / child.name, depth + 1)


 def revision_process_directory(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     directory: DirectoryEntry,
     path: PosixPath,
     depth: int=0
 ):
     timestamp = directory_get_early_timestamp(cursor, directory, depth)

     if timestamp is None:
         # The directory has never been seen on the isochrone graph of a
         # revision. Its children should be checked.
         timestamps = []
         for child in iter(directory):
             if isinstance(child, FileEntry):
                 timestamps.append(content_get_early_timestamp(cursor, child, depth))
             else:
                 timestamps.append(directory_get_early_timestamp(cursor, child, depth))

         if timestamps != [] and None not in timestamps and max(timestamps) <= revision.timestamp:
             # The directory belongs to the isochrone frontier of the current
             # revision, and this is the first time it appears as such.
             directory_set_early_timestamp(cursor, directory, max(timestamps), depth)
             directory_add_to_revision(cursor, revision, directory, path, depth)
             directory_process_content(cursor, directory, directory, PosixPath('.'), depth)
         else:
             # The directory is not on the isochrone frontier of the current
             # revision. Its child nodes should be analyzed.
             revision_process_content(cursor, revision, directory, path, depth)

     elif revision.timestamp < timestamp:
         # The directory has already been seen on the isochrone frontier of a
         # revision, but current revision is earlier. Its children should be
         # updated.
         revision_process_content(cursor, revision, directory, path, depth)
         directory_set_early_timestamp(cursor, directory, revision.timestamp, depth)

     else:
         # The directory has already been seen on the isochrone frontier of an
         # earlier revision. Just add it to the current revision.
         directory_add_to_revision(cursor, revision, directory, path, depth)


 ################################################################################
 ################################################################################
 ################################################################################


 class Worker(threading.Thread):
     def __init__(
         self,
         id : int,
         conf : str,
         database : str,
         archive : psycopg2.extensions.connection,
         revisions : RevisionIterator
     ):
         super().__init__()
         self.id = id
         self.conf = conf
         self.database = database
         self.archive = archive
         self.revisions = revisions

     def run(self):
         conn = utils.connect(self.conf, self.database)
         with conn.cursor() as cursor:
             while True:
                 processed = False
                 revision = self.revisions.next()
                 if revision is None: break

                 while not processed:
                     try:
                         revision_add(cursor, self.archive, revision, self.id)
                         conn.commit()
                         processed = True
                     except:
                         logging.warning(f'Thread {self.id} - Failed to process revision {identifier_to_str(revision.swhid)} (timestamp: {revision.timestamp})')
                         conn.rollback()
         conn.close()


 @click.command()
 @click.argument('count', type=int)
 @click.option('-c', '--compact', nargs=2, required=True)
 @click.option('-a', '--archive', nargs=2)
 @click.option('-d', '--database', nargs=2)
 @click.option('-f', '--filename')
 @click.option('-l', '--limit', type=int)
 @click.option('-t', '--threads', type=int, default=1)
 def cli(count, compact, archive, database, filename, limit, threads):
     """Compact model utility."""
     logging.basicConfig(level=logging.INFO)
     # logging.basicConfig(filename='compact.log', level=logging.DEBUG)
     click.echo(f'{count} {compact} {archive} {database} {filename} {limit}')

     if not database: database = None
     if not archive: archive = None

     reset = database is not None or filename is not None
     if reset and archive is None:
         logging.error('Error: -a option is compulsatory when -d or -f options are set')
         exit()

     comp_conn = utils.connect(compact[0], compact[1])
     cursor = comp_conn.cursor()

     if reset:
-        create_tables(comp_conn)
+        utils.execute_sql(comp_conn, 'compact.sql')    # Create tables dropping existing ones

         if database is not None:
             logging.info(f'Reconstructing compact model from {database} database (limit={limit})')
             data_conn = utils.connect(database[0], database[1])
             revisions = ArchiveRevisionIterator(data_conn, limit=limit)
         else:
             logging.info(f'Reconstructing compact model from {filename} CSV file (limit={limit})')
             revisions = FileRevisionIterator(filename, limit=limit)

         arch_conn = utils.connect(archive[0], archive[1])
         workers = []
         for id in range(threads):
             worker = Worker(id, compact[0], compact[1], arch_conn, revisions)
             worker.start()
             workers.append(worker)

         for worker in workers:
             worker.join()

         arch_conn.close()

         if database is not None:
             data_conn.close()

     cursor.execute(f'SELECT DISTINCT id FROM content ORDER BY id LIMIT {count}')
     for idx, row in enumerate(cursor.fetchall()):
         swhid = row[0]
         print(f'Test blob {idx}: {identifier_to_str(swhid)}')

         fst = content_find_first(cursor, swhid)
         print(f'  First occurrence:')
         print(f'    {identifier_to_str(fst[0])}, {identifier_to_str(fst[1])}, {fst[2]}, {fst[3].decode("utf-8")}')

         print(f'  All occurrences:')
         for row in content_find_all(cursor, swhid):
             print(f'    {identifier_to_str(row[0])}, {identifier_to_str(row[1])}, {row[2]}, {row[3].decode("utf-8")}')

         print(f'========================================')

     comp_conn.close()
diff --git a/model.py b/model.py
index 115ff33..98ee429 100644
--- a/model.py
+++ b/model.py
@@ -1,61 +1,61 @@
 import operator
 import os
 import psycopg2

 from pathlib import PosixPath

-from swh.storage.db import Db
+from swh.storage.postgresql.db import Db


 CONTENT = "file"
 DIRECTORY = "dir"

 OTYPE_IDX = 1
 PATH_IDX = 3
 SWHID_IDX = 2


 class Tree:
     def __init__(self, conn: psycopg2.extensions.connection, swhid: str):
         self.root = DirectoryEntry(conn, swhid, PosixPath('.'))


 class TreeEntry:
     def __init__(self, swhid: str, name: PosixPath):
         self.swhid = swhid
         self.name = name


 class DirectoryEntry(TreeEntry):
     def __init__(
         self,
         conn: psycopg2.extensions.connection,
         swhid: str,
         name: PosixPath
     ):
         super().__init__(swhid, name)
         self.conn = conn
         self.children = None

     def __iter__(self):
         if self.children is None:
             self.children = []
             storage = Db(self.conn)
             for child in storage.directory_walk_one(self.swhid):
                 if child[OTYPE_IDX] == CONTENT:
                     self.children.append(FileEntry(
                         child[SWHID_IDX],
                         PosixPath(os.fsdecode(child[PATH_IDX]))
                     ))
                 elif child[OTYPE_IDX] == DIRECTORY:
                     self.children.append(DirectoryEntry(
                         self.conn,
                         child[SWHID_IDX],
                         PosixPath(os.fsdecode(child[PATH_IDX]))
                     ))
         return iter(self.children)


 class FileEntry(TreeEntry):
     pass
diff --git a/origins.py b/origins.py
new file mode 100644
index 0000000..99e51ea
--- /dev/null
+++ b/origins.py
@@ -0,0 +1,179 @@
+import psycopg2
+import utils
+
+import swh.storage
+import swh.storage.algos.origin
+import swh.storage.algos.snapshot
+import swh.storage.interface
+
+# from swh.model.model import TargetType
+from swh.model.identifiers import identifier_to_str
+
+
+class VisitStatusIterator:
+    def __init__(self, storage: swh.storage.interface.StorageInterface):
+        self.storage = storage
+
+    def __iter__(self):
+        yield from self.iterate()
+
+    def iterate(self):
+        for idx, origin in enumerate(swh.storage.algos.origin.iter_origins(self.storage)):
+            if idx == 10: break
+            print(f'##################################################################')
+            print(f'{idx:03} -> {origin}')
+
+            origin = origin.to_dict()
+            for visit in swh.storage.algos.origin.iter_origin_visits(self.storage, origin['url']):
+                print(f'    +--> {visit}')
+                visit = visit.to_dict()
+                if 'visit' in visit:
+                    for status in swh.storage.algos.origin.iter_origin_visit_statuses(self.storage, origin['url'], visit['visit']):
+                        print(f'         +--> {status}')
+                        # TODO: may filter only those whose status is 'full'??
+                        yield status.to_dict()
+
+
+def origin_add_revision(
+    cursor: psycopg2.extensions.cursor,
+    origin: int,      # TODO: use OriginEntry structure
+    revision: dict    # TODO: use RevisionEntry structure
+):
+    cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''', (revision['id'],))
+    visited = cursor.fetchone() is not None
+
+    cursor.execute('''INSERT INTO revision_in_org VALUES (%s, %s)
+                      ON CONFLICT DO NOTHING''',
+                   (revision['id'], origin))
+
+    if not visited:
+        revision_walk_history(cursor, origin, revision['id'], revision)
+
+
+def revision_walk_history(
+    cursor: psycopg2.extensions.cursor,
+    origin: int,        # TODO: use OriginEntry structure
+    relative: bytes,    # TODO: use OriginEntry structure
+    revision: dict      # TODO: use RevisionEntry structure
+):
+    to_org = []
+    to_rev = []
+
+    for parent in revision['parents']:
+        cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''', (parent,))
+        visited = cursor.fetchone() is not None
+
+        if not visited:
+            # The parent revision has never been seen before pointing directly
+            # to an origin.
+            cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''', (parent,))
+            known = cursor.fetchone() is not None
+
+            if known:
+                # The parent revision is already known in some other revision's
+                # history. We should point it directly to the origin and
+                # (eventually) walk its history.
+                to_org.append(parent)
+            else:
+                # The parent revision was never seen before. We should walk its
+                # history and associate it with the same relative revision.
+                to_rev.append(parent)
+
+        else:
+            # The parent revision already points to an origin, so its history
+            # was properly processed before. We just need to make sure it points
+            # to the current origin as well.
+            cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s)
+                              ON CONFLICT DO NOTHING''', (parent, origin))
+
+    for parent in storage.revision_get(to_org):
+        if parent is not None:
+            origin_add_revision(cursor, origin, parent.to_dict())
+
+    for parent in storage.revision_get(to_rev):
+        if parent is not None:
+            parent = parent.to_dict()
+            cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''',
+                           (parent['id'], relative))
+            revision_walk_history(cursor, origin, relative, parent)
+
+
+if __name__ == "__main__":
+    comp_conn = utils.connect('database.conf', 'compact')
+    cursor = comp_conn.cursor()
+
+    utils.execute_sql(comp_conn, 'origins.sql')    # Create tables dropping existing ones
+
+    kwargs = {
+        "cls" : "remote",
+        "url" : "http://uffizi.internal.softwareheritage.org:5002"
+    }
+    storage = swh.storage.get_storage(**kwargs)
+
+    for status in VisitStatusIterator(storage):
+        # Check if current origin is already known and retrieve its internal id.
+        cursor.execute('''SELECT id FROM origin WHERE url=%s''', (status['origin'],))
+        row = cursor.fetchone()
+        origin = row[0] if row is not None else None
+
+        revisions = []
+        releases = []
+
+        snapshot = swh.storage.algos.snapshot.snapshot_get_all_branches(storage, status['snapshot'])
+        if snapshot is not None:
+            branches = snapshot.to_dict()['branches']
+            for branch in branches:
+                print(f'    +--> {branch} : {branches[branch]}')
+                target = branches[branch]['target']
+                target_type = branches[branch]['target_type']
+
+                if target_type == 'revision':
+                    revisions.append(target)
+
+                elif target_type == 'release':
+                    releases.append(target)
+
+        print(f'    ### RELEASES ###############################################')
+        # TODO: limit the size of this query!
+        for release in storage.release_get(releases):
+            print(f'** {release}')
+
+            release = release.to_dict()
+            target = release['target']
+            target_type = release['target_type']
+
+            if target_type == 'revision':
+                revisions.append(target)
+
+        print(f'    ############################################################')
+        print(list(map(identifier_to_str, revisions)))
+
+        print(f'    ### REVISIONS ##############################################')
+        # TODO: limit the size of this query!
+        for revision in storage.revision_get(revisions):
+            print(f'** {revision}')
+            if revision is not None:
+                revision = revision.to_dict()
+
+                if origin is None:
+                    # If the origin is seen for the first time, current revision is
+                    # the preferred one.
+                    cursor.execute('''INSERT INTO origin (url, rev) VALUES (%s,%s)''',
+                                   (status['origin'], revision['id']))
+
+                    # Retrieve current origin's internal id (just generated).
+                    cursor.execute('''SELECT id FROM origin WHERE url=%s''', (status['origin'],))
+                    origin = cursor.fetchone()[0]
+
+                else:
+                    # TODO: we should check whether current revision is preferred
+                    # over the stored one to perform the update.
+                    pass
+                    # cursor.execute('''UPDATE origin SET rev=%s WHERE id=%s''',
+                    #                (revision['id'], origin))
+
+                origin_add_revision(cursor, origin, revision)
+                comp_conn.commit()
+        print(f'    ############################################################')
+
+    comp_conn.close()
diff --git a/origins.sql b/origins.sql
new file mode 100644
index 0000000..5cf9374
--- /dev/null
+++ b/origins.sql
@@ -0,0 +1,40 @@
+-- a Git object ID, i.e., a Git-style salted SHA1 checksum
+drop domain if exists sha1_git cascade;
+create domain sha1_git as bytea check (length(value) = 20);
+
+
+drop table if exists origin;
+create table origin
+(
+    id      bigserial primary key,      -- id of the origin
+    url     unix_path unique,           -- url of the origin
+    rev     sha1_git not null           -- id of the preferred revision for the origin
+);
+
+comment on column origin.id is 'Origin internal identifier';
+comment on column origin.url is 'URL of the origin';
+comment on column origin.rev is 'Preferred revision for the origin';
+
+
+drop table if exists revision_in_org;
+create table revision_in_org
+(
+    rev     sha1_git not null,          -- id of the revision pointed by the origin
+    org     bigint not null,            -- id of the origin that points to the revision
+    primary key (rev, org)
+);
+
+comment on column revision_in_org.rev is 'Revision identifier';
+comment on column revision_in_org.org is 'Origin identifier';
+
+
+drop table if exists revision_before_rev;
+create table revision_before_rev
+(
+    prev    sha1_git not null,          -- id of the source revision
+    next    sha1_git not null,          -- id of the destination revision
+    primary key (prev, next)
+);
+
+comment on column revision_before_rev.prev is 'Source revision identifier';
+comment on column revision_before_rev.next is 'Destination revision identifier';
diff --git a/utils.py b/utils.py
index 0e945d4..2ffa335 100644
--- a/utils.py
+++ b/utils.py
@@ -1,57 +1,66 @@
+import io
 import psycopg2

 from configparser import ConfigParser
 from pathlib import PosixPath


 def config(filename: PosixPath, section: str):
     # create a parser
     parser = ConfigParser()
     # read config file
     parser.read(filename)

     # get section, default to postgresql
     db = {}
     if parser.has_section(section):
         params = parser.items(section)
         for param in params:
             db[param[0]] = param[1]
     else:
         raise Exception(f'Section {section} not found in the {filename} file')

     return db


 def typecast_bytea(value, cur):
     if value is not None:
         data = psycopg2.BINARY(value, cur)
         return data.tobytes()


 def adapt_conn(conn):
     """Makes psycopg2 use 'bytes' to decode bytea instead of
     'memoryview', for this connection."""
     t_bytes = psycopg2.extensions.new_type((17,), "bytea", typecast_bytea)
     psycopg2.extensions.register_type(t_bytes, conn)

     t_bytes_array = psycopg2.extensions.new_array_type((1001,), "bytea[]", t_bytes)
     psycopg2.extensions.register_type(t_bytes_array, conn)


 def connect(filename: PosixPath, section: str):
     """ Connect to the PostgreSQL database server """
     conn = None
     try:
         # read connection parameters
         params = config(filename, section)

         # connect to the PostgreSQL server
         # print('Connecting to the PostgreSQL database...')
         conn = psycopg2.connect(**params)
         adapt_conn(conn)

     except (Exception, psycopg2.DatabaseError) as error:
         print(error)

     return conn
+
+
+def execute_sql(conn: psycopg2.extensions.connection, filename: PosixPath):
+    with io.open(filename) as file:
+        cur = conn.cursor()
+        cur.execute(file.read())
+        cur.close()
+        conn.commit()