diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ swh.core >= 0.0.7 -swh.loader.core >= 5.0.0 swh.model >= 4.3.0 swh.scheduler >= 0.0.39 swh.storage >= 0.22.0 +swh.loader.core @ git+https://forge.softwareheritage.org/source/staging.git@ad5c8f49c1dea00e95b895811df21e3c93a6667d#egg=swh.loader.core diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -42,15 +42,24 @@ heads_logger = logger.getChild("refs") +DEFAULT_NUMBER_IDS_TO_FETCH = 200 + + +def do_print_progress(msg: bytes) -> None: + sys.stderr.buffer.write(msg) + sys.stderr.flush() + + class RepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__( self, storage, - base_snapshots: List[Snapshot] = None, + base_snapshots: Optional[List[Snapshot]] = None, incremental: bool = True, - statsd: Statsd = None, + statsd: Optional[Statsd] = None, + limit: int = DEFAULT_NUMBER_IDS_TO_FETCH, ): self.storage = storage self.incremental = incremental @@ -71,10 +80,24 @@ heads_logger.debug(" %r: %s", branch_name, branch.target.hex()) self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target))) - def graph_walker(self) -> ObjectStoreGraphWalker: - return ObjectStoreGraphWalker(self.local_heads, get_parents=lambda commit: []) + self.heads: Set[HexBytes] = set() + self.wanted_refs: Optional[List[HexBytes]] = None + self.walker = ObjectStoreGraphWalker(self.heads, lambda commit: []) + # Pagination index + self.index: int = 0 + self.limit = limit + self.previous_refs: List[HexBytes] = [] + + def more_refs_to_fetch(self) -> bool: + """Did we fetch all wanted refs?""" + return self.wanted_refs is not None and self.index > len(self.wanted_refs) - def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]: + def graph_walker(self): + return self.walker + + def determine_wants( + self, refs: Dict[bytes, HexBytes], 
depth=None + ) -> List[HexBytes]: """Get the list of bytehex sha1s that the git loader should fetch. This compares the remote refs sent by the server with the base snapshot @@ -84,34 +107,52 @@ if not refs: return [] - if heads_logger.isEnabledFor(logging.DEBUG): - heads_logger.debug("Heads returned by the git remote:") - for name, value in refs.items(): - heads_logger.debug(" %r: %s", name, value.decode()) + if not self.wanted_refs: + # We'll compute all wanted_refs to ingest but we'll return it by batch of + # limit + if heads_logger.isEnabledFor(logging.DEBUG): + heads_logger.debug("Heads returned by the git remote:") + for name, value in refs.items(): + heads_logger.debug(" %r: %s", name, value.decode()) + + # Get the remote heads that we want to fetch + remote_heads: Set[HexBytes] = set() + for ref_name, ref_target in refs.items(): + if utils.ignore_branch_name(ref_name): + continue + remote_heads.add(ref_target) + + logger.debug("local_heads_count=%s", len(self.local_heads)) + logger.debug("remote_heads_count=%s", len(remote_heads)) + wanted_refs = list(remote_heads - self.local_heads) + logger.debug("wanted_refs_count=%s", len(wanted_refs)) + if self.statsd is not None: + self.statsd.histogram( + "git_ignored_refs_percent", + len(remote_heads - set(refs.values())) / len(refs), + tags={}, + ) + self.statsd.histogram( + "git_known_refs_percent", + len(self.local_heads & remote_heads) / len(remote_heads), + tags={}, + ) + # Determine all refs we want to ingest + self.wanted_refs = wanted_refs - # Get the remote heads that we want to fetch - remote_heads: Set[HexBytes] = set() - for ref_name, ref_target in refs.items(): - if utils.ignore_branch_name(ref_name): - continue - remote_heads.add(ref_target) - - logger.debug("local_heads_count=%s", len(self.local_heads)) - logger.debug("remote_heads_count=%s", len(remote_heads)) - wanted_refs = list(remote_heads - self.local_heads) - logger.debug("wanted_refs_count=%s", len(wanted_refs)) - if self.statsd is not None: - 
self.statsd.histogram( - "git_ignored_refs_percent", - len(remote_heads - set(refs.values())) / len(refs), - tags={}, - ) - self.statsd.histogram( - "git_known_refs_percent", - len(self.local_heads & remote_heads) / len(remote_heads), - tags={}, - ) - return wanted_refs + # We're gonna ingest them in smaller interval of refs + start = self.index + self.index += self.limit + + assert self.wanted_refs is not None + asked_refs = self.wanted_refs[start : self.index] + if start > 0: + # Previous refs was already walked so we can remove them from the next walk + # iteration to avoid processing them again + self.walker.heads.update(self.previous_refs) + self.previous_refs = asked_refs + logger.debug("asked_refs_count=%s", len(asked_refs)) + return asked_refs @dataclass @@ -120,6 +161,7 @@ symbolic_refs: Dict[bytes, HexBytes] pack_buffer: SpooledTemporaryFile pack_size: int + continue_loading: bool class GitLoader(BaseGitLoader): @@ -156,6 +198,7 @@ repo_representation: Type[RepoRepresentation] = RepoRepresentation, pack_size_bytes: int = 4 * 1024 * 1024 * 1024, temp_file_cutoff: int = 100 * 1024 * 1024, + packfile_chunk_size: int = DEFAULT_NUMBER_IDS_TO_FETCH, **kwargs: Any, ): """Initialize the bulk updater. 
@@ -171,6 +214,17 @@ """ super().__init__(storage=storage, origin_url=url, **kwargs) + # check if repository only supports git dumb transfer protocol, + # fetched pack file will be empty in that case as dulwich does + # not support it and does not fetch any refs + logger.debug("Transport url to communicate with server: %s", url) + self.client, self.path = dulwich.client.get_transport_and_path( + url, thin_packs=False + ) + logger.debug("Client %s to fetch pack at %s", self.client, self.path) + self.dumb = url.startswith("http") and getattr(self.client, "dumb", False) + # will create partial snapshot alongside fetching multiple packfiles + self.create_partial_snapshot = not self.dumb and incremental self.incremental = incremental self.repo_representation = repo_representation self.pack_size_bytes = pack_size_bytes @@ -179,6 +233,7 @@ self.remote_refs: Dict[bytes, HexBytes] = {} self.symbolic_refs: Dict[bytes, HexBytes] = {} self.ref_object_types: Dict[bytes, Optional[TargetType]] = {} + self.packfile_chunk_size = packfile_chunk_size def fetch_pack_from_origin( self, @@ -189,16 +244,6 @@ """Fetch a pack from the origin""" pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff) - transport_url = origin_url - - logger.debug("Transport url to communicate with server: %s", transport_url) - - client, path = dulwich.client.get_transport_and_path( - transport_url, thin_packs=False - ) - - logger.debug("Client %s to fetch pack at %s", client, path) - size_limit = self.pack_size_bytes def do_pack(data: bytes) -> None: @@ -213,8 +258,8 @@ pack_buffer.write(data) - pack_result = client.fetch_pack( - path, + pack_result = self.client.fetch_pack( + self.path, base_repo.determine_wants, base_repo.graph_walker(), do_pack, @@ -230,16 +275,12 @@ logger.debug("fetched_pack_size=%s", pack_size) - # check if repository only supports git dumb transfer protocol, - # fetched pack file will be empty in that case as dulwich do - # not support it and do not fetch any refs - self.dumb = 
transport_url.startswith("http") and getattr(client, "dumb", False) - return FetchPackReturn( - remote_refs=utils.filter_refs(remote_refs), - symbolic_refs=utils.filter_refs(symbolic_refs), + remote_refs=remote_refs, + symbolic_refs=symbolic_refs, pack_buffer=pack_buffer, pack_size=pack_size, + continue_loading=not self.base_repo.more_refs_to_fetch(), ) def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: @@ -280,24 +321,23 @@ # count how many runs of the loader are with each incremental mode self.statsd.increment("git_total", tags={}) - def fetch_data(self) -> bool: - assert self.origin is not None - - base_repo = self.repo_representation( + self.base_repo = self.repo_representation( storage=self.storage, base_snapshots=self.base_snapshots, incremental=self.incremental, statsd=self.statsd, + limit=self.packfile_chunk_size, ) - def do_progress(msg: bytes) -> None: - sys.stderr.buffer.write(msg) - sys.stderr.flush() + def fetch_data(self) -> bool: + continue_loading = False + assert self.origin is not None try: fetch_info = self.fetch_pack_from_origin( - self.origin.url, base_repo, do_progress + self.origin.url, self.base_repo, do_print_progress ) + continue_loading = fetch_info.continue_loading except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e: raise NotFound(e) except GitProtocolError as e: @@ -321,21 +361,29 @@ if not self.dumb: raise - logger.debug( - "Protocol used for communication: %s", "dumb" if self.dumb else "smart" - ) if self.dumb: - self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo) + protocol = "dumb" + self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, self.base_repo) self.dumb_fetcher.fetch_object_ids() - self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs) - self.symbolic_refs = utils.filter_refs(self.dumb_fetcher.head) + remote_refs = self.dumb_fetcher.refs + symbolic_refs = self.dumb_fetcher.head else: + protocol = "smart" self.pack_buffer = fetch_info.pack_buffer self.pack_size = 
fetch_info.pack_size - self.remote_refs = fetch_info.remote_refs - self.symbolic_refs = fetch_info.symbolic_refs + remote_refs = fetch_info.remote_refs + symbolic_refs = fetch_info.symbolic_refs + + logger.debug("Protocol used for communication: %s", protocol) - self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()} + # So the partial snapshots and the final one create the full branches + self.remote_refs.update(utils.filter_refs(remote_refs)) + self.symbolic_refs.update(utils.filter_refs(symbolic_refs)) + + for sha1 in self.remote_refs.values(): + if sha1 in self.ref_object_types: + continue + self.ref_object_types[sha1] = None logger.info( "Listed %d refs for repo %s", @@ -348,8 +396,7 @@ }, ) - # No more data to fetch - return False + return continue_loading def save_data(self) -> None: """Store a pack for archival""" @@ -374,6 +421,20 @@ with open(os.path.join(pack_dir, refs_name), "xb") as f: pickle.dump(self.remote_refs, f) + def build_partial_snapshot(self) -> Optional[Snapshot]: + # Current implementation makes it a simple call to existing :meth:`get_snapshot` + return self.get_snapshot() + + def store_data(self): + """Override the default implementation so we make sure to close the pack_buffer + if we use one between loops (the dumb loader does not use one, for example). + + """ + super().store_data() + + if not self.dumb: + self.pack_buffer.close() + def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]: """Read all the objects of type `object_type` from the packfile""" if self.dumb: @@ -541,15 +602,15 @@ if unknown_objects: # This object was referenced by the server; We did not fetch # it, and we do not know it from the previous snapshot. This is - # likely a bug in the loader. 
- raise RuntimeError( - "Unknown objects referenced by remote refs: %s" - % ( + # possible as we allow partial snapshot + logger.warning( + "Unknown objects referenced by remote refs: %s", + ( ", ".join( f"{name.decode()}: {hashutil.hash_to_hex(obj)}" for name, obj in unknown_objects.items() ) - ) + ), ) utils.warn_dangling_branches( diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py --- a/swh/loader/git/tests/test_loader.py +++ b/swh/loader/git/tests/test_loader.py @@ -28,6 +28,7 @@ prepare_repository_from_archive, ) from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot +from swh.storage.algos.origin import origin_get_latest_visit_status class CommonGitLoaderNotFound: @@ -95,6 +96,42 @@ snapshot=None, ) + def test_load_visit_multiple_times(self): + """Ingesting repositories in multiple packfiles should be ok""" + # Make the loader retrieve multiple packfiles + self.loader.packfile_chunk_size = 2 + + res = self.loader.load() + + assert res == {"status": "eventful"} + + stats = get_stats(self.loader.storage) + assert stats == { + "content": 4, + "directory": 7, + "origin": 1, + "origin_visit": 1, + "release": 0, + "revision": 7, + "skipped_content": 0, + "snapshot": 1 + 1, # one partial snapshot and one final + } + + partial_visit = origin_get_latest_visit_status( + self.loader.storage, self.repo_url, type="git", allowed_statuses=["partial"] + ) + assert partial_visit is not None + assert partial_visit.snapshot is not None + + # Final status is ok + visit_status = assert_last_visit_matches( + self.loader.storage, + self.repo_url, + status="full", + type="git", + ) + assert visit_status.snapshot is not None + class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound): """Prepare a git directory repository to be loaded through a GitLoader.