diff --git a/swh/provenance/tools/compare-all.py b/swh/provenance/tools/compare-all.py index 42b4614..d4ff02a 100755 --- a/swh/provenance/tools/compare-all.py +++ b/swh/provenance/tools/compare-all.py @@ -1,146 +1,146 @@ #!/usr/bin/env python import glob import io import logging import os from typing import Iterable from swh.model.hashutil import hash_to_hex from swh.model.model import Sha1Git from swh.provenance import get_provenance from swh.provenance.interface import EntityType, ProvenanceResult # TODO: take conninfo as command line arguments. conninfo1 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, } conninfo2 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } # Write log file with occurrence detail. def logdiff(filename: str, occurrences: Iterable[ProvenanceResult]) -> None: with io.open(filename, "a") as outfile: for occur in occurrences: try: # Try to decode path. path = os.fsdecode(occur.path).decode("utf-8", "replace") - except: + except Exception: # Use its raw value if not possible path = occur.path outfile.write( "{blob},{rev},{date},{path}\n".format( blob=hash_to_hex(occur.content), rev=hash_to_hex(occur.revision), date=occur.date, path=path, ) ) # Write log file with list of occurrences. def loglist(filename: str, occurrences: Iterable[Sha1Git]) -> None: with io.open(filename, "a") as outfile: for sha1 in occurrences: outfile.write("{blob}\n".format(blob=hash_to_hex(sha1))) # Output log file name. nextidx = None def outfilename(suffix: str) -> str: global nextidx basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) prefix = os.path.join(os.getcwd(), basename + "-") if nextidx is None: nextidx = 0 for filename in glob.glob(f"{prefix}*.log"): try: lastidx = int(filename.strip(prefix).split("-")[0]) nextidx = max(nextidx, lastidx + 1) - except: + except Exception: continue return f"{prefix}{nextidx:02}-{suffix}.log" # Print iterations progress. # TODO: move to utils module. def progress( iteration: int, total: int, prefix: str = "Progress:", suffix: str = "Complete", decimals: int = 1, length: int = 50, fill: str = "█", printEnd: str = "\r", ): """ Call in a loop to create terminal progress bar @params: iteration - Required : current iteration (Int) total - Required : total iterations (Int) prefix - Optional : prefix string (Str) suffix - Optional : suffix string (Str) decimals - Optional : positive number of decimals in percent complete (Int) length - Optional : character length of bar (Int) fill - Optional : bar fill character (Str) printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) """ percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) filledLength = int(length * iteration // total) bar = fill * filledLength + "-" * (length - filledLength) print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) # Print New Line on Complete if iteration == total: print() if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) # Get provenance object for both databases and query its lists of content. with get_provenance(**conninfo1) as provenance1: with get_provenance(**conninfo2) as provenance2: content1 = provenance1.storage.entity_get_all(EntityType.CONTENT) content2 = provenance2.storage.entity_get_all(EntityType.CONTENT) if content1 == content2: # If lists of content match, we check that occurrences does as well. 
total = len(content1) progress(0, total) mismatch = False # Iterate over all content querying all its occurrences on both # databases. for i, sha1 in enumerate(content1): occurrences1 = list(provenance1.content_find_all(sha1)) occurrences2 = list(provenance2.content_find_all(sha1)) # If there is a mismatch log it to file. if len(occurrences1) != len(occurrences2) or set( occurrences1 ) != set(occurrences2): mismatch = True logging.warning( f"Occurrencies mismatch for {hash_to_hex(sha1)}" ) logdiff(outfilename(conninfo1["db"]["dbname"]), occurrences1) logdiff(outfilename(conninfo2["db"]["dbname"]), occurrences2) progress(i + 1, total) if not mismatch: logging.info("Databases are equivalent!") else: # If lists of content don't match, we are done. loglist(outfilename(conninfo1["db"]["dbname"]), content1) loglist(outfilename(conninfo2["db"]["dbname"]), content2) logging.warning("Content lists are different") diff --git a/swh/provenance/tools/compare-first.py b/swh/provenance/tools/compare-first.py index 8ad7844..9f73e7a 100755 --- a/swh/provenance/tools/compare-first.py +++ b/swh/provenance/tools/compare-first.py @@ -1,145 +1,145 @@ #!/usr/bin/env python import glob import io import logging import os from typing import Iterable from swh.model.hashutil import hash_to_hex from swh.model.model import Sha1Git from swh.provenance import get_provenance from swh.provenance.interface import EntityType, ProvenanceResult # TODO: take conninfo as command line arguments. conninfo1 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "old"}, } conninfo2 = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } # Write log file with occurrence detail. def logdiff(filename: str, occurrence: ProvenanceResult) -> None: with io.open(filename, "a") as outfile: try: # Try to decode path. path = os.fsdecode(occurrence.path).decode("utf-8", "replace") - except: + except Exception: # Use its raw value if not possible path = occurrence.path outfile.write( "{blob},{rev},{date},{path}\n".format( blob=hash_to_hex(occurrence.content), rev=hash_to_hex(occurrence.revision), date=occurrence.date, path=path, ) ) # Write log file with list of occurrences. def loglist(filename: str, occurrences: Iterable[Sha1Git]) -> None: with io.open(filename, "a") as outfile: for sha1 in occurrences: outfile.write("{blob}\n".format(blob=hash_to_hex(sha1))) # Output log file name. nextidx = None def outfilename(suffix: str) -> str: global nextidx basename, _ = os.path.splitext(os.path.basename(os.path.abspath(__file__))) prefix = os.path.join(os.getcwd(), basename + "-") if nextidx is None: nextidx = 0 for filename in glob.glob(f"{prefix}*.log"): try: lastidx = int(filename.strip(prefix).split("-")[0]) nextidx = max(nextidx, lastidx + 1) - except: + except Exception: continue return f"{prefix}{nextidx:02}-{suffix}.log" # Print iterations progress. # TODO: move to utils module. 
def progress( iteration: int, total: int, prefix: str = "Progress:", suffix: str = "Complete", decimals: int = 1, length: int = 50, fill: str = "█", printEnd: str = "\r", ): """ Call in a loop to create terminal progress bar @params: iteration - Required : current iteration (Int) total - Required : total iterations (Int) prefix - Optional : prefix string (Str) suffix - Optional : suffix string (Str) decimals - Optional : positive number of decimals in percent complete (Int) length - Optional : character length of bar (Int) fill - Optional : bar fill character (Str) printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) """ percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) filledLength = int(length * iteration // total) bar = fill * filledLength + "-" * (length - filledLength) print(f"\r{prefix} |{bar}| {percent}% {suffix}", end=printEnd) # Print New Line on Complete if iteration == total: print() if __name__ == "__main__": # Set minimum logging level to INFO. logging.getLogger().setLevel(logging.INFO) # Get provenance object for both databases and query its lists of content. with get_provenance(**conninfo1) as provenance1: with get_provenance(**conninfo2) as provenance2: content1 = provenance1.storage.entity_get_all(EntityType.CONTENT) content2 = provenance2.storage.entity_get_all(EntityType.CONTENT) if content1 == content2: # If lists of content match, we check that occurrences does as well. total = len(content1) progress(0, total) mismatch = False # Iterate over all content querying its first occurrences # on both databases. for i, sha1 in enumerate(content1): occurrence1 = provenance1.content_find_first(sha1) occurrence2 = provenance2.content_find_first(sha1) # If there is a mismatch log it to file. We can only compare the # timestamp as the same blob might be seen for the first time in # different locations. if occurrence1.date != occurrence2.date: mismatch = True logging.warning( f"Occurrencies mismatch for {hash_to_hex(sha1)}" ) logdiff(outfilename(conninfo1["db"]["dbname"]), occurrence1) logdiff(outfilename(conninfo2["db"]["dbname"]), occurrence2) progress(i + 1, total) if not mismatch: logging.info("Databases are equivalent!") else: # If lists of content don't match, we are done. loglist(outfilename(conninfo1["db"]["dbname"]), content1) loglist(outfilename(conninfo2["db"]["dbname"]), content2) logging.warning("Content lists are different") diff --git a/swh/provenance/tools/histogram.py b/swh/provenance/tools/histogram.py index 9e2e629..9738e1e 100755 --- a/swh/provenance/tools/histogram.py +++ b/swh/provenance/tools/histogram.py @@ -1,52 +1,52 @@ #!/usr/bin/env python import io from swh.provenance import get_provenance from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql # TODO: take conninfo as command line arguments. conninfo = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } if __name__ == "__main__": # Get provenance object. with get_provenance(**conninfo) as provenance: # TODO: use ProvenanceStorageInterface instead! assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) tables = ["directory_in_rev", "content_in_dir"] for table in tables: with provenance.storage.transaction() as cursor: cursor.execute( f"""SELECT depths.depth, COUNT(depths.depth) - FROM (SELECT + FROM (SELECT CASE location.path WHEN '' THEN 0 WHEN '.' 
THEN 0 ELSE 1 + CHAR_LENGTH(ENCODE(location.path, 'escape')) - CHAR_LENGTH( REPLACE( ENCODE(location.path, 'escape'), '/', '' ) ) END AS depth FROM {table} JOIN location ON {table}.loc=location.id ) AS depths GROUP BY depths.depth ORDER BY depths.depth""" ) filename = "depths_" + conninfo["db"]["dbname"] + f"_{table}.csv" with io.open(filename, "w") as outfile: outfile.write(f"{table} depth,{table} count\n") for depth, count in cursor.fetchall(): outfile.write(f"{depth},{count}\n") diff --git a/swh/provenance/tools/metrics.py b/swh/provenance/tools/metrics.py index 25d6123..5c13fe8 100755 --- a/swh/provenance/tools/metrics.py +++ b/swh/provenance/tools/metrics.py @@ -1,158 +1,144 @@ #!/usr/bin/env python from swh.provenance import get_provenance from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql from swh.provenance.provenance import ProvenanceInterface # TODO: take conninfo as command line arguments. conninfo = { "cls": "local", "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"}, } def get_tables_stats(provenance: ProvenanceInterface): # TODO: use ProvenanceStorageInterface instead! assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) tables = { "content": dict(), "content_early_in_rev": dict(), "content_in_dir": dict(), "directory": dict(), "directory_in_rev": dict(), "location": dict(), "revision": dict(), } for table in tables: with provenance.storage.transaction() as cursor: cursor.execute(f"SELECT COUNT(*) FROM {table}") tables[table]["row_count"] = cursor.fetchone()[0] cursor.execute(f"SELECT pg_table_size('{table}')") tables[table]["table_size"] = cursor.fetchone()[0] cursor.execute(f"SELECT pg_indexes_size('{table}')") tables[table]["indexes_size"] = cursor.fetchone()[0] # cursor.execute(f"SELECT pg_total_relation_size('{table}')") # relation_size[table] = cursor.fetchone()[0] tables[table]["relation_size"] = ( tables[table]["table_size"] + tables[table]["indexes_size"] ) return tables if __name__ == "__main__": # Get provenance object. with get_provenance(**conninfo) as provenance: # TODO: use ProvenanceStorageInterface instead! assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) tables = get_tables_stats(provenance) for table in tables: row_count = tables[table]["row_count"] table_size = tables[table]["table_size"] + table_ratio = table_size / (row_count or 1) indexes_size = tables[table]["indexes_size"] + indexes_ratio = indexes_size / (row_count or 1) relation_size = tables[table]["relation_size"] + relation_ratio = relation_size / (row_count or 1) print(f"{table}:") print(f" total rows: {row_count}") if row_count == 0: row_count = 1 + print(f" table size: {table_size} bytes ({table_ratio:.2f} per row)") + print(f" index size: {indexes_size} bytes ({indexes_ratio:.2f} per row)") print( - f" table size: {table_size} bytes ({table_size / row_count:.2f} per row)" - ) - print( - f" index size: {indexes_size} bytes ({indexes_size / row_count:.2f} per row)" - ) - print( - f" total size: {relation_size} bytes ({relation_size / row_count:.2f} per row)" + f" total size: {relation_size} bytes ({relation_ratio:.2f} per row)" ) # Ratios between the different entities/relations.
print("ratios:") - print( - f" content/revision: {tables['content']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" - ) - print( - f" content_early_in_rev/content: {tables['content_early_in_rev']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}" - ) - print( - f" content_in_dir/content: {tables['content_in_dir']['row_count'] / (tables['content']['row_count'] if tables['content']['row_count'] != 0 else 1):.2f}" - ) - print( - f" directory/revision: {tables['directory']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" - ) - print( - f" directory_in_rev/directory: {tables['directory_in_rev']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}" - ) - print(f" ==============================") - print( - f" content_early_in_rev/revision: {tables['content_early_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" - ) - print( - f" content_in_dir/directory: {tables['content_in_dir']['row_count'] / (tables['directory']['row_count'] if tables['directory']['row_count'] != 0 else 1):.2f}" - ) - print( - f" directory_in_rev/revision: {tables['directory_in_rev']['row_count'] / (tables['revision']['row_count'] if tables['revision']['row_count'] != 0 else 1):.2f}" - ) + for num, den in ( + ("content", "revision"), + ("content_early_in_rev", "content"), + ("content_in_dir", "content"), + ("directory", "revision"), + ("directory_in_rev", "directory"), + ("content_early_in_rev", "revision"), + ("content_in_dir", "directory"), + ("directory_in_rev", "revision"), + ): + ratio = tables[num]["row_count"] / (tables[den]["row_count"] or 1) + print(f"{num}/{den}: {ratio:.2f}") # Metrics for frontiers defined in root directories. 
with provenance.storage.transaction() as cursor: cursor.execute( - f"""SELECT dir + """SELECT dir FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s""", (b"",), ) directories = list(cursor.fetchall()) print(f"Total root frontiers used: {len(directories)}") cursor.execute( - f"""SELECT dir + """SELECT dir FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s GROUP BY dir""", (b"",), ) directories = list(cursor.fetchall()) print(f"Total distinct root frontiers: {len(directories)}") cursor.execute( - f"""SELECT roots.dir + """SELECT roots.dir FROM (SELECT dir, loc FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s) AS roots JOIN directory_in_rev ON directory_in_rev.dir=roots.dir WHERE directory_in_rev.loc!=roots.loc""", (b"",), ) directories = list(cursor.fetchall()) print(f"Total other uses of these frontiers: {len(directories)}") cursor.execute( - f"""SELECT roots.dir + """SELECT roots.dir FROM (SELECT dir, loc FROM directory_in_rev INNER JOIN location ON loc=location.id WHERE location.path=%s) AS roots JOIN directory_in_rev ON directory_in_rev.dir=roots.dir WHERE directory_in_rev.loc!=roots.loc GROUP BY roots.dir""", (b"",), ) directories = list(cursor.fetchall()) print(f"Total distinct other uses of frontiers: {len(directories)}") diff --git a/swh/provenance/tools/origins/client.py b/swh/provenance/tools/origins/client.py index c48a0df..f897a4f 100755 --- a/swh/provenance/tools/origins/client.py +++ b/swh/provenance/tools/origins/client.py @@ -1,104 +1,105 @@ #!/usr/bin/env python import logging import logging.handlers import multiprocessing import os import sys import time from typing import Any, Callable, Dict, List, Optional +import yaml +import zmq + from swh.core import config from swh.model.hashutil import hash_to_bytes from swh.provenance import get_archive, get_provenance from swh.provenance.origin import OriginEntry, origin_add -import yaml -import zmq CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) class Client(multiprocessing.Process): def __init__( self, conf: Dict[str, Any], group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.archive_conf = conf["archive"] self.storage_conf = conf["storage"] self.url = f"tcp://{conf['org_server']['host']}:{conf['org_server']['port']}" logging.info(f"Client {self.name} created") def run(self): logging.info(f"Client {self.name} started") # XXX: should we reconnect on each iteration to save resources? 
archive = get_archive(**self.archive_conf) context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REQ) socket.connect(self.url) with get_provenance(**self.storage_conf) as provenance: while True: socket.send(b"NEXT") response = socket.recv_json() if response is None: break batch = [] for origin in response: batch.append( OriginEntry(origin["url"], hash_to_bytes(origin["snapshot"])) ) origin_add(provenance, archive, batch) logging.info(f"Client {self.name} stopped") if __name__ == "__main__": # Check parameters if len(sys.argv) != 2: print("usage: client ") exit(-1) processes = int(sys.argv[1]) config_file = None # TODO: add as a cli option if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None or not os.path.exists(config_file): print("No configuration provided") exit(-1) conf = yaml.safe_load(open(config_file, "rb"))["provenance"] # Start counter start = time.time() # Launch as many clients as requested clients: List[Client] = [] for idx in range(processes): logging.info(f"MAIN: launching process {idx}") client = Client(conf, name=f"worker{idx}") client.start() clients.append(client) # Wait for all processes to complete their work for client in clients: logging.info(f"MAIN: waiting for process {client.name} to finish") client.join() logging.info(f"MAIN: process {client.name} finished executing") # Stop counter and report elapsed time stop = time.time() print("Elapsed time:", stop - start, "seconds") diff --git a/swh/provenance/tools/origins/server.py b/swh/provenance/tools/origins/server.py index 2bf9bde..5c65d7f 100755 --- a/swh/provenance/tools/origins/server.py +++ b/swh/provenance/tools/origins/server.py @@ -1,234 +1,234 @@ #!/usr/bin/env python from datetime import datetime, timezone from enum import Enum import gzip import io import os import queue import sys import threading import time from typing import Any, Callable, Dict, List, Optional +import yaml +import zmq + from swh.core import config from swh.provenance import get_provenance from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql -import yaml -import zmq CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_BATCH_SIZE = 1 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_PORT = 5555 DEFAULT_STATS_RATE = 300 DEFAULT_SKIP_VALUE = 0 UTCEPOCH = datetime.fromtimestamp(0, timezone.utc) class Command(Enum): TERMINATE = "terminate" class StatsWorker(threading.Thread): def __init__( self, filename: str, storage_conf: Dict[str, Any], timeout: float = DEFAULT_STATS_RATE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.queue = queue.Queue() self.storage_conf = storage_conf self.timeout = timeout def get_tables_stats(self, tables: List[str]) -> Dict[str, int]: # TODO: use ProvenanceStorageInterface instead! 
with get_provenance(**self.storage_conf) as provenance: assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) stats = {} for table in tables: with provenance.storage.transaction(readonly=True) as cursor: cursor.execute(f"SELECT COUNT(*) AS count FROM {table}") stats[table] = cursor.fetchone()["count"] return stats def init_stats(self, filename: str) -> List[str]: tables = [ "origin", "revision", "revision_in_origin", "revision_before_revision", ] header = ["datetime"] for table in tables: header.append(f"{table} rows") with io.open(filename, "w") as outfile: outfile.write(",".join(header)) outfile.write("\n") return tables def run(self) -> None: tables = self.init_stats(self.filename) start = time.monotonic() while True: now = time.monotonic() if now - start > self.timeout: self.write_stats(self.filename, self.get_tables_stats(tables)) start = now try: cmd = self.queue.get(timeout=1) if cmd == Command.TERMINATE: break except queue.Empty: continue def stop(self) -> None: self.queue.put(Command.TERMINATE) self.join() def write_stats(self, filename: str, stats: Dict[str, int]) -> None: line = [str(datetime.now())] for _, stat in stats.items(): line.append(str(stat)) with io.open(filename, "a") as outfile: outfile.write(",".join(line)) outfile.write("\n") class OriginWorker(threading.Thread): def __init__( self, filename: str, url: str, batch_size: int = DEFAULT_BATCH_SIZE, limit: Optional[int] = None, skip: int = DEFAULT_SKIP_VALUE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.batch_size = batch_size self.limit = limit self.queue = queue.Queue() self.skip = skip self.url = url def run(self) -> None: context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REP) socket.bind(self.url) # TODO: improve this using a context manager file = ( io.open(self.filename, "r") if os.path.splitext(self.filename)[1] == ".csv" else gzip.open(self.filename, "rt") ) provider = ( line.strip().rsplit(",", maxsplit=1) for line in file if line.strip() ) count = 0 while True: if self.limit is not None and count > self.limit: break response = [] for url, snapshot in provider: count += 1 if count <= self.skip: continue response.append({"url": url, "snapshot": snapshot}) if len(response) == self.batch_size: break if not response: break # Wait for next request from client # (TODO: make it non-blocking or add timeout) socket.recv() socket.send_json(response) try: cmd = self.queue.get(block=False) if cmd == Command.TERMINATE: break except queue.Empty: continue while True: # TODO: improve shutdown logic socket.recv() socket.send_json(None) # context.term() def stop(self) -> None: self.queue.put(Command.TERMINATE) self.join() if __name__ == "__main__": # TODO: improve command line parsing if len(sys.argv) < 2: print("usage: server ") - print("where") print( - " filename : csv file containing the list of origins to be iterated (one per" + "filename: csv file containing the list of origins to be iterated (one per " + "line): origin url, snapshot sha1." 
) - print(" line): origin url, snapshot sha1.") exit(-1) config_file = None # TODO: Add as a cli option if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None or not os.path.exists(config_file): print("No configuration provided") exit(-1) conf = yaml.safe_load(open(config_file, "rb"))["provenance"] # Init stats stats = conf["org_server"].pop("stats", None) if stats is not None: storage_conf = ( conf["storage"]["storage_config"] if conf["storage"]["cls"] == "rabbitmq" else conf["storage"] ) statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}" statsworker = StatsWorker(statsfile, storage_conf, **stats) statsworker.start() # Init origin provider orgsfile = sys.argv[1] host = conf["org_server"].pop("host", None) url = f"tcp://*:{conf['org_server'].pop('port', DEFAULT_PORT)}" orgsworker = OriginWorker(orgsfile, url, **conf["org_server"]) orgsworker.start() # Wait for user commands while True: try: command = input("Enter EXIT to stop service: ") if command.lower() == "exit": break except KeyboardInterrupt: pass # Release resources orgsworker.stop() if stats is not None: statsworker.stop() diff --git a/swh/provenance/tools/revisions/client.py b/swh/provenance/tools/revisions/client.py index e562547..3393394 100755 --- a/swh/provenance/tools/revisions/client.py +++ b/swh/provenance/tools/revisions/client.py @@ -1,141 +1,142 @@ #!/usr/bin/env python from datetime import timezone import logging import logging.handlers import multiprocessing import os import sys import time from typing import Any, Callable, Dict, List, Optional import iso8601 +import yaml +import zmq + from swh.core import config from swh.model.hashutil import hash_to_bytes from swh.provenance import get_archive, get_provenance from swh.provenance.revision import RevisionEntry, revision_add -import yaml -import zmq CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) class Client(multiprocessing.Process): def __init__( self, conf: Dict[str, Any], trackall: bool, flatten: bool, lower: bool, mindepth: int, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.archive_conf = conf["archive"] self.storage_conf = conf["storage"] self.url = f"tcp://{conf['rev_server']['host']}:{conf['rev_server']['port']}" self.trackall = trackall self.flatten = flatten self.lower = lower self.mindepth = mindepth logging.info(f"Client {self.name} created") def run(self): logging.info(f"Client {self.name} started") # XXX: should we reconnect on each iteration to save resources? 
archive = get_archive(**self.archive_conf) context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REQ) socket.connect(self.url) with get_provenance(**self.storage_conf) as provenance: while True: socket.send(b"NEXT") response = socket.recv_json() if response is None: break batch = [] for revision in response: # Ensure date has a valid timezone date = iso8601.parse_date(revision["date"]) if date.tzinfo is None: date = date.replace(tzinfo=timezone.utc) batch.append( RevisionEntry( hash_to_bytes(revision["rev"]), date=date, root=hash_to_bytes(revision["root"]), ) ) revision_add( provenance, archive, batch, trackall=self.trackall, flatten=self.flatten, lower=self.lower, mindepth=self.mindepth, ) logging.info(f"Client {self.name} stopped") if __name__ == "__main__": # Check parameters if len(sys.argv) != 6: print("usage: client ") exit(-1) processes = int(sys.argv[1]) trackall = sys.argv[2].lower() != "false" flatten = sys.argv[3].lower() != "false" lower = sys.argv[4].lower() != "false" mindepth = int(sys.argv[5]) config_file = None # TODO: add as a cli option if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None or not os.path.exists(config_file): print("No configuration provided") exit(-1) conf = yaml.safe_load(open(config_file, "rb"))["provenance"] # Start counter start = time.time() # Launch as many clients as requested clients: List[Client] = [] for idx in range(processes): logging.info(f"MAIN: launching process {idx}") client = Client( conf, trackall=trackall, flatten=flatten, lower=lower, mindepth=mindepth, name=f"worker{idx}", ) client.start() clients.append(client) # Wait for all processes to complete their work for client in clients: logging.info(f"MAIN: waiting for process {client.name} to finish") client.join() logging.info(f"MAIN: process {client.name} finished executing") # Stop counter and report elapsed time stop = time.time() print("Elapsed time:", stop - start, "seconds") diff --git a/swh/provenance/tools/revisions/server.py b/swh/provenance/tools/revisions/server.py index 9163b00..9857223 100755 --- a/swh/provenance/tools/revisions/server.py +++ b/swh/provenance/tools/revisions/server.py @@ -1,242 +1,240 @@ #!/usr/bin/env python from datetime import datetime, timezone from enum import Enum import gzip import io import os import queue import sys import threading import time from typing import Any, Callable, Dict, List, Optional import iso8601 +import yaml +import zmq + from swh.core import config from swh.provenance import get_provenance from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql -import yaml -import zmq CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_BATCH_SIZE = 1 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_PORT = 5556 DEFAULT_STATS_RATE = 300 DEFAULT_SKIP_VALUE = 0 UTCEPOCH = datetime.fromtimestamp(0, timezone.utc) class Command(Enum): TERMINATE = "terminate" class StatsWorker(threading.Thread): def __init__( self, filename: str, storage_conf: Dict[str, Any], timeout: float = DEFAULT_STATS_RATE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.queue = queue.Queue() self.storage_conf = storage_conf self.timeout = timeout def get_tables_stats(self, tables: List[str]) -> Dict[str, int]: # TODO: use ProvenanceStorageInterface instead! 
with get_provenance(**self.storage_conf) as provenance: assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) stats = {} for table in tables: with provenance.storage.transaction(readonly=True) as cursor: cursor.execute(f"SELECT COUNT(*) AS count FROM {table}") stats[table] = cursor.fetchone()["count"] with provenance.storage.transaction(readonly=True) as cursor: - cursor.execute(f"SELECT MAX(date) AS date FROM revision") + cursor.execute("SELECT MAX(date) AS date FROM revision") stats["maxdate"] = cursor.fetchone()["date"] return stats def init_stats(self, filename: str) -> List[str]: tables = [ "content", "content_in_revision", "content_in_directory", "directory", "directory_in_revision", "location", "revision", ] header = ["datetime"] for table in tables: header.append(f"{table} rows") header.append("revision maxdate") with io.open(filename, "w") as outfile: outfile.write(",".join(header)) outfile.write("\n") return tables def run(self) -> None: tables = self.init_stats(self.filename) start = time.monotonic() while True: now = time.monotonic() if now - start > self.timeout: self.write_stats(self.filename, self.get_tables_stats(tables)) start = now try: cmd = self.queue.get(timeout=1) if cmd == Command.TERMINATE: break except queue.Empty: continue def stop(self) -> None: self.queue.put(Command.TERMINATE) self.join() def write_stats(self, filename: str, stats: Dict[str, int]) -> None: line = [str(datetime.now())] for _, stat in stats.items(): line.append(str(stat)) with io.open(filename, "a") as outfile: outfile.write(",".join(line)) outfile.write("\n") class RevisionWorker(threading.Thread): def __init__( self, filename: str, url: str, batch_size: int = DEFAULT_BATCH_SIZE, limit: Optional[int] = None, skip: int = DEFAULT_SKIP_VALUE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.batch_size = batch_size self.limit = limit self.queue = queue.Queue() self.skip = skip self.url = url def run(self) -> None: context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REP) socket.bind(self.url) # TODO: improve this using a context manager file = ( io.open(self.filename, "r") if os.path.splitext(self.filename)[1] == ".csv" else gzip.open(self.filename, "rt") ) provider = (line.strip().split(",") for line in file if line.strip()) count = 0 while True: if self.limit is not None and count > self.limit: break response = [] for rev, date, root in provider: count += 1 if count <= self.skip or iso8601.parse_date(date) <= UTCEPOCH: continue response.append({"rev": rev, "date": date, "root": root}) if len(response) == self.batch_size: break if not response: break # Wait for next request from client # (TODO: make it non-blocking or add timeout) socket.recv() socket.send_json(response) try: cmd = self.queue.get(block=False) if cmd == Command.TERMINATE: break except queue.Empty: continue while True: # TODO: improve shutdown logic socket.recv() socket.send_json(None) # context.term() def stop(self) -> None: self.queue.put(Command.TERMINATE) self.join() if __name__ == "__main__": # TODO: improve command line parsing if len(sys.argv) < 2: print("usage: server ") - print("where") - print( - " filename : csv file containing the list of revisions to be iterated (one per" - ) print( - " line): revision sha1, date in ISO format, root directory sha1." 
+ "filename: csv file containing the list of revisions to be iterated (one per" + " line): revision sha1, date in ISO format, root directory sha1." ) exit(-1) config_file = None # TODO: Add as a cli option if ( config_file is None and DEFAULT_PATH is not None and config.config_exists(DEFAULT_PATH) ): config_file = DEFAULT_PATH if config_file is None or not os.path.exists(config_file): print("No configuration provided") exit(-1) conf = yaml.safe_load(open(config_file, "rb"))["provenance"] # Init stats stats = conf["rev_server"].pop("stats", None) if stats is not None: storage_conf = ( conf["storage"]["storage_config"] if conf["storage"]["cls"] == "rabbitmq" else conf["storage"] ) statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}" statsworker = StatsWorker(statsfile, storage_conf, **stats) statsworker.start() # Init revision provider revsfile = sys.argv[1] host = conf["rev_server"].pop("host", None) url = f"tcp://*:{conf['rev_server'].pop('port', DEFAULT_PORT)}" revsworker = RevisionWorker(revsfile, url, **conf["rev_server"]) revsworker.start() # Wait for user commands while True: try: command = input("Enter EXIT to stop service: ") if command.lower() == "exit": break except KeyboardInterrupt: pass # Release resources revsworker.stop() if stats is not None: statsworker.stop() diff --git a/swh/provenance/tools/revisions_format.py b/swh/provenance/tools/revisions_format.py index 7f5bf92..956e458 100755 --- a/swh/provenance/tools/revisions_format.py +++ b/swh/provenance/tools/revisions_format.py @@ -1,74 +1,75 @@ #!/usr/bin/env python import gzip import sys from typing import IO, Iterable import psycopg2 + from swh.core.db import BaseDb from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.model.model import Sha1Git conninfo = { "host": "db.internal.softwareheritage.org", "dbname": "softwareheritage", "user": "guest", } def write_output( cursor: psycopg2.cursor, ids: Iterable[Sha1Git], outfile: IO[bytes] ) -> None: cursor.execute( """SELECT id, date, directory FROM revision WHERE id IN %s AND date IS NOT NULL ORDER BY date""", (tuple(ids),), ) for rev in cursor.fetchall(): assert rev is not None, rev assert rev[1] is not None, rev outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n") if __name__ == "__main__": if len(sys.argv) != 3: print("usage: revisions_format ") exit(-1) print(f"Connection to database: {conninfo}...") conn = BaseDb.connect(**conninfo).conn BaseDb.adapt_conn(conn) cursor = conn.cursor() infilename = sys.argv[1] outfilename = sys.argv[2] # with io.open(infilename) as infile: # with io.open(outfilename, "w") as outfile: # ids = json.loads(infile.read()) # print(f"Formatting {len(ids)} revisions") # for id in ids: # cursor.execute( # """SELECT id, date, directory # FROM revision # WHERE id=%s AND date IS NOT NULL""", # (hash_to_bytes(id),), # ) # rev = cursor.fetchone() # assert rev is not None # outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n") with gzip.open(infilename, "rt") as infile: with gzip.open(outfilename, "wt") as outfile: ids = [] for idx, line in enumerate(infile.readlines(), start=1): if line.strip(): ids.append(hash_to_bytes(line.split(",")[0])) if idx % 100 == 0: write_output(cursor, ids, outfile) ids = [] if ids: write_output(cursor, ids, outfile) diff --git a/swh/provenance/tools/revisions_pick.py b/swh/provenance/tools/revisions_pick.py index f54e62a..822cc05 100755 --- a/swh/provenance/tools/revisions_pick.py +++ b/swh/provenance/tools/revisions_pick.py @@ -1,60 +1,61 @@ #!/usr/bin/env python 
import io import sys import psycopg2 + from swh.core.db import BaseDb from swh.model.hashutil import hash_to_bytes, hash_to_hex conninfo = { "host": "db.internal.softwareheritage.org", "dbname": "softwareheritage", "user": "guest", } if __name__ == "__main__": if len(sys.argv) != 2: print("usage: listrevs ") exit(-1) filename = sys.argv[1] print(f"Connection to database: {conninfo}...") conn: psycopg2.connection = BaseDb.connect(**conninfo).conn BaseDb.adapt_conn(conn) cursor = conn.cursor() revisions = set( [ hash_to_bytes("1363496c1106606684d40447f5d1149b2c66a9f8"), hash_to_bytes("b91a781cbc1285d441aa682926d93d8c23678b0b"), hash_to_bytes("313315d9790c36e22bb5bb034e9c7d7f470cdf73"), hash_to_bytes("a3b54f0f5de1ad17889fd23aee7c230eefc300cd"), hash_to_bytes("74deb33d12bf275a3b3a9afc833f4760be90f031"), ] ) pending = revisions while pending: cursor.execute( """SELECT parent_id FROM revision_history WHERE id IN %s""", (tuple(pending),), ) parents = set(map(lambda row: row[0], cursor.fetchall())) pending = parents - revisions revisions = revisions | parents # print(f"Requesting {count} revisions out of {total} (probability {probability}).") cursor.execute( """SELECT id, date, directory FROM revision WHERE id IN %s""", (tuple(revisions),), ) ordered = [row for row in cursor.fetchall() if row[1] is not None] ordered.sort(key=lambda rev: rev[1]) print(f"Obtained {len(ordered)} revisions.") with io.open(filename, "w") as outfile: for rev in ordered: outfile.write(f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n") diff --git a/swh/provenance/tools/revisions_sort.py b/swh/provenance/tools/revisions_sort.py index a4905eb..896d8b0 100755 --- a/swh/provenance/tools/revisions_sort.py +++ b/swh/provenance/tools/revisions_sort.py @@ -1,47 +1,47 @@ #!/usr/bin/env python from datetime import datetime import gzip import sys from swh.model.hashutil import hash_to_bytes, hash_to_hex if __name__ == "__main__": if len(sys.argv) != 3: print("usage: revisions_sort ") exit(-1) infilename = sys.argv[1] outfilename = sys.argv[2] with gzip.open(infilename, "rt") as infile: revisions = [] sort = False for idx, line in enumerate(infile.readlines(), start=1): if line.strip(): splitted = line.split(",") revision = hash_to_bytes(splitted[0]) date = datetime.fromisoformat(splitted[1]) root = hash_to_bytes(splitted[2]) assert date is not None if revisions: last = revisions[-1] if date < last[1]: print("Out of order", last, f"({revision},{date},{root})") sort = True revisions.append((revision, date, root)) if sort: revisions = sorted(revisions, key=lambda rev: rev[1]) date = None with gzip.open(outfilename, "wt") as outfile: for rev in revisions: - assert date == None or date <= rev[1] + assert date is None or date <= rev[1] date = rev[1] outfile.write( f"{hash_to_hex(rev[0])},{rev[1]},{hash_to_hex(rev[2])}\n" )