diff --git a/compare-all.py b/compare-all.py index 5b29fda..3616a78 100755 --- a/compare-all.py +++ b/compare-all.py @@ -1,185 +1,180 @@ #!/usr/bin/env python import glob import io import logging import os import psycopg2 from swh.model.hashutil import hash_to_hex from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. conninfo1 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, } conninfo2 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } # Write log file with occurrence detail. def logdiff(filename, occurrences): with io.open(filename, "a") as outfile: for row in occurrences: try: # Try to decode path. path = os.fsdecode(row[3]).decode("utf-8", "replace") except: # Use its raw value if not possible path = row[3] outfile.write( "{blob},{rev},{date},{path}\n".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=path, ) ) # Write log file with list of occurrences. def loglist(filename, occurrences): with io.open(filename, "a") as outfile: for blobid in occurrences: - outfile.write( - "{blob}\n".format( - blob=hash_to_hex(blobid) - ) - ) + outfile.write("{blob}\n".format(blob=hash_to_hex(blobid))) # Output log file name. nextidx = None def outfilename(suffix): global nextidx basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) prefix = os.path.join(os.getcwd(), basename + "-") if nextidx is None: nextidx = 0 for filename in glob.glob(f"{prefix}*.log"): try: lastidx = int(filename.strip(prefix).split("-")[0]) nextidx = max(nextidx, lastidx + 1) except: continue return f"{prefix}{nextidx:02}-{suffix}.log" # Print iterations progress. # TODO: move to utils module. def progress( iteration, total, prefix="Progress:", suffix="Complete", decimals=1, length=50, fill="█", printEnd="\r", ): """ Call in a loop to create terminal progress bar @params: iteration - Required : current iteration (Int) total - Required : total iterations (Int) prefix - Optional : prefix string (Str) suffix - Optional : suffix string (Str) decimals - Optional : positive number of decimals in percent complete (Int) length - Optional : character length of bar (Int) fill - Optional : bar fill character (Str) printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) """ percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) filledLength = int(length * iteration // total) bar = fill * filledLength + "-" * (length - filledLength) print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) # Print New Line on Complete if iteration == total: print() if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) # Get provenance object for both databases and query its lists of content. provenance1 = get_provenance(**conninfo1) provenance2 = get_provenance(**conninfo2) provenance1.cursor.execute("""SELECT id FROM content ORDER BY id""") content1 = set(map(lambda row: row[0], provenance1.cursor.fetchall())) provenance2.cursor.execute("""SELECT sha1 FROM content ORDER BY sha1""") content2 = set(map(lambda row: row[0], provenance2.cursor.fetchall())) if content1 == content2: # If lists of content match, we check that occurrences does as well. total = len(content1) progress(0, total) mismatch = False # Iterate over all content querying all its occurrences on both databases. 
for i, blobid in enumerate(content1): provenance1.cursor.execute( - """(SELECT content_early_in_rev.blob, + """(SELECT content_early_in_rev.blob, content_early_in_rev.rev, revision.date, content_early_in_rev.path FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE content_early_in_rev.blob=%s ) UNION (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path FROM (SELECT content_in_dir.blob, directory_in_rev.rev, CASE directory_in_rev.path WHEN '' THEN content_in_dir.path WHEN '.' THEN content_in_dir.path ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path END AS path FROM content_in_dir JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir WHERE content_in_dir.blob=%s ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ) ORDER BY date, rev, path""", (blobid, blobid), ) occurrences1 = list(provenance1.cursor.fetchall()) occurrences2 = list(provenance2.content_find_all(blobid)) # If there is a mismatch log it to file. - if ( - len(occurrences1) != len(occurrences2) or - set(occurrences1) != set(occurrences2) + if len(occurrences1) != len(occurrences2) or set(occurrences1) != set( + occurrences2 ): mismatch = True logging.warning(f"Occurrencies mismatch for {hash_to_hex(blobid)}") logdiff(outfilename(conninfo1["db"]["dbname"]), occurrences1) logdiff(outfilename(conninfo2["db"]["dbname"]), occurrences2) progress(i + 1, total) if not mismatch: logging.info("Databases are equivalent!") else: # If lists of content don't match, we are done. loglist(outfilename(conninfo1["db"]["dbname"]), content1) loglist(outfilename(conninfo2["db"]["dbname"]), content2) logging.warning("Content lists are different") diff --git a/compare-first.py b/compare-first.py index f13e038..18d9cbb 100755 --- a/compare-first.py +++ b/compare-first.py @@ -1,160 +1,156 @@ #!/usr/bin/env python import glob import io import logging import os import psycopg2 from swh.model.hashutil import hash_to_hex from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. conninfo1 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, } conninfo2 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } # Write log file with occurrence detail. def logdiff(filename, occurrence): with io.open(filename, "a") as outfile: try: # Try to decode path. path = os.fsdecode(occurrence[3]).decode("utf-8", "replace") except: # Use its raw value if not possible path = occurrence[3] outfile.write( "{blob},{rev},{date},{path}\n".format( blob=hash_to_hex(occurrence[0]), rev=hash_to_hex(occurrence[1]), date=occurrence[2], path=path, ) ) # Write log file with list of occurrences. def loglist(filename, occurrences): with io.open(filename, "a") as outfile: for blobid in occurrences: - outfile.write( - "{blob}\n".format( - blob=hash_to_hex(blobid) - ) - ) + outfile.write("{blob}\n".format(blob=hash_to_hex(blobid))) # Output log file name. nextidx = None def outfilename(suffix): global nextidx basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) prefix = os.path.join(os.getcwd(), basename + "-") if nextidx is None: nextidx = 0 for filename in glob.glob(f"{prefix}*.log"): try: lastidx = int(filename.strip(prefix).split("-")[0]) nextidx = max(nextidx, lastidx + 1) except: continue return f"{prefix}{nextidx:02}-{suffix}.log" # Print iterations progress. # TODO: move to utils module. 
 def progress(
     iteration,
     total,
     prefix="Progress:",
     suffix="Complete",
     decimals=1,
     length=50,
     fill="█",
     printEnd="\r",
 ):
     """
     Call in a loop to create terminal progress bar
     @params:
         iteration - Required : current iteration (Int)
         total     - Required : total iterations (Int)
         prefix    - Optional : prefix string (Str)
         suffix    - Optional : suffix string (Str)
         decimals  - Optional : positive number of decimals in percent complete (Int)
         length    - Optional : character length of bar (Int)
         fill      - Optional : bar fill character (Str)
         printEnd  - Optional : end character (e.g. "\r", "\r\n") (Str)
     """
     percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
     filledLength = int(length * iteration // total)
     bar = fill * filledLength + "-" * (length - filledLength)
     print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd)
     # Print New Line on Complete
     if iteration == total:
         print()


 if __name__ == "__main__":
     # Set minimum logging level to INFO.
     logging.getLogger().setLevel(logging.INFO)

     # Get provenance object for both databases and query its lists of content.
     provenance1 = get_provenance(**conninfo1)
     provenance2 = get_provenance(**conninfo2)

     provenance1.cursor.execute("""SELECT id FROM content ORDER BY id""")
     content1 = set(map(lambda row: row[0], provenance1.cursor.fetchall()))

     provenance2.cursor.execute("""SELECT sha1 FROM content ORDER BY sha1""")
     content2 = set(map(lambda row: row[0], provenance2.cursor.fetchall()))

     if content1 == content2:
         # If lists of content match, we check that occurrences does as well.
         total = len(content1)
         progress(0, total)
         mismatch = False

         # Iterate over all content querying all its occurrences on both databases.
         for i, blobid in enumerate(content1):
             provenance1.cursor.execute(
                 """SELECT content_early_in_rev.blob,
                           content_early_in_rev.rev,
                           revision.date,
                           content_early_in_rev.path
                    FROM content_early_in_rev
                    JOIN revision
                    ON revision.id=content_early_in_rev.rev
                    WHERE content_early_in_rev.blob=%s
                    ORDER BY date, rev, path ASC LIMIT 1""",
                 (blobid,),
             )
             occurrence1 = provenance1.cursor.fetchone()
             occurrence2 = provenance2.content_find_first(blobid)

             # If there is a mismatch log it to file. We can only compare the timestamp
             # as the same blob might be seen for the first time in different locations.
             if occurrence1[2] != occurrence2[2]:
                 mismatch = True
                 logging.warning(f"Occurrencies mismatch for {hash_to_hex(blobid)}")
                 logdiff(outfilename(conninfo1["db"]["dbname"]), occurrence1)
                 logdiff(outfilename(conninfo2["db"]["dbname"]), occurrence2)

             progress(i + 1, total)

         if not mismatch:
             logging.info("Databases are equivalent!")

     else:
         # If lists of content don't match, we are done.
         loglist(outfilename(conninfo1["db"]["dbname"]), content1)
         loglist(outfilename(conninfo2["db"]["dbname"]), content2)
         logging.warning("Content lists are different")
diff --git a/count.py b/count.py
index 4e0a4dd..82b920f 100755
--- a/count.py
+++ b/count.py
@@ -1,18 +1,18 @@
 #!/usr/bin/env python

 import sys


 def linecount(filename):
     count = 0
     for line in open(filename).xreadlines():
         count += 1
     return count


 if __name__ == "__main__":
     if len(sys.argv) != 2:
-        print('usage: count <filename>')
+        print("usage: count <filename>")
         exit(-1)

     print(linecount(sys.argv[1]))
diff --git a/dump.py b/dump.py
index 85bd66f..82260b9 100755
--- a/dump.py
+++ b/dump.py
@@ -1,129 +1,130 @@
 #!/usr/bin/env python

 import glob
 import io
 import logging
 import os

 import psycopg2

 from swh.model.hashutil import hash_to_hex
 from swh.provenance import get_provenance

 # TODO: take conninfo as command line arguments.
conninfo = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } def dump(type, hash, time, path="", header="", table=""): return f"{str(header).ljust(5)} | {str(table).ljust(5)} | {str(path).ljust(30)} | {type} {hash_to_hex(hash)} | {str(time).rjust(10)}" if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) # Get provenance object. provenance = get_provenance(**conninfo) provenance.cursor.execute("""SELECT sha1, date FROM revision ORDER BY date""") revisions = list(provenance.cursor.fetchall()) for idx, (revision, date) in enumerate(revisions): # Display current revision information. header = f"R{idx:04}" timestamp = date.timestamp() print(f"{timestamp} {hash_to_hex(revision)} {header}") print(dump("R", revision, timestamp, header=header)) # Display content found early in current revision. provenance.cursor.execute( """SELECT content.sha1 AS blob, content.date AS date, content_location.path AS path FROM (SELECT content_in_rev.blob, location.path FROM (SELECT content_early_in_rev.blob, content_early_in_rev.loc FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE revision.sha1=%s ) AS content_in_rev JOIN location ON location.id=content_in_rev.loc ) AS content_location JOIN content ON content.id=content_location.blob ORDER BY path""", - (revision,) + (revision,), ) content = list(provenance.cursor.fetchall()) for blob, date, path in content: delta = date.timestamp() - timestamp location = os.fsdecode(path) print(dump("C", blob, delta, path=location, table="R---C")) # Display isochrone frontiers found in current revision. provenance.cursor.execute( """SELECT directory.sha1 AS dir, directory.date AS date, directory_location.path AS path FROM (SELECT isochrone_frontier.dir, location.path FROM (SELECT directory_in_rev.dir, directory_in_rev.loc FROM directory_in_rev JOIN revision ON revision.id=directory_in_rev.rev WHERE revision.sha1=%s ) AS isochrone_frontier JOIN location ON location.id=isochrone_frontier.loc ) AS directory_location JOIN directory ON directory.id=directory_location.dir ORDER BY path""", - (revision,) + (revision,), ) directories = list(provenance.cursor.fetchall()) for directory, date, path in directories: delta = date.timestamp() - timestamp location = os.fsdecode(path) + "/" - if location == "/": location = "./" + if location == "/": + location = "./" print(dump("D", directory, delta, path=location, table="R-D ")) # Display content found outside the current isochrone frontier. 
provenance.cursor.execute( """SELECT content.sha1 AS blob, content.date AS date, content_location.path AS path FROM (SELECT content_outside.blob, location.path FROM (SELECT content_in_dir.blob, content_in_dir.loc FROM content_in_dir JOIN directory ON directory.id=content_in_dir.dir WHERE directory.sha1=%s ) AS content_outside JOIN location ON location.id=content_outside.loc ) AS content_location JOIN content ON content.id=content_location.blob ORDER BY path""", - (directory,) + (directory,), ) content = list(provenance.cursor.fetchall()) for blob, date, path in content: delta = date.timestamp() - timestamp location = " + " + os.fsdecode(path) print(dump("C", blob, delta, path=location, table=" D-C")) print("") diff --git a/find-blob.py b/find-blob.py index 2865f3c..d0c9cd4 100755 --- a/find-blob.py +++ b/find-blob.py @@ -1,66 +1,70 @@ #!/usr/bin/env python import logging import os import sys from swh.model.cli import identify_object from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. conninfo = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) if len(sys.argv) < 2: print("usage: find-blob [limit]") exit(-1) # Get provenance object. provenance = get_provenance(**conninfo) obj, swhid = identify_object("content", True, True, sys.argv[1]) sha1 = hash_to_bytes(swhid.split(":")[-1]) print(f"Identifier of object {obj}: {swhid}") limit = sys.argv[2] if len(sys.argv) > 2 else None first = provenance.content_find_first(sha1) if first is not None: - print("===============================================================================") + print( + "===============================================================================" + ) print(f"First occurrence of {obj}:") print( " content: swh:1:cnt:{cnt}, revision: swh:1:rev:{rev}, date: {date}, location: {path}".format( cnt=hash_to_hex(first[0]), rev=hash_to_hex(first[1]), date=first[2], path=os.fsdecode(first[3]), ) ) - print("===============================================================================") + print( + "===============================================================================" + ) if limit is None: print(f"All occurrences of {obj}:") else: print(f"First {limit} occurrences of {obj}:") for occur in provenance.content_find_all(sha1, limit=limit): print( " content: swh:1:cnt:{cnt}, revision: swh:1:rev:{rev}, date: {date}, location: {path}".format( cnt=hash_to_hex(occur[0]), rev=hash_to_hex(occur[1]), date=occur[2], path=os.fsdecode(occur[3]), ) ) else: logging.warning("Requested content not available in the provenance database.") diff --git a/histogram.py b/histogram.py index c44d2ee..6e6b0e1 100755 --- a/histogram.py +++ b/histogram.py @@ -1,44 +1,44 @@ #!/usr/bin/env python import io from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. conninfo = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } if __name__ == "__main__": # Get provenance object. provenance = get_provenance(**conninfo) tables = ["directory_in_rev", "content_in_dir"] for table in tables: - provenance.cursor.execute(f""" - SELECT depths.depth, COUNT(depths.depth) - FROM (SELECT - CASE location.path - WHEN '' THEN 0 - WHEN '.' 
THEN 0 - ELSE 1 + CHAR_LENGTH(ENCODE(location.path, 'escape')) - - CHAR_LENGTH(REPLACE(ENCODE(location.path, 'escape'), '/', '')) - END AS depth - FROM {table} - JOIN location - ON {table}.loc=location.id - ) AS depths - GROUP BY depths.depth - ORDER BY depths.depth - """) + provenance.cursor.execute( + f"""SELECT depths.depth, COUNT(depths.depth) + FROM (SELECT + CASE location.path + WHEN '' THEN 0 + WHEN '.' THEN 0 + ELSE 1 + CHAR_LENGTH(ENCODE(location.path, 'escape')) - + CHAR_LENGTH(REPLACE(ENCODE(location.path, 'escape'), '/', '')) + END AS depth + FROM {table} + JOIN location + ON {table}.loc=location.id + ) AS depths + GROUP BY depths.depth + ORDER BY depths.depth""" + ) filename = "depths_" + conninfo["db"]["dbname"] + f"_{table}.csv" with io.open(filename, "w") as outfile: outfile.write(f"{table} depth,{table} count\n") for depth, count in provenance.cursor.fetchall(): outfile.write(f"{depth},{count}\n") diff --git a/metrics.py b/metrics.py index c58fb33..e331bdb 100755 --- a/metrics.py +++ b/metrics.py @@ -1,121 +1,157 @@ #!/usr/bin/env python import io import json import os import sys from swh.model.hashutil import hash_to_hex from swh.provenance import get_provenance from swh.provenance.provenance import ProvenanceInterface # TODO: take conninfo as command line arguments. conninfo = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } def get_tables_stats(provenance: ProvenanceInterface): tables = { - "content": dict(), - "content_early_in_rev": dict(), - "content_in_dir": dict(), - "directory": dict(), - "directory_in_rev": dict(), - "location": dict(), - "revision": dict() + "content": dict(), + "content_early_in_rev": dict(), + "content_in_dir": dict(), + "directory": dict(), + "directory_in_rev": dict(), + "location": dict(), + "revision": dict(), } for table in tables: provenance.cursor.execute(f"SELECT COUNT(*) FROM {table}") tables[table]["row_count"] = provenance.cursor.fetchone()[0] provenance.cursor.execute(f"SELECT pg_table_size('{table}')") tables[table]["table_size"] = provenance.cursor.fetchone()[0] provenance.cursor.execute(f"SELECT pg_indexes_size('{table}')") tables[table]["indexes_size"] = provenance.cursor.fetchone()[0] # provenance.cursor.execute(f"SELECT pg_total_relation_size('{table}')") # relation_size[table] = provenance.cursor.fetchone()[0] - tables[table]["relation_size"] = tables[table]["table_size"] + tables[table]["indexes_size"] + tables[table]["relation_size"] = ( + tables[table]["table_size"] + tables[table]["indexes_size"] + ) return tables if __name__ == "__main__": # Get provenance object. 
provenance = get_provenance(**conninfo) tables = get_tables_stats(provenance) for table in tables: row_count = tables[table]["row_count"] table_size = tables[table]["table_size"] indexes_size = tables[table]["indexes_size"] relation_size = tables[table]["relation_size"] - + print(f"{table}:") print(f" total rows: {row_count}") if row_count == 0: row_count = 1 - print(f" table size: {table_size} bytes ({table_size / row_count:.2f} per row)") - print(f" index size: {indexes_size} bytes ({indexes_size / row_count:.2f} per row)") - print(f" total size: {relation_size} bytes ({relation_size / row_count:.2f} per row)") + print( + f" table size: {table_size} bytes ({table_size / row_count:.2f} per row)" + ) + print( + f" index size: {indexes_size} bytes ({indexes_size / row_count:.2f} per row)" + ) + print( + f" total size: {relation_size} bytes ({relation_size / row_count:.2f} per row)" + ) # Ratios between de different entities/relations. print("ratios:") - print(f" content/revision: {tables['content']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") - print(f" content_early_in_rev/content: {tables['content_early_in_rev']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}") - print(f" content_in_dir/content: {tables['content_in_dir']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}") - print(f" directory/revision: {tables['directory']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") - print(f" directory_in_rev/directory: {tables['directory_in_rev']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}") + print( + f" content/revision: {tables['content']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) + print( + f" content_early_in_rev/content: {tables['content_early_in_rev']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}" + ) + print( + f" content_in_dir/content: {tables['content_in_dir']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}" + ) + print( + f" directory/revision: {tables['directory']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) + print( + f" directory_in_rev/directory: {tables['directory_in_rev']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}" + ) print(f" ==============================") - print(f" content_early_in_rev/revision: {tables['content_early_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") - print(f" content_in_dir/directory: {tables['content_in_dir']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}") - print(f" directory_in_rev/revision: {tables['directory_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") + print( + f" content_early_in_rev/revision: {tables['content_early_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) + print( + f" content_in_dir/directory: {tables['content_in_dir']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}" + ) + print( + f" 
directory_in_rev/revision: {tables['directory_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) # Metrics for frontiers defined in root directories. - provenance.cursor.execute(f"""SELECT dir - FROM directory_in_rev - INNER JOIN location - ON loc=location.id - WHERE location.path=%s""", (b"",)) + provenance.cursor.execute( + f"""SELECT dir + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s""", + (b"",), + ) directories = list(provenance.cursor.fetchall()) print(f"Total root frontiers used: {len(directories)}") - provenance.cursor.execute(f"""SELECT dir - FROM directory_in_rev - INNER JOIN location - ON loc=location.id - WHERE location.path=%s - GROUP BY dir""", (b"",)) + provenance.cursor.execute( + f"""SELECT dir + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s + GROUP BY dir""", + (b"",), + ) directories = list(provenance.cursor.fetchall()) print(f"Total distinct root frontiers: {len(directories)}") - provenance.cursor.execute(f"""SELECT roots.dir - FROM (SELECT dir, loc - FROM directory_in_rev - INNER JOIN location - ON loc=location.id - WHERE location.path=%s) AS roots - JOIN directory_in_rev - ON directory_in_rev.dir=roots.dir - WHERE directory_in_rev.loc!=roots.loc""", (b"",)) + provenance.cursor.execute( + f"""SELECT roots.dir + FROM (SELECT dir, loc + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s) AS roots + JOIN directory_in_rev + ON directory_in_rev.dir=roots.dir + WHERE directory_in_rev.loc!=roots.loc""", + (b"",), + ) directories = list(provenance.cursor.fetchall()) print(f"Total other uses of these frontiers: {len(directories)}") - provenance.cursor.execute(f"""SELECT roots.dir - FROM (SELECT dir, loc - FROM directory_in_rev - INNER JOIN location - ON loc=location.id - WHERE location.path=%s) AS roots - JOIN directory_in_rev - ON directory_in_rev.dir=roots.dir - WHERE directory_in_rev.loc!=roots.loc - GROUP BY roots.dir""", (b"",)) + provenance.cursor.execute( + f"""SELECT roots.dir + FROM (SELECT dir, loc + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s) AS roots + JOIN directory_in_rev + ON directory_in_rev.dir=roots.dir + WHERE directory_in_rev.loc!=roots.loc + GROUP BY roots.dir""", + (b"",), + ) directories = list(provenance.cursor.fetchall()) print(f"Total distinct other uses of frontiers: {len(directories)}") diff --git a/revisions/client.py b/revisions/client.py index 4d40941..68d7e91 100755 --- a/revisions/client.py +++ b/revisions/client.py @@ -1,134 +1,156 @@ #!/usr/bin/env python import logging import subprocess import sys import time import zmq from multiprocessing import Process from threading import Thread from datetime import datetime from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.provenance import get_archive, get_provenance from swh.provenance.archive import ArchiveInterface from swh.provenance.provenance import revision_add from swh.provenance.revision import RevisionEntry from typing import Any, Dict # TODO: take this from a configuration file conninfo = { "archive": { "cls": "direct", "db": { "host": "somerset.internal.softwareheritage.org", "port": "5433", "dbname": "softwareheritage", - "user": "guest" - } + "user": "guest", + }, }, "provenance": { "cls": "local", - "db": { - "host": "/var/run/postgresql", - "port": "5436", - "dbname": "provenance" - } + "db": {"host": "/var/run/postgresql", "port": 
"5436", "dbname": "provenance"}, }, } class Client(Process): - def __init__(self, idx: int, threads: int, conninfo : Dict[str, Any]): + def __init__( + self, + idx: int, + threads: int, + conninfo: Dict[str, Any], + lower: bool, + mindepth: int, + ): super().__init__() self.idx = idx self.threads = threads self.conninfo = conninfo + self.lower = lower + self.mindepth = mindepth def run(self): # Using the same archive object for every worker to share internal caches. archive = get_archive(**self.conninfo["archive"]) # Launch as many threads as requested workers = [] for idx in range(self.threads): logging.info(f"Process {self.idx}: launching thread {idx}") - worker = Worker(idx, archive, self.conninfo) + worker = Worker(idx, archive, self.conninfo, self.lower, self.mindepth) worker.start() workers.append(worker) # Wait for all threads to complete their work for idx, worker in enumerate(workers): logging.info(f"Process {self.idx}: waiting for thread {idx} to finish") worker.join() logging.info(f"Process {self.idx}: thread {idx} finished executing") class Worker(Thread): - def __init__(self, idx: int, archive : ArchiveInterface, conninfo : Dict[str, Any]): + def __init__( + self, + idx: int, + archive: ArchiveInterface, + conninfo: Dict[str, Any], + lower: bool, + mindepth: int, + ): super().__init__() self.idx = idx self.archive = archive self.server = conninfo["server"] # Each worker has its own provenance object to isolate # the processing of each revision. self.provenance = get_provenance(**conninfo["provenance"]) + self.lower = lower + self.mindepth = mindepth def run(self): context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect(self.server) while True: socket.send(b"NEXT") message = socket.recv_json() if message is None: break revision = RevisionEntry( self.archive, hash_to_bytes(message["rev"]), date=datetime.fromisoformat(message["date"]), - root=hash_to_bytes(message["root"]) + root=hash_to_bytes(message["root"]), + ) + revision_add( + self.provenance, + self.archive, + revision, + lower=self.lower, + mindepth=self.mindepth, ) - revision_add(self.provenance, self.archive, revision) if __name__ == "__main__": # Check parameters - if len(sys.argv) != 3: - print("usage: client ") + if len(sys.argv) != 5: + print("usage: client ") exit(-1) processes = int(sys.argv[1]) port = int(sys.argv[2]) - threads = 1 # int(sys.argv[2]) + threads = 1 # int(sys.argv[2]) + lower = bool(sys.argv[3]) + mindepth = int(sys.argv[4]) dbname = conninfo["provenance"]["db"]["dbname"] conninfo["server"] = f"tcp://localhost:{port}" # Set logging level # logging.getLogger().setLevel(logging.INFO) # Start counter start = time.time() # Launch as many clients as requested clients = [] for idx in range(processes): logging.info(f"MAIN: launching process {idx}") - client = Client(idx, threads, conninfo) + client = Client(idx, threads, conninfo, lower, mindepth) client.start() clients.append(client) # Wait for all processes to complete their work for idx, client in enumerate(clients): logging.info(f"MAIN: waiting for process {idx} to finish") client.join() logging.info(f"MAIN: process {idx} finished executing") # Stop counter and report elapsed time stop = time.time() - print("Elapsed time:", stop-start, "seconds") + print("Elapsed time:", stop - start, "seconds") diff --git a/revisions/server.py b/revisions/server.py index fc066d9..67d89b1 100755 --- a/revisions/server.py +++ b/revisions/server.py @@ -1,151 +1,159 @@ #!/usr/bin/env python import io import itertools import json import logging import 
subprocess import sys import zmq from swh.model.hashutil import hash_to_hex from swh.provenance import get_archive, get_provenance from swh.provenance.provenance import ProvenanceInterface from swh.provenance.revision import CSVRevisionIterator # TODO: take this from a configuration file conninfo = { "archive": { "cls": "direct", "db": { "host": "somerset.internal.softwareheritage.org", "port": "5433", "dbname": "softwareheritage", - "user": "guest" - } + "user": "guest", + }, }, "provenance": { "cls": "local", - "db": { - "host": "/var/run/postgresql", - "port": "5436", - "dbname": "provenance" - } + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, }, } def get_tables_stats(provenance: ProvenanceInterface): tables = { - "content": dict(), - "content_early_in_rev": dict(), - "content_in_dir": dict(), - "directory": dict(), - "directory_in_rev": dict(), - "location": dict(), - "revision": dict() + "content": dict(), + "content_early_in_rev": dict(), + "content_in_dir": dict(), + "directory": dict(), + "directory_in_rev": dict(), + "location": dict(), + "revision": dict(), } for table in tables: provenance.cursor.execute(f"SELECT COUNT(*) FROM {table}") tables[table]["row_count"] = provenance.cursor.fetchone()[0] provenance.cursor.execute(f"SELECT pg_table_size('{table}')") tables[table]["table_size"] = provenance.cursor.fetchone()[0] provenance.cursor.execute(f"SELECT pg_indexes_size('{table}')") tables[table]["indexes_size"] = provenance.cursor.fetchone()[0] # provenance.cursor.execute(f"SELECT pg_total_relation_size('{table}')") # relation_size[table] = provenance.cursor.fetchone()[0] - tables[table]["relation_size"] = tables[table]["table_size"] + tables[table]["indexes_size"] + tables[table]["relation_size"] = ( + tables[table]["table_size"] + tables[table]["indexes_size"] + ) return tables def init_stats(filename): tables = [ - "content", - "content_early_in_rev", - "content_in_dir", - "directory", - "directory_in_rev", - "location", - "revision" + "content", + "content_early_in_rev", + "content_in_dir", + "directory", + "directory_in_rev", + "location", + "revision", ] header = ["revisions count"] for table in tables: header.append(f"{table} rows") header.append(f"{table} table size") header.append(f"{table} index size") header.append(f"{table} relation size") with io.open(filename, "w") as outfile: - outfile.write(','.join(header)) - outfile.write('\n') + outfile.write(",".join(header)) + outfile.write("\n") def write_stats(filename, count, tables): line = [str(count)] for table, stats in tables.items(): line.append(str(stats["row_count"])) line.append(str(stats["table_size"])) line.append(str(stats["indexes_size"])) line.append(str(stats["relation_size"])) with io.open(filename, "a") as outfile: - outfile.write(','.join(line)) - outfile.write('\n') + outfile.write(",".join(line)) + outfile.write("\n") if __name__ == "__main__": if len(sys.argv) < 3: print("usage: server [limit]") print("where") - print(" filename : csv file containing the list of revisions to be iterated (one per") - print(" line): revision sha1, date in ISO format, root directory sha1.") + print( + " filename : csv file containing the list of revisions to be iterated (one per" + ) + print( + " line): revision sha1, date in ISO format, root directory sha1." 
+        )
         print(" port : server listening port.")
-        print(" limit : max number of revisions to be retrieved from the file.")
-        print(" stats : number of iteration after which stats should be taken.")
+        print(
+            " limit : max number of revisions to be retrieved from the file."
+        )
+        print(
+            " stats : number of iteration after which stats should be taken."
+        )
         exit(-1)

     filename = sys.argv[1]
     port = int(sys.argv[2])
     limit = int(sys.argv[3]) if len(sys.argv) > 3 else None
     stats = int(sys.argv[4]) if len(sys.argv) > 3 else None
-
+
     context = zmq.Context()
     socket = context.socket(zmq.REP)
     socket.bind(f"tcp://*:{port}")

     archive = get_archive(**conninfo["archive"])
     provenance = get_provenance(**conninfo["provenance"])

     statsfile = f"stats_{conninfo['provenance']['db']['dbname']}.csv"
     if stats is not None:
         init_stats(statsfile)

     revisions_provider = (
         line.strip().split(",") for line in open(filename, "r") if line.strip()
     )

-    for idx, revision in enumerate(CSVRevisionIterator(revisions_provider, archive, limit=limit)):
+    for idx, revision in enumerate(
+        CSVRevisionIterator(revisions_provider, archive, limit=limit)
+    ):
         if stats is not None and idx != 0 and idx % stats == 0:
             write_stats(statsfile, idx, get_tables_stats(provenance))

         # Wait for next request from client
         message = socket.recv()
         message = {
-            "rev" : hash_to_hex(revision.id),
-            "date" : str(revision.date),
-            "root" : hash_to_hex(revision.root)
+            "rev": hash_to_hex(revision.id),
+            "date": str(revision.date),
+            "root": hash_to_hex(revision.root),
         }
         socket.send_json(message)

     while True:
         # Force all clients to exit
         message = socket.recv()
         socket.send_json(None)
diff --git a/revisions_format.py b/revisions_format.py
index f193963..8afc200 100755
--- a/revisions_format.py
+++ b/revisions_format.py
@@ -1,41 +1,43 @@
 #!/usr/bin/env python

 import io
 import json
 import sys

 from swh.model.hashutil import hash_to_hex, hash_to_bytes
 from swh.provenance.postgresql.db_utils import connect

 conninfo = {
     "host": "db.internal.softwareheritage.org",
     "dbname": "softwareheritage",
-    "user": "guest"
+    "user": "guest",
 }
-# conninfo = 'postgresql://guest@db.internal.softwareheritage.org/softwareheritage'


 if __name__ == "__main__":
     if len(sys.argv) != 3:
-        print('usage: revisions_format <infilename> <outfilename>')
+        print("usage: revisions_format <infilename> <outfilename>")
         exit(-1)

-    print(f'Connection to database: {conninfo}...')
+    print(f"Connection to database: {conninfo}...")
     conn = connect(conninfo)

     infilename = sys.argv[1]
     outfilename = sys.argv[2]

     with io.open(infilename) as infile:
-        with io.open(outfilename, 'w') as outfile:
+        with io.open(outfilename, "w") as outfile:
             ids = json.loads(infile.read())
             print(f"Formatting {len(ids)} revisions")
             for id in ids:
                 cursor = conn.cursor()
-                cursor.execute('''SELECT id, date, directory FROM revision
-                                  WHERE id=%s AND date IS NOT NULL''',
-                               (hash_to_bytes(id),))
+                cursor.execute(
+                    """SELECT id, date, directory
+                       FROM revision
+                       WHERE id=%s AND date IS NOT NULL""",
+                    (hash_to_bytes(id),),
+                )
                 rev = cursor.fetchone()
                 assert rev is not None
-                outfile.write(f'{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n')
+                outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n")
diff --git a/revisions_pick.py b/revisions_pick.py
index d093176..4a05647 100755
--- a/revisions_pick.py
+++ b/revisions_pick.py
@@ -1,43 +1,59 @@
 #!/usr/bin/env python

 import io
 import sys

-from swh.model.hashutil import hash_to_hex
+from swh.model.hashutil import hash_to_hex, hash_to_bytes
 from swh.provenance.postgresql.db_utils import connect

 conninfo = {
     "host": "db.internal.softwareheritage.org",
     "dbname": "softwareheritage",
-    "user": "guest"
+    "user": "guest",
 }


 if __name__ == "__main__":
-    if len(sys.argv) != 3:
-        print('usage: listrevs <count> <filename>')
+    if len(sys.argv) != 2:
+        print("usage: listrevs <filename>")
         exit(-1)

-    count = int(sys.argv[1])
-    filename = sys.argv[2]
+    filename = sys.argv[1]

-    print(f'Connection to database: {conninfo}...')
+    print(f"Connection to database: {conninfo}...")
     conn = connect(conninfo)
-    cursor = conn.cursor()
+    cursor = conn.cursor()  # still needed by the queries below
-    cursor.execute('''SELECT COUNT(*) FROM revision''')
-    total = cursor.fetchone()[0]
-
-    probability = count / total * 100
-    print(f"Requesting {count} revisions out of {total} (probability {probability}).")
-    cursor.execute('''SELECT id, date, directory FROM revision TABLESAMPLE BERNOULLI(%s)''',
-                   (probability,))
-    revisions = [row for row in cursor.fetchall() if row[1] is not None]
-    revisions.sort(key=lambda rev: rev[1])
-    # assert len(revisions) >= count
-
-    print(f"Filtering first {count} of {len(revisions)} obtained.")
-    with io.open(filename, 'w') as outfile:
-        for rev in revisions[:count]:
-            outfile.write(f'{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n')
+
+    revisions = set(
+        [
+            hash_to_bytes("1363496c1106606684d40447f5d1149b2c66a9f8"),
+            hash_to_bytes("b91a781cbc1285d441aa682926d93d8c23678b0b"),
+            hash_to_bytes("313315d9790c36e22bb5bb034e9c7d7f470cdf73"),
+            hash_to_bytes("a3b54f0f5de1ad17889fd23aee7c230eefc300cd"),
+            hash_to_bytes("74deb33d12bf275a3b3a9afc833f4760be90f031"),
+        ]
+    )
+    pending = revisions
+
+    while pending:
+        cursor.execute(
+            """SELECT parent_id FROM revision_history WHERE id IN %s""",
+            (tuple(pending),),
+        )
+        parents = set(map(lambda row: row[0], cursor.fetchall()))
+        pending = parents - revisions
+        revisions = revisions | parents
+
+    # print(f"Requesting {count} revisions out of {total} (probability {probability}).")
+    cursor.execute(
+        """SELECT id, date, directory FROM revision WHERE id IN %s""",
+        (tuple(revisions),),
+    )
+    ordered = [row for row in cursor.fetchall() if row[1] is not None]
+    ordered.sort(key=lambda rev: rev[1])
+
+    print(f"Obtained {len(ordered)} revisions.")
+    with io.open(filename, "w") as outfile:
+        for rev in ordered:
+            outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n")