Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/svn/ra.py
Show All 30 Lines | from typing import ( | ||||
cast, | cast, | ||||
) | ) | ||||
import click | import click | ||||
from subvertpy import SubversionException, delta, properties | from subvertpy import SubversionException, delta, properties | ||||
from subvertpy.ra import Auth, RemoteAccess, get_username_provider | from subvertpy.ra import Auth, RemoteAccess, get_username_provider | ||||
from swh.model import from_disk, hashutil | from swh.model import from_disk, hashutil | ||||
from swh.model.from_disk import DiskBackedContent | |||||
from swh.model.model import Content, Directory, SkippedContent | from swh.model.model import Content, Directory, SkippedContent | ||||
if TYPE_CHECKING: | if TYPE_CHECKING: | ||||
from swh.loader.svn.svn import SvnRepo | from swh.loader.svn.svn import SvnRepo | ||||
from swh.loader.svn.utils import parse_external_definition, svn_urljoin | from swh.loader.svn.utils import parse_external_definition, svn_urljoin | ||||
_eol_style = {"native": b"\n", "CRLF": b"\r\n", "LF": b"\n", "CR": b"\r"} | _eol_style = {"native": b"\n", "CRLF": b"\r\n", "LF": b"\n", "CR": b"\r"} | ||||
▲ Show 20 Lines • Show All 153 Lines • ▼ Show 20 Lines | class FileEditor: | ||||
): | ): | ||||
self.directory = directory | self.directory = directory | ||||
self.path = path | self.path = path | ||||
self.fullpath = os.path.join(rootpath, path) | self.fullpath = os.path.join(rootpath, path) | ||||
self.state = state | self.state = state | ||||
self.svnrepo = svnrepo | self.svnrepo = svnrepo | ||||
self.editor = svnrepo.swhreplay.editor | self.editor = svnrepo.swhreplay.editor | ||||
self.editor.modified_paths.add(path) | |||||
def change_prop(self, key: str, value: str) -> None: | def change_prop(self, key: str, value: str) -> None: | ||||
if key == properties.PROP_EXECUTABLE: | if key == properties.PROP_EXECUTABLE: | ||||
if value is None: # bit flip off | if value is None: # bit flip off | ||||
self.state.executable = NOEXEC_FLAG | self.state.executable = NOEXEC_FLAG | ||||
else: | else: | ||||
self.state.executable = EXEC_FLAG | self.state.executable = EXEC_FLAG | ||||
elif key == properties.PROP_SPECIAL: | elif key == properties.PROP_SPECIAL: | ||||
# Possibly a symbolic link. We cannot check further at | # Possibly a symbolic link. We cannot check further at | ||||
▲ Show 20 Lines • Show All 171 Lines • ▼ Show 20 Lines | ): | ||||
# build directory on init | # build directory on init | ||||
os.makedirs(rootpath, exist_ok=True) | os.makedirs(rootpath, exist_ok=True) | ||||
self.file_states = file_states | self.file_states = file_states | ||||
self.dir_states = dir_states | self.dir_states = dir_states | ||||
self.svnrepo = svnrepo | self.svnrepo = svnrepo | ||||
self.editor = svnrepo.swhreplay.editor | self.editor = svnrepo.swhreplay.editor | ||||
self.externals: Dict[str, Tuple[str, Optional[int], bool]] = {} | self.externals: Dict[str, Tuple[str, Optional[int], bool]] = {} | ||||
# repository root dir has empty path | |||||
if path: | |||||
self.editor.modified_paths.add(path) | |||||
def remove_child(self, path: bytes) -> None: | def remove_child(self, path: bytes) -> None: | ||||
"""Remove a path from the current objects. | """Remove a path from the current objects. | ||||
The path can be resolved as link, file or directory. | The path can be resolved as link, file or directory. | ||||
This function takes also care of removing the link between the | This function takes also care of removing the link between the | ||||
child and the parent. | child and the parent. | ||||
Show All 16 Lines | def remove_child(self, path: bytes) -> None: | ||||
# when deleting a directory ensure to remove any svn property for the | # when deleting a directory ensure to remove any svn property for the | ||||
# file it contains as they can be added again later in another revision | # file it contains as they can be added again later in another revision | ||||
# without the same property set | # without the same property set | ||||
fullpath = os.path.join(self.rootpath, path) | fullpath = os.path.join(self.rootpath, path) | ||||
for state_path in list(self.file_states): | for state_path in list(self.file_states): | ||||
if state_path.startswith(fullpath + b"/"): | if state_path.startswith(fullpath + b"/"): | ||||
del self.file_states[state_path] | del self.file_states[state_path] | ||||
self.editor.modified_paths.discard(path) | |||||
def open_directory(self, path: str, *args) -> DirEditor: | def open_directory(self, path: str, *args) -> DirEditor: | ||||
"""Updating existing directory. | """Updating existing directory. | ||||
""" | """ | ||||
return DirEditor( | return DirEditor( | ||||
self.directory, | self.directory, | ||||
rootpath=self.rootpath, | rootpath=self.rootpath, | ||||
path=os.fsencode(path), | path=os.fsencode(path), | ||||
▲ Show 20 Lines • Show All 176 Lines • ▼ Show 20 Lines | def close(self): | ||||
if os.path.exists(temp_path): | if os.path.exists(temp_path): | ||||
# external successfully exported | # external successfully exported | ||||
# remove previous path in from_disk model | # remove previous path in from_disk model | ||||
self.remove_child(dest_fullpath) | self.remove_child(dest_fullpath) | ||||
# copy exported path to reconstructed filesystem | # copy exported path to reconstructed filesystem | ||||
fullpath = os.path.join(self.rootpath, dest_fullpath) | fullpath = os.path.join(self.rootpath, dest_fullpath) | ||||
self.editor.external_paths.add(dest_fullpath) | |||||
self.editor.modified_paths.add(dest_fullpath) | |||||
# update from_disk model and store external paths | # update from_disk model and store external paths | ||||
self.editor.external_paths.add(dest_fullpath) | self.editor.external_paths.add(dest_fullpath) | ||||
if os.path.isfile(temp_path): | if os.path.isfile(temp_path): | ||||
shutil.copy(temp_path, fullpath) | shutil.copy(temp_path, fullpath) | ||||
self.directory[dest_fullpath] = from_disk.Content.from_file( | self.directory[dest_fullpath] = from_disk.Content.from_file( | ||||
path=fullpath | path=fullpath | ||||
) | ) | ||||
else: | else: | ||||
shutil.copytree(temp_path, fullpath) | shutil.copytree(temp_path, fullpath) | ||||
self.directory[dest_fullpath] = from_disk.Directory.from_disk( | self.directory[dest_fullpath] = from_disk.Directory.from_disk( | ||||
path=fullpath | path=fullpath | ||||
) | ) | ||||
external_paths = set() | |||||
for root, dirs, files in os.walk(fullpath): | for root, dirs, files in os.walk(fullpath): | ||||
self.editor.external_paths.update( | external_paths.update( | ||||
[ | [ | ||||
os.path.join(root.replace(self.rootpath + b"/", b""), p) | os.path.join(root.replace(self.rootpath + b"/", b""), p) | ||||
for p in chain(dirs, files) | for p in chain(dirs, files) | ||||
] | ] | ||||
) | ) | ||||
self.editor.external_paths.update(external_paths) | |||||
self.editor.modified_paths.update(external_paths) | |||||
# ensure hash update for the directory with externals set | # ensure hash update for the directory with externals set | ||||
self.directory[self.path].update_hash(force=True) | self.directory[self.path].update_hash(force=True) | ||||
# backup externals in directory state | # backup externals in directory state | ||||
if self.externals: | if self.externals: | ||||
self.dir_states[self.path].externals = self.externals | self.dir_states[self.path].externals = self.externals | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | ): | ||||
self.dir_states: Dict[bytes, DirState] = defaultdict(DirState) | self.dir_states: Dict[bytes, DirState] = defaultdict(DirState) | ||||
self.external_paths: Set[bytes] = set() | self.external_paths: Set[bytes] = set() | ||||
self.valid_externals: Dict[bytes, Tuple[str, bool]] = {} | self.valid_externals: Dict[bytes, Tuple[str, bool]] = {} | ||||
self.dead_externals: Set[str] = set() | self.dead_externals: Set[str] = set() | ||||
self.externals_cache_dir = tempfile.mkdtemp(dir=temp_dir) | self.externals_cache_dir = tempfile.mkdtemp(dir=temp_dir) | ||||
self.externals_cache: Dict[Tuple[str, Optional[int]], str] = {} | self.externals_cache: Dict[Tuple[str, Optional[int]], str] = {} | ||||
self.svnrepo = svnrepo | self.svnrepo = svnrepo | ||||
self.revnum = None | self.revnum = None | ||||
# to store the set of paths added or modified when replaying a revision | |||||
self.modified_paths: Set[bytes] = set() | |||||
def set_target_revision(self, revnum) -> None: | def set_target_revision(self, revnum) -> None: | ||||
self.revnum = revnum | self.revnum = revnum | ||||
def abort(self) -> None: | def abort(self) -> None: | ||||
pass | pass | ||||
def close(self) -> None: | def close(self) -> None: | ||||
pass | pass | ||||
def open_root(self, base_revnum: int) -> DirEditor: | def open_root(self, base_revnum: int) -> DirEditor: | ||||
# a new revision is being replayed so clear the modified_paths set | |||||
self.modified_paths.clear() | |||||
return DirEditor( | return DirEditor( | ||||
self.directory, | self.directory, | ||||
rootpath=self.rootpath, | rootpath=self.rootpath, | ||||
path=b"", | path=b"", | ||||
file_states=self.file_states, | file_states=self.file_states, | ||||
dir_states=self.dir_states, | dir_states=self.dir_states, | ||||
svnrepo=self.svnrepo, | svnrepo=self.svnrepo, | ||||
) | ) | ||||
Show All 33 Lines | def replay(self, rev: int) -> from_disk.Directory: | ||||
codecs.register_error("strict", _ra_codecs_error_handler) | codecs.register_error("strict", _ra_codecs_error_handler) | ||||
self.conn.replay(rev, rev + 1, self.editor) | self.conn.replay(rev, rev + 1, self.editor) | ||||
codecs.register_error("strict", codecs.strict_errors) | codecs.register_error("strict", codecs.strict_errors) | ||||
return self.editor.directory | return self.editor.directory | ||||
def compute_objects( | def compute_objects( | ||||
self, rev: int | self, rev: int | ||||
) -> Tuple[List[Content], List[SkippedContent], List[Directory]]: | ) -> Tuple[List[Content], List[SkippedContent], List[Directory]]: | ||||
"""Compute objects at revisions rev. | """Compute objects added or modified at revisions rev. | ||||
Expects the state to be at previous revision's objects. | Expects the state to be at previous revision's objects. | ||||
Args: | Args: | ||||
rev: The revision to start the replay from. | rev: The revision to start the replay from. | ||||
Returns: | Returns: | ||||
The updated objects between rev and rev+1. Beware that this | The updated objects between rev and rev+1. Beware that this | ||||
mutates the filesystem at rootpath accordingly. | mutates the filesystem at rootpath accordingly. | ||||
""" | """ | ||||
self.replay(rev) | self.replay(rev) | ||||
return from_disk.iter_directory(self.directory) | |||||
contents: List[Content] = [] | |||||
skipped_contents: List[SkippedContent] = [] | |||||
directories: List[Directory] = [] | |||||
directories.append(self.editor.directory.to_model()) | |||||
for path in self.editor.modified_paths: | |||||
obj = self.directory[path].to_model() | |||||
obj_type = obj.object_type | |||||
if obj_type in (Content.object_type, DiskBackedContent.object_type): | |||||
contents.append(obj.with_data()) | |||||
elif obj_type == SkippedContent.object_type: | |||||
skipped_contents.append(obj) | |||||
elif obj_type == Directory.object_type: | |||||
directories.append(obj) | |||||
return contents, skipped_contents, directories | |||||
@click.command() | @click.command() | ||||
@click.option("--local-url", default="/tmp", help="local svn working copy") | @click.option("--local-url", default="/tmp", help="local svn working copy") | ||||
@click.option( | @click.option( | ||||
"--svn-url", | "--svn-url", | ||||
default="file:///home/storage/svn/repos/pkg-fox", | default="file:///home/storage/svn/repos/pkg-fox", | ||||
help="svn repository's url.", | help="svn repository's url.", | ||||
▲ Show 20 Lines • Show All 69 Lines • Show Last 20 Lines |