diff --git a/swh/provenance/model.py b/swh/provenance/model.py index 13dd696..5ace0e6 100644 --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -1,147 +1,153 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from datetime import datetime from typing import Iterable, Iterator, List, Optional from swh.model.model import Origin, Sha1Git from .archive import ArchiveInterface class OriginEntry: def __init__(self, url: str, snapshot: Sha1Git) -> None: self.url = url self.id = Origin(url=self.url).id self.snapshot = snapshot self._revisions: Optional[List[RevisionEntry]] = None + self._revisions_count = -1 def retrieve_revisions(self, archive: ArchiveInterface) -> None: if self._revisions is None: self._revisions = [ RevisionEntry(rev) for rev in archive.snapshot_get_heads(self.snapshot) ] + self._revisions_count = len(self._revisions) + + @property + def revision_count(self) -> int: + return self._revisions_count @property def revisions(self) -> Iterator[RevisionEntry]: if self._revisions is None: raise RuntimeError( "Revisions of this node has not yet been retrieved. " "Please call retrieve_revisions() before using this property." 
) return (x for x in self._revisions) def __str__(self) -> str: return f"" class RevisionEntry: def __init__( self, id: Sha1Git, date: Optional[datetime] = None, root: Optional[Sha1Git] = None, parents: Optional[Iterable[Sha1Git]] = None, ) -> None: self.id = id self.date = date assert self.date is None or self.date.tzinfo is not None self.root = root self._parents_ids = parents self._parents_entries: Optional[List[RevisionEntry]] = None def retrieve_parents(self, archive: ArchiveInterface) -> None: if self._parents_entries is None: if self._parents_ids is None: self._parents_ids = archive.revision_get_parents(self.id) self._parents_entries = [RevisionEntry(id) for id in self._parents_ids] @property def parents(self) -> Iterator[RevisionEntry]: if self._parents_entries is None: raise RuntimeError( "Parents of this node has not yet been retrieved. " "Please call retrieve_parents() before using this property." ) return (x for x in self._parents_entries) def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, RevisionEntry) and self.id == other.id def __hash__(self) -> int: return hash(self.id) class DirectoryEntry: def __init__(self, id: Sha1Git, name: bytes = b"") -> None: self.id = id self.name = name self._files: Optional[List[FileEntry]] = None self._dirs: Optional[List[DirectoryEntry]] = None def retrieve_children(self, archive: ArchiveInterface, minsize: int = 0) -> None: if self._files is None and self._dirs is None: self._files = [] self._dirs = [] for child in archive.directory_ls(self.id, minsize=minsize): if child["type"] == "dir": self._dirs.append( DirectoryEntry(child["target"], name=child["name"]) ) elif child["type"] == "file": self._files.append(FileEntry(child["target"], child["name"])) @property def files(self) -> Iterator[FileEntry]: if self._files is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." 
) return (x for x in self._files) @property def dirs(self) -> Iterator[DirectoryEntry]: if self._dirs is None: raise RuntimeError( "Children of this node has not yet been retrieved. " "Please call retrieve_children() before using this property." ) return (x for x in self._dirs) def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, DirectoryEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self) -> int: return hash((self.id, self.name)) class FileEntry: def __init__(self, id: Sha1Git, name: bytes) -> None: self.id = id self.name = name def __str__(self) -> str: return f"" def __eq__(self, other) -> bool: return isinstance(other, FileEntry) and (self.id, self.name) == ( other.id, other.name, ) def __hash__(self) -> int: return hash((self.id, self.name)) diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py index 5c7414e..eed0349 100644 --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -1,123 +1,143 @@ # Copyright (C) 2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from datetime import datetime from itertools import islice from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git from .archive import ArchiveInterface from .graph import HistoryGraph from .interface import ProvenanceInterface from .model import OriginEntry, RevisionEntry ORIGIN_DURATION_METRIC = "swh_provenance_origin_revision_layer_duration_seconds" LOG_FORMAT = ( "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " "-35s %(lineno) -5d: %(message)s" ) LOGGER = logging.getLogger(__name__) class CSVOriginIterator: """Iterator over origin visit statuses typically present in the given CSV file. 
# ---------------------------------------------------------------------------
# NOTE(review): reconstructed post-patch content of swh/provenance/origin.py
# from the whitespace-collapsed diff (this block carries the whole module,
# including the header that straddled the previous chunk boundary).
# ---------------------------------------------------------------------------
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import logging
from datetime import datetime
from itertools import islice
from typing import Generator, Iterable, Iterator, List, Optional, Tuple

