Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/provenance/api/client.py b/swh/provenance/api/client.py
index b07b495..c51a1d8 100644
--- a/swh/provenance/api/client.py
+++ b/swh/provenance/api/client.py
@@ -1,470 +1,477 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
import functools
import inspect
import logging
import queue
import threading
import time
from types import TracebackType
from typing import Any, Dict, Iterable, Optional, Set, Tuple, Type, Union
import uuid
import pika
import pika.channel
import pika.connection
import pika.frame
import pika.spec
from swh.core.api.serializers import encode_data_client as encode_data
from swh.core.api.serializers import msgpack_loads as decode_data
from swh.core.statsd import statsd
from .. import get_provenance_storage
from ..interface import ProvenanceStorageInterface, RelationData, RelationType
from .serializers import DECODERS, ENCODERS
from .server import ProvenanceStorageRabbitMQServer
LOG_FORMAT = (
"%(levelname) -10s %(asctime)s %(name) -30s %(funcName) "
"-35s %(lineno) -5d: %(message)s"
)
LOGGER = logging.getLogger(__name__)
STORAGE_DURATION_METRIC = "swh_provenance_storage_rabbitmq_duration_seconds"
class ResponseTimeout(Exception):
    """Raised when no response could be fetched from the response queue in time."""
class TerminateSignal(Exception):
    """Raised from an I/O loop callback to ask the client thread to shut down."""
def split_ranges(
    data: Iterable[bytes], meth_name: str, relation: Optional[RelationType] = None
) -> Dict[str, Set[Tuple[Any, ...]]]:
    """Group the elements of ``data`` by the routing key they must be sent to.

    :param data: either an iterable of ids, a dictionary ``{id: value}`` or, when
        ``relation`` is given, a dictionary ``{src: iterable of RelationData}``
    :param meth_name: name of the storage method the data is meant for
    :param relation: relation type, only for relation data
    :return: a dictionary mapping each routing key to the set of tuples to send;
        tuples are ``(id,)``, ``(id, value)`` or ``(src, dst, path)``
    """
    grouped: Dict[str, Set[Tuple[Any, ...]]] = {}

    if relation is None:
        entries: Union[Set[Tuple[bytes, Any]], Set[Tuple[bytes]]]
        if isinstance(data, dict):
            entries = set(data.items())
        else:
            entries = {(item,) for item in data}
        for entry in entries:
            # The first element of every tuple is the id used for routing.
            routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
                entry[0], meth_name
            )
            grouped.setdefault(routing_key, set()).add(entry)
        return grouped

    assert isinstance(data, dict), "Relation data must be provided in a dictionary"
    for src, dsts in data.items():
        routing_key = ProvenanceStorageRabbitMQServer.get_routing_key(
            src, meth_name, relation
        )
        for rel in dsts:
            assert isinstance(
                rel, RelationData
            ), "Values in the dictionary must be RelationData structures"
            grouped.setdefault(routing_key, set()).add((src, rel.dst, rel.path))
    return grouped
# Metaclass that populates the class being created with one method per endpoint
# declared on its ``backend_class``.
class MetaRabbitMQClient(type):
def __new__(cls, name, bases, attributes):
# For each method of the backend class that carries an ``_endpoint_path``
# attribute (presumably set by the ``remote_api_endpoint`` decorator --
# TODO confirm), add a method with the same name, signature and
# documentation to the class being created.
#
# The backend class is taken from the ``backend_class`` attribute of the new
# class or, failing that, from the first base class that defines one.
backend_class = attributes.get("backend_class", None)
for base in bases:
if backend_class is not None:
break
backend_class = getattr(base, "backend_class", None)
if backend_class:
for meth_name, meth in backend_class.__dict__.items():
if hasattr(meth, "_endpoint_path"):
cls.__add_endpoint(meth_name, meth, attributes)
return super().__new__(cls, name, bases, attributes)
# Build the client-side implementation of ``meth`` and store it in
# ``attributes`` under ``meth_name``, unless the class already defines it.
@staticmethod
def __add_endpoint(meth_name, meth, attributes):
wrapped_meth = inspect.unwrap(meth)
# Variant used for non-write methods: forwards the call to ``self._storage``.
@functools.wraps(meth) # Copy signature and doc
def meth_(*args, **kwargs):
with statsd.timed(
metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
):
# Match arguments and parameters
data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
# Remove arguments that should not be passed
self = data.pop("self")
# Call storage method with remaining arguments
return getattr(self._storage, meth_name)(**data)
# Variant used for write methods: the single data argument is split by routing
# key, published in batches of at most ``self._batch_size`` items through the
# connection's I/O loop, and the call then waits for the acknowledgements.
# Returns False (after requesting termination) if anything raises.
@functools.wraps(meth) # Copy signature and doc
def write_meth_(*args, **kwargs):
with statsd.timed(
metric=STORAGE_DURATION_METRIC, tags={"method": meth_name}
):
# Match arguments and parameters
post_data = inspect.getcallargs(wrapped_meth, *args, **kwargs)
try:
# Remove arguments that should not be passed
self = post_data.pop("self")
relation = post_data.pop("relation", None)
# Exactly one argument (the data to write) must remain
assert len(post_data) == 1
data = next(iter(post_data.values()))
ranges = split_ranges(data, meth_name, relation)
# Number of items sent; the ack responses are summed against it
acks_expected = sum(len(items) for items in ranges.values())
# A fresh correlation id lets responses to older requests be ignored
self._correlation_id = str(uuid.uuid4())
exchange = ProvenanceStorageRabbitMQServer.get_exchange(
meth_name, relation
)
for routing_key, items in ranges.items():
items_list = list(items)
batches = (
items_list[idx : idx + self._batch_size]
for idx in range(0, len(items_list), self._batch_size)
)
for batch in batches:
# FIXME: this is running in a different thread! Hence, if
# self._connection drops, there is no guarantee that the
# request can be sent for the current elements. This
# situation should be handled properly.
self._connection.ioloop.add_callback_threadsafe(
functools.partial(
ProvenanceStorageRabbitMQClient.request,
channel=self._channel,
reply_to=self._callback_queue,
exchange=exchange,
routing_key=routing_key,
correlation_id=self._correlation_id,
data=batch,
)
)
- return self.wait_for_acks(acks_expected)
+ return self.wait_for_acks(meth_name, acks_expected)
except BaseException as ex:
self.request_termination(str(ex))
return False
# Methods explicitly defined on the class take precedence over generated ones
if meth_name not in attributes:
attributes[meth_name] = (
write_meth_
if ProvenanceStorageRabbitMQServer.is_write_method(meth_name)
else meth_
)
class ProvenanceStorageRabbitMQClient(threading.Thread, metaclass=MetaRabbitMQClient):
backend_class = ProvenanceStorageInterface
extra_type_decoders = DECODERS
extra_type_encoders = ENCODERS
def __init__(
    self,
    url: str,
    storage_config: Dict[str, Any],
    batch_size: int = 100,
    prefetch_count: int = 100,
    wait_min: float = 60,
    wait_per_batch: float = 10,
) -> None:
    """Initialise the client state; no connection is made until ``open()``.

    :param str url: URL used to connect to RabbitMQ
    :param dict storage_config: keyword arguments passed to
        ``swh.provenance.get_provenance_storage`` to build the local storage
        object this client delegates non-write operations to
    :param int batch_size: maximum number of elements per published package
        (after range splitting) for writing operations
    :param int prefetch_count: prefetch value set on the RabbitMQ channel used
        to receive ack packages
    :param float wait_min: minimum time to wait for the response to a writing
        operation, in seconds
    :param float wait_per_batch: time to wait per batch of items for the
        response to a writing operation, in seconds
    """
    super().__init__()

    # Connection state, filled in by the pika callbacks
    self._connection = None
    self._callback_queue: Optional[str] = None
    self._channel = None
    self._closing = False
    self._consumer_tag = None
    self._consuming = False

    # Id of the write request whose responses are currently awaited
    self._correlation_id = str(uuid.uuid4())

    self._prefetch_count = prefetch_count
    self._batch_size = batch_size

    # Responses received by the I/O thread are handed over through this queue
    self._response_queue: queue.Queue = queue.Queue()
    self._storage = get_provenance_storage(**storage_config)

    self._url = url
    self._wait_min = wait_min
    self._wait_per_batch = wait_per_batch
