diff --git a/compact.py b/compact.py
index dc71175..5730d35 100644
--- a/compact.py
+++ b/compact.py
@@ -1,374 +1,381 @@
 import click
 import logging
 import psycopg2
 import sys
 import time
 import threading
 import utils

 from datetime import datetime
 from pathlib import PosixPath

 from iterator import (
     RevisionEntry,
     RevisionIterator,
     ArchiveRevisionIterator,
     FileRevisionIterator
 )
 from model import (
     DirectoryEntry,
     FileEntry,
     TreeEntry,
     Tree
 )
 from swh.model.identifiers import identifier_to_str


 def revision_add(
     cursor: psycopg2.extensions.cursor,
     archive: psycopg2.extensions.connection,
     revision: RevisionEntry,
     id : int
 ):
     logging.info(f'Thread {id} - Processing revision {identifier_to_str(revision.swhid)} (timestamp: {revision.timestamp})')
     # Process content starting from the revision's root directory
     directory = Tree(archive, revision.directory).root
     revision_process_directory(cursor, revision, directory, directory.name)
     # Add current revision to the compact DB
     cursor.execute('INSERT INTO revision VALUES (%s,%s, NULL)', (revision.swhid, revision.timestamp))


 def content_find_first(
     cursor: psycopg2.extensions.cursor,
     swhid: str
 ):
     logging.info(f'Retrieving first occurrence of content {identifier_to_str(swhid)}')
     cursor.execute('''SELECT blob, rev, date, path
                       FROM content_early_in_rev
                       JOIN revision ON revision.id=content_early_in_rev.rev
                       WHERE content_early_in_rev.blob=%s
                       ORDER BY date, rev, path ASC LIMIT 1''', (swhid,))
     return cursor.fetchone()


 def content_find_all(
     cursor: psycopg2.extensions.cursor,
     swhid: str
 ):
     logging.info(f'Retrieving all occurrences of content {identifier_to_str(swhid)}')
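+    # The first SELECT lists occurrences recorded directly against early
+    # revisions; the second expands blobs reached through directories on the
+    # isochrone frontier, prefixing each blob path with its directory path.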
     cursor.execute('''(SELECT blob, rev, date, path
                       FROM content_early_in_rev
                       JOIN revision ON revision.id=content_early_in_rev.rev
                       WHERE content_early_in_rev.blob=%s)
                       UNION
                       (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path
                       FROM (SELECT content_in_dir.blob, directory_in_rev.rev,
                                    CASE directory_in_rev.path
                                        WHEN '.' THEN content_in_dir.path
                                        ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path
                                    END AS path
                             FROM content_in_dir
                             JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir
                             WHERE content_in_dir.blob=%s) AS content_in_rev
                       JOIN revision ON revision.id=content_in_rev.rev)
                       ORDER BY date, rev, path''', (swhid, swhid))
     yield from cursor.fetchall()


 ################################################################################
 ################################################################################
 ################################################################################


 def normalize(path: PosixPath) -> PosixPath:
     spath = str(path)
     if spath.startswith('./'):
         return PosixPath(spath[2:])
     return path


 def content_get_early_timestamp(
     cursor: psycopg2.extensions.cursor,
-    blob: FileEntry,
-    depth: int
+    blob: FileEntry
 ):
-    logging.debug(f'{" "*depth}Getting content {identifier_to_str(blob.swhid)} early timestamp')
+    logging.debug(f'Getting content {identifier_to_str(blob.swhid)} early timestamp')
     start = time.perf_counter_ns()
     cursor.execute('SELECT date FROM content WHERE id=%s', (blob.swhid,))
     row = cursor.fetchone()
     stop = time.perf_counter_ns()
-    logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
+    logging.debug(f' Time elapsed: {stop-start}ns')
     return row[0] if row is not None else None


 def content_set_early_timestamp(
     cursor: psycopg2.extensions.cursor,
     blob: FileEntry,
-    timestamp: datetime,
-    depth: int
+    timestamp: datetime
 ):
-    logging.debug(f'{" "*depth}EARLY occurrence of blob {identifier_to_str(blob.swhid)} (timestamp: {timestamp})')
+    logging.debug(f'EARLY occurrence of blob {identifier_to_str(blob.swhid)} (timestamp: {timestamp})')
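+    # Unconditional upsert: callers are expected to invoke this only when the
+    # new timestamp is earlier than the stored one (or none is stored yet).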
     start = time.perf_counter_ns()
     cursor.execute('''INSERT INTO content VALUES (%s,%s)
                       ON CONFLICT (id) DO UPDATE SET date=%s''',
                    (blob.swhid, timestamp, timestamp))
     stop = time.perf_counter_ns()
-    logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
+    logging.debug(f' Time elapsed: {stop-start}ns')


 def content_add_to_directory(
     cursor: psycopg2.extensions.cursor,
     directory: DirectoryEntry,
     blob: FileEntry,
-    prefix: PosixPath,
-    depth: int
+    prefix: PosixPath
 ):
-    logging.debug(f'{" "*depth}NEW occurrence of content {identifier_to_str(blob.swhid)} in directory {identifier_to_str(directory.swhid)} (path: {prefix / blob.name})')
+    logging.debug(f'NEW occurrence of content {identifier_to_str(blob.swhid)} in directory {identifier_to_str(directory.swhid)} (path: {prefix / blob.name})')
     start = time.perf_counter_ns()
     cursor.execute('INSERT INTO content_in_dir VALUES (%s,%s,%s)',
                    (blob.swhid, directory.swhid, bytes(normalize(prefix / blob.name))))
     stop = time.perf_counter_ns()
-    logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
+    logging.debug(f' Time elapsed: {stop-start}ns')


 def content_add_to_revision(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     blob: FileEntry,
-    prefix: PosixPath,
-    depth: int
+    prefix: PosixPath
 ):
-    logging.debug(f'{" "*depth}EARLY occurrence of blob {identifier_to_str(blob.swhid)} in revision {identifier_to_str(revision.swhid)} (path: {prefix / blob.name})')
+    logging.debug(f'EARLY occurrence of blob {identifier_to_str(blob.swhid)} in revision {identifier_to_str(revision.swhid)} (path: {prefix / blob.name})')
     start = time.perf_counter_ns()
     cursor.execute('INSERT INTO content_early_in_rev VALUES (%s,%s,%s)',
                    (blob.swhid, revision.swhid, bytes(normalize(prefix / blob.name))))
     stop = time.perf_counter_ns()
-    logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
+    logging.debug(f' Time elapsed: {stop-start}ns')


 def directory_get_early_timestamp(
     cursor: psycopg2.extensions.cursor,
-    directory: DirectoryEntry,
-    depth: int
+    directory: DirectoryEntry
 ):
-    logging.debug(f'{" "*depth}Getting directory {identifier_to_str(directory.swhid)} early timestamp')
+    logging.debug(f'Getting directory {identifier_to_str(directory.swhid)} early timestamp')
     start = time.perf_counter_ns()
     cursor.execute('SELECT date FROM directory WHERE id=%s', (directory.swhid,))
     row = cursor.fetchone()
     stop = time.perf_counter_ns()
-    logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
+    logging.debug(f' Time elapsed: {stop-start}ns')
     return row[0] if row is not None else None


 def directory_set_early_timestamp(
     cursor: psycopg2.extensions.cursor,
     directory: DirectoryEntry,
-    timestamp: datetime,
-    depth: int
+    timestamp: datetime
 ):
-    logging.debug(f'{" "*depth}EARLY occurrence of directory {identifier_to_str(directory.swhid)} on the ISOCHRONE FRONTIER (timestamp: {timestamp})')
+    logging.debug(f'EARLY occurrence of directory {identifier_to_str(directory.swhid)} on the ISOCHRONE FRONTIER (timestamp: {timestamp})')
     start = time.perf_counter_ns()
     cursor.execute('''INSERT INTO directory VALUES (%s,%s)
                       ON CONFLICT (id) DO UPDATE SET date=%s''',
                    (directory.swhid, timestamp, timestamp))
     stop = time.perf_counter_ns()
-    logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
+    logging.debug(f' Time elapsed: {stop-start}ns')


 def directory_add_to_revision(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     directory: DirectoryEntry,
-    path: PosixPath,
-    depth: int
+    path: PosixPath
 ):
-    logging.debug(f'{" "*depth}NEW occurrence of directory {identifier_to_str(directory.swhid)} on the ISOCHRONE FRONTIER of revision {identifier_to_str(revision.swhid)} (path: {path})')
+    logging.debug(f'NEW occurrence of directory {identifier_to_str(directory.swhid)} on the ISOCHRONE FRONTIER of revision {identifier_to_str(revision.swhid)} (path: {path})')
     start = time.perf_counter_ns()
     cursor.execute('INSERT INTO directory_in_rev VALUES (%s,%s,%s)',
                    (directory.swhid, revision.swhid, bytes(normalize(path))))
     stop = time.perf_counter_ns()
-    logging.debug(f'{" "*depth} Time elapsed: {stop-start}ns')
+    logging.debug(f' Time elapsed: {stop-start}ns')


 def directory_process_content(
     cursor: psycopg2.extensions.cursor,
     directory: DirectoryEntry,
     relative: DirectoryEntry,
-    prefix: PosixPath,
-    depth: int
+    prefix: PosixPath
 ):
-    for child in iter(directory):
-        if isinstance(child, FileEntry):
-            # Add content to the relative directory with the computed prefix.
-            content_add_to_directory(cursor, relative, child, prefix, depth)
-        else:
-            # Recursively walk the child directory.
-            directory_process_content(cursor, child, relative, prefix / child.name, depth)
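+    # An explicit stack replaces the recursive call above: very deep
+    # directory trees would otherwise risk hitting Python's recursion limit.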
+    stack = [(directory, relative, prefix)]
+
+    while stack:
+        directory, relative, prefix = stack.pop()


-def revision_process_content(
-    cursor: psycopg2.extensions.cursor,
-    revision: RevisionEntry,
-    directory: DirectoryEntry,
-    path: PosixPath,
-    depth: int
-):
-    for child in iter(directory):
-        if isinstance(child, FileEntry):
-            timestamp = content_get_early_timestamp(cursor, child, depth)
-            if timestamp is None or revision.timestamp < timestamp:
-                content_set_early_timestamp(cursor, child, revision.timestamp, depth)
-                content_add_to_revision(cursor, revision, child, path, depth)
-        else:
-            revision_process_directory(cursor, revision, child, path / child.name, depth + 1)
+        for child in iter(directory):
+            if isinstance(child, FileEntry):
+                # Add content to the relative directory with the computed prefix.
+                content_add_to_directory(cursor, relative, child, prefix)
+            else:
+                # Recursively walk the child directory.
+                # directory_process_content(cursor, child, relative, prefix / child.name)
+                stack.append((child, relative, prefix / child.name))


 def revision_process_directory(
     cursor: psycopg2.extensions.cursor,
     revision: RevisionEntry,
     directory: DirectoryEntry,
-    path: PosixPath,
-    depth: int=0
+    path: PosixPath
 ):
-    timestamp = directory_get_early_timestamp(cursor, directory, depth)
-
-    if timestamp is None:
-        # The directory has never been seen on the isochrone graph of a
-        # revision. Its children should be checked.
-        timestamps = []
-        for child in iter(directory):
-            if isinstance(child, FileEntry):
-                timestamps.append(content_get_early_timestamp(cursor, child, depth))
+    stack = [(revision, directory, path)]
+
+    while stack:
+        revision, directory, path = stack.pop()
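+        # Three cases follow: the directory was never seen before (derive a
+        # timestamp from its children), it was seen but this revision is
+        # earlier (update it), or it was seen earlier still (just record it).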
+
+        timestamp = directory_get_early_timestamp(cursor, directory)
+
+        if timestamp is None:
+            # The directory has never been seen on the isochrone graph of a
+            # revision. Its children should be checked.
+            timestamps = []
+            for child in iter(directory):
+                if isinstance(child, FileEntry):
+                    timestamps.append(content_get_early_timestamp(cursor, child))
+                else:
+                    timestamps.append(directory_get_early_timestamp(cursor, child))
+
+            if timestamps != [] and None not in timestamps and max(timestamps) <= revision.timestamp:
+                # The directory belongs to the isochrone frontier of the current
+                # revision, and this is the first time it appears as such.
+                directory_set_early_timestamp(cursor, directory, max(timestamps))
+                directory_add_to_revision(cursor, revision, directory, path)
+                directory_process_content(cursor, directory, directory, PosixPath('.'))
             else:
-                timestamps.append(directory_get_early_timestamp(cursor, child, depth))
-
-        if timestamps != [] and None not in timestamps and max(timestamps) <= revision.timestamp:
-            # The directory belongs to the isochrone frontier of the current
-            # revision, and this is the first time it appears as such.
-            directory_set_early_timestamp(cursor, directory, max(timestamps), depth)
-            directory_add_to_revision(cursor, revision, directory, path, depth)
-            directory_process_content(cursor, directory, directory, PosixPath('.'), depth)
+                # The directory is not on the isochrone frontier of the current
+                # revision. Its child nodes should be analyzed.
+                # revision_process_content(cursor, revision, directory, path)
+                ################################################################
+                for child in iter(directory):
+                    if isinstance(child, FileEntry):
+                        timestamp = content_get_early_timestamp(cursor, child)
+                        if timestamp is None or revision.timestamp < timestamp:
+                            content_set_early_timestamp(cursor, child, revision.timestamp)
+                            content_add_to_revision(cursor, revision, child, path)
+                    else:
+                        # revision_process_directory(cursor, revision, child, path / child.name)
+                        stack.append((revision, child, path / child.name))
+                ################################################################
+
+        elif revision.timestamp < timestamp:
+            # The directory has already been seen on the isochrone frontier of a
+            # revision, but current revision is earlier. Its children should be
+            # updated.
+            # revision_process_content(cursor, revision, directory, path)
+            ####################################################################
+            for child in iter(directory):
+                if isinstance(child, FileEntry):
+                    timestamp = content_get_early_timestamp(cursor, child)
+                    if timestamp is None or revision.timestamp < timestamp:
+                        content_set_early_timestamp(cursor, child, revision.timestamp)
+                        content_add_to_revision(cursor, revision, child, path)
+                else:
+                    # revision_process_directory(cursor, revision, child, path / child.name)
+                    stack.append((revision, child, path / child.name))
+            ####################################################################
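+            # The loop above mirrors the one in the first branch (both inline
+            # the removed revision_process_content helper so child directories
+            # can be pushed onto the shared stack); with the children updated,
+            # lower the directory's timestamp to this earlier revision.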
+            directory_set_early_timestamp(cursor, directory, revision.timestamp)
+
         else:
-            # The directory is not on the isochrone frontier of the current
-            # revision. Its child nodes should be analyzed.
-            revision_process_content(cursor, revision, directory, path, depth)
-
-    elif revision.timestamp < timestamp:
-        # The directory has already been seen on the isochrone frontier of a
-        # revision, but current revision is earlier. Its children should be
-        # updated.
-        revision_process_content(cursor, revision, directory, path, depth)
-        directory_set_early_timestamp(cursor, directory, revision.timestamp, depth)
-
-    else:
-        # The directory has already been seen on the isochrone frontier of an
-        # earlier revision. Just add it to the current revision.
-        directory_add_to_revision(cursor, revision, directory, path, depth)
+            # The directory has already been seen on the isochrone frontier of an
+            # earlier revision. Just add it to the current revision.
+            directory_add_to_revision(cursor, revision, directory, path)


 ################################################################################
 ################################################################################
 ################################################################################


 class Worker(threading.Thread):
     def __init__(
         self,
         id : int,
         conf : str,
         database : str,
         archive : psycopg2.extensions.connection,
         revisions : RevisionIterator
     ):
         super().__init__()
         self.id = id
         self.conf = conf
         self.database = database
         self.archive = archive
         self.revisions = revisions

     def run(self):
         conn = utils.connect(self.conf, self.database)
         with conn.cursor() as cursor:
             while True:
                 processed = False
                 revision = self.revisions.next()
                 if revision is None:
                     break
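+                # Retry until the revision commits: any failure rolls the
+                # transaction back and the same revision is attempted again.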
                 while not processed:
                     try:
                         revision_add(cursor, self.archive, revision, self.id)
                         conn.commit()
                         processed = True
                     except Exception:
                         logging.warning(f'Thread {self.id} - Failed to process revision {identifier_to_str(revision.swhid)} (timestamp: {revision.timestamp})')
                         conn.rollback()
         conn.close()


 @click.command()
 @click.argument('count', type=int)
 @click.option('-c', '--compact', nargs=2, required=True)
 @click.option('-a', '--archive', nargs=2)
 @click.option('-d', '--database', nargs=2)
 @click.option('-f', '--filename')
 @click.option('-l', '--limit', type=int)
 @click.option('-t', '--threads', type=int, default=1)
 def cli(count, compact, archive, database, filename, limit, threads):
     """Compact model revision-content layer utility."""
     logging.basicConfig(level=logging.INFO)
     # logging.basicConfig(filename='compact.log', level=logging.DEBUG)
     click.echo(f'{count} {compact} {archive} {database} {filename} {limit}')

     if not database:
         database = None

     if not archive:
         archive = None

     reset = database is not None or filename is not None
     if reset and archive is None:
         logging.error('Error: -a option is compulsory when -d or -f options are set')
         exit()

     comp_conn = utils.connect(compact[0], compact[1])
     cursor = comp_conn.cursor()

     if reset:
         utils.execute_sql(comp_conn, 'compact.sql')   # Create tables dropping existing ones

         if database is not None:
             logging.info(f'Reconstructing compact model from {database} database (limit={limit})')
             data_conn = utils.connect(database[0], database[1])
             revisions = ArchiveRevisionIterator(data_conn, limit=limit)
         else:
             logging.info(f'Reconstructing compact model from {filename} CSV file (limit={limit})')
             revisions = FileRevisionIterator(filename, limit=limit)

         arch_conn = utils.connect(archive[0], archive[1])
         workers = []
         for id in range(threads):
             worker = Worker(id, compact[0], compact[1], arch_conn, revisions)
             worker.start()
             workers.append(worker)

         for worker in workers:
             worker.join()

         arch_conn.close()

         if database is not None:
             data_conn.close()

     cursor.execute(f'SELECT DISTINCT id FROM content ORDER BY id LIMIT {count}')
     for idx, row in enumerate(cursor.fetchall()):
         swhid = row[0]
         print(f'Test blob {idx}: {identifier_to_str(swhid)}')

         fst = content_find_first(cursor, swhid)
         print(f'  First occurrence:')
         print(f'    {identifier_to_str(fst[0])}, {identifier_to_str(fst[1])}, {fst[2]}, {fst[3].decode("utf-8")}')

         print(f'  All occurrences:')
         for row in content_find_all(cursor, swhid):
             print(f'    {identifier_to_str(row[0])}, {identifier_to_str(row[1])}, {row[2]}, {row[3].decode("utf-8")}')

         print(f'========================================')

     comp_conn.close()
diff --git a/origin.py b/origin.py
index 3dc41a7..427770e 100644
--- a/origin.py
+++ b/origin.py
@@ -1,248 +1,248 @@
 import logging
 import psycopg2
 import utils

 import swh.storage
 import swh.storage.algos.origin
 import swh.storage.algos.snapshot
 import swh.storage.interface

 from swh.model.identifiers import identifier_to_str


 class OriginEntry:
     def __init__(self, url, revisions, id=None):
         self.id = id
         self.url = url
         self.revisions = revisions

     def __str__(self):
         # return f'{type(self).__name__}(id={self.id}, url={self.url}, revisions={list(map(str, self.revisions))})'
         return f'{type(self).__name__}(id={self.id}, url={self.url})'


 class RevisionEntry:
     def __init__(self, swhid, parents):
         self.swhid = swhid
         self.parents = parents

     def __str__(self):
         return f'{type(self).__name__}(swhid={identifier_to_str(self.swhid)}, parents={list(map(identifier_to_str, self.parents))})'


 class OriginIterator:
     def __init__(self, storage: swh.storage.interface.StorageInterface):
         self.storage = storage

     def __iter__(self):
         yield from self.iterate()

     def iterate(self):
         idx = 0
         for origin in swh.storage.algos.origin.iter_origins(self.storage):
             # print(f'{idx:03} -> {origin}')
             origin = origin.to_dict()
             for visit in swh.storage.algos.origin.iter_origin_visits(self.storage, origin['url']):
                 # print(f'    +--> {visit}')
                 visit = visit.to_dict()
                 if 'visit' in visit:
                     for status in swh.storage.algos.origin.iter_origin_visit_statuses(self.storage, origin['url'], visit['visit']):
                         # print(f'         +--> {status}')
                         # TODO: may filter only those whose status is 'full'??
                         status = status.to_dict()

                         targets = []
                         releases = []

                         snapshot = swh.storage.algos.snapshot.snapshot_get_all_branches(storage, status['snapshot'])
                         if snapshot is not None:
                             branches = snapshot.to_dict()['branches']
                             for branch in branches:
                                 # print(f'              +--> {branch} : {branches[branch]}')
                                 target = branches[branch]['target']
                                 target_type = branches[branch]['target_type']
                                 if target_type == 'revision':
                                     targets.append(target)
                                 elif target_type == 'release':
                                     releases.append(target)

                         # print(f'         ############################################################')
                         # print(list(map(identifier_to_str, releases)))

                         # print(f'         ### RELEASES ###############################################')
                         # This is done to keep the query in release_get small, hence avoiding a timeout.
                         limit = 100
                         for i in range(0, len(releases), limit):
                             for release in storage.release_get(releases[i:i+limit]):
                                 if release is not None:
                                     # print(f'** {release}')
                                     release = release.to_dict()
                                     target = release['target']
                                     target_type = release['target_type']
                                     if target_type == 'revision':
                                         targets.append(target)

                         # print(f'         ############################################################')
                         # print(list(map(identifier_to_str, targets)))

                         # print(f'         ### REVISIONS ##############################################')
                         # This is done to keep the query in revision_get small, hence avoiding a timeout.
                         revisions = []
                         limit = 100
                         for i in range(0, len(targets), limit):
                             for revision in storage.revision_get(targets[i:i+limit]):
                                 if revision is not None:
                                     # print(f'** {revision}')
                                     revision = revision.to_dict()
                                     revisions.append(RevisionEntry(revision['id'], revision['parents']))

                         yield OriginEntry(status['origin'], revisions)

                         idx = idx + 1
                         if idx == 100: return


 def origin_add_revision(
     cursor: psycopg2.extensions.cursor,
     origin: OriginEntry,
     revision: RevisionEntry
 ):
-    env = [(origin, None, revision)]
+    stack = [(origin, None, revision)]

-    while env:
-        origin, relative, revision = env.pop()
+    while stack:
+        origin, relative, revision = stack.pop()

         # Check if current revision has no preferred origin and update if necessary.
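+        # COALESCE maps a NULL 'org' to 0, so 0 below means "known revision
+        # with no preferred origin yet", while None means "revision not found".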
         cursor.execute('''SELECT COALESCE(org, 0) FROM revision WHERE id=%s''',
                        (revision.swhid,))
         row = cursor.fetchone()
         preferred = row[0] if row is not None else None
         print(f'Preferred origin for revision {identifier_to_str(revision.swhid)}: {preferred}')

         if preferred == 0:
             cursor.execute('''UPDATE revision SET org=%s WHERE id=%s''',
                            (origin.id, revision.swhid))

         ########################################################################
         if relative is None:
             # This revision is pointed directly by the origin.
             logging.debug(f'Adding revision {identifier_to_str(revision.swhid)} to origin {origin.id}')

             cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''',
                            (revision.swhid,))
             visited = cursor.fetchone() is not None
             print(f'Revision {identifier_to_str(revision.swhid)} in origin {origin.id}: {visited}')

             cursor.execute('''INSERT INTO revision_in_org VALUES (%s, %s) ON CONFLICT DO NOTHING''',
                            (revision.swhid, origin.id))

             if not visited:
-                # revision_walk_history(cursor, origin, revision.swhid, revision, depth)
-                env.append((origin, revision.swhid, revision))
+                # revision_walk_history(cursor, origin, revision.swhid, revision)
+                stack.append((origin, revision.swhid, revision))

         else:
             # This revision is a parent of another one in the history of the
             # relative revision.
             to_org = []
             to_rev = []

             for parent in revision.parents:
                 cursor.execute('''SELECT 1 FROM revision_in_org WHERE rev=%s''',
                                (parent,))
                 visited = cursor.fetchone() is not None
                 print(f'Parent {identifier_to_str(parent)} in origin {origin.id}: {visited}')

                 if not visited:
                     # The parent revision has never been seen before pointing
                     # directly to an origin.
                     cursor.execute('''SELECT 1 FROM revision_before_rev WHERE prev=%s''',
                                    (parent,))
                     known = cursor.fetchone() is not None
                     print(f'Revision {identifier_to_str(parent)} before revision: {known}')

                     if known:
                         # The parent revision is already known in some other
                         # revision's history. We should point it directly to
                         # the origin and (eventually) walk its history.
                         to_org.append(parent)
                     else:
                         # The parent revision was never seen before. We should
                         # walk its history and associate it with the same
                         # relative revision.
                         to_rev.append(parent)

                 else:
                     # The parent revision already points to an origin, so its
                     # history was properly processed before. We just need to
                     # make sure it points to the current origin as well.
                     logging.debug(f'Adding parent revision {identifier_to_str(parent)} to origin {origin.id}')
                     cursor.execute('''INSERT INTO revision_in_org VALUES (%s,%s) ON CONFLICT DO NOTHING''',
                                    (parent, origin.id))

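+            # NOTE: 'storage' below is the module-level archive handle created
+            # in the __main__ block; the collected parent ids are resolved
+            # against the archive before being pushed onto the stack.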
             for parent in storage.revision_get(to_org):
                 if parent is not None:
                     parent = parent.to_dict()
                     parent = RevisionEntry(parent['id'], parent['parents'])
-                    # origin_add_revision(cursor, origin, parent, depth+1)
-                    env.append((origin, None, parent))
+                    # origin_add_revision(cursor, origin, parent)
+                    stack.append((origin, None, parent))

             for parent in storage.revision_get(to_rev):
                 if parent is not None:
                     parent = parent.to_dict()
                     parent = RevisionEntry(parent['id'], parent['parents'])
                     logging.debug(f'Adding parent revision {identifier_to_str(parent.swhid)} to revision {identifier_to_str(relative)}')
                     cursor.execute('''INSERT INTO revision_before_rev VALUES (%s,%s)''',
                                    (parent.swhid, relative))
-                    # revision_walk_history(cursor, origin, relative, parent, depth+1)
-                    env.append((origin, relative, parent))
+                    # revision_walk_history(cursor, origin, relative, parent)
+                    stack.append((origin, relative, parent))


 if __name__ == "__main__":
     """Compact model origin-revision layer utility."""
     # logging.basicConfig(level=logging.DEBUG)
     logging.basicConfig(filename='origin.log', level=logging.DEBUG)

     comp_conn = utils.connect('database.conf', 'compact')
     cursor = comp_conn.cursor()

     utils.execute_sql(comp_conn, 'origin.sql')   # Create tables dropping existing ones

     kwargs = {
         "cls" : "remote",
         "url" : "http://uffizi.internal.softwareheritage.org:5002"
     }
     storage = swh.storage.get_storage(**kwargs)

     for origin in OriginIterator(storage):
         print(f'* {origin}')

         # Check if current origin is already known and retrieve its internal id.
         cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,))
         row = cursor.fetchone()
         origin.id = row[0] if row is not None else None

         for revision in origin.revisions:
             print(f'** {revision}')

             if origin.id is None:
                 # If the origin is seen for the first time, current revision is
                 # the preferred one.
                 cursor.execute('''INSERT INTO origin (url) VALUES (%s)''', (origin.url,))

                 # Retrieve current origin's internal id (just generated).
                 cursor.execute('''SELECT id FROM origin WHERE url=%s''', (origin.url,))
                 origin.id = cursor.fetchone()[0]

             origin_add_revision(cursor, origin, revision)
             comp_conn.commit()

         print(f'##################################################################')

     comp_conn.close()