diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
 swh.core >= 0.0.7
-swh.loader.core >= 0.18.0
+swh.loader.core >= 0.26.0
 swh.model >= 2.9.0
 swh.scheduler >= 0.0.39
 swh.storage >= 0.22.0
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -40,11 +40,23 @@
 logger = logging.getLogger(__name__)
 
 
+DEFAULT_NUMBER_IDS_TO_FETCH = 200
+
+
+def do_print_progress(msg: bytes) -> None:
+    sys.stderr.buffer.write(msg)
+    sys.stderr.flush()
+
+
 class RepoRepresentation:
     """Repository representation for a Software Heritage origin."""
 
     def __init__(
-        self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False
+        self,
+        storage,
+        base_snapshot: Optional[Snapshot] = None,
+        ignore_history=False,
+        limit: int = DEFAULT_NUMBER_IDS_TO_FETCH,
     ):
         self.storage = storage
         self.ignore_history = ignore_history
@@ -55,15 +67,27 @@
             self.base_snapshot = Snapshot(branches={})
 
         self.heads: Set[HexBytes] = set()
+        self.wanted_refs: Optional[List[HexBytes]] = None
+        # Pagination index
+        self.index: int = 0
+        self.limit = limit
+        self.walker = ObjectStoreGraphWalker(self.heads, self.get_parents)
+        self.previous_refs: List[HexBytes] = []
 
     def get_parents(self, commit: bytes) -> List[bytes]:
         """This method should return the list of known parents"""
         return []
 
     def graph_walker(self) -> ObjectStoreGraphWalker:
-        return ObjectStoreGraphWalker(self.heads, self.get_parents)
+        return self.walker
+
+    def wanted_refs_fetched(self) -> bool:
+        """Did we fetch all wanted refs?"""
+        return self.wanted_refs is not None and self.index >= len(self.wanted_refs)
 
-    def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
+    def determine_wants(
+        self, refs: Dict[bytes, HexBytes], depth=None
+    ) -> List[HexBytes]:
         """Get the list of bytehex sha1s that the git loader should fetch.
 
         This compares the remote refs sent by the server with the base snapshot
@@ -73,27 +97,44 @@
         if not refs:
             return []
 
-        # Cache existing heads
-        local_heads: Set[HexBytes] = set()
-        for branch_name, branch in self.base_snapshot.branches.items():
-            if not branch or branch.target_type == TargetType.ALIAS:
-                continue
-            local_heads.add(hashutil.hash_to_hex(branch.target).encode())
+        if not self.wanted_refs:
+            # We'll compute all wanted_refs to ingest but we'll return it by batch of
+            # limit
 
-        self.heads = local_heads
+            # Cache existing heads
+            local_heads: Set[HexBytes] = set()
+            for branch_name, branch in self.base_snapshot.branches.items():
+                if not branch or branch.target_type == TargetType.ALIAS:
+                    continue
+                local_heads.add(hashutil.hash_to_hex(branch.target).encode())
 
-        # Get the remote heads that we want to fetch
-        remote_heads: Set[HexBytes] = set()
-        for ref_name, ref_target in refs.items():
-            if utils.ignore_branch_name(ref_name):
-                continue
-            remote_heads.add(ref_target)
+            self.heads = local_heads
+
+            # Get the remote heads that we want to fetch
+            remote_heads: Set[HexBytes] = set()
+            for ref_name, ref_target in refs.items():
+                if utils.ignore_branch_name(ref_name):
+                    continue
+                remote_heads.add(ref_target)
+
+            logger.debug("local_heads_count=%s", len(local_heads))
+            logger.debug("remote_heads_count=%s", len(remote_heads))
+            wanted_refs = list(remote_heads - local_heads)
+            logger.debug("wanted_refs_count=%s", len(wanted_refs))
+            self.wanted_refs = wanted_refs
 
-        logger.debug("local_heads_count=%s", len(local_heads))
-        logger.debug("remote_heads_count=%s", len(remote_heads))
-        wanted_refs = list(remote_heads - local_heads)
-        logger.debug("wanted_refs_count=%s", len(wanted_refs))
-        return wanted_refs
+        start = self.index
+        self.index += self.limit
+
+        assert self.wanted_refs is not None
+        asked_refs = self.wanted_refs[start : self.index]
+        if start > 0:
+            # Previous refs were already walked so we can remove them from the next walk
+            # iteration to avoid processing them again
+            self.walker.heads.update(self.previous_refs)
+        self.previous_refs = asked_refs
+        logger.debug("asked_refs_count=%s", len(asked_refs))
+        return asked_refs
 
 
 @dataclass
@@ -102,6 +143,7 @@
     symbolic_refs: Dict[bytes, HexBytes]
     pack_buffer: SpooledTemporaryFile
     pack_size: int
+    continue_loading: bool
 
 
 class GitLoader(DVCSLoader):
@@ -120,6 +162,8 @@
         temp_file_cutoff: int = 100 * 1024 * 1024,
         save_data_path: Optional[str] = None,
         max_content_size: Optional[int] = None,
+        # Number of ids per packfile
+        packfile_chunk_size: int = DEFAULT_NUMBER_IDS_TO_FETCH,
     ):
         """Initialize the bulk updater.
 
