diff --git a/swh/provenance/cli.py b/swh/provenance/cli.py
index 8dc31f2..ad0d042 100644
--- a/swh/provenance/cli.py
+++ b/swh/provenance/cli.py
@@ -1,255 +1,292 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from datetime import datetime, timezone
import os
from typing import Any, Dict, Generator, Optional, Tuple
import click
import iso8601
import yaml
from swh.core import config
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import Sha1Git
# All generic config code should reside in swh.core.config
CONFIG_ENVVAR = "SWH_CONFIG_FILENAME"
DEFAULT_PATH = os.environ.get(CONFIG_ENVVAR, None)
DEFAULT_CONFIG: Dict[str, Any] = {
"provenance": {
"archive": {
# Storage API based Archive object
# "cls": "api",
# "storage": {
# "cls": "remote",
# "url": "http://uffizi.internal.softwareheritage.org:5002",
# }
# Direct access Archive object
"cls": "direct",
"db": {
"host": "belvedere.internal.softwareheritage.org",
"port": 5432,
"dbname": "softwareheritage",
"user": "guest",
},
},
"storage": {
# Local PostgreSQL Storage
# "cls": "postgresql",
# "db": {
# "host": "localhost",
# "user": "postgres",
# "password": "postgres",
# "dbname": "provenance",
# },
# Local MongoDB Storage
# "cls": "mongodb",
# "db": {
# "dbname": "provenance",
# },
# Remote RabbitMQ/PostgreSQL Storage
"cls": "rabbitmq",
"url": "amqp://localhost:5672/%2f",
"storage_config": {
"cls": "postgresql",
"db": {
"host": "localhost",
"user": "postgres",
"password": "postgres",
"dbname": "provenance",
},
},
"batch_size": 100,
"prefetch_count": 100,
},
}
}
CONFIG_FILE_HELP = f"""
\b Configuration can be loaded from a yaml file given either as --config-file
option or the {CONFIG_ENVVAR} environment variable. If no configuration file
is specified, use the following default configuration::
\b
{yaml.dump(DEFAULT_CONFIG)}"""
PROVENANCE_HELP = f"""Software Heritage provenance index database tools
{CONFIG_FILE_HELP}
"""
@swh_cli_group.group(
name="provenance", context_settings=CONTEXT_SETTINGS, help=PROVENANCE_HELP
)
@click.option(
"-C",
"--config-file",
default=None,
type=click.Path(exists=True, dir_okay=False, path_type=str),
help="""YAML configuration file.""",
)
@click.option(
"-P",
"--profile",
default=None,
type=click.Path(exists=False, dir_okay=False, path_type=str),
help="""Enable profiling to specified file.""",
)
@click.pass_context
def cli(ctx: click.core.Context, config_file: Optional[str], profile: str) -> None:
if (
config_file is None
and DEFAULT_PATH is not None
and config.config_exists(DEFAULT_PATH)
):
config_file = DEFAULT_PATH
if config_file is None:
conf = DEFAULT_CONFIG
else:
# read_raw_config does not fail on ENOENT
if not os.path.exists(config_file):
raise FileNotFoundError(config_file)
conf = yaml.safe_load(open(config_file, "rb"))
ctx.ensure_object(dict)
ctx.obj["config"] = conf
if profile:
import atexit
import cProfile
print("Profiling...")
pr = cProfile.Profile()
pr.enable()
def exit() -> None:
pr.disable()
pr.dump_stats(profile)
atexit.register(exit)
+@cli.command(name="iter-frontiers")
+@click.argument("filename")
+@click.option("-l", "--limit", type=int)
+@click.option("-s", "--min-size", default=0, type=int)
+@click.pass_context
+def iter_frontiers(
+ ctx: click.core.Context,
+ filename: str,
+ limit: Optional[int],
+ min_size: int,
+) -> None:
+ """Process a provided list of directories in the isochrone frontier."""
+ from . import get_archive, get_provenance
+ from .directory import CSVDirectoryIterator, directory_add
+
+ archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
+ directories_provider = generate_directory_ids(filename)
+ directories = CSVDirectoryIterator(directories_provider, limit=limit)
+
+ with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
+ for directory in directories:
+ directory_add(
+ provenance,
+ archive,
+ [directory],
+ minsize=min_size,
+ )
+
+
+def generate_directory_ids(
+ filename: str,
+) -> Generator[Sha1Git, None, None]:
+ for line in open(filename, "r"):
+ if line.strip():
+ yield hash_to_bytes(line.strip())
+
+
@cli.command(name="iter-revisions")
@click.argument("filename")
@click.option("-a", "--track-all", default=True, type=bool)
@click.option("-l", "--limit", type=int)
@click.option("-m", "--min-depth", default=1, type=int)
@click.option("-r", "--reuse", default=True, type=bool)
@click.option("-s", "--min-size", default=0, type=int)
@click.pass_context
def iter_revisions(
ctx: click.core.Context,
filename: str,
track_all: bool,
limit: Optional[int],
min_depth: int,
reuse: bool,
min_size: int,
) -> None:
"""Process a provided list of revisions."""
from . import get_archive, get_provenance
from .revision import CSVRevisionIterator, revision_add
archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
revisions_provider = generate_revision_tuples(filename)
revisions = CSVRevisionIterator(revisions_provider, limit=limit)
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
for revision in revisions:
revision_add(
provenance,
archive,
[revision],
trackall=track_all,
lower=reuse,
mindepth=min_depth,
minsize=min_size,
)
def generate_revision_tuples(
filename: str,
) -> Generator[Tuple[Sha1Git, datetime, Sha1Git], None, None]:
for line in open(filename, "r"):
if line.strip():
revision, date, root = line.strip().split(",")
yield (
hash_to_bytes(revision),
iso8601.parse_date(date, default_timezone=timezone.utc),
hash_to_bytes(root),
)
@cli.command(name="iter-origins")
@click.argument("filename")
@click.option("-l", "--limit", type=int)
@click.pass_context
def iter_origins(ctx: click.core.Context, filename: str, limit: Optional[int]) -> None:
"""Process a provided list of origins."""
from . import get_archive, get_provenance
from .origin import CSVOriginIterator, origin_add
archive = get_archive(**ctx.obj["config"]["provenance"]["archive"])
origins_provider = generate_origin_tuples(filename)
origins = CSVOriginIterator(origins_provider, limit=limit)
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
for origin in origins:
origin_add(provenance, archive, [origin])
def generate_origin_tuples(filename: str) -> Generator[Tuple[str, bytes], None, None]:
for line in open(filename, "r"):
if line.strip():
url, snapshot = line.strip().split(",")
yield (url, hash_to_bytes(snapshot))
@cli.command(name="find-first")
@click.argument("swhid")
@click.pass_context
def find_first(ctx: click.core.Context, swhid: str) -> None:
"""Find first occurrence of the requested blob."""
from . import get_provenance
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
occur = provenance.content_find_first(hash_to_bytes(swhid))
if occur is not None:
print(
f"swh:1:cnt:{hash_to_hex(occur.content)}, "
f"swh:1:rev:{hash_to_hex(occur.revision)}, "
f"{occur.date}, "
f"{occur.origin}, "
f"{os.fsdecode(occur.path)}"
)
else:
print(f"Cannot find a content with the id {swhid}")
@cli.command(name="find-all")
@click.argument("swhid")
@click.option("-l", "--limit", type=int)
@click.pass_context
def find_all(ctx: click.core.Context, swhid: str, limit: Optional[int]) -> None:
"""Find all occurrences of the requested blob."""
from . import get_provenance
with get_provenance(**ctx.obj["config"]["provenance"]["storage"]) as provenance:
for occur in provenance.content_find_all(hash_to_bytes(swhid), limit=limit):
print(
f"swh:1:cnt:{hash_to_hex(occur.content)}, "
f"swh:1:rev:{hash_to_hex(occur.revision)}, "
f"{occur.date}, "
f"{occur.origin}, "
f"{os.fsdecode(occur.path)}"
)
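For orientation, the new iter-frontiers command follows the same pattern as iter-revisions: FILENAME contains one directory sha1_git per line, which is fed through CSVDirectoryIterator into directory_add. A minimal sketch of exercising the command wiring (following the CliRunner pattern used in test_cli.py further below; it only checks the help output, so no archive or storage configuration is needed):

from click.testing import CliRunner

from swh.core.cli import swh as swhmain
import swh.provenance.cli  # noqa ; ensure the provenance CLI group is registered

# Invoking --help exercises the argument/option declarations of the new command
# without touching any database.
result = CliRunner().invoke(swhmain, ["provenance", "iter-frontiers", "--help"])
assert result.exit_code == 0
assert "--min-size" in result.output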
diff --git a/swh/provenance/directory.py b/swh/provenance/directory.py
new file mode 100644
index 0000000..5430454
--- /dev/null
+++ b/swh/provenance/directory.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import os
+from typing import Generator, Iterable, Iterator, List, Optional
+
+from swh.core.statsd import statsd
+from swh.model.model import Sha1Git
+
+from .archive import ArchiveInterface
+from .interface import ProvenanceInterface
+from .model import DirectoryEntry
+
+REVISION_DURATION_METRIC = "swh_provenance_directory_duration_seconds"
+
+
+class CSVDirectoryIterator:
+ """Iterator over directories typically present in the given CSV file.
+
+ The input is an iterator that produces ids (sha1_git) of directories
+ """
+
+ def __init__(
+ self,
+ directories: Iterable[Sha1Git],
+ limit: Optional[int] = None,
+ ) -> None:
+ self.directories: Iterator[Sha1Git]
+ if limit is not None:
+ from itertools import islice
+
+ self.directories = islice(directories, limit)
+ else:
+ self.directories = iter(directories)
+
+ def __iter__(self) -> Generator[DirectoryEntry, None, None]:
+ for id in self.directories:
+ yield DirectoryEntry(id)
+
+
+@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"})
+def directory_add(
+ provenance: ProvenanceInterface,
+ archive: ArchiveInterface,
+ directories: List[DirectoryEntry],
+ minsize: int = 0,
+ commit: bool = True,
+) -> None:
+ for directory in directories:
+ # Only flatten directories that are present in the provenance model, but not
+ # flattenned yet.
+ flattenned = provenance.directory_already_flattenned(directory)
+ if flattenned is not None and not flattenned:
+ directory_flatten(
+ provenance,
+ archive,
+ directory,
+ minsize=minsize,
+ )
+ if commit:
+ provenance.flush()
+
+
+@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten"})
+def directory_flatten(
+ provenance: ProvenanceInterface,
+ archive: ArchiveInterface,
+ directory: DirectoryEntry,
+ minsize: int = 0,
+) -> None:
+ """Recursively retrieve all the files of 'directory' and insert them in the
+ 'provenance' database in the 'content_to_directory' table.
+ """
+ stack = [(directory, b"")]
+ while stack:
+ current, prefix = stack.pop()
+ current.retrieve_children(archive, minsize=minsize)
+ for f_child in current.files:
+ # Add content to the directory with the computed prefix.
+ provenance.content_add_to_directory(directory, f_child, prefix)
+ for d_child in current.dirs:
+ # Recursively walk the child directory.
+ stack.append((d_child, os.path.join(prefix, d_child.name)))
+ provenance.directory_flag_as_flattenned(directory)
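As a usage sketch (condensed from the new test_directory_flatten.py further below; the provenance and archive handles are assumed to come from the usual pytest fixtures, the sha1 is the cmdbts2 test directory used there, and the helper name is illustrative):

from swh.model.hashutil import hash_to_bytes
from swh.provenance.archive import ArchiveInterface
from swh.provenance.directory import directory_add
from swh.provenance.interface import ProvenanceInterface
from swh.provenance.model import DirectoryEntry


def flatten_known_directory(
    provenance: ProvenanceInterface, archive: ArchiveInterface
) -> None:
    # The directory must already have a date in the provenance model (see the
    # set-up in test_directory_flatten.py); directory_add() then flattens it and,
    # on flush, flips its `flat` flag in the storage.
    directory = DirectoryEntry(hash_to_bytes("48007c961cc734d1f63886d0413a6dc605e3e2ea"))
    directory_add(provenance, archive, [directory], minsize=0)
    assert provenance.storage.directory_get([directory.id])[directory.id].flat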
diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py
index 7b5ba3f..78a7279 100644
--- a/swh/provenance/interface.py
+++ b/swh/provenance/interface.py
@@ -1,384 +1,396 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
import enum
from types import TracebackType
from typing import Dict, Generator, Iterable, Optional, Set, Type, Union
from typing_extensions import Protocol, runtime_checkable
from swh.core.api import remote_api_endpoint
from swh.model.model import Sha1Git
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
class EntityType(enum.Enum):
CONTENT = "content"
DIRECTORY = "directory"
REVISION = "revision"
ORIGIN = "origin"
class RelationType(enum.Enum):
CNT_EARLY_IN_REV = "content_in_revision"
CNT_IN_DIR = "content_in_directory"
DIR_IN_REV = "directory_in_revision"
REV_IN_ORG = "revision_in_origin"
REV_BEFORE_REV = "revision_before_revision"
@dataclass(eq=True, frozen=True)
class ProvenanceResult:
content: Sha1Git
revision: Sha1Git
date: datetime
origin: Optional[str]
path: bytes
@dataclass(eq=True, frozen=True)
class DirectoryData:
"""Object representing the data associated to a directory in the provenance model,
where `date` is the date of the directory in the isochrone frontier, and `flat` is a
flag acknowledging that a flat model for the elements outside the frontier has
already been created.
"""
date: datetime
flat: bool
@dataclass(eq=True, frozen=True)
class RevisionData:
"""Object representing the data associated to a revision in the provenance model,
where `date` is the optional date of the revision (specifying it acknowledges that
the revision was already processed by the revision-content algorithm); and `origin`
identifies the preferred origin for the revision, if any.
"""
date: Optional[datetime]
origin: Optional[Sha1Git]
@dataclass(eq=True, frozen=True)
class RelationData:
"""Object representing a relation entry in the provenance model, where `src` and
`dst` are the sha1 ids of the entities being related, and `path` is optional
depending on the relation being represented.
"""
dst: Sha1Git
path: Optional[bytes]
@runtime_checkable
class ProvenanceStorageInterface(Protocol):
def __enter__(self) -> ProvenanceStorageInterface:
...
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
...
@remote_api_endpoint("close")
def close(self) -> None:
"""Close connection to the storage and release resources."""
...
@remote_api_endpoint("content_add")
def content_add(self, cnts: Dict[Sha1Git, datetime]) -> bool:
"""Add blobs identified by sha1 ids, with an associated date (as paired in
`cnts`) to the provenance storage. Return a boolean stating whether the
information was successfully stored.
"""
...
@remote_api_endpoint("content_find_first")
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
"""Retrieve the first occurrence of the blob identified by `id`."""
...
@remote_api_endpoint("content_find_all")
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
"""Retrieve all the occurrences of the blob identified by `id`."""
...
@remote_api_endpoint("content_get")
def content_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, datetime]:
"""Retrieve the associated date for each blob sha1 in `ids`."""
...
@remote_api_endpoint("directory_add")
def directory_add(self, dirs: Dict[Sha1Git, DirectoryData]) -> bool:
"""Add directories identified by sha1 ids, with associated date and (optional)
flatten flag (as paired in `dirs`) to the provenance storage. If the flatten
flag is set to None, the previous value present in the storage is preserved.
Return a boolean stating if the information was successfully stored.
"""
...
@remote_api_endpoint("directory_get")
def directory_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, DirectoryData]:
"""Retrieve the associated date and (optional) flatten flag for each directory
sha1 in `ids`. If some directory has no associated date, it is not present in
the resulting dictionary.
"""
...
@remote_api_endpoint("entity_get_all")
def entity_get_all(self, entity: EntityType) -> Set[Sha1Git]:
"""Retrieve all sha1 ids for entities of type `entity` present in the provenance
model. This method is used only in tests.
"""
...
@remote_api_endpoint("location_add")
def location_add(self, paths: Iterable[bytes]) -> bool:
"""Register the given `paths` in the storage."""
...
@remote_api_endpoint("location_get_all")
def location_get_all(self) -> Set[bytes]:
"""Retrieve all paths present in the provenance model.
This method is used only in tests."""
...
@remote_api_endpoint("open")
def open(self) -> None:
"""Open connection to the storage and allocate necessary resources."""
...
@remote_api_endpoint("origin_add")
def origin_add(self, orgs: Dict[Sha1Git, str]) -> bool:
"""Add origins identified by sha1 ids, with their corresponding url (as paired
in `orgs`) to the provenance storage. Return a boolean stating if the
information was successfully stored.
"""
...
@remote_api_endpoint("origin_get")
def origin_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, str]:
"""Retrieve the associated url for each origin sha1 in `ids`."""
...
@remote_api_endpoint("revision_add")
def revision_add(
self, revs: Union[Iterable[Sha1Git], Dict[Sha1Git, RevisionData]]
) -> bool:
"""Add revisions identified by sha1 ids, with optional associated date or origin
(as paired in `revs`) to the provenance storage. Return a boolean stating if the
information was successfully stored.
"""
...
@remote_api_endpoint("revision_get")
def revision_get(self, ids: Iterable[Sha1Git]) -> Dict[Sha1Git, RevisionData]:
"""Retrieve the associated date and origin for each revision sha1 in `ids`. If
some revision has no associated date nor origin, it is not present in the
resulting dictionary.
"""
...
@remote_api_endpoint("relation_add")
def relation_add(
self, relation: RelationType, data: Dict[Sha1Git, Set[RelationData]]
) -> bool:
"""Add entries in the selected `relation`. This method assumes all entities
being related are already registered in the storage. See `content_add`,
`directory_add`, `origin_add`, and `revision_add`.
"""
...
@remote_api_endpoint("relation_get")
def relation_get(
self, relation: RelationType, ids: Iterable[Sha1Git], reverse: bool = False
) -> Dict[Sha1Git, Set[RelationData]]:
"""Retrieve all entries in the selected `relation` whose source entities are
identified by some sha1 id in `ids`. If `reverse` is set, destination entities
are matched instead.
"""
...
@remote_api_endpoint("relation_get_all")
def relation_get_all(
self, relation: RelationType
) -> Dict[Sha1Git, Set[RelationData]]:
"""Retrieve all entries in the selected `relation` that are present in the
provenance model. This method is used only in tests.
"""
...
@remote_api_endpoint("with_path")
def with_path(self) -> bool:
...
@runtime_checkable
class ProvenanceInterface(Protocol):
storage: ProvenanceStorageInterface
def __enter__(self) -> ProvenanceInterface:
...
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
...
def close(self) -> None:
"""Close connection to the underlying `storage` and release resources."""
...
def flush(self) -> None:
"""Flush internal cache to the underlying `storage`."""
...
def content_add_to_directory(
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
) -> None:
"""Associate `blob` with `directory` in the provenance model. `prefix` is the
relative path from `directory` to `blob` (excluding `blob`'s name).
"""
...
def content_add_to_revision(
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
) -> None:
"""Associate `blob` with `revision` in the provenance model. `prefix` is the
absolute path from `revision`'s root directory to `blob` (excluding `blob`'s
name).
"""
...
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
"""Retrieve the first occurrence of the blob identified by `id`."""
...
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
"""Retrieve all the occurrences of the blob identified by `id`."""
...
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
"""Retrieve the earliest known date of `blob`."""
...
def content_get_early_dates(
self, blobs: Iterable[FileEntry]
) -> Dict[Sha1Git, datetime]:
"""Retrieve the earliest known date for each blob in `blobs`. If some blob has
no associated date, it is not present in the resulting dictionary.
"""
...
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
"""Associate `date` to `blob` as it's earliest known date."""
...
def directory_add_to_revision(
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
) -> None:
"""Associate `directory` with `revision` in the provenance model. `path` is the
absolute path from `revision`'s root directory to `directory` (including
`directory`'s name).
"""
...
+ def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]:
+ """Check if the directory is already flattenned in the provenance model. If the
+ directory is unknown to the model, this method returns None.
+ """
+ ...
+
+ def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None:
+ """Mark the directory as flattenned in the provenance model. If the
+ directory is unknown to the model, this method has no effect.
+ """
+ ...
+
def directory_get_date_in_isochrone_frontier(
self, directory: DirectoryEntry
) -> Optional[datetime]:
"""Retrieve the earliest known date of `directory` as an isochrone frontier in
the provenance model.
"""
...
def directory_get_dates_in_isochrone_frontier(
self, dirs: Iterable[DirectoryEntry]
) -> Dict[Sha1Git, datetime]:
"""Retrieve the earliest known date for each directory in `dirs` as isochrone
frontiers in the provenance model. If some directory has no associated date, it is not
present in the resulting dictionary.
"""
...
def directory_set_date_in_isochrone_frontier(
self, directory: DirectoryEntry, date: datetime
) -> None:
"""Associate `date` to `directory` as it's earliest known date as an isochrone
frontier in the provenance model.
"""
...
def open(self) -> None:
"""Open connection to the underlying `storage` and allocate necessary
resources.
"""
...
def origin_add(self, origin: OriginEntry) -> None:
"""Add `origin` to the provenance model."""
...
def revision_add(self, revision: RevisionEntry) -> None:
"""Add `revision` to the provenance model. This implies storing `revision`'s
date in the model, thus `revision.date` must be a valid date.
"""
...
def revision_add_before_revision(
self, head: RevisionEntry, revision: RevisionEntry
) -> None:
"""Associate `revision` to `head` as an ancestor of the latter."""
...
def revision_add_to_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
"""Associate `revision` to `origin` as a head revision of the latter (ie. the
target of a snapshot for `origin` in the archive)."""
...
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
"""Retrieve the date associated to `revision`."""
...
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
"""Retrieve the preferred origin associated to `revision`."""
...
def revision_in_history(self, revision: RevisionEntry) -> bool:
"""Check if `revision` is known to be an ancestor of some head revision in the
provenance model.
"""
...
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
"""Associate `origin` as the preferred origin for `revision`."""
...
def revision_visited(self, revision: RevisionEntry) -> bool:
"""Check if `revision` is known to be a head revision for some origin in the
provenance model.
"""
...
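To make the extended directory model above concrete, a minimal sketch of the payload a ProvenanceStorageInterface implementation receives through directory_add (the sha1 and the date are arbitrary placeholders):

from datetime import datetime, timezone

from swh.model.hashutil import hash_to_bytes
from swh.provenance.interface import DirectoryData

# Each directory is registered with its isochrone-frontier date plus a `flat`
# flag recording whether its flat (content-in-directory) model already exists.
dirs = {
    hash_to_bytes("48007c961cc734d1f63886d0413a6dc605e3e2ea"): DirectoryData(
        date=datetime(2021, 1, 1, tzinfo=timezone.utc), flat=False
    ),
}
# A backend persists this mapping via directory_add(dirs) and later returns the
# same structure from directory_get([...]).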
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
index 5ca39b2..b0f43c6 100644
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,489 +1,504 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime
import logging
import os
from types import TracebackType
from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type
from typing_extensions import Literal, TypedDict
from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from .interface import (
DirectoryData,
ProvenanceInterface,
ProvenanceResult,
ProvenanceStorageInterface,
RelationData,
RelationType,
RevisionData,
)
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry
from .util import path_normalize
LOGGER = logging.getLogger(__name__)
BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds"
BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total"
class DatetimeCache(TypedDict):
data: Dict[Sha1Git, Optional[datetime]] # None means unknown
added: Set[Sha1Git]
class OriginCache(TypedDict):
data: Dict[Sha1Git, str]
added: Set[Sha1Git]
class RevisionCache(TypedDict):
data: Dict[Sha1Git, Sha1Git]
added: Set[Sha1Git]
class ProvenanceCache(TypedDict):
content: DatetimeCache
directory: DatetimeCache
directory_flatten: Dict[Sha1Git, Optional[bool]] # None means unknown
revision: DatetimeCache
# below are insertion caches only
content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]]
directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]]
# these two are for the origin layer
origin: OriginCache
revision_origin: RevisionCache
revision_before_revision: Dict[Sha1Git, Set[Sha1Git]]
revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]]
def new_cache() -> ProvenanceCache:
return ProvenanceCache(
content=DatetimeCache(data={}, added=set()),
directory=DatetimeCache(data={}, added=set()),
directory_flatten={},
revision=DatetimeCache(data={}, added=set()),
content_in_revision=set(),
content_in_directory=set(),
directory_in_revision=set(),
origin=OriginCache(data={}, added=set()),
revision_origin=RevisionCache(data={}, added=set()),
revision_before_revision={},
revision_in_origin=set(),
)
class Provenance:
def __init__(self, storage: ProvenanceStorageInterface) -> None:
self.storage = storage
self.cache = new_cache()
def __enter__(self) -> ProvenanceInterface:
self.open()
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
self.close()
def clear_caches(self) -> None:
self.cache = new_cache()
def close(self) -> None:
self.storage.close()
@statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"})
def flush(self) -> None:
self.flush_revision_content_layer()
self.flush_origin_revision_layer()
self.clear_caches()
@statsd.timed(
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"}
)
def flush_origin_revision_layer(self) -> None:
# Origins and revisions should be inserted first so that internal ids'
# resolution works properly.
urls = {
sha1: url
for sha1, url in self.cache["origin"]["data"].items()
if sha1 in self.cache["origin"]["added"]
}
if urls:
while not self.storage.origin_add(urls):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_origin_revision_retry_origin"},
)
LOGGER.warning(
"Unable to write origins urls to the storage. Retrying..."
)
rev_orgs = {
# Destinations in this relation should match origins in the next one
**{
src: RevisionData(date=None, origin=None)
for src in self.cache["revision_before_revision"]
},
**{
# This relation comes second so that non-None origins take precedence
src: RevisionData(date=None, origin=org)
for src, org in self.cache["revision_in_origin"]
},
}
if rev_orgs:
while not self.storage.revision_add(rev_orgs):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_origin_revision_retry_revision"},
)
LOGGER.warning(
"Unable to write revision entities to the storage. Retrying..."
)
# Second, flat models for revisions' histories (ie. revision-before-revision).
if self.cache["revision_before_revision"]:
rev_before_rev = {
src: {RelationData(dst=dst, path=None) for dst in dsts}
for src, dsts in self.cache["revision_before_revision"].items()
}
while not self.storage.relation_add(
RelationType.REV_BEFORE_REV, rev_before_rev
):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={
"method": "flush_origin_revision_retry_revision_before_revision"
},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.REV_BEFORE_REV,
)
# Heads (ie. revision-in-origin entries) should be inserted once flat models for
# their histories were already added. This is to guarantee consistent results if
# something needs to be reprocessed due to a failure: already inserted heads
# won't get reprocessed in such a case.
if self.cache["revision_in_origin"]:
rev_in_org: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst in self.cache["revision_in_origin"]:
rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None))
while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_origin_revision_retry_revision_in_origin"},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.REV_IN_ORG,
)
@statsd.timed(
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"}
)
def flush_revision_content_layer(self) -> None:
# Register in the storage all entities, to ensure the coming relations can
# properly resolve any internal reference if needed. Content and directory
# entries may safely be registered with their associated dates. In contrast,
# revision entries should be registered without date, as it is used to
# acknowledge that the flushing was successful. Also, directories are
# registered with their flatten flag not set.
cnt_dates = {
sha1: date
for sha1, date in self.cache["content"]["data"].items()
if sha1 in self.cache["content"]["added"] and date is not None
}
if cnt_dates:
while not self.storage.content_add(cnt_dates):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_content_date"},
)
LOGGER.warning(
"Unable to write content dates to the storage. Retrying..."
)
dir_dates = {
sha1: DirectoryData(date=date, flat=False)
for sha1, date in self.cache["directory"]["data"].items()
if sha1 in self.cache["directory"]["added"] and date is not None
}
if dir_dates:
while not self.storage.directory_add(dir_dates):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_directory_date"},
)
LOGGER.warning(
"Unable to write directory dates to the storage. Retrying..."
)
revs = {
sha1
for sha1, date in self.cache["revision"]["data"].items()
if sha1 in self.cache["revision"]["added"] and date is not None
}
if revs:
while not self.storage.revision_add(revs):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_revision_none"},
)
LOGGER.warning(
"Unable to write revision entities to the storage. Retrying..."
)
paths = {
path
for _, _, path in self.cache["content_in_revision"]
| self.cache["content_in_directory"]
| self.cache["directory_in_revision"]
}
if paths:
while not self.storage.location_add(paths):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_location"},
)
LOGGER.warning(
"Unable to write locations entities to the storage. Retrying..."
)
# For this layer, relations need to be inserted first so that, in case of
# failure, reprocessing the input does not generate an inconsistent database.
if self.cache["content_in_revision"]:
cnt_in_rev: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst, path in self.cache["content_in_revision"]:
cnt_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path))
while not self.storage.relation_add(
RelationType.CNT_EARLY_IN_REV, cnt_in_rev
):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_content_in_revision"},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.CNT_EARLY_IN_REV,
)
if self.cache["content_in_directory"]:
cnt_in_dir: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst, path in self.cache["content_in_directory"]:
cnt_in_dir.setdefault(src, set()).add(RelationData(dst=dst, path=path))
while not self.storage.relation_add(RelationType.CNT_IN_DIR, cnt_in_dir):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={
"method": "flush_revision_content_retry_content_in_directory"
},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.CNT_IN_DIR,
)
if self.cache["directory_in_revision"]:
dir_in_rev: Dict[Sha1Git, Set[RelationData]] = {}
for src, dst, path in self.cache["directory_in_revision"]:
dir_in_rev.setdefault(src, set()).add(RelationData(dst=dst, path=path))
while not self.storage.relation_add(RelationType.DIR_IN_REV, dir_in_rev):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={
"method": "flush_revision_content_retry_directory_in_revision"
},
)
LOGGER.warning(
"Unable to write %s rows to the storage. Retrying...",
RelationType.DIR_IN_REV,
)
# After relations, flatten flags for directories can be safely set (if
# applicable) acknowledging those directories that have already been flattened.
# Similarly, dates for the revisions are set to acknowledge that these revisions
# won't need to be reprocessed in case of failure.
dir_acks = {
sha1: DirectoryData(
date=date, flat=self.cache["directory_flatten"].get(sha1) or False
)
for sha1, date in self.cache["directory"]["data"].items()
- if sha1 in self.cache["directory"]["added"] and date is not None
+ if self.cache["directory_flatten"].get(sha1) and date is not None
}
if dir_acks:
while not self.storage.directory_add(dir_acks):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_directory_ack"},
)
LOGGER.warning(
"Unable to write directory dates to the storage. Retrying..."
)
rev_dates = {
sha1: RevisionData(date=date, origin=None)
for sha1, date in self.cache["revision"]["data"].items()
if sha1 in self.cache["revision"]["added"] and date is not None
}
if rev_dates:
while not self.storage.revision_add(rev_dates):
statsd.increment(
metric=BACKEND_OPERATIONS_METRIC,
tags={"method": "flush_revision_content_retry_revision_date"},
)
LOGGER.warning(
"Unable to write revision dates to the storage. Retrying..."
)
def content_add_to_directory(
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
) -> None:
self.cache["content_in_directory"].add(
(blob.id, directory.id, path_normalize(os.path.join(prefix, blob.name)))
)
def content_add_to_revision(
self, revision: RevisionEntry, blob: FileEntry, prefix: bytes
) -> None:
self.cache["content_in_revision"].add(
(blob.id, revision.id, path_normalize(os.path.join(prefix, blob.name)))
)
def content_find_first(self, id: Sha1Git) -> Optional[ProvenanceResult]:
return self.storage.content_find_first(id)
def content_find_all(
self, id: Sha1Git, limit: Optional[int] = None
) -> Generator[ProvenanceResult, None, None]:
yield from self.storage.content_find_all(id, limit=limit)
def content_get_early_date(self, blob: FileEntry) -> Optional[datetime]:
return self.get_dates("content", [blob.id]).get(blob.id)
def content_get_early_dates(
self, blobs: Iterable[FileEntry]
) -> Dict[Sha1Git, datetime]:
return self.get_dates("content", [blob.id for blob in blobs])
def content_set_early_date(self, blob: FileEntry, date: datetime) -> None:
self.cache["content"]["data"][blob.id] = date
self.cache["content"]["added"].add(blob.id)
def directory_add_to_revision(
self, revision: RevisionEntry, directory: DirectoryEntry, path: bytes
) -> None:
self.cache["directory_in_revision"].add(
(directory.id, revision.id, path_normalize(path))
)
+ def directory_already_flattenned(self, directory: DirectoryEntry) -> Optional[bool]:
+ cache = self.cache["directory_flatten"]
+ if directory.id not in cache:
+ cache.setdefault(directory.id, None)
+ ret = self.storage.directory_get([directory.id])
+ if directory.id in ret:
+ dir = ret[directory.id]
+ cache[directory.id] = dir.flat
+ # date is kept to ensure we have it available when flushing
+ self.cache["directory"]["data"][directory.id] = dir.date
+ return cache.get(directory.id)
+
+ def directory_flag_as_flattenned(self, directory: DirectoryEntry) -> None:
+ self.cache["directory_flatten"][directory.id] = True
+
def directory_get_date_in_isochrone_frontier(
self, directory: DirectoryEntry
) -> Optional[datetime]:
return self.get_dates("directory", [directory.id]).get(directory.id)
def directory_get_dates_in_isochrone_frontier(
self, dirs: Iterable[DirectoryEntry]
) -> Dict[Sha1Git, datetime]:
return self.get_dates("directory", [directory.id for directory in dirs])
def directory_set_date_in_isochrone_frontier(
self, directory: DirectoryEntry, date: datetime
) -> None:
self.cache["directory"]["data"][directory.id] = date
self.cache["directory"]["added"].add(directory.id)
def get_dates(
self,
entity: Literal["content", "directory", "revision"],
ids: Iterable[Sha1Git],
) -> Dict[Sha1Git, datetime]:
cache = self.cache[entity]
missing_ids = set(id for id in ids if id not in cache)
if missing_ids:
if entity == "content":
cache["data"].update(self.storage.content_get(missing_ids))
elif entity == "directory":
cache["data"].update(
{
id: dir.date
for id, dir in self.storage.directory_get(missing_ids).items()
}
)
elif entity == "revision":
cache["data"].update(
{
id: rev.date
for id, rev in self.storage.revision_get(missing_ids).items()
}
)
dates: Dict[Sha1Git, datetime] = {}
for sha1 in ids:
date = cache["data"].setdefault(sha1, None)
if date is not None:
dates[sha1] = date
return dates
def open(self) -> None:
self.storage.open()
def origin_add(self, origin: OriginEntry) -> None:
self.cache["origin"]["data"][origin.id] = origin.url
self.cache["origin"]["added"].add(origin.id)
def revision_add(self, revision: RevisionEntry) -> None:
self.cache["revision"]["data"][revision.id] = revision.date
self.cache["revision"]["added"].add(revision.id)
def revision_add_before_revision(
self, head: RevisionEntry, revision: RevisionEntry
) -> None:
self.cache["revision_before_revision"].setdefault(revision.id, set()).add(
head.id
)
def revision_add_to_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
self.cache["revision_in_origin"].add((revision.id, origin.id))
def revision_get_date(self, revision: RevisionEntry) -> Optional[datetime]:
return self.get_dates("revision", [revision.id]).get(revision.id)
def revision_get_preferred_origin(
self, revision: RevisionEntry
) -> Optional[Sha1Git]:
cache = self.cache["revision_origin"]["data"]
if revision.id not in cache:
ret = self.storage.revision_get([revision.id])
if revision.id in ret:
origin = ret[revision.id].origin
if origin is not None:
cache[revision.id] = origin
return cache.get(revision.id)
def revision_in_history(self, revision: RevisionEntry) -> bool:
return revision.id in self.cache["revision_before_revision"] or bool(
self.storage.relation_get(RelationType.REV_BEFORE_REV, [revision.id])
)
def revision_set_preferred_origin(
self, origin: OriginEntry, revision: RevisionEntry
) -> None:
self.cache["revision_origin"]["data"][revision.id] = origin.id
self.cache["revision_origin"]["added"].add(revision.id)
def revision_visited(self, revision: RevisionEntry) -> bool:
return revision.id in dict(self.cache["revision_in_origin"]) or bool(
self.storage.relation_get(RelationType.REV_IN_ORG, [revision.id])
)
diff --git a/swh/provenance/revision.py b/swh/provenance/revision.py
index a6c81ec..2b578e1 100644
--- a/swh/provenance/revision.py
+++ b/swh/provenance/revision.py
@@ -1,246 +1,224 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timezone
-import os
from typing import Generator, Iterable, Iterator, List, Optional, Tuple
from swh.core.statsd import statsd
from swh.model.model import Sha1Git
from .archive import ArchiveInterface
+from .directory import directory_flatten
from .graph import IsochroneNode, build_isochrone_graph
from .interface import ProvenanceInterface
from .model import DirectoryEntry, RevisionEntry
REVISION_DURATION_METRIC = "swh_provenance_revision_content_layer_duration_seconds"
class CSVRevisionIterator:
"""Iterator over revisions typically present in the given CSV file.
The input is an iterator that produces 3 elements per row:
(id, date, root)
where:
- id: is the id (sha1_git) of the revision
- date: is the author date
- root: sha1 of the directory
"""
def __init__(
self,
revisions: Iterable[Tuple[Sha1Git, datetime, Sha1Git]],
limit: Optional[int] = None,
) -> None:
self.revisions: Iterator[Tuple[Sha1Git, datetime, Sha1Git]]
if limit is not None:
from itertools import islice
self.revisions = islice(revisions, limit)
else:
self.revisions = iter(revisions)
def __iter__(self) -> Generator[RevisionEntry, None, None]:
for id, date, root in self.revisions:
if date.tzinfo is None:
date = date.replace(tzinfo=timezone.utc)
yield RevisionEntry(id, date=date, root=root)
@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "main"})
def revision_add(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
revisions: List[RevisionEntry],
trackall: bool = True,
lower: bool = True,
mindepth: int = 1,
minsize: int = 0,
commit: bool = True,
) -> None:
for revision in revisions:
assert revision.date is not None
assert revision.root is not None
# Processed content starting from the revision's root directory.
date = provenance.revision_get_date(revision)
if date is None or revision.date < date:
graph = build_isochrone_graph(
provenance,
archive,
revision,
DirectoryEntry(revision.root),
minsize=minsize,
)
revision_process_content(
provenance,
archive,
revision,
graph,
trackall=trackall,
lower=lower,
mindepth=mindepth,
minsize=minsize,
)
if commit:
provenance.flush()
@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "process_content"})
def revision_process_content(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
revision: RevisionEntry,
graph: IsochroneNode,
trackall: bool = True,
lower: bool = True,
mindepth: int = 1,
minsize: int = 0,
) -> None:
assert revision.date is not None
provenance.revision_add(revision)
stack = [graph]
while stack:
current = stack.pop()
if current.dbdate is not None:
assert current.dbdate <= revision.date
if trackall:
# Current directory is an outer isochrone frontier for a previously
# processed revision. It should be reused as is.
provenance.directory_add_to_revision(
revision, current.entry, current.path
)
else:
assert current.maxdate is not None
# Current directory is not an outer isochrone frontier for any previous
# revision. It might be eligible for this one.
if is_new_frontier(
current,
revision=revision,
trackall=trackall,
lower=lower,
mindepth=mindepth,
):
# Outer frontier should be moved to current position in the isochrone
# graph. This is the first time this directory is found in the isochrone
# frontier.
provenance.directory_set_date_in_isochrone_frontier(
current.entry, current.maxdate
)
if trackall:
provenance.directory_add_to_revision(
revision, current.entry, current.path
)
- flatten_directory(
+ directory_flatten(
provenance, archive, current.entry, minsize=minsize
)
else:
# If current node is an invalidated frontier, update its date for future
# revisions to get the proper value.
if current.invalid:
provenance.directory_set_date_in_isochrone_frontier(
current.entry, current.maxdate
)
# No point moving the frontier here. Either there are no files or they
# are being seen for the first time here. Add all blobs to current
# revision updating date if necessary, and recursively analyse
# subdirectories as candidates to the outer frontier.
for blob in current.entry.files:
date = provenance.content_get_early_date(blob)
if date is None or revision.date < date:
provenance.content_set_early_date(blob, revision.date)
provenance.content_add_to_revision(revision, blob, current.path)
for child in current.children:
stack.append(child)
-@statsd.timed(metric=REVISION_DURATION_METRIC, tags={"method": "flatten_directory"})
-def flatten_directory(
- provenance: ProvenanceInterface,
- archive: ArchiveInterface,
- directory: DirectoryEntry,
- minsize: int = 0,
-) -> None:
- """Recursively retrieve all the files of 'directory' and insert them in the
- 'provenance' database in the 'content_to_directory' table.
- """
- stack = [(directory, b"")]
- while stack:
- current, prefix = stack.pop()
- current.retrieve_children(archive, minsize=minsize)
- for f_child in current.files:
- # Add content to the directory with the computed prefix.
- provenance.content_add_to_directory(directory, f_child, prefix)
- for d_child in current.dirs:
- # Recursively walk the child directory.
- stack.append((d_child, os.path.join(prefix, d_child.name)))
-
-
def is_new_frontier(
node: IsochroneNode,
revision: RevisionEntry,
trackall: bool = True,
lower: bool = True,
mindepth: int = 1,
) -> bool:
assert node.maxdate is not None # for mypy
assert revision.date is not None # idem
if trackall:
# The only real condition for a directory to be a frontier is that its content
# is already known and its maxdate is less than (or equal to) the current revision's
# date. Checking mindepth is meant to skip root directories (or any arbitrary
# depth) to improve the result. The option lower tries to maximize the reuse
# rate of previously defined frontiers by keeping them low in the directory
# tree.
return (
node.known
and node.maxdate <= revision.date # all content is earlier than revision
and node.depth
>= mindepth # current node is deeper than the min allowed depth
and (has_blobs(node) if lower else True) # there is at least one blob in it
)
else:
# If we are only tracking first occurrences, we want to ensure that all first
# occurrences end up in the content_early_in_rev relation. Thus, we require
# every blob outside a frontier to have a strictly earlier date.
return (
node.maxdate < revision.date # all content is earlier than revision
and node.depth >= mindepth # deeper than the min allowed depth
and (has_blobs(node) if lower else True) # there is at least one blob
)
def has_blobs(node: IsochroneNode) -> bool:
# We may want to look for files in different ways to decide whether to define a
# frontier or not:
# 1. Only files in current node:
return any(node.entry.files)
# 2. Files anywhere in the isochrone graph
# stack = [node]
# while stack:
# current = stack.pop()
# if any(
# map(lambda child: isinstance(child.entry, FileEntry), current.children)):
# return True
# else:
# # All children are directory entries.
# stack.extend(current.children)
# return False
# 3. Files in the intermediate directories between current node and any previously
# defined frontier:
# TODO: complete this case!
# return any(
# map(lambda child: isinstance(child.entry, FileEntry), node.children)
# ) or all(
# map(
# lambda child: (
# not (isinstance(child.entry, DirectoryEntry) and child.date is None)
# )
# or has_blobs(child),
# node.children,
# )
# )
diff --git a/swh/provenance/tests/test_cli.py b/swh/provenance/tests/test_cli.py
index cceb30a..6ed98a2 100644
--- a/swh/provenance/tests/test_cli.py
+++ b/swh/provenance/tests/test_cli.py
@@ -1,104 +1,105 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from typing import Set
from _pytest.monkeypatch import MonkeyPatch
from click.testing import CliRunner
import psycopg2.extensions
import pytest
from swh.core.cli import swh as swhmain
import swh.core.cli.db # noqa ; ensure cli is loaded
from swh.core.db import BaseDb
import swh.provenance.cli # noqa ; ensure cli is loaded
def test_cli_swh_db_help() -> None:
# swhmain.add_command(provenance_cli)
result = CliRunner().invoke(swhmain, ["provenance", "-h"])
assert result.exit_code == 0
assert "Commands:" in result.output
commands = result.output.split("Commands:")[1]
for command in (
"find-all",
"find-first",
+ "iter-frontiers",
"iter-origins",
"iter-revisions",
):
assert f" {command} " in commands
TABLES = {
"dbflavor",
"dbversion",
"content",
"content_in_revision",
"content_in_directory",
"directory",
"directory_in_revision",
"location",
"origin",
"revision",
"revision_before_revision",
"revision_in_origin",
}
@pytest.mark.parametrize(
"flavor, dbtables", (("with-path", TABLES | {"location"}), ("without-path", TABLES))
)
def test_cli_db_create_and_init_db_with_flavor(
monkeypatch: MonkeyPatch,
postgresql: psycopg2.extensions.connection,
flavor: str,
dbtables: Set[str],
) -> None:
"""Test that 'swh db init provenance' works with flavors
for both with-path and without-path flavors"""
dbname = f"{flavor}-db"
# DB creation using 'swh db create'
db_params = postgresql.get_dsn_parameters()
monkeypatch.setenv("PGHOST", db_params["host"])
monkeypatch.setenv("PGUSER", db_params["user"])
monkeypatch.setenv("PGPORT", db_params["port"])
result = CliRunner().invoke(swhmain, ["db", "create", "-d", dbname, "provenance"])
assert result.exit_code == 0, result.output
# DB init using 'swh db init'
result = CliRunner().invoke(
swhmain, ["db", "init", "-d", dbname, "--flavor", flavor, "provenance"]
)
assert result.exit_code == 0, result.output
assert f"(flavor {flavor})" in result.output
db_params["dbname"] = dbname
cnx = BaseDb.connect(**db_params).conn
# check the DB looks OK (check for db_flavor and expected tables)
with cnx.cursor() as cur:
cur.execute("select swh_get_dbflavor()")
assert cur.fetchone() == (flavor,)
cur.execute(
"select table_name from information_schema.tables "
"where table_schema = 'public' "
f"and table_catalog = '{dbname}'"
)
tables = set(x for (x,) in cur.fetchall())
assert tables == dbtables
def test_cli_init_db_default_flavor(postgresql: psycopg2.extensions.connection) -> None:
"Test that 'swh db init provenance' defaults to a with-path flavored DB"
dbname = postgresql.dsn
result = CliRunner().invoke(swhmain, ["db", "init", "-d", dbname, "provenance"])
assert result.exit_code == 0, result.output
with postgresql.cursor() as cur:
cur.execute("select swh_get_dbflavor()")
assert cur.fetchone() == ("with-path",)
diff --git a/swh/provenance/tests/test_directory_flatten.py b/swh/provenance/tests/test_directory_flatten.py
new file mode 100644
index 0000000..82f7257
--- /dev/null
+++ b/swh/provenance/tests/test_directory_flatten.py
@@ -0,0 +1,72 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from datetime import datetime, timezone
+
+from swh.model.hashutil import hash_to_bytes
+from swh.provenance.archive import ArchiveInterface
+from swh.provenance.directory import directory_add
+from swh.provenance.interface import (
+ DirectoryData,
+ ProvenanceInterface,
+ RelationData,
+ RelationType,
+)
+from swh.provenance.model import DirectoryEntry, FileEntry
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+
+
+def test_directory_add(
+ provenance: ProvenanceInterface,
+ archive: ArchiveInterface,
+) -> None:
+ # read data/README.md for more details on how these datasets are generated
+ data = load_repo_data("cmdbts2")
+ fill_storage(archive.storage, data)
+
+ # just take a directory that is known to exist in cmdbts2
+ directory = DirectoryEntry(
+ id=hash_to_bytes("48007c961cc734d1f63886d0413a6dc605e3e2ea")
+ )
+ content1 = FileEntry(
+ id=hash_to_bytes("20329687bb9c1231a7e05afe86160343ad49b494"), name=b"a"
+ )
+ content2 = FileEntry(
+ id=hash_to_bytes("50e9cdb03f9719261dd39d7f2920b906db3711a3"), name=b"b"
+ )
+ date = datetime.fromtimestamp(1000000010, timezone.utc)
+
+ # directory_add and the internal directory_flatten require the directory and its
+ # content to be known by the provenance object. Otherwise, they do nothing
+ provenance.directory_set_date_in_isochrone_frontier(directory, date)
+ provenance.content_set_early_date(content1, date)
+ provenance.content_set_early_date(content2, date)
+ provenance.flush()
+ assert provenance.storage.directory_get([directory.id]) == {
+ directory.id: DirectoryData(date=date, flat=False)
+ }
+ assert provenance.storage.content_get([content1.id, content2.id]) == {
+ content1.id: date,
+ content2.id: date,
+ }
+
+ # this query forces the directory date to be retrieved from the storage and cached
+ # (otherwise, the flush below won't update the directory flatten flag)
+ flattenned = provenance.directory_already_flattenned(directory)
+ assert flattenned is not None and not flattenned
+
+ # flatten the directory and check the expected result
+ directory_add(provenance, archive, [directory])
+ assert provenance.storage.directory_get([directory.id]) == {
+ directory.id: DirectoryData(date=date, flat=True)
+ }
+ assert provenance.storage.relation_get_all(RelationType.CNT_IN_DIR) == {
+ content1.id: {
+ RelationData(dst=directory.id, path=b"a"),
+ RelationData(dst=directory.id, path=b"C/a"),
+ },
+ content2.id: {RelationData(dst=directory.id, path=b"C/b")},
+ }
diff --git a/swh/provenance/tests/test_directory_iterator.py b/swh/provenance/tests/test_directory_iterator.py
new file mode 100644
index 0000000..f86c8c4
--- /dev/null
+++ b/swh/provenance/tests/test_directory_iterator.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pytest
+
+from swh.provenance.directory import CSVDirectoryIterator
+from swh.provenance.tests.conftest import fill_storage, load_repo_data
+from swh.storage.interface import StorageInterface
+
+
+@pytest.mark.parametrize(
+ "repo",
+ (
+ "cmdbts2",
+ "out-of-order",
+ ),
+)
+def test_directory_iterator(swh_storage: StorageInterface, repo: str) -> None:
+ """Test CSVDirectoryIterator"""
+ data = load_repo_data(repo)
+ fill_storage(swh_storage, data)
+
+ directories_ids = [dir["id"] for dir in data["directory"]]
+ directories = list(CSVDirectoryIterator(directories_ids))
+
+ assert directories
+ assert len(directories) == len(data["directory"])
diff --git a/swh/provenance/tests/test_revision_content_layer.py b/swh/provenance/tests/test_revision_content_layer.py
index b7bc070..6b81a37 100644
--- a/swh/provenance/tests/test_revision_content_layer.py
+++ b/swh/provenance/tests/test_revision_content_layer.py
@@ -1,456 +1,453 @@
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple
import pytest
from typing_extensions import TypedDict
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Sha1Git
from swh.provenance.archive import ArchiveInterface
from swh.provenance.interface import EntityType, ProvenanceInterface, RelationType
from swh.provenance.model import RevisionEntry
from swh.provenance.revision import revision_add
from swh.provenance.tests.conftest import (
fill_storage,
get_datafile,
load_repo_data,
ts2dt,
)
class SynthRelation(TypedDict):
prefix: Optional[str]
path: str
src: Sha1Git
dst: Sha1Git
rel_ts: float
class SynthRevision(TypedDict):
sha1: Sha1Git
date: float
msg: str
R_C: List[SynthRelation]
R_D: List[SynthRelation]
D_C: List[SynthRelation]
def synthetic_revision_content_result(filename: str) -> Iterator[SynthRevision]:
"""Generates dict representations of synthetic revisions found in the synthetic
file (from the data/ directory) given as argument of the generator.
Generated SynthRevision (typed dict) with the following elements:
"sha1": (Sha1Git) sha1 of the revision,
"date": (float) timestamp of the revision,
"msg": (str) commit message of the revision,
"R_C": (list) new R---C relations added by this revision
"R_D": (list) new R-D relations added by this revision
"D_C": (list) new D-C relations added by this revision
Each relation above is a SynthRelation typed dict with:
"path": (str) location
"src": (Sha1Git) sha1 of the source of the relation
"dst": (Sha1Git) sha1 of the destination of the relation
"rel_ts": (float) timestamp of the target of the relation
(related to the timestamp of the revision)
"""
with open(get_datafile(filename), "r") as fobj:
yield from _parse_synthetic_revision_content_file(fobj)
def _parse_synthetic_revision_content_file(
fobj: Iterable[str],
) -> Iterator[SynthRevision]:
"""Read a 'synthetic' file and generate a dict representation of the synthetic
revision for each revision listed in the synthetic file.
"""
regs = [
"(?P<revname>R[0-9]{2,4})?",
"(?P<reltype>[^| ]*)",
"([+] )?(?P<path>[^| +]*?)[/]?",
"(?P<type>[RDC]) (?P<sha1>[0-9a-f]{40})",
"(?P<ts>-?[0-9]+(.[0-9]+)?)",
]
regex = re.compile("^ *" + r" *[|] *".join(regs) + r" *(#.*)?$")
current_rev: List[dict] = []
for m in (regex.match(line) for line in fobj):
if m:
d = m.groupdict()
if d["revname"]:
if current_rev:
yield _mk_synth_rev(current_rev)
current_rev.clear()
current_rev.append(d)
if current_rev:
yield _mk_synth_rev(current_rev)
def _mk_synth_rev(synth_rev: List[Dict[str, str]]) -> SynthRevision:
assert synth_rev[0]["type"] == "R"
rev = SynthRevision(
sha1=hash_to_bytes(synth_rev[0]["sha1"]),
date=float(synth_rev[0]["ts"]),
msg=synth_rev[0]["revname"],
R_C=[],
R_D=[],
D_C=[],
)
current_path = None
    # path of the last R-D relation we parsed, used as a prefix for the next
    # D-C relations
for row in synth_rev[1:]:
if row["reltype"] == "R---C":
assert row["type"] == "C"
rev["R_C"].append(
SynthRelation(
prefix=None,
path=row["path"],
src=rev["sha1"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
current_path = None
elif row["reltype"] == "R-D":
assert row["type"] == "D"
rev["R_D"].append(
SynthRelation(
prefix=None,
path=row["path"],
src=rev["sha1"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
current_path = row["path"]
elif row["reltype"] == "D-C":
assert row["type"] == "C"
rev["D_C"].append(
SynthRelation(
prefix=current_path,
path=row["path"],
src=rev["R_D"][-1]["dst"],
dst=hash_to_bytes(row["sha1"]),
rel_ts=float(row["ts"]),
)
)
return rev
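# For the illustrative excerpt shown above, _mk_synth_rev would build roughly
# (with the made-up sha1 values abbreviated as <...10>, <...01>, <...02>):
#
#   {"sha1": <...10>, "date": 1000000000.0, "msg": "R00",
#    "R_C": [SynthRelation(prefix=None, path="a/b", src=<...10>, dst=<...01>, rel_ts=0.0)],
#    "R_D": [SynthRelation(prefix=None, path="a", src=<...10>, dst=<...02>, rel_ts=0.0)],
#    "D_C": [SynthRelation(prefix="a", path="b", src=<...02>, dst=<...01>, rel_ts=0.0)]}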
@pytest.mark.parametrize(
"repo, lower, mindepth",
(
("cmdbts2", True, 1),
("cmdbts2", False, 1),
("cmdbts2", True, 2),
("cmdbts2", False, 2),
("out-of-order", True, 1),
),
)
def test_revision_content_result(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
repo: str,
lower: bool,
mindepth: int,
) -> None:
# read data/README.md for more details on how these datasets are generated
data = load_repo_data(repo)
fill_storage(archive.storage, data)
syntheticfile = get_datafile(
f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
)
revisions = {rev["id"]: rev for rev in data["revision"]}
rows: Dict[str, Set[Any]] = {
"content": set(),
"content_in_directory": set(),
"content_in_revision": set(),
"directory": set(),
"directory_in_revision": set(),
"location": set(),
"revision": set(),
}
def maybe_path(path: str) -> Optional[bytes]:
if provenance.storage.with_path():
return path.encode("utf-8")
return None
for synth_rev in synthetic_revision_content_result(syntheticfile):
revision = revisions[synth_rev["sha1"]]
entry = RevisionEntry(
id=revision["id"],
date=ts2dt(revision["date"]),
root=revision["directory"],
)
revision_add(provenance, archive, [entry], lower=lower, mindepth=mindepth)
# each "entry" in the synth file is one new revision
rows["revision"].add(synth_rev["sha1"])
assert rows["revision"] == provenance.storage.entity_get_all(
EntityType.REVISION
), synth_rev["msg"]
# check the timestamp of the revision
rev_ts = synth_rev["date"]
rev_data = provenance.storage.revision_get([synth_rev["sha1"]])[
synth_rev["sha1"]
]
assert (
rev_data.date is not None and rev_ts == rev_data.date.timestamp()
), synth_rev["msg"]
# this revision might have added new content objects
rows["content"] |= set(x["dst"] for x in synth_rev["R_C"])
rows["content"] |= set(x["dst"] for x in synth_rev["D_C"])
assert rows["content"] == provenance.storage.entity_get_all(
EntityType.CONTENT
), synth_rev["msg"]
# check for R-C (direct) entries
# these are added directly in the content_early_in_rev table
rows["content_in_revision"] |= set(
(x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_C"]
)
assert rows["content_in_revision"] == {
(src, rel.dst, rel.path)
for src, rels in provenance.storage.relation_get_all(
RelationType.CNT_EARLY_IN_REV
).items()
for rel in rels
}, synth_rev["msg"]
# check timestamps
for rc in synth_rev["R_C"]:
assert (
rev_ts + rc["rel_ts"]
== provenance.storage.content_get([rc["dst"]])[rc["dst"]].timestamp()
), synth_rev["msg"]
# check directories
# each directory stored in the provenance index is an entry
# in the "directory" table...
rows["directory"] |= set(x["dst"] for x in synth_rev["R_D"])
assert rows["directory"] == provenance.storage.entity_get_all(
EntityType.DIRECTORY
), synth_rev["msg"]
# ... + a number of rows in the "directory_in_rev" table...
# check for R-D entries
rows["directory_in_revision"] |= set(
(x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["R_D"]
)
assert rows["directory_in_revision"] == {
(src, rel.dst, rel.path)
for src, rels in provenance.storage.relation_get_all(
RelationType.DIR_IN_REV
).items()
for rel in rels
}, synth_rev["msg"]
# check timestamps
for rd in synth_rev["R_D"]:
- assert (
- rev_ts + rd["rel_ts"]
- == provenance.storage.directory_get([rd["dst"]])[
- rd["dst"]
- ].date.timestamp()
- ), synth_rev["msg"]
+ dir_data = provenance.storage.directory_get([rd["dst"]])[rd["dst"]]
+ assert rev_ts + rd["rel_ts"] == dir_data.date.timestamp(), synth_rev["msg"]
+ assert dir_data.flat, synth_rev["msg"]
# ... + a number of rows in the "content_in_dir" table
# for content of the directory.
# check for D-C entries
rows["content_in_directory"] |= set(
(x["dst"], x["src"], maybe_path(x["path"])) for x in synth_rev["D_C"]
)
assert rows["content_in_directory"] == {
(src, rel.dst, rel.path)
for src, rels in provenance.storage.relation_get_all(
RelationType.CNT_IN_DIR
).items()
for rel in rels
}, synth_rev["msg"]
# check timestamps
for dc in synth_rev["D_C"]:
assert (
rev_ts + dc["rel_ts"]
== provenance.storage.content_get([dc["dst"]])[dc["dst"]].timestamp()
), synth_rev["msg"]
if provenance.storage.with_path():
# check for location entries
rows["location"] |= set(x["path"].encode() for x in synth_rev["R_C"])
rows["location"] |= set(x["path"].encode() for x in synth_rev["D_C"])
rows["location"] |= set(x["path"].encode() for x in synth_rev["R_D"])
assert rows["location"] == provenance.storage.location_get_all(), synth_rev[
"msg"
]
@pytest.mark.parametrize(
"repo, lower, mindepth",
(
("cmdbts2", True, 1),
("cmdbts2", False, 1),
("cmdbts2", True, 2),
("cmdbts2", False, 2),
("out-of-order", True, 1),
),
)
@pytest.mark.parametrize("batch", (True, False))
def test_provenance_heuristics_content_find_all(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
repo: str,
lower: bool,
mindepth: int,
batch: bool,
) -> None:
# read data/README.md for more details on how these datasets are generated
data = load_repo_data(repo)
fill_storage(archive.storage, data)
revisions = [
RevisionEntry(
id=revision["id"],
date=ts2dt(revision["date"]),
root=revision["directory"],
)
for revision in data["revision"]
]
def maybe_path(path: str) -> str:
if provenance.storage.with_path():
return path
return ""
if batch:
revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
else:
for revision in revisions:
revision_add(
provenance, archive, [revision], lower=lower, mindepth=mindepth
)
syntheticfile = get_datafile(
f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
)
expected_occurrences: Dict[str, List[Tuple[str, float, Optional[str], str]]] = {}
for synth_rev in synthetic_revision_content_result(syntheticfile):
rev_id = synth_rev["sha1"].hex()
rev_ts = synth_rev["date"]
for rc in synth_rev["R_C"]:
expected_occurrences.setdefault(rc["dst"].hex(), []).append(
(rev_id, rev_ts, None, maybe_path(rc["path"]))
)
for dc in synth_rev["D_C"]:
assert dc["prefix"] is not None # to please mypy
expected_occurrences.setdefault(dc["dst"].hex(), []).append(
(rev_id, rev_ts, None, maybe_path(dc["prefix"] + "/" + dc["path"]))
)
for content_id, results in expected_occurrences.items():
expected = [(content_id, *result) for result in results]
db_occurrences = [
(
occur.content.hex(),
occur.revision.hex(),
occur.date.timestamp(),
occur.origin,
occur.path.decode(),
)
for occur in provenance.content_find_all(hash_to_bytes(content_id))
]
if provenance.storage.with_path():
            # this does not hold if the db stores no paths, because the same
            # content appearing several times in a given revision may be
            # reported only once by content_find_all()
assert len(db_occurrences) == len(expected)
assert set(db_occurrences) == set(expected)
@pytest.mark.parametrize(
"repo, lower, mindepth",
(
("cmdbts2", True, 1),
("cmdbts2", False, 1),
("cmdbts2", True, 2),
("cmdbts2", False, 2),
("out-of-order", True, 1),
),
)
@pytest.mark.parametrize("batch", (True, False))
def test_provenance_heuristics_content_find_first(
provenance: ProvenanceInterface,
archive: ArchiveInterface,
repo: str,
lower: bool,
mindepth: int,
batch: bool,
) -> None:
# read data/README.md for more details on how these datasets are generated
data = load_repo_data(repo)
fill_storage(archive.storage, data)
revisions = [
RevisionEntry(
id=revision["id"],
date=ts2dt(revision["date"]),
root=revision["directory"],
)
for revision in data["revision"]
]
if batch:
revision_add(provenance, archive, revisions, lower=lower, mindepth=mindepth)
else:
for revision in revisions:
revision_add(
provenance, archive, [revision], lower=lower, mindepth=mindepth
)
syntheticfile = get_datafile(
f"synthetic_{repo}_{'lower' if lower else 'upper'}_{mindepth}.txt"
)
expected_first: Dict[str, Tuple[str, float, List[str]]] = {}
    # maps each blob_id to a (rev_id, rev_ts, [path, ...]) tuple; the last
    # element is a list because a content can be added at several places in a
    # single revision, in which case content_find_first() returns one of those
    # paths, but there is no guarantee which one.
for synth_rev in synthetic_revision_content_result(syntheticfile):
rev_id = synth_rev["sha1"].hex()
rev_ts = synth_rev["date"]
for rc in synth_rev["R_C"]:
sha1 = rc["dst"].hex()
if sha1 not in expected_first:
assert rc["rel_ts"] == 0
expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
else:
if rev_ts == expected_first[sha1][1]:
expected_first[sha1][2].append(rc["path"])
elif rev_ts < expected_first[sha1][1]:
expected_first[sha1] = (rev_id, rev_ts, [rc["path"]])
for dc in synth_rev["D_C"]:
sha1 = rc["dst"].hex()
assert sha1 in expected_first
# nothing to do there, this content cannot be a "first seen file"
for content_id, (rev_id, ts, paths) in expected_first.items():
occur = provenance.content_find_first(hash_to_bytes(content_id))
assert occur is not None
assert occur.content.hex() == content_id
assert occur.revision.hex() == rev_id
assert occur.date.timestamp() == ts
assert occur.origin is None
if provenance.storage.with_path():
assert occur.path.decode() in paths