swh/loader/git/loader.py
 # Copyright (C) 2016-2020  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
+from dataclasses import dataclass
 import datetime
 from io import BytesIO
 import logging
 import os
 import pickle
 import sys
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Set

 import dulwich.client
 from dulwich.object_store import ObjectStoreGraphWalker
 from dulwich.pack import PackData, PackInflater

 from swh.model import hashutil
 from swh.model.model import (
     BaseContent,
     Directory,
     Origin,
     Revision,
     Release,
     Snapshot,
     SnapshotBranch,
     TargetType,
     Sha1Git,
 )
 from swh.loader.core.loader import DVCSLoader
 from swh.storage.algos.snapshot import snapshot_get_all_branches

 from . import converters
+def ignore_branch_name(branch_name: bytes) -> bool:
+    """Should the git loader ignore the branch named `branch_name`?"""
+    if branch_name.endswith(b"^{}"):
+        # Peeled refs make the git protocol explode
+        return True
+    elif branch_name.startswith(b"refs/pull/") and branch_name.endswith(b"/merge"):
+        # We filter out auto-merged GitHub pull requests
+        return True
+
+    return False
+
+
+def filter_refs(refs: Dict[bytes, bytes]) -> Dict[bytes, bytes]:
+    """Filter the refs dictionary using the policy set in `ignore_branch_name`"""
+    return {
+        name: target for name, target in refs.items() if not ignore_branch_name(name)
+    }
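As a quick sketch of the new policy (assuming the two helpers above are in scope; the sha1 values are made up):

# Hypothetical bytehex sha1 values; only the ref names matter here.
refs = {
    b"refs/heads/master": b"a" * 40,
    b"refs/tags/v1.0^{}": b"b" * 40,   # peeled tag: dropped
    b"refs/pull/42/merge": b"c" * 40,  # GitHub auto-merge ref: dropped
}
assert filter_refs(refs) == {b"refs/heads/master": b"a" * 40}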
 class RepoRepresentation:
     """Repository representation for a Software Heritage origin."""

-    def __init__(self, storage, base_snapshot=None, ignore_history=False):
+    def __init__(
+        self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False
+    ):
         self.storage = storage
-        self._parents_cache = {}
-        self._type_cache: Dict[bytes, TargetType] = {}
         self.ignore_history = ignore_history

         if base_snapshot and not ignore_history:
-            self.heads = set(self._cache_heads(base_snapshot))
+            self.base_snapshot: Snapshot = base_snapshot
         else:
-            self.heads = set()
+            self.base_snapshot = Snapshot(branches={})
+
+        self.heads: Set[bytes] = set()

-    def _fill_parents_cache(self, commits):
-        """When querying for a commit's parents, we fill the cache to a depth of 1000
-        commits."""
-        root_revs = self._encode_for_storage(commits)
-        for rev, parents in self.storage.revision_shortlog(root_revs, 1000):
-            rev_id = hashutil.hash_to_bytehex(rev)
-            if rev_id not in self._parents_cache:
-                self._parents_cache[rev_id] = [
-                    hashutil.hash_to_bytehex(parent) for parent in parents
-                ]
-        for rev in commits:
-            if rev not in self._parents_cache:
-                self._parents_cache[rev] = []
-
-    def _cache_heads(self, base_snapshot):
-        """Return all the known head commits for the given snapshot"""
-        _git_types = list(converters.DULWICH_TARGET_TYPES.values())
-        if not base_snapshot:
-            return []
-        snapshot_targets = set()
-        for branch in base_snapshot.branches.values():
-            if branch and branch.target_type in _git_types:
-                snapshot_targets.add(branch.target)
-        decoded_targets = self._decode_from_storage(snapshot_targets)
-        for id, objs in self.get_stored_objects(decoded_targets).items():
-            if not objs:
-                logging.warn("Missing head: %s" % hashutil.hash_to_hex(id))
-                return []
-        return decoded_targets

-    def get_parents(self, commit):
-        """Bogus method to prevent expensive recursion, at the expense of less
-        efficient downloading"""
+    def get_parents(self, commit: bytes) -> List[bytes]:
+        """This method should return the list of known parents"""
         return []

-    def get_heads(self):
-        return self.heads
+    def graph_walker(self) -> ObjectStoreGraphWalker:
+        return ObjectStoreGraphWalker(self.heads, self.get_parents)
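The walker advertises `heads` as "have"s, and since `get_parents` returns an empty list, the ancestry walk stops at the heads themselves, trading a possibly larger pack for fewer parent lookups. A minimal sketch of that behaviour, using dulwich's `ObjectStoreGraphWalker` with a made-up id:

from dulwich.object_store import ObjectStoreGraphWalker

walker = ObjectStoreGraphWalker({b"1" * 40}, lambda commit: [])
assert walker.next() == b"1" * 40  # the single head is reported as a "have"
assert walker.next() is None       # no parents, so the walk ends immediately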
-    @staticmethod
-    def _encode_for_storage(objects):
-        return [hashutil.bytehex_to_hash(object) for object in objects]
-
-    @staticmethod
-    def _decode_from_storage(objects):
-        return set(hashutil.hash_to_bytehex(object) for object in objects)
-
-    def graph_walker(self):
-        return ObjectStoreGraphWalker(self.get_heads(), self.get_parents)
-
-    @staticmethod
-    def filter_unwanted_refs(refs):
-        """Filter the unwanted references from refs"""
-        ret = {}
-        for ref, val in refs.items():
-            if ref.endswith(b"^{}"):
-                # Peeled refs make the git protocol explode
-                continue
-            elif ref.startswith(b"refs/pull/") and ref.endswith(b"/merge"):
-                # We filter-out auto-merged GitHub pull requests
-                continue
-            else:
-                ret[ref] = val
-        return ret
-
-    def determine_wants(self, refs):
-        """Filter the remote references to figure out which ones
-        Software Heritage needs.
-        """
+    def determine_wants(self, refs: Dict[bytes, bytes]) -> List[bytes]:
+        """Get the list of bytehex sha1s that the git loader should fetch.
+
+        This compares the remote refs sent by the server with the base snapshot
+        provided by the loader.
+        """
         if not refs:
             return []
-
-        # Find what objects Software Heritage has
-        refs = self.find_remote_ref_types_in_swh(refs)
-
-        # Cache the objects found in swh as existing heads
-        for target in refs.values():
-            if target["target_type"] is not None:
-                self.heads.add(target["target"])
-
-        ret = set()
-        for target in self.filter_unwanted_refs(refs).values():
-            if target["target_type"] is None:
-                # The target doesn't exist in Software Heritage, let's retrieve
-                # it.
-                ret.add(target["target"])
-        return list(ret)
-
-    def _get_stored_objects_batch(
-        self, query
-    ) -> Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]]:
-        results = self.storage.object_find_by_sha1_git(self._encode_for_storage(query))
-        ret: Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]] = {}
-        for (id, objects) in results.items():
-            assert id not in ret
-            ret[id] = [
-                {"sha1_git": obj["sha1_git"], "type": TargetType(obj["type"]),}
-                for obj in objects
-            ]
-        return ret
-
-    def get_stored_objects(
-        self, objects
-    ) -> Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]]:
-        """Find which of these objects were stored in the archive.
-
-        Do the request in packets to avoid a server timeout.
-        """
-        if self.ignore_history:
-            return {}
-
-        packet_size = 1000
-
-        ret: Dict[bytes, List[Dict[str, Union[bytes, TargetType]]]] = {}
-        query = []
-        for object in objects:
-            query.append(object)
-            if len(query) >= packet_size:
-                ret.update(self._get_stored_objects_batch(query))
-                query = []
-        if query:
-            ret.update(self._get_stored_objects_batch(query))
-        return ret
-
-    def find_remote_ref_types_in_swh(self, remote_refs) -> Dict[bytes, Dict[str, Any]]:
-        """Parse the remote refs information and list the objects that exist in
-        Software Heritage.
-
-        Returns:
-            dict whose keys are branch names, and values are tuples
-            `(target, target_type)`.
-        """
-        all_objs = set(remote_refs.values()) - set(self._type_cache)
-        type_by_id: Dict[bytes, TargetType] = {}
-        for id, objs in self.get_stored_objects(all_objs).items():
-            id = hashutil.hash_to_bytehex(id)
-            if objs:
-                type_ = objs[0]["type"]
-                assert isinstance(type_, TargetType)
-                type_by_id[id] = type_
-        self._type_cache.update(type_by_id)
-        ret = {}
-        for ref, id in remote_refs.items():
-            ret[ref] = {
-                "target": id,
-                "target_type": self._type_cache.get(id),
-            }
-        return ret
+
+        # Cache existing heads
+        local_heads: Set[bytes] = set()
+        for branch_name, branch in self.base_snapshot.branches.items():
+            if not branch or branch.target_type == TargetType.ALIAS:
+                continue
+            local_heads.add(hashutil.hash_to_hex(branch.target).encode())
+
+        self.heads = local_heads
+
+        # Get the remote heads that we want to fetch
+        remote_heads: Set[bytes] = set()
+        for ref_name, ref_target in refs.items():
+            if ignore_branch_name(ref_name):
+                continue
+            remote_heads.add(ref_target)
+
+        return list(remote_heads - local_heads)
+
+
+@dataclass
+class FetchPackReturn:
+    remote_refs: Dict[bytes, bytes]
+    symbolic_refs: Dict[bytes, bytes]
+    pack_buffer: BytesIO
+    pack_size: int
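With these pieces, ref negotiation boils down to a set difference between the remote heads and the heads recorded in the base snapshot. A hypothetical walk-through (made-up ids; `storage=None` works here only because `determine_wants` never touches it):

from swh.model import hashutil
from swh.model.model import Snapshot, SnapshotBranch, TargetType

known = bytes.fromhex("11" * 20)
repo = RepoRepresentation(
    storage=None,  # unused by determine_wants
    base_snapshot=Snapshot(
        branches={
            b"refs/heads/master": SnapshotBranch(
                target=known, target_type=TargetType.REVISION,
            )
        }
    ),
)
server_refs = {
    b"refs/heads/master": hashutil.hash_to_hex(known).encode(),  # already archived
    b"refs/heads/topic": b"2" * 40,                              # new: to fetch
}
assert repo.determine_wants(server_refs) == [b"2" * 40]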
 class GitLoader(DVCSLoader):
     """A bulk loader for a git repository"""

     CONFIG_BASE_FILENAME = "loader/git"

     ADDITIONAL_CONFIG = {
[… 18 unchanged lines not shown …]
     ):
             data.
         """
         super().__init__(logging_class="swh.loader.git.BulkLoader", config=config)
         self.origin_url = url
         self.base_url = base_url
         self.ignore_history = ignore_history
         self.repo_representation = repo_representation

         # state initialized in fetch_data
         self.remote_refs = []
-        self.local_refs = {}
         self.symbolic_refs = {}
     def fetch_pack_from_origin(self, origin_url, base_snapshot, do_activity):
         """Fetch a pack from the origin"""
         pack_buffer = BytesIO()

         base_repo = self.repo_representation(
             storage=self.storage,
[… 22 unchanged lines not shown …]
         pack_result = client.fetch_pack(
             path,
             base_repo.determine_wants,
             base_repo.graph_walker(),
             do_pack,
             progress=do_activity,
         )

-        remote_refs = pack_result.refs
-        symbolic_refs = pack_result.symrefs
-
-        if remote_refs:
-            local_refs = base_repo.find_remote_ref_types_in_swh(remote_refs)
-        else:
-            local_refs = remote_refs = {}
+        remote_refs = pack_result.refs or {}
+        symbolic_refs = pack_result.symrefs or {}

         pack_buffer.flush()
         pack_size = pack_buffer.tell()
         pack_buffer.seek(0)

-        return {
-            "remote_refs": base_repo.filter_unwanted_refs(remote_refs),
-            "local_refs": local_refs,
-            "symbolic_refs": symbolic_refs,
-            "pack_buffer": pack_buffer,
-            "pack_size": pack_size,
-        }
+        return FetchPackReturn(
+            remote_refs=filter_refs(remote_refs),
+            symbolic_refs=filter_refs(symbolic_refs),
+            pack_buffer=pack_buffer,
+            pack_size=pack_size,
+        )
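For context, a freestanding sketch of the same dulwich flow: fetch everything a hypothetical remote advertises into an in-memory pack buffer. This is not the loader's exact configuration, just the bare client calls:

from io import BytesIO

import dulwich.client
from dulwich.object_store import ObjectStoreGraphWalker

# Hypothetical repository URL; any reachable git URL would do.
client, path = dulwich.client.get_transport_and_path("https://example.org/repo.git")
pack_buffer = BytesIO()
result = client.fetch_pack(
    path,
    lambda refs: list(set(refs.values())),        # want every advertised target
    ObjectStoreGraphWalker(set(), lambda c: []),  # no local heads: full clone
    pack_buffer.write,
)
print(result.refs or {}, result.symrefs or {})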
     def list_pack(self, pack_data, pack_size):
         id_to_type = {}
         type_to_ids = defaultdict(set)

         inflater = self.get_inflater()

         for obj in inflater:
[… 20 unchanged lines not shown …]
     def prepare(self, *args, **kwargs):
         base_origin_url = origin_url = self.origin.url

         prev_snapshot = None

         if not self.ignore_history:
             prev_snapshot = self.get_full_snapshot(origin_url)

-        if self.base_url and not prev_snapshot:
+        if self.base_url and prev_snapshot is None:
             base_origin = Origin(url=self.base_url)
             base_origin = self.storage.origin_get(base_origin)
             if base_origin:
-                base_origin_url = base_origin["url"]
+                base_origin_url = base_origin.url
                 prev_snapshot = self.get_full_snapshot(base_origin_url)

-        self.base_snapshot = prev_snapshot
-        self.base_origin_url = base_origin_url
+        if prev_snapshot is not None:
+            self.base_snapshot = prev_snapshot
+        else:
+            self.base_snapshot = Snapshot(branches={})
     def fetch_data(self):
         def do_progress(msg):
             sys.stderr.buffer.write(msg)
             sys.stderr.flush()

         fetch_info = self.fetch_pack_from_origin(
             self.origin.url, self.base_snapshot, do_progress
         )

-        self.pack_buffer = fetch_info["pack_buffer"]
-        self.pack_size = fetch_info["pack_size"]
-        self.remote_refs = fetch_info["remote_refs"]
-        self.local_refs = fetch_info["local_refs"]
-        self.symbolic_refs = fetch_info["symbolic_refs"]
+        self.pack_buffer = fetch_info.pack_buffer
+        self.pack_size = fetch_info.pack_size
+        self.remote_refs = fetch_info.remote_refs
+        self.symbolic_refs = fetch_info.symbolic_refs

         origin_url = self.origin.url

         self.log.info(
             "Listed %d refs for repo %s" % (len(self.remote_refs), origin_url),
             extra={
                 "swh_type": "git_repo_list_refs",
                 "swh_repo": origin_url,
                 "swh_num_refs": len(self.remote_refs),
             },
         )

         # We want to load the repository, walk all the objects
         id_to_type, type_to_ids = self.list_pack(self.pack_buffer, self.pack_size)

         self.id_to_type = id_to_type
         self.type_to_ids = type_to_ids
+
+        # No more data to fetch
+        return False

     def save_data(self):
         """Store a pack for archival"""
         write_size = 8192
         pack_dir = self.get_save_data_path()

         pack_name = "%s.pack" % self.visit_date.isoformat()
         refs_name = "%s.refs" % self.visit_date.isoformat()
[… 108 unchanged lines not shown; the visible lines below are the end of
def get_releases(self) -> Iterable[Release]: …]
                 continue
             if raw_obj.sha().digest() not in missing_rels:
                 continue

             yield converters.dulwich_tag_to_release(raw_obj, log=self.log)
     def get_snapshot(self) -> Snapshot:
-        branches: Dict[bytes, Optional[SnapshotBranch]] = {}
-        for ref in self.remote_refs:
-            ret_ref = self.local_refs[ref].copy()
-            if not ret_ref["target_type"]:
-                target_type = self.id_to_type[ret_ref["target"]]
-                ret_ref["target_type"] = converters.DULWICH_TARGET_TYPES[target_type]
-
-            ret_ref["target"] = hashutil.bytehex_to_hash(ret_ref["target"])
-
-            branches[ref] = SnapshotBranch(
-                target_type=ret_ref["target_type"], target=ret_ref["target"],
-            )
-
-        for ref, target in self.symbolic_refs.items():
-            branches[ref] = SnapshotBranch(target_type=TargetType.ALIAS, target=target,)
+        """Get the snapshot for the current visit.
+
+        The main complexity of this function is mapping target objects to their
+        types, as the `refs` dictionaries returned by the git server only give
+        us the identifiers for the target objects, and not their types.
+
+        The loader itself only knows the types of the objects that it has
+        fetched from the server (as it has parsed them while loading them to
+        the archive). As we only fetched an increment between the previous
+        snapshot and the current state of the server, we are missing the type
+        information for the objects that would already have been referenced by
+        the previous snapshot, and that the git server didn't send us. We infer
+        the type of these objects from the previous snapshot.
+        """
+        branches: Dict[bytes, Optional[SnapshotBranch]] = {}
+
+        unfetched_refs: Dict[bytes, bytes] = {}
+
+        # Retrieve types from the objects loaded by the current loader
+        for ref_name, ref_object in self.remote_refs.items():
+            if ref_name in self.symbolic_refs:
+                continue
+            target = hashutil.hash_to_bytes(ref_object.decode())
+            object_type = self.id_to_type.get(ref_object)
+            if object_type:
+                branches[ref_name] = SnapshotBranch(
+                    target=target,
+                    target_type=converters.DULWICH_TARGET_TYPES[object_type],
+                )
+            else:
+                # The object pointed at by this ref was not fetched, supposedly
+                # because it existed in the base snapshot. We record it here,
+                # and we can get it from the base snapshot later.
+                unfetched_refs[ref_name] = target
+
+        # Handle symbolic references as alias branches
+        for ref_name, target in self.symbolic_refs.items():
+            branches[ref_name] = SnapshotBranch(
+                target_type=TargetType.ALIAS, target=target,
+            )
+
+        if unfetched_refs:
+            # Handle inference of object types from the contents of the
+            # previous snapshot
+            unknown_objects = {}
+
+            base_snapshot_reverse_branches = {
+                branch.target: branch
+                for branch in self.base_snapshot.branches.values()
+                if branch and branch.target_type != TargetType.ALIAS
+            }
+
+            for ref_name, target in unfetched_refs.items():
+                branch = base_snapshot_reverse_branches.get(target)
+                branches[ref_name] = branch
+                if not branch:
+                    unknown_objects[ref_name] = target
+
+            if unknown_objects:
+                # This object was referenced by the server; we did not fetch
+                # it, and we do not know it from the previous snapshot. This is
+                # likely a bug in the loader.
+                raise RuntimeError(
+                    "Unknown objects referenced by remote refs: %s"
+                    % (
+                        ", ".join(
+                            f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
+                            for name, obj in unknown_objects.items()
+                        )
+                    )
+                )

         self.snapshot = Snapshot(branches=branches)
         return self.snapshot

Inline comments on this hunk:

ardumont: curious me, how is this happening?

olasd: This is really the crux of this function. So I've made the comments
(very) verbose, I hope it's clearer now.

ardumont: yes, thanks.

ardumont: thanks a lot even ;)
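A condensed, self-contained sketch of that inference fallback (hypothetical ids; `id_to_type` stands in for the types parsed from the fetched pack):

from swh.loader.git import converters
from swh.model.model import SnapshotBranch, TargetType

id_to_type = {b"a" * 40: b"commit"}  # only this object was in the pack
base_snapshot_reverse_branches = {
    bytes.fromhex("bb" * 20): SnapshotBranch(
        target=bytes.fromhex("bb" * 20), target_type=TargetType.REVISION,
    )
}
remote_refs = {b"refs/heads/new": b"a" * 40, b"refs/heads/old": b"bb" * 20}

branches = {}
for ref_name, ref_object in remote_refs.items():
    target = bytes.fromhex(ref_object.decode())
    object_type = id_to_type.get(ref_object)
    if object_type:  # type known from the pack we just fetched
        branches[ref_name] = SnapshotBranch(
            target=target,
            target_type=converters.DULWICH_TARGET_TYPES[object_type],
        )
    else:  # fall back to the previous snapshot for the type
        branches[ref_name] = base_snapshot_reverse_branches.get(target)

assert branches[b"refs/heads/new"].target_type == TargetType.REVISION
assert branches[b"refs/heads/old"].target_type == TargetType.REVISION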
     def get_fetch_history_result(self) -> Dict[str, int]:
         return {
             "contents": len(self.type_to_ids[b"blob"]),
             "directories": len(self.type_to_ids[b"tree"]),
[… 39 unchanged lines not shown …]