Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/git/loader.py
# Copyright (C) 2016-2021 The Software Heritage developers | # Copyright (C) 2016-2021 The Software Heritage developers | ||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||
from dataclasses import dataclass | from dataclasses import dataclass | ||||||||||||
import datetime | import datetime | ||||||||||||
import logging | import logging | ||||||||||||
import os | import os | ||||||||||||
import pickle | import pickle | ||||||||||||
import signal | |||||||||||||
import sys | import sys | ||||||||||||
from tempfile import SpooledTemporaryFile | from tempfile import SpooledTemporaryFile | ||||||||||||
import tracemalloc | |||||||||||||
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type | from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Type | ||||||||||||
import dulwich.client | import dulwich.client | ||||||||||||
from dulwich.errors import GitProtocolError, NotGitRepository | from dulwich.errors import GitProtocolError, NotGitRepository | ||||||||||||
from dulwich.object_store import ObjectStoreGraphWalker | from dulwich.object_store import ObjectStoreGraphWalker | ||||||||||||
from dulwich.objects import ShaFile | from dulwich.objects import ShaFile | ||||||||||||
from dulwich.pack import PackData, PackInflater | from dulwich.pack import PackData, PackInflater | ||||||||||||
Show All 12 Lines | |||||||||||||
) | ) | ||||||||||||
from swh.storage.algos.snapshot import snapshot_get_latest | from swh.storage.algos.snapshot import snapshot_get_latest | ||||||||||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||||||||||
from . import converters, dumb, utils | from . import converters, dumb, utils | ||||||||||||
from .utils import HexBytes | from .utils import HexBytes | ||||||||||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||||||||
tracemalloc_logger = logging.getLogger(__name__ + ".tracemalloc") | |||||||||||||
ardumont: Drop some spurious logs at the same time. | |||||||||||||
def log_tracemalloc(msg: str, snapshot: tracemalloc.Snapshot):
    """Dump the leading entries of a tracemalloc snapshot to the debug log.

    Args:
        msg: label interpolated into the header line
        snapshot: snapshot whose ``statistics("lineno")`` result is logged;
            only the first ten entries of that result are emitted

    """
    ranked = snapshot.statistics("lineno")
    emit = tracemalloc_logger.debug
    emit("[ Top 10 memory users %s!]", msg)
    for entry in ranked[:10]:
        emit(entry)
def log_tracemalloc_diff(
    msg: str, snapshot1: tracemalloc.Snapshot, snapshot2: tracemalloc.Snapshot
):
    """Dump the leading differences between two snapshots to the debug log.

    Args:
        msg: label interpolated into the header line
        snapshot1: the baseline snapshot
        snapshot2: the snapshot compared against the baseline, via
            ``snapshot2.compare_to(snapshot1, "lineno")``; only the first ten
            entries of that result are emitted

    """
    deltas = snapshot2.compare_to(snapshot1, "lineno")
    emit = tracemalloc_logger.debug
    emit("[ Top 10 differences after %s!]", msg)
    for delta in deltas[:10]:
        emit(delta)
class RepoRepresentation: | class RepoRepresentation: | ||||||||||||
"""Repository representation for a Software Heritage origin.""" | """Repository representation for a Software Heritage origin.""" | ||||||||||||
def __init__( | def __init__( | ||||||||||||
self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False | self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False | ||||||||||||
): | ): | ||||||||||||
▲ Show 20 Lines • Show All 91 Lines • ▼ Show 20 Lines | ): | ||||||||||||
self.repo_representation = repo_representation | self.repo_representation = repo_representation | ||||||||||||
self.pack_size_bytes = pack_size_bytes | self.pack_size_bytes = pack_size_bytes | ||||||||||||
self.temp_file_cutoff = temp_file_cutoff | self.temp_file_cutoff = temp_file_cutoff | ||||||||||||
# state initialized in fetch_data | # state initialized in fetch_data | ||||||||||||
self.remote_refs: Dict[bytes, HexBytes] = {} | self.remote_refs: Dict[bytes, HexBytes] = {} | ||||||||||||
self.symbolic_refs: Dict[bytes, HexBytes] = {} | self.symbolic_refs: Dict[bytes, HexBytes] = {} | ||||||||||||
self.ref_object_types: Dict[bytes, Optional[TargetType]] = {} | self.ref_object_types: Dict[bytes, Optional[TargetType]] = {} | ||||||||||||
self.tracemalloc_snapshot = tracemalloc.take_snapshot() | |||||||||||||
signal.signal(signal.SIGUSR1, self.tracemalloc_handler) | |||||||||||||
def do_tracemalloc(self, msg: str):
    """Log current memory usage and its change since the previous snapshot.

    Takes a new tracemalloc snapshot, logs its leading entries, logs the
    difference against ``self.tracemalloc_snapshot``, then stores the new
    snapshot as the baseline for the next call.

    The call is a no-op (and the baseline is left untouched) when:

    - tracemalloc is not tracing, because ``tracemalloc.take_snapshot()``
      raises ``RuntimeError`` in that case, which would otherwise abort the
      caller for the sake of debug instrumentation;
    - the tracemalloc logger would discard debug records, because taking and
      comparing snapshots is costly and nothing would be emitted.

    Args:
        msg: label identifying the call site in the emitted log lines

    """
    if not tracemalloc.is_tracing():
        return
    if not tracemalloc_logger.isEnabledFor(logging.DEBUG):
        return

    # NOTE(review, ardumont): "as per irc suggestion because that takes time."
    current = tracemalloc.take_snapshot()
    log_tracemalloc(msg, current)
    log_tracemalloc_diff(msg, self.tracemalloc_snapshot, current)
    self.tracemalloc_snapshot = current
