Changeset View
Changeset View
Standalone View
Standalone View
swh/vault/to_disk.py
Show First 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | def apply_chunked(func, input_list, chunk_size): | ||||
"""Apply func on input_list divided in chunks of size chunk_size""" | """Apply func on input_list divided in chunks of size chunk_size""" | ||||
for i in range(0, len(input_list), chunk_size): | for i in range(0, len(input_list), chunk_size): | ||||
yield from func(input_list[i : i + chunk_size]) | yield from func(input_list[i : i + chunk_size]) | ||||
class DirectoryBuilder: | class DirectoryBuilder: | ||||
"""Reconstructs the on-disk representation of a directory in the storage.""" | """Reconstructs the on-disk representation of a directory in the storage.""" | ||||
def __init__(self, storage, root, dir_id): | def __init__(self, storage: StorageInterface, root: bytes, dir_id: bytes): | ||||
"""Initialize the directory builder. | """Initialize the directory builder. | ||||
Args: | Args: | ||||
storage: the storage object | storage: the storage object | ||||
root: the path where the directory should be reconstructed | root: the path where the directory should be reconstructed | ||||
dir_id: the identifier of the directory in the storage | dir_id: the identifier of the directory in the storage | ||||
""" | """ | ||||
self.storage = storage | self.storage = storage | ||||
self.root = root | self.root = root | ||||
self.dir_id = dir_id | self.dir_id = dir_id | ||||
def build(self): | def build(self) -> None: | ||||
"""Perform the reconstruction of the directory in the given root.""" | """Perform the reconstruction of the directory in the given root.""" | ||||
# Retrieve data from the database. | # Retrieve data from the database. | ||||
# Split into files, revisions and directory data. | # Split into files, revisions and directory data. | ||||
entries = collections.defaultdict(list) | entries = collections.defaultdict(list) | ||||
for entry in dir_iterator(self.storage, self.dir_id): | for entry in dir_iterator(self.storage, self.dir_id): | ||||
entries[entry["type"]].append(entry) | entries[entry["type"]].append(entry) | ||||
# Recreate the directory's subtree and then the files into it. | # Recreate the directory's subtree and then the files into it. | ||||
self._create_tree(entries["dir"]) | self._create_tree(entries["dir"]) | ||||
self._create_files(entries["file"]) | self._create_files(entries["file"]) | ||||
self._create_revisions(entries["rev"]) | self._create_revisions(entries["rev"]) | ||||
def _create_tree(self, directories): | def _create_tree(self, directories: List[Dict[str, Any]]) -> None: | ||||
"""Create a directory tree from the given paths | """Create a directory tree from the given paths | ||||
The tree is created from `root` and each given directory in | The tree is created from `root` and each given directory in | ||||
`directories` will be created. | `directories` will be created. | ||||
""" | """ | ||||
# Directories are sorted by depth so they are created in the | # Directories are sorted by depth so they are created in the | ||||
# right order | # right order | ||||
bsep = os.path.sep.encode() | bsep = os.path.sep.encode() | ||||
directories = sorted(directories, key=lambda x: len(x["path"].split(bsep))) | directories = sorted(directories, key=lambda x: len(x["path"].split(bsep))) | ||||
for dir in directories: | for dir in directories: | ||||
os.makedirs(os.path.join(self.root, dir["path"])) | os.makedirs(os.path.join(self.root, dir["path"])) | ||||
def _create_files(self, files_data): | def _create_files(self, files_data: List[Dict[str, Any]]) -> None: | ||||
"""Create the files in the tree and fetch their contents.""" | """Create the files in the tree and fetch their contents.""" | ||||
f = functools.partial(get_filtered_files_content, self.storage) | f = functools.partial(get_filtered_files_content, self.storage) | ||||
files_data = apply_chunked(f, files_data, 1000) | files_data = apply_chunked(f, files_data, 1000) | ||||
for file_data in files_data: | for file_data in files_data: | ||||
path = os.path.join(self.root, file_data["path"]) | path = os.path.join(self.root, file_data["path"]) | ||||
self._create_file(path, file_data["content"], file_data["perms"]) | self._create_file(path, file_data["content"], file_data["perms"]) | ||||
def _create_revisions(self, revs_data): | def _create_revisions(self, revs_data: List[Dict[str, Any]]) -> None: | ||||
"""Create the revisions in the tree as broken symlinks to the target | """Create the revisions in the tree as broken symlinks to the target | ||||
identifier.""" | identifier.""" | ||||
for file_data in revs_data: | for file_data in revs_data: | ||||
path = os.path.join(self.root, file_data["path"]) | path = os.path.join(self.root, file_data["path"]) | ||||
target = hashutil.hash_to_hex(file_data["target"]) | target = hashutil.hash_to_hex(file_data["target"]) | ||||
self._create_file(path, target, mode=DentryPerms.symlink) | self._create_file(path, target, mode=DentryPerms.symlink) | ||||
def _create_file(self, path, content, mode=DentryPerms.content): | def _create_file( | ||||
self, path: bytes, content: bytes, mode: int = DentryPerms.content | |||||
) -> None: | |||||
"""Create the given file and fill it with content.""" | """Create the given file and fill it with content.""" | ||||
perms = mode_to_perms(mode) | perms = mode_to_perms(mode) | ||||
if perms == DentryPerms.symlink: | if perms == DentryPerms.symlink: | ||||
os.symlink(content, path) | os.symlink(content, path) | ||||
else: | else: | ||||
with open(path, "wb") as f: | with open(path, "wb") as f: | ||||
f.write(content) | f.write(content) | ||||
os.chmod(path, perms.value) | os.chmod(path, perms.value) |