Changeset View
Changeset View
Standalone View
Standalone View
swh/model/from_disk.py
# Copyright (C) 2017-2018 The Software Heritage developers | # Copyright (C) 2017-2018 The Software Heritage developers | ||||
ardumont: copyright update ;) | |||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import enum | import enum | ||||
import os | import os | ||||
import stat | import stat | ||||
from pathlib import PosixPath | |||||
import attr | import attr | ||||
from typing import List, Optional | from typing import List, Optional | ||||
from .hashutil import MultiHash | from .hashutil import MultiHash | ||||
from .merkle import MerkleLeaf, MerkleNode | from .merkle import MerkleLeaf, MerkleNode | ||||
from .identifiers import ( | from .identifiers import ( | ||||
directory_entry_sort_key, directory_identifier, | directory_entry_sort_key, directory_identifier, | ||||
▲ Show 20 Lines • Show All 174 Lines • ▼ Show 20 Lines | def to_model(self) -> model.BaseContent: | ||||
data.pop('path', None) | data.pop('path', None) | ||||
return model.SkippedContent.from_dict(data) | return model.SkippedContent.from_dict(data) | ||||
elif 'data' in data: | elif 'data' in data: | ||||
return model.Content.from_dict(data) | return model.Content.from_dict(data) | ||||
else: | else: | ||||
return DiskBackedContent.from_dict(data) | return DiskBackedContent.from_dict(data) | ||||
def accept_all_directories(dirname, entries): | def accept_all_directories(dirname, entries): | ||||
Not Done Inline ActionsI would be explicit instead and name that: def accept_all_directories(dirpath: str, dirname: str, entries: Iterable[Any]) -> bool: and also with types ardumont: I would be explicit instead and name that:
```
def accept_all_directories(dirpath: str, dirname: str, entries: Iterable[Any]) -> bool:
``` | |||||
"""Default filter for :func:`Directory.from_disk` accepting all | """Default filter for :func:`Directory.from_disk` accepting all | ||||
directories | directories | ||||
Args: | Args: | ||||
dirname (bytes): directory name | dirname (bytes): directory name | ||||
entries (list): directory entries | entries (list): directory entries | ||||
""" | """ | ||||
return True | return True | ||||
Show All 21 Lines | Args: | ||||
way | way | ||||
Returns: | Returns: | ||||
a directory filter for :func:`directory_to_objects` | a directory filter for :func:`directory_to_objects` | ||||
""" | """ | ||||
if not case_sensitive: | if not case_sensitive: | ||||
names = [name.lower() for name in names] | names = [name.lower() for name in names] | ||||
def named_filter(dirname, entries, | def named_filter(dirname, entries, | ||||
names=names, case_sensitive=case_sensitive): | names=names, case_sensitive=case_sensitive): | ||||
Not Done Inline Actionsthis one must change as well. ardumont: this one must change as well. | |||||
if case_sensitive: | if case_sensitive: | ||||
return dirname not in names | return dirname not in names | ||||
else: | else: | ||||
return dirname.lower() not in names | return dirname.lower() not in names | ||||
return named_filter | return named_filter | ||||
def ignore_path(path, exclude_paths) -> bool: | |||||
ardumontUnsubmitted Not Done Inline Actionstype? def ignore_path(path: PosixPath, exclude_paths: Set[PosixPath]) -> bool: ardumont: type?
```
def ignore_path(path: PosixPath, exclude_paths: Set[PosixPath]) -> bool:
``` | |||||
"""Check if the given path, or one of its parents, is in exclude_paths | |||||
""" | |||||
if exclude_paths is None: | |||||
ardumontUnsubmitted Not Done Inline Actionsif not exclude_paths: # None or {} should be treated the same. ardumont: ```
if not exclude_paths: # None or {} should be treated the same.
``` | |||||
return False | |||||
else: | |||||
path = PosixPath(path.decode()) | |||||
if set(path.parents).intersection(exclude_paths) \ | |||||
ardumontUnsubmitted Not Done Inline Actionsdirectly return the conditional return set(path.parents).intersection... ardumont: directly return the conditional
```
return set(path.parents).intersection...
``` | |||||
or path in exclude_paths: | |||||
return True | |||||
else: | |||||
return False | |||||
class Directory(MerkleNode): | class Directory(MerkleNode): | ||||
"""Representation of a Software Heritage directory as a node in a Merkle Tree. | """Representation of a Software Heritage directory as a node in a Merkle Tree. | ||||
This class can be used to generate, from an on-disk directory, all the | This class can be used to generate, from an on-disk directory, all the | ||||
objects that need to be sent to the Software Heritage archive. | objects that need to be sent to the Software Heritage archive. | ||||
The :func:`from_disk` constructor allows you to generate the data structure | The :func:`from_disk` constructor allows you to generate the data structure | ||||
from a directory on disk. The resulting :class:`Directory` can then be | from a directory on disk. The resulting :class:`Directory` can then be | ||||
manipulated as a dictionary, using the path as key. | manipulated as a dictionary, using the path as key. | ||||
The :func:`collect` method is used to retrieve all the objects that need to | The :func:`collect` method is used to retrieve all the objects that need to | ||||
be added to the Software Heritage archive since the last collection, by | be added to the Software Heritage archive since the last collection, by | ||||
class (contents and directories). | class (contents and directories). | ||||
When using the dict-like methods to update the contents of the directory, | When using the dict-like methods to update the contents of the directory, | ||||
the affected levels of hierarchy are reset and can be collected again using | the affected levels of hierarchy are reset and can be collected again using | ||||
the same method. This enables the efficient collection of updated nodes, | the same method. This enables the efficient collection of updated nodes, | ||||
for instance when the client is applying diffs. | for instance when the client is applying diffs. | ||||
""" | """ | ||||
__slots__ = ['__entries'] | __slots__ = ['__entries'] | ||||
type = 'directory' | type = 'directory' | ||||
@classmethod | @classmethod | ||||
def from_disk(cls, *, path, | def from_disk(cls, *, path, | ||||
dir_filter=accept_all_directories, | dir_filter=accept_all_directories, | ||||
ardumontUnsubmitted Not Done Inline Actionswondering if the ignore_path should not be implemented with the dir_filter callable now. ardumont: wondering if the `ignore_path` should not be implemented with the `dir_filter` callable now. | |||||
vlorentzUnsubmitted Not Done Inline ActionsYes, I think it should. vlorentz: Yes, I think it should. | |||||
exclude_paths=None, | |||||
ardumontUnsubmitted Not Done Inline Actionsmaybe make that a default empty set instead, that'd avoid having to define Optional[Set[PosixPath]] ardumont: maybe make that a default empty set instead, that'd avoid having to define `Optional[Set… | |||||
max_content_length=None): | max_content_length=None): | ||||
"""Compute the Software Heritage objects for a given directory tree | """Compute the Software Heritage objects for a given directory tree | ||||
Args: | Args: | ||||
path (bytes): the directory to traverse | path (bytes): the directory to traverse | ||||
data (bool): whether to add the data to the content objects | data (bool): whether to add the data to the content objects | ||||
save_path (bool): whether to add the path to the content objects | save_path (bool): whether to add the path to the content objects | ||||
exclude_paths (set[PosixPath]): set of paths to ignore | |||||
ardumontUnsubmitted Not Done Inline Actionschange the method's signature to include types. Since you are modifying it and you know the types now ;) Regarding the docstring, you can then remove the defined types next to the variable names (since it's already in the signature of the method). ardumont: change the method's signature to include types.
Since you are modifying it and you know the… | |||||
dir_filter (function): a filter to ignore some directories by | dir_filter (function): a filter to ignore some directories by | ||||
name or contents. Takes two arguments: dirname and entries, and | name or contents. Takes two arguments: dirname and entries, and | ||||
returns True if the directory should be added, False if the | returns True if the directory should be added, False if the | ||||
directory should be ignored. | directory should be ignored. | ||||
max_content_length (Optional[int]): if given, all contents larger | max_content_length (Optional[int]): if given, all contents larger | ||||
than this will be skipped. | than this will be skipped. | ||||
""" | """ | ||||
top_path = path | top_path = path | ||||
dirs = {} | dirs = {} | ||||
for root, dentries, fentries in os.walk(top_path, topdown=False): | for root, dentries, fentries in os.walk(top_path, topdown=False): | ||||
entries = {} | entries = {} | ||||
# Join fentries and dentries in the same processing, as symbolic | # Join fentries and dentries in the same processing, as symbolic | ||||
# links to directories appear in dentries... | # links to directories appear in dentries... | ||||
for name in fentries + dentries: | for name in fentries + dentries: | ||||
path = os.path.join(root, name) | path = os.path.join(root, name) | ||||
if not os.path.isdir(path) or os.path.islink(path): | if not os.path.isdir(path) or os.path.islink(path): | ||||
content = Content.from_file( | content = Content.from_file( | ||||
path=path, max_content_length=max_content_length) | path=path, max_content_length=max_content_length) | ||||
entries[name] = content | entries[name] = content | ||||
else: | else: | ||||
if dir_filter(name, dirs[path].entries): | if dir_filter(name, dirs[path].entries) \ | ||||
and not ignore_path(path, exclude_paths): | |||||
entries[name] = dirs[path] | entries[name] = dirs[path] | ||||
dirs[root] = cls({'name': os.path.basename(root)}) | dirs[root] = cls({'name': os.path.basename(root)}) | ||||
dirs[root].update(entries) | dirs[root].update(entries) | ||||
return dirs[top_path] | return dirs[top_path] | ||||
def __init__(self, data=None): | def __init__(self, data=None): | ||||
▲ Show 20 Lines • Show All 99 Lines • Show Last 20 Lines |
copyright update ;)