diff --git a/.gitignore b/.gitignore index 29a3478..abeeaa0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ +*.c *.csv +*.json *.log *.txt *.zip diff --git a/compare-all.py b/compare-all.py index c441814..5b29fda 100755 --- a/compare-all.py +++ b/compare-all.py @@ -1,185 +1,185 @@ #!/usr/bin/env python import glob import io import logging import os import psycopg2 from swh.model.hashutil import hash_to_hex from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. conninfo1 = { - "cls": "ps", + "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, } conninfo2 = { - "cls": "ps", - "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "withids"}, + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } # Write log file with occurrence detail. def logdiff(filename, occurrences): with io.open(filename, "a") as outfile: for row in occurrences: try: # Try to decode path. path = os.fsdecode(row[3]).decode("utf-8", "replace") except: # Use its raw value if not possible path = row[3] outfile.write( "{blob},{rev},{date},{path}\n".format( blob=hash_to_hex(row[0]), rev=hash_to_hex(row[1]), date=row[2], path=path, ) ) # Write log file with list of occurrences. def loglist(filename, occurrences): with io.open(filename, "a") as outfile: for blobid in occurrences: outfile.write( "{blob}\n".format( blob=hash_to_hex(blobid) ) ) # Output log file name. nextidx = None def outfilename(suffix): global nextidx basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) prefix = os.path.join(os.getcwd(), basename + "-") if nextidx is None: nextidx = 0 for filename in glob.glob(f"{prefix}*.log"): try: lastidx = int(filename.strip(prefix).split("-")[0]) nextidx = max(nextidx, lastidx + 1) except: continue return f"{prefix}{nextidx:02}-{suffix}.log" # Print iterations progress. # TODO: move to utils module. def progress( iteration, total, prefix="Progress:", suffix="Complete", decimals=1, length=50, fill="█", printEnd="\r", ): """ Call in a loop to create terminal progress bar @params: iteration - Required : current iteration (Int) total - Required : total iterations (Int) prefix - Optional : prefix string (Str) suffix - Optional : suffix string (Str) decimals - Optional : positive number of decimals in percent complete (Int) length - Optional : character length of bar (Int) fill - Optional : bar fill character (Str) printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) """ percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) filledLength = int(length * iteration // total) bar = fill * filledLength + "-" * (length - filledLength) print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) # Print New Line on Complete if iteration == total: print() if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) # Get provenance object for both databases and query its lists of content. provenance1 = get_provenance(**conninfo1) provenance2 = get_provenance(**conninfo2) provenance1.cursor.execute("""SELECT id FROM content ORDER BY id""") content1 = set(map(lambda row: row[0], provenance1.cursor.fetchall())) provenance2.cursor.execute("""SELECT sha1 FROM content ORDER BY sha1""") content2 = set(map(lambda row: row[0], provenance2.cursor.fetchall())) if content1 == content2: # If lists of content match, we check that occurrences does as well. 
total = len(content1) progress(0, total) mismatch = False # Iterate over all content querying all its occurrences on both databases. for i, blobid in enumerate(content1): provenance1.cursor.execute( """(SELECT content_early_in_rev.blob, content_early_in_rev.rev, revision.date, content_early_in_rev.path FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE content_early_in_rev.blob=%s ) UNION (SELECT content_in_rev.blob, content_in_rev.rev, revision.date, content_in_rev.path FROM (SELECT content_in_dir.blob, directory_in_rev.rev, CASE directory_in_rev.path WHEN '' THEN content_in_dir.path WHEN '.' THEN content_in_dir.path ELSE (directory_in_rev.path || '/' || content_in_dir.path)::unix_path END AS path FROM content_in_dir JOIN directory_in_rev ON content_in_dir.dir=directory_in_rev.dir WHERE content_in_dir.blob=%s ) AS content_in_rev JOIN revision ON revision.id=content_in_rev.rev ) ORDER BY date, rev, path""", (blobid, blobid), ) occurrences1 = list(provenance1.cursor.fetchall()) occurrences2 = list(provenance2.content_find_all(blobid)) # If there is a mismatch log it to file. if ( len(occurrences1) != len(occurrences2) or set(occurrences1) != set(occurrences2) ): mismatch = True logging.warning(f"Occurrencies mismatch for {hash_to_hex(blobid)}") logdiff(outfilename(conninfo1["db"]["dbname"]), occurrences1) logdiff(outfilename(conninfo2["db"]["dbname"]), occurrences2) progress(i + 1, total) if not mismatch: logging.info("Databases are equivalent!") else: # If lists of content don't match, we are done. loglist(outfilename(conninfo1["db"]["dbname"]), content1) loglist(outfilename(conninfo2["db"]["dbname"]), content2) logging.warning("Content lists are different") diff --git a/compare-first.py b/compare-first.py index 9004cf6..f13e038 100755 --- a/compare-first.py +++ b/compare-first.py @@ -1,160 +1,160 @@ #!/usr/bin/env python import glob import io import logging import os import psycopg2 from swh.model.hashutil import hash_to_hex from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. conninfo1 = { - "cls": "ps", + "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, } conninfo2 = { - "cls": "ps", - "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "withids"}, + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } # Write log file with occurrence detail. def logdiff(filename, occurrence): with io.open(filename, "a") as outfile: try: # Try to decode path. path = os.fsdecode(occurrence[3]).decode("utf-8", "replace") except: # Use its raw value if not possible path = occurrence[3] outfile.write( "{blob},{rev},{date},{path}\n".format( blob=hash_to_hex(occurrence[0]), rev=hash_to_hex(occurrence[1]), date=occurrence[2], path=path, ) ) # Write log file with list of occurrences. def loglist(filename, occurrences): with io.open(filename, "a") as outfile: for blobid in occurrences: outfile.write( "{blob}\n".format( blob=hash_to_hex(blobid) ) ) # Output log file name. nextidx = None def outfilename(suffix): global nextidx basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) prefix = os.path.join(os.getcwd(), basename + "-") if nextidx is None: nextidx = 0 for filename in glob.glob(f"{prefix}*.log"): try: lastidx = int(filename.strip(prefix).split("-")[0]) nextidx = max(nextidx, lastidx + 1) except: continue return f"{prefix}{nextidx:02}-{suffix}.log" # Print iterations progress. # TODO: move to utils module. 
def progress( iteration, total, prefix="Progress:", suffix="Complete", decimals=1, length=50, fill="█", printEnd="\r", ): """ Call in a loop to create terminal progress bar @params: iteration - Required : current iteration (Int) total - Required : total iterations (Int) prefix - Optional : prefix string (Str) suffix - Optional : suffix string (Str) decimals - Optional : positive number of decimals in percent complete (Int) length - Optional : character length of bar (Int) fill - Optional : bar fill character (Str) printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) """ percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) filledLength = int(length * iteration // total) bar = fill * filledLength + "-" * (length - filledLength) print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) # Print New Line on Complete if iteration == total: print() if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) # Get provenance object for both databases and query its lists of content. provenance1 = get_provenance(**conninfo1) provenance2 = get_provenance(**conninfo2) provenance1.cursor.execute("""SELECT id FROM content ORDER BY id""") content1 = set(map(lambda row: row[0], provenance1.cursor.fetchall())) provenance2.cursor.execute("""SELECT sha1 FROM content ORDER BY sha1""") content2 = set(map(lambda row: row[0], provenance2.cursor.fetchall())) if content1 == content2: # If lists of content match, we check that occurrences does as well. total = len(content1) progress(0, total) mismatch = False # Iterate over all content querying all its occurrences on both databases. for i, blobid in enumerate(content1): provenance1.cursor.execute( """SELECT content_early_in_rev.blob, content_early_in_rev.rev, revision.date, content_early_in_rev.path FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE content_early_in_rev.blob=%s ORDER BY date, rev, path ASC LIMIT 1""", (blobid,), ) occurrence1 = provenance1.cursor.fetchone() occurrence2 = provenance2.content_find_first(blobid) # If there is a mismatch log it to file. We can only compare the timestamp # as the same blob might be seen for the first time in different locations. if occurrence1[2] != occurrence2[2]: mismatch = True logging.warning(f"Occurrencies mismatch for {hash_to_hex(blobid)}") logdiff(outfilename(conninfo1["db"]["dbname"]), occurrence1) logdiff(outfilename(conninfo2["db"]["dbname"]), occurrence2) progress(i + 1, total) if not mismatch: logging.info("Databases are equivalent!") else: # If lists of content don't match, we are done. loglist(outfilename(conninfo1["db"]["dbname"]), content1) loglist(outfilename(conninfo2["db"]["dbname"]), content2) logging.warning("Content lists are different") diff --git a/dump.py b/dump.py index 0dd323a..7703c1f 100755 --- a/dump.py +++ b/dump.py @@ -1,129 +1,129 @@ #!/usr/bin/env python import glob import io import logging import os import psycopg2 from swh.model.hashutil import hash_to_hex from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. 
conninfo = { - "cls": "ps", - "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "synthetic"}, + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } def dump(type, hash, time, path="", header="", table=""): return f"{str(header).ljust(5)} | {str(table).ljust(5)} | {str(path).ljust(30)} | {type} {hash_to_hex(hash)} | {str(time).rjust(10)}" if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) # Get provenance object for both databases and query its lists of content. provenance = get_provenance(**conninfo) provenance.cursor.execute("""SELECT sha1, date FROM revision ORDER BY date""") revisions = list(provenance.cursor.fetchall()) for idx, (revision, date) in enumerate(revisions): # Display current revision information. header = f"R{idx:04}" timestamp = date.timestamp() print(f"{timestamp} {hash_to_hex(revision)} {header}") print(dump("R", revision, timestamp, header=header)) # Display content found early in current revision. provenance.cursor.execute( """SELECT content.sha1 AS blob, content.date AS date, content_location.path AS path FROM (SELECT content_in_rev.blob, location.path FROM (SELECT content_early_in_rev.blob, content_early_in_rev.loc FROM content_early_in_rev JOIN revision ON revision.id=content_early_in_rev.rev WHERE revision.sha1=%s ) AS content_in_rev JOIN location ON location.id=content_in_rev.loc ) AS content_location JOIN content ON content.id=content_location.blob ORDER BY path""", (revision,) ) content = list(provenance.cursor.fetchall()) for blob, date, path in content: delta = date.timestamp() - timestamp location = os.fsdecode(path) print(dump("C", blob, delta, path=location, table="R---C")) # Display isochrone frontiers found in current revision. provenance.cursor.execute( """SELECT directory.sha1 AS dir, directory.date AS date, directory_location.path AS path FROM (SELECT isochrone_frontier.dir, location.path FROM (SELECT directory_in_rev.dir, directory_in_rev.loc FROM directory_in_rev JOIN revision ON revision.id=directory_in_rev.rev WHERE revision.sha1=%s ) AS isochrone_frontier JOIN location ON location.id=isochrone_frontier.loc ) AS directory_location JOIN directory ON directory.id=directory_location.dir ORDER BY path""", (revision,) ) directories = list(provenance.cursor.fetchall()) for directory, date, path in directories: delta = date.timestamp() - timestamp location = os.fsdecode(path) + "/" if location == "/": location = "./" print(dump("D", directory, delta, path=location, table="R-D ")) # Display content found outside the current isochrone frontier. 
provenance.cursor.execute( """SELECT content.sha1 AS blob, content.date AS date, content_location.path AS path FROM (SELECT content_outside.blob, location.path FROM (SELECT content_in_dir.blob, content_in_dir.loc FROM content_in_dir JOIN directory ON directory.id=content_in_dir.dir WHERE directory.sha1=%s ) AS content_outside JOIN location ON location.id=content_outside.loc ) AS content_location JOIN content ON content.id=content_location.blob ORDER BY path""", (directory,) ) content = list(provenance.cursor.fetchall()) for blob, date, path in content: delta = date.timestamp() - timestamp location = " + " + os.fsdecode(path) print(dump("C", blob, delta, path=location, table=" D-C")) print("") diff --git a/find-blob.py b/find-blob.py index 8d036fc..ed5435a 100755 --- a/find-blob.py +++ b/find-blob.py @@ -1,61 +1,66 @@ #!/usr/bin/env python import logging import os import sys from swh.model.cli import identify_object from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.provenance import get_provenance # TODO: take conninfo as command line arguments. conninfo = { - "cls": "ps", - "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "lower1m"}, + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) - if len(sys.argv) != 2: - print("usage: find-blob ") + if len(sys.argv) < 2: + print("usage: find-blob [limit]") exit(-1) # Get provenance object for both databases and query its lists of content. provenance = get_provenance(**conninfo) obj, swhid = identify_object("content", True, True, sys.argv[1]) sha1 = hash_to_bytes(swhid.split(":")[-1]) print(f"Identifier of object {obj}: {swhid}") + limit = sys.argv[2] if len(sys.argv) > 2 else None + first = provenance.content_find_first(sha1) if first is not None: print("===============================================================================") print(f"First occurrence of {obj}:") print( - " content: {blob}, revision: {rev}, date: {date}, location: {path}".format( - blob=hash_to_hex(first[0]), + " content: swh:1:cnt:{cnt}, revision: swh:1:rev:{rev}, date: {date}, location: {path}".format( + cnt=hash_to_hex(first[0]), rev=hash_to_hex(first[1]), date=first[2], path=os.fsdecode(first[3]), ) ) print("===============================================================================") - print(f"All occurrences of {obj}:") - for occur in provenance.content_find_all(sha1): + if limit is None: + print(f"All occurrences of {obj}:") + else: + print(f"First {limit} occurrences of {obj}:") + for occur in provenance.content_find_all(sha1, limit=limit): print( - " content: {blob}, revision: {rev}, date: {date}, location: {path}".format( - blob=hash_to_hex(occur[0]), + " content: swh:1:cnt:{cnt}, revision: swh:1:rev:{rev}, date: {date}, location: {path}".format( + cnt=hash_to_hex(occur[0]), rev=hash_to_hex(occur[1]), date=occur[2], path=os.fsdecode(occur[3]), ) ) else: logging.warning("Requested content not available in the provenance database.") diff --git a/metrics.py b/metrics.py index 23a895c..49e95fa 100755 --- a/metrics.py +++ b/metrics.py @@ -1,124 +1,158 @@ #!/usr/bin/env python import io +import json +import os import sys from swh.model.hashutil import hash_to_hex from swh.provenance import get_provenance +from swh.provenance.provenance import ProvenanceInterface # TODO: take conninfo as command line arguments. 
conninfo = { - "cls": "ps", - "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "upper2m8proc"}, + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } -if __name__ == "__main__": - # Get provenance object for both databases and query its lists of content. - provenance = get_provenance(**conninfo) +def get_tables_stats(provenance: ProvenanceInterface): + tables = { + "content": dict(), + "content_early_in_rev": dict(), + "content_in_dir": dict(), + "directory": dict(), + "directory_in_rev": dict(), + "location": dict(), + "revision": dict() + } - tables = [ - "content", - "content_early_in_rev", - "content_in_dir", - "directory", - "directory_in_rev", - "location", - "revision" - ] - - row_count = {} - table_size = {} - indexes_size = {} - relation_size = {} for table in tables: provenance.cursor.execute(f"SELECT COUNT(*) FROM {table}") - row_count[table] = provenance.cursor.fetchone()[0] + tables[table]["row_count"] = provenance.cursor.fetchone()[0] provenance.cursor.execute(f"SELECT pg_table_size('{table}')") - table_size[table] = provenance.cursor.fetchone()[0] + tables[table]["table_size"] = provenance.cursor.fetchone()[0] provenance.cursor.execute(f"SELECT pg_indexes_size('{table}')") - indexes_size[table] = provenance.cursor.fetchone()[0] + tables[table]["indexes_size"] = provenance.cursor.fetchone()[0] # provenance.cursor.execute(f"SELECT pg_total_relation_size('{table}')") # relation_size[table] = provenance.cursor.fetchone()[0] - relation_size[table] = table_size[table] + indexes_size[table] + tables[table]["relation_size"] = tables[table]["table_size"] + tables[table]["indexes_size"] + + return tables + + +if __name__ == "__main__": + # Get provenance object for both databases and query its lists of content. + provenance = get_provenance(**conninfo) + tables = get_tables_stats(provenance) + + for table in tables: + row_count = tables[table]["row_count"] + table_size = tables[table]["table_size"] + indexes_size = tables[table]["indexes_size"] + relation_size = tables[table]["relation_size"] + print(f"{table}:") - print(f" total rows: {row_count[table]}") - print(f" table size: {table_size[table]} bytes ({table_size[table] / row_count[table]:.2f} per row)") - print(f" index size: {indexes_size[table]} bytes ({indexes_size[table] / row_count[table]:.2f} per row)") - print(f" total size: {relation_size[table]} bytes ({relation_size[table] / row_count[table]:.2f} per row)") + print(f" total rows: {row_count}") + if row_count == 0: + row_count = 1 + print(f" table size: {table_size} bytes ({table_size / row_count:.2f} per row)") + print(f" index size: {indexes_size} bytes ({indexes_size / row_count:.2f} per row)") + print(f" total size: {relation_size} bytes ({relation_size / row_count:.2f} per row)") # Ratios between de different entities/relations. 
print("ratios:") - print(f" content/revision: {row_count['content'] / row_count['revision']:.2f}") - print(f" content_early_in_rev/content: {row_count['content_early_in_rev'] / row_count['content']:.2f}") - print(f" content_in_dir/content: {row_count['content_in_dir'] / row_count['content']:.2f}") - print(f" directory/revision: {row_count['directory'] / row_count['revision']:.2f}") - print(f" directory_in_rev/directory: {row_count['directory_in_rev'] / row_count['directory']:.2f}") + print(f" content/revision: {tables['content']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") + print(f" content_early_in_rev/content: {tables['content_early_in_rev']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}") + print(f" content_in_dir/content: {tables['content_in_dir']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}") + print(f" directory/revision: {tables['directory']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") + print(f" directory_in_rev/directory: {tables['directory_in_rev']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}") print(f" ==============================") - print(f" content_early_in_rev/revision: {row_count['content_early_in_rev'] / row_count['revision']:.2f}") - print(f" content_in_dir/directory: {row_count['content_in_dir'] / row_count['directory']:.2f}") - print(f" directory_in_rev/revision: {row_count['directory_in_rev'] / row_count['revision']:.2f}") + print(f" content_early_in_rev/revision: {tables['content_early_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") + print(f" content_in_dir/directory: {tables['content_in_dir']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}") + print(f" directory_in_rev/revision: {tables['directory_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}") # Metrics for frontiers defined in root directories. 
provenance.cursor.execute(f"""SELECT dir FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s""", (b"",)) directories = list(provenance.cursor.fetchall()) print(f"Total root frontiers used: {len(directories)}") provenance.cursor.execute(f"""SELECT dir FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s GROUP BY dir""", (b"",)) directories = list(provenance.cursor.fetchall()) print(f"Total distinct root frontiers: {len(directories)}") provenance.cursor.execute(f"""SELECT roots.dir FROM (SELECT dir, loc FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s) AS roots JOIN directory_in_rev ON directory_in_rev.dir=roots.dir WHERE directory_in_rev.loc!=roots.loc""", (b"",)) directories = list(provenance.cursor.fetchall()) print(f"Total other uses of these frontiers: {len(directories)}") provenance.cursor.execute(f"""SELECT roots.dir FROM (SELECT dir, loc FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s) AS roots JOIN directory_in_rev ON directory_in_rev.dir=roots.dir WHERE directory_in_rev.loc!=roots.loc GROUP BY roots.dir""", (b"",)) directories = list(provenance.cursor.fetchall()) print(f"Total distinct other uses of frontiers: {len(directories)}") + # provenance.cursor.execute(f"""SELECT location.path + # FROM directory_in_rev + # JOIN location + # ON directory_in_rev.loc=location.id""") + # depths = list(map(lambda row: os.fsdecode(row[0]).count('/'), provenance.cursor.fetchall())) + # with io.open("directory_in_rev.json", "w") as outfile: + # outfile.write(json.dumps(depths)) + + # provenance.cursor.execute(f"""SELECT location.path + # FROM content_in_dir + # JOIN location + # ON content_in_dir.loc=location.id""") + # depths = list(map(lambda row: os.fsdecode(row[0]).count('/'), provenance.cursor.fetchall())) + # with io.open("content_in_dir.json", "w") as outfile: + # outfile.write(json.dumps(depths)) + + + + # Query the 'limit' most common files inside any isochrone frontier. # f"SELECT blob, COUNT(blob) AS occur FROM content_early_in_rev GROUP BY blob ORDER BY occur DESC LIMIT {limit}" # Query the 'limit' most common files outside any isochrone frontier. 
# f"SELECT blob, COUNT(blob) AS occur FROM content_in_dir GROUP BY blob ORDER BY occur DESC LIMIT {limit}" # blob 141557 | occur 34610802 # f"SELECT dir FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s" # f"SELECT blob, COUNT(blob) AS occur FROM content_in_dir GROUP BY blob ORDER BY occur DESC LIMIT {limit}" +# f"SELECT depth, COUNT(depth) AS occur FROM (SELECT ARRAY_LENGTH(STRING_TO_ARRAY(path, '/'), 1) - 1 AS depth FROM location) GROUP BY depth ORDER BY occur ASC" + # f"SELECT path FROM location JOIN content_in_dir ON location.id=content_in_dir.loc WHERE blob=%s GROUP BY path" # f"SELECT ENCODE(location.path::bytea, 'escape'), COUNT(*) FROM content_in_dir INNER JOIN location ON loc=location.id WHERE blob=%s GROUP BY 1 ORDER BY 2 DESC" # f"SELECT ENCODE(sha1::bytea, 'escape') FROM content WHERE id=%s" diff --git a/revisions/client.py b/revisions/client.py index b6af42a..4d40941 100755 --- a/revisions/client.py +++ b/revisions/client.py @@ -1,149 +1,134 @@ #!/usr/bin/env python import logging import subprocess import sys import time import zmq from multiprocessing import Process from threading import Thread from datetime import datetime from swh.model.hashutil import hash_to_bytes, hash_to_hex -from swh.provenance import ( - ArchiveInterface, - get_archive, - get_provenance -) +from swh.provenance import get_archive, get_provenance +from swh.provenance.archive import ArchiveInterface from swh.provenance.provenance import revision_add from swh.provenance.revision import RevisionEntry from typing import Any, Dict # TODO: take this from a configuration file conninfo = { "archive": { - "cls": "ps", + "cls": "direct", "db": { "host": "somerset.internal.softwareheritage.org", "port": "5433", "dbname": "softwareheritage", "user": "guest" } }, "provenance": { - "cls": "ps", + "cls": "local", "db": { "host": "/var/run/postgresql", "port": "5436", - # "dbname": "postgres" + "dbname": "provenance" } }, - "server": "tcp://localhost:5556" } class Client(Process): def __init__(self, idx: int, threads: int, conninfo : Dict[str, Any]): super().__init__() self.idx = idx self.threads = threads self.conninfo = conninfo def run(self): # Using the same archive object for every worker to share internal caches. archive = get_archive(**self.conninfo["archive"]) # Launch as many threads as requested workers = [] for idx in range(self.threads): logging.info(f"Process {self.idx}: launching thread {idx}") worker = Worker(idx, archive, self.conninfo) worker.start() workers.append(worker) # Wait for all threads to complete their work for idx, worker in enumerate(workers): logging.info(f"Process {self.idx}: waiting for thread {idx} to finish") worker.join() logging.info(f"Process {self.idx}: thread {idx} finished executing") class Worker(Thread): def __init__(self, idx: int, archive : ArchiveInterface, conninfo : Dict[str, Any]): super().__init__() self.idx = idx self.archive = archive self.server = conninfo["server"] # Each worker has its own provenance object to isolate # the processing of each revision. 
self.provenance = get_provenance(**conninfo["provenance"]) def run(self): context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect(self.server) while True: socket.send(b"NEXT") message = socket.recv_json() if message is None: break revision = RevisionEntry( self.archive, hash_to_bytes(message["rev"]), date=datetime.fromisoformat(message["date"]), root=hash_to_bytes(message["root"]) ) revision_add(self.provenance, self.archive, revision) if __name__ == "__main__": # Check parameters - if len(sys.argv) < 3: - print("usage: client ") + if len(sys.argv) != 3: + print("usage: client ") exit(-1) processes = int(sys.argv[1]) - threads = int(sys.argv[2]) - dbname = f"proc{processes}thread{threads}" + port = int(sys.argv[2]) + threads = 1 # int(sys.argv[2]) + dbname = conninfo["provenance"]["db"]["dbname"] + conninfo["server"] = f"tcp://localhost:{port}" # Set logging level # logging.getLogger().setLevel(logging.INFO) - # Create database - logging.info(f"MAIN: creating provenance database {dbname}") - status = subprocess.run( - ["swh", "provenance", "create", "--name", dbname], - capture_output=True - ) - if status.returncode != 0: - logging.error("Failed to create provenance database") - exit(-1) - logging.info(f"MAIN: database {dbname} successfuly created") - - conninfo["provenance"]["db"]["dbname"] = dbname - # Start counter start = time.time() # Launch as many clients as requested clients = [] for idx in range(processes): logging.info(f"MAIN: launching process {idx}") client = Client(idx, threads, conninfo) client.start() clients.append(client) # Wait for all processes to complete their work for idx, client in enumerate(clients): logging.info(f"MAIN: waiting for process {idx} to finish") client.join() logging.info(f"MAIN: process {idx} finished executing") # Stop counter and report elapsed time stop = time.time() print("Elapsed time:", stop-start, "seconds") diff --git a/revisions/server.py b/revisions/server.py index fa7aa09..fc066d9 100755 --- a/revisions/server.py +++ b/revisions/server.py @@ -1,63 +1,151 @@ #!/usr/bin/env python +import io +import itertools +import json +import logging +import subprocess import sys import zmq from swh.model.hashutil import hash_to_hex -from swh.provenance import get_archive -from swh.provenance.revision import FileRevisionIterator +from swh.provenance import get_archive, get_provenance +from swh.provenance.provenance import ProvenanceInterface +from swh.provenance.revision import CSVRevisionIterator # TODO: take this from a configuration file conninfo = { "archive": { - "cls": "ps", + "cls": "direct", "db": { "host": "somerset.internal.softwareheritage.org", "port": "5433", "dbname": "softwareheritage", "user": "guest" } - } + }, + "provenance": { + "cls": "local", + "db": { + "host": "/var/run/postgresql", + "port": "5436", + "dbname": "provenance" + } + }, } -if __name__ == "__main__": - # Set minimum logging level to INFO. 
- logging.getLogger().setLevel(logging.INFO) - if len(sys.argv) < 2: - print("usage: server [limit]") +def get_tables_stats(provenance: ProvenanceInterface): + tables = { + "content": dict(), + "content_early_in_rev": dict(), + "content_in_dir": dict(), + "directory": dict(), + "directory_in_rev": dict(), + "location": dict(), + "revision": dict() + } + + for table in tables: + provenance.cursor.execute(f"SELECT COUNT(*) FROM {table}") + tables[table]["row_count"] = provenance.cursor.fetchone()[0] + + provenance.cursor.execute(f"SELECT pg_table_size('{table}')") + tables[table]["table_size"] = provenance.cursor.fetchone()[0] + + provenance.cursor.execute(f"SELECT pg_indexes_size('{table}')") + tables[table]["indexes_size"] = provenance.cursor.fetchone()[0] + + # provenance.cursor.execute(f"SELECT pg_total_relation_size('{table}')") + # relation_size[table] = provenance.cursor.fetchone()[0] + tables[table]["relation_size"] = tables[table]["table_size"] + tables[table]["indexes_size"] + + return tables + + +def init_stats(filename): + tables = [ + "content", + "content_early_in_rev", + "content_in_dir", + "directory", + "directory_in_rev", + "location", + "revision" + ] + + header = ["revisions count"] + for table in tables: + header.append(f"{table} rows") + header.append(f"{table} table size") + header.append(f"{table} index size") + header.append(f"{table} relation size") + + with io.open(filename, "w") as outfile: + outfile.write(','.join(header)) + outfile.write('\n') + + +def write_stats(filename, count, tables): + line = [str(count)] + + for table, stats in tables.items(): + line.append(str(stats["row_count"])) + line.append(str(stats["table_size"])) + line.append(str(stats["indexes_size"])) + line.append(str(stats["relation_size"])) + + with io.open(filename, "a") as outfile: + outfile.write(','.join(line)) + outfile.write('\n') + + +if __name__ == "__main__": + if len(sys.argv) < 3: + print("usage: server [limit]") print("where") print(" filename : csv file containing the list of revisions to be iterated (one per") print(" line): revision sha1, date in ISO format, root directory sha1.") + print(" port : server listening port.") print(" limit : max number of revisions to be retrieved from the file.") + print(" stats : number of iteration after which stats should be taken.") exit(-1) - filename = sys.arv[1] - limit = int(sys.arv[2]) if len(sys.argv) > 2 else None - port = 5556 + filename = sys.argv[1] + port = int(sys.argv[2]) + limit = int(sys.argv[3]) if len(sys.argv) > 3 else None + stats = int(sys.argv[4]) if len(sys.argv) > 3 else None context = zmq.Context() socket = context.socket(zmq.REP) socket.bind(f"tcp://*:{port}") archive = get_archive(**conninfo["archive"]) - revisions = FileRevisionIterator(filename, archive, limit=limit) - while True: - revision = revisions.next() - if revision is None: - break + provenance = get_provenance(**conninfo["provenance"]) + + statsfile = f"stats_{conninfo['provenance']['db']['dbname']}.csv" + if stats is not None: + init_stats(statsfile) + + revisions_provider = ( + line.strip().split(",") for line in open(filename, "r") if line.strip() + ) + + for idx, revision in enumerate(CSVRevisionIterator(revisions_provider, archive, limit=limit)): + if stats is not None and idx != 0 and idx % stats == 0: + write_stats(statsfile, idx, get_tables_stats(provenance)) # Wait for next request from client message = socket.recv() message = { "rev" : hash_to_hex(revision.id), "date" : str(revision.date), "root" : hash_to_hex(revision.root) } 
        socket.send_json(message)

    while True:
        # Force all clients to exit
        message = socket.recv()
        socket.send_json(None)
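Note on the scripts above: revisions/server.py and revisions/client.py coordinate work over a ZeroMQ REQ/REP socket. Each worker thread sends b"NEXT", the server answers with one revision serialized as JSON (its "rev", "date" and "root" fields), and a None reply tells workers to shut down. The sketch below illustrates only that exchange; it is not part of the patch, and the names toy_server, toy_client and process_revision are hypothetical placeholders.

import zmq


def toy_server(port, revisions):
    # Bind a REP socket and hand out one revision per request, as server.py does.
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(f"tcp://*:{port}")
    for revision in revisions:
        socket.recv()               # wait for a client's b"NEXT"
        socket.send_json(revision)  # e.g. {"rev": ..., "date": ..., "root": ...}
    while True:
        # Keep answering None so every remaining client exits its loop.
        socket.recv()
        socket.send_json(None)


def toy_client(port):
    # Connect a REQ socket and pull revisions until the server replies None,
    # mirroring the Worker.run() loop in client.py.
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect(f"tcp://localhost:{port}")
    while True:
        socket.send(b"NEXT")
        message = socket.recv_json()
        if message is None:
            break
        # process_revision(message)  # hypothetical stand-in for revision_add()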