diff --git a/revisions/client.py b/revisions/client.py
index 9a815e5..297ad85 100755
--- a/revisions/client.py
+++ b/revisions/client.py
@@ -1,173 +1,166 @@
 #!/usr/bin/env python
-import iso8601
 import logging
 import logging.handlers
 import sys
 import time
-import zmq
-
+from datetime import timezone
 from multiprocessing import Process
 from threading import Thread
+from typing import Any, Dict
-
+import iso8601
+import zmq
 from swh.model.hashutil import hash_to_bytes
 from swh.provenance import get_archive, get_provenance
 from swh.provenance.archive import ArchiveInterface
-from swh.provenance.provenance import revision_add
-from swh.provenance.model import RevisionEntry
-from typing import Any, Dict
-
+from swh.provenance.revision import RevisionEntry, revision_add
 
 # TODO: take this from a configuration file
 conninfo = {
     "archive": {
         "cls": "direct",
         "db": {
             "host": "belvedere.internal.softwareheritage.org",
             "port": "5433",
             "dbname": "softwareheritage",
             "user": "guest",
         },
     },
     "provenance": {
-        "cls": "local",
-        "db": {
-            "host": "/var/run/postgresql",
-            "port": "5436",
-            "dbname": "provenance",
-        },
+        "cls": "rabbitmq",
+        "url": "amqp://localhost:5672/%2f",
+        "storage_config": {"cls": "postgresql", "db": {"service": "provenance"}},
     },
 }
 
 
 class Client(Process):
     def __init__(
         self,
         idx: int,
         threads: int,
         conninfo: Dict[str, Any],
         trackall: bool,
         lower: bool,
         mindepth: int,
     ):
         super().__init__()
         self.idx = idx
         self.threads = threads
         self.conninfo = conninfo
         self.trackall = trackall
         self.lower = lower
         self.mindepth = mindepth
 
     def run(self):
         # Using the same archive object for every worker to share internal caches.
         archive = get_archive(**self.conninfo["archive"])
 
         # Launch as many threads as requested
         workers = []
         for idx in range(self.threads):
             logging.info(f"Process {self.idx}: launching thread {idx}")
             worker = Worker(
                 idx, archive, self.conninfo, self.trackall, self.lower, self.mindepth
             )
             worker.start()
             workers.append(worker)
 
         # Wait for all threads to complete their work
         for idx, worker in enumerate(workers):
             logging.info(f"Process {self.idx}: waiting for thread {idx} to finish")
             worker.join()
             logging.info(f"Process {self.idx}: thread {idx} finished executing")
 
 
 class Worker(Thread):
     def __init__(
         self,
         idx: int,
         archive: ArchiveInterface,
         conninfo: Dict[str, Any],
         trackall: bool,
         lower: bool,
         mindepth: int,
     ):
         super().__init__()
         self.idx = idx
         self.archive = archive
-        self.server = conninfo["server"]
+        self.server = conninfo["rev_server"]
         # Each worker has its own provenance object to isolate
         # the processing of each revision.
-        self.provenance = get_provenance(**conninfo["provenance"])
+        # self.provenance = get_provenance(**conninfo["provenance"])
         self.trackall = trackall
         self.lower = lower
         self.mindepth = mindepth
         logging.info(
             f"Worker {self.idx} created ({self.trackall}, {self.lower}, {self.mindepth})"
         )
 
     def run(self):
         context = zmq.Context()
         socket = context.socket(zmq.REQ)
         socket.connect(self.server)
-        while True:
-            socket.send(b"NEXT")
-            response = socket.recv_json()
-
-            if response is None:
-                break
-
-            revision = RevisionEntry(
-                hash_to_bytes(response["rev"]),
-                date=iso8601.parse_date(
-                    response["date"], default_timezone=timezone.utc
-                ),
-                root=hash_to_bytes(response["root"]),
-            )
-            revision_add(
-                self.provenance,
-                self.archive,
-                [revision],
-                trackall=self.trackall,
-                lower=self.lower,
-                mindepth=self.mindepth,
-            )
+        with get_provenance(**conninfo["provenance"]) as provenance:
+            while True:
+                socket.send(b"NEXT")
+                response = socket.recv_json()
+
+                if response is None:
+                    break
+
+                # Ensure date has a valid timezone
+                date = iso8601.parse_date(response["date"])
+                if date.tzinfo is None:
+                    date = date.replace(tzinfo=timezone.utc)
+
+                revision = RevisionEntry(
+                    hash_to_bytes(response["rev"]),
+                    date=date,
+                    root=hash_to_bytes(response["root"]),
+                )
+                revision_add(
+                    provenance,
+                    self.archive,
+                    [revision],
+                    trackall=self.trackall,
+                    lower=self.lower,
+                    mindepth=self.mindepth,
+                )
 
 
 if __name__ == "__main__":
     # Check parameters
     if len(sys.argv) != 6:
         print("usage: client <processes> <port> <trackall> <lower> <mindepth>")
         exit(-1)
 
     processes = int(sys.argv[1])
     port = int(sys.argv[2])
     threads = 1  # int(sys.argv[2])
     trackall = sys.argv[3].lower() != "false"
     lower = sys.argv[4].lower() != "false"
     mindepth = int(sys.argv[5])
 
-    dbname = conninfo["provenance"]["db"]["dbname"]
-    conninfo["server"] = f"tcp://localhost:{port}"
-    # conninfo["server"] = f"tcp://128.93.73.182:{port}"  # petit-palais
-
-    # Set logging level
-    logging.getLogger().setLevel(logging.INFO)
+    conninfo["rev_server"] = f"tcp://localhost:{port}"
 
     # Start counter
     start = time.time()
 
     # Launch as many clients as requested
     clients = []
     for idx in range(processes):
         logging.info(f"MAIN: launching process {idx}")
         client = Client(idx, threads, conninfo, trackall, lower, mindepth)
         client.start()
         clients.append(client)
 
     # Wait for all processes to complete their work
     for idx, client in enumerate(clients):
         logging.info(f"MAIN: waiting for process {idx} to finish")
         client.join()
         logging.info(f"MAIN: process {idx} finished executing")
 
     # Stop counter and report elapsed time
     stop = time.time()
     print("Elapsed time:", stop - start, "seconds")
diff --git a/revisions/server.py b/revisions/server.py
index f2e0e04..d90b662 100755
--- a/revisions/server.py
+++ b/revisions/server.py
@@ -1,148 +1,161 @@
 #!/usr/bin/env python
+import gzip
 import io
 import json
+import os
 import sys
-import zmq
+from datetime import datetime, timezone
+import iso8601
+import zmq
 
 from swh.provenance import get_provenance
 from swh.provenance.provenance import ProvenanceInterface
 
+UTCEPOCH = datetime.fromtimestamp(0, timezone.utc)
 
 # TODO: take this from a configuration file
 conninfo = {
-    "provenance": {
-        "cls": "local",
-        "db": {"host": "/var/run/postgresql", "port": "5436", "dbname": "provenance"},
-    },
+    "provenance": {"cls": "postgresql", "db": {"service": "provenance"}},
 }
 
 
 def get_tables_stats(provenance: ProvenanceInterface):
     tables = {
         "content": dict(),
-        "content_early_in_rev": dict(),
-        "content_in_dir": dict(),
+        "content_in_revision": dict(),
+        "content_in_directory": dict(),
         "directory": dict(),
-        "directory_in_rev": dict(),
+        "directory_in_revision": dict(),
         "location": dict(),
         "revision": dict(),
     }
 
     for table in tables:
-        provenance.cursor.execute(f"SELECT COUNT(*) FROM {table}")
-        tables[table]["row_count"] = provenance.cursor.fetchone()[0]
+        # TODO: use ProvenanceStorageInterface instead!
+        with provenance.storage.transaction(readonly=True) as cursor:
+            cursor.execute(f"SELECT COUNT(*) AS count FROM {table}")
+            tables[table]["row_count"] = cursor.fetchone()["count"]
 
-        provenance.cursor.execute(f"SELECT pg_table_size('{table}')")
-        tables[table]["table_size"] = provenance.cursor.fetchone()[0]
+            # cursor.execute(f"SELECT pg_table_size('{table}') AS size")
+            # tables[table]["table_size"] = cursor.fetchone()["size"]
 
-        provenance.cursor.execute(f"SELECT pg_indexes_size('{table}')")
-        tables[table]["indexes_size"] = provenance.cursor.fetchone()[0]
+            # cursor.execute(f"SELECT pg_indexes_size('{table}') AS size")
+            # tables[table]["indexes_size"] = cursor.fetchone()["size"]
 
-        # provenance.cursor.execute(f"SELECT pg_total_relation_size('{table}')")
-        # relation_size[table] = provenance.cursor.fetchone()[0]
-        tables[table]["relation_size"] = (
-            tables[table]["table_size"] + tables[table]["indexes_size"]
-        )
+            # # cursor.execute(f"SELECT pg_total_relation_size('{table}') AS size")
+            # # relation_size[table] = cursor.fetchone()["size"]
+            # tables[table]["relation_size"] = (
+            #     tables[table]["table_size"] + tables[table]["indexes_size"]
+            # )
 
     return tables
 
 
 def init_stats(filename):
     tables = [
         "content",
         "content_early_in_rev",
         "content_in_dir",
         "directory",
         "directory_in_rev",
        "location",
         "revision",
     ]
-    header = ["revisions count"]
+    header = ["revisions count", "datetime"]
    for table in tables:
         header.append(f"{table} rows")
-        header.append(f"{table} table size")
-        header.append(f"{table} index size")
-        header.append(f"{table} relation size")
+        # header.append(f"{table} table size")
+        # header.append(f"{table} index size")
+        # header.append(f"{table} relation size")
 
     with io.open(filename, "w") as outfile:
         outfile.write(",".join(header))
         outfile.write("\n")
 
 
 def write_stats(filename, count, tables):
-    line = [str(count)]
+    line = [str(count), str(datetime.now())]
     for table, stats in tables.items():
         line.append(str(stats["row_count"]))
-        line.append(str(stats["table_size"]))
-        line.append(str(stats["indexes_size"]))
-        line.append(str(stats["relation_size"]))
+        # line.append(str(stats["table_size"]))
+        # line.append(str(stats["indexes_size"]))
+        # line.append(str(stats["relation_size"]))
 
     with io.open(filename, "a") as outfile:
         outfile.write(",".join(line))
         outfile.write("\n")
 
 
 if __name__ == "__main__":
     if len(sys.argv) < 3:
         print("usage: server <filename> <port> [stats] [limit]")
         print("where")
         print(
             "    filename : csv file containing the list of revisions to be iterated (one per"
         )
         print(
             "               line): revision sha1, date in ISO format, root directory sha1."
         )
         print("    port : server listening port.")
         print(
             "    stats : number of iteration after which stats should be taken."
         )
         print(
             "    limit : max number of revisions to be retrieved from the file."
         )
         exit(-1)
 
     filename = sys.argv[1]
     port = int(sys.argv[2])
     stats = int(sys.argv[3]) if len(sys.argv) > 3 else None
     limit = int(sys.argv[4]) if len(sys.argv) > 4 else None
 
     context = zmq.Context()
     socket = context.socket(zmq.REP)
     socket.bind(f"tcp://*:{port}")
 
-    provenance = get_provenance(**conninfo["provenance"])
-
-    statsfile = f"stats_{conninfo['provenance']['db']['dbname']}.csv"
+    statsfile = f"stats_{conninfo['provenance']['db']['service']}_{datetime.now()}.csv"
     if stats is not None:
         init_stats(statsfile)
 
     revisions_provider = (
-        line.strip().split(",") for line in open(filename, "r") if line.strip()
+        (line.strip().split(",") for line in io.open(filename, "r") if line.strip())
+        if os.path.splitext(filename)[1] == ".csv"
+        else (
+            line.strip().split(",")
+            for line in gzip.open(filename, "rt")
+            if line.strip()
+        )
     )
 
-    for idx, (rev, date, root) in enumerate(revisions_provider):
-        if limit is not None and idx > limit:
-            break
-
-        if stats is not None and idx > 0 and idx % stats == 0:
-            write_stats(statsfile, idx, get_tables_stats(provenance))
-
-        # Wait for next request from client
-        request = socket.recv()
-        response = {
-            "rev": rev,
-            "date": date,
-            "root": root,
-        }
-        socket.send_json(response)
-
-    if stats is not None:
-        write_stats(statsfile, 0, get_tables_stats(provenance))
-
-    while True:
-        # Force all clients to exit
-        request = socket.recv()
-        socket.send_json(None)
+    start = None
+    with get_provenance(**conninfo["provenance"]) as provenance:
+        for idx, (rev, date, root) in enumerate(revisions_provider):
+            if iso8601.parse_date(date) <= UTCEPOCH:
+                continue
+
+            if limit is not None and limit <= idx:
+                break
+
+            if stats is not None and idx > 0 and idx % stats == 0:
+                write_stats(statsfile, idx, get_tables_stats(provenance))
+
+            # Wait for next request from client
+            request = socket.recv()
+            response = {
+                "rev": rev,
+                "date": date,
+                "root": root,
+            }
+            socket.send_json(response)
+
+        if stats is not None:
+            write_stats(statsfile, 0, get_tables_stats(provenance))
+
+        while True:
+            # Force all clients to exit
+            request = socket.recv()
+            socket.send_json(None)
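
Usage sketch: going by the argument parsing in server.py and client.py above, the server is started first with the revision list and listening port, and the clients then connect to that same port. The file name, port number, and tuning values below are placeholder examples, not values taken from the diff.

    # serve revisions from a CSV (or gzip-compressed) file on port 5555, writing table stats every 1000 revisions
    python revisions/server.py revisions.csv 5555 1000

    # launch 4 client processes (1 thread each) with trackall=true, lower=true, mindepth=1
    python revisions/client.py 4 5555 true true 1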