D3694.id13014.diff
diff --git a/swh/storage/tests/storage_data.py b/swh/storage/tests/storage_data.py
--- a/swh/storage/tests/storage_data.py
+++ b/swh/storage/tests/storage_data.py
@@ -165,7 +165,7 @@
DirectoryEntry(
name=b"hello",
type="file",
- target=directory5.id,
+ target=content2.sha1_git,
perms=from_disk.DentryPerms.content,
),
],
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -21,13 +21,14 @@
from hypothesis import given, strategies, settings, HealthCheck
-from typing import ClassVar, Optional
+from typing import Any, ClassVar, Dict, Iterator, Optional
from swh.model import from_disk
from swh.model.hashutil import hash_to_bytes
from swh.model.identifiers import SWHID
from swh.model.model import (
Content,
+ Directory,
MetadataTargetType,
Origin,
OriginVisit,
@@ -52,24 +53,44 @@
yield db, cur
-def transform_entries(dir_, *, prefix=b""):
- for ent in dir_.entries:
- yield {
- "dir_id": dir_.id,
- "type": ent.type,
- "target": ent.target,
- "name": prefix + ent.name,
- "perms": ent.perms,
- "status": None,
- "sha1": None,
- "sha1_git": None,
- "sha256": None,
- "length": None,
- }
+def transform_entries(
+ storage: StorageInterface, dir_: Directory, *, prefix: bytes = b""
+) -> Iterator[Dict[str, Any]]:
+ """Transform directory entry to expected listed directory entries from the directory_ls
+ function. When a directory entry is a file, fetch the necessary information from
+ the storage.
-
-def cmpdir(directory):
- return (directory["type"], directory["dir_id"])
+ """
+ for ent in dir_.entries:
+ if ent.type == "dir":
+ yield {
+ "dir_id": dir_.id,
+ "type": ent.type,
+ "target": ent.target,
+ "name": prefix + ent.name,
+ "perms": ent.perms,
+ "status": None,
+ "sha1": None,
+ "sha1_git": None,
+ "sha256": None,
+ "length": None,
+ }
+ elif ent.type == "file":
+ contents = storage.content_find({"sha1_git": ent.target})
+ assert contents
+ ent_dict = contents[0].to_dict()
+ for key in ["ctime", "blake2s256"]:
+ del ent_dict[key]
+ ent_dict.update(
+ {
+ "dir_id": dir_.id,
+ "type": ent.type,
+ "target": ent.target,
+ "name": prefix + ent.name,
+ "perms": ent.perms,
+ }
+ )
+ yield ent_dict
def assert_contents_ok(
@@ -638,7 +659,10 @@
}
def test_directory_add(self, swh_storage, sample_data):
+ content = sample_data.content
directory = sample_data.directories[1]
+ assert directory.entries[0].target == content.sha1_git
+ swh_storage.content_add([content])
init_missing = list(swh_storage.directory_missing([directory.id]))
assert [directory.id] == init_missing
@@ -646,14 +670,15 @@
actual_result = swh_storage.directory_add([directory])
assert actual_result == {"directory:add": 1}
- assert list(swh_storage.journal_writer.journal.objects) == [
- ("directory", directory)
- ]
+ assert ("directory", directory) in list(
+ swh_storage.journal_writer.journal.objects
+ )
actual_data = list(swh_storage.directory_ls(directory.id))
- expected_data = list(transform_entries(directory))
+ expected_data = list(transform_entries(swh_storage, directory))
- assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
+ for data in actual_data:
+ assert data in expected_data
after_missing = list(swh_storage.directory_missing([directory.id]))
assert after_missing == []
@@ -678,79 +703,84 @@
("directory", directory)
]
- def test_directory_get_recursive(self, swh_storage, sample_data):
+ def test_directory_ls_recursive(self, swh_storage, sample_data):
+ # create consistent dataset regarding the directories we want to list
+ content, content2 = sample_data.contents[:2]
+ swh_storage.content_add([content, content2])
dir1, dir2, dir3 = sample_data.directories[:3]
- init_missing = list(swh_storage.directory_missing([dir1.id]))
- assert init_missing == [dir1.id]
+ dir_ids = [d.id for d in [dir1, dir2, dir3]]
+ init_missing = list(swh_storage.directory_missing(dir_ids))
+ assert init_missing == dir_ids
actual_result = swh_storage.directory_add([dir1, dir2, dir3])
assert actual_result == {"directory:add": 3}
- assert list(swh_storage.journal_writer.journal.objects) == [
- ("directory", dir1),
- ("directory", dir2),
- ("directory", dir3),
- ]
-
- # List directory containing a file and an unknown subdirectory
+ # List directory containing one file
actual_data = list(swh_storage.directory_ls(dir1.id, recursive=True))
- expected_data = list(transform_entries(dir1))
- assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
+ expected_data = list(transform_entries(swh_storage, dir1))
+ for data in actual_data:
+ assert data in expected_data
# List directory containing a file and an unknown subdirectory
actual_data = list(swh_storage.directory_ls(dir2.id, recursive=True))
- expected_data = list(transform_entries(dir2))
- assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
+ expected_data = list(transform_entries(swh_storage, dir2))
+ for data in actual_data:
+ assert data in expected_data
- # List directory containing a known subdirectory, entries should
- # be both those of the directory and of the subdir
+ # List directory containing both a known and unknown subdirectory, entries
+ # should be both those of the directory and of the known subdir (up to contents)
actual_data = list(swh_storage.directory_ls(dir3.id, recursive=True))
expected_data = list(
itertools.chain(
- transform_entries(dir3), transform_entries(dir2, prefix=b"subdir/"),
+ transform_entries(swh_storage, dir3),
+ transform_entries(swh_storage, dir2, prefix=b"subdir/"),
)
)
- assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
- def test_directory_get_non_recursive(self, swh_storage, sample_data):
- dir1, dir2, dir3 = sample_data.directories[:3]
+ for data in actual_data:
+ assert data in expected_data
- init_missing = list(swh_storage.directory_missing([dir1.id]))
- assert init_missing == [dir1.id]
+ def test_directory_ls_non_recursive(self, swh_storage, sample_data):
+ # create consistent dataset regarding the directories we want to list
+ content, content2 = sample_data.contents[:2]
+ swh_storage.content_add([content, content2])
+ dir1, dir2, dir3, _, dir5 = sample_data.directories[:5]
- actual_result = swh_storage.directory_add([dir1, dir2, dir3])
- assert actual_result == {"directory:add": 3}
+ dir_ids = [d.id for d in [dir1, dir2, dir3, dir5]]
+ init_missing = list(swh_storage.directory_missing(dir_ids))
+ assert init_missing == dir_ids
- assert list(swh_storage.journal_writer.journal.objects) == [
- ("directory", dir1),
- ("directory", dir2),
- ("directory", dir3),
- ]
+ actual_result = swh_storage.directory_add([dir1, dir2, dir3, dir5])
+ assert actual_result == {"directory:add": 4}
# List directory containing a file and an unknown subdirectory
actual_data = list(swh_storage.directory_ls(dir1.id))
- expected_data = list(transform_entries(dir1))
- assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
+ expected_data = list(transform_entries(swh_storage, dir1))
+ for data in actual_data:
+ assert data in expected_data
- # List directory contaiining a single file
+ # List directory containing a single file
actual_data = list(swh_storage.directory_ls(dir2.id))
- expected_data = list(transform_entries(dir2))
- assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
+ expected_data = list(transform_entries(swh_storage, dir2))
+ for data in actual_data:
+ assert data in expected_data
# List directory containing a known subdirectory, entries should
# only be those of the parent directory, not of the subdir
actual_data = list(swh_storage.directory_ls(dir3.id))
- expected_data = list(transform_entries(dir3))
- assert sorted(expected_data, key=cmpdir) == sorted(actual_data, key=cmpdir)
+ expected_data = list(transform_entries(swh_storage, dir3))
+ for data in actual_data:
+ assert data in expected_data
def test_directory_entry_get_by_path(self, swh_storage, sample_data):
- cont = sample_data.content
+ cont, content2 = sample_data.contents[:2]
dir1, dir2, dir3, dir4, dir5 = sample_data.directories[:5]
# given
- init_missing = list(swh_storage.directory_missing([dir3.id]))
- assert init_missing == [dir3.id]
+ dir_ids = [d.id for d in [dir1, dir2, dir3, dir4, dir5]]
+ init_missing = list(swh_storage.directory_missing(dir_ids))
+ assert init_missing == dir_ids
actual_result = swh_storage.directory_add([dir3, dir4])
assert actual_result == {"directory:add": 2}
@@ -784,7 +814,7 @@
"dir_id": dir3.id,
"name": b"hello",
"type": "file",
- "target": dir5.id,
+ "target": content2.sha1_git,
"sha1": None,
"sha1_git": None,
"sha256": None,
Attached to D3694: tests: Improve coverage on `directory_ls` endpoints