diff --git a/revisions/client.py b/revisions/client.py
index 297ad85..5bc30d1 100755
--- a/revisions/client.py
+++ b/revisions/client.py
@@ -1,166 +1,168 @@
 #!/usr/bin/env python
 
 import logging
 import logging.handlers
+import os
 import sys
 import time
 from datetime import timezone
 from multiprocessing import Process
 from threading import Thread
 from typing import Any, Dict
 
 import iso8601
+import yaml
 import zmq
 
+from swh.core import config
 from swh.model.hashutil import hash_to_bytes
 from swh.provenance import get_archive, get_provenance
 from swh.provenance.archive import ArchiveInterface
 from swh.provenance.revision import RevisionEntry, revision_add
 
-# TODO: take this from a configuration file
-conninfo = {
-    "archive": {
-        "cls": "direct",
-        "db": {
-            "host": "belvedere.internal.softwareheritage.org",
-            "port": "5433",
-            "dbname": "softwareheritage",
-            "user": "guest",
-        },
-    },
-    "provenance": {
-        "cls": "rabbitmq",
-        "url": "amqp://localhost:5672/%2f",
-        "storage_config": {"cls": "postgresql", "db": {"service": "provenance"}},
-    },
-}
+# All generic config code should reside in swh.core.config
+CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
+DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
 
 
 class Client(Process):
     def __init__(
         self,
         idx: int,
         threads: int,
-        conninfo: Dict[str, Any],
+        conf: Dict[str, Any],
         trackall: bool,
         lower: bool,
         mindepth: int,
     ):
         super().__init__()
         self.idx = idx
         self.threads = threads
-        self.conninfo = conninfo
+        self.conf = conf
         self.trackall = trackall
         self.lower = lower
         self.mindepth = mindepth
 
     def run(self):
         # Using the same archive object for every worker to share internal caches.
-        archive = get_archive(**self.conninfo["archive"])
+        archive = get_archive(**self.conf["archive"])
         # Launch as many threads as requested
         workers = []
         for idx in range(self.threads):
             logging.info(f"Process {self.idx}: launching thread {idx}")
             worker = Worker(
-                idx, archive, self.conninfo, self.trackall, self.lower, self.mindepth
+                idx, archive, self.conf, self.trackall, self.lower, self.mindepth
             )
             worker.start()
             workers.append(worker)
         # Wait for all threads to complete their work
         for idx, worker in enumerate(workers):
             logging.info(f"Process {self.idx}: waiting for thread {idx} to finish")
             worker.join()
             logging.info(f"Process {self.idx}: thread {idx} finished executing")
 
 
 class Worker(Thread):
     def __init__(
         self,
         idx: int,
         archive: ArchiveInterface,
-        conninfo: Dict[str, Any],
+        conf: Dict[str, Any],
         trackall: bool,
         lower: bool,
         mindepth: int,
     ):
         super().__init__()
         self.idx = idx
         self.archive = archive
-        self.server = conninfo["rev_server"]
+        self.storage_conf = conf["storage"]
+        self.url = f"tcp://{conf['rev_server']['host']}:{conf['rev_server']['port']}"
         # Each worker has its own provenance object to isolate
         # the processing of each revision.
-        # self.provenance = get_provenance(**conninfo["provenance"])
+        # self.provenance = get_provenance(**storage_conf)
        self.trackall = trackall
         self.lower = lower
         self.mindepth = mindepth
         logging.info(
             f"Worker {self.idx} created ({self.trackall}, {self.lower}, {self.mindepth})"
         )
 
     def run(self):
         context = zmq.Context()
         socket = context.socket(zmq.REQ)
-        socket.connect(self.server)
-        with get_provenance(**conninfo["provenance"]) as provenance:
+        socket.connect(self.url)
+        with get_provenance(**self.storage_conf) as provenance:
             while True:
                 socket.send(b"NEXT")
                 response = socket.recv_json()
                 if response is None:
                     break
 
                 # Ensure date has a valid timezone
                 date = iso8601.parse_date(response["date"])
                 if date.tzinfo is None:
                     date = date.replace(tzinfo=timezone.utc)
 
                 revision = RevisionEntry(
                     hash_to_bytes(response["rev"]),
                     date=date,
                     root=hash_to_bytes(response["root"]),
                 )
 
                 revision_add(
                     provenance,
                     self.archive,
                     [revision],
                     trackall=self.trackall,
                     lower=self.lower,
                     mindepth=self.mindepth,
                 )
 
 
 if __name__ == "__main__":
     # Check parameters
-    if len(sys.argv) != 6:
-        print("usage: client <processes> <port> <trackall> <lower> <mindepth>")
+    if len(sys.argv) != 5:
+        print("usage: client <processes> <trackall> <lower> <mindepth>")
         exit(-1)
 
     processes = int(sys.argv[1])
-    port = int(sys.argv[2])
     threads = 1  # int(sys.argv[2])
-    trackall = sys.argv[3].lower() != "false"
-    lower = sys.argv[4].lower() != "false"
-    mindepth = int(sys.argv[5])
-    conninfo["rev_server"] = f"tcp://localhost:{port}"
+    trackall = sys.argv[2].lower() != "false"
+    lower = sys.argv[3].lower() != "false"
+    mindepth = int(sys.argv[4])
+
+    config_file = None  # TODO: Add as a cli option
+    if (
+        config_file is None
+        and DEFAULT_PATH is not None
+        and config.config_exists(DEFAULT_PATH)
+    ):
+        config_file = DEFAULT_PATH
+
+    if config_file is None or not os.path.exists(config_file):
+        print("No configuration provided")
+        exit(-1)
+
+    conf = yaml.safe_load(open(config_file, "rb"))["provenance"]
 
     # Start counter
     start = time.time()
 
     # Launch as many clients as requested
     clients = []
     for idx in range(processes):
         logging.info(f"MAIN: launching process {idx}")
-        client = Client(idx, threads, conninfo, trackall, lower, mindepth)
+        client = Client(idx, threads, conf, trackall, lower, mindepth)
         client.start()
         clients.append(client)
 
     # Wait for all processes to complete their work
     for idx, client in enumerate(clients):
         logging.info(f"MAIN: waiting for process {idx} to finish")
         client.join()
         logging.info(f"MAIN: process {idx} finished executing")
 
     # Stop counter and report elapsed time
     stop = time.time()
     print("Elapsed time:", stop - start, "seconds")
diff --git a/revisions/server.py b/revisions/server.py
index d90b662..716fe16 100755
--- a/revisions/server.py
+++ b/revisions/server.py
@@ -1,161 +1,175 @@
 #!/usr/bin/env python
 
 import gzip
 import io
 import json
 import os
 import sys
 from datetime import datetime, timezone
 
 import iso8601
+import yaml
 import zmq
 
+from swh.core import config
 from swh.provenance import get_provenance
 from swh.provenance.provenance import ProvenanceInterface
 
-UTCEPOCH = datetime.fromtimestamp(0, timezone.utc)
+# All generic config code should reside in swh.core.config
+CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
+DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
 
-# TODO: take this from a configuration file
-conninfo = {
-    "provenance": {"cls": "postgresql", "db": {"service": "provenance"}},
-}
+UTCEPOCH = datetime.fromtimestamp(0, timezone.utc)
 
 
 def get_tables_stats(provenance: ProvenanceInterface):
     tables = {
         "content": dict(),
         "content_in_revision": dict(),
         "content_in_directory": dict(),
         "directory": dict(),
         "directory_in_revision": dict(),
         "location": dict(),
         "revision": dict(),
     }
     for table in tables:
         # TODO: use ProvenanceStorageInterface instead!
         with provenance.storage.transaction(readonly=True) as cursor:
             cursor.execute(f"SELECT COUNT(*) AS count FROM {table}")
             tables[table]["row_count"] = cursor.fetchone()["count"]
 
             # cursor.execute(f"SELECT pg_table_size('{table}') AS size")
             # tables[table]["table_size"] = cursor.fetchone()["size"]
 
             # cursor.execute(f"SELECT pg_indexes_size('{table}') AS size")
             # tables[table]["indexes_size"] = cursor.fetchone()["size"]
 
             # # cursor.execute(f"SELECT pg_total_relation_size('{table}') AS size")
             # # relation_size[table] = cursor.fetchone()["size"]
             # tables[table]["relation_size"] = (
             #     tables[table]["table_size"] + tables[table]["indexes_size"]
             # )
     return tables
 
 
 def init_stats(filename):
     tables = [
         "content",
         "content_early_in_rev",
         "content_in_dir",
         "directory",
         "directory_in_rev",
         "location",
         "revision",
     ]
     header = ["revisions count", "datetime"]
     for table in tables:
         header.append(f"{table} rows")
         # header.append(f"{table} table size")
         # header.append(f"{table} index size")
         # header.append(f"{table} relation size")
     with io.open(filename, "w") as outfile:
         outfile.write(",".join(header))
         outfile.write("\n")
 
 
 def write_stats(filename, count, tables):
     line = [str(count), str(datetime.now())]
     for table, stats in tables.items():
         line.append(str(stats["row_count"]))
         # line.append(str(stats["table_size"]))
         # line.append(str(stats["indexes_size"]))
         # line.append(str(stats["relation_size"]))
     with io.open(filename, "a") as outfile:
         outfile.write(",".join(line))
         outfile.write("\n")
 
 
 if __name__ == "__main__":
-    if len(sys.argv) < 3:
-        print("usage: server <filename> <port> [stats] [limit]")
+    if len(sys.argv) < 2:
+        print("usage: server <filename>")
         print("where")
         print(
             " filename : csv file containing the list of revisions to be iterated (one per"
         )
         print(
             " line): revision sha1, date in ISO format, root directory sha1."
         )
-        print(" port : server listening port.")
-        print(
-            " stats : number of iteration after which stats should be taken."
-        )
-        print(
-            " limit : max number of revisions to be retrieved from the file."
-        )
         exit(-1)
 
     filename = sys.argv[1]
-    port = int(sys.argv[2])
-    stats = int(sys.argv[3]) if len(sys.argv) > 3 else None
-    limit = int(sys.argv[4]) if len(sys.argv) > 4 else None
+
+    config_file = None  # TODO: Add as a cli option
+    if (
+        config_file is None
+        and DEFAULT_PATH is not None
+        and config.config_exists(DEFAULT_PATH)
+    ):
+        config_file = DEFAULT_PATH
+
+    if config_file is None or not os.path.exists(config_file):
+        print("No configuration provided")
+        exit(-1)
+
+    conf = yaml.safe_load(open(config_file, "rb"))["provenance"]
 
     context = zmq.Context()
     socket = context.socket(zmq.REP)
-    socket.bind(f"tcp://*:{port}")
+    socket.bind(f"tcp://*:{conf['rev_server']['port']}")
+
+    storage_conf = (
+        conf["storage"]
+        if conf["storage"]["cls"] == "postgresql"
+        else conf["storage"]["storage_config"]
+    )
+    dbname = storage_conf["db"].get("dbname", storage_conf["db"].get("service"))
 
-    statsfile = f"stats_{conninfo['provenance']['db']['service']}_{datetime.now()}.csv"
+    stats = conf["rev_server"].get("stats")
+    statsfile = f"stats_{dbname}_{datetime.now()}.csv"
     if stats is not None:
         init_stats(statsfile)
 
     revisions_provider = (
         (line.strip().split(",") for line in io.open(filename, "r") if line.strip())
         if os.path.splitext(filename)[1] == ".csv"
         else (
             line.strip().split(",")
             for line in gzip.open(filename, "rt")
             if line.strip()
         )
     )
 
-    start = None
-    with get_provenance(**conninfo["provenance"]) as provenance:
+    limit = conf["rev_server"].get("limit")
+    skip = conf["rev_server"].get("skip", 0)
+    with get_provenance(**storage_conf) as provenance:
         for idx, (rev, date, root) in enumerate(revisions_provider):
             if iso8601.parse_date(date) <= UTCEPOCH:
                 continue
 
             if limit is not None and limit <= idx:
                 break
 
-            if stats is not None and idx > 0 and idx % stats == 0:
+            if stats is not None and idx > skip and idx % stats == 0:
                 write_stats(statsfile, idx, get_tables_stats(provenance))
 
             # Wait for next request from client
             request = socket.recv()
             response = {
                 "rev": rev,
                 "date": date,
                 "root": root,
             }
             socket.send_json(response)
 
     if stats is not None:
         write_stats(statsfile, 0, get_tables_stats(provenance))
 
     while True:  # Force all clients to exit
         request = socket.recv()
         socket.send_json(None)
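
Note on configuration (not part of the diff): both scripts now read the YAML file named by the SWH_CONFIG_FILENAME environment variable and work on its "provenance" section. The sketch below shows what yaml.safe_load() is assumed to return for such a file; key names are inferred from the accesses in the new code, the archive and storage values reuse the defaults from the removed conninfo dicts, and the rev_server entries are placeholder examples only.

# Assumed shape of the parsed configuration; values below are illustrative.
expected_config = {
    "provenance": {
        # Passed to get_archive(**conf["archive"]) by each client process.
        "archive": {
            "cls": "direct",
            "db": {
                "host": "belvedere.internal.softwareheritage.org",
                "port": "5433",
                "dbname": "softwareheritage",
                "user": "guest",
            },
        },
        # Used directly when cls is "postgresql"; otherwise the server falls
        # back to the nested storage_config (see storage_conf in server.py).
        "storage": {
            "cls": "rabbitmq",
            "url": "amqp://localhost:5672/%2f",
            "storage_config": {"cls": "postgresql", "db": {"service": "provenance"}},
        },
        # Placeholder values: host and port build the ZMQ URL; stats, limit
        # and skip are the optional knobs read via conf["rev_server"].get(...).
        "rev_server": {
            "host": "localhost",
            "port": 5556,
            "stats": 1000,
            "limit": 1000000,
            "skip": 0,
        },
    },
}

With such a file in place, the server would be launched along the lines of SWH_CONFIG_FILENAME=provenance.yml ./server.py <revisions.csv>, and each client as SWH_CONFIG_FILENAME=provenance.yml ./client.py <processes> <trackall> <lower> <mindepth>.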