diff --git a/revisions/client.py b/revisions/client.py
index 33bb00e..e562547 100755
--- a/revisions/client.py
+++ b/revisions/client.py
@@ -1,130 +1,141 @@
 #!/usr/bin/env python

 from datetime import timezone
 import logging
 import logging.handlers
 import multiprocessing
 import os
 import sys
 import time
-from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Any, Callable, Dict, List, Optional

 import iso8601
 from swh.core import config
 from swh.model.hashutil import hash_to_bytes
 from swh.provenance import get_archive, get_provenance
 from swh.provenance.revision import RevisionEntry, revision_add
 import yaml
 import zmq

 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)


 class Client(multiprocessing.Process):
     def __init__(
         self,
         conf: Dict[str, Any],
         trackall: bool,
+        flatten: bool,
         lower: bool,
         mindepth: int,
         group: None = None,
         target: Optional[Callable[..., Any]] = ...,
         name: Optional[str] = ...,
     ) -> None:
         super().__init__(group=group, target=target, name=name)
         self.archive_conf = conf["archive"]
         self.storage_conf = conf["storage"]
         self.url = f"tcp://{conf['rev_server']['host']}:{conf['rev_server']['port']}"
         self.trackall = trackall
+        self.flatten = flatten
         self.lower = lower
         self.mindepth = mindepth
         logging.info(f"Client {self.name} created")

     def run(self):
         logging.info(f"Client {self.name} started")
         # XXX: should we reconnect on each iteration to save resources?
         archive = get_archive(**self.archive_conf)

         context = zmq.Context()
         socket: zmq.Socket = context.socket(zmq.REQ)
         socket.connect(self.url)

         with get_provenance(**self.storage_conf) as provenance:
             while True:
                 socket.send(b"NEXT")
                 response = socket.recv_json()
                 if response is None:
                     break

                 batch = []
                 for revision in response:
                     # Ensure date has a valid timezone
                     date = iso8601.parse_date(revision["date"])
                     if date.tzinfo is None:
                         date = date.replace(tzinfo=timezone.utc)
                     batch.append(
                         RevisionEntry(
                             hash_to_bytes(revision["rev"]),
                             date=date,
                             root=hash_to_bytes(revision["root"]),
                         )
                     )
                 revision_add(
                     provenance,
                     archive,
                     batch,
                     trackall=self.trackall,
+                    flatten=self.flatten,
                     lower=self.lower,
                     mindepth=self.mindepth,
                 )
         logging.info(f"Client {self.name} stopped")


 if __name__ == "__main__":
     # Check parameters
-    if len(sys.argv) != 5:
-        print("usage: client <processes> <trackall> <lower> <mindepth>")
+    if len(sys.argv) != 6:
+        print("usage: client <processes> <trackall> <flatten> <lower> <mindepth>")
         exit(-1)

     processes = int(sys.argv[1])
     trackall = sys.argv[2].lower() != "false"
-    lower = sys.argv[3].lower() != "false"
-    mindepth = int(sys.argv[4])
+    flatten = sys.argv[3].lower() != "false"
+    lower = sys.argv[4].lower() != "false"
+    mindepth = int(sys.argv[5])

     config_file = None  # TODO: add as a cli option
     if (
         config_file is None
         and DEFAULT_PATH is not None
         and config.config_exists(DEFAULT_PATH)
     ):
         config_file = DEFAULT_PATH

     if config_file is None or not os.path.exists(config_file):
         print("No configuration provided")
         exit(-1)

     conf = yaml.safe_load(open(config_file, "rb"))["provenance"]

     # Start counter
     start = time.time()

     # Launch as many clients as requested
     clients: List[Client] = []
     for idx in range(processes):
         logging.info(f"MAIN: launching process {idx}")
-        client = Client(conf, trackall, lower, mindepth, name=f"worker{idx}")
+        client = Client(
+            conf,
+            trackall=trackall,
+            flatten=flatten,
+            lower=lower,
+            mindepth=mindepth,
+            name=f"worker{idx}",
+        )
         client.start()
         clients.append(client)

     # Wait for all processes to complete their work
     for client in clients:
         logging.info(f"MAIN: waiting for process {client.name} to finish")
         client.join()
logging.info(f"MAIN: process {client.name} finished executing") # Stop counter and report elapsed time stop = time.time() print("Elapsed time:", stop - start, "seconds") diff --git a/revisions/server.py b/revisions/server.py index 487182c..9cf1a1a 100755 --- a/revisions/server.py +++ b/revisions/server.py @@ -1,242 +1,242 @@ #!/usr/bin/env python from datetime import datetime, timezone from enum import Enum import gzip import io import os import queue import sys import threading import time -from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional +from typing import Any, Callable, Dict, List, Optional import iso8601 from swh.core import config from swh.provenance import get_provenance from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql import yaml import zmq CONFIG_ENVVAR = "SWH_CONFIG_FILENAME" DEFAULT_BATCH_SIZE = 1 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_PORT = 5555 DEFAULT_STATS_RATE = 300 DEFAULT_SKIP_VALUE = 0 UTCEPOCH = datetime.fromtimestamp(0, timezone.utc) class Command(Enum): TERMINATE = "terminate" class StatsWorker(threading.Thread): def __init__( self, filename: str, storage_conf: Dict[str, Any], timeout: float = DEFAULT_STATS_RATE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.queue = queue.Queue() self.storage_conf = storage_conf self.timeout = timeout def get_tables_stats(self, tables: List[str]) -> Dict[str, int]: # TODO: use ProvenanceStorageInterface instead! with get_provenance(**self.storage_conf) as provenance: assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) stats = {} for table in tables: with provenance.storage.transaction(readonly=True) as cursor: cursor.execute(f"SELECT COUNT(*) AS count FROM {table}") stats[table] = cursor.fetchone()["count"] with provenance.storage.transaction(readonly=True) as cursor: cursor.execute(f"SELECT MAX(date) AS date FROM revision") stats["maxdate"] = cursor.fetchone()["date"] return stats def init_stats(self, filename: str) -> List[str]: tables = [ "content", "content_in_revision", "content_in_directory", "directory", "directory_in_revision", "location", "revision", ] header = ["datetime"] for table in tables: header.append(f"{table} rows") header.append("revision maxdate") with io.open(filename, "w") as outfile: outfile.write(",".join(header)) outfile.write("\n") return tables def run(self) -> None: tables = self.init_stats(self.filename) start = time.monotonic() while True: now = time.monotonic() if now - start > self.timeout: self.write_stats(self.filename, self.get_tables_stats(tables)) start = now try: cmd = self.queue.get(timeout=1) if cmd == Command.TERMINATE: break except queue.Empty: continue def stop(self) -> None: self.queue.put(Command.TERMINATE) self.join() def write_stats(self, filename: str, stats: Dict[str, int]) -> None: line = [str(datetime.now())] for _, stat in stats.items(): line.append(str(stat)) with io.open(filename, "a") as outfile: outfile.write(",".join(line)) outfile.write("\n") class RevisionWorker(threading.Thread): def __init__( self, filename: str, url: str, batch_size: int = DEFAULT_BATCH_SIZE, limit: Optional[int] = None, skip: int = DEFAULT_SKIP_VALUE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.batch_size = batch_size self.limit = limit 
         self.queue = queue.Queue()
         self.skip = skip
         self.url = url

     def run(self) -> None:
         context = zmq.Context()
         socket: zmq.Socket = context.socket(zmq.REP)
         socket.bind(self.url)

         # TODO: improve this using a context manager
         file = (
             io.open(self.filename, "r")
             if os.path.splitext(self.filename)[1] == ".csv"
             else gzip.open(self.filename, "rt")
         )
         provider = (line.strip().split(",") for line in file if line.strip())

         count = 0
         while True:
             if self.limit is not None and count > self.limit:
                 break

             response = []
             for rev, date, root in provider:
                 count += 1
                 if count <= self.skip or iso8601.parse_date(date) <= UTCEPOCH:
                     continue
                 response.append({"rev": rev, "date": date, "root": root})
                 if len(response) == self.batch_size:
                     break
             if not response:
                 break

             # Wait for next request from client
             # (TODO: make it non-blocking or add timeout)
             socket.recv()
             socket.send_json(response)

             try:
                 cmd = self.queue.get(block=False)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue

         while True:  # TODO: improve shutdown logic
             socket.recv()
             socket.send_json(None)
         # context.term()

     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()


 if __name__ == "__main__":
     # TODO: improve command line parsing
     if len(sys.argv) < 2:
         print("usage: server <filename>")
         print("where")
         print(
             "    filename    : csv file containing the list of revisions to be iterated (one per"
         )
         print(
             "                  line): revision sha1, date in ISO format, root directory sha1."
         )
         exit(-1)

     config_file = None  # TODO: Add as a cli option
     if (
         config_file is None
         and DEFAULT_PATH is not None
         and config.config_exists(DEFAULT_PATH)
     ):
         config_file = DEFAULT_PATH

     if config_file is None or not os.path.exists(config_file):
         print("No configuration provided")
         exit(-1)

     conf = yaml.safe_load(open(config_file, "rb"))["provenance"]

     # Init stats
     stats = conf["rev_server"].pop("stats", None)
     if stats is not None:
         storage_conf = (
             conf["storage"]["storage_config"]
             if conf["storage"]["cls"] == "rabbitmq"
             else conf["storage"]
         )
         statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}"
         statsworker = StatsWorker(statsfile, storage_conf, **stats)
         statsworker.start()

     # Init revision provider
     revsfile = sys.argv[1]
     host = conf["rev_server"].pop("host", None)
     url = f"tcp://*:{conf['rev_server'].pop('port', DEFAULT_PORT)}"
     revsworker = RevisionWorker(revsfile, url, **conf["rev_server"])
     revsworker.start()

     # Wait for user commands
     while True:
         try:
             command = input("Enter EXIT to stop service: ")
             if command.lower() == "exit":
                 break
         except KeyboardInterrupt:
             pass

     # Release resources
     revsworker.stop()
     if stats is not None:
         statsworker.stop()
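
With this change the client takes five positional arguments instead of four, the
new flatten flag sitting between trackall and lower. A sketch of a typical run,
assuming a configuration file named config.yml and an input list named
revisions.csv (both names are illustrative; the configuration path is read from
the SWH_CONFIG_FILENAME environment variable, and the boolean arguments are
compared case-insensitively against the literal string "false", so any other
value enables the corresponding flag):

    # Hypothetical example: start the revision server on the configured port
    SWH_CONFIG_FILENAME=config.yml python revisions/server.py revisions.csv

    # Hypothetical example: launch 4 workers with trackall, flatten and lower
    # enabled and a minimum depth of 1
    SWH_CONFIG_FILENAME=config.yml python revisions/client.py 4 true true true 1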