diff --git a/revisions/client.py b/revisions/client.py
index 68d7e91..09b0ffb 100755
--- a/revisions/client.py
+++ b/revisions/client.py
@@ -1,156 +1,161 @@
 #!/usr/bin/env python
 
+import iso8601
 import logging
-import subprocess
 import sys
 import time
 import zmq
 
 from multiprocessing import Process
 from threading import Thread
-from datetime import datetime
-from swh.model.hashutil import hash_to_bytes, hash_to_hex
+from datetime import timezone
+from swh.model.hashutil import hash_to_bytes
 from swh.provenance import get_archive, get_provenance
 from swh.provenance.archive import ArchiveInterface
 from swh.provenance.provenance import revision_add
 from swh.provenance.revision import RevisionEntry
 from typing import Any, Dict
 
 # TODO: take this from a configuration file
 conninfo = {
     "archive": {
         "cls": "direct",
         "db": {
             "host": "somerset.internal.softwareheritage.org",
             "port": "5433",
             "dbname": "softwareheritage",
             "user": "guest",
         },
     },
     "provenance": {
         "cls": "local",
         "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"},
     },
 }
 
 
 class Client(Process):
     def __init__(
         self,
         idx: int,
         threads: int,
         conninfo: Dict[str, Any],
         lower: bool,
         mindepth: int,
     ):
         super().__init__()
         self.idx = idx
         self.threads = threads
         self.conninfo = conninfo
         self.lower = lower
         self.mindepth = mindepth
 
     def run(self):
         # Using the same archive object for every worker to share internal caches.
         archive = get_archive(**self.conninfo["archive"])
 
         # Launch as many threads as requested
         workers = []
         for idx in range(self.threads):
             logging.info(f"Process {self.idx}: launching thread {idx}")
             worker = Worker(idx, archive, self.conninfo, self.lower, self.mindepth)
             worker.start()
             workers.append(worker)
 
         # Wait for all threads to complete their work
         for idx, worker in enumerate(workers):
             logging.info(f"Process {self.idx}: waiting for thread {idx} to finish")
             worker.join()
             logging.info(f"Process {self.idx}: thread {idx} finished executing")
 
 
 class Worker(Thread):
     def __init__(
         self,
         idx: int,
         archive: ArchiveInterface,
         conninfo: Dict[str, Any],
         lower: bool,
         mindepth: int,
     ):
         super().__init__()
         self.idx = idx
         self.archive = archive
         self.server = conninfo["server"]
         # Each worker has its own provenance object to isolate
         # the processing of each revision.
         self.provenance = get_provenance(**conninfo["provenance"])
         self.lower = lower
         self.mindepth = mindepth
 
     def run(self):
         context = zmq.Context()
         socket = context.socket(zmq.REQ)
         socket.connect(self.server)
         while True:
             socket.send(b"NEXT")
-            message = socket.recv_json()
+            response = socket.recv_json()
-            if message is None:
+            if response is None:
                 break
+
+            # Ensure date has a valid timezone
+            date = iso8601.parse_date(response["date"])
+            if date.tzinfo is None:
+                date = date.replace(tzinfo=timezone.utc)
+
             revision = RevisionEntry(
                 self.archive,
-                hash_to_bytes(message["rev"]),
-                date=datetime.fromisoformat(message["date"]),
-                root=hash_to_bytes(message["root"]),
+                hash_to_bytes(response["rev"]),
+                date=date,
+                root=hash_to_bytes(response["root"]),
             )
             revision_add(
                 self.provenance,
                 self.archive,
                 revision,
                 lower=self.lower,
                 mindepth=self.mindepth,
             )
 
 
 if __name__ == "__main__":
     # Check parameters
     if len(sys.argv) != 5:
         print("usage: client <processes> <port> <lower> <mindepth>")
         exit(-1)
 
     processes = int(sys.argv[1])
     port = int(sys.argv[2])
     threads = 1  # int(sys.argv[2])
     lower = bool(sys.argv[3])
     mindepth = int(sys.argv[4])
     dbname = conninfo["provenance"]["db"]["dbname"]
     conninfo["server"] = f"tcp://localhost:{port}"
 
     # Set logging level
     # logging.getLogger().setLevel(logging.INFO)
 
     # Start counter
     start = time.time()
 
     # Launch as many clients as requested
     clients = []
     for idx in range(processes):
         logging.info(f"MAIN: launching process {idx}")
         client = Client(idx, threads, conninfo, lower, mindepth)
         client.start()
         clients.append(client)
 
     # Wait for all processes to complete their work
     for idx, client in enumerate(clients):
         logging.info(f"MAIN: waiting for process {idx} to finish")
         client.join()
         logging.info(f"MAIN: process {idx} finished executing")
 
     # Stop counter and report elapsed time
     stop = time.time()
     print("Elapsed time:", stop - start, "seconds")