def tracemalloc_handler(self, _signum, _frame):
    """Signal-handler entry point: run ``do_tracemalloc`` labelled "on_signal".

    Both handler arguments (signal number and current frame) are ignored.
    """
    label = "on_signal"
    self.do_tracemalloc(label)
def fetch_pack_from_origin( | def fetch_pack_from_origin( | ||||||||||||
self, | self, | ||||||||||||
origin_url: str, | origin_url: str, | ||||||||||||
base_repo: RepoRepresentation, | base_repo: RepoRepresentation, | ||||||||||||
do_activity: Callable[[bytes], None], | do_activity: Callable[[bytes], None], | ||||||||||||
) -> FetchPackReturn: | ) -> FetchPackReturn: | ||||||||||||
"""Fetch a pack from the origin""" | """Fetch a pack from the origin""" | ||||||||||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | ) -> FetchPackReturn: | ||||||||||||
logger.debug("fetched_pack_size=%s", pack_size) | logger.debug("fetched_pack_size=%s", pack_size) | ||||||||||||
# check if repository only supports git dumb transfer protocol, | # check if repository only supports git dumb transfer protocol, | ||||||||||||
# fetched pack file will be empty in that case as dulwich do | # fetched pack file will be empty in that case as dulwich do | ||||||||||||
# not support it and do not fetch any refs | # not support it and do not fetch any refs | ||||||||||||
self.dumb = transport_url.startswith("http") and client.dumb | self.dumb = transport_url.startswith("http") and client.dumb | ||||||||||||
return FetchPackReturn( | ret = FetchPackReturn( | ||||||||||||
remote_refs=utils.filter_refs(remote_refs), | remote_refs=utils.filter_refs(remote_refs), | ||||||||||||
symbolic_refs=utils.filter_refs(symbolic_refs), | symbolic_refs=utils.filter_refs(symbolic_refs), | ||||||||||||
pack_buffer=pack_buffer, | pack_buffer=pack_buffer, | ||||||||||||
pack_size=pack_size, | pack_size=pack_size, | ||||||||||||
) | ) | ||||||||||||
self.do_tracemalloc("fetch_pack_from_origins") | |||||||||||||
return ret | |||||||||||||
def prepare_origin_visit(self) -> None:
    """Record the visit date (current UTC time) and build the visited origin.

    Sets ``self.visit_date`` to a timezone-aware "now" and ``self.origin`` to
    an ``Origin`` constructed from ``self.origin_url``.
    """
    utc = datetime.timezone.utc
    self.visit_date = datetime.datetime.now(tz=utc)
    self.origin = Origin(url=self.origin_url)
def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
    """Return the result of ``snapshot_get_latest`` for the given origin url,
    looked up in ``self.storage``."""
    latest = snapshot_get_latest(self.storage, origin_url)
    return latest
def prepare(self) -> None: | def prepare(self) -> None: | ||||||||||||
▲ Show 20 Lines • Show All 118 Lines • ▼ Show 20 Lines | def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]: | ||||||||||||
PackData.from_file(self.pack_buffer, self.pack_size) | PackData.from_file(self.pack_buffer, self.pack_size) | ||||||||||||
): | ): | ||||||||||||
if obj.type_name != object_type: | if obj.type_name != object_type: | ||||||||||||
continue | continue | ||||||||||||
yield obj | yield obj | ||||||||||||
count += 1 | count += 1 | ||||||||||||
logger.debug("packfile_read_count_%s=%s", object_type.decode(), count) | logger.debug("packfile_read_count_%s=%s", object_type.decode(), count) | ||||||||||||
self.do_tracemalloc("iter_objects") | |||||||||||||
def get_contents(self) -> Iterable[BaseContent]: | def get_contents(self) -> Iterable[BaseContent]: | ||||||||||||
"""Format the blobs from the git repository as swh contents""" | """Format the blobs from the git repository as swh contents""" | ||||||||||||
for raw_obj in self.iter_objects(b"blob"): | for raw_obj in self.iter_objects(b"blob"): | ||||||||||||
if raw_obj.id in self.ref_object_types: | if raw_obj.id in self.ref_object_types: | ||||||||||||
self.ref_object_types[raw_obj.id] = TargetType.CONTENT | self.ref_object_types[raw_obj.id] = TargetType.CONTENT | ||||||||||||
yield converters.dulwich_blob_to_content( | yield converters.dulwich_blob_to_content( | ||||||||||||
raw_obj, max_content_size=self.max_content_size | raw_obj, max_content_size=self.max_content_size | ||||||||||||
▲ Show 20 Lines • Show All 150 Lines • Show Last 20 Lines |
Drop some spurious logs at the same time.