F9346351: D6386.id31893.diff
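The diff below makes the git loader fetch a repository through several smaller packfiles instead of a single large one: `determine_wants` hands out at most `limit` refs per pack negotiation, `fetch_data` keeps returning true while batches remain, and a partial snapshot is recorded after each round. As a reading aid, here is a minimal, self-contained sketch of that pagination scheme (illustrative names only; this is not the loader code itself):

    from typing import List, Optional

    class PaginatedWants:
        """Hand out wanted refs in batches of `limit` (sketch only)."""

        def __init__(self, limit: int = 200):
            self.limit = limit
            self.index = 0  # pagination cursor into wanted_refs
            self.wanted_refs: Optional[List[bytes]] = None

        def determine_wants(self, remote_refs: List[bytes]) -> List[bytes]:
            if self.wanted_refs is None:
                # Computed once on the first round, then served slice by slice
                self.wanted_refs = list(remote_refs)
            start = self.index
            self.index += self.limit
            return self.wanted_refs[start : self.index]

        def all_wanted_refs_fetched(self) -> bool:
            return self.wanted_refs is not None and self.index >= len(self.wanted_refs)

    # Usage: keep negotiating packfiles until every wanted ref was asked for.
    pager = PaginatedWants(limit=2)
    refs = [b"a", b"b", b"c", b"d", b"e"]
    while not pager.all_wanted_refs_fetched():
        batch = pager.determine_wants(refs)  # [a, b], then [c, d], then [e]
        # ... fetch one packfile covering `batch`, store its objects,
        # then record a partial snapshot ...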
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.7
-swh.loader.core >= 5.0.0
swh.model >= 4.3.0
swh.scheduler >= 0.0.39
swh.storage >= 0.22.0
+swh.loader.core @ git+https://forge.softwareheritage.org/source/staging.git@ad5c8f49c1dea00e95b895811df21e3c93a6667d#egg=swh.loader.core
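Note on the requirements change above: the `package @ git+<url>@<commit>#egg=package` form is pip's direct URL reference syntax. It temporarily pins swh.loader.core to an exact commit of the staging repository, presumably until a release containing the loader-core changes this diff depends on is published.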
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -42,15 +42,24 @@
heads_logger = logger.getChild("refs")
+DEFAULT_NUMBER_IDS_TO_FETCH = 200
+
+
+def do_print_progress(msg: bytes) -> None:
+ sys.stderr.buffer.write(msg)
+ sys.stderr.flush()
+
+
class RepoRepresentation:
"""Repository representation for a Software Heritage origin."""
def __init__(
self,
storage,
- base_snapshots: List[Snapshot] = None,
+ base_snapshots: Optional[List[Snapshot]] = None,
incremental: bool = True,
- statsd: Statsd = None,
+ statsd: Optional[Statsd] = None,
+ limit: int = DEFAULT_NUMBER_IDS_TO_FETCH,
):
self.storage = storage
self.incremental = incremental
@@ -71,10 +80,24 @@
heads_logger.debug(" %r: %s", branch_name, branch.target.hex())
self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target)))
- def graph_walker(self) -> ObjectStoreGraphWalker:
- return ObjectStoreGraphWalker(self.local_heads, get_parents=lambda commit: [])
+ self.heads: Set[HexBytes] = set()
+ self.wanted_refs: Optional[List[HexBytes]] = None
+ self.walker = ObjectStoreGraphWalker(self.heads, lambda commit: [])
+ # Pagination index
+ self.index: int = 0
+ self.limit = limit
+ self.previous_refs: List[HexBytes] = []
+
+ def all_wanted_refs_fetched(self) -> bool:
+ """Return True once every wanted ref has been handed out and fetched."""
+ return self.wanted_refs is not None and self.index >= len(self.wanted_refs)
- def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
+ def graph_walker(self):
+ return self.walker
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
"""Get the list of bytehex sha1s that the git loader should fetch.
This compares the remote refs sent by the server with the base snapshot
@@ -84,34 +107,52 @@
if not refs:
return []
- if heads_logger.isEnabledFor(logging.DEBUG):
- heads_logger.debug("Heads returned by the git remote:")
- for name, value in refs.items():
- heads_logger.debug(" %r: %s", name, value.decode())
+ if not self.wanted_refs:
+ # Compute the full list of wanted refs to ingest once, then return it
+ # in batches of `limit`
+ if heads_logger.isEnabledFor(logging.DEBUG):
+ heads_logger.debug("Heads returned by the git remote:")
+ for name, value in refs.items():
+ heads_logger.debug(" %r: %s", name, value.decode())
+
+ # Get the remote heads that we want to fetch
+ remote_heads: Set[HexBytes] = set()
+ for ref_name, ref_target in refs.items():
+ if utils.ignore_branch_name(ref_name):
+ continue
+ remote_heads.add(ref_target)
+
+ logger.debug("local_heads_count=%s", len(self.local_heads))
+ logger.debug("remote_heads_count=%s", len(remote_heads))
+ wanted_refs = list(remote_heads - self.local_heads)
+ logger.debug("wanted_refs_count=%s", len(wanted_refs))
+ if self.statsd is not None:
+ self.statsd.histogram(
+ "git_ignored_refs_percent",
+ len(remote_heads - set(refs.values())) / len(refs),
+ tags={},
+ )
+ self.statsd.histogram(
+ "git_known_refs_percent",
+ len(self.local_heads & remote_heads) / len(remote_heads),
+ tags={},
+ )
+ # Determine all refs we want to ingest
+ self.wanted_refs = wanted_refs
- # Get the remote heads that we want to fetch
- remote_heads: Set[HexBytes] = set()
- for ref_name, ref_target in refs.items():
- if utils.ignore_branch_name(ref_name):
- continue
- remote_heads.add(ref_target)
-
- logger.debug("local_heads_count=%s", len(self.local_heads))
- logger.debug("remote_heads_count=%s", len(remote_heads))
- wanted_refs = list(remote_heads - self.local_heads)
- logger.debug("wanted_refs_count=%s", len(wanted_refs))
- if self.statsd is not None:
- self.statsd.histogram(
- "git_ignored_refs_percent",
- len(remote_heads - set(refs.values())) / len(refs),
- tags={},
- )
- self.statsd.histogram(
- "git_known_refs_percent",
- len(self.local_heads & remote_heads) / len(remote_heads),
- tags={},
- )
- return wanted_refs
+ # Ingest the wanted refs in smaller batches
+ start = self.index
+ self.index += self.limit
+
+ assert self.wanted_refs is not None
+ asked_refs = self.wanted_refs[start : self.index]
+ if start > 0:
+ # The previous batch of refs has already been fetched, so advertise them
+ # to the graph walker as known heads to avoid fetching them again
+ self.walker.heads.update(self.previous_refs)
+ self.previous_refs = asked_refs
+ logger.debug("asked_refs_count=%s", len(asked_refs))
+ return asked_refs
@dataclass
@@ -120,6 +161,7 @@
symbolic_refs: Dict[bytes, HexBytes]
pack_buffer: SpooledTemporaryFile
pack_size: int
+ continue_loading: bool
class GitLoader(BaseGitLoader):
@@ -156,6 +198,7 @@
repo_representation: Type[RepoRepresentation] = RepoRepresentation,
pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
temp_file_cutoff: int = 100 * 1024 * 1024,
+ packfile_chunk_size: int = DEFAULT_NUMBER_IDS_TO_FETCH,
**kwargs: Any,
):
"""Initialize the bulk updater.
@@ -171,6 +214,17 @@
"""
super().__init__(storage=storage, origin_url=url, **kwargs)
+ # Check whether the repository only supports the git dumb transfer
+ # protocol; the fetched pack file will be empty in that case, as dulwich
+ # does not support it and does not fetch any refs
+ logger.debug("Transport url to communicate with server: %s", url)
+ self.client, self.path = dulwich.client.get_transport_and_path(
+ url, thin_packs=False
+ )
+ logger.debug("Client %s to fetch pack at %s", self.client, self.path)
+ self.dumb = url.startswith("http") and getattr(self.client, "dumb", False)
+ # Create partial snapshots along the way while fetching multiple packfiles
+ self.create_partial_snapshot = not self.dumb and incremental
self.incremental = incremental
self.repo_representation = repo_representation
self.pack_size_bytes = pack_size_bytes
@@ -179,6 +233,7 @@
self.remote_refs: Dict[bytes, HexBytes] = {}
self.symbolic_refs: Dict[bytes, HexBytes] = {}
self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+ self.packfile_chunk_size = packfile_chunk_size
def fetch_pack_from_origin(
self,
@@ -189,16 +244,6 @@
"""Fetch a pack from the origin"""
pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
- transport_url = origin_url
-
- logger.debug("Transport url to communicate with server: %s", transport_url)
-
- client, path = dulwich.client.get_transport_and_path(
- transport_url, thin_packs=False
- )
-
- logger.debug("Client %s to fetch pack at %s", client, path)
-
size_limit = self.pack_size_bytes
def do_pack(data: bytes) -> None:
@@ -213,8 +258,8 @@
pack_buffer.write(data)
- pack_result = client.fetch_pack(
- path,
+ pack_result = self.client.fetch_pack(
+ self.path,
base_repo.determine_wants,
base_repo.graph_walker(),
do_pack,
@@ -230,16 +275,12 @@
logger.debug("fetched_pack_size=%s", pack_size)
- # check if repository only supports git dumb transfer protocol,
- # fetched pack file will be empty in that case as dulwich do
- # not support it and do not fetch any refs
- self.dumb = transport_url.startswith("http") and getattr(client, "dumb", False)
-
return FetchPackReturn(
- remote_refs=utils.filter_refs(remote_refs),
- symbolic_refs=utils.filter_refs(symbolic_refs),
+ remote_refs=remote_refs,
+ symbolic_refs=symbolic_refs,
pack_buffer=pack_buffer,
pack_size=pack_size,
+ continue_loading=not self.base_repo.all_wanted_refs_fetched(),
)
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
@@ -280,24 +321,23 @@
# count how many runs of the loader are with each incremental mode
self.statsd.increment("git_total", tags={})
- def fetch_data(self) -> bool:
- assert self.origin is not None
-
- base_repo = self.repo_representation(
+ self.base_repo = self.repo_representation(
storage=self.storage,
base_snapshots=self.base_snapshots,
incremental=self.incremental,
statsd=self.statsd,
+ limit=self.packfile_chunk_size,
)
- def do_progress(msg: bytes) -> None:
- sys.stderr.buffer.write(msg)
- sys.stderr.flush()
+ def fetch_data(self) -> bool:
+ continue_loading = False
+ assert self.origin is not None
try:
fetch_info = self.fetch_pack_from_origin(
- self.origin.url, base_repo, do_progress
+ self.origin.url, self.base_repo, do_print_progress
)
+ continue_loading = fetch_info.continue_loading
except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e:
raise NotFound(e)
except GitProtocolError as e:
@@ -321,21 +361,29 @@
if not self.dumb:
raise
- logger.debug(
- "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
- )
if self.dumb:
- self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo)
+ protocol = "dumb"
+ self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, self.base_repo)
self.dumb_fetcher.fetch_object_ids()
- self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)
- self.symbolic_refs = utils.filter_refs(self.dumb_fetcher.head)
+ remote_refs = self.dumb_fetcher.refs
+ symbolic_refs = self.dumb_fetcher.head
else:
+ protocol = "smart"
self.pack_buffer = fetch_info.pack_buffer
self.pack_size = fetch_info.pack_size
- self.remote_refs = fetch_info.remote_refs
- self.symbolic_refs = fetch_info.symbolic_refs
+ remote_refs = fetch_info.remote_refs
+ symbolic_refs = fetch_info.symbolic_refs
+
+ logger.debug("Protocol used for communication: %s", protocol)
- self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+ # Accumulate refs so both the partial snapshots and the final one carry
+ # the full set of branches
+ self.remote_refs.update(utils.filter_refs(remote_refs))
+ self.symbolic_refs.update(utils.filter_refs(symbolic_refs))
+
+ for sha1 in self.remote_refs.values():
+ if sha1 in self.ref_object_types:
+ continue
+ self.ref_object_types[sha1] = None
logger.info(
"Listed %d refs for repo %s",
@@ -348,8 +396,7 @@
},
)
- # No more data to fetch
- return False
+ return continue_loading
def save_data(self) -> None:
"""Store a pack for archival"""
@@ -374,6 +421,20 @@
with open(os.path.join(pack_dir, refs_name), "xb") as f:
pickle.dump(self.remote_refs, f)
+ def build_partial_snapshot(self) -> Optional[Snapshot]:
+ # The current implementation simply delegates to the existing :meth:`get_snapshot`
+ return self.get_snapshot()
+
+ def store_data(self):
+ """Override the default implementation so we make sure to close the pack_buffer
+ if we use one in between loop (dumb loader does not actually one for example).
+
+ """
+ super().store_data()
+
+ if not self.dumb:
+ self.pack_buffer.close()
+
def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
"""Read all the objects of type `object_type` from the packfile"""
if self.dumb:
@@ -541,15 +602,15 @@
if unknown_objects:
# This object was referenced by the server; We did not fetch
# it, and we do not know it from the previous snapshot. This is
- # likely a bug in the loader.
- raise RuntimeError(
- "Unknown objects referenced by remote refs: %s"
- % (
+ # possible now that partial snapshots are allowed
+ logger.warning(
+ "Unknown objects referenced by remote refs: %s",
+ (
", ".join(
f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
for name, obj in unknown_objects.items()
)
- )
+ ),
)
utils.warn_dangling_branches(
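For context on why `fetch_data` now returns `continue_loading`: the base loader alternates fetch and store rounds for as long as `fetch_data()` returns true, which is what lets each packfile be flushed, and a partial snapshot recorded, before the next batch of refs is negotiated. A simplified sketch of that driving loop (the real one lives in swh.loader.core and also handles visit statuses and errors):

    def load(loader) -> None:
        # Alternate fetch and store rounds until fetch_data() reports
        # that no more data remains to be fetched.
        more_data_to_fetch = True
        while more_data_to_fetch:
            more_data_to_fetch = loader.fetch_data()  # one pack negotiation
            loader.store_data()  # flush objects; may record a partial snapshot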
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -28,6 +28,7 @@
prepare_repository_from_archive,
)
from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot
+from swh.storage.algos.origin import origin_get_latest_visit_status
class CommonGitLoaderNotFound:
@@ -95,6 +96,42 @@
snapshot=None,
)
+ def test_load_visit_multiple_times(self):
+ """Ingesting repositories in multiple packfiles should be ok"""
+ # Make the loader retrieve multiple packfiles
+ self.loader.packfile_chunk_size = 2
+
+ res = self.loader.load()
+
+ assert res == {"status": "eventful"}
+
+ stats = get_stats(self.loader.storage)
+ assert stats == {
+ "content": 4,
+ "directory": 7,
+ "origin": 1,
+ "origin_visit": 1,
+ "release": 0,
+ "revision": 7,
+ "skipped_content": 0,
+ "snapshot": 1 + 1, # one partial snapshot and one final
+ }
+
+ partial_visit = origin_get_latest_visit_status(
+ self.loader.storage, self.repo_url, type="git", allowed_statuses=["partial"]
+ )
+ assert partial_visit is not None
+ assert partial_visit.snapshot is not None
+
+ # Final status is ok
+ visit_status = assert_last_visit_matches(
+ self.loader.storage,
+ self.repo_url,
+ status="full",
+ type="git",
+ )
+ assert visit_status.snapshot is not None
+
class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
"""Prepare a git directory repository to be loaded through a GitLoader.
Attached To: D6386: git: Load git repository through multiple packfiles fetch operations