@@ -144,6 +188,7 @@
         self.remote_refs: Dict[bytes, HexBytes] = {}
         self.symbolic_refs: Dict[bytes, HexBytes] = {}
         self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+        self.packfile_chunk_size = packfile_chunk_size
 
     def fetch_pack_from_origin(
         self,
@@ -221,6 +266,7 @@
             symbolic_refs=utils.filter_refs(symbolic_refs),
             pack_buffer=pack_buffer,
             pack_size=pack_size,
+            continue_loading=not self.base_repo.wanted_refs_fetched(),
         )
 
     def prepare_origin_visit(self) -> None:
@@ -248,23 +294,22 @@
         else:
             self.base_snapshot = Snapshot(branches={})
 
-    def fetch_data(self) -> bool:
-        assert self.origin is not None
-
-        base_repo = self.repo_representation(
+        self.base_repo = self.repo_representation(
             storage=self.storage,
             base_snapshot=self.base_snapshot,
             ignore_history=self.ignore_history,
+            limit=self.packfile_chunk_size,
         )
 
-        def do_progress(msg: bytes) -> None:
-            sys.stderr.buffer.write(msg)
-            sys.stderr.flush()
+    def fetch_data(self) -> bool:
+        continue_loading = False
+        assert self.origin is not None
 
         try:
             fetch_info = self.fetch_pack_from_origin(
-                self.origin.url, base_repo, do_progress
+                self.origin.url, self.base_repo, do_print_progress
             )
+            continue_loading = fetch_info.continue_loading
         except NotGitRepository as e:
             raise NotFound(e)
         except GitProtocolError as e:
@@ -292,17 +337,20 @@
             "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
         )
         if self.dumb:
-            self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin_url, base_repo)
+            self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin_url, self.base_repo)
             self.dumb_fetcher.fetch_object_ids()
             self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)  # type: ignore
             self.symbolic_refs = self.dumb_fetcher.head
         else:
             self.pack_buffer = fetch_info.pack_buffer
             self.pack_size = fetch_info.pack_size
-            self.remote_refs = fetch_info.remote_refs
-            self.symbolic_refs = fetch_info.symbolic_refs
+            self.remote_refs.update(fetch_info.remote_refs)
+            self.symbolic_refs.update(fetch_info.symbolic_refs)
 
-        self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+        for sha1 in self.remote_refs.values():
+            if sha1 in self.ref_object_types:
+                continue
+            self.ref_object_types[sha1] = None
 
         logger.info(
             "Listed %d refs for repo %s",
@@ -315,8 +363,7 @@
             },
         )
 
-        # No more data to fetch
-        return False
+        return continue_loading
 
     def save_data(self) -> None:
         """Store a pack for archival"""
@@ -341,6 +388,16 @@
         with open(os.path.join(pack_dir, refs_name), "xb") as f:
             pickle.dump(self.remote_refs, f)
 
+    def store_data(self, create_partial_visit: bool = False):
+        """Override the default implementation so we make sure to close the pack_buffer
+        if we use one in between loops (the dumb loader does not use one, for example).
+
+        """
+        super().store_data(create_partial_visit)
+
+        if not self.dumb:
+            self.pack_buffer.close()
+
     def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
         """Read all the objects of type `object_type` from the packfile"""
         if self.dumb:
@@ -459,15 +516,15 @@
         if unknown_objects:
             # This object was referenced by the server; We did not fetch
             # it, and we do not know it from the previous snapshot. This is
-            # likely a bug in the loader.
-            raise RuntimeError(
-                "Unknown objects referenced by remote refs: %s"
-                % (
+            # possible as we allow partial snapshot
+            logger.warning(
+                "Unknown objects referenced by remote refs: %s",
+                (
                     ", ".join(
                         f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
                         for name, obj in unknown_objects.items()
                     )
-                )
+                ),
             )
 
         utils.warn_dangling_branches(
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -21,6 +21,7 @@
     get_stats,
     prepare_repository_from_archive,
 )
+from swh.storage.algos.origin import origin_get_latest_visit_status
 
 
 class CommonGitLoaderNotFound:
@@ -88,6 +89,41 @@
             snapshot=None,
         )
 
+    def test_load_visit_multiple_times(self):
+        """Ingesting repositories in multiple packfiles should be ok
+
+        """
+        # Make the loader retrieve multiple packfiles
+        self.loader.packfile_chunk_size = 3
+
+        res = self.loader.load()
+
+        assert res == {"status": "eventful"}
+
+        stats = get_stats(self.loader.storage)
+        assert stats == {
+            "content": 4,
+            "directory": 7,
+            "origin": 1,
+            "origin_visit": 1,
+            "release": 0,
+            "revision": 7,
+            "skipped_content": 0,
+            "snapshot": 1 + 1,  # one partial snapshot and one final
+        }
+
+        partial_visit = origin_get_latest_visit_status(
+            self.loader.storage, self.repo_url, type="git", allowed_statuses=["partial"]
+        )
+        assert partial_visit is not None
+        assert partial_visit.snapshot is not None
+
+        # Final status is ok
+        visit_status = assert_last_visit_matches(
+            self.loader.storage, self.repo_url, status="full", type="git",
+        )
+        assert visit_status.snapshot is not None
+
 
 class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
     """Prepare a git directory repository to be loaded through a GitLoader.