diff --git a/revisions/server.py b/revisions/server.py
index 67d89b1..f2e0e04 100755
--- a/revisions/server.py
+++ b/revisions/server.py
@@ -1,159 +1,148 @@
 #!/usr/bin/env python
 
 import io
-import itertools
 import json
-import logging
-import subprocess
 import sys
 import zmq
 
-from swh.model.hashutil import hash_to_hex
-from swh.provenance import get_archive, get_provenance
+from swh.provenance import get_provenance
 from swh.provenance.provenance import ProvenanceInterface
-from swh.provenance.revision import CSVRevisionIterator
 
 # TODO: take this from a configuration file
 conninfo = {
-    "archive": {
-        "cls": "direct",
-        "db": {
-            "host": "somerset.internal.softwareheritage.org",
-            "port": "5433",
-            "dbname": "softwareheritage",
-            "user": "guest",
-        },
-    },
     "provenance": {
         "cls": "local",
         "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"},
     },
 }
 
 
 def get_tables_stats(provenance: ProvenanceInterface):
     tables = {
         "content": dict(),
         "content_early_in_rev": dict(),
         "content_in_dir": dict(),
         "directory": dict(),
         "directory_in_rev": dict(),
         "location": dict(),
         "revision": dict(),
     }
 
     for table in tables:
         provenance.cursor.execute(f"SELECT COUNT(*) FROM {table}")
         tables[table]["row_count"] = provenance.cursor.fetchone()[0]
 
         provenance.cursor.execute(f"SELECT pg_table_size('{table}')")
         tables[table]["table_size"] = provenance.cursor.fetchone()[0]
 
         provenance.cursor.execute(f"SELECT pg_indexes_size('{table}')")
         tables[table]["indexes_size"] = provenance.cursor.fetchone()[0]
 
         # provenance.cursor.execute(f"SELECT pg_total_relation_size('{table}')")
         # relation_size[table] = provenance.cursor.fetchone()[0]
         tables[table]["relation_size"] = (
             tables[table]["table_size"] + tables[table]["indexes_size"]
         )
tables[table]["indexes_size"] ) return tables def init_stats(filename): tables = [ "content", "content_early_in_rev", "content_in_dir", "directory", "directory_in_rev", "location", "revision", ] header = ["revisions count"] for table in tables: header.append(f"{table} rows") header.append(f"{table} table size") header.append(f"{table} index size") header.append(f"{table} relation size") with io.open(filename, "w") as outfile: outfile.write(",".join(header)) outfile.write("\n") def write_stats(filename, count, tables): line = [str(count)] for table, stats in tables.items(): line.append(str(stats["row_count"])) line.append(str(stats["table_size"])) line.append(str(stats["indexes_size"])) line.append(str(stats["relation_size"])) with io.open(filename, "a") as outfile: outfile.write(",".join(line)) outfile.write("\n") if __name__ == "__main__": if len(sys.argv) < 3: - print("usage: server [limit]") + print("usage: server [stats] [limit]") print("where") print( " filename : csv file containing the list of revisions to be iterated (one per" ) print( " line): revision sha1, date in ISO format, root directory sha1." ) print(" port : server listening port.") print( - " limit : max number of revisions to be retrieved from the file." + " stats : number of iteration after which stats should be taken." ) print( - " stats : number of iteration after which stats should be taken." + " limit : max number of revisions to be retrieved from the file." ) exit(-1) filename = sys.argv[1] port = int(sys.argv[2]) - limit = int(sys.argv[3]) if len(sys.argv) > 3 else None - stats = int(sys.argv[4]) if len(sys.argv) > 3 else None + stats = int(sys.argv[3]) if len(sys.argv) > 3 else None + limit = int(sys.argv[4]) if len(sys.argv) > 4 else None context = zmq.Context() socket = context.socket(zmq.REP) socket.bind(f"tcp://*:{port}") - archive = get_archive(**conninfo["archive"]) provenance = get_provenance(**conninfo["provenance"]) statsfile = f"stats_{conninfo['provenance']['db']['dbname']}.csv" if stats is not None: init_stats(statsfile) revisions_provider = ( line.strip().split(",") for line in open(filename, "r") if line.strip() ) - for idx, revision in enumerate( - CSVRevisionIterator(revisions_provider, archive, limit=limit) - ): - if stats is not None and idx != 0 and idx % stats == 0: + for idx, (rev, date, root) in enumerate(revisions_provider): + if limit is not None and idx > limit: + break + + if stats is not None and idx > 0 and idx % stats == 0: write_stats(statsfile, idx, get_tables_stats(provenance)) # Wait for next request from client - message = socket.recv() - message = { - "rev": hash_to_hex(revision.id), - "date": str(revision.date), - "root": hash_to_hex(revision.root), + request = socket.recv() + response = { + "rev": rev, + "date": date, + "root": root, } - socket.send_json(message) + socket.send_json(response) + + if stats is not None: + write_stats(statsfile, 0, get_tables_stats(provenance)) while True: # Force all clients to exit - message = socket.recv() + request = socket.recv() socket.send_json(None)