from swh.core.statsd import statsd
from swh.model.model import Sha1Git

from .archive import ArchiveInterface
from .graph import HistoryGraph
from .interface import ProvenanceInterface
from .model import OriginEntry, RevisionEntry

ORIGIN_DURATION_METRIC = "swh_provenance_origin_revision_layer_duration_seconds"

LOG_FORMAT = (
    "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
    "-35s %(lineno) -5d: %(message)s"
)

LOGGER = logging.getLogger(__name__)


class CSVOriginIterator:
    """Iterator over origin visit statuses typically present in the given CSV
    file.

    The input is an iterator that produces 2 elements per row: (url, snap)

    where:
    - url: is the origin url of the visit
    - snap: sha1_git of the snapshot pointed by the visit status
    """

    def __init__(
        self,
        statuses: Iterable[Tuple[str, Sha1Git]],
        limit: Optional[int] = None,
    ) -> None:
        self.statuses: Iterator[Tuple[str, Sha1Git]]
        # islice caps the stream lazily; otherwise just normalize to an iterator.
        if limit is not None:
            self.statuses = islice(statuses, limit)
        else:
            self.statuses = iter(statuses)

    def __iter__(self) -> Generator[OriginEntry, None, None]:
        return (OriginEntry(url, snapshot) for url, snapshot in self.statuses)


@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "main"})
def origin_add(
    provenance: ProvenanceInterface,
    archive: ArchiveInterface,
    origins: List[OriginEntry],
    commit: bool = True,
) -> None:
    """Process a batch of origins, flushing provenance state if commit is set."""
    for origin in origins:
        proceed_origin(provenance, archive, origin)
    if commit:
        provenance.flush()


@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "proceed_origin"})
def proceed_origin(
    provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry
) -> None:
    """Register one origin and walk the history of each of its head revisions."""
    LOGGER.info("Processing origin=%s", origin)
    start = datetime.now()

    LOGGER.debug("Add origin")
    provenance.origin_add(origin)

    LOGGER.debug("Retrieving head revisions")
    origin.retrieve_revisions(archive)
    # NOTE(review): fixed log-message typo "founds" -> "found".
    LOGGER.info("%d heads found", origin.revision_count)

    for idx, revision in enumerate(origin.revisions):
        LOGGER.info(
            "checking revision %s (%d/%d)", revision, idx + 1, origin.revision_count
        )
        if not provenance.revision_is_head(revision):
            # Unknown head: build and register its full history graph.
            LOGGER.debug("revision %s not in heads", revision)
            graph = HistoryGraph(archive, revision)
            LOGGER.debug("History graph built")
            origin_add_revision(provenance, origin, graph)
            LOGGER.debug("Revision added")
        # head is treated separately
        LOGGER.debug("Checking preferred origin")
        check_preferred_origin(provenance, origin, revision)

        LOGGER.debug("Adding revision to origin")
        provenance.revision_add_to_origin(origin, revision)

    end = datetime.now()
    LOGGER.info("Processed origin %s in %s", origin.url, (end - start))


@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_revision"})
def origin_add_revision(
    provenance: ProvenanceInterface,
    origin: OriginEntry,
    graph: HistoryGraph,
) -> None:
    """Link every ancestor in the history graph to the head and to the origin."""
    visited = {graph.head}
    # head's history should be recursively iterated starting from its parents
    stack = list(graph.parents[graph.head])
    while stack:
        current = stack.pop()
        check_preferred_origin(provenance, origin, current)
        # create a link between it and the head, and recursively walk its history
        provenance.revision_add_before_revision(graph.head, current)
        # NOTE(review): a node reachable via two paths may be pushed twice
        # before being marked visited, so the calls above may repeat for it;
        # presumably they are idempotent — confirm in ProvenanceInterface.
        visited.add(current)
        for parent in graph.parents[current]:
            if parent not in visited:
                stack.append(parent)


