diff --git a/requirements-swh.txt b/requirements-swh.txt index 159555c..523fb7f 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,5 +1,5 @@ swh.core >= 0.0.7 -swh.loader.core >= 0.17.0 +swh.loader.core >= 0.18.0 swh.model >= 0.4.0 swh.scheduler >= 0.0.39 swh.storage >= 0.22.0 diff --git a/swh/loader/git/from_disk.py b/swh/loader/git/from_disk.py index 043876f..1ad671b 100644 --- a/swh/loader/git/from_disk.py +++ b/swh/loader/git/from_disk.py @@ -1,389 +1,396 @@ -# Copyright (C) 2015-2020 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - from collections import defaultdict +from datetime import datetime import os import shutil -from typing import Any, Dict, Optional +from typing import Dict, Optional from dulwich.errors import ObjectFormatException try: from dulwich.errors import EmptyFileException # type: ignore except ImportError: # dulwich >= 0.20 from dulwich.objects import EmptyFileException import dulwich.repo from swh.loader.core.loader import DVCSLoader from swh.model import hashutil from swh.model.model import Origin, Snapshot, SnapshotBranch, TargetType from swh.storage.algos.origin import origin_get_latest_visit_status +from swh.storage.interface import StorageInterface from . import converters, utils class GitLoaderFromDisk(DVCSLoader): """Load a git repository from a directory. """ visit_type = "git" def __init__( self, - url, - visit_date=None, - directory=None, - config: Optional[Dict[str, Any]] = None, + storage: StorageInterface, + url: str, + visit_date: Optional[datetime] = None, + directory: Optional[str] = None, + save_data_path: Optional[str] = None, + max_content_size: Optional[int] = None, ): - super().__init__(logging_class="swh.loader.git.Loader", config=config) + super().__init__( + storage=storage, + save_data_path=save_data_path, + max_content_size=max_content_size, + ) self.origin_url = url self.visit_date = visit_date self.directory = directory - def prepare_origin_visit(self, *args, **kwargs): + def prepare_origin_visit(self): self.origin = Origin(url=self.origin_url) - def prepare(self, *args, **kwargs): + def prepare(self): self.repo = dulwich.repo.Repo(self.directory) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() def _check(self, obj): """Check the object's repository representation. If any errors in check exists, an ObjectFormatException is raised. Args: obj (object): Dulwich object read from the repository. """ obj.check() from dulwich.objects import Commit, Tag try: # For additional checks on dulwich objects with date # for now, only checks on *time if isinstance(obj, Commit): commit_time = obj._commit_time utils.check_date_time(commit_time) author_time = obj._author_time utils.check_date_time(author_time) elif isinstance(obj, Tag): tag_time = obj._tag_time utils.check_date_time(tag_time) except Exception as e: raise ObjectFormatException(e) def get_object(self, oid): """Given an object id, return the object if it is found and not malformed in some way. Args: oid (bytes): the object's identifier Returns: The object if found without malformation """ try: # some errors are raised when reading the object obj = self.repo[oid] # some we need to check ourselves self._check(obj) except KeyError: _id = oid.decode("utf-8") self.log.warn( "object %s not found, skipping" % _id, extra={ "swh_type": "swh_loader_git_missing_object", "swh_object_id": _id, "origin_url": self.origin.url, }, ) return None except ObjectFormatException: _id = oid.decode("utf-8") self.log.warn( "object %s malformed, skipping" % _id, extra={ "swh_type": "swh_loader_git_missing_object", "swh_object_id": _id, "origin_url": self.origin.url, }, ) return None except EmptyFileException: _id = oid.decode("utf-8") self.log.warn( "object %s corrupted (empty file), skipping" % _id, extra={ "swh_type": "swh_loader_git_missing_object", "swh_object_id": _id, "origin_url": self.origin.url, }, ) else: return obj def fetch_data(self): """Fetch the data from the data source""" visit_status = origin_get_latest_visit_status( self.storage, self.origin_url, require_snapshot=True ) self.previous_snapshot_id = ( None if visit_status is None else visit_status.snapshot ) type_to_ids = defaultdict(list) for oid in self.iter_objects(): obj = self.get_object(oid) if not obj: continue type_name = obj.type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b"blob"]) def get_content_ids(self): """Get the content identifiers from the git repository""" for oid in self.type_to_ids[b"blob"]: yield converters.dulwich_blob_to_content_id(self.repo[oid]) def get_contents(self): """Get the contents that need to be loaded""" missing_contents = set( self.storage.content_missing(self.get_content_ids(), "sha1_git") ) for oid in missing_contents: yield converters.dulwich_blob_to_content( self.repo[hashutil.hash_to_bytehex(oid)] ) def has_directories(self): """Checks whether we need to load directories""" return bool(self.type_to_ids[b"tree"]) def get_directory_ids(self): """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"tree"]) def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set( self.storage.directory_missing(sorted(self.get_directory_ids())) ) for oid in missing_dirs: yield converters.dulwich_tree_to_directory( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log ) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b"commit"]) def get_revision_ids(self): """Get the revision identifiers from the git repository""" return ( hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"commit"] ) def get_revisions(self): """Get the revisions that need to be loaded""" missing_revs = set( self.storage.revision_missing(sorted(self.get_revision_ids())) ) for oid in missing_revs: yield converters.dulwich_commit_to_revision( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log ) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b"tag"]) def get_release_ids(self): """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"tag"]) def get_releases(self): """Get the releases that need to be loaded""" missing_rels = set(self.storage.release_missing(sorted(self.get_release_ids()))) for oid in missing_rels: yield converters.dulwich_tag_to_release( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log ) def get_snapshot(self): """Turn the list of branches into a snapshot to load""" branches: Dict[bytes, Optional[SnapshotBranch]] = {} for ref, target in self.repo.refs.as_dict().items(): if utils.ignore_branch_name(ref): continue obj = self.get_object(target) if obj: target_type = converters.DULWICH_TARGET_TYPES[obj.type_name] branches[ref] = SnapshotBranch( target=hashutil.bytehex_to_hash(target), target_type=target_type, ) else: branches[ref] = None dangling_branches = {} for ref, target in self.repo.refs.get_symrefs().items(): if utils.ignore_branch_name(ref): continue branches[ref] = SnapshotBranch(target=target, target_type=TargetType.ALIAS) if target not in branches: # This handles the case where the pointer is "dangling". # There's a chance that a further symbolic reference will # override this default value, which is totally fine. dangling_branches[target] = ref branches[target] = None utils.warn_dangling_branches( branches, dangling_branches, self.log, self.origin_url ) self.snapshot = Snapshot(branches=branches) return self.snapshot def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { "contents": len(self.type_to_ids[b"blob"]), "directories": len(self.type_to_ids[b"tree"]), "revisions": len(self.type_to_ids[b"commit"]), "releases": len(self.type_to_ids[b"tag"]), } def save_data(self): """We already have the data locally, no need to save it""" pass def load_status(self): """The load was eventful if the current occurrences are different to the ones we retrieved at the beginning of the run""" eventful = False if self.previous_snapshot_id: eventful = self.snapshot.id != self.previous_snapshot_id else: eventful = bool(self.snapshot.branches) return {"status": ("eventful" if eventful else "uneventful")} class GitLoaderFromArchive(GitLoaderFromDisk): """Load a git repository from an archive. This loader ingests a git repository compressed into an archive. The supported archive formats are ``.zip`` and ``.tar.gz``. From an input tarball named ``my-git-repo.zip``, the following layout is expected in it:: my-git-repo/ ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... Nevertheless, the loader is able to ingest tarballs with the following layouts too:: . ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... or:: other-repo-name/ ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... """ def __init__(self, *args, archive_path, **kwargs): super().__init__(*args, **kwargs) self.temp_dir = self.repo_path = None self.archive_path = archive_path def project_name_from_archive(self, archive_path): """Compute the project name from the archive's path. """ archive_name = os.path.basename(archive_path) for ext in (".zip", ".tar.gz", ".tgz"): if archive_name.lower().endswith(ext): archive_name = archive_name[: -len(ext)] break return archive_name - def prepare(self, *args, **kwargs): + def prepare(self): """1. Uncompress the archive in temporary location. 2. Prepare as the GitLoaderFromDisk does 3. Load as GitLoaderFromDisk does """ project_name = self.project_name_from_archive(self.archive_path) self.temp_dir, self.repo_path = utils.init_git_repo_from_archive( project_name, self.archive_path ) self.log.info( "Project %s - Uncompressing archive %s at %s", self.origin_url, os.path.basename(self.archive_path), self.repo_path, ) self.directory = self.repo_path - super().prepare(*args, **kwargs) + super().prepare() def cleanup(self): """Cleanup the temporary location (if it exists). """ if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) self.log.info( "Project %s - Done injecting %s" % (self.origin_url, self.repo_path) ) diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index 61683a4..3102d2e 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,536 +1,540 @@ # Copyright (C) 2016-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from dataclasses import dataclass import datetime from io import BytesIO import logging import os import pickle import sys from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type import dulwich.client from dulwich.errors import GitProtocolError, NotGitRepository from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater -from swh.core.config import merge_configs from swh.loader.core.loader import DVCSLoader from swh.loader.exception import NotFound from swh.model import hashutil from swh.model.model import ( BaseContent, Directory, Origin, Release, Revision, Sha1Git, Snapshot, SnapshotBranch, TargetType, ) from swh.storage.algos.snapshot import snapshot_get_latest +from swh.storage.interface import StorageInterface from . import converters, utils class RepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__( self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False ): self.storage = storage self.ignore_history = ignore_history if base_snapshot and not ignore_history: self.base_snapshot: Snapshot = base_snapshot else: self.base_snapshot = Snapshot(branches={}) self.heads: Set[bytes] = set() def get_parents(self, commit: bytes) -> List[bytes]: """This method should return the list of known parents""" return [] def graph_walker(self) -> ObjectStoreGraphWalker: return ObjectStoreGraphWalker(self.heads, self.get_parents) def determine_wants(self, refs: Dict[bytes, bytes]) -> List[bytes]: """Get the list of bytehex sha1s that the git loader should fetch. This compares the remote refs sent by the server with the base snapshot provided by the loader. """ if not refs: return [] # Cache existing heads local_heads: Set[bytes] = set() for branch_name, branch in self.base_snapshot.branches.items(): if not branch or branch.target_type == TargetType.ALIAS: continue local_heads.add(hashutil.hash_to_hex(branch.target).encode()) self.heads = local_heads # Get the remote heads that we want to fetch remote_heads: Set[bytes] = set() for ref_name, ref_target in refs.items(): if utils.ignore_branch_name(ref_name): continue remote_heads.add(ref_target) return list(remote_heads - local_heads) @dataclass class FetchPackReturn: remote_refs: Dict[bytes, bytes] symbolic_refs: Dict[bytes, bytes] pack_buffer: BytesIO pack_size: int -DEFAULT_CONFIG: Dict[str, Any] = { - "pack_size_bytes": 4 * 1024 * 1024 * 1024, -} - - class GitLoader(DVCSLoader): """A bulk loader for a git repository""" visit_type = "git" def __init__( self, + storage: StorageInterface, url: str, base_url: Optional[str] = None, ignore_history: bool = False, repo_representation: Type[RepoRepresentation] = RepoRepresentation, - config: Optional[Dict[str, Any]] = None, + pack_size_bytes: int = 4 * 1024 * 1024 * 1024, + save_data_path: Optional[str] = None, + max_content_size: Optional[int] = None, ): """Initialize the bulk updater. Args: repo_representation: swh's repository representation which is in charge of filtering between known and remote data. """ - super().__init__(logging_class="swh.loader.git.BulkLoader", config=config) - self.config = merge_configs(DEFAULT_CONFIG, self.config) + super().__init__( + storage=storage, + save_data_path=save_data_path, + max_content_size=max_content_size, + ) self.origin_url = url self.base_url = base_url self.ignore_history = ignore_history self.repo_representation = repo_representation - + self.pack_size_bytes = pack_size_bytes # state initialized in fetch_data self.remote_refs: Dict[bytes, bytes] = {} self.symbolic_refs: Dict[bytes, bytes] = {} def fetch_pack_from_origin( self, origin_url: str, base_snapshot: Optional[Snapshot], do_activity: Callable[[bytes], None], ) -> FetchPackReturn: """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = self.repo_representation( storage=self.storage, base_snapshot=base_snapshot, ignore_history=self.ignore_history, ) client, path = dulwich.client.get_transport_and_path( origin_url, thin_packs=False ) - size_limit = self.config["pack_size_bytes"] + size_limit = self.pack_size_bytes def do_pack(data: bytes) -> None: cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > size_limit: raise IOError( - "Pack file too big for repository %s, " - "limit is %d bytes, current size is %d, " - "would write %d" % (origin_url, size_limit, cur_size, would_write) + f"Pack file too big for repository {origin_url}, " + "limit is {size_limit} bytes, current size is {cur_size}, " + "would write {would_write}" ) pack_buffer.write(data) pack_result = client.fetch_pack( path, base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity, ) remote_refs = pack_result.refs or {} symbolic_refs = pack_result.symrefs or {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) return FetchPackReturn( remote_refs=utils.filter_refs(remote_refs), symbolic_refs=utils.filter_refs(symbolic_refs), pack_buffer=pack_buffer, pack_size=pack_size, ) def list_pack( self, pack_data, pack_size ) -> Tuple[Dict[bytes, bytes], Dict[bytes, Set[bytes]]]: id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids - def prepare_origin_visit(self, *args, **kwargs) -> None: + def prepare_origin_visit(self) -> None: self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) self.origin = Origin(url=self.origin_url) def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: return snapshot_get_latest(self.storage, origin_url) - def prepare(self, *args, **kwargs) -> None: + def prepare(self) -> None: assert self.origin is not None prev_snapshot: Optional[Snapshot] = None if not self.ignore_history: prev_snapshot = self.get_full_snapshot(self.origin.url) if self.base_url and prev_snapshot is None: base_origin = list(self.storage.origin_get([self.base_url]))[0] if base_origin: prev_snapshot = self.get_full_snapshot(base_origin.url) if prev_snapshot is not None: self.base_snapshot = prev_snapshot else: self.base_snapshot = Snapshot(branches={}) def fetch_data(self) -> bool: assert self.origin is not None def do_progress(msg: bytes) -> None: sys.stderr.buffer.write(msg) sys.stderr.flush() try: fetch_info = self.fetch_pack_from_origin( self.origin.url, self.base_snapshot, do_progress ) except NotGitRepository as e: raise NotFound(e) except GitProtocolError as e: # unfortunately, that kind of error is not specific to a not found # scenario... It depends on the value of message within the exception. for msg in [ "Repository unavailable", # e.g DMCA takedown "Repository not found", "unexpected http resp 401", ]: if msg in e.args[0]: raise NotFound(e) # otherwise transmit the error raise self.pack_buffer = fetch_info.pack_buffer self.pack_size = fetch_info.pack_size self.remote_refs = fetch_info.remote_refs self.symbolic_refs = fetch_info.symbolic_refs self.log.info( "Listed %d refs for repo %s" % (len(self.remote_refs), self.origin.url), extra={ "swh_type": "git_repo_list_refs", "swh_repo": self.origin.url, "swh_num_refs": len(self.remote_refs), }, ) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids # No more data to fetch return False def save_data(self) -> None: """Store a pack for archival""" assert isinstance(self.visit_date, datetime.datetime) write_size = 8192 pack_dir = self.get_save_data_path() pack_name = "%s.pack" % self.visit_date.isoformat() refs_name = "%s.refs" % self.visit_date.isoformat() with open(os.path.join(pack_dir, pack_name), "xb") as f: self.pack_buffer.seek(0) while True: r = self.pack_buffer.read(write_size) if not r: break f.write(r) self.pack_buffer.seek(0) with open(os.path.join(pack_dir, refs_name), "xb") as f: pickle.dump(self.remote_refs, f) def get_inflater(self) -> PackInflater: """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size) ) def has_contents(self) -> bool: return bool(self.type_to_ids[b"blob"]) def get_content_ids(self) -> Iterable[Dict[str, Any]]: """Get the content identifiers from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b"blob": continue yield converters.dulwich_blob_to_content_id(raw_obj) def get_contents(self) -> Iterable[BaseContent]: """Format the blobs from the git repository as swh contents""" missing_contents = set( self.storage.content_missing(list(self.get_content_ids()), "sha1_git") ) for raw_obj in self.get_inflater(): if raw_obj.type_name != b"blob": continue if raw_obj.sha().digest() not in missing_contents: continue yield converters.dulwich_blob_to_content( raw_obj, max_content_size=self.max_content_size ) def has_directories(self) -> bool: return bool(self.type_to_ids[b"tree"]) def get_directory_ids(self) -> Iterable[Sha1Git]: """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"tree"]) def get_directories(self) -> Iterable[Directory]: """Format the trees as swh directories""" missing_dirs = set( self.storage.directory_missing(sorted(self.get_directory_ids())) ) for raw_obj in self.get_inflater(): if raw_obj.type_name != b"tree": continue if raw_obj.sha().digest() not in missing_dirs: continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) def has_revisions(self) -> bool: return bool(self.type_to_ids[b"commit"]) def get_revision_ids(self) -> Iterable[Sha1Git]: """Get the revision identifiers from the git repository""" return ( hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"commit"] ) def get_revisions(self) -> Iterable[Revision]: """Format commits as swh revisions""" missing_revs = set( self.storage.revision_missing(sorted(self.get_revision_ids())) ) for raw_obj in self.get_inflater(): if raw_obj.type_name != b"commit": continue if raw_obj.sha().digest() not in missing_revs: continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) def has_releases(self) -> bool: return bool(self.type_to_ids[b"tag"]) def get_release_ids(self) -> Iterable[Sha1Git]: """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b"tag"]) def get_releases(self) -> Iterable[Release]: """Retrieve all the release objects from the git repository""" missing_rels = set(self.storage.release_missing(sorted(self.get_release_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b"tag": continue if raw_obj.sha().digest() not in missing_rels: continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) def get_snapshot(self) -> Snapshot: """Get the snapshot for the current visit. The main complexity of this function is mapping target objects to their types, as the `refs` dictionaries returned by the git server only give us the identifiers for the target objects, and not their types. The loader itself only knows the types of the objects that it has fetched from the server (as it has parsed them while loading them to the archive). As we only fetched an increment between the previous snapshot and the current state of the server, we are missing the type information for the objects that would already have been referenced by the previous snapshot, and that the git server didn't send us. We infer the type of these objects from the previous snapshot. """ branches: Dict[bytes, Optional[SnapshotBranch]] = {} unfetched_refs: Dict[bytes, bytes] = {} # Retrieve types from the objects loaded by the current loader for ref_name, ref_object in self.remote_refs.items(): if ref_name in self.symbolic_refs: continue target = hashutil.hash_to_bytes(ref_object.decode()) object_type = self.id_to_type.get(ref_object) if object_type: branches[ref_name] = SnapshotBranch( target=target, target_type=converters.DULWICH_TARGET_TYPES[object_type], ) else: # The object pointed at by this ref was not fetched, supposedly # because it existed in the base snapshot. We record it here, # and we can get it from the base snapshot later. unfetched_refs[ref_name] = target dangling_branches = {} # Handle symbolic references as alias branches for ref_name, target in self.symbolic_refs.items(): branches[ref_name] = SnapshotBranch( target_type=TargetType.ALIAS, target=target, ) if target not in branches and target not in unfetched_refs: # This handles the case where the pointer is "dangling". # There's a chance that a further symbolic reference # override this default value, which is totally fine. dangling_branches[target] = ref_name branches[target] = None if unfetched_refs: # Handle inference of object types from the contents of the # previous snapshot unknown_objects = {} base_snapshot_reverse_branches = { branch.target: branch for branch in self.base_snapshot.branches.values() if branch and branch.target_type != TargetType.ALIAS } for ref_name, target in unfetched_refs.items(): branch = base_snapshot_reverse_branches.get(target) branches[ref_name] = branch if not branch: unknown_objects[ref_name] = target if unknown_objects: # This object was referenced by the server; We did not fetch # it, and we do not know it from the previous snapshot. This is # likely a bug in the loader. raise RuntimeError( "Unknown objects referenced by remote refs: %s" % ( ", ".join( f"{name.decode()}: {hashutil.hash_to_hex(obj)}" for name, obj in unknown_objects.items() ) ) ) utils.warn_dangling_branches( branches, dangling_branches, self.log, self.origin_url ) self.snapshot = Snapshot(branches=branches) return self.snapshot def get_fetch_history_result(self) -> Dict[str, int]: return { "contents": len(self.type_to_ids[b"blob"]), "directories": len(self.type_to_ids[b"tree"]), "revisions": len(self.type_to_ids[b"commit"]), "releases": len(self.type_to_ids[b"tag"]), } def load_status(self) -> Dict[str, Any]: """The load was eventful if the current snapshot is different to the one we retrieved at the beginning of the run""" eventful = False if self.base_snapshot and self.snapshot: eventful = self.snapshot.id != self.base_snapshot.id elif self.snapshot: eventful = bool(self.snapshot.branches) return {"status": ("eventful" if eventful else "uneventful")} if __name__ == "__main__": import click logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(process)d %(message)s" ) @click.command() @click.option("--origin-url", help="Origin url", required=True) @click.option("--base-url", default=None, help="Optional Base url") @click.option( "--ignore-history/--no-ignore-history", help="Ignore the repository history", default=False, ) def main(origin_url: str, base_url: str, ignore_history: bool) -> Dict[str, Any]: + from swh.storage import get_storage + + storage = get_storage(cls="memory") loader = GitLoader( - origin_url, base_url=base_url, ignore_history=ignore_history, + storage, origin_url, base_url=base_url, ignore_history=ignore_history, ) return loader.load() main() diff --git a/swh/loader/git/tasks.py b/swh/loader/git/tasks.py index 50d5f16..fd039e5 100644 --- a/swh/loader/git/tasks.py +++ b/swh/loader/git/tasks.py @@ -1,48 +1,52 @@ -# Copyright (C) 2015-2019 The Software Heritage developers +# Copyright (C) 2015-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict, Optional from celery import shared_task import dateutil.parser from swh.loader.git.from_disk import GitLoaderFromArchive, GitLoaderFromDisk from swh.loader.git.loader import GitLoader @shared_task(name=__name__ + ".UpdateGitRepository") def load_git(*, url: str, base_url: Optional[str] = None) -> Dict[str, Any]: """Import a git repository from a remote location """ - loader = GitLoader(url, base_url=base_url) + loader = GitLoader.from_configfile(url=url, base_url=base_url) return loader.load() @shared_task(name=__name__ + ".LoadDiskGitRepository") def load_git_from_dir(*, url: str, directory: str, date: str) -> Dict[str, Any]: """Import a git repository from a local repository Import a git repository, cloned in `directory` from `origin_url` at `date`. """ visit_date = dateutil.parser.parse(date) - loader = GitLoaderFromDisk(url, directory=directory, visit_date=visit_date) + loader = GitLoaderFromDisk.from_configfile( + url=url, directory=directory, visit_date=visit_date + ) return loader.load() @shared_task(name=__name__ + ".UncompressAndLoadDiskGitRepository") def load_git_from_zip(*, url: str, archive_path: str, date: str) -> Dict[str, Any]: """Import a git repository from a zip archive 1. Uncompress an archive repository in a local and temporary folder 2. Load it through the git disk loader 3. Clean up the temporary folder """ visit_date = dateutil.parser.parse(date) - loader = GitLoaderFromArchive(url, archive_path=archive_path, visit_date=visit_date) + loader = GitLoaderFromArchive.from_configfile( + url=url, archive_path=archive_path, visit_date=visit_date + ) return loader.load() diff --git a/swh/loader/git/tests/conftest.py b/swh/loader/git/tests/conftest.py index 4c86cb3..1f13e41 100644 --- a/swh/loader/git/tests/conftest.py +++ b/swh/loader/git/tests/conftest.py @@ -1,35 +1,39 @@ -# Copyright (C) 2018-2020 The Software Heritage developers +# Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Any, Dict import pytest @pytest.fixture -def swh_loader_config(swh_storage_backend_config) -> Dict[str, Any]: - swh_storage_backend_config["journal_writer"] = {} +def swh_storage_backend_config(swh_storage_backend_config): + """Basic pg storage configuration with no journal collaborator + (to avoid pulling optional dependency on clients of this fixture) + + """ return { + "cls": "filter", "storage": { - "cls": "pipeline", - "steps": [ - {"cls": "filter"}, - { - "cls": "buffer", - "min_batch_size": { - "content": 10, - "content_bytes": 100 * 1024 * 1024, - "directory": 10, - "revision": 10, - "release": 10, - }, - }, - swh_storage_backend_config, - ], + "cls": "buffer", + "min_batch_size": { + "content": 10, + "content_bytes": 100 * 1024 * 1024, + "directory": 10, + "revision": 10, + "release": 10, + }, + "storage": swh_storage_backend_config, }, + } + + +@pytest.fixture +def swh_loader_config(swh_storage_backend_config) -> Dict[str, Any]: + return { + "storage": swh_storage_backend_config, "max_content_size": 100 * 1024 * 1024, - "pack_size_bytes": 4 * 1024 * 1024 * 1024, - "save_data": False, + "save_data_path": None, } diff --git a/swh/loader/git/tests/test_from_disk.py b/swh/loader/git/tests/test_from_disk.py index bc50e73..3a26e15 100644 --- a/swh/loader/git/tests/test_from_disk.py +++ b/swh/loader/git/tests/test_from_disk.py @@ -1,443 +1,445 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import os.path from unittest import TestCase import dulwich.repo import pytest from swh.loader.git.from_disk import GitLoaderFromArchive, GitLoaderFromDisk from swh.loader.tests import ( assert_last_visit_matches, check_snapshot, get_stats, prepare_repository_from_archive, ) from swh.model.hashutil import hash_to_bytes from swh.model.model import Snapshot, SnapshotBranch, TargetType from swh.storage.algos.snapshot import snapshot_get_all_branches SNAPSHOT1 = Snapshot( id=hash_to_bytes("a23699280a82a043f8c0994cf1631b568f716f95"), branches={ b"HEAD": SnapshotBranch( target=b"refs/heads/master", target_type=TargetType.ALIAS, ), b"refs/heads/master": SnapshotBranch( target=hash_to_bytes("2f01f5ca7e391a2f08905990277faf81e709a649"), target_type=TargetType.REVISION, ), b"refs/heads/branch1": SnapshotBranch( target=hash_to_bytes("b0a77609903f767a2fd3d769904ef9ef68468b87"), target_type=TargetType.REVISION, ), b"refs/heads/branch2": SnapshotBranch( target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"), target_type=TargetType.REVISION, ), b"refs/tags/branch2-after-delete": SnapshotBranch( target=hash_to_bytes("bd746cd1913721b269b395a56a97baf6755151c2"), target_type=TargetType.REVISION, ), b"refs/tags/branch2-before-delete": SnapshotBranch( target=hash_to_bytes("1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b"), target_type=TargetType.REVISION, ), }, ) # directory hashes obtained with: # gco b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a # swh-hashtree --ignore '.git' --path . # gco 2f01f5ca7e391a2f08905990277faf81e709a649 # swh-hashtree --ignore '.git' --path . # gco bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777 # swh-hashtree --ignore '.git' --path . # gco 1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b # swh-hashtree --ignore '.git' --path . # gco 79f65ac75f79dda6ff03d66e1242702ab67fb51c # swh-hashtree --ignore '.git' --path . # gco b0a77609903f767a2fd3d769904ef9ef68468b87 # swh-hashtree --ignore '.git' --path . # gco bd746cd1913721b269b395a56a97baf6755151c2 # swh-hashtree --ignore '.git' --path . REVISIONS1 = { "b6f40292c4e94a8f7e7b4aff50e6c7429ab98e2a": ( "40dbdf55dfd4065422462cc74a949254aefa972e" ), "2f01f5ca7e391a2f08905990277faf81e709a649": ( "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" ), "bcdc5ebfde1a3cd6c96e0c2ea4eed19c13208777": ( "b43724545b4759244bb54be053c690649161411c" ), "1135e94ccf73b5f9bd6ef07b3fa2c5cc60bba69b": ( "fbf70528223d263661b5ad4b80f26caf3860eb8e" ), "79f65ac75f79dda6ff03d66e1242702ab67fb51c": ( "5df34ec74d6f69072d9a0a6677d8efbed9b12e60" ), "b0a77609903f767a2fd3d769904ef9ef68468b87": ( "9ca0c7d6ffa3f9f0de59fd7912e08f11308a1338" ), "bd746cd1913721b269b395a56a97baf6755151c2": ( "e1d0d894835f91a0f887a4bc8b16f81feefdfbd5" ), } class CommonGitLoaderTests: """Common tests for all git loaders.""" def test_load(self): """Loads a simple repository (made available by `setUp()`), and checks everything was added in the storage.""" res = self.loader.load() assert res == {"status": "eventful"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) stats = get_stats(self.loader.storage) assert stats == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } check_snapshot(SNAPSHOT1, self.loader.storage) def test_load_unchanged(self): """Checks loading a repository a second time does not add any extra data.""" res = self.loader.load() assert res == {"status": "eventful"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) stats0 = get_stats(self.loader.storage) assert stats0 == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } res = self.loader.load() assert res == {"status": "uneventful"} stats1 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats0) expected_stats["origin_visit"] += 1 assert stats1 == expected_stats check_snapshot(SNAPSHOT1, self.loader.storage) assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=SNAPSHOT1.id, ) def test_load_visit_without_snapshot_so_status_failed(self): # unfortunately, monkey-patch the hard way, self.loader is already instantiated # (patching won't work self.loader is already instantiated) # Make get_contents fail for some reason self.loader.get_contents = None res = self.loader.load() assert res == {"status": "failed"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="failed", type="git", snapshot=None, ) def test_load_visit_with_snapshot_so_status_partial(self): # unfortunately, monkey-patch the hard way, self.loader is already instantiated # (patching won't work self.loader is already instantiated) # fake store_metadata raising for some reason, so we could have a snapshot id # at this point in time self.loader.store_metadata = None # fake having a snapshot so the visit status is partial self.loader.loaded_snapshot_id = hash_to_bytes( "a23699280a82a043f8c0994cf1631b568f716f95" ) res = self.loader.load() assert res == {"status": "failed"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="partial", type="git", snapshot=None, ) class FullGitLoaderTests(CommonGitLoaderTests): """Tests for GitLoader (from disk or not). Includes the common ones, and add others that only work with a local dir. """ def test_load_changed(self): """Loads a repository, makes some changes by adding files, commits, and merges, load it again, and check the storage contains everything it should.""" # Initial load res = self.loader.load() assert res == {"status": "eventful"} stats0 = get_stats(self.loader.storage) assert stats0 == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } # Load with a new file + revision with open(os.path.join(self.destination_path, "hello.py"), "a") as fd: fd.write("print('Hello world')\n") self.repo.stage([b"hello.py"]) new_revision = self.repo.do_commit(b"Hello world\n").decode() new_dir = "85dae072a5aa9923ffa7a7568f819ff21bf49858" assert self.repo[new_revision.encode()].tree == new_dir.encode() revisions = REVISIONS1.copy() assert new_revision not in revisions revisions[new_revision] = new_dir res = self.loader.load() assert res == {"status": "eventful"} stats1 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats0) # did one new visit expected_stats["origin_visit"] += 1 # with one more of the following objects expected_stats["content"] += 1 expected_stats["directory"] += 1 expected_stats["revision"] += 1 # concluding into 1 new snapshot expected_stats["snapshot"] += 1 assert stats1 == expected_stats visit_status = assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git" ) assert visit_status.snapshot is not None snapshot_id = visit_status.snapshot snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id) branches = snapshot.branches assert branches[b"HEAD"] == SnapshotBranch( target=b"refs/heads/master", target_type=TargetType.ALIAS, ) assert branches[b"refs/heads/master"] == SnapshotBranch( target=hash_to_bytes(new_revision), target_type=TargetType.REVISION, ) # Merge branch1 into HEAD. current = self.repo[b"HEAD"] branch1 = self.repo[b"refs/heads/branch1"] merged_tree = dulwich.objects.Tree() for item in self.repo[current.tree].items(): merged_tree.add(*item) for item in self.repo[branch1.tree].items(): merged_tree.add(*item) merged_dir_id = "dab8a37df8db8666d4e277bef9a546f585b5bedd" assert merged_tree.id.decode() == merged_dir_id self.repo.object_store.add_object(merged_tree) merge_commit = self.repo.do_commit( b"merge.\n", tree=merged_tree.id, merge_heads=[branch1.id] ) assert merge_commit.decode() not in revisions revisions[merge_commit.decode()] = merged_tree.id.decode() res = self.loader.load() assert res == {"status": "eventful"} stats2 = get_stats(self.loader.storage) expected_stats = copy.deepcopy(stats1) # one more visit expected_stats["origin_visit"] += 1 # with 1 new directory and revision expected_stats["directory"] += 1 expected_stats["revision"] += 1 # concluding into 1 new snapshot expected_stats["snapshot"] += 1 assert stats2 == expected_stats visit_status = assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git" ) assert visit_status.snapshot is not None merge_snapshot_id = visit_status.snapshot assert merge_snapshot_id != snapshot_id merge_snapshot = snapshot_get_all_branches( self.loader.storage, merge_snapshot_id ) merge_branches = merge_snapshot.branches assert merge_branches[b"HEAD"] == SnapshotBranch( target=b"refs/heads/master", target_type=TargetType.ALIAS, ) assert merge_branches[b"refs/heads/master"] == SnapshotBranch( target=hash_to_bytes(merge_commit.decode()), target_type=TargetType.REVISION, ) def test_load_filter_branches(self): filtered_branches = {b"refs/pull/42/merge"} unfiltered_branches = {b"refs/pull/42/head"} # Add branches to the repository on disk; some should be filtered by # the loader, some should not. for branch_name in filtered_branches | unfiltered_branches: self.repo[branch_name] = self.repo[b"refs/heads/master"] # Generate the expected snapshot from SNAPSHOT1 (which is the original # state of the git repo)... branches = dict(SNAPSHOT1.branches) # ... and the unfiltered_branches, which are all pointing to the same # commit as "refs/heads/master". for branch_name in unfiltered_branches: branches[branch_name] = branches[b"refs/heads/master"] expected_snapshot = Snapshot(branches=branches) # Load the modified repository res = self.loader.load() assert res == {"status": "eventful"} check_snapshot(expected_snapshot, self.loader.storage) assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git", snapshot=expected_snapshot.id, ) def test_load_dangling_symref(self): with open(os.path.join(self.destination_path, ".git/HEAD"), "wb") as f: f.write(b"ref: refs/heads/dangling-branch\n") res = self.loader.load() assert res == {"status": "eventful"} visit_status = assert_last_visit_matches( self.loader.storage, self.repo_url, status="full", type="git" ) snapshot_id = visit_status.snapshot assert snapshot_id is not None snapshot = snapshot_get_all_branches(self.loader.storage, snapshot_id) branches = snapshot.branches assert branches[b"HEAD"] == SnapshotBranch( target=b"refs/heads/dangling-branch", target_type=TargetType.ALIAS, ) assert branches[b"refs/heads/dangling-branch"] is None stats = get_stats(self.loader.storage) assert stats == { "content": 4, "directory": 7, "origin": 1, "origin_visit": 1, "release": 0, "revision": 7, "skipped_content": 0, "snapshot": 1, } class GitLoaderFromDiskTest(TestCase, FullGitLoaderTests): """Prepare a git directory repository to be loaded through a GitLoaderFromDisk. This tests all git loader scenario. """ @pytest.fixture(autouse=True) - def init(self, swh_config, datadir, tmp_path): + def init(self, swh_storage, datadir, tmp_path): archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") tmp_path = str(tmp_path) self.repo_url = prepare_repository_from_archive( archive_path, archive_name, tmp_path=tmp_path ) self.destination_path = os.path.join(tmp_path, archive_name) self.loader = GitLoaderFromDisk( + swh_storage, url=self.repo_url, visit_date=datetime.datetime( 2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc ), directory=self.destination_path, ) self.repo = dulwich.repo.Repo(self.destination_path) class GitLoaderFromArchiveTest(TestCase, CommonGitLoaderTests): """Tests for GitLoaderFromArchive. Only tests common scenario.""" @pytest.fixture(autouse=True) - def init(self, swh_config, datadir, tmp_path): + def init(self, swh_storage, datadir, tmp_path): archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") self.repo_url = archive_path self.loader = GitLoaderFromArchive( + swh_storage, url=self.repo_url, archive_path=archive_path, visit_date=datetime.datetime( 2016, 5, 3, 15, 16, 32, tzinfo=datetime.timezone.utc ), ) diff --git a/swh/loader/git/tests/test_loader.py b/swh/loader/git/tests/test_loader.py index 908f426..b0b0881 100644 --- a/swh/loader/git/tests/test_loader.py +++ b/swh/loader/git/tests/test_loader.py @@ -1,126 +1,124 @@ # Copyright (C) 2018-2021 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from unittest import TestCase from dulwich.errors import GitProtocolError, NotGitRepository, ObjectFormatException import dulwich.repo import pytest from swh.loader.git.loader import GitLoader from swh.loader.git.tests.test_from_disk import FullGitLoaderTests from swh.loader.tests import assert_last_visit_matches, prepare_repository_from_archive class CommonGitLoaderNotFound: @pytest.fixture(autouse=True) def __inject_fixtures(self, mocker): """Inject required fixtures in unittest.TestCase class """ self.mocker = mocker def test_load_visit_not_found(self): """Ingesting an unknown url result in a visit with not_found status """ for failure_exception in [ GitProtocolError("Repository unavailable"), # e.g DMCA takedown GitProtocolError("Repository not found"), GitProtocolError("unexpected http resp 401"), NotGitRepository("not a git repo"), ]: with self.subTest(failure_exception=failure_exception): # simulate an initial communication error (e.g no repository found, ...) mock = self.mocker.patch( "swh.loader.git.loader.GitLoader.fetch_pack_from_origin" ) mock.side_effect = failure_exception res = self.loader.load() assert res == {"status": "uneventful"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="not_found", type="git", snapshot=None, ) def test_load_visit_failure(self): """Failing during the fetch pack step result in failing visit """ for failure_exception in [ IOError, ObjectFormatException, OSError, ValueError, GitProtocolError, ]: with self.subTest(failure_exception=failure_exception): # simulate a fetch communication error after the initial connection # server error (e.g IOError, ObjectFormatException, ...) mock = self.mocker.patch( "swh.loader.git.loader.GitLoader.fetch_pack_from_origin" ) mock.side_effect = failure_exception("failure") res = self.loader.load() assert res == {"status": "failed"} assert_last_visit_matches( self.loader.storage, self.repo_url, status="failed", type="git", snapshot=None, ) class GitLoaderTest(TestCase, FullGitLoaderTests, CommonGitLoaderNotFound): """Prepare a git directory repository to be loaded through a GitLoader. This tests all git loader scenario. """ @pytest.fixture(autouse=True) - def init(self, swh_config, datadir, tmp_path): + def init(self, swh_storage, datadir, tmp_path): super().setUp() archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") tmp_path = str(tmp_path) self.repo_url = prepare_repository_from_archive( archive_path, archive_name, tmp_path=tmp_path ) self.destination_path = os.path.join(tmp_path, archive_name) - self.loader = GitLoader(self.repo_url) + self.loader = GitLoader(swh_storage, self.repo_url) self.repo = dulwich.repo.Repo(self.destination_path) class GitLoader2Test(TestCase, FullGitLoaderTests, CommonGitLoaderNotFound): """Mostly the same loading scenario but with a base-url different than the repo-url. To walk slightly different paths, the end result should stay the same. """ @pytest.fixture(autouse=True) - def init(self, swh_loader_config, datadir, tmp_path): + def init(self, swh_storage, datadir, tmp_path): super().setUp() archive_name = "testrepo" archive_path = os.path.join(datadir, f"{archive_name}.tgz") tmp_path = str(tmp_path) self.repo_url = prepare_repository_from_archive( archive_path, archive_name, tmp_path=tmp_path ) self.destination_path = os.path.join(tmp_path, archive_name) base_url = f"base://{self.repo_url}" - self.loader = GitLoader( - self.repo_url, base_url=base_url, config=swh_loader_config - ) + self.loader = GitLoader(swh_storage, self.repo_url, base_url=base_url) self.repo = dulwich.repo.Repo(self.destination_path)