Page MenuHomeSoftware Heritage

D6386.id31893.diff
No OneTemporary

D6386.id31893.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.7
-swh.loader.core >= 5.0.0
swh.model >= 4.3.0
swh.scheduler >= 0.0.39
swh.storage >= 0.22.0
+swh.loader.core @ git+https://forge.softwareheritage.org/source/staging.git@ad5c8f49c1dea00e95b895811df21e3c93a6667d#egg=swh.loader.core
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -42,15 +42,24 @@
heads_logger = logger.getChild("refs")
+DEFAULT_NUMBER_IDS_TO_FETCH = 200
+
+
+def do_print_progress(msg: bytes) -> None:
+ sys.stderr.buffer.write(msg)
+ sys.stderr.flush()
+
+
class RepoRepresentation:
"""Repository representation for a Software Heritage origin."""
def __init__(
self,
storage,
- base_snapshots: List[Snapshot] = None,
+ base_snapshots: Optional[List[Snapshot]] = None,
incremental: bool = True,
- statsd: Statsd = None,
+ statsd: Optional[Statsd] = None,
+ limit: int = DEFAULT_NUMBER_IDS_TO_FETCH,
):
self.storage = storage
self.incremental = incremental
@@ -71,10 +80,24 @@
heads_logger.debug(" %r: %s", branch_name, branch.target.hex())
self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target)))
- def graph_walker(self) -> ObjectStoreGraphWalker:
- return ObjectStoreGraphWalker(self.local_heads, get_parents=lambda commit: [])
+ self.heads: Set[HexBytes] = set()
+ self.wanted_refs: Optional[List[HexBytes]] = None
+ self.walker = ObjectStoreGraphWalker(self.heads, lambda commit: [])
+ # Pagination index
+ self.index: int = 0
+ self.limit = limit
+ self.previous_refs: List[HexBytes] = []
+
+ def more_refs_to_fetch(self) -> bool:
+ """Did we fetch all wanted refs?"""
+ return self.wanted_refs is not None and self.index > len(self.wanted_refs)
- def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
+ def graph_walker(self):
+ return self.walker
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
"""Get the list of bytehex sha1s that the git loader should fetch.
This compares the remote refs sent by the server with the base snapshot
@@ -84,34 +107,52 @@
if not refs:
return []
- if heads_logger.isEnabledFor(logging.DEBUG):
- heads_logger.debug("Heads returned by the git remote:")
- for name, value in refs.items():
- heads_logger.debug(" %r: %s", name, value.decode())
+ if not self.wanted_refs:
+ # We'll compute all wanted_refs to ingest but we'll return them in
+ # batches of `limit`
+ if heads_logger.isEnabledFor(logging.DEBUG):
+ heads_logger.debug("Heads returned by the git remote:")
+ for name, value in refs.items():
+ heads_logger.debug(" %r: %s", name, value.decode())
+
+ # Get the remote heads that we want to fetch
+ remote_heads: Set[HexBytes] = set()
+ for ref_name, ref_target in refs.items():
+ if utils.ignore_branch_name(ref_name):
+ continue
+ remote_heads.add(ref_target)
+
+ logger.debug("local_heads_count=%s", len(self.local_heads))
+ logger.debug("remote_heads_count=%s", len(remote_heads))
+ wanted_refs = list(remote_heads - self.local_heads)
+ logger.debug("wanted_refs_count=%s", len(wanted_refs))
+ if self.statsd is not None:
+ self.statsd.histogram(
+ "git_ignored_refs_percent",
+ len(remote_heads - set(refs.values())) / len(refs),
+ tags={},
+ )
+ self.statsd.histogram(
+ "git_known_refs_percent",
+ len(self.local_heads & remote_heads) / len(remote_heads),
+ tags={},
+ )
+ # Determine all refs we want to ingest
+ self.wanted_refs = wanted_refs
- # Get the remote heads that we want to fetch
- remote_heads: Set[HexBytes] = set()
- for ref_name, ref_target in refs.items():
- if utils.ignore_branch_name(ref_name):
- continue
- remote_heads.add(ref_target)
-
- logger.debug("local_heads_count=%s", len(self.local_heads))
- logger.debug("remote_heads_count=%s", len(remote_heads))
- wanted_refs = list(remote_heads - self.local_heads)
- logger.debug("wanted_refs_count=%s", len(wanted_refs))
- if self.statsd is not None:
- self.statsd.histogram(
- "git_ignored_refs_percent",
- len(remote_heads - set(refs.values())) / len(refs),
- tags={},
- )
- self.statsd.histogram(
- "git_known_refs_percent",
- len(self.local_heads & remote_heads) / len(remote_heads),
- tags={},
- )
- return wanted_refs
+ # We're going to ingest them in smaller batches of refs
+ start = self.index
+ self.index += self.limit
+
+ assert self.wanted_refs is not None
+ asked_refs = self.wanted_refs[start : self.index]
+ if start > 0:
+ # Previous refs were already walked so we can remove them from the next walk
+ # iteration to avoid processing them again
+ self.walker.heads.update(self.previous_refs)
+ self.previous_refs = asked_refs
+ logger.debug("asked_refs_count=%s", len(asked_refs))
+ return asked_refs
@dataclass
@@ -120,6 +161,7 @@
symbolic_refs: Dict[bytes, HexBytes]
pack_buffer: SpooledTemporaryFile
pack_size: int
+ continue_loading: bool
class GitLoader(BaseGitLoader):
@@ -156,6 +198,7 @@
repo_representation: Type[RepoRepresentation] = RepoRepresentation,
pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
temp_file_cutoff: int = 100 * 1024 * 1024,
+ packfile_chunk_size: int = DEFAULT_NUMBER_IDS_TO_FETCH,
**kwargs: Any,
):
"""Initialize the bulk updater.
@@ -171,6 +214,17 @@
"""
super().__init__(storage=storage, origin_url=url, **kwargs)
+ # check if the repository only supports the git dumb transfer protocol;
+ # the fetched pack file will be empty in that case, as dulwich does
+ # not support it and does not fetch any refs
+ logger.debug("Transport url to communicate with server: %s", url)
+ self.client, self.path = dulwich.client.get_transport_and_path(
+ url, thin_packs=False
+ )
+ logger.debug("Client %s to fetch pack at %s", self.client, self.path)
+ self.dumb = url.startswith("http") and getattr(self.client, "dumb", False)
+ # will create partial snapshots alongside fetching multiple packfiles
+ self.create_partial_snapshot = not self.dumb and incremental
self.incremental = incremental
self.repo_representation = repo_representation
self.pack_size_bytes = pack_size_bytes
@@ -179,6 +233,7 @@
self.remote_refs: Dict[bytes, HexBytes] = {}
self.symbolic_refs: Dict[bytes, HexBytes] = {}
self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+ self.packfile_chunk_size = packfile_chunk_size
def fetch_pack_from_origin(
self,
@@ -189,16 +244,6 @@
"""Fetch a pack from the origin"""
pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
- transport_url = origin_url
-
- logger.debug("Transport url to communicate with server: %s", transport_url)
-
- client, path = dulwich.client.get_transport_and_path(
- transport_url, thin_packs=False
- )
-
- logger.debug("Client %s to fetch pack at %s", client, path)
-
size_limit = self.pack_size_bytes
def do_pack(data: bytes) -> None:
@@ -213,8 +258,8 @@
pack_buffer.write(data)
- pack_result = client.fetch_pack(
- path,
+ pack_result = self.client.fetch_pack(
+ self.path,
base_repo.determine_wants,
base_repo.graph_walker(),
do_pack,
@@ -230,16 +275,12 @@
logger.debug("fetched_pack_size=%s", pack_size)
- # check if repository only supports git dumb transfer protocol,
- # fetched pack file will be empty in that case as dulwich do
- # not support it and do not fetch any refs
- self.dumb = transport_url.startswith("http") and getattr(client, "dumb", False)
-
return FetchPackReturn(
- remote_refs=utils.filter_refs(remote_refs),
- symbolic_refs=utils.filter_refs(symbolic_refs),
+ remote_refs=remote_refs,
+ symbolic_refs=symbolic_refs,
pack_buffer=pack_buffer,
pack_size=pack_size,
+ continue_loading=not self.base_repo.more_refs_to_fetch(),
)
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
@@ -280,24 +321,23 @@
# count how many runs of the loader are with each incremental mode
self.statsd.increment("git_total", tags={})
- def fetch_data(self) -> bool:
- assert self.origin is not None
-
- base_repo = self.repo_representation(
+ self.base_repo = self.repo_representation(
storage=self.storage,
base_snapshots=self.base_snapshots,
incremental=self.incremental,
statsd=self.statsd,
+ limit=self.packfile_chunk_size,
)
- def do_progress(msg: bytes) -> None:
- sys.stderr.buffer.write(msg)
- sys.stderr.flush()
+ def fetch_data(self) -> bool:
+ continue_loading = False
+ assert self.origin is not None
try:
fetch_info = self.fetch_pack_from_origin(
- self.origin.url, base_repo, do_progress
+ self.origin.url, self.base_repo, do_print_progress
)
+ continue_loading = fetch_info.continue_loading
except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e:
raise NotFound(e)
except GitProtocolError as e:
@@ -321,21 +361,29 @@
if not self.dumb:
raise
- logger.debug(
- "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
- )
if self.dumb:
- self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo)
+ protocol = "dumb"
+ self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, self.base_repo)
self.dumb_fetcher.fetch_object_ids()
- self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)
- self.symbolic_refs = utils.filter_refs(self.dumb_fetcher.head)
+ remote_refs = self.dumb_fetcher.refs
+ symbolic_refs = self.dumb_fetcher.head
else:
+ protocol = "smart"
self.pack_buffer = fetch_info.pack_buffer
self.pack_size = fetch_info.pack_size
- self.remote_refs = fetch_info.remote_refs
- self.symbolic_refs = fetch_info.symbolic_refs
+ remote_refs = fetch_info.remote_refs
+ symbolic_refs = fetch_info.symbolic_refs
+
+ logger.debug("Protocol used for communication: %s", protocol)
- self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+ # So the partial snapshots and the final one create the full set of branches
+ self.remote_refs.update(utils.filter_refs(remote_refs))
+ self.symbolic_refs.update(utils.filter_refs(symbolic_refs))
+
+ for sha1 in self.remote_refs.values():
+ if sha1 in self.ref_object_types:
+ continue
+ self.ref_object_types[sha1] = None
logger.info(
"Listed %d refs for repo %s",
@@ -348,8 +396,7 @@
},
)
- # No more data to fetch
- return False
+ return continue_loading
def save_data(self) -> None:
"""Store a pack for archival"""
@@ -374,6 +421,20 @@
with open(os.path.join(pack_dir, refs_name), "xb") as f:
pickle.dump(self.remote_refs, f)
+ def build_partial_snapshot(self) -> Optional[Snapshot]:
+ # Current implementation makes it a simple call to existing :meth:`get_snapshot`
+ return self.get_snapshot()
+
+ def store_data(self):
+ """Override the default implementation so we make sure to close the pack_buffer
+ if we use one in between loop (dumb loader does not actually one for example).
+
+ """
+ super().store_data()
+
+ if not self.dumb:
+ self.pack_buffer.close()
+
def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
"""Read all the objects of type `object_type` from the packfile"""
if self.dumb:
@@ -541,15 +602,15 @@
if unknown_objects:
# This object was referenced by the server; We did not fetch
# it, and we do not know it from the previous snapshot. This is
- # likely a bug in the loader.
- raise RuntimeError(
- "Unknown objects referenced by remote refs: %s"
- % (
+ # possible as we allow partial snapshot
+ logger.warning(
+ "Unknown objects referenced by remote refs: %s",
+ (
", ".join(
f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
for name, obj in unknown_objects.items()
)
- )
+ ),
)
utils.warn_dangling_branches(
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -28,6 +28,7 @@
prepare_repository_from_archive,
)
from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot
+from swh.storage.algos.origin import origin_get_latest_visit_status
class CommonGitLoaderNotFound:
@@ -95,6 +96,42 @@
snapshot=None,
)
+ def test_load_visit_multiple_times(self):
+ """Ingesting repositories in multiple packfiles should be ok"""
+ # Make the loader retrieve multiple packfiles
+ self.loader.packfile_chunk_size = 2
+
+ res = self.loader.load()
+
+ assert res == {"status": "eventful"}
+
+ stats = get_stats(self.loader.storage)
+ assert stats == {
+ "content": 4,
+ "directory": 7,
+ "origin": 1,
+ "origin_visit": 1,
+ "release": 0,
+ "revision": 7,
+ "skipped_content": 0,
+ "snapshot": 1 + 1, # one partial snapshot and one final
+ }
+
+ partial_visit = origin_get_latest_visit_status(
+ self.loader.storage, self.repo_url, type="git", allowed_statuses=["partial"]
+ )
+ assert partial_visit is not None
+ assert partial_visit.snapshot is not None
+
+ # Final status is ok
+ visit_status = assert_last_visit_matches(
+ self.loader.storage,
+ self.repo_url,
+ status="full",
+ type="git",
+ )
+ assert visit_status.snapshot is not None
+
class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
"""Prepare a git directory repository to be loaded through a GitLoader.

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:55 PM (2 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3231725

Event Timeline