def iter_directory(
    directory,
) -> Tuple[List[model.Content], List[model.SkippedContent], List[model.Directory]]:
    """Return the directory listing from a disk-memory directory instance.

    Every node produced by ``directory.iter_tree()`` is converted with
    ``to_model()`` and then sorted into one of three lists according to the
    ``object_type`` of the converted object.

    Args:
        directory: object exposing ``iter_tree()``, whose items expose
            ``to_model()``. Presumably a from_disk ``Directory`` -- confirm
            against callers.

    Returns:
        Tuple of three lists, respectively the contents, the skipped contents
        and the directories, each in the order ``iter_tree()`` produced them.

    Raises:
        TypeError: in case an unexpected object type is listed.

    """
    contents: List[model.Content] = []
    skipped_contents: List[model.SkippedContent] = []
    directories: List[model.Directory] = []

    for node in directory.iter_tree():
        obj = node.to_model()
        obj_type = obj.object_type
        if obj_type in ("content", "content_file"):
            # FIXME: read the data from disk later (when the
            # storage buffer is flushed).
            # NOTE(review): with_data() presumably loads the content eagerly,
            # which is what the FIXME wants to defer -- confirm.
            contents.append(obj.with_data())
        elif obj_type == "skipped_content":
            skipped_contents.append(obj)
        elif obj_type == "directory":
            directories.append(obj)
        else:
            raise TypeError(f"Unexpected object type from disk: {obj}")

    return contents, skipped_contents, directories
class TarballIterDirectory(DataMixin, unittest.TestCase):
    """Check ``from_disk.iter_directory`` against the sample tarball."""

    def setUp(self):
        # Run the mixin's setup first, then populate its temporary directory
        # through the mixin's tarball helper.
        super().setUp()
        self.make_from_tarball(self.tmpdir_name)

    def test_iter_directory(self):
        """iter_directory should list as many objects as the fixture declares.

        """
        sample_path = os.path.join(self.tmpdir_name, b"sample-folder")
        directory = Directory.from_disk(path=sample_path)

        listing = from_disk.iter_directory(directory)
        contents, skipped_contents, directories = listing

        # Tally, per object type, the entries the fixture declares.
        expected_nb = defaultdict(int)
        for entry_name in self.tarball_contents.keys():
            entry = directory[entry_name]
            expected_nb[entry.object_type] += 1

        nb_contents = len(contents)
        nb_directories = len(directories)

        assert nb_contents == expected_nb["content"]
        assert nb_contents > 0
        assert len(skipped_contents) == 0
        assert nb_directories == expected_nb["directory"]
        assert nb_directories > 0