D6386.diff
D6386: git: Load git repository through multiple packfiles fetch operations
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.7
-swh.loader.core >= 5.0.0
swh.model >= 4.3.0
swh.scheduler >= 0.0.39
swh.storage >= 0.22.0
+swh.loader.core @ git+https://forge.softwareheritage.org/source/staging.git@ad5c8f49c1dea00e95b895811df21e3c93a6667d#egg=swh.loader.core
diff --git a/swh/loader/git/dumb.py b/swh/loader/git/dumb.py
--- a/swh/loader/git/dumb.py
+++ b/swh/loader/git/dumb.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -21,7 +21,7 @@
from swh.loader.git.utils import HexBytes
if TYPE_CHECKING:
- from .loader import RepoRepresentation
+ from .loader import BaseRepoRepresentation
logger = logging.getLogger(__name__)
@@ -69,7 +69,7 @@
base_repo: State of repository archived by Software Heritage
"""
- def __init__(self, repo_url: str, base_repo: RepoRepresentation):
+ def __init__(self, repo_url: str, base_repo: BaseRepoRepresentation):
self._session = requests.Session()
self.repo_url = repo_url
self.base_repo = base_repo
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -42,15 +42,24 @@
heads_logger = logger.getChild("refs")
-class RepoRepresentation:
+DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE = 200
+
+
+def do_print_progress(msg: bytes) -> None:
+ sys.stderr.buffer.write(msg)
+ sys.stderr.flush()
+
+
+class BaseRepoRepresentation:
"""Repository representation for a Software Heritage origin."""
def __init__(
self,
storage,
- base_snapshots: List[Snapshot] = None,
+ base_snapshots: Optional[List[Snapshot]] = None,
incremental: bool = True,
- statsd: Statsd = None,
+ statsd: Optional[Statsd] = None,
+ **kwargs: Any, # extra kwargs are just ignored
):
self.storage = storage
self.incremental = incremental
@@ -71,10 +80,9 @@
heads_logger.debug(" %r: %s", branch_name, branch.target.hex())
self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target)))
- def graph_walker(self) -> ObjectStoreGraphWalker:
- return ObjectStoreGraphWalker(self.local_heads, get_parents=lambda commit: [])
+ self.walker = ObjectStoreGraphWalker(self.local_heads, lambda commit: [])
- def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
+ def compute_wanted_refs(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
"""Get the list of bytehex sha1s that the git loader should fetch.
This compares the remote refs sent by the server with the base snapshot
@@ -96,6 +104,11 @@
continue
remote_heads.add(ref_target)
+ if heads_logger.isEnabledFor(logging.DEBUG):
+ heads_logger.debug("Filtered remote heads:")
+ for value in remote_heads:
+ heads_logger.debug(" %s", value.decode())
+
logger.debug("local_heads_count=%s", len(self.local_heads))
logger.debug("remote_heads_count=%s", len(remote_heads))
wanted_refs = list(remote_heads - self.local_heads)
@@ -113,6 +126,107 @@
)
return wanted_refs
+ def continue_fetch_refs(self) -> bool:
+ """Determine whether more refs remain to fetch; always False in the base case."""
+ return False
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
+ """Get the list of bytehex sha1s that the git loader should fetch.
+
+ This compares the remote refs sent by the server with the base snapshot
+ provided by the loader.
+
+ """
+ raise NotImplementedError
+
+
+class RepoRepresentation(BaseRepoRepresentation):
+ """A RepoRepresentation object able to provide all refs to fetch.
+
+ Internally, this computes the full list of wanted_refs and returns it the first
+ time the :meth:`determine_wants` method is called. That method is expected to be
+ called only once; the caller then has all the refs needed to retrieve the packfile.
+
+ """
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
+ """Get the list of bytehex sha1s that the git loader should fetch.
+
+ This compares the remote refs sent by the server with the base snapshot
+ provided by the loader.
+
+ """
+ return self.compute_wanted_refs(refs)
+
+
+class RepoRepresentationPaginated(BaseRepoRepresentation):
+ """A RepoRepresentation objects able to provide interval of refs to fetch.
+
+ Internally, this computes the full list of wanted_refs but then provide interval of
+ number_of_heads_per_packfile refs each time :meth:`determine_wants` method is
+ called. This expects the caller to call the :meth:`continue_fetch_refs` method to
+ determine if more refs are needed to be fetched or not.
+
+ """
+
+ def __init__(
+ self,
+ *args,
+ number_of_heads_per_packfile=DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE,
+ **kwargs,
+ ):
+ super().__init__(*args, **kwargs)
+ # Pagination index
+ self.index: int = 0
+ self.number_of_heads_per_packfile = number_of_heads_per_packfile
+ self.wanted_refs: Optional[List[HexBytes]] = None
+ self.previous_refs: List[HexBytes] = []
+
+ def continue_fetch_refs(self) -> bool:
+ """Determine whether more refs still need to be fetched."""
+ return self.wanted_refs is None or self.index < len(self.wanted_refs)
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
+ """Get the list of bytehex sha1s that the git loader should fetch.
+
+ This compares the remote refs sent by the server with the base snapshot
+ provided by the loader.
+
+ """
+ # First time around, we'll initialize all the wanted refs
+ if not self.wanted_refs:
+ self.wanted_refs = self.compute_wanted_refs(refs)
+
+ # If empty, then we are done
+ if not self.wanted_refs:
+ return []
+
+ # We have all wanted refs but we are ingesting them one interval of
+ # number_of_heads_per_packfile refs at a time
+ start = self.index
+ self.index += self.number_of_heads_per_packfile
+
+ assert self.wanted_refs
+ asked_refs = self.wanted_refs[start : min(self.index, len(self.wanted_refs))]
+ if heads_logger.isEnabledFor(logging.DEBUG):
+ heads_logger.debug("Asked remote heads:")
+ for value in asked_refs:
+ heads_logger.debug(" %s", value.decode())
+
+ if start > 0:
+ # Previous refs were already walked, so advertise them as known heads on
+ # the next walk iteration to avoid processing them again
+ self.walker.heads.update(self.previous_refs)
+ self.previous_refs = asked_refs
+ logger.debug("asked_refs_count=%s", len(asked_refs))
+ return asked_refs
+
@dataclass
class FetchPackReturn:
@@ -120,6 +234,8 @@
symbolic_refs: Dict[bytes, HexBytes]
pack_buffer: SpooledTemporaryFile
pack_size: int
+ continue_fetch_refs: bool
+ """Whether there are still remaining references to fetch."""
class GitLoader(BaseGitLoader):
@@ -153,52 +269,78 @@
storage: StorageInterface,
url: str,
incremental: bool = True,
- repo_representation: Type[RepoRepresentation] = RepoRepresentation,
pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
temp_file_cutoff: int = 100 * 1024 * 1024,
+ fetch_multiple_packfiles: bool = False,
+ number_of_heads_per_packfile: int = DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE,
**kwargs: Any,
):
"""Initialize the bulk updater.
Args:
- repo_representation: swh's repository representation
- which is in charge of filtering between known and remote
- data.
- ...
-
incremental: If True, the default, this starts from the last known snapshot
(if any) references. Otherwise, this loads the full repository.
+ fetch_multiple_packfiles: If True, this ingests the repository using
+ (internally) multiple packfiles (creating partial incremental snapshots
+ along the way). When False, the default, this uses the existing ingestion
+ policy of retrieving one packfile to ingest.
+ number_of_heads_per_packfile: When fetch_multiple_packfiles is used, this
+ splits the fetch into packfiles of at most this many heads each (no
+ guarantee on the resulting packfile size).
"""
super().__init__(storage=storage, origin_url=url, **kwargs)
+ # check if repository only supports git dumb transfer protocol,
+ # fetched pack file will be empty in that case as dulwich does
+ # not support it and does not fetch any refs
+ logger.debug("Transport url to communicate with server: %s", url)
+ self.client, self.path = dulwich.client.get_transport_and_path(
+ url, thin_packs=False
+ )
+ logger.debug("Client %s to fetch pack at %s", self.client, self.path)
+ self.dumb = url.startswith("http") and getattr(self.client, "dumb", False)
+ self.fetch_multiple_packfiles = fetch_multiple_packfiles
self.incremental = incremental
- self.repo_representation = repo_representation
self.pack_size_bytes = pack_size_bytes
self.temp_file_cutoff = temp_file_cutoff
# state initialized in fetch_data
self.remote_refs: Dict[bytes, HexBytes] = {}
self.symbolic_refs: Dict[bytes, HexBytes] = {}
self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+ self.configure_packfile_fetching_policy(
+ fetch_multiple_packfiles, number_of_heads_per_packfile
+ )
+
+ def configure_packfile_fetching_policy(
+ self, fetch_multiple_packfiles: bool, number_of_heads_per_packfile: int
+ ):
+ """Configure the packfile fetching policy. The default is to fetch one packfile
+ to ingest everything unknown out of it. When fetch_multiple_packfiles is True
+ (and the ingestion passes through the smart protocol), the ingestion uses
+ packfiles (with a given number_of_heads_per_packfile). After each packfile is
+ loaded, a 'partial' (because incomplete) and 'incremental' (as in gathering seen
+ refs) so far snapshot is created (incremental).
+
+ """
+ # Partial snapshots will be created while fetching multiple packfiles (when
+ # the transfer protocol is not the 'dumb' one)
+ self.create_partial_snapshot = not self.dumb and fetch_multiple_packfiles
+ self.number_of_heads_per_packfile: Optional[int] = None
+ self.repo_representation: Type[BaseRepoRepresentation]
+ if self.create_partial_snapshot:
+ self.repo_representation = RepoRepresentationPaginated
+ self.number_of_heads_per_packfile = number_of_heads_per_packfile
+ else:
+ self.repo_representation = RepoRepresentation
def fetch_pack_from_origin(
self,
origin_url: str,
- base_repo: RepoRepresentation,
do_activity: Callable[[bytes], None],
) -> FetchPackReturn:
"""Fetch a pack from the origin"""
pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
- transport_url = origin_url
-
- logger.debug("Transport url to communicate with server: %s", transport_url)
-
- client, path = dulwich.client.get_transport_and_path(
- transport_url, thin_packs=False
- )
-
- logger.debug("Client %s to fetch pack at %s", client, path)
-
size_limit = self.pack_size_bytes
def do_pack(data: bytes) -> None:
@@ -207,16 +349,17 @@
if cur_size + would_write > size_limit:
raise IOError(
f"Pack file too big for repository {origin_url}, "
- f"limit is {size_limit} bytes, current size is {cur_size}, "
+ f"number_of_heads_per_packfile is {size_limit} bytes, "
+ f"current size is {cur_size}, "
f"would write {would_write}"
)
pack_buffer.write(data)
- pack_result = client.fetch_pack(
- path,
- base_repo.determine_wants,
- base_repo.graph_walker(),
+ pack_result = self.client.fetch_pack(
+ self.path,
+ self.base_repo.determine_wants,
+ self.base_repo.walker,
do_pack,
progress=do_activity,
)
@@ -230,16 +373,12 @@
logger.debug("fetched_pack_size=%s", pack_size)
- # check if repository only supports git dumb transfer protocol,
- # fetched pack file will be empty in that case as dulwich do
- # not support it and do not fetch any refs
- self.dumb = transport_url.startswith("http") and getattr(client, "dumb", False)
-
return FetchPackReturn(
- remote_refs=utils.filter_refs(remote_refs),
- symbolic_refs=utils.filter_refs(symbolic_refs),
+ remote_refs=remote_refs,
+ symbolic_refs=symbolic_refs,
pack_buffer=pack_buffer,
pack_size=pack_size,
+ continue_fetch_refs=self.base_repo.continue_fetch_refs(),
)
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
@@ -280,24 +419,23 @@
# count how many runs of the loader are with each incremental mode
self.statsd.increment("git_total", tags={})
- def fetch_data(self) -> bool:
- assert self.origin is not None
-
- base_repo = self.repo_representation(
+ self.base_repo = self.repo_representation(
storage=self.storage,
base_snapshots=self.base_snapshots,
incremental=self.incremental,
statsd=self.statsd,
+ # Only used when self.repo_representation is RepoRepresentationPaginated,
+ # ignored otherwise
+ number_of_heads_per_packfile=self.number_of_heads_per_packfile,
)
- def do_progress(msg: bytes) -> None:
- sys.stderr.buffer.write(msg)
- sys.stderr.flush()
+ def fetch_data(self) -> bool:
+ continue_fetch_refs = False
+ assert self.origin is not None
try:
- fetch_info = self.fetch_pack_from_origin(
- self.origin.url, base_repo, do_progress
- )
+ fetch_info = self.fetch_pack_from_origin(self.origin.url, do_print_progress)
+ continue_fetch_refs = fetch_info.continue_fetch_refs
except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e:
raise NotFound(e)
except GitProtocolError as e:
@@ -321,21 +459,29 @@
if not self.dumb:
raise
- logger.debug(
- "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
- )
if self.dumb:
- self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo)
+ protocol = "dumb"
+ self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, self.base_repo)
self.dumb_fetcher.fetch_object_ids()
- self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)
- self.symbolic_refs = utils.filter_refs(self.dumb_fetcher.head)
+ remote_refs = self.dumb_fetcher.refs
+ symbolic_refs = self.dumb_fetcher.head
else:
+ protocol = "smart"
self.pack_buffer = fetch_info.pack_buffer
self.pack_size = fetch_info.pack_size
- self.remote_refs = fetch_info.remote_refs
- self.symbolic_refs = fetch_info.symbolic_refs
+ remote_refs = fetch_info.remote_refs
+ symbolic_refs = fetch_info.symbolic_refs
+
+ logger.debug("Protocol used for communication: %s", protocol)
- self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+ # So the partial snapshots and the final one create the full set of branches
+ self.remote_refs.update(utils.filter_refs(remote_refs))
+ self.symbolic_refs.update(utils.filter_refs(symbolic_refs))
+
+ for sha1 in self.remote_refs.values():
+ if sha1 in self.ref_object_types:
+ continue
+ self.ref_object_types[sha1] = None
logger.info(
"Listed %d refs for repo %s",
@@ -348,8 +494,7 @@
},
)
- # No more data to fetch
- return False
+ return continue_fetch_refs
def save_data(self) -> None:
"""Store a pack for archival"""
@@ -374,6 +519,20 @@
with open(os.path.join(pack_dir, refs_name), "xb") as f:
pickle.dump(self.remote_refs, f)
+ def build_partial_snapshot(self) -> Optional[Snapshot]:
+ # The current implementation is a simple call to the existing :meth:`get_snapshot`
+ return self.get_snapshot()
+
+ def store_data(self):
+ """Override the default implementation so we make sure to close the pack_buffer
+ if we use one in between loop (dumb loader does not actually one for example).
+
+ """
+ super().store_data()
+
+ if not self.dumb:
+ self.pack_buffer.close()
+
def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
"""Read all the objects of type `object_type` from the packfile"""
if self.dumb:
@@ -538,18 +697,18 @@
if not targets_unknown:
break
- if unknown_objects:
- # This object was referenced by the server; We did not fetch
- # it, and we do not know it from the previous snapshot. This is
- # likely a bug in the loader.
- raise RuntimeError(
- "Unknown objects referenced by remote refs: %s"
- % (
+ if unknown_objects and not self.create_partial_snapshot:
+ # Let's warn about dangling objects when loading full packfiles. The
+ # following objects were referenced by the server but we did not fetch
+ # them, and we do not know them from the previous snapshot.
+ logger.warning(
+ "Unknown objects referenced by remote refs: %s",
+ (
", ".join(
f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
for name, obj in unknown_objects.items()
)
- )
+ ),
)
utils.warn_dangling_branches(
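
The paginated representation above plugs into the base loader's driver loop: fetch_data() is called repeatedly for as long as it returns True, with store_data() run after each round, which is what yields one partial snapshot per packfile. A minimal sketch of that interplay, under simplified assumptions (PaginatedRefs below is an illustrative stand-in, not the actual swh.loader.git API):

# Minimal sketch of the batched-refs policy this diff introduces.
# PaginatedRefs is an illustrative stand-in for RepoRepresentationPaginated.
from typing import List

class PaginatedRefs:
    def __init__(self, wanted: List[bytes], number_of_heads_per_packfile: int = 200):
        self.wanted = wanted
        self.number_of_heads_per_packfile = number_of_heads_per_packfile
        self.index = 0

    def continue_fetch_refs(self) -> bool:
        # More packfiles are needed while the cursor is not past the end
        return self.index < len(self.wanted)

    def determine_wants(self) -> List[bytes]:
        # Serve the next interval of refs; one packfile is fetched per call
        start = self.index
        self.index += self.number_of_heads_per_packfile
        return self.wanted[start : self.index]

repo = PaginatedRefs([b"r0", b"r1", b"r2", b"r3", b"r4"], number_of_heads_per_packfile=2)
batches = []
while True:
    batches.append(repo.determine_wants())  # fetch + load one packfile here
    if not repo.continue_fetch_refs():
        break  # the final, full snapshot follows
    # a partial, incremental snapshot would be recorded here
assert batches == [[b"r0", b"r1"], [b"r2", b"r3"], [b"r4"]]
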
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -28,6 +28,7 @@
prepare_repository_from_archive,
)
from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot
+from swh.storage.algos.origin import origin_get_latest_visit_status
class CommonGitLoaderNotFound:
@@ -95,6 +96,70 @@
snapshot=None,
)
+ def test_load_with_multiple_packfiles_with_partial_snapshot(self):
+ """Ingesting repositories in multiple packfiles should be ok"""
+ # Make the loader retrieve multiple packfiles. As it's already instantiated,
+ # we must configure the fetching policy on the loader ourselves
+ self.loader.configure_packfile_fetching_policy(
+ fetch_multiple_packfiles=True, number_of_heads_per_packfile=1
+ )
+
+ res = self.loader.load()
+
+ assert res == {"status": "eventful"}
+
+ stats = get_stats(self.loader.storage)
+ nb_snapshots = stats.pop("snapshot")
+ assert stats == {
+ "content": 4,
+ "directory": 7,
+ "origin": 1,
+ "origin_visit": 1,
+ "release": 0,
+ "revision": 7,
+ "skipped_content": 0,
+ }
+
+ # The number of snapshots depends on the packfile ingestion order (and the
+ # git graph connectivity). If loading starts with a set of refs more connected
+ # than the other refs, we'll end up with fewer snapshots. Conversely, starting
+ # with less connected refs will create more snapshots. Hence the range in the
+ # following comparison.
+ assert 2 <= nb_snapshots <= 4
+
+ # So we end up with at least one partial visit status with a snapshot
+ partial_visit = origin_get_latest_visit_status(
+ self.loader.storage,
+ self.repo_url,
+ type="git",
+ allowed_statuses=["partial"],
+ )
+ assert partial_visit is not None
+ assert partial_visit.snapshot is not None
+
+ partial_snapshot = self.loader.storage.snapshot_get_branches(
+ partial_visit.snapshot
+ )
+
+ # In the end, we have the final full visit targeting the snapshot
+ visit_status = assert_last_visit_matches(
+ self.loader.storage,
+ self.repo_url,
+ status="full",
+ type="git",
+ )
+ assert visit_status.snapshot is not None
+
+ snapshot = self.loader.storage.snapshot_get_branches(visit_status.snapshot)
+
+ # the partial snapshot branches set is a subset of the branches of the final
+ # snapshot
+ partial_branches = partial_snapshot["branches"].keys()
+ snapshot_branches = snapshot["branches"].keys()
+ assert 0 < len(partial_branches) <= len(snapshot_branches)
+ for branch in partial_branches:
+ assert branch in snapshot_branches
+
class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
"""Prepare a git directory repository to be loaded through a GitLoader.