Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/loader.py
Show All 40 Lines | |||||
class RepoRepresentation: | class RepoRepresentation: | ||||
"""Repository representation for a Software Heritage origin.""" | """Repository representation for a Software Heritage origin.""" | ||||
def __init__( | def __init__( | ||||
self, | self, | ||||
storage, | storage, | ||||
base_snapshot: Optional[Snapshot] = None, | base_snapshots: List[Snapshot] = None, | ||||
incremental: bool = True, | incremental: bool = True, | ||||
statsd: Statsd = None, | statsd: Statsd = None, | ||||
): | ): | ||||
self.storage = storage | self.storage = storage | ||||
self.incremental = incremental | self.incremental = incremental | ||||
self.statsd = statsd | self.statsd = statsd | ||||
if base_snapshot and incremental: | if base_snapshots and incremental: | ||||
self.base_snapshot: Snapshot = base_snapshot | self.base_snapshots: List[Snapshot] = base_snapshots | ||||
else: | else: | ||||
self.base_snapshot = Snapshot(branches={}) | self.base_snapshots = [] | ||||
self.heads: Set[HexBytes] = set() | self.heads: Set[HexBytes] = set() | ||||
def get_parents(self, commit: bytes) -> List[bytes]: | def get_parents(self, commit: bytes) -> List[bytes]: | ||||
"""This method should return the list of known parents""" | """This method should return the list of known parents""" | ||||
return [] | return [] | ||||
def graph_walker(self) -> ObjectStoreGraphWalker: | def graph_walker(self) -> ObjectStoreGraphWalker: | ||||
return ObjectStoreGraphWalker(self.heads, self.get_parents) | return ObjectStoreGraphWalker(self.heads, self.get_parents) | ||||
def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]: | def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]: | ||||
"""Get the list of bytehex sha1s that the git loader should fetch. | """Get the list of bytehex sha1s that the git loader should fetch. | ||||
This compares the remote refs sent by the server with the base snapshot | This compares the remote refs sent by the server with the base snapshots | | | | |
provided by the loader. | provided by the loader. | ||||
""" | """ | ||||
if not refs: | if not refs: | ||||
return [] | return [] | ||||
# Cache existing heads | # Cache existing heads | ||||
local_heads: Set[HexBytes] = set() | local_heads: Set[HexBytes] = set() | ||||
for branch_name, branch in self.base_snapshot.branches.items(): | for base_snapshot in self.base_snapshots: | ||||
for branch_name, branch in base_snapshot.branches.items(): | |||||
if not branch or branch.target_type == TargetType.ALIAS: | if not branch or branch.target_type == TargetType.ALIAS: | ||||
continue | continue | ||||
local_heads.add(hashutil.hash_to_hex(branch.target).encode()) | local_heads.add(hashutil.hash_to_hex(branch.target).encode()) | ||||
self.heads = local_heads | self.heads = local_heads | ||||
# Get the remote heads that we want to fetch | # Get the remote heads that we want to fetch | ||||
remote_heads: Set[HexBytes] = set() | remote_heads: Set[HexBytes] = set() | ||||
for ref_name, ref_target in refs.items(): | for ref_name, ref_target in refs.items(): | ||||
if utils.ignore_branch_name(ref_name): | if utils.ignore_branch_name(ref_name): | ||||
continue | continue | ||||
▲ Show 20 Lines • Show All 166 Lines • ▼ Show 20 Lines | ) -> FetchPackReturn: | ||||
) | ) | ||||
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | def get_full_snapshot(self, origin_url) -> Optional[Snapshot]: | ||||
return snapshot_get_latest(self.storage, origin_url) | return snapshot_get_latest(self.storage, origin_url) | ||||
def prepare(self) -> None: | def prepare(self) -> None: | ||||
assert self.origin is not None | assert self.origin is not None | ||||
prev_snapshot: Optional[Snapshot] = None | self.prev_snapshot = Snapshot(branches={}) | ||||
"""Last snapshot of this origin if any; empty snapshot otherwise""" | |||||
self.base_snapshots = [] | |||||
"""Last snapshot of this origin and all its parents, if any.""" | |||||
self.statsd.constant_tags["incremental_enabled"] = self.incremental | self.statsd.constant_tags["incremental_enabled"] = self.incremental | ||||
self.statsd.constant_tags["has_parent_origins"] = bool(self.parent_origins) | self.statsd.constant_tags["has_parent_origins"] = bool(self.parent_origins) | ||||
# May be set to True later | |||||
self.statsd.constant_tags["has_parent_snapshot"] = False | |||||
if self.incremental: | if self.incremental: | ||||
prev_snapshot = self.get_full_snapshot(self.origin.url) | prev_snapshot = self.get_full_snapshot(self.origin.url) | ||||
self.statsd.constant_tags["has_previous_snapshot"] = bool(prev_snapshot) | |||||
if prev_snapshot: | if prev_snapshot: | ||||
incremental_snapshot_origin = "self" | self.prev_snapshot = prev_snapshot | ||||
self.base_snapshots.append(prev_snapshot) | |||||
elif self.parent_origins is not None: | if self.parent_origins is not None: | ||||
# If this is the first time we load this origin and it is a forge | # If this origin is a forge fork, also use the latest snapshots of the | | | | |
# fork, load incrementally from one of the origins it was forked from, | # origins it was forked from as bases for the incremental load, | | | | |
# closest parent first | # closest parent first | | | | |
for parent_origin in self.parent_origins: | for parent_origin in self.parent_origins: | ||||
prev_snapshot = self.get_full_snapshot(parent_origin.url) | parent_snapshot = self.get_full_snapshot(parent_origin.url) | ||||
if prev_snapshot is not None: | if parent_snapshot is not None: | ||||
incremental_snapshot_origin = "parent" | self.statsd.constant_tags["has_parent_snapshot"] = True | ||||
break | self.base_snapshots.append(parent_snapshot) | ||||
else: | |||||
incremental_snapshot_origin = "none" | |||||
else: | |||||
incremental_snapshot_origin = "none" | |||||
self.statsd.constant_tags[ | |||||
"incremental_snapshot_origin" | |||||
] = incremental_snapshot_origin | |||||
# Increments a metric with full name 'swh_loader_git', which is useful for | # Increments a metric with full name 'swh_loader_git', which is useful for | | | | |
# counting how many runs of the loader use each incremental mode | # counting how many runs of the loader use each incremental mode | | | | |
self.statsd.increment("git_total", tags={}) | self.statsd.increment("git_total", tags={}) | ||||
if prev_snapshot is not None: | |||||
self.base_snapshot = prev_snapshot | |||||
else: | |||||
self.base_snapshot = Snapshot(branches={}) | |||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||
assert self.origin is not None | assert self.origin is not None | ||||
base_repo = self.repo_representation( | base_repo = self.repo_representation( | ||||
storage=self.storage, | storage=self.storage, | ||||
base_snapshot=self.base_snapshot, | base_snapshots=self.base_snapshots, | ||||
incremental=self.incremental, | incremental=self.incremental, | ||||
statsd=self.statsd, | statsd=self.statsd, | ||||
) | ) | ||||
def do_progress(msg: bytes) -> None: | def do_progress(msg: bytes) -> None: | ||||
sys.stderr.buffer.write(msg) | sys.stderr.buffer.write(msg) | ||||
sys.stderr.flush() | sys.stderr.flush() | ||||
▲ Show 20 Lines • Show All 178 Lines • ▼ Show 20 Lines | def get_snapshot(self) -> Snapshot: | ||||
branches[target] = None | branches[target] = None | ||||
if unfetched_refs: | if unfetched_refs: | ||||
# Handle inference of object types from the contents of the | # Handle inference of object types from the contents of the | ||||
# previous snapshot | # previous snapshot | ||||
unknown_objects = {} | unknown_objects = {} | ||||
base_snapshot_reverse_branches = { | base_snapshot_reverse_branches = { | ||||
branch.target: branch | branch.target: branch | ||||
for branch in self.base_snapshot.branches.values() | for branch in self.prev_snapshot.branches.values() | ||||
if branch and branch.target_type != TargetType.ALIAS | if branch and branch.target_type != TargetType.ALIAS | ||||
} | } | ||||
for ref_name, target in unfetched_refs.items(): | for ref_name, target in unfetched_refs.items(): | ||||
branch = base_snapshot_reverse_branches.get(target) | branch = base_snapshot_reverse_branches.get(target) | ||||
branches[ref_name] = branch | branches[ref_name] = branch | ||||
if not branch: | if not branch: | ||||
unknown_objects[ref_name] = target | unknown_objects[ref_name] = target | ||||
Show All 19 Lines | def get_snapshot(self) -> Snapshot: | ||||
self.snapshot = Snapshot(branches=branches) | self.snapshot = Snapshot(branches=branches) | ||||
return self.snapshot | return self.snapshot | ||||
def load_status(self) -> Dict[str, Any]: | def load_status(self) -> Dict[str, Any]: | ||||
"""The load was eventful if the current snapshot is different to | """The load was eventful if the current snapshot is different to | ||||
the one we retrieved at the beginning of the run""" | the one we retrieved at the beginning of the run""" | ||||
eventful = False | eventful = False | ||||
if self.base_snapshot and self.snapshot: | if self.prev_snapshot and self.snapshot: | ||||
eventful = self.snapshot.id != self.base_snapshot.id | eventful = self.snapshot.id != self.prev_snapshot.id | ||||
elif self.snapshot: | elif self.snapshot: | ||||
eventful = bool(self.snapshot.branches) | eventful = bool(self.snapshot.branches) | ||||
return {"status": ("eventful" if eventful else "uneventful")} | return {"status": ("eventful" if eventful else "uneventful")} | ||||
if __name__ == "__main__": | if __name__ == "__main__": | ||||
import click | import click | ||||
Show All 28 Lines |