def __enter__(self) -> ProvenanceStorageInterface:
    """Open the client and return it for use inside the ``with`` block."""
    self.open()
    # The storage interface methods are added by the metaclass
    assert isinstance(self, ProvenanceStorageInterface)
    return self
def __exit__(
    self,
    exc_type: Optional[Type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Close the client when leaving the ``with`` block.

    The exception details are ignored and the exception, if any, propagates.
    """
    self.close()
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "open"})
def open(self) -> None:
    """Start the I/O thread, wait for the callback queue and open the storage.

    The call blocks until the I/O thread has declared the callback queue used
    to receive responses.

    :raises RuntimeError: if the I/O thread terminates before the callback
        queue is declared (for instance because connecting failed with an
        unexpected error); without this check the wait would never end
    """
    self.start()
    while self._callback_queue is None:
        if not self.is_alive():
            raise RuntimeError(
                "RabbitMQ client thread terminated before the callback queue "
                "was declared"
            )
        time.sleep(0.1)
    self._storage.open()
@statsd.timed(metric=STORAGE_DURATION_METRIC, tags={"method": "close"})
def close(self) -> None:
    """Ask the I/O thread to terminate, wait for it, then close the storage."""
    connection = self._connection
    assert connection is not None
    # The termination request is scheduled on the I/O loop itself
    connection.ioloop.add_callback_threadsafe(self.request_termination)
    self.join()
    self._storage.close()
def request_termination(self, reason: str = "Normal shutdown") -> None:
assert self._connection is not None
def termination_callback():
raise TerminateSignal(reason)
self._connection.ioloop.add_callback_threadsafe(termination_callback)
def connect(self) -> pika.SelectConnection:
    """Create a ``pika.SelectConnection`` to ``self._url``.

    The connection is wired to the ``on_connection_*`` callbacks of this object.
    """
    LOGGER.info("Connecting to %s", self._url)
    parameters = pika.URLParameters(self._url)
    return pika.SelectConnection(
        parameters=parameters,
        on_open_callback=self.on_connection_open,
        on_open_error_callback=self.on_connection_open_error,
        on_close_callback=self.on_connection_closed,
    )
def close_connection(self) -> None:
    """Mark the client as not consuming and close the connection if still open."""
    connection = self._connection
    assert connection is not None
    self._consuming = False
    if connection.is_closing or connection.is_closed:
        LOGGER.info("Connection is closing or already closed")
        return
    LOGGER.info("Closing connection")
    connection.close()
def on_connection_open(self, _unused_connection: pika.SelectConnection) -> None:
    """Connection-open callback: log the event and open a channel."""
    LOGGER.info("Connection opened")
    self.open_channel()
def on_connection_open_error(
    self, _unused_connection: pika.SelectConnection, err: Exception
) -> None:
    """Connection-open-error callback: log it and stop the I/O loop in 5 seconds."""
    LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err)
    assert self._connection is not None
    ioloop = self._connection.ioloop
    ioloop.call_later(5, ioloop.stop)
def on_connection_closed(self, _unused_connection: pika.SelectConnection, reason):
    """Connection-closed callback.

    Drops the channel and stops the I/O loop: immediately if the client is
    closing, after 5 seconds (with a warning) otherwise.
    """
    assert self._connection is not None
    self._channel = None
    ioloop = self._connection.ioloop
    if self._closing:
        ioloop.stop()
        return
    LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason)
    ioloop.call_later(5, ioloop.stop)
# Request a new channel on the current connection; ``on_channel_open`` is
# registered as the callback for when the channel is opened.
def open_channel(self) -> None:
- LOGGER.info("Creating a new channel")
+ LOGGER.debug("Creating a new channel")
assert self._connection is not None
self._connection.channel(on_open_callback=self.on_channel_open)
# Channel-open callback: keep a reference to the channel, register
# ``on_channel_closed`` as its close callback and declare the callback queue.
def on_channel_open(self, channel: pika.channel.Channel) -> None:
- LOGGER.info("Channel opened")
+ LOGGER.debug("Channel opened")
self._channel = channel
- LOGGER.info("Adding channel close callback")
+ LOGGER.debug("Adding channel close callback")
assert self._channel is not None
self._channel.add_on_close_callback(callback=self.on_channel_closed)
self.setup_queue()
def on_channel_closed(
    self, channel: pika.channel.Channel, reason: Exception
) -> None:
    """Channel-closed callback: log the reason and close the connection."""
    LOGGER.warning("Channel %i was closed: %s", channel, reason)
    self.close_connection()
# Declare an exclusive queue with an empty name on the channel;
# ``on_queue_declare_ok`` receives the declaration result, including the
# actual queue name.
def setup_queue(self) -> None:
- LOGGER.info("Declaring callback queue")
+ LOGGER.debug("Declaring callback queue")
assert self._channel is not None
self._channel.queue_declare(
queue="", exclusive=True, callback=self.on_queue_declare_ok
)
# Queue-declared callback: remember the queue name (``open()`` waits for
# ``_callback_queue`` to be set) and set the channel's prefetch count.
def on_queue_declare_ok(self, frame: pika.frame.Method) -> None:
- LOGGER.info("Binding queue to default exchanger")
+ LOGGER.debug("Binding queue to default exchanger")
assert self._channel is not None
self._callback_queue = frame.method.queue
self._channel.basic_qos(
prefetch_count=self._prefetch_count, callback=self.on_basic_qos_ok
)
# QoS-set callback: log the prefetch count and start consuming responses.
def on_basic_qos_ok(self, _unused_frame: pika.frame.Method) -> None:
- LOGGER.info("QOS set to: %d", self._prefetch_count)
+ LOGGER.debug("QOS set to: %d", self._prefetch_count)
self.start_consuming()
# Register ``on_consumer_cancelled`` as the cancel callback and start consuming
# the callback queue, with ``on_response`` handling every message. The consumer
# tag returned by ``basic_consume`` is kept in ``_consumer_tag``.
def start_consuming(self) -> None:
- LOGGER.info("Issuing consumer related RPC commands")
- LOGGER.info("Adding consumer cancellation callback")
+ LOGGER.debug("Issuing consumer related RPC commands")
+ LOGGER.debug("Adding consumer cancellation callback")
assert self._channel is not None
self._channel.add_on_cancel_callback(callback=self.on_consumer_cancelled)
assert self._callback_queue is not None
self._consumer_tag = self._channel.basic_consume(
queue=self._callback_queue, on_message_callback=self.on_response
)
self._consuming = True
# Consumer-cancelled callback: close the channel, if there is one.
def on_consumer_cancelled(self, method_frame: pika.frame.Method) -> None:
- LOGGER.info("Consumer was cancelled remotely, shutting down: %r", method_frame)
+ LOGGER.debug("Consumer was cancelled remotely, shutting down: %r", method_frame)
if self._channel:
self._channel.close()
def on_response(
    self,
    channel: pika.channel.Channel,
    deliver: pika.spec.Basic.Deliver,
    properties: pika.spec.BasicProperties,
    body: bytes,
) -> None:
    """Message callback: queue the decoded response and acknowledge the message.

    The response is put on ``_response_queue`` as a
    ``(correlation_id, decoded body)`` pair.
    """
    delivery_tag = deliver.delivery_tag
    LOGGER.debug(
        "Received message # %s from %s: %s",
        delivery_tag,
        properties.app_id,
        body,
    )
    correlation_id = properties.correlation_id
    payload = decode_data(body, extra_decoders=self.extra_type_decoders)
    self._response_queue.put((correlation_id, payload))
    LOGGER.debug("Acknowledging message %s", delivery_tag)
    channel.basic_ack(delivery_tag=delivery_tag)
# Cancel the consumer identified by ``_consumer_tag``, if a channel exists;
# ``on_cancel_ok`` is passed as the callback for the cancellation.
def stop_consuming(self) -> None:
if self._channel:
- LOGGER.info("Sending a Basic.Cancel RPC command to RabbitMQ")
+ LOGGER.debug("Sending a Basic.Cancel RPC command to RabbitMQ")
self._channel.basic_cancel(self._consumer_tag, self.on_cancel_ok)
# Cancel-acknowledged callback: mark the client as not consuming and close the
# channel.
def on_cancel_ok(self, _unused_frame: pika.frame.Method) -> None:
self._consuming = False
- LOGGER.info(
+ LOGGER.debug(
"RabbitMQ acknowledged the cancellation of the consumer: %s",
self._consumer_tag,
)
- LOGGER.info("Closing the channel")
+ LOGGER.debug("Closing the channel")
assert self._channel is not None
self._channel.close()
# Thread body: until ``_closing`` is set, (re)connect and run the connection's
# I/O loop. A ``TerminateSignal`` or any other exception raised from the loop
# calls ``stop()`` and, if the connection is not closed yet, restarts the loop
# so that closing can complete.
def run(self) -> None:
while not self._closing:
try:
self._connection = self.connect()
assert self._connection is not None
# Blocks until the I/O loop is stopped or a callback raises
self._connection.ioloop.start()
except KeyboardInterrupt:
LOGGER.info("Connection closed by keyboard interruption, reopening")
if self._connection is not None:
self._connection.ioloop.stop()
except TerminateSignal as ex:
# Raised by the callback scheduled in ``request_termination``
LOGGER.info("Termination requested: %s", ex)
self.stop()
if self._connection is not None and not self._connection.is_closed:
# Finish closing
self._connection.ioloop.start()
except BaseException as ex:
LOGGER.warning("Unexpected exception, terminating: %s", ex)
self.stop()
if self._connection is not None and not self._connection.is_closed:
# Finish closing
self._connection.ioloop.start()
LOGGER.info("Stopped")
# Start shutting down, once: set ``_closing`` and either cancel the consumer
# and run the I/O loop (when consuming) or stop the I/O loop (when not).
# Calling it again after ``_closing`` is set skips the shutdown steps.
def stop(self) -> None:
assert self._connection is not None
if not self._closing:
self._closing = True
LOGGER.info("Stopping")
if self._consuming:
self.stop_consuming()
# Run the loop again so the cancellation RPC can be processed
self._connection.ioloop.start()
else:
self._connection.ioloop.stop()
LOGGER.info("Stopped")
@staticmethod
def request(
    channel: pika.channel.Channel,
    reply_to: str,
    exchange: str,
    routing_key: str,
    correlation_id: str,
    **kwargs,
) -> None:
    """Publish ``kwargs`` as a msgpack-encoded message on ``channel``.

    :param channel: channel used to publish the message
    :param str reply_to: name of the queue set as the message's ``reply_to``
    :param str exchange: exchange the message is published to
    :param str routing_key: routing key of the message
    :param str correlation_id: correlation id set in the message properties
    :param kwargs: payload of the message
    """
    properties = pika.BasicProperties(
        content_type="application/msgpack",
        correlation_id=correlation_id,
        reply_to=reply_to,
    )
    body = encode_data(
        kwargs,
        extra_encoders=ProvenanceStorageRabbitMQClient.extra_type_encoders,
    )
    channel.basic_publish(
        exchange=exchange,
        routing_key=routing_key,
        properties=properties,
        body=body,
    )
# Wait until the responses received add up to ``acks_expected``.
# Returns True when exactly ``acks_expected`` acks were counted, False if a
# ``ResponseTimeout`` occurred first (a warning is logged) or if more acks than
# expected were counted.
- def wait_for_acks(self, acks_expected: int) -> bool:
+ def wait_for_acks(self, meth_name: str, acks_expected: int) -> bool:
acks_received = 0
# Overall time budget: ``_wait_per_batch`` seconds per batch of ``_batch_size``
# items, but never less than ``_wait_min`` seconds
timeout = max(
(acks_expected / self._batch_size) * self._wait_per_batch,
self._wait_min,
)
+ start = time.monotonic()
+ end = start + timeout
while acks_received < acks_expected:
# Each wait gets what is left of the overall budget, but at least one second
+ local_timeout = end - time.monotonic()
+ if local_timeout < 1.0:
+ local_timeout = 1.0
try:
# Each response carries the number of items it acknowledges
- acks_received += self.wait_for_response(timeout=timeout)
+ acks_received += self.wait_for_response(timeout=local_timeout)
except ResponseTimeout:
LOGGER.warning(
- "Timed out waiting for acks, %s received, %s expected",
+ "Timed out waiting for acks in %s, %s received, %s expected (in %ss)",
+ meth_name,
acks_received,
acks_expected,
+ time.monotonic() - start,
)
return False
return acks_received == acks_expected
def wait_for_response(self, timeout: float = 60.0) -> Any:
while True:
try:
correlation_id, response = self._response_queue.get(timeout=timeout)
if correlation_id == self._correlation_id:
return response
except queue.Empty:
raise ResponseTimeout
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 8032b6b..f9fbc55 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,519 +1,519 @@
# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import logging
import os
from types import TracebackType
from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type
from typing_extensions import Literal, TypedDict
from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from .interface import (
DirectoryData,
ProvenanceInterface,
ProvenanceResult,
ProvenanceStorageInterface,
RelationData,
RelationType,
RevisionData,
)
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
from .util import path_normalize
LOGGER = logging.getLogger(__name__)
BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds"
BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total"
# Cache of the dates known for one kind of entity.
class DatetimeCache(TypedDict):
# Date of every entity looked up or set so far, keyed by id
data: Dict[Sha1Git, Optional[datetime]] # None means unknown
# Ids whose date was set locally; only these are written when flushing
added: Set[Sha1Git]
# Cache of origins.
class OriginCache(TypedDict):
# Origin url, keyed by origin id
data: Dict[Sha1Git, str]
# Ids of the origins added locally; only these are written when flushing
added: Set[Sha1Git]
# Cache of the preferred origin of revisions.
class RevisionCache(TypedDict):
# Preferred origin id, keyed by revision id
data: Dict[Sha1Git, Sha1Git]
# Ids of the revisions whose preferred origin was set locally
added: Set[Sha1Git]
# In-memory state accumulated by ``Provenance`` between two flushes.
class ProvenanceCache(TypedDict):
content: DatetimeCache
directory: DatetimeCache
directory_flatten: Dict[Sha1Git, Optional[bool]] # None means unknown
revision: DatetimeCache
# below are insertion caches only
# each tuple is (source id, destination id, path)
content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]]
directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
# these two are for the origin layer
origin: OriginCache
revision_origin: RevisionCache
# revision id -> ids of the heads it was registered before
revision_before_revision: Dict[Sha1Git, Set[Sha1Git]]
# (revision id, origin id) pairs
revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]
def new_cache() -> ProvenanceCache:
    """Return a new, completely empty provenance cache."""

    def empty_dates() -> DatetimeCache:
        # A fresh instance per entity kind: the caches must not share containers
        return DatetimeCache(data={}, added=set())

    return ProvenanceCache(
        content=empty_dates(),
        directory=empty_dates(),
        directory_flatten={},
        revision=empty_dates(),
        content_in_revision=set(),
        content_in_directory=set(),
        directory_in_revision=set(),
        origin=OriginCache(data={}, added=set()),
        revision_origin=RevisionCache(data={}, added=set()),
        revision_before_revision={},
        revision_in_origin=set(),
    )
class Provenance:
MAX_CACHE_ELEMENTS = 100000
def __init__(self, storage: ProvenanceStorageInterface) -> None:
    """Wrap ``storage`` and start with an empty cache."""
    self.storage = storage
    self.cache = new_cache()
def __enter__(self) -> ProvenanceInterface:
self.open()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()
def _flush_limit_reached(self) -> bool:
return max(self._get_cache_stats().values()) > self.MAX_CACHE_ELEMENTS
def _get_cache_stats(self) -> Dict[str, int]:
return {
k: len(v["data"])
if (isinstance(v, dict) and v.get("data") is not None)
else len(v) # type: ignore
for (k, v) in self.cache.items()
}
def clear_caches(self) -> None:
    """Drop everything cached so far by installing a new, empty cache."""
    self.cache = new_cache()
def close(self) -> None:
self.storage.close()
@statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"})
def flush(self) -> None:
    """Write both layers of the cache to the storage, then empty the cache."""
    # The revision-content layer is written before the origin-revision layer
    self.flush_revision_content_layer()
    self.flush_origin_revision_layer()
    # Only reached once every write has succeeded
    self.clear_caches()
def flush_if_necessary(self) -> bool:
"""Flush if the number of cached information reached a limit."""
# Returns True when a flush was performed, False otherwise
- LOGGER.info("Cache stats: %s", self._get_cache_stats())
+ LOGGER.debug("Cache stats: %s", self._get_cache_stats())
if self._flush_limit_reached():
self.flush()
return True
else:
return False
@statsd.timed(
    metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"}
)
def flush_origin_revision_layer(self) -> None:
    """Write the cached origin-revision layer to the storage.

    Entities (origins, then revisions) are written first, followed by the
    revision-before-revision relation and finally the revision-in-origin
    relation. Every write is retried until the storage reports success; each
    failed attempt increments a counter and logs a warning.
    """

    def write_until_acked(write, retry_tag: str, message: str, *log_args) -> None:
        # Call ``write`` until it returns a true value, counting and logging
        # every failed attempt.
        while not write():
            statsd.increment(
                metric=BACKEND_OPERATIONS_METRIC, tags={"method": retry_tag}
            )
            LOGGER.warning(message, *log_args)

    # Origins and revisions should be inserted first so that internal ids'
    # resolution works properly.
    urls = {
        sha1: url
        for sha1, url in self.cache["origin"]["data"].items()
        if sha1 in self.cache["origin"]["added"]
    }
    if urls:
        write_until_acked(
            lambda: self.storage.origin_add(urls),
            "flush_origin_revision_retry_origin",
            "Unable to write origins urls to the storage. Retrying...",
        )

    rev_orgs = {
        # Destinations in this relation should match origins in the next one
        **{
            src: RevisionData(date=None, origin=None)
            for src in self.cache["revision_before_revision"]
        },
        **{
            # This relation comes second so that non-None origins take precedence
            src: RevisionData(date=None, origin=org)
            for src, org in self.cache["revision_in_origin"]
        },
    }
    if rev_orgs:
        write_until_acked(
            lambda: self.storage.revision_add(rev_orgs),
            "flush_origin_revision_retry_revision",
            "Unable to write revision entities to the storage. Retrying...",
        )

    # Second, flat models for revisions' histories (ie. revision-before-revision).
    if self.cache["revision_before_revision"]:
        rev_before_rev = {
            src: {RelationData(dst=dst, path=None) for dst in dsts}
            for src, dsts in self.cache["revision_before_revision"].items()
        }
        write_until_acked(
            lambda: self.storage.relation_add(
                RelationType.REV_BEFORE_REV, rev_before_rev
            ),
            "flush_origin_revision_retry_revision_before_revision",
            "Unable to write %s rows to the storage. Retrying...",
            RelationType.REV_BEFORE_REV,
        )

    # Heads (ie. revision-in-origin entries) should be inserted once flat models for
    # their histories were already added. This is to guarantee consistent results if
    # something needs to be reprocessed due to a failure: already inserted heads
    # won't get reprocessed in such a case.
    if self.cache["revision_in_origin"]:
        rev_in_org: Dict[Sha1Git, Set[RelationData]] = {}
        for src, dst in self.cache["revision_in_origin"]:
            rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None))
        write_until_acked(
            lambda: self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org),
            "flush_origin_revision_retry_revision_in_origin",
            "Unable to write %s rows to the storage. Retrying...",
            RelationType.REV_IN_ORG,
        )
@statsd.timed(
    metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"}
)
def flush_revision_content_layer(self) -> None:
    """Write the cached revision-content layer to the storage.

    The order is: entities (contents, directories, revisions, locations),
    then relations, then the acknowledgements (directory flatten flags and
    revision dates). Every write is retried until the storage reports
    success; each failed attempt increments a counter and logs a warning.
    """

    def write_until_acked(write, retry_tag: str, message: str, *log_args) -> None:
        # Call ``write`` until it returns a true value, counting and logging
        # every failed attempt.
        while not write():
            statsd.increment(
                metric=BACKEND_OPERATIONS_METRIC, tags={"method": retry_tag}
            )
            LOGGER.warning(message, *log_args)

    def write_relation(relation, triples, retry_tag: str) -> None:
        # Group the cached (src, dst, path) triples by source and write them as
        # rows of ``relation``. Nothing is written when there are no triples.
        if not triples:
            return
        rows: Dict[Sha1Git, Set[RelationData]] = {}
        for src, dst, path in triples:
            rows.setdefault(src, set()).add(RelationData(dst=dst, path=path))
        write_until_acked(
            lambda: self.storage.relation_add(relation, rows),
            retry_tag,
            "Unable to write %s rows to the storage. Retrying...",
            relation,
        )

    # Register in the storage all entities, to ensure the coming relations can
    # properly resolve any internal reference if needed. Content and directory
    # entries may safely be registered with their associated dates. In contrast,
    # revision entries should be registered without date, as it is used to
    # acknowledge that the flushing was successful. Also, directories are
    # registered with their flatten flag not set.
    cnt_dates = {
        sha1: date
        for sha1, date in self.cache["content"]["data"].items()
        if sha1 in self.cache["content"]["added"] and date is not None
    }
    if cnt_dates:
        write_until_acked(
            lambda: self.storage.content_add(cnt_dates),
            "flush_revision_content_retry_content_date",
            "Unable to write content dates to the storage. Retrying...",
        )

    dir_dates = {
        sha1: DirectoryData(date=date, flat=False)
        for sha1, date in self.cache["directory"]["data"].items()
        if sha1 in self.cache["directory"]["added"] and date is not None
    }
    if dir_dates:
        write_until_acked(
            lambda: self.storage.directory_add(dir_dates),
            "flush_revision_content_retry_directory_date",
            "Unable to write directory dates to the storage. Retrying...",
        )

    revs = {
        sha1
        for sha1, date in self.cache["revision"]["data"].items()
        if sha1 in self.cache["revision"]["added"] and date is not None
    }
    if revs:
        write_until_acked(
            lambda: self.storage.revision_add(revs),
            "flush_revision_content_retry_revision_none",
            "Unable to write revision entities to the storage. Retrying...",
        )

    paths = {
        path
        for _, _, path in self.cache["content_in_revision"]
        | self.cache["content_in_directory"]
        | self.cache["directory_in_revision"]
    }
    if paths:
        write_until_acked(
            lambda: self.storage.location_add(paths),
            "flush_revision_content_retry_location",
            "Unable to write locations entities to the storage. Retrying...",
        )

    # For this layer, relations need to be inserted first so that, in case of
    # failure, reprocessing the input does not generate an inconsistent database.
    write_relation(
        RelationType.CNT_EARLY_IN_REV,
        self.cache["content_in_revision"],
        "flush_revision_content_retry_content_in_revision",
    )
    write_relation(
        RelationType.CNT_IN_DIR,
        self.cache["content_in_directory"],
        "flush_revision_content_retry_content_in_directory",
    )
    write_relation(
        RelationType.DIR_IN_REV,
        self.cache["directory_in_revision"],
        "flush_revision_content_retry_directory_in_revision",
    )

    # After relations, flatten flags for directories can be safely set (if
    # applicable) acknowledging those directories that have already been flattened.
    # Similarly, dates for the revisions are set to acknowledge that these revisions
    # won't need to be reprocessed in case of failure.
    dir_acks = {
        sha1: DirectoryData(
            date=date, flat=self.cache["directory_flatten"].get(sha1) or False
        )
        for sha1, date in self.cache["directory"]["data"].items()
        if self.cache["directory_flatten"].get(sha1) and date is not None
    }
    if dir_acks:
        write_until_acked(
            lambda: self.storage.directory_add(dir_acks),
            "flush_revision_content_retry_directory_ack",
            "Unable to write directory dates to the storage. Retrying...",
        )

    rev_dates = {
        sha1: RevisionData(date=date, origin=None)
        for sha1, date in self.cache["revision"]["data"].items()
        if sha1 in self.cache["revision"]["added"] and date is not None
    }
    if rev_dates:
        write_until_acked(
            lambda: self.storage.revision_add(rev_dates),
            "flush_revision_content_retry_revision_date",
            "Unable to write revision dates to the storage. Retrying...",
        )
def content_add_to_directory(
    self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
) -> None:
    """Cache that ``blob`` is found in ``directory`` at ``prefix``/``blob.name``."""
    location = path_normalize(os.path.join(prefix, blob.name))
    self.cache["content_in_directory"].add((blob.id, directory.id, location))
def content_add_to_revision(
    self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
) -> None:
    """Cache that ``blob`` is found in ``revision`` at ``prefix``/``blob.name``."""
    location = path_normalize(os.path.join(prefix, blob.name))
    self.cache["content_in_revision"].add((blob.id, revision.id, location))
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
return self.storage.content_find_first(id)
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
yield from self.storage.content_find_all(id, limit=limit)
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
return self.get_dates("content", [blob.id]).get(blob.id)
def content_get_early_dates(
self, blobs: Iterable[FileEntry]
) -> Dict[Sha1Git, datetime]:
return self.get_dates("content", [blob.id for blob in blobs])
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
self.cache["content"]["data"][blob.id] = date
self.cache["content"]["added"].add(blob.id)
def directory_add_to_revision(
    self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
) -> None:
    """Cache that ``directory`` is found in ``revision`` at ``path``."""
    location = path_normalize(path)
    self.cache["directory_in_revision"].add((directory.id, revision.id, location))
def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]:
cache = self.cache["directory_flatten"]
if directory.id not in cache:
cache.setdefault(directory.id, None)
ret = self.storage.directory_get([directory.id])
if directory.id in ret:
dir = ret[directory.id]
cache[directory.id] = dir.flat
# date is kept to ensure we have it available when flushing
self.cache["directory"]["data"][directory.id] = dir.date
return cache.get(directory.id)
def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None:
self.cache["directory_flatten"][directory.id] = True
def directory_get_date_in_isochrone_frontier(
self, directory: DirectoryEntry
) -> Optional[datetime]:
return self.get_dates("directory", [directory.id]).get(directory.id)
def directory_get_dates_in_isochrone_frontier(
self, dirs: Iterable[DirectoryEntry]
) -> Dict[Sha1Git, datetime]:
return self.get_dates("directory", [directory.id for directory in dirs])
def directory_set_date_in_isochrone_frontier(
self, directory: DirectoryEntry, date: datetime
) -> None:
self.cache["directory"]["data"][directory.id] = date
self.cache["directory"]["added"].add(directory.id)
def get_dates(
self,
entity: Literal["content", "directory", "revision"],
ids: Iterable[Sha1Git],
) -> Dict[Sha1Git, datetime]:
cache = self.cache[entity]
missing_ids = set(id for id in ids if id not in cache)
if missing_ids:
if entity == "content":
cache["data"].update(self.storage.content_get(missing_ids))
elif entity == "directory":
cache["data"].update(
{
id: dir.date
for id, dir in self.storage.directory_get(missing_ids).items()
}
)
elif entity == "revision":
cache["data"].update(
{
id: rev.date
for id, rev in self.storage.revision_get(missing_ids).items()
}
)
dates: Dict[Sha1Git, datetime] = {}
for sha1 in ids:
date = cache["data"].setdefault(sha1, None)
if date is not None:
dates[sha1] = date
return dates
def open(self) -> None:
self.storage.open()
def origin_add(self, origin: OriginEntry) -> None:
self.cache["origin"]["data"][origin.id] = origin.url
self.cache["origin"]["added"].add(origin.id)
def revision_add(self, revision: RevisionEntry) -> None:
self.cache["revision"]["data"][revision.id] = revision.date
self.cache["revision"]["added"].add(revision.id)
def revision_add_before_revision(
self, head: RevisionEntry, revision: RevisionEntry
) -> None:
self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
head.id
)
def revision_add_to_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
self.cache["revision_in_origin"].add((revision.id, origin.id))
def revision_is_head(self, revision: RevisionEntry) -> bool:
    """Tell whether the storage has revision-in-origin rows for ``revision``."""
    rows = self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])
    return bool(rows)
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
return self.get_dates("revision", [revision.id]).get(revision.id)
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
cache = self.cache["revision_origin"]["data"]
if revision.id not in cache:
ret = self.storage.revision_get([revision.id])
if revision.id in ret:
origin = ret[revision.id].origin
if origin is not None:
cache[revision.id] = origin
return cache.get(revision.id)
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
self.cache["revision_origin"]["data"][revision.id] = origin.id
self.cache["revision_origin"]["added"].add(revision.id)
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index 3fbfc57..4011414 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,199 +1,211 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
+import logging
from typing import Generator, Iterable, Iterator, List, Optional, Tuple
from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from .archive import ArchiveInterface
from .directory import directory_flatten
from .graph import IsochroneNode, build_isochrone_graph
from .interface import ProvenanceInterface
from .model import DirectoryEntry, RevisionEntry
# Name of the statsd timing metric used by the ``statsd.timed`` decorators in
# this module; individual functions are told apart by their "method" tag.
REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds"
# Module-level logger, named after this module.
+logger = logging.getLogger(__name__)
+
class CSVRevisionIterator:
    """Iterator over revisions typically read from a CSV file.

    The input is an iterable producing one 3-tuple per row:
        (id, date, root)
    where:
    - id: is the id (sha1_git) of the revision
    - date: is the author date
    - root: sha1 of the directory

    Iterating yields one ``RevisionEntry`` per row; dates without timezone
    information are given the UTC timezone.
    """

    def __init__(
        self,
        revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]],
        limit: Optional[int] = None,
    ) -> None:
        """Wrap ``revisions``, keeping at most ``limit`` rows when it is given."""
        self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]]
        if limit is None:
            self.revisions = iter(revisions)
        else:
            from itertools import islice

            self.revisions = islice(revisions, limit)

    def __iter__(self) -> Generator[RevisionEntry, None, None]:
        """Yield a ``RevisionEntry`` for every remaining input row."""
        for rev_id, rev_date, rev_root in self.revisions:
            # Naive dates are tagged as UTC; aware dates are passed through.
            aware_date = (
                rev_date.replace(tzinfo=timezone.utc)
                if rev_date.tzinfo is None
                else rev_date
            )
            yield RevisionEntry(rev_id, date=aware_date, root=rev_root)
# Process a batch of revisions against the provenance model.
#
# Every revision must carry both a date and a root directory (asserted). For
# each one, the date already known to ``provenance`` is looked up; only when no
# date is known, or the revision's own date is strictly earlier, is an
# isochrone graph built from the revision's root directory and handed to
# ``revision_process_content``.
#
# ``trackall``, ``flatten``, ``lower`` and ``mindepth`` are forwarded unchanged
# to ``revision_process_content``; ``minsize`` is forwarded both to
# ``build_isochrone_graph`` and to ``revision_process_content``. When
# ``commit`` is true, ``provenance.flush()`` is called.
#
# The whole call is timed through statsd under REVISION_DURATION_METRIC with
# the tag method=main.
@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"})
def revision_add(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
revisions: List[RevisionEntry],
trackall: bool = True,
flatten: bool = True,
lower: bool = True,
mindepth: int = 1,
minsize: int = 0,
commit: bool = True,
) -> None:
for revision in revisions:
assert revision.date is not None
assert revision.root is not None
# Processed content starting from the revision's root directory.
date = provenance.revision_get_date(revision)
# Revisions whose known date is not later than their own date are skipped.
if date is None or revision.date < date:
+ logger.debug(
+ "Processing revision %s on %s (root %s)",
+ revision.id.hex(),
+ revision.date,
+ revision.root.hex(),
+ )
+ logger.debug("provenance date: %s, building isochrone graph", date)
graph = build_isochrone_graph(
provenance,
archive,
revision,
DirectoryEntry(revision.root),
minsize=minsize,
)
+ logger.debug("isochrone graph built, processing content")
revision_process_content(
provenance,
archive,
revision,
graph,
trackall=trackall,
flatten=flatten,
lower=lower,
mindepth=mindepth,
minsize=minsize,
)
# NOTE(review): indentation is lost in this listing, so it cannot be told from
# here whether this flush runs once after the loop or once per revision; the
# "flushing batch" message suggests once per batch -- confirm in the real file.
if commit:
+ logger.debug("flushing batch")
provenance.flush()
# Walk the isochrone graph of ``revision`` and record its content in
# ``provenance``.
#
# The revision itself is registered first (``provenance.revision_add``). The
# graph is then traversed with an explicit LIFO stack starting at ``graph``:
#
# - a node with a ``dbdate`` must be strictly earlier than the revision
#   (asserted); with ``trackall`` it is linked to the revision as is;
# - a node without ``dbdate`` is tested with ``is_new_frontier`` (using
#   ``lower`` and ``mindepth``). If it qualifies, its ``maxdate`` is stored as
#   its frontier date and, depending on ``trackall`` / ``flatten``, it is
#   linked to the revision and flattened with ``directory_flatten``;
# - otherwise its files are added to the revision (lowering a blob's early
#   date when the revision is earlier) and its children are pushed on the
#   stack.
#
# The call is timed through statsd under REVISION_DURATION_METRIC with the tag
# method=process_content.
@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"})
def revision_process_content(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
revision: RevisionEntry,
graph: IsochroneNode,
trackall: bool = True,
flatten: bool = True,
lower: bool = True,
mindepth: int = 1,
minsize: int = 0,
) -> None:
assert revision.date is not None
provenance.revision_add(revision)
# Explicit stack instead of recursion; nodes are popped from the end.
stack = [graph]
while stack:
current = stack.pop()
if current.dbdate is not None:
assert current.dbdate < revision.date
if trackall:
# Current directory is an outer isochrone frontier for a previously
# processed revision. It should be reused as is.
provenance.directory_add_to_revision(
revision, current.entry, current.path
)
else:
assert current.maxdate is not None
# Current directory is not an outer isochrone frontier for any previous
# revision. It might be eligible for this one.
if is_new_frontier(
current,
revision=revision,
lower=lower,
mindepth=mindepth,
):
# Outer frontier should be moved to current position in the isochrone
# graph. This is the first time this directory is found in the isochrone
# frontier.
provenance.directory_set_date_in_isochrone_frontier(
current.entry, current.maxdate
)
if trackall:
provenance.directory_add_to_revision(
revision, current.entry, current.path
)
# NOTE(review): indentation is lost in this listing; whether this ``flatten``
# check is nested under the ``trackall`` check above cannot be told from here
# -- confirm in the real file.
if flatten:
directory_flatten(
provenance, archive, current.entry, minsize=minsize
)
else:
# If current node is an invalidated frontier, update its date for future
# revisions to get the proper value.
if current.invalid:
provenance.directory_set_date_in_isochrone_frontier(
current.entry, revision.date
)
# No point moving the frontier here. Either there are no files or they
# are being seen for the first time here. Add all blobs to current
# revision updating date if necessary, and recursively analyse
# subdirectories as candidates to the outer frontier.
for blob in current.entry.files:
date = provenance.content_get_early_date(blob)
if date is None or revision.date < date:
provenance.content_set_early_date(blob, revision.date)
provenance.content_add_to_revision(revision, blob, current.path)
for child in current.children:
stack.append(child)
def is_new_frontier(
    node: IsochroneNode,
    revision: RevisionEntry,
    lower: bool = True,
    mindepth: int = 1,
) -> bool:
    """Decide whether ``node`` can become a new isochrone frontier.

    Args:
        node: candidate node; its ``maxdate`` must be set.
        revision: revision being processed; its ``date`` must be set.
        lower: when true, the node must additionally satisfy ``has_blobs``.
        mindepth: minimum ``node.depth`` allowed for a frontier.

    Returns:
        ``True`` only if the node's ``maxdate`` is strictly earlier than the
        revision date, the node is at least ``mindepth`` deep and, when
        ``lower`` is set, ``has_blobs(node)`` holds.
    """
    assert node.maxdate is not None  # for mypy
    assert revision.date is not None  # idem

    # We want to ensure that all first occurrences end up in the
    # content_early_in_rev relation. Thus, every blob outside a frontier is
    # forced to have a strictly earlier date.
    if not (node.maxdate < revision.date):
        # Some content is not earlier than the revision.
        return False
    if not (node.depth >= mindepth):
        # Shallower than the minimum allowed depth.
        return False
    if not lower:
        return True
    # There must be at least one blob.
    return has_blobs(node)
def has_blobs(node: IsochroneNode) -> bool:
    """Return whether any entry of ``node.entry.files`` is truthy.

    Only the files directly attached to ``node`` are considered (strategy 1
    below); the other strategies are kept as design notes.
    """
    # We may want to look for files in different ways to decide whether to
    # define a frontier or not:
    # 1. Only files in current node:
    direct_files = node.entry.files
    return any(direct_files)
    # 2. Files anywhere in the isochrone graph
    # stack = [node]
    # while stack:
    #     current = stack.pop()
    #     if any(current.entry.files):
    #         return True
    #     else:
    #         # All children are directory entries.
    #         stack.extend(current.children)
    # return False
    # 3. Files in the intermediate directories between current node and any
    #    previously defined frontier:
    # TODO: complete this case!

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 12:17 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3276635

Event Timeline