Page MenuHomeSoftware Heritage

D6386.diff
No OneTemporary

D6386.diff

diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,5 +1,5 @@
swh.core >= 0.0.7
-swh.loader.core >= 5.0.0
swh.model >= 4.3.0
swh.scheduler >= 0.0.39
swh.storage >= 0.22.0
+swh.loader.core @ git+https://forge.softwareheritage.org/source/staging.git@ad5c8f49c1dea00e95b895811df21e3c93a6667d#egg=swh.loader.core
diff --git a/swh/loader/git/dumb.py b/swh/loader/git/dumb.py
--- a/swh/loader/git/dumb.py
+++ b/swh/loader/git/dumb.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
@@ -21,7 +21,7 @@
from swh.loader.git.utils import HexBytes
if TYPE_CHECKING:
- from .loader import RepoRepresentation
+ from .loader import BaseRepoRepresentation
logger = logging.getLogger(__name__)
@@ -69,7 +69,7 @@
base_repo: State of repository archived by Software Heritage
"""
- def __init__(self, repo_url: str, base_repo: RepoRepresentation):
+ def __init__(self, repo_url: str, base_repo: BaseRepoRepresentation):
self._session = requests.Session()
self.repo_url = repo_url
self.base_repo = base_repo
diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py
--- a/swh/loader/git/loader.py
+++ b/swh/loader/git/loader.py
@@ -42,15 +42,24 @@
heads_logger = logger.getChild("refs")
-class RepoRepresentation:
+DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE = 200
+
+
+def do_print_progress(msg: bytes) -> None:
+ sys.stderr.buffer.write(msg)
+ sys.stderr.flush()
+
+
+class BaseRepoRepresentation:
"""Repository representation for a Software Heritage origin."""
def __init__(
self,
storage,
- base_snapshots: List[Snapshot] = None,
+ base_snapshots: Optional[List[Snapshot]] = None,
incremental: bool = True,
- statsd: Statsd = None,
+ statsd: Optional[Statsd] = None,
+ **kwargs: Any, # extra kwargs are just ignored
):
self.storage = storage
self.incremental = incremental
@@ -71,10 +80,9 @@
heads_logger.debug(" %r: %s", branch_name, branch.target.hex())
self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target)))
- def graph_walker(self) -> ObjectStoreGraphWalker:
- return ObjectStoreGraphWalker(self.local_heads, get_parents=lambda commit: [])
+ self.walker = ObjectStoreGraphWalker(self.local_heads, lambda commit: [])
- def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
+ def compute_wanted_refs(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
"""Get the list of bytehex sha1s that the git loader should fetch.
This compares the remote refs sent by the server with the base snapshot
@@ -96,6 +104,11 @@
continue
remote_heads.add(ref_target)
+ if heads_logger.isEnabledFor(logging.DEBUG):
+ heads_logger.debug("Filtered remote heads:")
+ for value in remote_heads:
+ heads_logger.debug(" %s", value.decode())
+
logger.debug("local_heads_count=%s", len(self.local_heads))
logger.debug("remote_heads_count=%s", len(remote_heads))
wanted_refs = list(remote_heads - self.local_heads)
@@ -113,6 +126,107 @@
)
return wanted_refs
+ def confinue_fetch_refs(self) -> bool:
+ """Determine whether we are done fetching all refs."""
+ return False
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
+ """Get the list of bytehex sha1s that the git loader should fetch.
+
+ This compares the remote refs sent by the server with the base snapshot
+ provided by the loader.
+
+ """
+ raise NotImplementedError
+
+
+class RepoRepresentation(BaseRepoRepresentation):
+ """A RepoRepresentation object able to provide all refs to fetch.
+
+ Internally, this computes the full list of wanted_refs and returns it the first time
+ :meth:`determine_wants` method is called. It's expected to be called once. The
+ caller has then all the necessary refs to retrieve the packfile.
+
+ """
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
+ """Get the list of bytehex sha1s that the git loader should fetch.
+
+ This compares the remote refs sent by the server with the base snapshot
+ provided by the loader.
+
+ """
+ return self.compute_wanted_refs(refs)
+
+
+class RepoRepresentationPaginated(BaseRepoRepresentation):
+ """A RepoRepresentation objects able to provide interval of refs to fetch.
+
+ Internally, this computes the full list of wanted_refs but then provide interval of
+ number_of_heads_per_packfile refs each time :meth:`determine_wants` method is
+ called. This expects the caller to call the :meth:`continue_fetch_refs` method to
+ determine if more refs are needed to be fetched or not.
+
+ """
+
+ def __init__(
+ self,
+ *args,
+ number_of_heads_per_packfile=DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE,
+ **kwargs,
+ ):
+ super().__init__(*args, **kwargs)
+ # Pagination index
+ self.index: int = 0
+ self.number_of_heads_per_packfile = number_of_heads_per_packfile
+ self.wanted_refs: Optional[List[HexBytes]] = None
+ self.previous_refs: List[HexBytes] = []
+
+ def confinue_fetch_refs(self) -> bool:
+ """Determine whether we need to fetch other refs or not."""
+ return self.wanted_refs is None or self.index < len(self.wanted_refs)
+
+ def determine_wants(
+ self, refs: Dict[bytes, HexBytes], depth=None
+ ) -> List[HexBytes]:
+ """Get the list of bytehex sha1s that the git loader should fetch.
+
+ This compares the remote refs sent by the server with the base snapshot
+ provided by the loader.
+
+ """
+ # First time around, we'll initialize all the wanted refs
+ if not self.wanted_refs:
+ self.wanted_refs = self.compute_wanted_refs(refs)
+
+ # If empty, then we are done
+ if not self.wanted_refs:
+ return []
+
+ # We have all wanted refs but we are ingesting them one interval of
+ # number_of_heads_per_packfile refs at a time
+ start = self.index
+ self.index += self.number_of_heads_per_packfile
+
+ assert self.wanted_refs
+ asked_refs = self.wanted_refs[start : min(self.index, len(self.wanted_refs))]
+ if heads_logger.isEnabledFor(logging.DEBUG):
+ heads_logger.debug("Asked remote heads:")
+ for value in asked_refs:
+ heads_logger.debug(" %s", value.decode())
+
+ if start > 0:
+ # Previous refs was already walked so we can remove them from the next walk
+ # iteration to avoid processing them again
+ self.walker.heads.update(self.previous_refs)
+ self.previous_refs = asked_refs
+ logger.debug("asked_refs_count=%s", len(asked_refs))
+ return asked_refs
+
@dataclass
class FetchPackReturn:
@@ -120,6 +234,8 @@
symbolic_refs: Dict[bytes, HexBytes]
pack_buffer: SpooledTemporaryFile
pack_size: int
+ confinue_fetch_refs: bool
+ """Determine whether we still have to fetch remaining references."""
class GitLoader(BaseGitLoader):
@@ -153,52 +269,78 @@
storage: StorageInterface,
url: str,
incremental: bool = True,
- repo_representation: Type[RepoRepresentation] = RepoRepresentation,
pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
temp_file_cutoff: int = 100 * 1024 * 1024,
+ fetch_multiple_packfiles: bool = False,
+ number_of_heads_per_packfile: int = DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE,
**kwargs: Any,
):
"""Initialize the bulk updater.
Args:
- repo_representation: swh's repository representation
- which is in charge of filtering between known and remote
- data.
- ...
-
incremental: If True, the default, this starts from the last known snapshot
(if any) references. Otherwise, this loads the full repository.
+ fetch_multiple_packfiles: If True, this ingests the repository using
+ (internally) multiple packfiles (creating partial incremental snapshots
+ along the way). When False, the default, this uses the existing ingestion
+ policy of retrieving one packfile to ingest.
+ number_of_heads_per_packfile: When fetch_multiple_packfiles is used, this
+ splits packfiles per a given number of heads (no guarantee on the used
+ size).
"""
super().__init__(storage=storage, origin_url=url, **kwargs)
+ # check if repository only supports git dumb transfer protocol,
+ # fetched pack file will be empty in that case as dulwich do
+ # not support it and do not fetch any refs
+ logger.debug("Transport url to communicate with server: %s", url)
+ self.client, self.path = dulwich.client.get_transport_and_path(
+ url, thin_packs=False
+ )
+ logger.debug("Client %s to fetch pack at %s", self.client, self.path)
+ self.dumb = url.startswith("http") and getattr(self.client, "dumb", False)
+ self.fetch_multiple_packfiles = fetch_multiple_packfiles
self.incremental = incremental
- self.repo_representation = repo_representation
self.pack_size_bytes = pack_size_bytes
self.temp_file_cutoff = temp_file_cutoff
# state initialized in fetch_data
self.remote_refs: Dict[bytes, HexBytes] = {}
self.symbolic_refs: Dict[bytes, HexBytes] = {}
self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+ self.configure_packfile_fetching_policy(
+ fetch_multiple_packfiles, number_of_heads_per_packfile
+ )
+
+ def configure_packfile_fetching_policy(
+ self, fetch_multiple_packfiles: bool, number_of_heads_per_packfile: int
+ ):
+ """Configure the packfile fetching policy. The default is to fetch one packfile
+ to ingest everything unknown out of it. When fetch_multiple_packfiles is True
+ (and the ingestion passes through the smart protocol), the ingestion uses
+ packfiles (with a given number_of_heads_per_packfile). After each packfile is
+ loaded, a 'partial' (because incomplete) and 'incremental' (as in gathering seen
+ refs) so far snapshot is created (incremental).
+
+ """
+ # will create partial snapshot alongside fetching multiple packfiles (when the
+ # transfer protocol is not the 'dumb' one)
+ self.create_partial_snapshot = not self.dumb and fetch_multiple_packfiles
+ self.number_of_heads_per_packfile: Optional[int] = None
+ self.repo_representation: Type[BaseRepoRepresentation]
+ if self.create_partial_snapshot:
+ self.repo_representation = RepoRepresentationPaginated
+ self.number_of_heads_per_packfile = number_of_heads_per_packfile
+ else:
+ self.repo_representation = RepoRepresentation
def fetch_pack_from_origin(
self,
origin_url: str,
- base_repo: RepoRepresentation,
do_activity: Callable[[bytes], None],
) -> FetchPackReturn:
"""Fetch a pack from the origin"""
pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)
- transport_url = origin_url
-
- logger.debug("Transport url to communicate with server: %s", transport_url)
-
- client, path = dulwich.client.get_transport_and_path(
- transport_url, thin_packs=False
- )
-
- logger.debug("Client %s to fetch pack at %s", client, path)
-
size_limit = self.pack_size_bytes
def do_pack(data: bytes) -> None:
@@ -207,16 +349,17 @@
if cur_size + would_write > size_limit:
raise IOError(
f"Pack file too big for repository {origin_url}, "
- f"limit is {size_limit} bytes, current size is {cur_size}, "
+ f"number_of_heads_per_packfile is {size_limit} bytes, "
+ f"current size is {cur_size}, "
f"would write {would_write}"
)
pack_buffer.write(data)
- pack_result = client.fetch_pack(
- path,
- base_repo.determine_wants,
- base_repo.graph_walker(),
+ pack_result = self.client.fetch_pack(
+ self.path,
+ self.base_repo.determine_wants,
+ self.base_repo.walker,
do_pack,
progress=do_activity,
)
@@ -230,16 +373,12 @@
logger.debug("fetched_pack_size=%s", pack_size)
- # check if repository only supports git dumb transfer protocol,
- # fetched pack file will be empty in that case as dulwich do
- # not support it and do not fetch any refs
- self.dumb = transport_url.startswith("http") and getattr(client, "dumb", False)
-
return FetchPackReturn(
- remote_refs=utils.filter_refs(remote_refs),
- symbolic_refs=utils.filter_refs(symbolic_refs),
+ remote_refs=remote_refs,
+ symbolic_refs=symbolic_refs,
pack_buffer=pack_buffer,
pack_size=pack_size,
+ confinue_fetch_refs=self.base_repo.confinue_fetch_refs(),
)
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
@@ -280,24 +419,23 @@
# count how many runs of the loader are with each incremental mode
self.statsd.increment("git_total", tags={})
- def fetch_data(self) -> bool:
- assert self.origin is not None
-
- base_repo = self.repo_representation(
+ self.base_repo = self.repo_representation(
storage=self.storage,
base_snapshots=self.base_snapshots,
incremental=self.incremental,
statsd=self.statsd,
+ # Only used when self.repo_representation is RepoRepresentationpaginated,
+ # ignored otherwise
+ number_of_heads_per_packfile=self.number_of_heads_per_packfile,
)
- def do_progress(msg: bytes) -> None:
- sys.stderr.buffer.write(msg)
- sys.stderr.flush()
+ def fetch_data(self) -> bool:
+ continue_fetch_refs = False
+ assert self.origin is not None
try:
- fetch_info = self.fetch_pack_from_origin(
- self.origin.url, base_repo, do_progress
- )
+ fetch_info = self.fetch_pack_from_origin(self.origin.url, do_print_progress)
+ continue_fetch_refs = fetch_info.confinue_fetch_refs
except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e:
raise NotFound(e)
except GitProtocolError as e:
@@ -321,21 +459,29 @@
if not self.dumb:
raise
- logger.debug(
- "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
- )
if self.dumb:
- self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, base_repo)
+ protocol = "dumb"
+ self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, self.base_repo)
self.dumb_fetcher.fetch_object_ids()
- self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)
- self.symbolic_refs = utils.filter_refs(self.dumb_fetcher.head)
+ remote_refs = self.dumb_fetcher.refs
+ symbolic_refs = self.dumb_fetcher.head
else:
+ protocol = "smart"
self.pack_buffer = fetch_info.pack_buffer
self.pack_size = fetch_info.pack_size
- self.remote_refs = fetch_info.remote_refs
- self.symbolic_refs = fetch_info.symbolic_refs
+ remote_refs = fetch_info.remote_refs
+ symbolic_refs = fetch_info.symbolic_refs
+
+ logger.debug("Protocol used for communication: %s", protocol)
- self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+ # So the partial snapshot and the final ones creates the full branches
+ self.remote_refs.update(utils.filter_refs(remote_refs))
+ self.symbolic_refs.update(utils.filter_refs(symbolic_refs))
+
+ for sha1 in self.remote_refs.values():
+ if sha1 in self.ref_object_types:
+ continue
+ self.ref_object_types[sha1] = None
logger.info(
"Listed %d refs for repo %s",
@@ -348,8 +494,7 @@
},
)
- # No more data to fetch
- return False
+ return continue_fetch_refs
def save_data(self) -> None:
"""Store a pack for archival"""
@@ -374,6 +519,20 @@
with open(os.path.join(pack_dir, refs_name), "xb") as f:
pickle.dump(self.remote_refs, f)
+ def build_partial_snapshot(self) -> Optional[Snapshot]:
+ # Current implementation makes it a simple call to existing :meth:`get_snapshot`
+ return self.get_snapshot()
+
+ def store_data(self):
+ """Override the default implementation so we make sure to close the pack_buffer
+ if we use one in between loop (dumb loader does not actually one for example).
+
+ """
+ super().store_data()
+
+ if not self.dumb:
+ self.pack_buffer.close()
+
def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
"""Read all the objects of type `object_type` from the packfile"""
if self.dumb:
@@ -538,18 +697,18 @@
if not targets_unknown:
break
- if unknown_objects:
- # This object was referenced by the server; We did not fetch
- # it, and we do not know it from the previous snapshot. This is
- # likely a bug in the loader.
- raise RuntimeError(
- "Unknown objects referenced by remote refs: %s"
- % (
+ if unknown_objects and not self.create_partial_snapshot:
+ # Let's warn about dangling object when loading full packfiles. The
+ # following objects were referenced by the server but we did not fetch
+ # it, and we do not know it from the previous snapshot.
+ logger.warning(
+ "Unknown objects referenced by remote refs: %s",
+ (
", ".join(
f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
for name, obj in unknown_objects.items()
)
- )
+ ),
)
utils.warn_dangling_branches(
diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py
--- a/swh/loader/git/tests/test_loader.py
+++ b/swh/loader/git/tests/test_loader.py
@@ -28,6 +28,7 @@
prepare_repository_from_archive,
)
from swh.model.model import Origin, OriginVisit, OriginVisitStatus, Snapshot
+from swh.storage.algos.origin import origin_get_latest_visit_status
class CommonGitLoaderNotFound:
@@ -95,6 +96,70 @@
snapshot=None,
)
+ def test_load_with_multiple_packfiles_with_partial_snapshot(self):
+ """Ingesting repositories in multiple packfiles should be ok"""
+ # Make the loader retrieve multiple packfiles As it's already instantiated, we
+ # must post configure the loader the attributes ourselves
+ self.loader.configure_packfile_fetching_policy(
+ fetch_multiple_packfiles=True, number_of_heads_per_packfile=1
+ )
+
+ res = self.loader.load()
+
+ assert res == {"status": "eventful"}
+
+ stats = get_stats(self.loader.storage)
+ nb_snapshots = stats.pop("snapshot")
+ assert stats == {
+ "content": 4,
+ "directory": 7,
+ "origin": 1,
+ "origin_visit": 1,
+ "release": 0,
+ "revision": 7,
+ "skipped_content": 0,
+ }
+
+ # This nb of snapshots will depend on the packfile ingestion order (and the git
+ # graph connectivity). If loading starts with a set of ref more connected than
+ # the other refs, we'll end up having less snapshots. Invertly, starting with
+ # less connected refs will create more snapshots. Hence the following
+ # comparison.
+ assert 2 <= nb_snapshots <= 4
+
+ # So we end up with at least one partial visit status with a snapshot
+ partial_visit = origin_get_latest_visit_status(
+ self.loader.storage,
+ self.repo_url,
+ type="git",
+ allowed_statuses=["partial"],
+ )
+ assert partial_visit is not None
+ assert partial_visit.snapshot is not None
+
+ partial_snapshot = self.loader.storage.snapshot_get_branches(
+ partial_visit.snapshot
+ )
+
+ # In the end, we have the final full visit targeting the snapshot
+ visit_status = assert_last_visit_matches(
+ self.loader.storage,
+ self.repo_url,
+ status="full",
+ type="git",
+ )
+ assert visit_status.snapshot is not None
+
+ snapshot = self.loader.storage.snapshot_get_branches(visit_status.snapshot)
+
+ # the partial snapshot branches set is a subset of the branches of the final
+ # snapshot
+ partial_branches = partial_snapshot["branches"].keys()
+ snapshot_branches = snapshot["branches"].keys()
+ assert 0 < len(partial_branches) <= len(snapshot_branches)
+ for branch in partial_branches:
+ assert branch in snapshot_branches
+
class TestGitLoader(FullGitLoaderTests, CommonGitLoaderNotFound):
"""Prepare a git directory repository to be loaded through a GitLoader.

File Metadata

Mime Type
text/plain
Expires
Dec 17 2024, 11:35 AM (11 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3217260

Event Timeline