Changeset View
Changeset View
Standalone View
Standalone View
swh/model/from_disk.py
Show First 20 Lines • Show All 395 Lines • ▼ Show 20 Lines | class Directory(MerkleNode): | ||||
class (contents and directories). | class (contents and directories). | ||||
When using the dict-like methods to update the contents of the directory, | When using the dict-like methods to update the contents of the directory, | ||||
the affected levels of hierarchy are reset and can be collected again using | the affected levels of hierarchy are reset and can be collected again using | ||||
the same method. This enables the efficient collection of updated nodes, | the same method. This enables the efficient collection of updated nodes, | ||||
for instance when the client is applying diffs. | for instance when the client is applying diffs. | ||||
""" | """ | ||||
__slots__ = ["__entries"] | __slots__ = ["__entries", "__model_object"] | ||||
object_type: Final = "directory" | object_type: Final = "directory" | ||||
@classmethod | @classmethod | ||||
def from_disk( | def from_disk( | ||||
cls, *, path, dir_filter=accept_all_directories, max_content_length=None | cls, *, path, dir_filter=accept_all_directories, max_content_length=None | ||||
): | ): | ||||
"""Compute the Software Heritage objects for a given directory tree | """Compute the Software Heritage objects for a given directory tree | ||||
Show All 29 Lines | ): | ||||
dirs[root] = cls({"name": os.path.basename(root), "path": root}) | dirs[root] = cls({"name": os.path.basename(root), "path": root}) | ||||
dirs[root].update(entries) | dirs[root].update(entries) | ||||
return dirs[top_path] | return dirs[top_path] | ||||
def __init__(self, data=None): | def __init__(self, data=None): | ||||
super().__init__(data=data) | super().__init__(data=data) | ||||
self.__entries = None | self.__entries = None | ||||
self.__model_object = None | |||||
def invalidate_hash(self): | def invalidate_hash(self): | ||||
self.__entries = None | self.__entries = None | ||||
self.__model_object = None | |||||
super().invalidate_hash() | super().invalidate_hash() | ||||
@staticmethod | @staticmethod | ||||
def child_to_directory_entry(name, child): | def child_to_directory_entry(name, child): | ||||
if child.object_type == "directory": | if child.object_type == "directory": | ||||
return { | return { | ||||
"type": "dir", | "type": "dir", | ||||
"perms": DentryPerms.directory, | "perms": DentryPerms.directory, | ||||
Show All 31 Lines | def entries(self): | ||||
return self.__entries | return self.__entries | ||||
def swhid(self) -> CoreSWHID: | def swhid(self) -> CoreSWHID: | ||||
"""Return node identifier as a SWHID""" | """Return node identifier as a SWHID""" | ||||
return CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=self.hash) | return CoreSWHID(object_type=ObjectType.DIRECTORY, object_id=self.hash) | ||||
def compute_hash(self): | def compute_hash(self): | ||||
return model.Directory.from_dict({"entries": self.entries}).id | return self.to_model().id | ||||
def to_model(self) -> model.Directory: | def to_model(self) -> model.Directory: | ||||
"""Builds a `model.Directory` object based on this node; | """Builds a `model.Directory` object based on this node; | ||||
ignoring its children.""" | ignoring its children.""" | ||||
return model.Directory.from_dict(self.get_data()) | if self.__model_object is None: | ||||
self.__model_object = model.Directory.from_dict({"entries": self.entries}) | |||||
return self.__model_object | |||||
def __getitem__(self, key): | def __getitem__(self, key): | ||||
if not isinstance(key, bytes): | if not isinstance(key, bytes): | ||||
raise ValueError("Can only get a bytes from Directory") | raise ValueError("Can only get a bytes from Directory") | ||||
# Convenience shortcut | # Convenience shortcut | ||||
if key == b"": | if key == b"": | ||||
return self | return self | ||||
▲ Show 20 Lines • Show All 48 Lines • Show Last 20 Lines |