diff --git a/origins/client.py b/origins/client.py
new file mode 100755
index 0000000..c48a0df
--- /dev/null
+++ b/origins/client.py
@@ -0,0 +1,104 @@
+#!/usr/bin/env python
+
+import logging
+import logging.handlers
+import multiprocessing
+import os
+import sys
+import time
+from typing import Any, Callable, Dict, List, Optional
+
+from swh.core import config
+from swh.model.hashutil import hash_to_bytes
+from swh.provenance import get_archive, get_provenance
+from swh.provenance.origin import OriginEntry, origin_add
+import yaml
+import zmq
+
+CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
+
+DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
+
+
+class Client(multiprocessing.Process):
+    def __init__(
+        self,
+        conf: Dict[str, Any],
+        group: None = None,
+        target: Optional[Callable[..., Any]] = ...,
+        name: Optional[str] = ...,
+    ) -> None:
+        super().__init__(group=group, target=target, name=name)
+        self.archive_conf = conf["archive"]
+        self.storage_conf = conf["storage"]
+        self.url = f"tcp://{conf['org_server']['host']}:{conf['org_server']['port']}"
+        logging.info(f"Client {self.name} created")
+
+    def run(self):
+        logging.info(f"Client {self.name} started")
+        # XXX: should we reconnect on each iteration to save resources?
+        archive = get_archive(**self.archive_conf)
+
+        context = zmq.Context()
+        socket: zmq.Socket = context.socket(zmq.REQ)
+        socket.connect(self.url)
+
+        with get_provenance(**self.storage_conf) as provenance:
+            while True:
+                socket.send(b"NEXT")
+                response = socket.recv_json()
+
+                if response is None:
+                    break
+
+                batch = []
+                for origin in response:
+                    batch.append(
+                        OriginEntry(origin["url"], hash_to_bytes(origin["snapshot"]))
+                    )
+                origin_add(provenance, archive, batch)
+        logging.info(f"Client {self.name} stopped")
+
+
+if __name__ == "__main__":
+    # Check parameters
+    if len(sys.argv) != 2:
+        print("usage: client <processes>")
+        exit(-1)
+
+    processes = int(sys.argv[1])
+
+    config_file = None  # TODO: add as a cli option
+    if (
+        config_file is None
+        and DEFAULT_PATH is not None
+        and config.config_exists(DEFAULT_PATH)
+    ):
+        config_file = DEFAULT_PATH
+
+    if config_file is None or not os.path.exists(config_file):
+        print("No configuration provided")
+        exit(-1)
+
+    conf = yaml.safe_load(open(config_file, "rb"))["provenance"]
+
+    # Start counter
+    start = time.time()
+
+    # Launch as many clients as requested
+    clients: List[Client] = []
+    for idx in range(processes):
+        logging.info(f"MAIN: launching process {idx}")
+        client = Client(conf, name=f"worker{idx}")
+        client.start()
+        clients.append(client)
+
+    # Wait for all processes to complete their work
+    for client in clients:
+        logging.info(f"MAIN: waiting for process {client.name} to finish")
+        client.join()
+        logging.info(f"MAIN: process {client.name} finished executing")
+
+    # Stop counter and report elapsed time
+    stop = time.time()
+    print("Elapsed time:", stop - start, "seconds")
diff --git a/revisions/server.py b/origins/server.py
similarity index 83%
copy from revisions/server.py
copy to origins/server.py
index 9cf1a1a..2bf9bde 100755
--- a/revisions/server.py
+++ b/origins/server.py
@@ -1,242 +1,234 @@
 #!/usr/bin/env python
 
 from datetime import datetime, timezone
 from enum import Enum
 import gzip
 import io
 import os
 import queue
 import sys
 import threading
 import time
 from typing import Any, Callable, Dict, List, Optional
 
-import iso8601
 from swh.core import config
 from swh.provenance import get_provenance
 from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql
 import yaml
 import zmq
 
 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
"SWH_CONFIG_FILENAME" DEFAULT_BATCH_SIZE = 1 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None) DEFAULT_PORT = 5555 DEFAULT_STATS_RATE = 300 DEFAULT_SKIP_VALUE = 0 UTCEPOCH = datetime.fromtimestamp(0, timezone.utc) class Command(Enum): TERMINATE = "terminate" class StatsWorker(threading.Thread): def __init__( self, filename: str, storage_conf: Dict[str, Any], timeout: float = DEFAULT_STATS_RATE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.queue = queue.Queue() self.storage_conf = storage_conf self.timeout = timeout def get_tables_stats(self, tables: List[str]) -> Dict[str, int]: # TODO: use ProvenanceStorageInterface instead! with get_provenance(**self.storage_conf) as provenance: assert isinstance(provenance.storage, ProvenanceStoragePostgreSql) stats = {} for table in tables: with provenance.storage.transaction(readonly=True) as cursor: cursor.execute(f"SELECT COUNT(*) AS count FROM {table}") stats[table] = cursor.fetchone()["count"] - with provenance.storage.transaction(readonly=True) as cursor: - cursor.execute(f"SELECT MAX(date) AS date FROM revision") - stats["maxdate"] = cursor.fetchone()["date"] return stats def init_stats(self, filename: str) -> List[str]: tables = [ - "content", - "content_in_revision", - "content_in_directory", - "directory", - "directory_in_revision", - "location", + "origin", "revision", + "revision_in_origin", + "revision_before_revision", ] header = ["datetime"] for table in tables: header.append(f"{table} rows") - header.append("revision maxdate") with io.open(filename, "w") as outfile: outfile.write(",".join(header)) outfile.write("\n") return tables def run(self) -> None: tables = self.init_stats(self.filename) start = time.monotonic() while True: now = time.monotonic() if now - start > self.timeout: self.write_stats(self.filename, self.get_tables_stats(tables)) start = now try: cmd = self.queue.get(timeout=1) if cmd == Command.TERMINATE: break except queue.Empty: continue def stop(self) -> None: self.queue.put(Command.TERMINATE) self.join() def write_stats(self, filename: str, stats: Dict[str, int]) -> None: line = [str(datetime.now())] for _, stat in stats.items(): line.append(str(stat)) with io.open(filename, "a") as outfile: outfile.write(",".join(line)) outfile.write("\n") -class RevisionWorker(threading.Thread): +class OriginWorker(threading.Thread): def __init__( self, filename: str, url: str, batch_size: int = DEFAULT_BATCH_SIZE, limit: Optional[int] = None, skip: int = DEFAULT_SKIP_VALUE, group: None = None, target: Optional[Callable[..., Any]] = ..., name: Optional[str] = ..., ) -> None: super().__init__(group=group, target=target, name=name) self.filename = filename self.batch_size = batch_size self.limit = limit self.queue = queue.Queue() self.skip = skip self.url = url def run(self) -> None: context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REP) socket.bind(self.url) # TODO: improve this using a context manager file = ( io.open(self.filename, "r") if os.path.splitext(self.filename)[1] == ".csv" else gzip.open(self.filename, "rt") ) - provider = (line.strip().split(",") for line in file if line.strip()) + provider = ( + line.strip().rsplit(",", maxsplit=1) for line in file if line.strip() + ) count = 0 while True: if self.limit is not None and count > self.limit: break response = [] - for rev, date, root in provider: + for url, snapshot in provider: count += 1 - if count <= self.skip or 
-                if count <= self.skip or iso8601.parse_date(date) <= UTCEPOCH:
+                if count <= self.skip:
                     continue
-                response.append({"rev": rev, "date": date, "root": root})
+                response.append({"url": url, "snapshot": snapshot})
                 if len(response) == self.batch_size:
                     break
             if not response:
                 break
 
             # Wait for next request from client
             # (TODO: make it non-blocking or add timeout)
             socket.recv()
             socket.send_json(response)
 
             try:
                 cmd = self.queue.get(block=False)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue
 
         while True:  # TODO: improve shutdown logic
             socket.recv()
             socket.send_json(None)
         # context.term()
 
     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()
 
 
 if __name__ == "__main__":
     # TODO: improve command line parsing
     if len(sys.argv) < 2:
         print("usage: server <filename>")
         print("where")
         print(
-            " filename : csv file containing the list of revisions to be iterated (one per"
-        )
-        print(
-            " line): revision sha1, date in ISO format, root directory sha1."
+            " filename : csv file containing the list of origins to be iterated (one per"
         )
+        print(" line): origin url, snapshot sha1.")
         exit(-1)
 
     config_file = None  # TODO: Add as a cli option
     if (
         config_file is None
         and DEFAULT_PATH is not None
         and config.config_exists(DEFAULT_PATH)
     ):
         config_file = DEFAULT_PATH
 
     if config_file is None or not os.path.exists(config_file):
         print("No configuration provided")
         exit(-1)
 
     conf = yaml.safe_load(open(config_file, "rb"))["provenance"]
 
     # Init stats
-    stats = conf["rev_server"].pop("stats", None)
+    stats = conf["org_server"].pop("stats", None)
     if stats is not None:
         storage_conf = (
             conf["storage"]["storage_config"]
             if conf["storage"]["cls"] == "rabbitmq"
             else conf["storage"]
         )
         statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}"
         statsworker = StatsWorker(statsfile, storage_conf, **stats)
         statsworker.start()
 
-    # Init revision provider
-    revsfile = sys.argv[1]
-    host = conf["rev_server"].pop("host", None)
-    url = f"tcp://*:{conf['rev_server'].pop('port', DEFAULT_PORT)}"
-    revsworker = RevisionWorker(revsfile, url, **conf["rev_server"])
-    revsworker.start()
+    # Init origin provider
+    orgsfile = sys.argv[1]
+    host = conf["org_server"].pop("host", None)
+    url = f"tcp://*:{conf['org_server'].pop('port', DEFAULT_PORT)}"
+    orgsworker = OriginWorker(orgsfile, url, **conf["org_server"])
+    orgsworker.start()
 
     # Wait for user commands
     while True:
         try:
             command = input("Enter EXIT to stop service: ")
             if command.lower() == "exit":
                 break
         except KeyboardInterrupt:
             pass
 
     # Release resources
-    revsworker.stop()
+    orgsworker.stop()
     if stats is not None:
         statsworker.stop()
diff --git a/revisions/server.py b/revisions/server.py
index 9cf1a1a..9163b00 100755
--- a/revisions/server.py
+++ b/revisions/server.py
@@ -1,242 +1,242 @@
 #!/usr/bin/env python
 
 from datetime import datetime, timezone
 from enum import Enum
 import gzip
 import io
 import os
 import queue
 import sys
 import threading
 import time
 from typing import Any, Callable, Dict, List, Optional
 
 import iso8601
 from swh.core import config
 from swh.provenance import get_provenance
 from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql
 import yaml
 import zmq
 
 CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
 DEFAULT_BATCH_SIZE = 1
 DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
-DEFAULT_PORT = 5555
+DEFAULT_PORT = 5556
 DEFAULT_STATS_RATE = 300
 DEFAULT_SKIP_VALUE = 0
 
 UTCEPOCH = datetime.fromtimestamp(0, timezone.utc)
 
 
 class Command(Enum):
     TERMINATE = "terminate"
 
 
 class StatsWorker(threading.Thread):
     def __init__(
         self,
         filename: str,
         storage_conf: Dict[str, Any],
         timeout: float = DEFAULT_STATS_RATE,
         group: None = None,
         target: Optional[Callable[..., Any]] = ...,
         name: Optional[str] = ...,
     ) -> None:
         super().__init__(group=group, target=target, name=name)
         self.filename = filename
         self.queue = queue.Queue()
         self.storage_conf = storage_conf
         self.timeout = timeout
 
     def get_tables_stats(self, tables: List[str]) -> Dict[str, int]:
         # TODO: use ProvenanceStorageInterface instead!
         with get_provenance(**self.storage_conf) as provenance:
             assert isinstance(provenance.storage, ProvenanceStoragePostgreSql)
             stats = {}
             for table in tables:
                 with provenance.storage.transaction(readonly=True) as cursor:
                     cursor.execute(f"SELECT COUNT(*) AS count FROM {table}")
                     stats[table] = cursor.fetchone()["count"]
             with provenance.storage.transaction(readonly=True) as cursor:
                 cursor.execute(f"SELECT MAX(date) AS date FROM revision")
                 stats["maxdate"] = cursor.fetchone()["date"]
         return stats
 
     def init_stats(self, filename: str) -> List[str]:
         tables = [
             "content",
             "content_in_revision",
             "content_in_directory",
             "directory",
             "directory_in_revision",
             "location",
             "revision",
         ]
         header = ["datetime"]
         for table in tables:
             header.append(f"{table} rows")
         header.append("revision maxdate")
         with io.open(filename, "w") as outfile:
             outfile.write(",".join(header))
             outfile.write("\n")
         return tables
 
     def run(self) -> None:
         tables = self.init_stats(self.filename)
         start = time.monotonic()
         while True:
             now = time.monotonic()
             if now - start > self.timeout:
                 self.write_stats(self.filename, self.get_tables_stats(tables))
                 start = now
             try:
                 cmd = self.queue.get(timeout=1)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue
 
     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()
 
     def write_stats(self, filename: str, stats: Dict[str, int]) -> None:
         line = [str(datetime.now())]
         for _, stat in stats.items():
             line.append(str(stat))
         with io.open(filename, "a") as outfile:
             outfile.write(",".join(line))
             outfile.write("\n")
 
 
 class RevisionWorker(threading.Thread):
     def __init__(
         self,
         filename: str,
         url: str,
         batch_size: int = DEFAULT_BATCH_SIZE,
         limit: Optional[int] = None,
         skip: int = DEFAULT_SKIP_VALUE,
         group: None = None,
         target: Optional[Callable[..., Any]] = ...,
         name: Optional[str] = ...,
     ) -> None:
         super().__init__(group=group, target=target, name=name)
         self.filename = filename
         self.batch_size = batch_size
         self.limit = limit
         self.queue = queue.Queue()
         self.skip = skip
         self.url = url
 
     def run(self) -> None:
         context = zmq.Context()
         socket: zmq.Socket = context.socket(zmq.REP)
         socket.bind(self.url)
 
         # TODO: improve this using a context manager
         file = (
             io.open(self.filename, "r")
             if os.path.splitext(self.filename)[1] == ".csv"
             else gzip.open(self.filename, "rt")
         )
         provider = (line.strip().split(",") for line in file if line.strip())
 
         count = 0
         while True:
             if self.limit is not None and count > self.limit:
                 break
 
             response = []
             for rev, date, root in provider:
                 count += 1
                 if count <= self.skip or iso8601.parse_date(date) <= UTCEPOCH:
                     continue
                 response.append({"rev": rev, "date": date, "root": root})
                 if len(response) == self.batch_size:
                     break
             if not response:
                 break
 
             # Wait for next request from client
             # (TODO: make it non-blocking or add timeout)
             socket.recv()
             socket.send_json(response)
 
             try:
                 cmd = self.queue.get(block=False)
                 if cmd == Command.TERMINATE:
                     break
             except queue.Empty:
                 continue
 
         while True:  # TODO: improve shutdown logic
             socket.recv()
             socket.send_json(None)
         # context.term()
 
     def stop(self) -> None:
         self.queue.put(Command.TERMINATE)
         self.join()
 
 
 if __name__ == "__main__":
     # TODO: improve command line parsing
     if len(sys.argv) < 2:
         print("usage: server <filename>")
         print("where")
         print(
             " filename : csv file containing the list of revisions to be iterated (one per"
         )
         print(
             " line): revision sha1, date in ISO format, root directory sha1."
         )
         exit(-1)
 
     config_file = None  # TODO: Add as a cli option
     if (
         config_file is None
         and DEFAULT_PATH is not None
         and config.config_exists(DEFAULT_PATH)
     ):
         config_file = DEFAULT_PATH
 
     if config_file is None or not os.path.exists(config_file):
         print("No configuration provided")
         exit(-1)
 
     conf = yaml.safe_load(open(config_file, "rb"))["provenance"]
 
     # Init stats
     stats = conf["rev_server"].pop("stats", None)
     if stats is not None:
         storage_conf = (
             conf["storage"]["storage_config"]
             if conf["storage"]["cls"] == "rabbitmq"
             else conf["storage"]
         )
         statsfile = f"stats_{datetime.now()}_{stats.pop('suffix')}"
         statsworker = StatsWorker(statsfile, storage_conf, **stats)
         statsworker.start()
 
     # Init revision provider
     revsfile = sys.argv[1]
     host = conf["rev_server"].pop("host", None)
     url = f"tcp://*:{conf['rev_server'].pop('port', DEFAULT_PORT)}"
     revsworker = RevisionWorker(revsfile, url, **conf["rev_server"])
     revsworker.start()
 
     # Wait for user commands
     while True:
         try:
             command = input("Enter EXIT to stop service: ")
             if command.lower() == "exit":
                 break
         except KeyboardInterrupt:
             pass
 
     # Release resources
     revsworker.stop()
     if stats is not None:
         statsworker.stop()
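For reviewers, the origin server and client introduced above speak a minimal ZeroMQ REQ/REP protocol: the client sends an arbitrary request (the literal b"NEXT" here; the server never inspects the body) and receives either a JSON list of {"url": ..., "snapshot": ...} entries or JSON null once the input file is drained. The sketch below is a hypothetical standalone consumer, not part of this diff; the fetch_batches helper name and the localhost:5555 endpoint (matching DEFAULT_PORT in origins/server.py) are assumptions to adjust to the actual org_server configuration.

#!/usr/bin/env python
# Sketch: drain origin batches from the server (hypothetical helper, not in this diff).

import zmq


def fetch_batches(url: str = "tcp://localhost:5555"):  # assumed endpoint
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect(url)
    try:
        while True:
            socket.send(b"NEXT")  # request body is ignored by the server
            batch = socket.recv_json()  # list of {"url": ..., "snapshot": ...}
            if batch is None:  # JSON null marks end of input
                break
            yield batch
    finally:
        socket.close()
        context.term()


if __name__ == "__main__":
    for batch in fetch_batches():
        for origin in batch:
            print(origin["url"], origin["snapshot"])

Because the server replies null to every request after the input is exhausted, any number of such consumers (or Client processes) can connect and terminate cleanly without coordination.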