diff --git a/swh/loader/git/converters.py b/swh/loader/git/converters.py index 75f5206..9dfd5bd 100644 --- a/swh/loader/git/converters.py +++ b/swh/loader/git/converters.py @@ -1,217 +1,227 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Convert dulwich objects to dictionaries suitable for swh.storage""" -from swh.model import identifiers +from typing import Any, Dict, Optional + from swh.model.hashutil import ( DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash ) +from swh.model.model import ( + BaseContent, Content, Directory, DirectoryEntry, + ObjectType, Person, Release, Revision, RevisionType, + SkippedContent, TargetType, Timestamp, TimestampWithTimezone, +) HASH_ALGORITHMS = DEFAULT_ALGORITHMS - {'sha1_git'} -def origin_url_to_origin(origin_url): - """Format a pygit2.Repository as an origin suitable for swh.storage""" - return { - 'url': origin_url, - } - - -def dulwich_blob_to_content_id(blob): +def dulwich_blob_to_content_id(blob) -> Dict[str, Any]: """Convert a dulwich blob to a Software Heritage content id""" if blob.type_name != b'blob': - return + raise ValueError('Argument is not a blob.') size = blob.raw_length() data = blob.as_raw_string() hashes = MultiHash.from_data(data, HASH_ALGORITHMS).digest() hashes['sha1_git'] = blob.sha().digest() hashes['length'] = size return hashes -def dulwich_blob_to_content(blob): +def dulwich_blob_to_content(blob, max_content_size=None) -> BaseContent: """Convert a dulwich blob to a Software Heritage content """ if blob.type_name != b'blob': - return - ret = dulwich_blob_to_content_id(blob) - data = blob.as_raw_string() - ret['data'] = data - ret['status'] = 'visible' - return ret + raise ValueError('Argument is not a blob.') + hashes = dulwich_blob_to_content_id(blob) + if max_content_size is not None and hashes['length'] >= max_content_size: + return SkippedContent( + status='absent', + reason='Content too large', + **hashes, + ) + else: + return Content( + data=blob.as_raw_string(), + status='visible', + **hashes, + ) -def dulwich_tree_to_directory(tree, log=None): +def dulwich_tree_to_directory(tree, log=None) -> Directory: """Format a tree as a directory""" if tree.type_name != b'tree': - return + raise ValueError('Argument is not a tree.') - ret = { - 'id': tree.sha().digest(), - } entries = [] - ret['entries'] = entries entry_mode_map = { 0o040000: 'dir', 0o160000: 'rev', 0o100644: 'file', 0o100755: 'file', 0o120000: 'file', } for entry in tree.iteritems(): - entries.append({ - 'type': entry_mode_map.get(entry.mode, 'file'), - 'perms': entry.mode, - 'name': entry.path, - 'target': hash_to_bytes(entry.sha.decode('ascii')), - }) + entries.append(DirectoryEntry( + type=entry_mode_map.get(entry.mode, 'file'), + perms=entry.mode, + name=entry.path, + target=hash_to_bytes(entry.sha.decode('ascii')), + )) - return ret + return Directory( + id=tree.sha().digest(), + entries=entries, + ) -def parse_author(name_email): +def parse_author(name_email: bytes) -> Person: """Parse an author line""" if name_email is None: - return None + raise ValueError('fullname is None') try: open_bracket = name_email.index(b'<') except ValueError: name = email = None else: raw_name = name_email[:open_bracket] raw_email = name_email[open_bracket+1:] if not raw_name: name = None elif raw_name.endswith(b' '): name = raw_name[:-1] else: name = raw_name try:
close_bracket = raw_email.index(b'>') except ValueError: email = None else: email = raw_email[:close_bracket] - return { - 'name': name, - 'email': email, - 'fullname': name_email, - } + return Person( + name=name, + email=email, + fullname=name_email, + ) -def dulwich_tsinfo_to_timestamp(timestamp, timezone, timezone_neg_utc): +def dulwich_tsinfo_to_timestamp( + timestamp, timezone, timezone_neg_utc) -> TimestampWithTimezone: """Convert the dulwich timestamp information to a structure compatible with Software Heritage""" - return { - 'timestamp': timestamp, - 'offset': timezone // 60, - 'negative_utc': timezone_neg_utc if timezone == 0 else None, - } + return TimestampWithTimezone( + timestamp=Timestamp( + seconds=timestamp, + microseconds=0, + ), + offset=timezone // 60, + negative_utc=timezone_neg_utc if timezone == 0 else None, + ) -def dulwich_commit_to_revision(commit, log=None): +def dulwich_commit_to_revision(commit, log=None) -> Revision: if commit.type_name != b'commit': - return - - ret = { - 'id': commit.sha().digest(), - 'author': parse_author(commit.author), - 'date': dulwich_tsinfo_to_timestamp( - commit.author_time, - commit.author_timezone, - commit._author_timezone_neg_utc, - ), - 'committer': parse_author(commit.committer), - 'committer_date': dulwich_tsinfo_to_timestamp( - commit.commit_time, - commit.commit_timezone, - commit._commit_timezone_neg_utc, - ), - 'type': 'git', - 'directory': bytes.fromhex(commit.tree.decode()), - 'message': commit.message, - 'metadata': None, - 'synthetic': False, - 'parents': [bytes.fromhex(p.decode()) for p in commit.parents], - } + raise ValueError('Argument is not a commit.') git_metadata = [] if commit.encoding is not None: git_metadata.append(['encoding', commit.encoding]) if commit.mergetag: for mergetag in commit.mergetag: raw_string = mergetag.as_raw_string() assert raw_string.endswith(b'\n') git_metadata.append(['mergetag', raw_string[:-1]]) if commit.extra: git_metadata.extend([k.decode('utf-8'), v] for k, v in commit.extra) if commit.gpgsig: git_metadata.append(['gpgsig', commit.gpgsig]) if git_metadata: - ret['metadata'] = { + metadata: Optional[Dict[str, Any]] = { 'extra_headers': git_metadata, } + else: + metadata = None - return ret + return Revision( + id=commit.sha().digest(), + author=parse_author(commit.author), + date=dulwich_tsinfo_to_timestamp( + commit.author_time, + commit.author_timezone, + commit._author_timezone_neg_utc, + ), + committer=parse_author(commit.committer), + committer_date=dulwich_tsinfo_to_timestamp( + commit.commit_time, + commit.commit_timezone, + commit._commit_timezone_neg_utc, + ), + type=RevisionType.GIT, + directory=bytes.fromhex(commit.tree.decode()), + message=commit.message, + metadata=metadata, + synthetic=False, + parents=[bytes.fromhex(p.decode()) for p in commit.parents], + ) + + +DULWICH_TARGET_TYPES = { + b'blob': TargetType.CONTENT, + b'tree': TargetType.DIRECTORY, + b'commit': TargetType.REVISION, + b'tag': TargetType.RELEASE, +} -DULWICH_TYPES = { - b'blob': 'content', - b'tree': 'directory', - b'commit': 'revision', - b'tag': 'release', +DULWICH_OBJECT_TYPES = { + b'blob': ObjectType.CONTENT, + b'tree': ObjectType.DIRECTORY, + b'commit': ObjectType.REVISION, + b'tag': ObjectType.RELEASE, } -def dulwich_tag_to_release(tag, log=None): +def dulwich_tag_to_release(tag, log=None) -> Release: if tag.type_name != b'tag': - return + raise ValueError('Argument is not a tag.') target_type, target = tag.object - ret = { - 'id': tag.sha().digest(), - 'name': tag.name, - 'target': 
bytes.fromhex(target.decode()), - 'target_type': DULWICH_TYPES[target_type.type_name], - 'message': tag._message, - 'metadata': None, - 'synthetic': False, - } if tag.tagger: - ret['author'] = parse_author(tag.tagger) + author: Optional[Person] = parse_author(tag.tagger) if not tag.tag_time: - ret['date'] = None + date = None else: - ret['date'] = dulwich_tsinfo_to_timestamp( + date = dulwich_tsinfo_to_timestamp( tag.tag_time, tag.tag_timezone, tag._tag_timezone_neg_utc, ) else: - ret['author'] = ret['date'] = None - - return ret - - -def branches_to_snapshot(branches): - snapshot = {'branches': branches} - snapshot_id = identifiers.snapshot_identifier(snapshot) - snapshot['id'] = identifiers.identifier_to_bytes(snapshot_id) - - return snapshot + author = date = None + + return Release( + id=tag.sha().digest(), + author=author, + date=date, + name=tag.name, + target=bytes.fromhex(target.decode()), + target_type=DULWICH_OBJECT_TYPES[target_type.type_name], + message=tag._message, + metadata=None, + synthetic=False, + ) diff --git a/swh/loader/git/from_disk.py b/swh/loader/git/from_disk.py index 3bd3ca9..7571957 100644 --- a/swh/loader/git/from_disk.py +++ b/swh/loader/git/from_disk.py @@ -1,366 +1,371 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dulwich.repo import os import shutil +from typing import Dict, Optional from dulwich.errors import ObjectFormatException, EmptyFileException from collections import defaultdict from swh.model import hashutil +from swh.model.model import ( + Origin, Snapshot, SnapshotBranch, TargetType) from swh.loader.core.loader import DVCSLoader + from . import converters, utils class GitLoaderFromDisk(DVCSLoader): """Load a git repository from a directory. """ CONFIG_BASE_FILENAME = 'loader/git-disk' visit_type = 'git' def __init__(self, url, visit_date=None, directory=None, config=None): super().__init__(logging_class='swh.loader.git.Loader', config=config) self.origin_url = url self.visit_date = visit_date self.directory = directory def prepare_origin_visit(self, *args, **kwargs): - self.origin = converters.origin_url_to_origin(self.origin_url) + self.origin = Origin(url=self.origin_url) def prepare(self, *args, **kwargs): self.repo = dulwich.repo.Repo(self.directory) def iter_objects(self): object_store = self.repo.object_store for pack in object_store.packs: objs = list(pack.index.iterentries()) objs.sort(key=lambda x: x[1]) for sha, offset, crc32 in objs: yield hashutil.hash_to_bytehex(sha) yield from object_store._iter_loose_objects() yield from object_store._iter_alternate_objects() def _check(self, obj): """Check the object's repository representation. If any errors in check exists, an ObjectFormatException is raised. Args: obj (object): Dulwich object read from the repository. """ obj.check() from dulwich.objects import Commit, Tag try: # For additional checks on dulwich objects with date # for now, only checks on *time if isinstance(obj, Commit): commit_time = obj._commit_time utils.check_date_time(commit_time) author_time = obj._author_time utils.check_date_time(author_time) elif isinstance(obj, Tag): tag_time = obj._tag_time utils.check_date_time(tag_time) except Exception as e: raise ObjectFormatException(e) def get_object(self, oid): """Given an object id, return the object if it is found and not malformed in some way. 
Args: oid (bytes): the object's identifier Returns: The object if found without malformation """ try: # some errors are raised when reading the object obj = self.repo[oid] # some we need to check ourselves self._check(obj) except KeyError: _id = oid.decode('utf-8') self.log.warn('object %s not found, skipping' % _id, extra={ 'swh_type': 'swh_loader_git_missing_object', 'swh_object_id': _id, - 'origin_url': self.origin['url'], + 'origin_url': self.origin.url, }) return None except ObjectFormatException: _id = oid.decode('utf-8') self.log.warn('object %s malformed, skipping' % _id, extra={ 'swh_type': 'swh_loader_git_missing_object', 'swh_object_id': _id, - 'origin_url': self.origin['url'], + 'origin_url': self.origin.url, }) return None except EmptyFileException: _id = oid.decode('utf-8') self.log.warn('object %s corrupted (empty file), skipping' % _id, extra={ 'swh_type': 'swh_loader_git_missing_object', 'swh_object_id': _id, - 'origin_url': self.origin['url'], + 'origin_url': self.origin.url, }) else: return obj def fetch_data(self): """Fetch the data from the data source""" previous_visit = self.storage.origin_visit_get_latest( - self.origin['url'], require_snapshot=True) + self.origin.url, require_snapshot=True) if previous_visit: self.previous_snapshot_id = previous_visit['snapshot'] else: self.previous_snapshot_id = None type_to_ids = defaultdict(list) for oid in self.iter_objects(): obj = self.get_object(oid) if not obj: continue type_name = obj.type_name type_to_ids[type_name].append(oid) self.type_to_ids = type_to_ids def has_contents(self): """Checks whether we need to load contents""" return bool(self.type_to_ids[b'blob']) def get_content_ids(self): """Get the content identifiers from the git repository""" for oid in self.type_to_ids[b'blob']: yield converters.dulwich_blob_to_content_id(self.repo[oid]) def get_contents(self): """Get the contents that need to be loaded""" missing_contents = set(self.storage.content_missing( self.get_content_ids(), 'sha1_git')) for oid in missing_contents: yield converters.dulwich_blob_to_content( self.repo[hashutil.hash_to_bytehex(oid)]) def has_directories(self): """Checks whether we need to load directories""" return bool(self.type_to_ids[b'tree']) def get_directory_ids(self): """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tree']) def get_directories(self): """Get the directories that need to be loaded""" missing_dirs = set(self.storage.directory_missing( sorted(self.get_directory_ids()))) for oid in missing_dirs: yield converters.dulwich_tree_to_directory( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def has_revisions(self): """Checks whether we need to load revisions""" return bool(self.type_to_ids[b'commit']) def get_revision_ids(self): """Get the revision identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'commit']) def get_revisions(self): """Get the revisions that need to be loaded""" missing_revs = set(self.storage.revision_missing( sorted(self.get_revision_ids()))) for oid in missing_revs: yield converters.dulwich_commit_to_revision( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def has_releases(self): """Checks whether we need to load releases""" return bool(self.type_to_ids[b'tag']) def get_release_ids(self): """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tag']) def get_releases(self): 
"""Get the releases that need to be loaded""" missing_rels = set(self.storage.release_missing( sorted(self.get_release_ids()))) for oid in missing_rels: yield converters.dulwich_tag_to_release( self.repo[hashutil.hash_to_bytehex(oid)], log=self.log) def get_snapshot(self): """Turn the list of branches into a snapshot to load""" - branches = {} + branches: Dict[bytes, Optional[SnapshotBranch]] = {} for ref, target in self.repo.refs.as_dict().items(): obj = self.get_object(target) if obj: - branches[ref] = { - 'target': hashutil.bytehex_to_hash(target), - 'target_type': converters.DULWICH_TYPES[obj.type_name], - } + target_type = converters.DULWICH_TARGET_TYPES[obj.type_name] + branches[ref] = SnapshotBranch( + target=hashutil.bytehex_to_hash(target), + target_type=target_type, + ) else: branches[ref] = None for ref, target in self.repo.refs.get_symrefs().items(): - branches[ref] = { - 'target': target, - 'target_type': 'alias', - } + branches[ref] = SnapshotBranch( + target=target, + target_type=TargetType.ALIAS, + ) - self.snapshot = converters.branches_to_snapshot(branches) + self.snapshot = Snapshot(branches=branches) return self.snapshot def get_fetch_history_result(self): """Return the data to store in fetch_history for the current loader""" return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), } def save_data(self): """We already have the data locally, no need to save it""" pass def load_status(self): """The load was eventful if the current occurrences are different to the ones we retrieved at the beginning of the run""" eventful = False if self.previous_snapshot_id: - eventful = self.snapshot['id'] != self.previous_snapshot_id + eventful = self.snapshot.id != self.previous_snapshot_id else: - eventful = bool(self.snapshot['branches']) + eventful = bool(self.snapshot.branches) return {'status': ('eventful' if eventful else 'uneventful')} class GitLoaderFromArchive(GitLoaderFromDisk): """Load a git repository from an archive. This loader ingests a git repository compressed into an archive. The supported archive formats are ``.zip`` and ``.tar.gz``. From an input tarball named ``my-git-repo.zip``, the following layout is expected in it:: my-git-repo/ ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... Nevertheless, the loader is able to ingest tarballs with the following layouts too:: . ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... or:: other-repo-name/ ├── .git │ ├── branches │ ├── COMMIT_EDITMSG │ ├── config │ ├── description │ ├── HEAD ... """ def __init__(self, *args, archive_path, **kwargs): super().__init__(*args, **kwargs) self.temp_dir = self.repo_path = None self.archive_path = archive_path def project_name_from_archive(self, archive_path): """Compute the project name from the archive's path. """ archive_name = os.path.basename(archive_path) for ext in ('.zip', '.tar.gz', '.tgz'): if archive_name.lower().endswith(ext): archive_name = archive_name[:-len(ext)] break return archive_name def prepare(self, *args, **kwargs): """1. Uncompress the archive in temporary location. 2. Prepare as the GitLoaderFromDisk does 3. 
Load as GitLoaderFromDisk does """ project_name = self.project_name_from_archive(self.archive_path) self.temp_dir, self.repo_path = utils.init_git_repo_from_archive( project_name, self.archive_path) self.log.info('Project %s - Uncompressing archive %s at %s', self.origin_url, os.path.basename(self.archive_path), self.repo_path) self.directory = self.repo_path super().prepare(*args, **kwargs) def cleanup(self): """Cleanup the temporary location (if it exists). """ if self.temp_dir and os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) self.log.info('Project %s - Done injecting %s' % ( self.origin_url, self.repo_path)) if __name__ == '__main__': import click import logging logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) @click.command() @click.option('--origin-url', help='origin url') @click.option('--git-directory', help='Path to git repository to load') @click.option('--visit-date', default=None, help='Visit date') def main(origin_url, git_directory, visit_date): if not visit_date: visit_date = datetime.datetime.now(tz=datetime.timezone.utc) return GitLoaderFromDisk().load(origin_url, git_directory, visit_date) main() diff --git a/swh/loader/git/loader.py b/swh/loader/git/loader.py index afb8368..78ba4b1 100644 --- a/swh/loader/git/loader.py +++ b/swh/loader/git/loader.py @@ -1,523 +1,544 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import defaultdict import datetime -import dulwich.client +from io import BytesIO import logging import os import pickle import sys +from typing import Any, Dict, Iterable, Optional -from collections import defaultdict -from io import BytesIO +import dulwich.client from dulwich.object_store import ObjectStoreGraphWalker from dulwich.pack import PackData, PackInflater from swh.model import hashutil +from swh.model.model import ( + BaseContent, Directory, Origin, Revision, + Release, Snapshot, SnapshotBranch, TargetType, Sha1Git) from swh.loader.core.loader import DVCSLoader from swh.storage.algos.snapshot import snapshot_get_all_branches + from . 
import converters class RepoRepresentation: """Repository representation for a Software Heritage origin.""" def __init__(self, storage, base_snapshot=None, ignore_history=False): self.storage = storage self._parents_cache = {} self._type_cache = {} self.ignore_history = ignore_history if base_snapshot and not ignore_history: self.heads = set(self._cache_heads(base_snapshot)) else: self.heads = set() def _fill_parents_cache(self, commits): """When querying for a commit's parents, we fill the cache to a depth of 1000 commits.""" root_revs = self._encode_for_storage(commits) for rev, parents in self.storage.revision_shortlog(root_revs, 1000): rev_id = hashutil.hash_to_bytehex(rev) if rev_id not in self._parents_cache: self._parents_cache[rev_id] = [ hashutil.hash_to_bytehex(parent) for parent in parents ] for rev in commits: if rev not in self._parents_cache: self._parents_cache[rev] = [] def _cache_heads(self, base_snapshot): """Return all the known head commits for the given snapshot""" - _git_types = ['content', 'directory', 'revision', 'release'] + _git_types = list(converters.DULWICH_TARGET_TYPES.values()) if not base_snapshot: return [] snapshot_targets = set() - for target in base_snapshot['branches'].values(): - if target and target['target_type'] in _git_types: - snapshot_targets.add(target['target']) + for branch in base_snapshot.branches.values(): + if branch and branch.target_type in _git_types: + snapshot_targets.add(branch.target) decoded_targets = self._decode_from_storage(snapshot_targets) for id, objs in self.get_stored_objects(decoded_targets).items(): if not objs: logging.warn('Missing head: %s' % hashutil.hash_to_hex(id)) return [] return decoded_targets def get_parents(self, commit): """Bogus method to prevent expensive recursion, at the expense of less efficient downloading""" return [] def get_heads(self): return self.heads @staticmethod def _encode_for_storage(objects): return [hashutil.bytehex_to_hash(object) for object in objects] @staticmethod def _decode_from_storage(objects): return set(hashutil.hash_to_bytehex(object) for object in objects) def graph_walker(self): return ObjectStoreGraphWalker(self.get_heads(), self.get_parents) @staticmethod def filter_unwanted_refs(refs): """Filter the unwanted references from refs""" ret = {} for ref, val in refs.items(): if ref.endswith(b'^{}'): # Peeled refs make the git protocol explode continue elif ref.startswith(b'refs/pull/') and ref.endswith(b'/merge'): # We filter-out auto-merged GitHub pull requests continue else: ret[ref] = val return ret def determine_wants(self, refs): """Filter the remote references to figure out which ones Software Heritage needs. """ if not refs: return [] # Find what objects Software Heritage has refs = self.find_remote_ref_types_in_swh(refs) # Cache the objects found in swh as existing heads for target in refs.values(): if target['target_type'] is not None: self.heads.add(target['target']) ret = set() for target in self.filter_unwanted_refs(refs).values(): if target['target_type'] is None: # The target doesn't exist in Software Heritage, let's retrieve # it. ret.add(target['target']) return list(ret) def get_stored_objects(self, objects): """Find which of these objects were stored in the archive. Do the request in packets to avoid a server timeout. 
""" if self.ignore_history: return {} packet_size = 1000 ret = {} query = [] for object in objects: query.append(object) if len(query) >= packet_size: ret.update( self.storage.object_find_by_sha1_git( self._encode_for_storage(query) ) ) query = [] if query: ret.update( self.storage.object_find_by_sha1_git( self._encode_for_storage(query) ) ) return ret - def find_remote_ref_types_in_swh(self, remote_refs): + def find_remote_ref_types_in_swh( + self, remote_refs) -> Dict[bytes, Dict[str, Any]]: """Parse the remote refs information and list the objects that exist in Software Heritage. + + Returns: + dict whose keys are branch names, and values are tuples + `(target, target_type)`. """ all_objs = set(remote_refs.values()) - set(self._type_cache) type_by_id = {} for id, objs in self.get_stored_objects(all_objs).items(): id = hashutil.hash_to_bytehex(id) if objs: type_by_id[id] = objs[0]['type'] self._type_cache.update(type_by_id) ret = {} for ref, id in remote_refs.items(): ret[ref] = { 'target': id, 'target_type': self._type_cache.get(id), } return ret class GitLoader(DVCSLoader): """A bulk loader for a git repository""" CONFIG_BASE_FILENAME = 'loader/git' ADDITIONAL_CONFIG = { 'pack_size_bytes': ('int', 4 * 1024 * 1024 * 1024), } visit_type = 'git' def __init__(self, url, base_url=None, ignore_history=False, repo_representation=RepoRepresentation, config=None): """Initialize the bulk updater. Args: repo_representation: swh's repository representation which is in charge of filtering between known and remote data. """ super().__init__(logging_class='swh.loader.git.BulkLoader', config=config) self.origin_url = url self.base_url = base_url self.ignore_history = ignore_history self.repo_representation = repo_representation def fetch_pack_from_origin(self, origin_url, base_snapshot, do_activity): """Fetch a pack from the origin""" pack_buffer = BytesIO() base_repo = self.repo_representation( storage=self.storage, base_snapshot=base_snapshot, ignore_history=self.ignore_history, ) client, path = dulwich.client.get_transport_and_path(origin_url, thin_packs=False) size_limit = self.config['pack_size_bytes'] def do_pack(data): cur_size = pack_buffer.tell() would_write = len(data) if cur_size + would_write > size_limit: raise IOError('Pack file too big for repository %s, ' 'limit is %d bytes, current size is %d, ' 'would write %d' % (origin_url, size_limit, cur_size, would_write)) pack_buffer.write(data) pack_result = client.fetch_pack(path, base_repo.determine_wants, base_repo.graph_walker(), do_pack, progress=do_activity) remote_refs = pack_result.refs symbolic_refs = pack_result.symrefs if remote_refs: local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs) else: local_refs = remote_refs = {} pack_buffer.flush() pack_size = pack_buffer.tell() pack_buffer.seek(0) return { 'remote_refs': base_repo.filter_unwanted_refs(remote_refs), 'local_refs': local_refs, 'symbolic_refs': symbolic_refs, 'pack_buffer': pack_buffer, 'pack_size': pack_size, } def list_pack(self, pack_data, pack_size): id_to_type = {} type_to_ids = defaultdict(set) inflater = self.get_inflater() for obj in inflater: type, id = obj.type_name, obj.id id_to_type[id] = type type_to_ids[type].add(id) return id_to_type, type_to_ids def prepare_origin_visit(self, *args, **kwargs): self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc) - self.origin = converters.origin_url_to_origin(self.origin_url) + self.origin = Origin(url=self.origin_url) - def get_full_snapshot(self, origin_url): + def get_full_snapshot(self, origin_url) 
-> Optional[Snapshot]: visit = self.storage.origin_visit_get_latest( origin_url, require_snapshot=True) if visit and visit['snapshot']: - return snapshot_get_all_branches( + snapshot = snapshot_get_all_branches( self.storage, visit['snapshot']) else: + snapshot = None + if snapshot is None: return None + return Snapshot.from_dict(snapshot) def prepare(self, *args, **kwargs): - base_origin_url = origin_url = self.origin['url'] + base_origin_url = origin_url = self.origin.url prev_snapshot = None if not self.ignore_history: prev_snapshot = self.get_full_snapshot(origin_url) if self.base_url and not prev_snapshot: - base_origin = converters.origin_url_to_origin(self.base_url) + base_origin = Origin(url=self.base_url) base_origin = self.storage.origin_get(base_origin) if base_origin: base_origin_url = base_origin['url'] prev_snapshot = self.get_full_snapshot(base_origin_url) self.base_snapshot = prev_snapshot self.base_origin_url = base_origin_url def fetch_data(self): def do_progress(msg): sys.stderr.buffer.write(msg) sys.stderr.flush() fetch_info = self.fetch_pack_from_origin( - self.origin['url'], self.base_snapshot, + self.origin.url, self.base_snapshot, do_progress) self.pack_buffer = fetch_info['pack_buffer'] self.pack_size = fetch_info['pack_size'] self.remote_refs = fetch_info['remote_refs'] self.local_refs = fetch_info['local_refs'] self.symbolic_refs = fetch_info['symbolic_refs'] - origin_url = self.origin['url'] + origin_url = self.origin.url self.log.info('Listed %d refs for repo %s' % ( len(self.remote_refs), origin_url), extra={ 'swh_type': 'git_repo_list_refs', 'swh_repo': origin_url, 'swh_num_refs': len(self.remote_refs), }) # We want to load the repository, walk all the objects id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size) self.id_to_type = id_to_type self.type_to_ids = type_to_ids def save_data(self): """Store a pack for archival""" write_size = 8192 pack_dir = self.get_save_data_path() pack_name = "%s.pack" % self.visit_date.isoformat() refs_name = "%s.refs" % self.visit_date.isoformat() with open(os.path.join(pack_dir, pack_name), 'xb') as f: self.pack_buffer.seek(0) while True: r = self.pack_buffer.read(write_size) if not r: break f.write(r) self.pack_buffer.seek(0) with open(os.path.join(pack_dir, refs_name), 'xb') as f: pickle.dump(self.remote_refs, f) def get_inflater(self): """Reset the pack buffer and get an object inflater from it""" self.pack_buffer.seek(0) return PackInflater.for_pack_data( PackData.from_file(self.pack_buffer, self.pack_size)) def has_contents(self): return bool(self.type_to_ids[b'blob']) - def get_content_ids(self): + def get_content_ids(self) -> Iterable[Dict[str, Any]]: """Get the content identifiers from the git repository""" for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue yield converters.dulwich_blob_to_content_id(raw_obj) - def get_contents(self): + def get_contents(self) -> Iterable[BaseContent]: """Format the blobs from the git repository as swh contents""" missing_contents = set(self.storage.content_missing( self.get_content_ids(), 'sha1_git')) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'blob': continue if raw_obj.sha().digest() not in missing_contents: continue - yield converters.dulwich_blob_to_content(raw_obj) + yield converters.dulwich_blob_to_content( + raw_obj, max_content_size=self.max_content_size) - def has_directories(self): + def has_directories(self) -> bool: return bool(self.type_to_ids[b'tree']) - def get_directory_ids(self): + def 
get_directory_ids(self) -> Iterable[Sha1Git]: """Get the directory identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tree']) - def get_directories(self): + def get_directories(self) -> Iterable[Directory]: """Format the trees as swh directories""" missing_dirs = set(self.storage.directory_missing( sorted(self.get_directory_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tree': continue if raw_obj.sha().digest() not in missing_dirs: continue yield converters.dulwich_tree_to_directory(raw_obj, log=self.log) - def has_revisions(self): + def has_revisions(self) -> bool: return bool(self.type_to_ids[b'commit']) - def get_revision_ids(self): + def get_revision_ids(self) -> Iterable[Sha1Git]: """Get the revision identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'commit']) - def get_revisions(self): + def get_revisions(self) -> Iterable[Revision]: """Format commits as swh revisions""" missing_revs = set(self.storage.revision_missing( sorted(self.get_revision_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'commit': continue if raw_obj.sha().digest() not in missing_revs: continue yield converters.dulwich_commit_to_revision(raw_obj, log=self.log) - def has_releases(self): + def has_releases(self) -> bool: return bool(self.type_to_ids[b'tag']) - def get_release_ids(self): + def get_release_ids(self) -> Iterable[Sha1Git]: """Get the release identifiers from the git repository""" return (hashutil.hash_to_bytes(id.decode()) for id in self.type_to_ids[b'tag']) - def get_releases(self): + def get_releases(self) -> Iterable[Release]: """Retrieve all the release objects from the git repository""" missing_rels = set(self.storage.release_missing( sorted(self.get_release_ids()))) for raw_obj in self.get_inflater(): if raw_obj.type_name != b'tag': continue if raw_obj.sha().digest() not in missing_rels: continue yield converters.dulwich_tag_to_release(raw_obj, log=self.log) - def get_snapshot(self): - branches = {} + def get_snapshot(self) -> Snapshot: + branches: Dict[bytes, Optional[SnapshotBranch]] = {} for ref in self.remote_refs: ret_ref = self.local_refs[ref].copy() if not ret_ref['target_type']: target_type = self.id_to_type[ret_ref['target']] - ret_ref['target_type'] = converters.DULWICH_TYPES[target_type] + ret_ref['target_type'] = \ + converters.DULWICH_TARGET_TYPES[target_type] ret_ref['target'] = hashutil.bytehex_to_hash(ret_ref['target']) - branches[ref] = ret_ref + branches[ref] = SnapshotBranch( + target_type=ret_ref['target_type'], + target=ret_ref['target'], + ) for ref, target in self.symbolic_refs.items(): - branches[ref] = {'target_type': 'alias', 'target': target} + branches[ref] = SnapshotBranch( + target_type=TargetType.ALIAS, + target=target, + ) - self.snapshot = converters.branches_to_snapshot(branches) + self.snapshot = Snapshot(branches=branches) return self.snapshot - def get_fetch_history_result(self): + def get_fetch_history_result(self) -> Dict[str, int]: return { 'contents': len(self.type_to_ids[b'blob']), 'directories': len(self.type_to_ids[b'tree']), 'revisions': len(self.type_to_ids[b'commit']), 'releases': len(self.type_to_ids[b'tag']), } - def load_status(self): + def load_status(self) -> Dict[str, Any]: """The load was eventful if the current snapshot is different to the one we retrieved at the beginning of the run""" eventful = False if self.base_snapshot: - eventful = self.snapshot['id'] != 
self.base_snapshot['id'] + eventful = self.snapshot.id != self.base_snapshot.id else: - eventful = bool(self.snapshot['branches']) + eventful = bool(self.snapshot.branches) return {'status': ('eventful' if eventful else 'uneventful')} if __name__ == '__main__': import click logging.basicConfig( level=logging.DEBUG, format='%(asctime)s %(process)d %(message)s' ) @click.command() @click.option('--origin-url', help='Origin url', required=True) @click.option('--base-url', default=None, help='Optional Base url') @click.option('--ignore-history/--no-ignore-history', help='Ignore the repository history', default=False) def main(origin_url, base_url, ignore_history): loader = GitLoader( origin_url, base_url=base_url, ignore_history=ignore_history, ) return loader.load() main() diff --git a/swh/loader/git/tests/__init__.py b/swh/loader/git/tests/__init__.py index 3f142b9..fc649d4 100644 --- a/swh/loader/git/tests/__init__.py +++ b/swh/loader/git/tests/__init__.py @@ -1,34 +1,31 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information TEST_LOADER_CONFIG = { 'storage': { 'cls': 'pipeline', 'steps': [ - { - 'cls': 'validate', - }, { 'cls': 'filter' }, { 'cls': 'buffer', 'min_batch_size': { 'content': 10, 'content_bytes': 100 * 1024 * 1024, 'directory': 10, 'revision': 10, 'release': 10, }, }, { 'cls': 'memory' }, ] }, 'max_content_size': 100 * 1024 * 1024, 'pack_size_bytes': 4 * 1024 * 1024 * 1024, 'save_data': False, } diff --git a/swh/loader/git/tests/test_converters.py b/swh/loader/git/tests/test_converters.py index 58c1efc..ab794e8 100644 --- a/swh/loader/git/tests/test_converters.py +++ b/swh/loader/git/tests/test_converters.py @@ -1,313 +1,325 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import shutil import subprocess import tempfile import unittest import dulwich.repo -import swh.loader.git.converters as converters from swh.model.hashutil import bytehex_to_hash, hash_to_bytes +from swh.model.model import ( + Content, Person, Release, Revision, RevisionType, ObjectType, + Timestamp, TimestampWithTimezone, +) + +import swh.loader.git.converters as converters TEST_DATA = os.path.join(os.path.dirname(__file__), 'data') -class SWHTargetType: - """Dulwich lookalike TargetType class +class SWHObjectType: + """Dulwich lookalike ObjectType class """ def __init__(self, type_name): self.type_name = type_name class SWHTag: """Dulwich lookalike tag class """ def __init__(self, name, type_name, target, target_type, tagger, tag_time, tag_timezone, message): self.name = name self.type_name = type_name - self.object = SWHTargetType(target_type), target + self.object = SWHObjectType(target_type), target self.tagger = tagger self._message = message self.tag_time = tag_time self.tag_timezone = tag_timezone self._tag_timezone_neg_utc = False def sha(self): from hashlib import sha1 return sha1() @pytest.mark.fs class TestConverters(unittest.TestCase): @classmethod def setUpClass(cls): super().setUpClass() cls.repo_path = tempfile.mkdtemp() cls.repo = dulwich.repo.Repo.init_bare(cls.repo_path) fast_export = os.path.join( TEST_DATA, 'git-repos', 'example-submodule.fast-export.xz') xz = 
subprocess.Popen( ['xzcat'], stdin=open(fast_export, 'rb'), stdout=subprocess.PIPE, ) git = subprocess.Popen( ['git', 'fast-import', '--quiet'], stdin=xz.stdout, cwd=cls.repo_path, ) # flush stdout of xz xz.stdout.close() git.communicate() @classmethod def tearDownClass(cls): super().tearDownClass() shutil.rmtree(cls.repo_path) - def setUp(self): - super().setUp() - - self.blob_id = b'28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0' - self.blob = { - 'sha1_git': bytehex_to_hash(self.blob_id), - 'sha1': hash_to_bytes('4850a3420a2262ff061cb296fb915430fa92301c'), - 'sha256': hash_to_bytes('fee7c8a485a10321ad94b64135073cb5' - '5f22cb9f57fa2417d2adfb09d310adef'), - 'blake2s256': hash_to_bytes('5d71873f42a137f6d89286e43677721e574' - '1fa05ce4cd5e3c7ea7c44d4c2d10b'), - 'data': (b'[submodule "example-dependency"]\n' - b'\tpath = example-dependency\n' - b'\turl = https://github.com/githubtraining/' - b'example-dependency.git\n'), - 'length': 124, - 'status': 'visible', - } - def test_blob_to_content(self): - content = converters.dulwich_blob_to_content(self.repo[self.blob_id]) - self.assertEqual(self.blob, content) + content_id = b'28c6f4023d65f74e3b59a2dea3c4277ed9ee07b0' + content = converters.dulwich_blob_to_content(self.repo[content_id]) + expected_content = Content( + sha1_git=bytehex_to_hash(content_id), + sha1=hash_to_bytes('4850a3420a2262ff061cb296fb915430fa92301c'), + sha256=hash_to_bytes('fee7c8a485a10321ad94b64135073cb5' + '5f22cb9f57fa2417d2adfb09d310adef'), + blake2s256=hash_to_bytes('5d71873f42a137f6d89286e43677721e574' + '1fa05ce4cd5e3c7ea7c44d4c2d10b'), + data=(b'[submodule "example-dependency"]\n' + b'\tpath = example-dependency\n' + b'\turl = https://github.com/githubtraining/' + b'example-dependency.git\n'), + length=124, + status='visible', + ) + self.assertEqual(content, expected_content) def test_convertion_wrong_input(self): class Something: type_name = b'something-not-the-right-type' m = { 'blob': converters.dulwich_blob_to_content, 'blob2': converters.dulwich_blob_to_content_id, 'tree': converters.dulwich_tree_to_directory, 'commit': converters.dulwich_tree_to_directory, 'tag': converters.dulwich_tag_to_release, } for _callable in m.values(): - self.assertIsNone(_callable(Something())) + with self.assertRaises(ValueError): + _callable(Something()) def test_commit_to_revision(self): sha1 = b'9768d0b576dbaaecd80abedad6dfd0d72f1476da' revision = converters.dulwich_commit_to_revision(self.repo[sha1]) - expected_revision = { - 'id': hash_to_bytes('9768d0b576dbaaecd80abedad6dfd0d72f1476da'), - 'directory': b'\xf0i\\./\xa7\xce\x9dW@#\xc3A7a\xa4s\xe5\x00\xca', - 'type': 'git', - 'committer': { - 'name': b'Stefano Zacchiroli', - 'fullname': b'Stefano Zacchiroli ', - 'email': b'zack@upsilon.cc', - }, - 'author': { - 'name': b'Stefano Zacchiroli', - 'fullname': b'Stefano Zacchiroli ', - 'email': b'zack@upsilon.cc', - }, - 'committer_date': { - 'negative_utc': None, - 'timestamp': 1443083765, - 'offset': 120, - }, - 'message': b'add submodule dependency\n', - 'metadata': None, - 'date': { - 'negative_utc': None, - 'timestamp': 1443083765, - 'offset': 120, - }, - 'parents': [ + expected_revision = Revision( + id=hash_to_bytes('9768d0b576dbaaecd80abedad6dfd0d72f1476da'), + directory=b'\xf0i\\./\xa7\xce\x9dW@#\xc3A7a\xa4s\xe5\x00\xca', + type=RevisionType.GIT, + committer=Person( + name=b'Stefano Zacchiroli', + fullname=b'Stefano Zacchiroli ', + email=b'zack@upsilon.cc', + ), + author=Person( + name=b'Stefano Zacchiroli', + fullname=b'Stefano Zacchiroli ', + email=b'zack@upsilon.cc', + ), + 
committer_date=TimestampWithTimezone( + timestamp=Timestamp( + seconds=1443083765, + microseconds=0, + ), + negative_utc=None, + offset=120, + ), + message=b'add submodule dependency\n', + metadata=None, + date=TimestampWithTimezone( + timestamp=Timestamp( + seconds=1443083765, + microseconds=0, + ), + negative_utc=None, + offset=120, + ), + parents=[ b'\xc3\xc5\x88q23`\x9f[\xbb\xb2\xd9\xe7\xf3\xfbJf\x0f?r' ], - 'synthetic': False, - } + synthetic=False, + ) self.assertEqual(revision, expected_revision) def test_author_line_to_author(self): # edge case out of the way - self.assertIsNone(converters.parse_author(None)) + with self.assertRaises(ValueError): + converters.parse_author(None) tests = { - b'a ': { - 'name': b'a', - 'email': b'b@c.com', - 'fullname': b'a ', - }, - b'': { - 'name': None, - 'email': b'foo@bar.com', - 'fullname': b'', - }, - b'malformed ': { - 'name': b'trailing', - 'email': b'sp@c.e', - 'fullname': b'trailing ', - }, - b'no': { - 'name': b'no', - 'email': b'sp@c.e', - 'fullname': b'no', - }, - b' <>': { - 'name': b'', - 'email': b'', - 'fullname': b' <>', - }, - b'something': { - 'name': None, - 'email': None, - 'fullname': b'something' - } + b'a ': Person( + name=b'a', + email=b'b@c.com', + fullname=b'a ', + ), + b'': Person( + name=None, + email=b'foo@bar.com', + fullname=b'', + ), + b'malformed ': Person( + name=b'trailing', + email=b'sp@c.e', + fullname=b'trailing ', + ), + b'no': Person( + name=b'no', + email=b'sp@c.e', + fullname=b'no', + ), + b' <>': Person( + name=b'', + email=b'', + fullname=b' <>', + ), + b'something': Person( + name=None, + email=None, + fullname=b'something' + ) } for author in sorted(tests): parsed_author = tests[author] self.assertEqual(parsed_author, converters.parse_author(author)) def test_dulwich_tag_to_release_no_author_no_date(self): target = b'641fb6e08ddb2e4fd096dcf18e80b894bf' message = b'some release message' tag = SWHTag(name='blah', type_name=b'tag', target=target, target_type=b'commit', message=message, tagger=None, tag_time=None, tag_timezone=None) # when actual_release = converters.dulwich_tag_to_release(tag) # then - expected_release = { - 'author': None, - 'date': None, - 'id': b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', - 'message': message, - 'metadata': None, - 'name': 'blah', - 'synthetic': False, - 'target': hash_to_bytes(target.decode()), - 'target_type': 'revision' - } + expected_release = Release( + author=None, + date=None, + id=b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', + message=message, + metadata=None, + name='blah', + synthetic=False, + target=hash_to_bytes(target.decode()), + target_type=ObjectType.REVISION, + ) self.assertEqual(actual_release, expected_release) def test_dulwich_tag_to_release_author_and_date(self): tagger = b'hey dude ' target = b'641fb6e08ddb2e4fd096dcf18e80b894bf' message = b'some release message' import datetime date = datetime.datetime( 2007, 12, 5, tzinfo=datetime.timezone.utc ).timestamp() tag = SWHTag(name='blah', type_name=b'tag', target=target, target_type=b'commit', message=message, tagger=tagger, tag_time=date, tag_timezone=0) # when actual_release = converters.dulwich_tag_to_release(tag) # then - expected_release = { - 'author': { - 'email': b'hello@mail.org', - 'fullname': b'hey dude ', - 'name': b'hey dude' - }, - 'date': { - 'negative_utc': False, - 'offset': 0, - 'timestamp': 1196812800.0 - }, - 'id': b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', - 'message': message, - 'metadata': None, - 'name': 'blah', - 'synthetic': 
False, - 'target': hash_to_bytes(target.decode()), - 'target_type': 'revision' - } + expected_release = Release( + author=Person( + email=b'hello@mail.org', + fullname=b'hey dude ', + name=b'hey dude' + ), + date=TimestampWithTimezone( + negative_utc=False, + offset=0, + timestamp=Timestamp( + seconds=1196812800, + microseconds=0, + ) + ), + id=b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', + message=message, + metadata=None, + name='blah', + synthetic=False, + target=hash_to_bytes(target.decode()), + target_type=ObjectType.REVISION, + ) self.assertEqual(actual_release, expected_release) def test_dulwich_tag_to_release_author_no_date(self): # to reproduce bug T815 (fixed) tagger = b'hey dude ' target = b'641fb6e08ddb2e4fd096dcf18e80b894bf' message = b'some release message' tag = SWHTag(name='blah', type_name=b'tag', target=target, target_type=b'commit', message=message, tagger=tagger, tag_time=None, tag_timezone=None) # when actual_release = converters.dulwich_tag_to_release(tag) # then - expected_release = { - 'author': { - 'email': b'hello@mail.org', - 'fullname': b'hey dude ', - 'name': b'hey dude' - }, - 'date': None, - 'id': b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', - 'message': message, - 'metadata': None, - 'name': 'blah', - 'synthetic': False, - 'target': hash_to_bytes(target.decode()), - 'target_type': 'revision' - } + expected_release = Release( + author=Person( + email=b'hello@mail.org', + fullname=b'hey dude ', + name=b'hey dude' + ), + date=None, + id=b'\xda9\xa3\xee^kK\r2U\xbf\xef\x95`\x18\x90\xaf\xd8\x07\t', + message=message, + metadata=None, + name='blah', + synthetic=False, + target=hash_to_bytes(target.decode()), + target_type=ObjectType.REVISION, + ) self.assertEqual(actual_release, expected_release)
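
Editorial note, not part of the patch: a minimal sketch of how the converters are expected to behave after this change, assuming the swh.model and dulwich APIs used above; the blob data below is made-up sample content for illustration only.

from dulwich.objects import Blob

from swh.loader.git import converters
from swh.model.model import Content, SkippedContent

blob = Blob.from_string(b'[submodule "example-dependency"]\n')

# Small blobs now come back as Content model objects with status 'visible',
# instead of plain dicts.
content = converters.dulwich_blob_to_content(blob)
assert isinstance(content, Content)
assert content.status == 'visible'

# Blobs whose length reaches max_content_size come back as SkippedContent
# (status 'absent', reason 'Content too large') and are not loaded.
skipped = converters.dulwich_blob_to_content(blob, max_content_size=10)
assert isinstance(skipped, SkippedContent)
assert skipped.status == 'absent'

# Non-blob objects now raise ValueError instead of silently returning None.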