@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "check_preferred_origin"})
def check_preferred_origin(
    provenance: ProvenanceInterface,
    origin: OriginEntry,
    revision: RevisionEntry,
) -> None:
    # if the revision has no preferred origin just set the given origin as the
    # preferred one. TODO: this should be improved in the future!
    preferred = provenance.revision_get_preferred_origin(revision)
    if preferred is None:
        provenance.revision_set_preferred_origin(origin, revision)
# ---------------------------------------------------------------------------
# NOTE(review): reconstructed post-patch content of
# swh/provenance/tools/origins/client.py from the whitespace-collapsed diff.
# The check_preferred_origin() tail that straddled this chunk boundary is
# carried in full by the origin.py reconstruction.
# ---------------------------------------------------------------------------
#!/usr/bin/env python

import logging
import logging.handlers
import multiprocessing
import os
import sys
import time
from typing import Any, Callable, Dict, List, Optional

import yaml
import zmq

from swh.core import config
from swh.model.hashutil import hash_to_bytes
from swh.provenance import get_archive, get_provenance
from swh.provenance.origin import OriginEntry, origin_add

CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)

LOG_FORMAT = (
    "%(asctime)s-%(levelname)s - %(process)d - %(name)s.%(funcName)s:%(lineno)d"
    ": %(message)s"
)


class Client(multiprocessing.Process):
    """Worker process: requests origin batches from the ZMQ server over REQ/REP
    and feeds them to origin_add until the server answers None."""

    def __init__(
        self,
        conf: Dict[str, Any],
        group: None = None,
        # NOTE(review): defaults were the Ellipsis literal `...` (a typeshed
        # stub idiom leaked into the implementation); Process expects None.
        target: Optional[Callable[..., Any]] = None,
        name: Optional[str] = None,
    ) -> None:
        super().__init__(group=group, target=target, name=name)
        self.archive_conf = conf["archive"]
        self.storage_conf = conf["storage"]
        self.url = f"tcp://{conf['org_server']['host']}:{conf['org_server']['port']}"
        # Lazy %-style args so formatting is skipped when the level is disabled.
        logging.info("Client %s created", self.name)

    def run(self) -> None:
        logging.info("Client %s started", self.name)
        # XXX: should we reconnect on each iteration to save resources?
        archive = get_archive(**self.archive_conf)

        context = zmq.Context()
        socket: zmq.Socket = context.socket(zmq.REQ)
        socket.connect(self.url)

        with get_provenance(**self.storage_conf) as provenance:
            while True:
                socket.send(b"NEXT")
                # A None (JSON null) reply means the server has no more work.
                response = socket.recv_json()
                if response is None:
                    break

                batch = [
                    OriginEntry(origin["url"], hash_to_bytes(origin["snapshot"]))
                    for origin in response
                ]
                origin_add(provenance, archive, batch)
        logging.info("Client %s stopped", self.name)


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARN, format=LOG_FORMAT)

    # Check parameters
    if len(sys.argv) != 2:
        # NOTE(review): the placeholder was stripped from the paste (text
        # between '<' and '>' lost); restored as <processes>.
        print("usage: client <processes>")
        sys.exit(1)
    processes = int(sys.argv[1])

    config_file = None  # TODO: add as a cli option
    if (
        config_file is None
        and DEFAULT_PATH is not None
        and config.config_exists(DEFAULT_PATH)
    ):
        config_file = DEFAULT_PATH

    if config_file is None or not os.path.exists(config_file):
        print("No configuration provided")
        sys.exit(1)

    # Context-manage the config file instead of leaking the handle.
    with open(config_file, "rb") as f:
        conf = yaml.safe_load(f)["provenance"]

    # Start counter
    start = time.time()

    # Launch as many clients as requested
    clients: List[Client] = []
    for idx in range(processes):
        logging.info("MAIN: launching process %s", idx)
        client = Client(conf, name=f"worker{idx}")
        client.start()
        clients.append(client)

    # Wait for all processes to complete their work
    for client in clients:
        logging.info("MAIN: waiting for process %s to finish", client.name)
        client.join()
        logging.info("MAIN: process %s finished executing", client.name)

    # Stop counter and report elapsed time
    stop = time.time()
    print("Elapsed time:", stop - start, "seconds")