diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -14,9 +14,6 @@ [mypy-iso8601.*] ignore_missing_imports = True -[mypy-methodtools.*] -ignore_missing_imports = True - [mypy-msgpack.*] ignore_missing_imports = True @@ -34,3 +31,6 @@ [mypy-psycopg2.*] ignore_missing_imports = True + +[mypy-systemd.*] +ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,6 @@ click deprecated iso8601 -methodtools pika PyYAML types-click diff --git a/swh/provenance/__init__.py b/swh/provenance/__init__.py --- a/swh/provenance/__init__.py +++ b/swh/provenance/__init__.py @@ -59,7 +59,11 @@ from .multiplexer.archive import ArchiveMultiplexed - archives = list((get_archive(**archive) for archive in kwargs["archives"])) + archives = [] + for ctr, archive in enumerate(kwargs["archives"]): + name = archive.pop("name", f"backend_{ctr}") + archives.append((name, get_archive(**archive))) + return ArchiveMultiplexed(archives) else: raise ValueError @@ -94,7 +98,7 @@ :cls:`ValueError` if passed an unknown archive class. """ if cls in ["local", "postgresql"]: - from .postgresql.provenance import ProvenanceStoragePostgreSql + from swh.provenance.postgresql.provenance import ProvenanceStoragePostgreSql if cls == "local": warnings.warn( diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py --- a/swh/provenance/api/client.py +++ b/swh/provenance/api/client.py @@ -133,29 +133,33 @@ exchange = ProvenanceStorageRabbitMQServer.get_exchange( meth_name, relation ) - for routing_key, items in ranges.items(): - items_list = list(items) - batches = ( - items_list[idx : idx + self._batch_size] - for idx in range(0, len(items_list), self._batch_size) - ) - for batch in batches: - # FIXME: this is running in a different thread! Hence, if - # self._connection drops, there is no guarantee that the - # request can be sent for the current elements. This - # situation should be handled properly. - self._connection.ioloop.add_callback_threadsafe( - functools.partial( - ProvenanceStorageRabbitMQClient.request, - channel=self._channel, - reply_to=self._callback_queue, - exchange=exchange, - routing_key=routing_key, - correlation_id=self._correlation_id, - data=batch, - ) + try: + self._delay_close = True + for routing_key, items in ranges.items(): + items_list = list(items) + batches = ( + items_list[idx : idx + self._batch_size] + for idx in range(0, len(items_list), self._batch_size) ) - return self.wait_for_acks(acks_expected) + for batch in batches: + # FIXME: this is running in a different thread! Hence, if + # self._connection drops, there is no guarantee that the + # request can be sent for the current elements. This + # situation should be handled properly. 
+ self._connection.ioloop.add_callback_threadsafe( + functools.partial( + ProvenanceStorageRabbitMQClient.request, + channel=self._channel, + reply_to=self._callback_queue, + exchange=exchange, + routing_key=routing_key, + correlation_id=self._correlation_id, + data=batch, + ) + ) + return self.wait_for_acks(meth_name, acks_expected) + finally: + self._delay_close = False except BaseException as ex: self.request_termination(str(ex)) return False @@ -219,6 +223,8 @@ self._wait_min = wait_min self._wait_per_batch = wait_per_batch + self._delay_close = False + def __enter__(self) -> ProvenanceStorageInterface: self.open() assert isinstance(self, ProvenanceStorageInterface) @@ -293,14 +299,14 @@ self._connection.ioloop.call_later(5, self._connection.ioloop.stop) def open_channel(self) -> None: - LOGGER.info("Creating a new channel") + LOGGER.debug("Creating a new channel") assert self._connection is not None self._connection.channel(on_open_callback=self.on_channel_open) def on_channel_open(self, channel: pika.channel.Channel) -> None: - LOGGER.info("Channel opened") + LOGGER.debug("Channel opened") self._channel = channel - LOGGER.info("Adding channel close callback") + LOGGER.debug("Adding channel close callback") assert self._channel is not None self._channel.add_on_close_callback(callback=self.on_channel_closed) self.setup_queue() @@ -312,14 +318,14 @@ self.close_connection() def setup_queue(self) -> None: - LOGGER.info("Declaring callback queue") + LOGGER.debug("Declaring callback queue") assert self._channel is not None self._channel.queue_declare( queue="", exclusive=True, callback=self.on_queue_declare_ok ) def on_queue_declare_ok(self, frame: pika.frame.Method) -> None: - LOGGER.info("Binding queue to default exchanger") + LOGGER.debug("Binding queue to default exchanger") assert self._channel is not None self._callback_queue = frame.method.queue self._channel.basic_qos( @@ -327,12 +333,12 @@ ) def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None: - LOGGER.info("QOS set to: %d", self._prefetch_count) + LOGGER.debug("QOS set to: %d", self._prefetch_count) self.start_consuming() def start_consuming(self) -> None: - LOGGER.info("Issuing consumer related RPC commands") - LOGGER.info("Adding consumer cancellation callback") + LOGGER.debug("Issuing consumer related RPC commands") + LOGGER.debug("Adding consumer cancellation callback") assert self._channel is not None self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled) assert self._callback_queue is not None @@ -342,7 +348,7 @@ self._consuming = True def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None: - LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame) + LOGGER.debug("Consumer was cancelled remotely, shutting down: %r", method_frame) if self._channel: self._channel.close() @@ -370,16 +376,16 @@ def stop_consuming(self) -> None: if self._channel: - LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ") + LOGGER.debug("Sending a Basic.Cancel RPC command to RabbitMQ") self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok) def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None: self._consuming = False - LOGGER.info( + LOGGER.debug( "RabbitMQ acknowledged the cancellation of the consumer: %s", self._consumer_tag, ) - LOGGER.info("Closing the channel") + LOGGER.debug("Closing the channel") assert self._channel is not None self._channel.close() @@ -410,6 +416,19 @@ def stop(self) -> None: assert self._connection is not None if not 
self._closing:
+            if self._delay_close:
+                LOGGER.info("Delaying termination: waiting for a pending request")
+                delay_start = time.monotonic()
+                wait = 1
+                while self._delay_close:
+                    if wait >= 32:
+                        LOGGER.warning(
+                            "Still waiting for pending request (for %.2f seconds)...",
+                            time.monotonic() - delay_start,
+                        )
+                    time.sleep(wait)
+                    wait = min(wait * 2, 60)
+
             self._closing = True
             LOGGER.info("Stopping")
             if self._consuming:
@@ -442,28 +461,42 @@
             ),
         )

-    def wait_for_acks(self, acks_expected: int) -> bool:
+    def wait_for_acks(self, meth_name: str, acks_expected: int) -> bool:
         acks_received = 0
         timeout = max(
             (acks_expected / self._batch_size) * self._wait_per_batch,
             self._wait_min,
         )
+        start = time.monotonic()
+        end = start + timeout
         while acks_received < acks_expected:
+            local_timeout = end - time.monotonic()
+            if local_timeout < 1.0:
+                local_timeout = 1.0
             try:
-                acks_received += self.wait_for_response(timeout=timeout)
+                acks_received += self.wait_for_response(timeout=local_timeout)
             except ResponseTimeout:
                 LOGGER.warning(
-                    "Timed out waiting for acks, %s received, %s expected",
+                    "Timed out waiting for acks in %s, %s received, %s expected (after %.2fs)",
+                    meth_name,
                     acks_received,
                     acks_expected,
+                    time.monotonic() - start,
                 )
                 return False
         return acks_received == acks_expected

-    def wait_for_response(self, timeout: float = 60.0) -> Any:
+    def wait_for_response(self, timeout: float = 120.0) -> Any:
+        start = time.monotonic()
+        end = start + timeout
         while True:
             try:
-                correlation_id, response = self._response_queue.get(timeout=timeout)
+                local_timeout = end - time.monotonic()
+                if local_timeout < 1.0:
+                    local_timeout = 1.0
+                correlation_id, response = self._response_queue.get(
+                    timeout=local_timeout
+                )
                 if correlation_id == self._correlation_id:
                     return response
             except queue.Empty:
diff --git a/swh/provenance/api/server.py b/swh/provenance/api/server.py
--- a/swh/provenance/api/server.py
+++ b/swh/provenance/api/server.py
@@ -30,7 +30,6 @@
 from swh.model.hashutil import hash_to_hex
 from swh.model.model import Sha1Git

-from .. import get_provenance_storage
 from ..interface import (
     DirectoryData,
     EntityType,
@@ -402,6 +401,8 @@
     def run_request_thread(
         self, binding_key: str, meth_name: str, relation: Optional[RelationType]
     ) -> None:
+        from swh.provenance import get_provenance_storage
+
         with get_provenance_storage(**self._storage_config) as storage:
             request_queue = self._request_queues[binding_key]
             merge_items = ProvenanceStorageRabbitMQWorker.get_conflicts_func(meth_name)
diff --git a/swh/provenance/archive.py b/swh/provenance/archive.py
--- a/swh/provenance/archive.py
+++ b/swh/provenance/archive.py
@@ -3,7 +3,7 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from typing import Any, Dict, Iterable
+from typing import Any, Dict, Iterable, Tuple

 from typing_extensions import Protocol, runtime_checkable

@@ -28,14 +28,17 @@
         """
         ...

-    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
-        """List parents of one revision.
+    def revision_get_some_outbound_edges(
+        self, id: Sha1Git
+    ) -> Iterable[Tuple[Sha1Git, Sha1Git]]:
+        """List some outbound edges from one revision. For each revision listed,
+        all of its outbound edges must be returned.

         Args:
-            revisions: sha1 id of the revision to list parents from.
+            id: sha1 id of the revision to list outbound edges from.

-        Yields:
-            sha1 ids for the parents of such revision.
+        Returns:
+            iterable of (revision_id, parent_revision_id) edges
         """
         ...
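Aside (not part of the patch): a minimal sketch of the new protocol contract, using a
hypothetical in-memory backend. The key difference from revision_get_parents is that
implementations now return (revision, parent) edges and may return edges for more
revisions than the one requested, as long as every revision they list comes with all
of its outbound edges:

    from typing import Dict, Iterable, List, Tuple

    Sha1Git = bytes  # stand-in for swh.model.model.Sha1Git

    class InMemoryArchive:
        """Toy ArchiveInterface implementation backed by a dict of parent lists."""

        def __init__(self, parents: Dict[Sha1Git, List[Sha1Git]]) -> None:
            self._parents = parents

        def revision_get_some_outbound_edges(
            self, id: Sha1Git
        ) -> Iterable[Tuple[Sha1Git, Sha1Git]]:
            # Only return the requested revision's edges, the minimum the
            # contract requires; a graph-backed implementation may instead
            # return a whole chunk of the ancestry in one call.
            for parent_id in self._parents.get(id, []):
                yield (id, parent_id)

    archive = InMemoryArchive({b"rev-a": [b"rev-b", b"rev-c"]})
    assert list(archive.revision_get_some_outbound_edges(b"rev-a")) == [
        (b"rev-a", b"rev-b"),
        (b"rev-a", b"rev-c"),
    ]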
diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -15,6 +15,11 @@
 import iso8601
 import yaml

+try:
+    from systemd.daemon import notify
+except ImportError:
+    notify = None
+
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
@@ -198,13 +203,19 @@
         },
     )

+    if notify:
+        notify("READY=1")
+
     try:
-        client.process(worker_fn)
+        with provenance:
+            client.process(worker_fn)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print("Done.")
     finally:
+        if notify:
+            notify("STOPPING=1")
         client.close()
@@ -378,13 +389,19 @@
         },
     )

+    if notify:
+        notify("READY=1")
+
     try:
-        client.process(worker_fn)
+        with provenance:
+            client.process(worker_fn)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print("Done.")
     finally:
+        if notify:
+            notify("STOPPING=1")
         client.close()
diff --git a/swh/provenance/graph.py b/swh/provenance/graph.py
--- a/swh/provenance/graph.py
+++ b/swh/provenance/graph.py
@@ -10,7 +10,6 @@
 from typing import Any, Dict, Optional, Set

 from swh.core.statsd import statsd
-from swh.model.hashutil import hash_to_hex
 from swh.model.model import Sha1Git

 from .archive import ArchiveInterface
@@ -30,39 +29,39 @@
         archive: ArchiveInterface,
         revision: RevisionEntry,
     ) -> None:
-        self._head = revision
-        self._graph: Dict[RevisionEntry, Set[RevisionEntry]] = {}
+        self.head_id = revision.id
+        self._nodes: Set[Sha1Git] = set()
+        # rev -> set(parents)
+        self._edges: Dict[Sha1Git, Set[Sha1Git]] = {}

-        stack = [self._head]
+        stack = {self.head_id}
         while stack:
             current = stack.pop()
-            if current not in self._graph:
-                self._graph[current] = set()
-                current.retrieve_parents(archive)
-                for parent in current.parents:
-                    self._graph[current].add(parent)
-                    stack.append(parent)
+            if current not in self._nodes:
+                self._nodes.add(current)
+                self._edges.setdefault(current, set())
+                for rev, parent in archive.revision_get_some_outbound_edges(current):
+                    self._nodes.add(rev)
+                    self._edges.setdefault(rev, set()).add(parent)
+                    stack.add(parent)

-    @property
-    def head(self) -> RevisionEntry:
-        return self._head
+            # don't process nodes for which we've already retrieved outbound edges
+            stack -= self._nodes

-    @property
-    def parents(self) -> Dict[RevisionEntry, Set[RevisionEntry]]:
-        return self._graph
+    def parent_ids(self) -> Set[Sha1Git]:
+        """Get all the known parent ids in the current graph"""
+        return self._nodes - {self.head_id}

     def __str__(self) -> str:
-        return f"<HistoryGraph: head={self._head}, graph={self._graph}>"
+        return f"<HistoryGraph: head={self.head_id.hex()}, graph={self._edges}>"

     def as_dict(self) -> Dict[str, Any]:
         return {
-            "head": hash_to_hex(self.head.id),
+            "head": self.head_id.hex(),
             "graph": {
-                hash_to_hex(node.id): sorted(
-                    [hash_to_hex(parent.id) for parent in parents]
-                )
-                for node, parents in self._graph.items()
+                node.hex(): sorted(parent.hex() for parent in parents)
+                for node, parents in self._edges.items()
             },
         }
diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py
--- a/swh/provenance/interface.py
+++ b/swh/provenance/interface.py
@@ -369,9 +369,9 @@
         ...

     def revision_add_before_revision(
-        self, head: RevisionEntry, revision: RevisionEntry
+        self, head_id: Sha1Git, revision_id: Sha1Git
     ) -> None:
-        """Associate `revision` to `head` as an ancestor of the latter."""
+        """Associate `revision_id` to `head_id` as an ancestor of the latter."""
         ...

     def revision_add_to_origin(
@@ -389,14 +389,12 @@
         """Retrieve the date associated to `revision`."""
         ...
- def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: + def revision_get_preferred_origin(self, revision_id: Sha1Git) -> Optional[Sha1Git]: """Retrieve the preferred origin associated to `revision`.""" ... def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry + self, origin: OriginEntry, revision_id: Sha1Git ) -> None: """Associate `origin` as the preferred origin for `revision`.""" ... diff --git a/swh/provenance/journal_client.py b/swh/provenance/journal_client.py --- a/swh/provenance/journal_client.py +++ b/swh/provenance/journal_client.py @@ -3,16 +3,27 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime + +try: + from systemd.daemon import notify +except ImportError: + notify = None + +import sentry_sdk + from swh.model.model import TimestampWithTimezone +from swh.provenance.archive import ArchiveInterface from swh.provenance.interface import ProvenanceInterface from swh.provenance.model import OriginEntry, RevisionEntry from swh.provenance.origin import origin_add from swh.provenance.revision import revision_add -from swh.storage.interface import StorageInterface + +EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) def process_journal_origins( - messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg + messages, *, provenance: ProvenanceInterface, archive: ArchiveInterface, **cfg ) -> None: """Worker function for `JournalClient.process(worker_fn)`.""" assert set(messages) == {"origin_visit_status"}, set(messages) @@ -22,25 +33,37 @@ if visit["snapshot"] is not None ] if origin_entries: - with provenance: - origin_add(provenance, archive, origin_entries, **cfg) + origin_add(provenance, archive, origin_entries, **cfg) + if notify: + notify("WATCHDOG=1") def process_journal_revisions( - messages, *, provenance: ProvenanceInterface, archive: StorageInterface, **cfg + messages, *, provenance: ProvenanceInterface, archive: ArchiveInterface, **cfg ) -> None: """Worker function for `JournalClient.process(worker_fn)`.""" assert set(messages) == {"revision"}, set(messages) - revisions = [ - RevisionEntry( - id=rev["id"], - date=TimestampWithTimezone.from_dict(rev["date"]).to_datetime(), - root=rev["directory"], - parents=rev["parents"], + revisions = [] + for rev in messages["revision"]: + if rev["date"] is None: + continue + try: + date = TimestampWithTimezone.from_dict(rev["date"]).to_datetime() + except Exception: + sentry_sdk.capture_exception() + continue + + if date <= EPOCH: + continue + + revisions.append( + RevisionEntry( + id=rev["id"], + root=rev["directory"], + date=date, + ) ) - for rev in messages["revision"] - if rev["date"] is not None - ] if revisions: - with provenance: - revision_add(provenance, archive, revisions, **cfg) + revision_add(provenance, archive, revisions, **cfg) + if notify: + notify("WATCHDOG=1") diff --git a/swh/provenance/model.py b/swh/provenance/model.py --- a/swh/provenance/model.py +++ b/swh/provenance/model.py @@ -6,7 +6,7 @@ from __future__ import annotations from datetime import datetime -from typing import Iterable, Iterator, List, Optional +from typing import Iterator, List, Optional from swh.model.model import Origin, Sha1Git @@ -55,29 +55,11 @@ id: Sha1Git, date: Optional[datetime] = None, root: Optional[Sha1Git] = None, - parents: Optional[Iterable[Sha1Git]] = None, ) -> None: self.id = id self.date = date assert self.date is None or 
self.date.tzinfo is not None
         self.root = root
-        self._parents_ids = parents
-        self._parents_entries: Optional[List[RevisionEntry]] = None
-
-    def retrieve_parents(self, archive: ArchiveInterface) -> None:
-        if self._parents_entries is None:
-            if self._parents_ids is None:
-                self._parents_ids = archive.revision_get_parents(self.id)
-            self._parents_entries = [RevisionEntry(id) for id in self._parents_ids]
-
-    @property
-    def parents(self) -> Iterator[RevisionEntry]:
-        if self._parents_entries is None:
-            raise RuntimeError(
-                "Parents of this node has not yet been retrieved. "
-                "Please call retrieve_parents() before using this property."
-            )
-        return (x for x in self._parents_entries)

     def __str__(self) -> str:
         return f"<RevisionEntry: id={self.id.hex()}>"
diff --git a/swh/provenance/multiplexer/archive.py b/swh/provenance/multiplexer/archive.py
--- a/swh/provenance/multiplexer/archive.py
+++ b/swh/provenance/multiplexer/archive.py
@@ -4,74 +4,107 @@
 # See top-level LICENSE file for more information

 import logging
-from typing import Any, Dict, Iterable, List
+from typing import Any, Dict, Iterable, List, Tuple

 from swh.core.statsd import statsd
-from swh.model.model import Sha1Git
+from swh.model.model import Directory, Sha1Git
 from swh.provenance.archive import ArchiveInterface
 from swh.storage.interface import StorageInterface

 ARCHIVE_DURATION_METRIC = "swh_provenance_archive_multiplexed_duration_seconds"
+ARCHIVE_OPS_METRIC = "swh_provenance_archive_multiplexed_per_backend_count"

 LOGGER = logging.getLogger(__name__)

+EMPTY_DIR_ID = Directory(entries=()).id
+

 class ArchiveMultiplexed:
     storage: StorageInterface

-    def __init__(self, archives: List[ArchiveInterface]) -> None:
+    def __init__(self, archives: List[Tuple[str, ArchiveInterface]]) -> None:
         self.archives = archives

     @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"})
     def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
-        directories = None
-        for archive in self.archives:
+        if id == EMPTY_DIR_ID:
+            return []
+
+        for backend, archive in self.archives:
             try:
-                directories = list(archive.directory_ls(id))
+                entries = list(archive.directory_ls(id, minsize=minsize))
             except NotImplementedError:
-                pass
+                continue
+
+            if entries:
+                statsd.increment(
+                    ARCHIVE_OPS_METRIC,
+                    tags={"method": "directory_ls", "backend": backend},
+                )
+                return entries

-            if directories:
-                return directories
-            LOGGER.debug(
-                "No parents found for revision %s via %s", id.hex(), archive.__class__
-            )
+        statsd.increment(
+            ARCHIVE_OPS_METRIC,
+            tags={"method": "directory_ls", "backend": "empty_or_not_found"},
+        )
+        LOGGER.debug("directory empty (only rev entries) or not found: %s", id.hex())

         return []

     @statsd.timed(
-        metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"}
+        metric=ARCHIVE_DURATION_METRIC,
+        tags={"method": "revision_get_some_outbound_edges"},
     )
-    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
-
-        for archive in self.archives:
+    def revision_get_some_outbound_edges(
+        self, revision_id: Sha1Git
+    ) -> Iterable[Tuple[Sha1Git, Sha1Git]]:
+        # TODO: what if the revision doesn't exist in the archive?
+ for backend, archive in self.archives: try: - parents = list(archive.revision_get_parents(id)) - if parents: - return parents + edges = list(archive.revision_get_some_outbound_edges(revision_id)) + if edges: + statsd.increment( + ARCHIVE_OPS_METRIC, + tags={ + "method": "revision_get_some_outbound_edges", + "backend": backend, + }, + ) + return edges LOGGER.debug( - "No parents found for revision %s via %s", - id.hex(), + "No outbound edges found for revision %s via %s", + revision_id.hex(), archive.__class__, ) except Exception as e: LOGGER.warn( - "Error retrieving parents of revision %s via %s: %s", - id.hex(), + "Error retrieving outbound edges of revision %s via %s: %s", + revision_id.hex(), archive.__class__, e, ) + statsd.increment( + ARCHIVE_OPS_METRIC, + tags={ + "method": "revision_get_some_outbound_edges", + "backend": "no_parents_or_not_found", + }, + ) return [] @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: - for archive in self.archives: + for backend, archive in self.archives: try: heads = list(archive.snapshot_get_heads(id)) if heads: + statsd.increment( + ARCHIVE_OPS_METRIC, + tags={"method": "snapshot_get_heads", "backend": backend}, + ) return heads LOGGER.debug( "No heads found for snapshot %s via %s", str(id), archive.__class__ @@ -84,4 +117,8 @@ e, ) + statsd.increment( + ARCHIVE_OPS_METRIC, + tags={"method": "snapshot_get_heads", "backend": "no_heads_or_not_found"}, + ) return [] diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py --- a/swh/provenance/origin.py +++ b/swh/provenance/origin.py @@ -14,7 +14,7 @@ from .archive import ArchiveInterface from .graph import HistoryGraph from .interface import ProvenanceInterface -from .model import OriginEntry, RevisionEntry +from .model import OriginEntry ORIGIN_DURATION_METRIC = "swh_provenance_origin_revision_layer_duration_seconds" @@ -62,7 +62,7 @@ commit: bool = True, ) -> None: for origin in origins: - proceed_origin(provenance, archive, origin) + process_origin(provenance, archive, origin) if commit: start = datetime.now() LOGGER.debug("Flushing cache") @@ -70,8 +70,8 @@ LOGGER.info("Cache flushed in %s", (datetime.now() - start)) -@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "proceed_origin"}) -def proceed_origin( +@statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "process_origin"}) +def process_origin( provenance: ProvenanceInterface, archive: ArchiveInterface, origin: OriginEntry ) -> None: LOGGER.info("Processing origin=%s", origin) @@ -100,7 +100,7 @@ # head is treated separately LOGGER.debug("Checking preferred origin") - check_preferred_origin(provenance, origin, revision) + check_preferred_origin(provenance, origin, revision.id) LOGGER.debug("Adding revision to origin") provenance.revision_add_to_origin(origin, revision) @@ -121,29 +121,23 @@ origin: OriginEntry, graph: HistoryGraph, ) -> None: - visited = {graph.head} - # head's history should be recursively iterated starting from its parents - stack = list(graph.parents[graph.head]) - while stack: - current = stack.pop() - check_preferred_origin(provenance, origin, current) + for parent_id in graph.parent_ids(): + check_preferred_origin(provenance, origin, parent_id) # create a link between it and the head, and recursively walk its history - provenance.revision_add_before_revision(graph.head, current) - visited.add(current) - for parent in graph.parents[current]: - if parent not in visited: - stack.append(parent) + 
provenance.revision_add_before_revision( + head_id=graph.head_id, revision_id=parent_id + ) @statsd.timed(metric=ORIGIN_DURATION_METRIC, tags={"method": "check_preferred_origin"}) def check_preferred_origin( provenance: ProvenanceInterface, origin: OriginEntry, - revision: RevisionEntry, + revision_id: Sha1Git, ) -> None: # if the revision has no preferred origin just set the given origin as the # preferred one. TODO: this should be improved in the future! - preferred = provenance.revision_get_preferred_origin(revision) + preferred = provenance.revision_get_preferred_origin(revision_id) if preferred is None: - provenance.revision_set_preferred_origin(origin, revision) + provenance.revision_set_preferred_origin(origin, revision_id) diff --git a/swh/provenance/postgresql/archive.py b/swh/provenance/postgresql/archive.py --- a/swh/provenance/postgresql/archive.py +++ b/swh/provenance/postgresql/archive.py @@ -3,9 +3,8 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Tuple -from methodtools import lru_cache import psycopg2.extensions from swh.core.statsd import statsd @@ -25,7 +24,6 @@ def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]: yield from self._directory_ls(id, minsize=minsize) - @lru_cache(maxsize=100000) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "directory_ls"}) def _directory_ls(self, id: Sha1Git, minsize: int = 0) -> List[Dict[str, Any]]: with self.conn.cursor() as cursor: @@ -37,12 +35,12 @@ FROM directory WHERE id=%s), ls_d AS (SELECT DISTINCT UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT DISTINCT UNNEST(file_entries) AS entry_id FROM dir) - (SELECT 'dir'::directory_entry_type AS type, e.target, e.name + (SELECT 'dir' AS type, e.target, e.name FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION ALL (WITH known_contents AS - (SELECT 'file'::directory_entry_type AS type, e.target, e.name + (SELECT 'file' AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id INNER JOIN content c ON e.target=c.sha1_git @@ -50,7 +48,7 @@ ) SELECT * FROM known_contents UNION ALL - (SELECT 'file'::directory_entry_type AS type, e.target, e.name + (SELECT 'file' AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id LEFT JOIN skipped_content c ON e.target=c.sha1_git @@ -72,39 +70,49 @@ FROM directory WHERE id=%s), ls_d AS (SELECT DISTINCT UNNEST(dir_entries) AS entry_id FROM dir), ls_f AS (SELECT DISTINCT UNNEST(file_entries) AS entry_id FROM dir) - (SELECT 'dir'::directory_entry_type AS type, e.target, e.name + (SELECT 'dir' AS type, e.target, e.name FROM ls_d LEFT JOIN directory_entry_dir e ON ls_d.entry_id=e.id) UNION ALL - (SELECT 'file'::directory_entry_type AS type, e.target, e.name + (SELECT 'file' AS type, e.target, e.name FROM ls_f LEFT JOIN directory_entry_file e ON ls_f.entry_id=e.id) """, (id,), ) - return [ - {"type": row[0], "target": row[1], "name": row[2]} for row in cursor - ] - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: - yield from self._revision_get_parents(id) + entries = [] + for entry_type, target, name in cursor: + if target is None or name is None: + # LEFT JOIN returned a NULL on the right hand side: + # directory_entry_{dir,file} are not up to date. 
+ return [] + entries.append({"type": entry_type, "target": target, "name": name}) + return entries + + def revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: + yield from self._revision_get_some_outbound_edges(revision_id) - @lru_cache(maxsize=100000) @statsd.timed( - metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revision_get_some_outbound_edges"}, ) - def _revision_get_parents(self, id: Sha1Git) -> List[Sha1Git]: + def _revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> List[Tuple[Sha1Git, Sha1Git]]: with self.conn.cursor() as cursor: cursor.execute( """ - SELECT RH.parent_id::bytea - FROM revision_history AS RH - WHERE RH.id=%s - ORDER BY RH.parent_rank + select + id, unnest(parents) as parent_id + from + swh_revision_list(ARRAY[%s::bytea], 1000); """, - (id,), + (revision_id,), ) - return [row[0] for row in cursor] + return cursor.fetchall() @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -80,7 +80,7 @@ class Provenance: - MAX_CACHE_ELEMENTS = 100000 + MAX_CACHE_ELEMENTS = 40000 def __init__(self, storage: ProvenanceStorageInterface) -> None: self.storage = storage @@ -99,7 +99,7 @@ self.close() def _flush_limit_reached(self) -> bool: - return max(self._get_cache_stats().values()) > self.MAX_CACHE_ELEMENTS + return sum(self._get_cache_stats().values()) > self.MAX_CACHE_ELEMENTS def _get_cache_stats(self) -> Dict[str, int]: return { @@ -123,7 +123,7 @@ def flush_if_necessary(self) -> bool: """Flush if the number of cached information reached a limit.""" - LOGGER.info("Cache stats: %s", self._get_cache_stats()) + LOGGER.debug("Cache stats: %s", self._get_cache_stats()) if self._flush_limit_reached(): self.flush() return True @@ -483,10 +483,10 @@ self.cache["revision"]["added"].add(revision.id) def revision_add_before_revision( - self, head: RevisionEntry, revision: RevisionEntry + self, head_id: Sha1Git, revision_id: Sha1Git ) -> None: - self.cache["revision_before_revision"].setdefault(revision.id, set()).add( - head.id + self.cache["revision_before_revision"].setdefault(revision_id, set()).add( + head_id ) def revision_add_to_origin( @@ -500,20 +500,18 @@ def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]: return self.get_dates("revision", [revision.id]).get(revision.id) - def revision_get_preferred_origin( - self, revision: RevisionEntry - ) -> Optional[Sha1Git]: + def revision_get_preferred_origin(self, revision_id: Sha1Git) -> Optional[Sha1Git]: cache = self.cache["revision_origin"]["data"] - if revision.id not in cache: - ret = self.storage.revision_get([revision.id]) - if revision.id in ret: - origin = ret[revision.id].origin + if revision_id not in cache: + ret = self.storage.revision_get([revision_id]) + if revision_id in ret: + origin = ret[revision_id].origin if origin is not None: - cache[revision.id] = origin - return cache.get(revision.id) + cache[revision_id] = origin + return cache.get(revision_id) def revision_set_preferred_origin( - self, origin: OriginEntry, revision: RevisionEntry + self, origin: OriginEntry, revision_id: Sha1Git ) -> None: - self.cache["revision_origin"]["data"][revision.id] = origin.id - 
self.cache["revision_origin"]["added"].add(revision.id) + self.cache["revision_origin"]["data"][revision_id] = origin.id + self.cache["revision_origin"]["added"].add(revision_id) diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py --- a/swh/provenance/revision.py +++ b/swh/provenance/revision.py @@ -4,6 +4,7 @@ # See top-level LICENSE file for more information from datetime import datetime, timezone +import logging from typing import Generator, Iterable, Iterator, List, Optional, Tuple from swh.core.statsd import statsd @@ -17,6 +18,10 @@ REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds" +logger = logging.getLogger(__name__) + +EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc) + class CSVRevisionIterator: """Iterator over revisions typically present in the given CSV file. @@ -63,12 +68,24 @@ minsize: int = 0, commit: bool = True, ) -> None: - for revision in revisions: + revs_processed = 0 + batch_size = len(revisions) + revs_to_commit = False + for batch_pos, revision in enumerate( + sorted(revisions, key=lambda r: r.date or EPOCH) + ): assert revision.date is not None assert revision.root is not None # Processed content starting from the revision's root directory. date = provenance.revision_get_date(revision) if date is None or revision.date < date: + logger.debug( + "Processing revision %s on %s (root %s)", + revision.id.hex(), + revision.date, + revision.root.hex(), + ) + logger.debug("provenance date: %s, building isochrone graph", date) graph = build_isochrone_graph( provenance, archive, @@ -76,6 +93,7 @@ DirectoryEntry(revision.root), minsize=minsize, ) + logger.debug("isochrone graph built, processing content") revision_process_content( provenance, archive, @@ -87,7 +105,20 @@ mindepth=mindepth, minsize=minsize, ) - if commit: + revs_processed += 1 + revs_to_commit = True + if revs_to_commit and commit: + flushed = provenance.flush_if_necessary() + if flushed: + revs_to_commit = False + logger.debug( + "flushed (rev %s/%s, processed %s)", + batch_pos + 1, + batch_size, + revs_processed, + ) + if revs_to_commit and commit: + logger.debug("flushing batch") provenance.flush() diff --git a/swh/provenance/storage/archive.py b/swh/provenance/storage/archive.py --- a/swh/provenance/storage/archive.py +++ b/swh/provenance/storage/archive.py @@ -30,12 +30,16 @@ } @statsd.timed( - metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"} + metric=ARCHIVE_DURATION_METRIC, + tags={"method": "revision_get_some_outbound_edges"}, ) - def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]: - rev = self.storage.revision_get([id])[0] + def revision_get_some_outbound_edges( + self, revision_id: Sha1Git + ) -> Iterable[Tuple[Sha1Git, Sha1Git]]: + rev = self.storage.revision_get([revision_id])[0] if rev is not None: - yield from rev.parents + for parent_id in rev.parents: + yield (revision_id, parent_id) @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"}) def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]: diff --git a/swh/provenance/swhgraph/archive.py b/swh/provenance/swhgraph/archive.py --- a/swh/provenance/swhgraph/archive.py +++ b/swh/provenance/swhgraph/archive.py @@ -3,7 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Dict, Iterable +from typing import Any, Dict, Iterable, Tuple from swh.core.statsd import statsd from swh.model.model import Sha1Git @@ -23,13 +23,21 @@ raise 
NotImplementedError

     @statsd.timed(
-        metric=ARCHIVE_DURATION_METRIC, tags={"method": "revision_get_parents"}
+        metric=ARCHIVE_DURATION_METRIC,
+        tags={"method": "revision_get_some_outbound_edges"},
     )
-    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
-        src = CoreSWHID(object_type=ObjectType.REVISION, object_id=id)
-        request = self.graph.neighbors(str(src), edges="rev:rev", return_types="rev")
-
-        yield from (CoreSWHID.from_string(swhid).object_id for swhid in request if swhid)
+    def revision_get_some_outbound_edges(
+        self, revision_id: Sha1Git
+    ) -> Iterable[Tuple[Sha1Git, Sha1Git]]:
+        src = CoreSWHID(object_type=ObjectType.REVISION, object_id=revision_id)
+        request = self.graph.visit_edges(str(src), edges="rev:rev")
+
+        for edge in request:
+            if edge:
+                yield (
+                    CoreSWHID.from_string(edge[0]).object_id,
+                    CoreSWHID.from_string(edge[1]).object_id,
+                )

     @statsd.timed(metric=ARCHIVE_DURATION_METRIC, tags={"method": "snapshot_get_heads"})
     def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
@@ -38,4 +46,6 @@
             str(src), edges="snp:rev,snp:rel,rel:rev", return_types="rev"
         )

-        yield from (CoreSWHID.from_string(swhid).object_id for swhid in request if swhid)
+        yield from (
+            CoreSWHID.from_string(swhid).object_id for swhid in request if swhid
+        )
diff --git a/swh/provenance/tests/test_archive_interface.py b/swh/provenance/tests/test_archive_interface.py
--- a/swh/provenance/tests/test_archive_interface.py
+++ b/swh/provenance/tests/test_archive_interface.py
@@ -44,7 +44,9 @@
     def directory_ls(self, id: Sha1Git, minsize: int = 0) -> Iterable[Dict[str, Any]]:
         return []

-    def revision_get_parents(self, id: Sha1Git) -> Iterable[Sha1Git]:
+    def revision_get_some_outbound_edges(
+        self, revision_id: Sha1Git
+    ) -> Iterable[Tuple[Sha1Git, Sha1Git]]:
         return []

     def snapshot_get_heads(self, id: Sha1Git) -> Iterable[Sha1Git]:
@@ -62,17 +64,20 @@
         assert entries_ref == entries


-def check_revision_get_parents(
+def check_revision_get_some_outbound_edges(
     reference: ArchiveInterface, archive: ArchiveInterface, data: Dict[str, List[dict]]
 ) -> None:
     for revision in data["revision"]:
-        parents_ref: TCounter[Sha1Git] = Counter(
-            reference.revision_get_parents(revision["id"])
+        parents_ref: TCounter[Tuple[Sha1Git, Sha1Git]] = Counter(
+            reference.revision_get_some_outbound_edges(revision["id"])
         )
-        parents: TCounter[Sha1Git] = Counter(
-            archive.revision_get_parents(revision["id"])
+        parents: TCounter[Tuple[Sha1Git, Sha1Git]] = Counter(
+            archive.revision_get_some_outbound_edges(revision["id"])
         )
-        assert parents_ref == parents
+
+        # Check that all the reference outbound edges are included in the other
+        # archive's outbound edges
+        assert set(parents_ref.items()) <= set(parents.items())


 def check_snapshot_get_heads(
@@ -188,6 +193,8 @@
             target_type = ExtendedObjectType.DIRECTORY
         elif entry.type == "rev":
             target_type = ExtendedObjectType.REVISION
+        else:
+            assert False, "unknown directory entry type"
         add_link(edges, directory, entry.target, target_type)

     for content in model["content"]:
@@ -209,7 +216,7 @@
     # test against ArchiveStorage
     archive_api = ArchiveStorage(archive.storage)
     check_directory_ls(archive, archive_api, data)
-    check_revision_get_parents(archive, archive_api, data)
+    check_revision_get_some_outbound_edges(archive, archive_api, data)
     check_snapshot_get_heads(archive, archive_api, data)

     # test against ArchivePostgreSQL
@@ -219,7 +226,7 @@
         BaseDb.adapt_conn(conn)
         archive_direct = ArchivePostgreSQL(conn)
         check_directory_ls(archive, archive_direct, data)
-        check_revision_get_parents(archive, archive_direct, data)
+        check_revision_get_some_outbound_edges(archive, archive_direct, data)
         check_snapshot_get_heads(archive, archive_direct, data)

     # test against ArchiveGraph
@@ -228,21 +235,21 @@
     archive_graph = ArchiveGraph(graph, archive.storage)
     with pytest.raises(NotImplementedError):
         check_directory_ls(archive, archive_graph, data)
-    check_revision_get_parents(archive, archive_graph, data)
+    check_revision_get_some_outbound_edges(archive, archive_graph, data)
     check_snapshot_get_heads(archive, archive_graph, data)

     # test against ArchiveMultiplexer
     archive_multiplexed = ArchiveMultiplexed(
-        [ArchiveNoop(), archive_graph, archive_api]
+        [("noop", ArchiveNoop()), ("graph", archive_graph), ("api", archive_api)]
     )
     check_directory_ls(archive, archive_multiplexed, data)
-    check_revision_get_parents(archive, archive_multiplexed, data)
+    check_revision_get_some_outbound_edges(archive, archive_multiplexed, data)
     check_snapshot_get_heads(archive, archive_multiplexed, data)


 def test_noop_multiplexer():
-    archive = ArchiveMultiplexed([ArchiveNoop()])
+    archive = ArchiveMultiplexed([("noop", ArchiveNoop())])

     assert not archive.directory_ls(Sha1Git(b"abcd"))
-    assert not archive.revision_get_parents(Sha1Git(b"abcd"))
+    assert not archive.revision_get_some_outbound_edges(Sha1Git(b"abcd"))
     assert not archive.snapshot_get_heads(Sha1Git(b"abcd"))
diff --git a/swh/provenance/tests/test_init.py b/swh/provenance/tests/test_init.py
--- a/swh/provenance/tests/test_init.py
+++ b/swh/provenance/tests/test_init.py
@@ -24,5 +24,7 @@
     archive = get_archive(cls="multiplexer", **config)
     assert isinstance(archive, ArchiveMultiplexed)
     assert len(archive.archives) == 2
-    assert isinstance(archive.archives[0], ArchiveGraph)
-    assert isinstance(archive.archives[1], ArchiveStorage)
+    assert isinstance(archive.archives[0][0], str)
+    assert isinstance(archive.archives[0][1], ArchiveGraph)
+    assert isinstance(archive.archives[1][0], str)
+    assert isinstance(archive.archives[1][1], ArchiveStorage)
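Aside (not part of the patch): the (name, archive) tuple layout asserted in test_init.py
above comes from the new loop in swh/provenance/__init__.py. A minimal sketch of that
naming scheme, with a hypothetical name_backends helper standing in for the inline loop:

    from typing import Any, Dict, List, Tuple

    def name_backends(
        archives: List[Dict[str, Any]]
    ) -> List[Tuple[str, Dict[str, Any]]]:
        # Each backend config may carry an optional "name" key, used to tag the
        # per-backend statsd counters; unnamed backends get a positional
        # "backend_<index>" fallback, mirroring archive.pop("name", f"backend_{ctr}").
        named = []
        for ctr, archive in enumerate(archives):
            named.append((archive.pop("name", f"backend_{ctr}"), archive))
        return named

    assert name_backends([{"name": "graph"}, {}]) == [
        ("graph", {}),
        ("backend_1", {}),
    ]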