Changeset View
Standalone View
swh/loader/git/loader.py
Show All 12 Lines | |||||||||||||||||||
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type | from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type | ||||||||||||||||||
import dulwich.client | import dulwich.client | ||||||||||||||||||
from dulwich.errors import GitProtocolError, NotGitRepository | from dulwich.errors import GitProtocolError, NotGitRepository | ||||||||||||||||||
from dulwich.object_store import ObjectStoreGraphWalker | from dulwich.object_store import ObjectStoreGraphWalker | ||||||||||||||||||
from dulwich.objects import ShaFile | from dulwich.objects import ShaFile | ||||||||||||||||||
from dulwich.pack import PackData, PackInflater | from dulwich.pack import PackData, PackInflater | ||||||||||||||||||
from swh.core.statsd import Statsd | |||||||||||||||||||
from swh.loader.core.loader import DVCSLoader | from swh.loader.core.loader import DVCSLoader | ||||||||||||||||||
from swh.loader.exception import NotFound | from swh.loader.exception import NotFound | ||||||||||||||||||
from swh.model import hashutil | from swh.model import hashutil | ||||||||||||||||||
from swh.model.model import ( | from swh.model.model import ( | ||||||||||||||||||
BaseContent, | BaseContent, | ||||||||||||||||||
Directory, | Directory, | ||||||||||||||||||
Release, | Release, | ||||||||||||||||||
Revision, | Revision, | ||||||||||||||||||
Snapshot, | Snapshot, | ||||||||||||||||||
SnapshotBranch, | SnapshotBranch, | ||||||||||||||||||
TargetType, | TargetType, | ||||||||||||||||||
) | ) | ||||||||||||||||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||||||||||||||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||||||||||||||||
from . import converters, dumb, utils | from . import converters, dumb, utils | ||||||||||||||||||
from .utils import HexBytes | from .utils import HexBytes | ||||||||||||||||||
olasdUnsubmitted Not Done Inline Actions
olasd: | |||||||||||||||||||
Done Inline Actions: actually not relevant anymore, as I'll use the core's self.statsd vlorentz: actually not relevant anymore, as I'll use the core's self.statsd | |||||||||||||||||||
STATSD_PREFIX = "swh_loader_git" | |||||||||||||||||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||||||||||||||
class RepoRepresentation: | class RepoRepresentation: | ||||||||||||||||||
"""Repository representation for a Software Heritage origin.""" | """Repository representation for a Software Heritage origin.""" | ||||||||||||||||||
def __init__( | def __init__( | ||||||||||||||||||
self, | self, | ||||||||||||||||||
storage, | storage, | ||||||||||||||||||
base_snapshot: Optional[Snapshot] = None, | base_snapshot: Optional[Snapshot] = None, | ||||||||||||||||||
incremental: bool = True, | incremental: bool = True, | ||||||||||||||||||
statsd: Statsd = None, | |||||||||||||||||||
): | ): | ||||||||||||||||||
self.storage = storage | self.storage = storage | ||||||||||||||||||
self.incremental = incremental | self.incremental = incremental | ||||||||||||||||||
self.statsd = statsd | |||||||||||||||||||
if base_snapshot and incremental: | if base_snapshot and incremental: | ||||||||||||||||||
self.base_snapshot: Snapshot = base_snapshot | self.base_snapshot: Snapshot = base_snapshot | ||||||||||||||||||
else: | else: | ||||||||||||||||||
self.base_snapshot = Snapshot(branches={}) | self.base_snapshot = Snapshot(branches={}) | ||||||||||||||||||
self.heads: Set[HexBytes] = set() | self.heads: Set[HexBytes] = set() | ||||||||||||||||||
Show All 28 Lines | def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]: | ||||||||||||||||||
for ref_name, ref_target in refs.items(): | for ref_name, ref_target in refs.items(): | ||||||||||||||||||
if utils.ignore_branch_name(ref_name): | if utils.ignore_branch_name(ref_name): | ||||||||||||||||||
continue | continue | ||||||||||||||||||
remote_heads.add(ref_target) | remote_heads.add(ref_target) | ||||||||||||||||||
logger.debug("local_heads_count=%s", len(local_heads)) | logger.debug("local_heads_count=%s", len(local_heads)) | ||||||||||||||||||
logger.debug("remote_heads_count=%s", len(remote_heads)) | logger.debug("remote_heads_count=%s", len(remote_heads)) | ||||||||||||||||||
wanted_refs = list(remote_heads - local_heads) | wanted_refs = list(remote_heads - local_heads) | ||||||||||||||||||
logger.debug("wanted_refs_count=%s", len(wanted_refs)) | logger.debug("wanted_refs_count=%s", len(wanted_refs)) | ||||||||||||||||||
if self.statsd is not None: | |||||||||||||||||||
self.statsd.histogram( | |||||||||||||||||||
"git_ignored_refs_percent", | |||||||||||||||||||
len(remote_heads - set(refs.values())) / len(refs), | |||||||||||||||||||
tags={}, | |||||||||||||||||||
) | |||||||||||||||||||
self.statsd.histogram( | |||||||||||||||||||
"git_known_refs_percent", | |||||||||||||||||||
len(local_heads & remote_heads) / len(remote_heads), | |||||||||||||||||||
Not Done Inline Actions
We will need these to be configured with specific buckets as well (same as D7727) olasd: We will need these to be configured with specific buckets as well (same as D7727) | |||||||||||||||||||
Done Inline Actions: already fixed, and renamed to _percent as recommended in Prometheus' documentation vlorentz: already fixed, and renamed to _percent as recommended in Prometheus' documentation | |||||||||||||||||||
tags={}, | |||||||||||||||||||
) | |||||||||||||||||||
return wanted_refs | return wanted_refs | ||||||||||||||||||
@dataclass | @dataclass | ||||||||||||||||||
class FetchPackReturn: | class FetchPackReturn: | ||||||||||||||||||
remote_refs: Dict[bytes, HexBytes] | remote_refs: Dict[bytes, HexBytes] | ||||||||||||||||||
symbolic_refs: Dict[bytes, HexBytes] | symbolic_refs: Dict[bytes, HexBytes] | ||||||||||||||||||
pack_buffer: SpooledTemporaryFile | pack_buffer: SpooledTemporaryFile | ||||||||||||||||||
pack_size: int | pack_size: int | ||||||||||||||||||
class GitLoader(DVCSLoader): | class GitLoader(DVCSLoader): | ||||||||||||||||||
"""A bulk loader for a git repository""" | """A bulk loader for a git repository | ||||||||||||||||||
Emits the following statsd stats: | |||||||||||||||||||
* increments ``swh_loader_git`` | |||||||||||||||||||
* histogram ``swh_loader_git_ignored_refs_percent`` is the ratio of refs ignored | |||||||||||||||||||
over all refs of the remote repository | |||||||||||||||||||
* histogram ``swh_loader_git_known_refs_percent`` is the ratio of (non-ignored) | |||||||||||||||||||
remote heads that are already local over all non-ignored remote heads | |||||||||||||||||||
All three are tagged with ``{"incremental": "<incremental_mode>"}`` where | |||||||||||||||||||
``incremental_mode`` is one of: | |||||||||||||||||||
* ``from_same_origin`` when the origin was already loaded | |||||||||||||||||||
* ``from_parent_origin`` when the origin was not already loaded, | |||||||||||||||||||
but it was detected as a forge-fork of an origin that was already loaded | |||||||||||||||||||
* ``no_previous_snapshot`` when the origin was not already loaded, | |||||||||||||||||||
and it was detected as a forge-fork of origins that were not already loaded either | |||||||||||||||||||
* ``no_parent_origin`` when the origin was not already loaded, and it was not | |||||||||||||||||||
detected as a forge-fork of any other origin | |||||||||||||||||||
* ``disabled`` when incremental loading is disabled by configuration | |||||||||||||||||||
""" | |||||||||||||||||||
Not Done Inline Actions: f-strings can't be used as docstrings. olasd: f-strings can't be used as docstrings. | |||||||||||||||||||
Done Inline Actions: Aw, sad. Anyway, I don't need them anymore now that I'm removing the STATSD_PREFIX. vlorentz: Aw, sad. Anyway, I don't need them anymore now that I'm removing the STATSD_PREFIX. | |||||||||||||||||||
visit_type = "git" | visit_type = "git" | ||||||||||||||||||
def __init__( | def __init__( | ||||||||||||||||||
self, | self, | ||||||||||||||||||
storage: StorageInterface, | storage: StorageInterface, | ||||||||||||||||||
url: str, | url: str, | ||||||||||||||||||
incremental: bool = True, | incremental: bool = True, | ||||||||||||||||||
▲ Show 20 Lines • Show All 111 Lines • ▼ Show 20 Lines | class GitLoader(DVCSLoader): | ||||||||||||||||||
def prepare(self) -> None: | def prepare(self) -> None: | ||||||||||||||||||
assert self.origin is not None | assert self.origin is not None | ||||||||||||||||||
prev_snapshot: Optional[Snapshot] = None | prev_snapshot: Optional[Snapshot] = None | ||||||||||||||||||
if self.incremental: | if self.incremental: | ||||||||||||||||||
prev_snapshot = self.get_full_snapshot(self.origin.url) | prev_snapshot = self.get_full_snapshot(self.origin.url) | ||||||||||||||||||
if prev_snapshot: | |||||||||||||||||||
if self.parent_origins is not None: | statsd_incremental_mode = "from_same_origin" | ||||||||||||||||||
elif self.parent_origins is not None: | |||||||||||||||||||
# If this is the first time we load this origin and it is a forge | # If this is the first time we load this origin and it is a forge | ||||||||||||||||||
# fork, load incrementally from one of the origins it was forked from, | # fork, load incrementally from one of the origins it was forked from, | ||||||||||||||||||
# closest parent first | # closest parent first | ||||||||||||||||||
for parent_origin in self.parent_origins: | for parent_origin in self.parent_origins: | ||||||||||||||||||
prev_snapshot = self.get_full_snapshot(parent_origin.url) | |||||||||||||||||||
if prev_snapshot is not None: | if prev_snapshot is not None: | ||||||||||||||||||
statsd_incremental_mode = "from_parent_origin" | |||||||||||||||||||
break | break | ||||||||||||||||||
prev_snapshot = self.get_full_snapshot(parent_origin.url) | else: | ||||||||||||||||||
statsd_incremental_mode = "no_previous_snapshot" | |||||||||||||||||||
else: | |||||||||||||||||||
statsd_incremental_mode = "no_parent_origin" | |||||||||||||||||||
else: | |||||||||||||||||||
statsd_incremental_mode = "disabled" | |||||||||||||||||||
Not Done Inline Actions
As you may have guessed from my comment on D7726, I think this should be set as a tag on a common metric, rather than generating a set of distinct metrics. And this should be pushed as a "constant" tag to be used on further calls to statsd in this loader, without having to bother with passing the list of tags around on all calls, e.g. the ones for the refs ratios. I also wonder if this should be a couple of tags rather than a single one:
(Is it worth distinguishing "has parent, no snapshot found" (no_previous_snapshot) from "doesn't have parent, no snapshot found" (no_parent_origin)? If so, maybe we can add a has_parent tag?) olasd: As you may have guessed from my comment on D7726, I think this should be set as a tag on a… | |||||||||||||||||||
Done Inline Actions: already fixed too, but in a different way. I can do it your way too, if you prefer.
I don't know, but there is no harm in storing it, is there? vlorentz: already fixed too, but in a different way. I can do it your way too, if you prefer.
> Is it… | |||||||||||||||||||
self.statsd.constant_tags["incremental"] = statsd_incremental_mode | |||||||||||||||||||
# Increments a metric with full name 'swh_loader_git', which is useful to | |||||||||||||||||||
# count how many runs of the loader use each incremental mode | |||||||||||||||||||
Done Inline Actions: This should end with _total to conform with Prometheus naming conventions olasd: This should end with `_total` to conform with Prometheus naming conventions | |||||||||||||||||||
self.statsd.increment("git", tags={}) | |||||||||||||||||||
if prev_snapshot is not None: | if prev_snapshot is not None: | ||||||||||||||||||
self.base_snapshot = prev_snapshot | self.base_snapshot = prev_snapshot | ||||||||||||||||||
else: | else: | ||||||||||||||||||
self.base_snapshot = Snapshot(branches={}) | self.base_snapshot = Snapshot(branches={}) | ||||||||||||||||||
def fetch_data(self) -> bool: | def fetch_data(self) -> bool: | ||||||||||||||||||
assert self.origin is not None | assert self.origin is not None | ||||||||||||||||||
base_repo = self.repo_representation( | base_repo = self.repo_representation( | ||||||||||||||||||
storage=self.storage, | storage=self.storage, | ||||||||||||||||||
base_snapshot=self.base_snapshot, | base_snapshot=self.base_snapshot, | ||||||||||||||||||
incremental=self.incremental, | incremental=self.incremental, | ||||||||||||||||||
statsd=self.statsd, | |||||||||||||||||||
) | ) | ||||||||||||||||||
def do_progress(msg: bytes) -> None: | def do_progress(msg: bytes) -> None: | ||||||||||||||||||
sys.stderr.buffer.write(msg) | sys.stderr.buffer.write(msg) | ||||||||||||||||||
sys.stderr.flush() | sys.stderr.flush() | ||||||||||||||||||
try: | try: | ||||||||||||||||||
fetch_info = self.fetch_pack_from_origin( | fetch_info = self.fetch_pack_from_origin( | ||||||||||||||||||
▲ Show 20 Lines • Show All 258 Lines • Show Last 20 Lines |