diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -5,14 +5,28 @@
 from dataclasses import dataclass
 import datetime
+from io import BytesIO
 import logging
 import os
 import pickle
 import sys
 from tempfile import SpooledTemporaryFile
-from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    Iterator,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Type,
+)
+from urllib.parse import urljoin, urlparse
 
 import dulwich.client
+from dulwich.config import ConfigFile, parse_submodules
 from dulwich.errors import GitProtocolError, NotGitRepository
 from dulwich.object_store import ObjectStoreGraphWalker
 from dulwich.objects import ShaFile
@@ -23,6 +37,7 @@
 from swh.model import hashutil
 from swh.model.model import (
     BaseContent,
+    Content,
     Directory,
     Origin,
     Release,
@@ -31,6 +46,10 @@
     SnapshotBranch,
     TargetType,
 )
+from swh.scheduler import get_scheduler
+from swh.scheduler.interface import SchedulerInterface
+from swh.scheduler.utils import create_oneshot_task_dict
+from swh.storage import get_storage
 from swh.storage.algos.snapshot import snapshot_get_latest
 from swh.storage.interface import StorageInterface
 
@@ -123,6 +142,7 @@
         temp_file_cutoff: int = 100 * 1024 * 1024,
         save_data_path: Optional[str] = None,
         max_content_size: Optional[int] = None,
+        scheduler: Optional[SchedulerInterface] = None,
     ):
         """Initialize the bulk updater.
 
@@ -151,6 +171,146 @@
         self.remote_refs: Dict[bytes, HexBytes] = {}
         self.symbolic_refs: Dict[bytes, HexBytes] = {}
         self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+        self.gitmodules: Set[Tuple[bytes, bytes]] = set()
+        self.scheduler = scheduler
+        self.snapshot = Snapshot(branches={})
+
+    @classmethod
+    def from_config(cls, storage: Dict[str, Any], **config: Any):
+        for legacy_key in ("storage", "celery"):
+            config.pop(legacy_key, None)
+        # Instantiate the storage
+        storage_instance = get_storage(**storage)
+        # Instantiate the scheduler
+        scheduler_instance = None
+        if "scheduler" in config:
+            scheduler_instance = get_scheduler(**config.pop("scheduler"))
+        return cls(storage=storage_instance, scheduler=scheduler_instance, **config)
+
+    def post_load(self, success: bool = True) -> None:
+        """Process git submodules found during the loading process and possibly
+        create scheduler tasks to archive the associated origins.
+        """
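+        # The processing happens in three steps: (1) gather the
+        # (directory, .gitmodules blob) pairs collected while loading,
+        # (2) parse each .gitmodules file, and (3) schedule a one-shot
+        # "load-git" task for every submodule origin whose target revision
+        # is not archived yet.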
+ """ + if self.scheduler is None: + return + + submodule_origins = set() + processed_revisions = set() + contents: Dict[bytes, Content] = {} + if self.remote_refs: + # when reloading a stale git repo, we might have missed some submodules + # because the loader did not process them yet at the last loading time, + # so try to find submodule definition files in the root directories of + # the current snapshot branches + for branch in self.snapshot.branches.values(): + if branch is None: + continue + if branch.target_type == TargetType.REVISION: + rev = self.storage.revision_get([branch.target])[0] + if rev is None: + continue + gitmodules = self.storage.directory_entry_get_by_path( + rev.directory, [b".gitmodules"] + ) + if gitmodules is None or gitmodules["type"] != "file": + continue + self.gitmodules.add((rev.directory, gitmodules["target"])) + + def _get_content(sha1_git: bytes) -> Content: + if sha1_git not in contents: + query = {"sha1_git": sha1_git} + contents[sha1_git] = self.storage.content_find(query)[0] + return contents[sha1_git] + + # get .gitmodules files metadata + gitmodules_data = ( + (directory, _get_content(sha1_git)) + for directory, sha1_git in self.gitmodules + ) + + parsed_submodules: Dict[bytes, List[Tuple[bytes, bytes, bytes]]] = {} + + # iterate on each .gitmodules files and parse them + for directory, content in gitmodules_data: + if content.sha1 in parsed_submodules: + submodules = parsed_submodules[content.sha1] + else: + content_data = self.storage.content_get_data(content.sha1) + if content_data is None: + parsed_submodules[content.sha1] = [] + continue + try: + submodules = list( + parse_submodules(ConfigFile.from_file(BytesIO(content_data))) + ) + parsed_submodules[content.sha1] = submodules + except Exception: + self.log.warning( + ".gitmodules file with sha1_git %s could not be parsed", + content.sha1_git.hex(), + ) + parsed_submodules[content.sha1] = [] + continue + + # iterate on parsed submodules + for submodule in submodules: + # extract submodule path and URL + path, url = submodule[0], submodule[1].decode("ascii") + if url.startswith(("./", "../")): + # relative URL case + url = urljoin(self.origin_url.rstrip("/") + "/", url) + + origin_scheme = urlparse(self.origin_url).scheme + submodule_scheme = urlparse(url).scheme + + if ( + # submodule origin already marked to be archived + url in submodule_origins + # submodule origin URL scheme is not supported by the loader + or submodule_scheme not in ("git", "http", "https") + # submodule origin URL does not match those of unit tests + and not (origin_scheme == "file" and submodule_scheme == "file") + ): + continue + + # get directory entry for submodule path + rev_entry = self.storage.directory_entry_get_by_path(directory, [path]) + + if ( + # submodule path does not exist + rev_entry is None + # path is not a submodule + or rev_entry["type"] != "rev" + # target revision already processed + or rev_entry["target"] in processed_revisions + ): + continue + elif self.storage.revision_get([rev_entry["target"]])[0] is None: + self.log.debug( + "Target revision %s for submodule %s is not archived, " + "origin %s will be loaded afterwards to get it.", + rev_entry["target"].hex(), + path, + url, + ) + submodule_origins.add(url) + else: + self.log.debug( + "Target revision %s for submodule %s is already archived, " + "origin %s will not be reloaded", + rev_entry["target"].hex(), + path, + url, + ) + processed_revisions.add(rev_entry["target"]) + + if submodule_origins: + # create loading tasks for submodules with 
+            tasks = [
+                create_oneshot_task_dict("load-git", priority="high", url=origin_url)
+                for origin_url in submodule_origins
+            ]
+            self.scheduler.create_tasks(tasks)
 
     def fetch_pack_from_origin(
         self,
@@ -384,7 +544,17 @@
             if raw_obj.id in self.ref_object_types:
                 self.ref_object_types[raw_obj.id] = TargetType.DIRECTORY
 
-            yield converters.dulwich_tree_to_directory(raw_obj)
+            directory = converters.dulwich_tree_to_directory(raw_obj)
+
+            if self.scheduler:
+                # keep track of submodule definition files for later processing
+                self.gitmodules.update(
+                    (directory.id, entry.target)
+                    for entry in directory.entries
+                    if entry.type == "file" and entry.name == b".gitmodules"
+                )
+
+            yield directory
 
     def get_revisions(self) -> Iterable[Revision]:
         """Format commits as swh revisions"""
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -3,11 +3,14 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from contextlib import contextmanager
 from functools import partial
 from http.server import HTTPServer, SimpleHTTPRequestHandler
 import os
+import shutil
 import subprocess
 from tempfile import SpooledTemporaryFile
+from textwrap import dedent
 from threading import Thread
 
 from dulwich.errors import GitProtocolError, NotGitRepository, ObjectFormatException
@@ -325,3 +328,170 @@
     @classmethod
     def setup_class(cls):
         cls.with_pack_files = False
+
+
+@contextmanager
+def working_directory(path):
+    """Context manager to temporarily change the current working directory."""
+    prev_cwd = os.getcwd()
+    os.chdir(path)
+    try:
+        yield
+    finally:
+        os.chdir(prev_cwd)
+
+
+def test_loader_load_submodules_recursively(swh_storage, swh_scheduler, tmp_path):
+    """Create three sample git repos: main, submodule and subsubmodule:
+
+    * main has a submodule targeting the submodule repo
+    * submodule has a submodule targeting the subsubmodule repo
+    * subsubmodule has a submodule targeting the main repo (to test cycles handling)
+
+    After loading a repo, tasks for loading its submodules should have been created
+    in the scheduler to archive the associated origins.
+    """
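+    # the repos are built with plain dulwich calls rather than by shelling
+    # out to a git executable; "git submodule add" is simulated below by
+    # copying a repo into its parent and staging the copied directory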
+ """ + author = b"swh " + + main_repo_name = "main" + main_repo_path = os.path.join(tmp_path, main_repo_name) + main_repo_url = f"file://{main_repo_path}" + + submodule_repo_name = "submodule" + submodule_repo_path = os.path.join(tmp_path, submodule_repo_name) + submodule_repo_url = f"file://{submodule_repo_path}" + + subsubmodule_repo_name = "subsubmodule_repo" + subsubmodule_repo_path = os.path.join(tmp_path, subsubmodule_repo_name) + subsubmodule_repo_url = f"file://{subsubmodule_repo_path}" + + # create main repo + main_repo = dulwich.repo.Repo.init(main_repo_path, mkdir=True) + with working_directory(main_repo_path): + # add foo.sh file + with open("foo.sh", "wb") as new_file: + new_file.write(b"#!/bin/bash\n echo foo") + # add .gitmodules file as created by "git submodule add" + with open(".gitmodules", "w") as new_file: + submodules = dedent( + f"""[submodule "{submodule_repo_name}"] + path = {submodule_repo_name} + url = {submodule_repo_url} + """ + ) + new_file.write(submodules) + + # create submodule repo + submodule_repo = dulwich.repo.Repo.init(submodule_repo_path, mkdir=True) + with working_directory(submodule_repo_path): + # add bar.sh file + with open("bar.sh", "wb") as new_file: + new_file.write(b"#!/bin/bash\n echo bar") + # add .gitmodules file as created by "git submodule add" + with open(".gitmodules", "w") as new_file: + submodules = dedent( + f"""[submodule "{subsubmodule_repo_name}"] + path = {subsubmodule_repo_name} + url = ../{subsubmodule_repo_name} + """ # we use a relative URL for the submodule here + ) + new_file.write(submodules) + + # create subsubmodule repo + subsubmodule_repo = dulwich.repo.Repo.init(subsubmodule_repo_path, mkdir=True) + with working_directory(subsubmodule_repo_path): + # add baz.sh file + with open("baz.sh", "wb") as new_file: + new_file.write(b"#!/bin/bash\n echo baz") + + # stage baz.sh file in subsubmodule repo + subsubmodule_repo.stage([b"baz.sh"]) + # add commit to subsubmodule repo + subsubmodule_repo.do_commit( + message=b"Initial commit", author=author, committer=author + ) + + # copy subsubmodule repo in submodule repo (simulate "git submodule add") + shutil.copytree( + subsubmodule_repo_path, + os.path.join(submodule_repo_path, subsubmodule_repo_name), + ) + # stage bar.sh file, .gitmodules file and subsubmodule submodule in submodule repo + submodule_repo.stage([b"bar.sh", b".gitmodules", subsubmodule_repo_name.encode()]) + # add commit to submodule repo + submodule_repo.do_commit(message=b"Initial commit", author=author, committer=author) + + # copy submodule repo in main repo (simulate "git submodule add") + shutil.copytree( + submodule_repo_path, os.path.join(main_repo_path, submodule_repo_name), + ) + # stage foo.sh file, .gitmodules file and submodule submodule in main repo + main_repo.stage([b"foo.sh", b".gitmodules", submodule_repo_name.encode()]) + # add commit to main repo + main_repo.do_commit(message=b"Initial commit", author=author, committer=author) + + # update subsubmodule repo + with working_directory(subsubmodule_repo_path): + # add .gitmodules file as created by "git submodule add" + with open(".gitmodules", "w") as new_file: + submodules = dedent( + f"""[submodule "{main_repo_name}"] + path = {main_repo_name} + url = {main_repo_url} + """ + ) + new_file.write(submodules) + # copy main repo in subsubmodule repo (simulate "git submodule add") + shutil.copytree( + main_repo_path, os.path.join(subsubmodule_repo_path, main_repo_name) + ) + + # stage .gitmodules file and main submodule in subsubmodule repo + 
+    subsubmodule_repo.stage([b".gitmodules", main_repo_name.encode()])
+    # add commit to subsubmodule repo
+    subsubmodule_repo.do_commit(
+        message=b"Add submodule targeting main repo", author=author, committer=author
+    )
+
+    def _load_origin_task_exists(origin_url):
+        tasks = [
+            dict(row.items())
+            for row in swh_scheduler.search_tasks(task_type="load-git")
+        ]
+
+        return any(task["arguments"]["kwargs"]["url"] == origin_url for task in tasks)
+
+    # load the main repo
+    loader = GitLoader(swh_storage, main_repo_url, scheduler=swh_scheduler)
+    assert loader.load() == {"status": "eventful"}
+
+    # a task to load the submodule repo should have been created
+    assert _load_origin_task_exists(submodule_repo_url)
+
+    # load the submodule repo (simulate its scheduling)
+    loader = GitLoader(swh_storage, submodule_repo_url, scheduler=swh_scheduler)
+    assert loader.load() == {"status": "eventful"}
+
+    # a task to load the subsubmodule repo should have been created
+    assert _load_origin_task_exists(subsubmodule_repo_url)
+
+    # load the subsubmodule repo (simulate its scheduling)
+    loader = GitLoader(swh_storage, subsubmodule_repo_url, scheduler=swh_scheduler)
+    assert loader.load() == {"status": "eventful"}
+
+    # no task to load the main repo should have been created, as its origin
+    # has already been archived (cycle handling)
+    assert not _load_origin_task_exists(main_repo_url)
+
+    # check submodules have been loaded
+    stats = get_stats(loader.storage)
+    assert stats == {
+        "content": 6,  # one bash file and one .gitmodules file in each repo
+        "directory": 4,  # one root directory for the main and submodule repos,
+        # two for the subsubmodule repo (one per commit)
+        "origin": 3,  # one origin per repo
+        "origin_visit": 3,  # one visit per origin
+        "release": 0,  # no releases
+        "revision": 4,  # one revision in the main and submodule repos,
+        # two in the subsubmodule repo
+        "skipped_content": 0,
+        "snapshot": 3,  # one snapshot per repo
+    }
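
Usage note: with this change, GitLoader.from_config can wire an optional
scheduler into the loader. A minimal sketch of driving it from Python, assuming
an in-memory storage backend, a scheduler RPC endpoint and an origin URL that
are purely illustrative (none of them come from the patch):

    from swh.loader.git.loader import GitLoader

    loader = GitLoader.from_config(
        storage={"cls": "memory"},
        # assumed scheduler RPC endpoint; omitting this key disables
        # submodule task scheduling (post_load returns early)
        scheduler={"cls": "remote", "url": "http://localhost:5008/"},
        url="https://git.example.org/repo.git",  # hypothetical origin
    )
    result = loader.load()  # post_load() then schedules submodule origins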