swh/loader/git/loader.py
Show All 34 Lines
from swh.storage.interface import StorageInterface

from . import converters, dumb, utils
from .utils import HexBytes

logger = logging.getLogger(__name__)
+DEFAULT_NUMBER_IDS_TO_FETCH = 200


+def do_print_progress(msg: bytes) -> None:
+    sys.stderr.buffer.write(msg)
+    sys.stderr.flush()

    [inline comment on do_print_progress, resolved]
    ardumont: Do we need this? I'd very much see this as silently ignored...
    ardumont: nvm, let's keep it.
class RepoRepresentation:
    """Repository representation for a Software Heritage origin."""

    def __init__(
-        self, storage, base_snapshot: Optional[Snapshot] = None, ignore_history=False
+        self,
+        storage,
+        base_snapshot: Optional[Snapshot] = None,
+        ignore_history=False,
+        limit: int = DEFAULT_NUMBER_IDS_TO_FETCH,
    ):
        self.storage = storage
        self.ignore_history = ignore_history

        if base_snapshot and not ignore_history:
            self.base_snapshot: Snapshot = base_snapshot
        else:
            self.base_snapshot = Snapshot(branches={})

        self.heads: Set[HexBytes] = set()
+        self.wanted_refs: Optional[List[HexBytes]] = None
+        # Pagination index
+        self.index: int = 0
+        self.limit = limit
+        self.walker = ObjectStoreGraphWalker(self.heads, self.get_parents)
+        self.previous_refs: List[HexBytes] = []

    def get_parents(self, commit: bytes) -> List[bytes]:
        """This method should return the list of known parents"""
        return []
    def graph_walker(self) -> ObjectStoreGraphWalker:
-        return ObjectStoreGraphWalker(self.heads, self.get_parents)
+        return self.walker

+    def wanted_refs_fetched(self) -> bool:
+        """Did we fetch all wanted refs?"""
+        return self.wanted_refs is not None and self.index > len(self.wanted_refs)

-    def determine_wants(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
+    def determine_wants(
+        self, refs: Dict[bytes, HexBytes], depth=None
+    ) -> List[HexBytes]:
        """Get the list of bytehex sha1s that the git loader should fetch.

        This compares the remote refs sent by the server with the base snapshot
        provided by the loader.

        """
        if not refs:
            return []

+        if not self.wanted_refs:
+            # We compute all wanted_refs to ingest once, then return them in
+            # batches of `limit`

            # Cache existing heads
            local_heads: Set[HexBytes] = set()
            for branch_name, branch in self.base_snapshot.branches.items():
                if not branch or branch.target_type == TargetType.ALIAS:
                    continue
                local_heads.add(hashutil.hash_to_hex(branch.target).encode())

            self.heads = local_heads

            # Get the remote heads that we want to fetch
            remote_heads: Set[HexBytes] = set()
            for ref_name, ref_target in refs.items():
                if utils.ignore_branch_name(ref_name):
                    continue
                remote_heads.add(ref_target)

            logger.debug("local_heads_count=%s", len(local_heads))
            logger.debug("remote_heads_count=%s", len(remote_heads))
            wanted_refs = list(remote_heads - local_heads)
            logger.debug("wanted_refs_count=%s", len(wanted_refs))
-            return wanted_refs
+            self.wanted_refs = wanted_refs

+        start = self.index
+        self.index += self.limit
+        assert self.wanted_refs is not None
+        asked_refs = self.wanted_refs[start : self.index]
+        if start > 0:
+            # Previous refs were already walked, so remove them from the next
+            # walk iteration to avoid processing them again
+            self.walker.heads.update(self.previous_refs)
+        self.previous_refs = asked_refs
+        logger.debug("asked_refs_count=%s", len(asked_refs))
+        return asked_refs
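To make the paging concrete, here is a rough sketch of how successive `determine_wants()` calls are expected to slice the wanted refs. All values are hypothetical: `storage` is unused on this code path, the ref names and fake hex ids are made up, and batch contents are unordered since they come from a set difference.

```python
# Hypothetical walkthrough, assuming limit=2 and five remote refs that are
# all unknown to the (empty) base snapshot.
repo = RepoRepresentation(storage=None, limit=2)  # storage unused here
refs = {b"refs/heads/b%d" % i: ("%040x" % i).encode() for i in range(5)}

batch1 = repo.determine_wants(refs)  # wanted_refs[0:2]
batch2 = repo.determine_wants(refs)  # wanted_refs[2:4]; batch1 now seeds the walker
batch3 = repo.determine_wants(refs)  # wanted_refs[4:5], the final slice

assert len(batch1) == 2 and len(batch2) == 2 and len(batch3) == 1
assert repo.wanted_refs_fetched()  # index (6) is now past len(wanted_refs) (5)
```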
@dataclass
class FetchPackReturn:
    remote_refs: Dict[bytes, HexBytes]
    symbolic_refs: Dict[bytes, HexBytes]
    pack_buffer: SpooledTemporaryFile
    pack_size: int
+    continue_loading: bool
class GitLoader(DVCSLoader):
    """A bulk loader for a git repository"""

    visit_type = "git"

    def __init__(
        self,
        storage: StorageInterface,
        url: str,
        base_url: Optional[str] = None,
        ignore_history: bool = False,
        repo_representation: Type[RepoRepresentation] = RepoRepresentation,
        pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
        temp_file_cutoff: int = 100 * 1024 * 1024,
        save_data_path: Optional[str] = None,
        max_content_size: Optional[int] = None,
+        # Number of ids per packfile
+        packfile_chunk_size: int = DEFAULT_NUMBER_IDS_TO_FETCH,
    ):
        """Initialize the bulk updater.

        Args:
            repo_representation: swh's repository representation
                which is in charge of filtering between known and remote
                data.

        """
        super().__init__(
            storage=storage,
            save_data_path=save_data_path,
            max_content_size=max_content_size,
        )
        self.origin_url = url
        self.base_url = base_url
        self.ignore_history = ignore_history
        self.repo_representation = repo_representation
        self.pack_size_bytes = pack_size_bytes
        self.temp_file_cutoff = temp_file_cutoff
        # state initialized in fetch_data
        self.remote_refs: Dict[bytes, HexBytes] = {}
        self.symbolic_refs: Dict[bytes, HexBytes] = {}
        self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
+        self.packfile_chunk_size = packfile_chunk_size
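As a usage sketch, the new knob would be threaded through like this. The in-memory storage and the URL are illustrative only, not part of this diff:

```python
# Illustrative only: instantiate the loader with a smaller chunk size so each
# fetched packfile covers at most 100 new refs.
from swh.storage import get_storage

storage = get_storage(cls="memory")
loader = GitLoader(
    storage=storage,
    url="https://example.org/user/repo.git",
    packfile_chunk_size=100,
)
loader.load()  # drives fetch_data()/store_data() until no data remains
```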
    def fetch_pack_from_origin(
        self,
        origin_url: str,
        base_repo: RepoRepresentation,
        do_activity: Callable[[bytes], None],
    ) -> FetchPackReturn:
        """Fetch a pack from the origin"""
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines  [in fetch_pack_from_origin]
        # not support it and do not fetch any refs
        self.dumb = transport_url.startswith("http") and client.dumb

        return FetchPackReturn(
            remote_refs=utils.filter_refs(remote_refs),
            symbolic_refs=utils.filter_refs(symbolic_refs),
            pack_buffer=pack_buffer,
            pack_size=pack_size,
+            continue_loading=not self.base_repo.wanted_refs_fetched(),
        )
    def prepare_origin_visit(self) -> None:
        self.visit_date = datetime.datetime.now(tz=datetime.timezone.utc)
        self.origin = Origin(url=self.origin_url)

    def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
        return snapshot_get_latest(self.storage, origin_url)
Show All 11 Lines  [in def prepare(self) -> None:]
        if base_origin:
            prev_snapshot = self.get_full_snapshot(base_origin.url)
            if prev_snapshot is not None:
                self.base_snapshot = prev_snapshot
        else:
            self.base_snapshot = Snapshot(branches={})
-    def fetch_data(self) -> bool:
-        assert self.origin is not None
-        base_repo = self.repo_representation(
-            storage=self.storage,
-            base_snapshot=self.base_snapshot,
-            ignore_history=self.ignore_history,
-        )
-
-        def do_progress(msg: bytes) -> None:
-            sys.stderr.buffer.write(msg)
-            sys.stderr.flush()
+        self.base_repo = self.repo_representation(
+            storage=self.storage,
+            base_snapshot=self.base_snapshot,
+            ignore_history=self.ignore_history,
+            limit=self.packfile_chunk_size,
+        )

+    def fetch_data(self) -> bool:
+        continue_loading = False
+        assert self.origin is not None
        try:
            fetch_info = self.fetch_pack_from_origin(
-                self.origin.url, base_repo, do_progress
+                self.origin.url, self.base_repo, do_print_progress
            )
+            continue_loading = fetch_info.continue_loading
        except NotGitRepository as e:
            raise NotFound(e)
        except GitProtocolError as e:
            # unfortunately, that kind of error is not specific to a not found
            # scenario... It depends on the value of message within the exception.
            for msg in [
                "Repository unavailable",  # e.g DMCA takedown
                "Repository not found",
Show All 11 Lines  [in def fetch_data(self) -> bool:]
            self.dumb = dumb.check_protocol(self.origin_url)
            if not self.dumb:
                raise

        logger.debug(
            "Protocol used for communication: %s", "dumb" if self.dumb else "smart"
        )
        if self.dumb:
-            self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin_url, base_repo)
+            self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin_url, self.base_repo)
            self.dumb_fetcher.fetch_object_ids()
            self.remote_refs = utils.filter_refs(self.dumb_fetcher.refs)  # type: ignore
            self.symbolic_refs = self.dumb_fetcher.head
        else:
            self.pack_buffer = fetch_info.pack_buffer
            self.pack_size = fetch_info.pack_size
-            self.remote_refs = fetch_info.remote_refs
-            self.symbolic_refs = fetch_info.symbolic_refs
+            self.remote_refs.update(fetch_info.remote_refs)
+            self.symbolic_refs.update(fetch_info.symbolic_refs)

-        self.ref_object_types = {sha1: None for sha1 in self.remote_refs.values()}
+        for sha1 in self.remote_refs.values():
+            if sha1 in self.ref_object_types:
+                continue
+            self.ref_object_types[sha1] = None

        logger.info(
            "Listed %d refs for repo %s",
            len(self.remote_refs),
            self.origin.url,
            extra={
                "swh_type": "git_repo_list_refs",
                "swh_repo": self.origin.url,
                "swh_num_refs": len(self.remote_refs),
            },
        )

-        # No more data to fetch
-        return False
+        return continue_loading
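For context on the new return value: the core loader's `load()` is expected to keep calling `fetch_data()` until it returns false, roughly as in this simplified sketch. The actual loop lives in swh.loader.core and may differ in detail.

```python
# Simplified sketch of the fetch/store loop this return value is meant to
# drive; error handling and visit bookkeeping are omitted.
def load_loop(loader: GitLoader) -> None:
    more_data_to_fetch = True
    while more_data_to_fetch:
        # True while the paginated fetch still has wanted refs to retrieve
        more_data_to_fetch = loader.fetch_data()
        loader.store_data()  # store each packfile's objects as they arrive
```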
    def save_data(self) -> None:
        """Store a pack for archival"""
        assert isinstance(self.visit_date, datetime.datetime)

        write_size = 8192
        pack_dir = self.get_save_data_path()
        pack_name = "%s.pack" % self.visit_date.isoformat()
        refs_name = "%s.refs" % self.visit_date.isoformat()

        with open(os.path.join(pack_dir, pack_name), "xb") as f:
            self.pack_buffer.seek(0)
            while True:
                r = self.pack_buffer.read(write_size)
                if not r:
                    break
                f.write(r)

        self.pack_buffer.seek(0)

        with open(os.path.join(pack_dir, refs_name), "xb") as f:
            pickle.dump(self.remote_refs, f)
+    def store_data(self, create_snapshot: bool = False):
+        super().store_data(create_snapshot)
+        if not self.dumb:
+            self.pack_buffer.close()
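Closing the buffer matters because a `SpooledTemporaryFile` only releases its memory or disk backing on `close()`, and with pagination there is now one buffer per fetched packfile. A standalone illustration of that lifecycle (not loader code):

```python
# Standalone illustration of the SpooledTemporaryFile lifecycle mirrored by
# store_data(): each round's pack buffer is released via close() before the
# next fetch allocates a fresh one.
from tempfile import SpooledTemporaryFile

buf = SpooledTemporaryFile(max_size=100 * 1024 * 1024)  # same cutoff idea
buf.write(b"PACK")  # packfile bytes would be spooled here
buf.seek(0)
assert buf.read(4) == b"PACK"
buf.close()  # in-memory/on-disk backing reclaimed
```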
    def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
        """Read all the objects of type `object_type` from the packfile"""
        if self.dumb:
            yield from self.dumb_fetcher.iter_objects(object_type)
        else:
            self.pack_buffer.seek(0)
            count = 0
            for obj in PackInflater.for_pack_data(
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines  [in def get_snapshot(self) -> Snapshot:]
            # likely a bug in the loader.
            raise RuntimeError(
                "Unknown objects referenced by remote refs: %s"
                % (
                    ", ".join(
                        f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
                        for name, obj in unknown_objects.items()
                    )
                )
            )

    [inline comment on warn_dangling_branches, resolved]
    ardumont: That warning becomes actually tedious (and possibly strenuous for
    elasticsearch)... As it will eventually be resolved if we get to the bottom
    of the repository, do we keep that log instruction? [1] P1194
    On staging…

        utils.warn_dangling_branches(
            branches, dangling_branches, logger, self.origin_url
        )

        self.snapshot = Snapshot(branches=branches)
        return self.snapshot
Show All 39 Lines