diff --git a/swh/provenance/tools/compare-all.py b/swh/provenance/tools/compare-all.py new file mode 100755 index 0000000..42b4614 --- /dev/null +++ b/swh/provenance/tools/compare-all.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python + +import glob +import io +import logging +import os +from typing import Iterable + +from swh.model.hashutil import hash_to_hex +from swh.model.model import Sha1Git +from swh.provenance import get_provenance +from swh.provenance.interface import EntityType, ProvenanceResult + +# TODO: take conninfo as command line arguments. +conninfo1 = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, +} +conninfo2 = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, +} + + +# Write log file with occurrence detail. +def logdiff(filename: str, occurrences: Iterable[ProvenanceResult]) -> None: + with io.open(filename, "a") as outfile: + for occur in occurrences: + try: + # Try to decode path. + path = os.fsdecode(occur.path).decode("utf-8", "replace") + except: + # Use its raw value if not possible + path = occur.path + outfile.write( + "{blob},{rev},{date},{path}\n".format( + blob=hash_to_hex(occur.content), + rev=hash_to_hex(occur.revision), + date=occur.date, + path=path, + ) + ) + + +# Write log file with list of occurrences. +def loglist(filename: str, occurrences: Iterable[Sha1Git]) -> None: + with io.open(filename, "a") as outfile: + for sha1 in occurrences: + outfile.write("{blob}\n".format(blob=hash_to_hex(sha1))) + + +# Output log file name. +nextidx = None + + +def outfilename(suffix: str) -> str: + global nextidx + basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) + prefix = os.path.join(os.getcwd(), basename + "-") + if nextidx is None: + nextidx = 0 + for filename in glob.glob(f"{prefix}*.log"): + try: + lastidx = int(filename.strip(prefix).split("-")[0]) + nextidx = max(nextidx, lastidx + 1) + except: + continue + return f"{prefix}{nextidx:02}-{suffix}.log" + + +# Print iterations progress. +# TODO: move to utils module. +def progress( + iteration: int, + total: int, + prefix: str = "Progress:", + suffix: str = "Complete", + decimals: int = 1, + length: int = 50, + fill: str = "█", + printEnd: str = "\r", +): + """ + Call in a loop to create terminal progress bar + @params: + iteration - Required : current iteration (Int) + total - Required : total iterations (Int) + prefix - Optional : prefix string (Str) + suffix - Optional : suffix string (Str) + decimals - Optional : positive number of decimals in percent complete (Int) + length - Optional : character length of bar (Int) + fill - Optional : bar fill character (Str) + printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) + """ + percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + filledLength = int(length * iteration // total) + bar = fill * filledLength + "-" * (length - filledLength) + print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) + # Print New Line on Complete + if iteration == total: + print() + + +if __name__ == "__main__": + # Set minimum logging level to INFO. + logging.getLogger().setLevel(logging.INFO) + + # Get provenance object for both databases and query its lists of content. 
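+    # Open both provenance databases side by side: entity_get_all(EntityType.CONTENT)
+    # returns every content id known to each backend, and the per-blob comparison of
+    # occurrences below only runs once those two collections are known to match.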
+ with get_provenance(**conninfo1) as provenance1: + with get_provenance(**conninfo2) as provenance2: + content1 = provenance1.storage.entity_get_all(EntityType.CONTENT) + content2 = provenance2.storage.entity_get_all(EntityType.CONTENT) + + if content1 == content2: + # If lists of content match, we check that occurrences does as well. + total = len(content1) + progress(0, total) + + mismatch = False + # Iterate over all content querying all its occurrences on both + # databases. + for i, sha1 in enumerate(content1): + occurrences1 = list(provenance1.content_find_all(sha1)) + occurrences2 = list(provenance2.content_find_all(sha1)) + + # If there is a mismatch log it to file. + if len(occurrences1) != len(occurrences2) or set( + occurrences1 + ) != set(occurrences2): + mismatch = True + logging.warning( + f"Occurrencies mismatch for {hash_to_hex(sha1)}" + ) + logdiff(outfilename(conninfo1["db"]["dbname"]), occurrences1) + logdiff(outfilename(conninfo2["db"]["dbname"]), occurrences2) + + progress(i + 1, total) + + if not mismatch: + logging.info("Databases are equivalent!") + + else: + # If lists of content don't match, we are done. + loglist(outfilename(conninfo1["db"]["dbname"]), content1) + loglist(outfilename(conninfo2["db"]["dbname"]), content2) + logging.warning("Content lists are different") diff --git a/swh/provenance/tools/compare-first.py b/swh/provenance/tools/compare-first.py new file mode 100755 index 0000000..8ad7844 --- /dev/null +++ b/swh/provenance/tools/compare-first.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python + +import glob +import io +import logging +import os +from typing import Iterable + +from swh.model.hashutil import hash_to_hex +from swh.model.model import Sha1Git +from swh.provenance import get_provenance +from swh.provenance.interface import EntityType, ProvenanceResult + +# TODO: take conninfo as command line arguments. +conninfo1 = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, +} +conninfo2 = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, +} + + +# Write log file with occurrence detail. +def logdiff(filename: str, occurrence: ProvenanceResult) -> None: + with io.open(filename, "a") as outfile: + try: + # Try to decode path. + path = os.fsdecode(occurrence.path).decode("utf-8", "replace") + except: + # Use its raw value if not possible + path = occurrence.path + outfile.write( + "{blob},{rev},{date},{path}\n".format( + blob=hash_to_hex(occurrence.content), + rev=hash_to_hex(occurrence.revision), + date=occurrence.date, + path=path, + ) + ) + + +# Write log file with list of occurrences. +def loglist(filename: str, occurrences: Iterable[Sha1Git]) -> None: + with io.open(filename, "a") as outfile: + for sha1 in occurrences: + outfile.write("{blob}\n".format(blob=hash_to_hex(sha1))) + + +# Output log file name. +nextidx = None + + +def outfilename(suffix: str) -> str: + global nextidx + basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) + prefix = os.path.join(os.getcwd(), basename + "-") + if nextidx is None: + nextidx = 0 + for filename in glob.glob(f"{prefix}*.log"): + try: + lastidx = int(filename.strip(prefix).split("-")[0]) + nextidx = max(nextidx, lastidx + 1) + except: + continue + return f"{prefix}{nextidx:02}-{suffix}.log" + + +# Print iterations progress. +# TODO: move to utils module. 
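+# NOTE: this helper is a verbatim copy of the one in compare-all.py; it is kept inline
+# here until it is moved into a shared utils module (see the TODO above).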
+def progress(
+    iteration: int,
+    total: int,
+    prefix: str = "Progress:",
+    suffix: str = "Complete",
+    decimals: int = 1,
+    length: int = 50,
+    fill: str = "█",
+    printEnd: str = "\r",
+):
+    """
+    Call in a loop to create a terminal progress bar.
+    @params:
+        iteration   - Required  : current iteration (Int)
+        total       - Required  : total iterations (Int)
+        prefix      - Optional  : prefix string (Str)
+        suffix      - Optional  : suffix string (Str)
+        decimals    - Optional  : positive number of decimals in percent complete (Int)
+        length      - Optional  : character length of bar (Int)
+        fill        - Optional  : bar fill character (Str)
+        printEnd    - Optional  : end character (e.g. "\r", "\r\n") (Str)
+    """
+    percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
+    filledLength = int(length * iteration // total)
+    bar = fill * filledLength + "-" * (length - filledLength)
+    print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd)
+    # Print new line on complete.
+    if iteration == total:
+        print()
+
+
+if __name__ == "__main__":
+    # Set minimum logging level to INFO.
+    logging.getLogger().setLevel(logging.INFO)
+
+    # Get provenance objects for both databases and query their lists of content.
+    with get_provenance(**conninfo1) as provenance1:
+        with get_provenance(**conninfo2) as provenance2:
+            content1 = provenance1.storage.entity_get_all(EntityType.CONTENT)
+            content2 = provenance2.storage.entity_get_all(EntityType.CONTENT)
+
+            if content1 == content2:
+                # If lists of content match, we check that occurrences do as well.
+                total = len(content1)
+                progress(0, total)
+
+                mismatch = False
+                # Iterate over all content querying its first occurrences
+                # on both databases.
+                for i, sha1 in enumerate(content1):
+                    occurrence1 = provenance1.content_find_first(sha1)
+                    occurrence2 = provenance2.content_find_first(sha1)
+
+                    # If there is a mismatch, log it to file. We can only compare the
+                    # timestamp, as the same blob might be seen for the first time in
+                    # different locations.
+                    if occurrence1.date != occurrence2.date:
+                        mismatch = True
+                        logging.warning(
+                            f"Occurrences mismatch for {hash_to_hex(sha1)}"
+                        )
+                        logdiff(outfilename(conninfo1["db"]["dbname"]), occurrence1)
+                        logdiff(outfilename(conninfo2["db"]["dbname"]), occurrence2)
+
+                    progress(i + 1, total)
+
+                if not mismatch:
+                    logging.info("Databases are equivalent!")
+
+            else:
+                # If the lists of content don't match, we are done.
+                loglist(outfilename(conninfo1["db"]["dbname"]), content1)
+                loglist(outfilename(conninfo2["db"]["dbname"]), content2)
+                logging.warning("Content lists are different")
diff --git a/swh/provenance/tools/count.py b/swh/provenance/tools/count.py
new file mode 100755
index 0000000..b0b7312
--- /dev/null
+++ b/swh/provenance/tools/count.py
@@ -0,0 +1,19 @@
+#!/usr/bin/env python
+
+import io
+import sys
+
+
+def linecount(filename: str) -> int:
+    count = 0
+    for _ in io.open(filename):
+        count += 1
+    return count
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("usage: count <filename>")
+        exit(-1)
+
+    print(linecount(sys.argv[1]))
diff --git a/swh/provenance/tools/dump.py b/swh/provenance/tools/dump.py
new file mode 100755
index 0000000..530770e
--- /dev/null
+++ b/swh/provenance/tools/dump.py
@@ -0,0 +1,137 @@
+#!/usr/bin/env python
+
+import logging
+import os
+
+from swh.model.hashutil import hash_to_hex
+from swh.provenance import get_provenance
+from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql
+
+# TODO: take conninfo as command line arguments.
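+# The script walks all revisions in date order and, for each one, prints the blobs
+# found early in it (rows tagged "R---C"), the directories on its isochrone frontier
+# ("R-D") and the blobs reached through those directories ("D-C"), using the column
+# layout produced by dump() below.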
+conninfo = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, +} + + +def dump(type, hash, time, path="", header="", table=""): + return ( + f"{str(header).ljust(5)} | " + f"{str(table).ljust(5)} | " + f"{str(path).ljust(30)} | " + f"{type} {hash_to_hex(hash)} | " + f"{str(time).rjust(10)}" + ) + + +if __name__ == "__main__": + # Set minimum logging level to INFO. + logging.getLogger().setLevel(logging.INFO) + + # Get provenance object. + with get_provenance(**conninfo) as provenance: + # TODO: use ProvenanceStorageInterface instead! + assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) + + with provenance.storage.transaction() as cursor: + cursor.execute("""SELECT sha1, date FROM revision ORDER BY date""") + revisions = list(cursor.fetchall()) + + for idx, (revision, date) in enumerate(revisions): + # Display current revision information. + header = f"R{idx:04}" + timestamp = date.timestamp() + print(f"{timestamp} {hash_to_hex(revision)} {header}") + print(dump("R", revision, timestamp, header=header)) + + # Display content found early in current revision. + with provenance.storage.transaction() as cursor: + cursor.execute( + """SELECT content.sha1 AS blob, + content.date AS date, + content_location.path AS path + FROM (SELECT content_in_rev.blob, + location.path + FROM (SELECT content_early_in_rev.blob, + content_early_in_rev.loc + FROM content_early_in_rev + JOIN revision + ON revision.id=content_early_in_rev.rev + WHERE revision.sha1=%s + ) AS content_in_rev + JOIN location + ON location.id=content_in_rev.loc + ) AS content_location + JOIN content + ON content.id=content_location.blob + ORDER BY path""", + (revision,), + ) + content = list(cursor.fetchall()) + + for blob, date, path in content: + delta = date.timestamp() - timestamp + location = os.fsdecode(path) + print(dump("C", blob, delta, path=location, table="R---C")) + + # Display isochrone frontiers found in current revision. + cursor.execute( + """SELECT directory.sha1 AS dir, + directory.date AS date, + directory_location.path AS path + FROM (SELECT isochrone_frontier.dir, + location.path + FROM (SELECT directory_in_rev.dir, + directory_in_rev.loc + FROM directory_in_rev + JOIN revision + ON revision.id=directory_in_rev.rev + WHERE revision.sha1=%s + ) AS isochrone_frontier + JOIN location + ON location.id=isochrone_frontier.loc + ) AS directory_location + JOIN directory + ON directory.id=directory_location.dir + ORDER BY path""", + (revision,), + ) + directories = list(cursor.fetchall()) + + for directory, date, path in directories: + delta = date.timestamp() - timestamp + location = os.fsdecode(path) + "/" + if location == "/": + location = "./" + print(dump("D", directory, delta, path=location, table="R-D ")) + + # Display content found outside the current isochrone frontier. 
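+                # The query below gathers the content_in_dir rows attached to this
+                # frontier directory and joins them through location to recover the
+                # recorded path of each blob.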
+ cursor.execute( + """SELECT content.sha1 AS blob, + content.date AS date, + content_location.path AS path + FROM (SELECT content_outside.blob, + location.path + FROM (SELECT content_in_dir.blob, + content_in_dir.loc + FROM content_in_dir + JOIN directory + ON directory.id=content_in_dir.dir + WHERE directory.sha1=%s + ) AS content_outside + JOIN location + ON location.id=content_outside.loc + ) AS content_location + JOIN content + ON content.id=content_location.blob + ORDER BY path""", + (directory,), + ) + content = list(cursor.fetchall()) + + for blob, date, path in content: + delta = date.timestamp() - timestamp + location = " + " + os.fsdecode(path) + print(dump("C", blob, delta, path=location, table=" D-C")) + + print("") diff --git a/swh/provenance/tools/find-blob.py b/swh/provenance/tools/find-blob.py new file mode 100755 index 0000000..2652c9e --- /dev/null +++ b/swh/provenance/tools/find-blob.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +import logging +import os +import sys + +from swh.model.cli import identify_object +from swh.model.hashutil import hash_to_bytes, hash_to_hex +from swh.provenance import get_provenance + +# TODO: take conninfo as command line arguments. +conninfo = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, +} + + +if __name__ == "__main__": + # Set minimum logging level to INFO. + logging.getLogger().setLevel(logging.INFO) + + if len(sys.argv) < 2: + print("usage: find-blob [limit]") + exit(-1) + + obj, swhid = identify_object("content", True, True, sys.argv[1]) + sha1 = hash_to_bytes(swhid.split(":")[-1]) + print(f"Identifier of object {obj}: {swhid}") + + limit = sys.argv[2] if len(sys.argv) > 2 else None + + # Get provenance object. + with get_provenance(**conninfo) as provenance: + first = provenance.content_find_first(sha1) + + if first is not None: + print( + "======================================================================" + ) + print(f"First occurrence of {obj}:") + print( + f" content: swh:1:cnt:{hash_to_hex(first[0])}," + f" revision: swh:1:rev:{hash_to_hex(first[1])}," + f" date: {first[2]}," + f" location: {os.fsdecode(first[3])}" + ) + + print( + "======================================================================" + ) + if limit is None: + print(f"All occurrences of {obj}:") + else: + print(f"First {limit} occurrences of {obj}:") + for occur in provenance.content_find_all(sha1, limit=limit): + print( + f" content: swh:1:cnt:{hash_to_hex(occur[0])}," + f" revision: swh:1:rev:{hash_to_hex(occur[1])}," + f" date: {occur[2]}," + f" location: {os.fsdecode(occur[3])}" + ) + + else: + logging.warning( + "Requested content not available in the provenance database." + ) diff --git a/swh/provenance/tools/histogram.py b/swh/provenance/tools/histogram.py new file mode 100755 index 0000000..9e2e629 --- /dev/null +++ b/swh/provenance/tools/histogram.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +import io + +from swh.provenance import get_provenance +from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql + +# TODO: take conninfo as command line arguments. +conninfo = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, +} + + +if __name__ == "__main__": + # Get provenance object. + with get_provenance(**conninfo) as provenance: + # TODO: use ProvenanceStorageInterface instead! 
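+        # For each relation table, the query buckets locations by path depth: depth 0
+        # for the empty or "." path, otherwise 1 plus the number of "/" separators in
+        # the path, and counts how many rows fall into each bucket. One CSV file is
+        # written per table.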
+ assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) + + tables = ["directory_in_rev", "content_in_dir"] + + for table in tables: + with provenance.storage.transaction() as cursor: + cursor.execute( + f"""SELECT depths.depth, COUNT(depths.depth) + FROM (SELECT + CASE location.path + WHEN '' THEN 0 + WHEN '.' THEN 0 + ELSE 1 + + CHAR_LENGTH(ENCODE(location.path, 'escape')) + - CHAR_LENGTH( + REPLACE( + ENCODE(location.path, 'escape'), '/', '' + ) + ) + END AS depth + FROM {table} + JOIN location + ON {table}.loc=location.id + ) AS depths + GROUP BY depths.depth + ORDER BY depths.depth""" + ) + + filename = "depths_" + conninfo["db"]["dbname"] + f"_{table}.csv" + + with io.open(filename, "w") as outfile: + outfile.write(f"{table} depth,{table} count\n") + for depth, count in cursor.fetchall(): + outfile.write(f"{depth},{count}\n") diff --git a/swh/provenance/tools/metrics.py b/swh/provenance/tools/metrics.py new file mode 100755 index 0000000..25d6123 --- /dev/null +++ b/swh/provenance/tools/metrics.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python + +from swh.provenance import get_provenance +from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql +from swh.provenance.provenance import ProvenanceInterface + +# TODO: take conninfo as command line arguments. +conninfo = { + "cls": "local", + "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, +} + + +def get_tables_stats(provenance: ProvenanceInterface): + # TODO: use ProvenanceStorageInterface instead! + assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) + + tables = { + "content": dict(), + "content_early_in_rev": dict(), + "content_in_dir": dict(), + "directory": dict(), + "directory_in_rev": dict(), + "location": dict(), + "revision": dict(), + } + + for table in tables: + with provenance.storage.transaction() as cursor: + cursor.execute(f"SELECT COUNT(*) FROM {table}") + tables[table]["row_count"] = cursor.fetchone()[0] + + cursor.execute(f"SELECT pg_table_size('{table}')") + tables[table]["table_size"] = cursor.fetchone()[0] + + cursor.execute(f"SELECT pg_indexes_size('{table}')") + tables[table]["indexes_size"] = cursor.fetchone()[0] + + # cursor.execute(f"SELECT pg_total_relation_size('{table}')") + # relation_size[table] = cursor.fetchone()[0] + tables[table]["relation_size"] = ( + tables[table]["table_size"] + tables[table]["indexes_size"] + ) + + return tables + + +if __name__ == "__main__": + # Get provenance object. + with get_provenance(**conninfo) as provenance: + # TODO: use ProvenanceStorageInterface instead! + assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) + + tables = get_tables_stats(provenance) + + for table in tables: + row_count = tables[table]["row_count"] + table_size = tables[table]["table_size"] + indexes_size = tables[table]["indexes_size"] + relation_size = tables[table]["relation_size"] + + print(f"{table}:") + print(f" total rows: {row_count}") + if row_count == 0: + row_count = 1 + print( + f" table size: {table_size} bytes ({table_size / row_count:.2f} per row)" + ) + print( + f" index size: {indexes_size} bytes ({indexes_size / row_count:.2f} per row)" + ) + print( + f" total size: {relation_size} bytes ({relation_size / row_count:.2f} per row)" + ) + + # Ratios between de different entities/relations. 
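+        # Each ratio substitutes 1 for an empty denominator table so that a freshly
+        # created database does not cause a division by zero.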
+ print("ratios:") + print( + f" content/revision: {tables['content']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) + print( + f" content_early_in_rev/content: {tables['content_early_in_rev']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}" + ) + print( + f" content_in_dir/content: {tables['content_in_dir']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}" + ) + print( + f" directory/revision: {tables['directory']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) + print( + f" directory_in_rev/directory: {tables['directory_in_rev']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}" + ) + print(f" ==============================") + print( + f" content_early_in_rev/revision: {tables['content_early_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) + print( + f" content_in_dir/directory: {tables['content_in_dir']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}" + ) + print( + f" directory_in_rev/revision: {tables['directory_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" + ) + + # Metrics for frontiers defined in root directories. + with provenance.storage.transaction() as cursor: + cursor.execute( + f"""SELECT dir + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s""", + (b"",), + ) + directories = list(cursor.fetchall()) + print(f"Total root frontiers used: {len(directories)}") + + cursor.execute( + f"""SELECT dir + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s + GROUP BY dir""", + (b"",), + ) + directories = list(cursor.fetchall()) + print(f"Total distinct root frontiers: {len(directories)}") + + cursor.execute( + f"""SELECT roots.dir + FROM (SELECT dir, loc + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s) AS roots + JOIN directory_in_rev + ON directory_in_rev.dir=roots.dir + WHERE directory_in_rev.loc!=roots.loc""", + (b"",), + ) + directories = list(cursor.fetchall()) + print(f"Total other uses of these frontiers: {len(directories)}") + + cursor.execute( + f"""SELECT roots.dir + FROM (SELECT dir, loc + FROM directory_in_rev + INNER JOIN location + ON loc=location.id + WHERE location.path=%s) AS roots + JOIN directory_in_rev + ON directory_in_rev.dir=roots.dir + WHERE directory_in_rev.loc!=roots.loc + GROUP BY roots.dir""", + (b"",), + ) + directories = list(cursor.fetchall()) + print(f"Total distinct other uses of frontiers: {len(directories)}") diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py new file mode 100755 index 0000000..c48a0df --- /dev/null +++ b/swh/provenance/tools/origins/client.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python + +import logging +import logging.handlers +import multiprocessing +import os +import sys +import time +from typing import Any, Callable, Dict, List, Optional + +from swh.core import config +from swh.model.hashutil import hash_to_bytes +from swh.provenance import get_archive, get_provenance +from swh.provenance.origin import OriginEntry, origin_add +import yaml +import zmq + +CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" + +DEFAULT_PATH = 
os.environ.get(CONFIG_ENVVAR, None) + + +class Client(multiprocessing.Process): + def __init__( + self, + conf: Dict[str, Any], + group: None = None, + target: Optional[Callable[..., Any]] = ..., + name: Optional[str] = ..., + ) -> None: + super().__init__(group=group, target=target, name=name) + self.archive_conf = conf["archive"] + self.storage_conf = conf["storage"] + self.url = f"tcp://{conf['org_server']['host']}:{conf['org_server']['port']}" + logging.info(f"Client {self.name} created") + + def run(self): + logging.info(f"Client {self.name} started") + # XXX: should we reconnect on each iteration to save resources? + archive = get_archive(**self.archive_conf) + + context = zmq.Context() + socket: zmq.Socket = context.socket(zmq.REQ) + socket.connect(self.url) + + with get_provenance(**self.storage_conf) as provenance: + while True: + socket.send(b"NEXT") + response = socket.recv_json() + + if response is None: + break + + batch = [] + for origin in response: + batch.append( + OriginEntry(origin["url"], hash_to_bytes(origin["snapshot"])) + ) + origin_add(provenance, archive, batch) + logging.info(f"Client {self.name} stopped") + + +if __name__ == "__main__": + # Check parameters + if len(sys.argv) != 2: + print("usage: client ") + exit(-1) + + processes = int(sys.argv[1]) + + config_file = None # TODO: add as a cli option + if ( + config_file is None + and DEFAULT_PATH is not None + and config.config_exists(DEFAULT_PATH) + ): + config_file = DEFAULT_PATH + + if config_file is None or not os.path.exists(config_file): + print("No configuration provided") + exit(-1) + + conf = yaml.safe_load(open(config_file, "rb"))["provenance"] + + # Start counter + start = time.time() + + # Launch as many clients as requested + clients: List[Client] = [] + for idx in range(processes): + logging.info(f"MAIN: launching process {idx}") + client = Client(conf, name=f"worker{idx}") + client.start() + clients.append(client) + + # Wait for all processes to complete their work + for client in clients: + logging.info(f"MAIN: waiting for process {client.name} to finish") + client.join() + logging.info(f"MAIN: process {client.name} finished executing") + + # Stop counter and report elapsed time + stop = time.time() + print("Elapsed time:", stop - start, "seconds") diff --git a/swh/provenance/tools/origins/server.py b/swh/provenance/tools/origins/server.py new file mode 100755 index 0000000..2bf9bde --- /dev/null +++ b/swh/provenance/tools/origins/server.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python + +from datetime import datetime, timezone +from enum import Enum +import gzip +import io +import os +import queue +import sys +import threading +import time +from typing import Any, Callable, Dict, List, Optional + +from swh.core import config +from swh.provenance import get_provenance +from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql +import yaml +import zmq + +CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" + +DEFAULT_BATCH_SIZE = 1 +DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) +DEFAULT_PORT = 5555 +DEFAULT_STATS_RATE = 300 +DEFAULT_SKIP_VALUE = 0 + +UTCEPOCH = datetime.fromtimestamp(0, timezone.utc) + + +class Command(Enum): + TERMINATE = "terminate" + + +class StatsWorker(threading.Thread): + def __init__( + self, + filename: str, + storage_conf: Dict[str, Any], + timeout: float = DEFAULT_STATS_RATE, + group: None = None, + target: Optional[Callable[..., Any]] = ..., + name: Optional[str] = ..., + ) -> None: + super().__init__(group=group, target=target, name=name) + self.filename = filename 
+ self.queue = queue.Queue() + self.storage_conf = storage_conf + self.timeout = timeout + + def get_tables_stats(self, tables: List[str]) -> Dict[str, int]: + # TODO: use ProvenanceStorageInterface instead! + with get_provenance(**self.storage_conf) as provenance: + assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) + stats = {} + for table in tables: + with provenance.storage.transaction(readonly=True) as cursor: + cursor.execute(f"SELECT COUNT(*) AS count FROM {table}") + stats[table] = cursor.fetchone()["count"] + return stats + + def init_stats(self, filename: str) -> List[str]: + tables = [ + "origin", + "revision", + "revision_in_origin", + "revision_before_revision", + ] + header = ["datetime"] + for table in tables: + header.append(f"{table} rows") + with io.open(filename, "w") as outfile: + outfile.write(",".join(header)) + outfile.write("\n") + return tables + + def run(self) -> None: + tables = self.init_stats(self.filename) + start = time.monotonic() + while True: + now = time.monotonic() + if now - start > self.timeout: + self.write_stats(self.filename, self.get_tables_stats(tables)) + start = now + try: + cmd = self.queue.get(timeout=1) + if cmd == Command.TERMINATE: + break + except queue.Empty: + continue + + def stop(self) -> None: + self.queue.put(Command.TERMINATE) + self.join() + + def write_stats(self, filename: str, stats: Dict[str, int]) -> None: + line = [str(datetime.now())] + for _, stat in stats.items(): + line.append(str(stat)) + with io.open(filename, "a") as outfile: + outfile.write(",".join(line)) + outfile.write("\n") + + +class OriginWorker(threading.Thread): + def __init__( + self, + filename: str, + url: str, + batch_size: int = DEFAULT_BATCH_SIZE, + limit: Optional[int] = None, + skip: int = DEFAULT_SKIP_VALUE, + group: None = None, + target: Optional[Callable[..., Any]] = ..., + name: Optional[str] = ..., + ) -> None: + super().__init__(group=group, target=target, name=name) + self.filename = filename + self.batch_size = batch_size + self.limit = limit + self.queue = queue.Queue() + self.skip = skip + self.url = url + + def run(self) -> None: + context = zmq.Context() + socket: zmq.Socket = context.socket(zmq.REP) + socket.bind(self.url) + + # TODO: improve this using a context manager + file = ( + io.open(self.filename, "r") + if os.path.splitext(self.filename)[1] == ".csv" + else gzip.open(self.filename, "rt") + ) + provider = ( + line.strip().rsplit(",", maxsplit=1) for line in file if line.strip() + ) + + count = 0 + while True: + if self.limit is not None and count > self.limit: + break + + response = [] + for url, snapshot in provider: + count += 1 + if count <= self.skip: + continue + response.append({"url": url, "snapshot": snapshot}) + if len(response) == self.batch_size: + break + if not response: + break + + # Wait for next request from client + # (TODO: make it non-blocking or add timeout) + socket.recv() + socket.send_json(response) + + try: + cmd = self.queue.get(block=False) + if cmd == Command.TERMINATE: + break + except queue.Empty: + continue + + while True: # TODO: improve shutdown logic + socket.recv() + socket.send_json(None) + # context.term() + + def stop(self) -> None: + self.queue.put(Command.TERMINATE) + self.join() + + +if __name__ == "__main__": + # TODO: improve command line parsing + if len(sys.argv) < 2: + print("usage: server ") + print("where") + print( + " filename : csv file containing the list of origins to be iterated (one per" + ) + print(" line): origin url, snapshot sha1.") + exit(-1) + + 
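+    # The configuration file is resolved from the SWH_CONFIG_FILENAME environment
+    # variable when none is passed explicitly. A rough sketch of the expected layout,
+    # inferred from the lookups below and in origins/client.py (illustrative only;
+    # actual deployments may differ):
+    #
+    #   provenance:
+    #     archive: ...        # passed to get_archive() by the clients
+    #     storage: ...        # passed to get_provenance()
+    #     org_server:
+    #       host: localhost
+    #       port: 5555
+    #       batch_size: 100
+    #       stats:
+    #         suffix: origins
+    #         timeout: 300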
config_file = None # TODO: Add as a cli option + if ( + config_file is None + and DEFAULT_PATH is not None + and config.config_exists(DEFAULT_PATH) + ): + config_file = DEFAULT_PATH + + if config_file is None or not os.path.exists(config_file): + print("No configuration provided") + exit(-1) + + conf = yaml.safe_load(open(config_file, "rb"))["provenance"] + + # Init stats + stats = conf["org_server"].pop("stats", None) + if stats is not None: + storage_conf = ( + conf["storage"]["storage_config"] + if conf["storage"]["cls"] == "rabbitmq" + else conf["storage"] + ) + statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}" + statsworker = StatsWorker(statsfile, storage_conf, **stats) + statsworker.start() + + # Init origin provider + orgsfile = sys.argv[1] + host = conf["org_server"].pop("host", None) + url = f"tcp://*:{conf['org_server'].pop('port', DEFAULT_PORT)}" + orgsworker = OriginWorker(orgsfile, url, **conf["org_server"]) + orgsworker.start() + + # Wait for user commands + while True: + try: + command = input("Enter EXIT to stop service: ") + if command.lower() == "exit": + break + except KeyboardInterrupt: + pass + + # Release resources + orgsworker.stop() + if stats is not None: + statsworker.stop() diff --git a/swh/provenance/tools/revisions/client.py b/swh/provenance/tools/revisions/client.py new file mode 100755 index 0000000..e562547 --- /dev/null +++ b/swh/provenance/tools/revisions/client.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python + +from datetime import timezone +import logging +import logging.handlers +import multiprocessing +import os +import sys +import time +from typing import Any, Callable, Dict, List, Optional + +import iso8601 +from swh.core import config +from swh.model.hashutil import hash_to_bytes +from swh.provenance import get_archive, get_provenance +from swh.provenance.revision import RevisionEntry, revision_add +import yaml +import zmq + +CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" + +DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) + + +class Client(multiprocessing.Process): + def __init__( + self, + conf: Dict[str, Any], + trackall: bool, + flatten: bool, + lower: bool, + mindepth: int, + group: None = None, + target: Optional[Callable[..., Any]] = ..., + name: Optional[str] = ..., + ) -> None: + super().__init__(group=group, target=target, name=name) + self.archive_conf = conf["archive"] + self.storage_conf = conf["storage"] + self.url = f"tcp://{conf['rev_server']['host']}:{conf['rev_server']['port']}" + self.trackall = trackall + self.flatten = flatten + self.lower = lower + self.mindepth = mindepth + logging.info(f"Client {self.name} created") + + def run(self): + logging.info(f"Client {self.name} started") + # XXX: should we reconnect on each iteration to save resources? 
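+        # The worker speaks a minimal REQ/REP protocol with the server: it sends
+        # b"NEXT", receives a JSON batch of revisions (rev id, ISO date, root
+        # directory id) to feed into revision_add(), and stops once the server
+        # replies with None.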
+ archive = get_archive(**self.archive_conf) + + context = zmq.Context() + socket: zmq.Socket = context.socket(zmq.REQ) + socket.connect(self.url) + + with get_provenance(**self.storage_conf) as provenance: + while True: + socket.send(b"NEXT") + response = socket.recv_json() + + if response is None: + break + + batch = [] + for revision in response: + # Ensure date has a valid timezone + date = iso8601.parse_date(revision["date"]) + if date.tzinfo is None: + date = date.replace(tzinfo=timezone.utc) + batch.append( + RevisionEntry( + hash_to_bytes(revision["rev"]), + date=date, + root=hash_to_bytes(revision["root"]), + ) + ) + revision_add( + provenance, + archive, + batch, + trackall=self.trackall, + flatten=self.flatten, + lower=self.lower, + mindepth=self.mindepth, + ) + logging.info(f"Client {self.name} stopped") + + +if __name__ == "__main__": + # Check parameters + if len(sys.argv) != 6: + print("usage: client ") + exit(-1) + + processes = int(sys.argv[1]) + trackall = sys.argv[2].lower() != "false" + flatten = sys.argv[3].lower() != "false" + lower = sys.argv[4].lower() != "false" + mindepth = int(sys.argv[5]) + + config_file = None # TODO: add as a cli option + if ( + config_file is None + and DEFAULT_PATH is not None + and config.config_exists(DEFAULT_PATH) + ): + config_file = DEFAULT_PATH + + if config_file is None or not os.path.exists(config_file): + print("No configuration provided") + exit(-1) + + conf = yaml.safe_load(open(config_file, "rb"))["provenance"] + + # Start counter + start = time.time() + + # Launch as many clients as requested + clients: List[Client] = [] + for idx in range(processes): + logging.info(f"MAIN: launching process {idx}") + client = Client( + conf, + trackall=trackall, + flatten=flatten, + lower=lower, + mindepth=mindepth, + name=f"worker{idx}", + ) + client.start() + clients.append(client) + + # Wait for all processes to complete their work + for client in clients: + logging.info(f"MAIN: waiting for process {client.name} to finish") + client.join() + logging.info(f"MAIN: process {client.name} finished executing") + + # Stop counter and report elapsed time + stop = time.time() + print("Elapsed time:", stop - start, "seconds") diff --git a/swh/provenance/tools/revisions/server.py b/swh/provenance/tools/revisions/server.py new file mode 100755 index 0000000..9163b00 --- /dev/null +++ b/swh/provenance/tools/revisions/server.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python + +from datetime import datetime, timezone +from enum import Enum +import gzip +import io +import os +import queue +import sys +import threading +import time +from typing import Any, Callable, Dict, List, Optional + +import iso8601 +from swh.core import config +from swh.provenance import get_provenance +from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql +import yaml +import zmq + +CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" + +DEFAULT_BATCH_SIZE = 1 +DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) +DEFAULT_PORT = 5556 +DEFAULT_STATS_RATE = 300 +DEFAULT_SKIP_VALUE = 0 + +UTCEPOCH = datetime.fromtimestamp(0, timezone.utc) + + +class Command(Enum): + TERMINATE = "terminate" + + +class StatsWorker(threading.Thread): + def __init__( + self, + filename: str, + storage_conf: Dict[str, Any], + timeout: float = DEFAULT_STATS_RATE, + group: None = None, + target: Optional[Callable[..., Any]] = ..., + name: Optional[str] = ..., + ) -> None: + super().__init__(group=group, target=target, name=name) + self.filename = filename + self.queue = queue.Queue() + self.storage_conf = 
storage_conf + self.timeout = timeout + + def get_tables_stats(self, tables: List[str]) -> Dict[str, int]: + # TODO: use ProvenanceStorageInterface instead! + with get_provenance(**self.storage_conf) as provenance: + assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) + stats = {} + for table in tables: + with provenance.storage.transaction(readonly=True) as cursor: + cursor.execute(f"SELECT COUNT(*) AS count FROM {table}") + stats[table] = cursor.fetchone()["count"] + with provenance.storage.transaction(readonly=True) as cursor: + cursor.execute(f"SELECT MAX(date) AS date FROM revision") + stats["maxdate"] = cursor.fetchone()["date"] + return stats + + def init_stats(self, filename: str) -> List[str]: + tables = [ + "content", + "content_in_revision", + "content_in_directory", + "directory", + "directory_in_revision", + "location", + "revision", + ] + header = ["datetime"] + for table in tables: + header.append(f"{table} rows") + header.append("revision maxdate") + with io.open(filename, "w") as outfile: + outfile.write(",".join(header)) + outfile.write("\n") + return tables + + def run(self) -> None: + tables = self.init_stats(self.filename) + start = time.monotonic() + while True: + now = time.monotonic() + if now - start > self.timeout: + self.write_stats(self.filename, self.get_tables_stats(tables)) + start = now + try: + cmd = self.queue.get(timeout=1) + if cmd == Command.TERMINATE: + break + except queue.Empty: + continue + + def stop(self) -> None: + self.queue.put(Command.TERMINATE) + self.join() + + def write_stats(self, filename: str, stats: Dict[str, int]) -> None: + line = [str(datetime.now())] + for _, stat in stats.items(): + line.append(str(stat)) + with io.open(filename, "a") as outfile: + outfile.write(",".join(line)) + outfile.write("\n") + + +class RevisionWorker(threading.Thread): + def __init__( + self, + filename: str, + url: str, + batch_size: int = DEFAULT_BATCH_SIZE, + limit: Optional[int] = None, + skip: int = DEFAULT_SKIP_VALUE, + group: None = None, + target: Optional[Callable[..., Any]] = ..., + name: Optional[str] = ..., + ) -> None: + super().__init__(group=group, target=target, name=name) + self.filename = filename + self.batch_size = batch_size + self.limit = limit + self.queue = queue.Queue() + self.skip = skip + self.url = url + + def run(self) -> None: + context = zmq.Context() + socket: zmq.Socket = context.socket(zmq.REP) + socket.bind(self.url) + + # TODO: improve this using a context manager + file = ( + io.open(self.filename, "r") + if os.path.splitext(self.filename)[1] == ".csv" + else gzip.open(self.filename, "rt") + ) + provider = (line.strip().split(",") for line in file if line.strip()) + + count = 0 + while True: + if self.limit is not None and count > self.limit: + break + + response = [] + for rev, date, root in provider: + count += 1 + if count <= self.skip or iso8601.parse_date(date) <= UTCEPOCH: + continue + response.append({"rev": rev, "date": date, "root": root}) + if len(response) == self.batch_size: + break + if not response: + break + + # Wait for next request from client + # (TODO: make it non-blocking or add timeout) + socket.recv() + socket.send_json(response) + + try: + cmd = self.queue.get(block=False) + if cmd == Command.TERMINATE: + break + except queue.Empty: + continue + + while True: # TODO: improve shutdown logic + socket.recv() + socket.send_json(None) + # context.term() + + def stop(self) -> None: + self.queue.put(Command.TERMINATE) + self.join() + + +if __name__ == "__main__": + # TODO: improve command 
line parsing
+    if len(sys.argv) < 2:
+        print("usage: server <filename>")
+        print("where")
+        print(
+            "    filename      : csv file containing the list of revisions to be iterated (one per"
+        )
+        print(
+            "                    line): revision sha1, date in ISO format, root directory sha1."
+        )
+        exit(-1)
+
+    config_file = None  # TODO: Add as a cli option
+    if (
+        config_file is None
+        and DEFAULT_PATH is not None
+        and config.config_exists(DEFAULT_PATH)
+    ):
+        config_file = DEFAULT_PATH
+
+    if config_file is None or not os.path.exists(config_file):
+        print("No configuration provided")
+        exit(-1)
+
+    conf = yaml.safe_load(open(config_file, "rb"))["provenance"]
+
+    # Init stats
+    stats = conf["rev_server"].pop("stats", None)
+    if stats is not None:
+        storage_conf = (
+            conf["storage"]["storage_config"]
+            if conf["storage"]["cls"] == "rabbitmq"
+            else conf["storage"]
+        )
+        statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}"
+        statsworker = StatsWorker(statsfile, storage_conf, **stats)
+        statsworker.start()
+
+    # Init revision provider
+    revsfile = sys.argv[1]
+    host = conf["rev_server"].pop("host", None)
+    url = f"tcp://*:{conf['rev_server'].pop('port', DEFAULT_PORT)}"
+    revsworker = RevisionWorker(revsfile, url, **conf["rev_server"])
+    revsworker.start()
+
+    # Wait for user commands
+    while True:
+        try:
+            command = input("Enter EXIT to stop service: ")
+            if command.lower() == "exit":
+                break
+        except KeyboardInterrupt:
+            pass
+
+    # Release resources
+    revsworker.stop()
+    if stats is not None:
+        statsworker.stop()
diff --git a/swh/provenance/tools/revisions_format.py b/swh/provenance/tools/revisions_format.py
new file mode 100755
index 0000000..7f5bf92
--- /dev/null
+++ b/swh/provenance/tools/revisions_format.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+
+import gzip
+import sys
+from typing import IO, Iterable
+
+import psycopg2
+from swh.core.db import BaseDb
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from swh.model.model import Sha1Git
+
+conninfo = {
+    "host": "db.internal.softwareheritage.org",
+    "dbname": "softwareheritage",
+    "user": "guest",
+}
+
+
+def write_output(
+    cursor: psycopg2.extensions.cursor, ids: Iterable[Sha1Git], outfile: IO[str]
+) -> None:
+    cursor.execute(
+        """SELECT id, date, directory
+             FROM revision
+            WHERE id IN %s
+              AND date IS NOT NULL
+            ORDER BY date""",
+        (tuple(ids),),
+    )
+    for rev in cursor.fetchall():
+        assert rev is not None, rev
+        assert rev[1] is not None, rev
+        outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n")
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("usage: revisions_format <infilename> <outfilename>")
+        exit(-1)
+
+    print(f"Connection to database: {conninfo}...")
+    conn = BaseDb.connect(**conninfo).conn
+    BaseDb.adapt_conn(conn)
+    cursor = conn.cursor()
+
+    infilename = sys.argv[1]
+    outfilename = sys.argv[2]
+
+    # with io.open(infilename) as infile:
+    #     with io.open(outfilename, "w") as outfile:
+    #         ids = json.loads(infile.read())
+    #         print(f"Formatting {len(ids)} revisions")
+    #         for id in ids:
+    #             cursor.execute(
+    #                 """SELECT id, date, directory
+    #                      FROM revision
+    #                     WHERE id=%s AND date IS NOT NULL""",
+    #                 (hash_to_bytes(id),),
+    #             )
+    #             rev = cursor.fetchone()
+    #             assert rev is not None
+    #             outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n")
+
+    with gzip.open(infilename, "rt") as infile:
+        with gzip.open(outfilename, "wt") as outfile:
+            ids = []
+            for idx, line in enumerate(infile.readlines(), start=1):
+                if line.strip():
+                    ids.append(hash_to_bytes(line.split(",")[0]))
+                    if idx % 100 == 0:
+                        write_output(cursor, ids, outfile)
+                        ids = []
+            if ids:
+                write_output(cursor, ids, outfile)
diff --git a/swh/provenance/tools/revisions_pick.py b/swh/provenance/tools/revisions_pick.py
new file mode 100755
index 0000000..f54e62a
--- /dev/null
+++ b/swh/provenance/tools/revisions_pick.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+
+import io
+import sys
+
+import psycopg2
+from swh.core.db import BaseDb
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
+
+conninfo = {
+    "host": "db.internal.softwareheritage.org",
+    "dbname": "softwareheritage",
+    "user": "guest",
+}
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("usage: listrevs <filename>")
+        exit(-1)
+
+    filename = sys.argv[1]
+
+    print(f"Connection to database: {conninfo}...")
+    conn: psycopg2.extensions.connection = BaseDb.connect(**conninfo).conn
+    BaseDb.adapt_conn(conn)
+    cursor = conn.cursor()
+
+    revisions = set(
+        [
+            hash_to_bytes("1363496c1106606684d40447f5d1149b2c66a9f8"),
+            hash_to_bytes("b91a781cbc1285d441aa682926d93d8c23678b0b"),
+            hash_to_bytes("313315d9790c36e22bb5bb034e9c7d7f470cdf73"),
+            hash_to_bytes("a3b54f0f5de1ad17889fd23aee7c230eefc300cd"),
+            hash_to_bytes("74deb33d12bf275a3b3a9afc833f4760be90f031"),
+        ]
+    )
+    pending = revisions
+
+    while pending:
+        cursor.execute(
+            """SELECT parent_id FROM revision_history WHERE id IN %s""",
+            (tuple(pending),),
+        )
+        parents = set(map(lambda row: row[0], cursor.fetchall()))
+        pending = parents - revisions
+        revisions = revisions | parents
+
+    # print(f"Requesting {count} revisions out of {total} (probability {probability}).")
+    cursor.execute(
+        """SELECT id, date, directory FROM revision WHERE id IN %s""",
+        (tuple(revisions),),
+    )
+    ordered = [row for row in cursor.fetchall() if row[1] is not None]
+    ordered.sort(key=lambda rev: rev[1])
+
+    print(f"Obtained {len(ordered)} revisions.")
+    with io.open(filename, "w") as outfile:
+        for rev in ordered:
+            outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n")
diff --git a/swh/provenance/tools/revisions_sort.py b/swh/provenance/tools/revisions_sort.py
new file mode 100755
index 0000000..a4905eb
--- /dev/null
+++ b/swh/provenance/tools/revisions_sort.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+
+from datetime import datetime
+import gzip
+import sys
+
+from swh.model.hashutil import hash_to_bytes, hash_to_hex
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        print("usage: revisions_sort <infilename> <outfilename>")
+        exit(-1)
+
+    infilename = sys.argv[1]
+    outfilename = sys.argv[2]
+
+    with gzip.open(infilename, "rt") as infile:
+        revisions = []
+        sort = False
+        for idx, line in enumerate(infile.readlines(), start=1):
+            if line.strip():
+                splitted = line.strip().split(",")
+                revision = hash_to_bytes(splitted[0])
+                date = datetime.fromisoformat(splitted[1])
+                root = hash_to_bytes(splitted[2])
+
+                assert date is not None
+
+                if revisions:
+                    last = revisions[-1]
+                    if date < last[1]:
+                        print("Out of order", last, f"({revision},{date},{root})")
+                        sort = True
+
+                revisions.append((revision, date, root))
+
+    if sort:
+        revisions = sorted(revisions, key=lambda rev: rev[1])
+
+    date = None
+    with gzip.open(outfilename, "wt") as outfile:
+        for rev in revisions:
+            assert date is None or date <= rev[1]
+            date = rev[1]
+            outfile.write(
+                f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n"
+            )