Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/tests/test_archive_interface.py
# Copyright (C) 2021-2022 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
from operator import itemgetter | from operator import itemgetter | ||||
from typing import Any | from typing import Any | ||||
from typing import Counter as TCounter | from typing import Counter as TCounter | ||||
from typing import Dict, Iterable, List, Set, Tuple, Type, Union | from typing import Dict, Iterable, List, Set, Tuple, Type, Union | ||||
import pytest | import pytest | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.graph.naive_client import NaiveClient | from swh.graph.naive_client import NaiveClient | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
SWH_MODEL_OBJECT_TYPES, | |||||
BaseModel, | BaseModel, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
ObjectType, | |||||
Origin, | Origin, | ||||
OriginVisit, | |||||
OriginVisitStatus, | OriginVisitStatus, | ||||
Release, | |||||
Revision, | Revision, | ||||
Sha1Git, | Sha1Git, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID | from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID | ||||
from swh.provenance.archive import ArchiveInterface | from swh.provenance.archive import ArchiveInterface | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | for snapshot in data["snapshot"]: | ||||
heads_ref: TCounter[Sha1Git] = Counter( | heads_ref: TCounter[Sha1Git] = Counter( | ||||
reference.snapshot_get_heads(snapshot["id"]) | reference.snapshot_get_heads(snapshot["id"]) | ||||
) | ) | ||||
heads: TCounter[Sha1Git] = Counter(archive.snapshot_get_heads(snapshot["id"])) | heads: TCounter[Sha1Git] = Counter(archive.snapshot_get_heads(snapshot["id"])) | ||||
assert heads_ref == heads | assert heads_ref == heads | ||||
def get_object_class(object_type: str) -> Type[BaseModel]:
    """Return the model class registered for ``object_type``.

    Args:
        object_type: name used as a key of ``SWH_MODEL_OBJECT_TYPES``.

    Returns:
        The class stored in ``SWH_MODEL_OBJECT_TYPES`` under that key.

    Raises:
        ValueError: if ``object_type`` is not a key of the registry.
    """
    try:
        return SWH_MODEL_OBJECT_TYPES[object_type]
    except KeyError:
        # The if/elif chain that this lookup replaces raised ValueError for
        # unknown names; keep that exception type instead of leaking KeyError.
        raise ValueError(f"unknown object type: {object_type!r}") from None
def data_to_model(data: Dict[str, List[dict]]) -> Dict[str, List[BaseModel]]:
    """Convert raw object dicts into model instances, grouped by object type.

    Args:
        data: mapping from an object-type name to the list of dicts
            describing the objects of that type.

    Returns:
        A mapping with the same keys as ``data``. Each value is the list
        obtained by calling ``from_dict`` on the class that
        ``get_object_class`` returns for the key, once per input dict and in
        input order.
    """
    # Every input key is kept, even when its list is empty, so that a later
    # ``model[object_type]`` lookup does not fail for a dataset that simply
    # has no object of that type.
    return {
        object_type: [
            get_object_class(object_type).from_dict(raw) for raw in objects
        ]
        for object_type, objects in data.items()
    }
def add_link(
    edges: Set[
        Tuple[
            Union[CoreSWHID, ExtendedSWHID, str], Union[CoreSWHID, ExtendedSWHID, str]
        ]
    ],
    src_obj: Union[Content, Directory, Origin, Release, Revision, Snapshot],
    dst_id: bytes,
    dst_type: ExtendedObjectType,
) -> None:
    """Add to ``edges`` a link from ``src_obj`` to the object ``dst_id``.

    Args:
        edges: set of ``(source, destination)`` pairs, updated in place.
        src_obj: object the link starts from; its ``swhid()`` is used as the
            source end of the pair.
        dst_id: identifier of the destination object.
        dst_type: object type of the destination, used together with
            ``dst_id`` to build the destination ``ExtendedSWHID``.
    """
    target = ExtendedSWHID(object_type=dst_type, object_id=dst_id)
    source = src_obj.swhid()
    edges.add((source, target))
def get_graph_data( | def get_graph_data( | ||||
Show All 40 Lines | for revision in model["revision"]: | ||||
assert isinstance(revision, Revision) | assert isinstance(revision, Revision) | ||||
nodes.add(revision.swhid()) | nodes.add(revision.swhid()) | ||||
# root directory | # root directory | ||||
add_link(edges, revision, revision.directory, ExtendedObjectType.DIRECTORY) | add_link(edges, revision, revision.directory, ExtendedObjectType.DIRECTORY) | ||||
# parent | # parent | ||||
for parent in revision.parents: | for parent in revision.parents: | ||||
add_link(edges, revision, parent, ExtendedObjectType.REVISION) | add_link(edges, revision, parent, ExtendedObjectType.REVISION) | ||||
dir_entry_types = { | |||||
"file": ExtendedObjectType.CONTENT, | |||||
"dir": ExtendedObjectType.DIRECTORY, | |||||
"rev": ExtendedObjectType.REVISION, | |||||
} | |||||
for directory in model["directory"]: | for directory in model["directory"]: | ||||
assert isinstance(directory, Directory) | assert isinstance(directory, Directory) | ||||
nodes.add(directory.swhid()) | nodes.add(directory.swhid()) | ||||
for entry in directory.entries: | for entry in directory.entries: | ||||
assert isinstance(entry, DirectoryEntry) | assert isinstance(entry, DirectoryEntry) | ||||
if entry.type == "file": | add_link(edges, directory, entry.target, dir_entry_types[entry.type]) | ||||
target_type = ExtendedObjectType.CONTENT | |||||
elif entry.type == "dir": | |||||
target_type = ExtendedObjectType.DIRECTORY | |||||
elif entry.type == "rev": | |||||
target_type = ExtendedObjectType.REVISION | |||||
else: | |||||
assert False, "unknown directory entry type" | |||||
add_link(edges, directory, entry.target, target_type) | |||||
for content in model["content"]: | for content in model["content"]: | ||||
assert isinstance(content, Content) | assert isinstance(content, Content) | ||||
nodes.add(content.swhid()) | nodes.add(content.swhid()) | ||||
object_type = { | |||||
ObjectType.CONTENT: ExtendedObjectType.CONTENT, | |||||
ObjectType.DIRECTORY: ExtendedObjectType.DIRECTORY, | |||||
ObjectType.REVISION: ExtendedObjectType.REVISION, | |||||
ObjectType.RELEASE: ExtendedObjectType.RELEASE, | |||||
ObjectType.SNAPSHOT: ExtendedObjectType.SNAPSHOT, | |||||
} | |||||
for release in model["release"]: | |||||
assert isinstance(release, Release) | |||||
nodes.add(release.swhid()) | |||||
if release.target is not None: | |||||
add_link(edges, release, release.target, object_type[release.target_type]) | |||||
return list(nodes), list(edges) | return list(nodes), list(edges) | ||||
@pytest.mark.parametrize( | @pytest.mark.parametrize( | ||||
"repo", | "repo", | ||||
("cmdbts2", "out-of-order", "with-merges"), | ("cmdbts2", "out-of-order", "with-merges"), | ||||
) | ) | ||||
def test_archive_interface(repo: str, archive: ArchiveInterface) -> None: | def test_archive_interface(repo: str, archive: ArchiveInterface) -> None: | ||||
▲ Show 20 Lines • Show All 44 Lines • Show Last 20 Lines |