diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
 swh.core >= 0.0.7
-swh.loader.core >= 5.0.0
 swh.model >= 4.3.0
 swh.scheduler >= 0.0.39
 swh.storage >= 0.22.0
+swh.loader.core @ git+https://forge.softwareheritage.org/source/staging.git@ad5c8f49c1dea00e95b895811df21e3c93a6667d#egg=swh.loader.core
diff --git a/swh/loader/git/dumb.py b/swh/loader/git/dumb.py
--- a/swh/loader/git/dumb.py
+++ b/swh/loader/git/dumb.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -21,7 +21,7 @@
 from swh.loader.git.utils import HexBytes

 if TYPE_CHECKING:
-    from .loader import RepoRepresentation
+    from .loader import BaseRepoRepresentation

 logger = logging.getLogger(__name__)

@@ -69,7 +69,7 @@
         base_repo: State of repository archived by Software Heritage
     """

-    def __init__(self, repo_url: str, base_repo: RepoRepresentation):
+    def __init__(self, repo_url: str, base_repo: BaseRepoRepresentation):
         self._session = requests.Session()
         self.repo_url = repo_url
         self.base_repo = base_repo
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -42,15 +42,24 @@
 heads_logger = logger.getChild("refs")


-class RepoRepresentation:
+DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE = 200
+
+
+def do_print_progress(msg: bytes) -> None:
+    sys.stderr.buffer.write(msg)
+    sys.stderr.flush()
+
+
+class BaseRepoRepresentation:
     """Repository representation for a Software Heritage origin."""

     def __init__(
         self,
         storage,
-        base_snapshots: List[Snapshot] = None,
+        base_snapshots: Optional[List[Snapshot]] = None,
         incremental: bool = True,
-        statsd: Statsd = None,
+        statsd: Optional[Statsd] = None,
+        **kwargs: Any,  # extra kwargs are just ignored
     ):
         self.storage = storage
         self.incremental = incremental
@@ -71,10 +80,9 @@
                 heads_logger.debug("    %r: %s", branch_name, branch.target.hex())
             self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target)))

-    def graph_walker(self) -> ObjectStoreGraphWalker:
-        return ObjectStoreGraphWalker(self.local_heads, get_parents=lambda commit: [])
+        self.walker = ObjectStoreGraphWalker(self.local_heads, lambda commit: [])

-    def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
+    def compute_wanted_refs(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
         """Get the list of bytehex sha1s that the git loader should fetch.

         This compares the remote refs sent by the server with the base snapshot
@@ -96,6 +104,11 @@
                 continue
             remote_heads.add(ref_target)

+        if heads_logger.isEnabledFor(logging.DEBUG):
+            heads_logger.debug("Filtered remote heads:")
+            for value in remote_heads:
+                heads_logger.debug("    %s", value.decode())
+
         logger.debug("local_heads_count=%s", len(self.local_heads))
         logger.debug("remote_heads_count=%s", len(remote_heads))
         wanted_refs = list(remote_heads - self.local_heads)
@@ -113,6 +126,107 @@
         )
         return wanted_refs

+    def continue_fetch_refs(self) -> bool:
+        """Determine whether there are more refs to fetch."""
+        return False
+
+    def determine_wants(
+        self, refs: Dict[bytes, HexBytes], depth=None
+    ) -> List[HexBytes]:
+        """Get the list of bytehex sha1s that the git loader should fetch.
+
+        This compares the remote refs sent by the server with the base snapshot
+        provided by the loader.
+
+        """
+        raise NotImplementedError
+
+
+class RepoRepresentation(BaseRepoRepresentation):
+    """A RepoRepresentation object able to provide all refs to fetch at once.
+
+    Internally, this computes the full list of wanted_refs and returns it the
+    first time the :meth:`determine_wants` method is called; it is expected to
+    be called only once. The caller then has all the refs necessary to retrieve
+    the packfile.
+
+    """
+
+    def determine_wants(
+        self, refs: Dict[bytes, HexBytes], depth=None
+    ) -> List[HexBytes]:
+        """Get the list of bytehex sha1s that the git loader should fetch.
+
+        This compares the remote refs sent by the server with the base snapshot
+        provided by the loader.
+
+        """
+        return self.compute_wanted_refs(refs)
+
+
+class RepoRepresentationPaginated(BaseRepoRepresentation):
+    """A RepoRepresentation object able to provide intervals of refs to fetch.
+
+    Internally, this computes the full list of wanted_refs but then provides an
+    interval of number_of_heads_per_packfile refs each time the
+    :meth:`determine_wants` method is called. The caller is expected to call the
+    :meth:`continue_fetch_refs` method to determine whether more refs need to be
+    fetched.
+
+    """
+
+    def __init__(
+        self,
+        *args,
+        number_of_heads_per_packfile=DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE,
+        **kwargs,
+    ):
+        super().__init__(*args, **kwargs)
+        # Pagination index
+        self.index: int = 0
+        self.number_of_heads_per_packfile = number_of_heads_per_packfile
+        self.wanted_refs: Optional[List[HexBytes]] = None
+        self.previous_refs: List[HexBytes] = []
+
+    def continue_fetch_refs(self) -> bool:
+        """Determine whether more refs need to be fetched."""
+        return self.wanted_refs is None or self.index < len(self.wanted_refs)
+
+    def determine_wants(
+        self, refs: Dict[bytes, HexBytes], depth=None
+    ) -> List[HexBytes]:
+        """Get the list of bytehex sha1s that the git loader should fetch.
+
+        This compares the remote refs sent by the server with the base snapshot
+        provided by the loader.
+
+        """
+        # First time around, initialize the full list of wanted refs
+        if not self.wanted_refs:
+            self.wanted_refs = self.compute_wanted_refs(refs)
+
+        # If empty, then we are done
+        if not self.wanted_refs:
+            return []
+
+        # We have all wanted refs but we are ingesting them one interval of
+        # number_of_heads_per_packfile refs at a time
+        start = self.index
+        self.index += self.number_of_heads_per_packfile
+
+        assert self.wanted_refs
+        asked_refs = self.wanted_refs[start : min(self.index, len(self.wanted_refs))]
+        if heads_logger.isEnabledFor(logging.DEBUG):
+            heads_logger.debug("Asked remote heads:")
+            for value in asked_refs:
+                heads_logger.debug("    %s", value.decode())
+
+        if start > 0:
+            # Previous refs were already walked, so we can remove them from the
+            # next walk iteration to avoid processing them again
+            self.walker.heads.update(self.previous_refs)
+        self.previous_refs = asked_refs
+        logger.debug("asked_refs_count=%s", len(asked_refs))
+        return asked_refs
+

 @dataclass
 class FetchPackReturn:
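Reviewer note: the pagination above boils down to slicing a precomputed list of wanted refs and remembering already-asked heads between calls. A minimal standalone sketch of that `determine_wants`/`continue_fetch_refs` contract (illustrative only, not part of the patch; the real classes also consult the archive to compute `wanted_refs`):

```python
# Sketch of the pagination contract; PaginationSketch is a hypothetical stand-in.
from typing import List, Optional


class PaginationSketch:
    def __init__(self, per_packfile: int = 2):
        self.index = 0
        self.per_packfile = per_packfile
        self.wanted_refs: Optional[List[bytes]] = None

    def continue_fetch_refs(self) -> bool:
        # True until every wanted ref has been handed out once
        return self.wanted_refs is None or self.index < len(self.wanted_refs)

    def determine_wants(self, refs: List[bytes]) -> List[bytes]:
        if self.wanted_refs is None:
            self.wanted_refs = refs  # stand-in for compute_wanted_refs()
        start, self.index = self.index, self.index + self.per_packfile
        return self.wanted_refs[start : min(self.index, len(self.wanted_refs))]


sketch = PaginationSketch()
remote = [b"r0", b"r1", b"r2", b"r3", b"r4"]
while sketch.continue_fetch_refs():
    print(sketch.determine_wants(remote))
# prints [b'r0', b'r1'], then [b'r2', b'r3'], then [b'r4']
```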
+ + """ + # First time around, we'll initialize all the wanted refs + if not self.wanted_refs: + self.wanted_refs = self.compute_wanted_refs(refs) + + # If empty, then we are done + if not self.wanted_refs: + return [] + + # We have all wanted refs but we are ingesting them one interval of + # number_of_heads_per_packfile refs at a time + start = self.index + self.index += self.number_of_heads_per_packfile + + assert self.wanted_refs + asked_refs = self.wanted_refs[start : min(self.index, len(self.wanted_refs))] + if heads_logger.isEnabledFor(logging.DEBUG): + heads_logger.debug("Asked remote heads:") + for value in asked_refs: + heads_logger.debug(" %s", value.decode()) + + if start > 0: + # Previous refs was already walked so we can remove them from the next walk + # iteration to avoid processing them again + self.walker.heads.update(self.previous_refs) + self.previous_refs = asked_refs + logger.debug("asked_refs_count=%s", len(asked_refs)) + return asked_refs + @dataclass class FetchPackReturn: @@ -120,6 +234,8 @@ symbolic_refs: Dict[bytes, HexBytes] pack_buffer: SpooledTemporaryFile pack_size: int + confinue_fetch_refs: bool + """Determine whether we still have to fetch remaining references.""" class GitLoader(BaseGitLoader): @@ -153,52 +269,78 @@ storage: StorageInterface, url: str, incremental: bool = True, - repo_representation: Type[RepoRepresentation] = RepoRepresentation, pack_size_bytes: int = 4 * 1024 * 1024 * 1024, temp_file_cutoff: int = 100 * 1024 * 1024, + fetch_multiple_packfiles: bool = False, + number_of_heads_per_packfile: int = DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE, **kwargs: Any, ): """Initialize the bulk updater. Args: - repo_representation: swh's repository representation - which is in charge of filtering between known and remote - data. - ... - incremental: If True, the default, this starts from the last known snapshot (if any) references. Otherwise, this loads the full repository. + fetch_multiple_packfiles: If True, this ingests the repository using + (internally) multiple packfiles (creating partial incremental snapshots + along the way). When False, the default, this uses the existing ingestion + policy of retrieving one packfile to ingest. + number_of_heads_per_packfile: When fetch_multiple_packfiles is used, this + splits packfiles per a given number of heads (no guarantee on the used + size). 
""" super().__init__(storage=storage, origin_url=url, **kwargs) + # check if repository only supports git dumb transfer protocol, + # fetched pack file will be empty in that case as dulwich do + # not support it and do not fetch any refs + logger.debug("Transport url to communicate with server: %s", url) + self.client, self.path = dulwich.client.get_transport_and_path( + url, thin_packs=False + ) + logger.debug("Client %s to fetch pack at %s", self.client, self.path) + self.dumb = url.startswith("http") and getattr(self.client, "dumb", False) + self.fetch_multiple_packfiles = fetch_multiple_packfiles self.incremental = incremental - self.repo_representation = repo_representation self.pack_size_bytes = pack_size_bytes self.temp_file_cutoff = temp_file_cutoff # state initialized in fetch_data self.remote_refs: Dict[bytes, HexBytes] = {} self.symbolic_refs: Dict[bytes, HexBytes] = {} self.ref_object_types: Dict[bytes, Optional[TargetType]] = {} + self.configure_packfile_fetching_policy( + fetch_multiple_packfiles, number_of_heads_per_packfile + ) + + def configure_packfile_fetching_policy( + self, fetch_multiple_packfiles: bool, number_of_heads_per_packfile: int + ): + """Configure the packfile fetching policy. The default is to fetch one packfile + to ingest everything unknown out of it. When fetch_multiple_packfiles is True + (and the ingestion passes through the smart protocol), the ingestion uses + packfiles (with a given number_of_heads_per_packfile). After each packfile is + loaded, a 'partial' (because incomplete) and 'incremental' (as in gathering seen + refs) so far snapshot is created (incremental). + + """ + # will create partial snapshot alongside fetching multiple packfiles (when the + # transfer protocol is not the 'dumb' one) + self.create_partial_snapshot = not self.dumb and fetch_multiple_packfiles + self.number_of_heads_per_packfile: Optional[int] = None + self.repo_representation: Type[BaseRepoRepresentation] + if self.create_partial_snapshot: + self.repo_representation = RepoRepresentationPaginated + self.number_of_heads_per_packfile = number_of_heads_per_packfile + else: + self.repo_representation = RepoRepresentation def fetch_pack_from_origin( self, origin_url: str, - base_repo: RepoRepresentation, do_activity: Callable[[bytes], None], ) -> FetchPackReturn: """Fetch a pack from the origin""" pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff) - transport_url = origin_url - - logger.debug("Transport url to communicate with server: %s", transport_url) - - client, path = dulwich.client.get_transport_and_path( - transport_url, thin_packs=False - ) - - logger.debug("Client %s to fetch pack at %s", client, path) - size_limit = self.pack_size_bytes def do_pack(data: bytes) -> None: @@ -207,16 +349,17 @@ if cur_size + would_write > size_limit: raise IOError( f"Pack file too big for repository {origin_url}, " - f"limit is {size_limit} bytes, current size is {cur_size}, " + f"number_of_heads_per_packfile is {size_limit} bytes, " + f"current size is {cur_size}, " f"would write {would_write}" ) pack_buffer.write(data) - pack_result = client.fetch_pack( - path, - base_repo.determine_wants, - base_repo.graph_walker(), + pack_result = self.client.fetch_pack( + self.path, + self.base_repo.determine_wants, + self.base_repo.walker, do_pack, progress=do_activity, ) @@ -230,16 +373,12 @@ logger.debug("fetched_pack_size=%s", pack_size) - # check if repository only supports git dumb transfer protocol, - # fetched pack file will be empty in that case as dulwich do - # not 
@@ -230,16 +373,12 @@

         logger.debug("fetched_pack_size=%s", pack_size)

-        # check if repository only supports git dumb transfer protocol,
-        # fetched pack file will be empty in that case as dulwich do
-        # not support it and do not fetch any refs
-        self.dumb = transport_url.startswith("http") and getattr(client, "dumb", False)
-
         return FetchPackReturn(
-            remote_refs=utils.filter_refs(remote_refs),
-            symbolic_refs=utils.filter_refs(symbolic_refs),
+            remote_refs=remote_refs,
+            symbolic_refs=symbolic_refs,
             pack_buffer=pack_buffer,
             pack_size=pack_size,
+            continue_fetch_refs=self.base_repo.continue_fetch_refs(),
         )

     def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
@@ -280,24 +419,23 @@
         # count how many runs of the loader are with each incremental mode
         self.statsd.increment("git_total", tags={})

-    def fetch_data(self) -> bool:
-        assert self.origin is not None
-
-        base_repo = self.repo_representation(
+        self.base_repo = self.repo_representation(
             storage=self.storage,
             base_snapshots=self.base_snapshots,
             incremental=self.incremental,
             statsd=self.statsd,
+            # Only used when self.repo_representation is RepoRepresentationPaginated,
+            # ignored otherwise
+            number_of_heads_per_packfile=self.number_of_heads_per_packfile,
         )

-        def do_progress(msg: bytes) -> None:
-            sys.stderr.buffer.write(msg)
-            sys.stderr.flush()
+    def fetch_data(self) -> bool:
+        continue_fetch_refs = False
+        assert self.origin is not None

         try:
-            fetch_info = self.fetch_pack_from_origin(
-                self.origin.url, base_repo, do_progress
-            )
+            fetch_info = self.fetch_pack_from_origin(self.origin.url, do_print_progress)
+            continue_fetch_refs = fetch_info.continue_fetch_refs
         except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e:
             raise NotFound(e)
         except GitProtocolError as e:
@@ -321,21 +459,29 @@
             if not self.dumb:
                 raise

-        logger.debug(
-            "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
-        )
         if self.dumb:
-            self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo)
+            protocol = "dumb"
+            self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, self.base_repo)
             self.dumb_fetcher.fetch_object_ids()
-            self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)
-            self.symbolic_refs = utils.filter_refs(self.dumb_fetcher.head)
+            remote_refs = self.dumb_fetcher.refs
+            symbolic_refs = self.dumb_fetcher.head
         else:
+            protocol = "smart"
             self.pack_buffer = fetch_info.pack_buffer
             self.pack_size = fetch_info.pack_size
-            self.remote_refs = fetch_info.remote_refs
-            self.symbolic_refs = fetch_info.symbolic_refs
+            remote_refs = fetch_info.remote_refs
+            symbolic_refs = fetch_info.symbolic_refs
+
+        logger.debug("Protocol used for communication: %s", protocol)

-        self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+        # So the partial snapshots and the final one create the full set of branches
+        self.remote_refs.update(utils.filter_refs(remote_refs))
+        self.symbolic_refs.update(utils.filter_refs(symbolic_refs))
+
+        for sha1 in self.remote_refs.values():
+            if sha1 in self.ref_object_types:
+                continue
+            self.ref_object_types[sha1] = None

         logger.info(
             "Listed %d refs for repo %s",
@@ -348,8 +494,7 @@
             },
         )

-        # No more data to fetch
-        return False
+        return continue_fetch_refs

     def save_data(self) -> None:
         """Store a pack for archival"""
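Reviewer note: `fetch_data` now returning a boolean matters because the base loader in swh.loader.core drives a fetch/store loop until `fetch_data` reports no more data. Roughly, and as a simplification of that core loop (`loader` stands in for a prepared GitLoader instance):

```python
# Simplified, illustrative view of the driver loop this return value feeds into
# (the real loop lives in swh.loader.core's BaseLoader.load).
def run_fetch_store_loop(loader) -> None:
    while True:
        more_data_to_fetch = loader.fetch_data()
        loader.store_data()  # may record a partial, incremental snapshot
        if not more_data_to_fetch:
            break
```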
+ + """ + super().store_data() + + if not self.dumb: + self.pack_buffer.close() + def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]: """Read all the objects of type `object_type` from the packfile""" if self.dumb: @@ -538,18 +697,18 @@ if not targets_unknown: break - if unknown_objects: - # This object was referenced by the server; We did not fetch - # it, and we do not know it from the previous snapshot. This is - # likely a bug in the loader. - raise RuntimeError( - "Unknown objects referenced by remote refs: %s" - % ( + if unknown_objects and not self.create_partial_snapshot: + # Let's warn about dangling object when loading full packfiles. The + # following objects were referenced by the server but we did not fetch + # it, and we do not know it from the previous snapshot. + logger.warning( + "Unknown objects referenced by remote refs: %s", + ( ", ".join( f"{name.decode()}: {hashutil.hash_to_hex(obj)}" for name, obj in unknown_objects.items() ) - ) + ), ) utils.warn_dangling_branches( diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py --- a/swh/loader/git/tests/test_loader.py +++ b/swh/loader/git/tests/test_loader.py @@ -28,6 +28,7 @@ prepare_repository_from_archive, ) from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot +from swh.storage.algos.origin import origin_get_latest_visit_status class CommonGitLoaderNotFound: @@ -95,6 +96,70 @@ snapshot=None, ) + def test_load_with_multiple_packfiles_with_partial_snapshot(self): + """Ingesting repositories in multiple packfiles should be ok""" + # Make the loader retrieve multiple packfiles As it's already instantiated, we + # must post configure the loader the attributes ourselves + self.loader.configure_packfile_fetching_policy( + fetch_multiple_packfiles=True, number_of_heads_per_packfile=1 + ) + + res = self.loader.load() + + assert res == {"status": "eventful"} + + stats = get_stats(self.loader.storage) + nb_snapshots = stats.pop("snapshot") + assert stats == { + "content": 4, + "directory": 7, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 7, + "skipped_content": 0, + } + + # This nb of snapshots will depend on the packfile ingestion order (and the git + # graph connectivity). If loading starts with a set of ref more connected than + # the other refs, we'll end up having less snapshots. Invertly, starting with + # less connected refs will create more snapshots. Hence the following + # comparison. 
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -28,6 +28,7 @@
     prepare_repository_from_archive,
 )
 from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot
+from swh.storage.algos.origin import origin_get_latest_visit_status


 class CommonGitLoaderNotFound:
@@ -95,6 +96,70 @@
             snapshot=None,
         )

+    def test_load_with_multiple_packfiles_with_partial_snapshot(self):
+        """Ingesting a repository through multiple packfiles should be ok"""
+        # Make the loader retrieve multiple packfiles. As the loader is already
+        # instantiated, we must configure the packfile fetching policy ourselves.
+        self.loader.configure_packfile_fetching_policy(
+            fetch_multiple_packfiles=True, number_of_heads_per_packfile=1
+        )
+
+        res = self.loader.load()
+
+        assert res == {"status": "eventful"}
+
+        stats = get_stats(self.loader.storage)
+        nb_snapshots = stats.pop("snapshot")
+        assert stats == {
+            "content": 4,
+            "directory": 7,
+            "origin": 1,
+            "origin_visit": 1,
+            "release": 0,
+            "revision": 7,
+            "skipped_content": 0,
+        }
+
+        # The number of snapshots depends on the packfile ingestion order (and on
+        # the git graph connectivity). If loading starts with a set of refs more
+        # connected than the others, we end up with fewer snapshots; conversely,
+        # starting with less connected refs creates more snapshots. Hence the
+        # range comparison below.
+        assert 2 <= nb_snapshots <= 4
+
+        # So we end up with at least one partial visit status with a snapshot
+        partial_visit = origin_get_latest_visit_status(
+            self.loader.storage,
+            self.repo_url,
+            type="git",
+            allowed_statuses=["partial"],
+        )
+        assert partial_visit is not None
+        assert partial_visit.snapshot is not None
+
+        partial_snapshot = self.loader.storage.snapshot_get_branches(
+            partial_visit.snapshot
+        )
+
+        # In the end, we have the final full visit targeting the snapshot
+        visit_status = assert_last_visit_matches(
+            self.loader.storage,
+            self.repo_url,
+            status="full",
+            type="git",
+        )
+        assert visit_status.snapshot is not None
+
+        snapshot = self.loader.storage.snapshot_get_branches(visit_status.snapshot)
+
+        # The partial snapshot's branch set is a subset of the branches of the
+        # final snapshot
+        partial_branches = partial_snapshot["branches"].keys()
+        snapshot_branches = snapshot["branches"].keys()
+        assert 0 < len(partial_branches) <= len(snapshot_branches)
+        for branch in partial_branches:
+            assert branch in snapshot_branches
+

 class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
     """Prepare a git directory repository to be loaded through a GitLoader.