swh/loader/git/loader.py
Show All 36 Lines
from . import converters, dumb, utils
from .base import BaseGitLoader
from .utils import HexBytes

logger = logging.getLogger(__name__)
heads_logger = logger.getChild("refs")

DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE = 200


def do_print_progress(msg: bytes) -> None:
    sys.stderr.buffer.write(msg)
    sys.stderr.flush()

ardumont: Do we need this?
I'd very much see this as silently ignored...

ardumont: > Do we need this?
> I'd very much see this as silently ignored...
nvm, let's keep it.

class BaseRepoRepresentation:
    """Repository representation for a Software Heritage origin."""

    def __init__(
        self,
        storage,
        base_snapshots: Optional[List[Snapshot]] = None,
        incremental: bool = True,
        statsd: Optional[Statsd] = None,
        **kwargs: Any,  # extra kwargs are just ignored
    ):
        self.storage = storage
        self.incremental = incremental
        self.statsd = statsd
        if base_snapshots and incremental:
            self.base_snapshots: List[Snapshot] = base_snapshots
        else:
            self.base_snapshots = []

        # Cache existing heads
        self.local_heads: Set[HexBytes] = set()
        heads_logger.debug("Heads known in the archive:")
        for base_snapshot in self.base_snapshots:
            for branch_name, branch in base_snapshot.branches.items():
                if not branch or branch.target_type == TargetType.ALIAS:
                    continue
                heads_logger.debug("    %r: %s", branch_name, branch.target.hex())
                self.local_heads.add(HexBytes(hashutil.hash_to_bytehex(branch.target)))

        self.walker = ObjectStoreGraphWalker(self.local_heads, lambda commit: [])

    def compute_wanted_refs(self, refs: Dict[bytes, HexBytes]) -> List[HexBytes]:
        """Get the list of bytehex sha1s that the git loader should fetch.

        This compares the remote refs sent by the server with the base snapshot
        provided by the loader.

        """
        if not refs:
            return []

        if heads_logger.isEnabledFor(logging.DEBUG):
            heads_logger.debug("Heads returned by the git remote:")
            for name, value in refs.items():
                heads_logger.debug("    %r: %s", name, value.decode())

        # Get the remote heads that we want to fetch
        remote_heads: Set[HexBytes] = set()
        for ref_name, ref_target in refs.items():
            if utils.ignore_branch_name(ref_name):
                continue
            remote_heads.add(ref_target)

        if heads_logger.isEnabledFor(logging.DEBUG):
            heads_logger.debug("Filtered remote heads:")
            for value in remote_heads:
                heads_logger.debug("    %s", value.decode())

        logger.debug("local_heads_count=%s", len(self.local_heads))
        logger.debug("remote_heads_count=%s", len(remote_heads))
        wanted_refs = list(remote_heads - self.local_heads)
        logger.debug("wanted_refs_count=%s", len(wanted_refs))
        if self.statsd is not None:
            self.statsd.histogram(
                "git_ignored_refs_percent",
                # refs filtered out above, as a fraction of all advertised refs
                len(set(refs.values()) - remote_heads) / len(refs),
                tags={},
            )
            self.statsd.histogram(
                "git_known_refs_percent",
                len(self.local_heads & remote_heads) / len(remote_heads),
                tags={},
            )
        return wanted_refs
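
    # Illustrative sketch (not part of the changeset; all values hypothetical):
    # compute_wanted_refs is, at its core, a set difference between the heads
    # advertised by the remote and the heads already archived:
    #
    #     local_heads  = {b"1111...", b"2222..."}    # from the base snapshots
    #     remote_heads = {b"2222...", b"3333..."}    # from the server's refs
    #     wanted_refs = list(remote_heads - local_heads)    # [b"3333..."]
    #
    # so only the history reachable from b"3333..." needs to be fetched.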

    def continue_fetch_refs(self) -> bool:
        """Return True when more refs remain to be fetched, False otherwise."""
        return False

    def determine_wants(
        self, refs: Dict[bytes, HexBytes], depth=None
    ) -> List[HexBytes]:
        """Get the list of bytehex sha1s that the git loader should fetch.

        This compares the remote refs sent by the server with the base snapshot
        provided by the loader.

        """
        raise NotImplementedError


class RepoRepresentation(BaseRepoRepresentation):
    """A RepoRepresentation object able to provide all refs to fetch.

    Internally, this computes the full list of wanted_refs and returns it the
    first time the :meth:`determine_wants` method is called. That method is
    expected to be called once; the caller then has all the refs necessary to
    retrieve the packfile.

    """

    def determine_wants(
        self, refs: Dict[bytes, HexBytes], depth=None
    ) -> List[HexBytes]:
        """Get the list of bytehex sha1s that the git loader should fetch.

        This compares the remote refs sent by the server with the base snapshot
        provided by the loader.

        """
        return self.compute_wanted_refs(refs)
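
    # Sketch (illustration only; `storage` and `refs` are placeholders): with
    # this variant a single call returns every wanted ref at once:
    #
    #     repo = RepoRepresentation(storage, incremental=False)
    #     wanted = repo.determine_wants(refs)   # the full list, in one shot
    #     assert repo.continue_fetch_refs() is False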


class RepoRepresentationPaginated(BaseRepoRepresentation):
    """A RepoRepresentation object able to provide intervals of refs to fetch.

    Internally, this computes the full list of wanted_refs, then returns an
    interval of number_of_heads_per_packfile refs each time the
    :meth:`determine_wants` method is called. The caller is expected to call
    the :meth:`continue_fetch_refs` method to determine whether more refs need
    to be fetched.

    """

    def __init__(
        self,
        *args,
        number_of_heads_per_packfile=DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # Pagination index
        self.index: int = 0
        self.number_of_heads_per_packfile = number_of_heads_per_packfile
        self.wanted_refs: Optional[List[HexBytes]] = None
        self.previous_refs: List[HexBytes] = []

    def continue_fetch_refs(self) -> bool:
        """Determine whether we need to fetch other refs or not."""
        return self.wanted_refs is None or self.index < len(self.wanted_refs)

    def determine_wants(
        self, refs: Dict[bytes, HexBytes], depth=None
    ) -> List[HexBytes]:
        """Get the list of bytehex sha1s that the git loader should fetch.

        This compares the remote refs sent by the server with the base snapshot
        provided by the loader.

        """
        # First time around, we initialize all the wanted refs
        if not self.wanted_refs:
            self.wanted_refs = self.compute_wanted_refs(refs)

        # If empty, then we are done
        if not self.wanted_refs:
            return []

        # We have all wanted refs, but we ingest them one interval of
        # number_of_heads_per_packfile refs at a time
        start = self.index
        self.index += self.number_of_heads_per_packfile

        assert self.wanted_refs
        asked_refs = self.wanted_refs[start : min(self.index, len(self.wanted_refs))]
        if heads_logger.isEnabledFor(logging.DEBUG):
            heads_logger.debug("Asked remote heads:")
            for value in asked_refs:
                heads_logger.debug("    %s", value.decode())

        if start > 0:
            # Previous refs were already walked, so we can remove them from the
            # next walk iteration to avoid processing them again
            self.walker.heads.update(self.previous_refs)
        self.previous_refs = asked_refs
        logger.debug("asked_refs_count=%s", len(asked_refs))
        return asked_refs
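
# Hedged usage sketch (illustration only, not part of the changeset; `storage`
# and `refs` are placeholders): a caller drives the paginated representation by
# alternating determine_wants() and continue_fetch_refs(), one packfile per
# batch of refs.
#
#     repo = RepoRepresentationPaginated(
#         storage, incremental=True, number_of_heads_per_packfile=2
#     )
#     while repo.continue_fetch_refs():
#         batch = repo.determine_wants(refs)  # at most 2 refs per call
#         if not batch:
#             break
#         ...  # fetch one packfile for `batch`, load it, snapshot partially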


@dataclass
class FetchPackReturn:
    remote_refs: Dict[bytes, HexBytes]
    symbolic_refs: Dict[bytes, HexBytes]
    pack_buffer: SpooledTemporaryFile
    pack_size: int
    continue_fetch_refs: bool
    """Whether we still have to fetch remaining references."""


class GitLoader(BaseGitLoader):
    """A bulk loader for a git repository

    Emits the following statsd stats:

    * increments ``swh_loader_git``

Show All 17 Lines
    visit_type = "git"

    def __init__(
        self,
        storage: StorageInterface,
        url: str,
        incremental: bool = True,
        pack_size_bytes: int = 4 * 1024 * 1024 * 1024,
        temp_file_cutoff: int = 100 * 1024 * 1024,
        fetch_multiple_packfiles: bool = False,
        number_of_heads_per_packfile: int = DEFAULT_NUMBER_OF_HEADS_PER_PACKFILE,
        **kwargs: Any,
    ):
        """Initialize the bulk updater.

        Args:
            incremental: If True, the default, this starts from the last known
                snapshot (if any) references. Otherwise, this loads the full
                repository.
            fetch_multiple_packfiles: If True, this ingests the repository using
                (internally) multiple packfiles (creating partial incremental
                snapshots along the way). When False, the default, this uses the
                existing ingestion policy of retrieving one packfile to ingest.
            number_of_heads_per_packfile: When fetch_multiple_packfiles is used,
                this splits packfiles per a given number of heads (no guarantee
                on the resulting packfile size).

        """
        super().__init__(storage=storage, origin_url=url, **kwargs)
        # Check if the repository only supports the git dumb transfer protocol;
        # the fetched pack file will be empty in that case, as dulwich does not
        # support it and does not fetch any refs
        logger.debug("Transport url to communicate with server: %s", url)
        self.client, self.path = dulwich.client.get_transport_and_path(
            url, thin_packs=False
        )
        logger.debug("Client %s to fetch pack at %s", self.client, self.path)
        self.dumb = url.startswith("http") and getattr(self.client, "dumb", False)
        self.fetch_multiple_packfiles = fetch_multiple_packfiles
        self.incremental = incremental
        self.pack_size_bytes = pack_size_bytes
        self.temp_file_cutoff = temp_file_cutoff
        # state initialized in fetch_data
        self.remote_refs: Dict[bytes, HexBytes] = {}
        self.symbolic_refs: Dict[bytes, HexBytes] = {}
        self.ref_object_types: Dict[bytes, Optional[TargetType]] = {}
        self.configure_packfile_fetching_policy(
            fetch_multiple_packfiles, number_of_heads_per_packfile
        )

    def configure_packfile_fetching_policy(
        self, fetch_multiple_packfiles: bool, number_of_heads_per_packfile: int
    ):
        """Configure the packfile fetching policy. The default is to fetch a
        single packfile and ingest everything unknown out of it. When
        fetch_multiple_packfiles is True (and the ingestion passes through the
        smart protocol), the ingestion uses multiple packfiles of
        number_of_heads_per_packfile heads each; after each packfile is loaded,
        a snapshot is created that is both 'partial' (because incomplete) and
        'incremental' (gathering the refs seen so far).

        """
        # Will create partial snapshots while fetching multiple packfiles (when
        # the transfer protocol is not the 'dumb' one)
        self.create_partial_snapshot = not self.dumb and fetch_multiple_packfiles
        self.number_of_heads_per_packfile: Optional[int] = None
        self.repo_representation: Type[BaseRepoRepresentation]
        if self.create_partial_snapshot:
            self.repo_representation = RepoRepresentationPaginated
            self.number_of_heads_per_packfile = number_of_heads_per_packfile
        else:
            self.repo_representation = RepoRepresentation
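
    # Policy outcomes at a glance (illustrative summary assuming the logic
    # above; not a table from the changeset):
    #
    #   fetch_multiple_packfiles | dumb protocol | representation              | partial snapshots
    #   False                    | any           | RepoRepresentation          | no
    #   True                     | False (smart) | RepoRepresentationPaginated | yes
    #   True                     | True          | RepoRepresentation          | no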

    def fetch_pack_from_origin(
        self,
        origin_url: str,
        do_activity: Callable[[bytes], None],
    ) -> FetchPackReturn:
        """Fetch a pack from the origin"""

        pack_buffer = SpooledTemporaryFile(max_size=self.temp_file_cutoff)

        size_limit = self.pack_size_bytes

        def do_pack(data: bytes) -> None:
            cur_size = pack_buffer.tell()
            would_write = len(data)
            if cur_size + would_write > size_limit:
                raise IOError(
                    f"Pack file too big for repository {origin_url}, "
                    f"limit is {size_limit} bytes, "
                    f"current size is {cur_size}, "
                    f"would write {would_write}"
                )
            pack_buffer.write(data)

        pack_result = self.client.fetch_pack(
            self.path,
            self.base_repo.determine_wants,
            self.base_repo.walker,
            do_pack,
            progress=do_activity,
        )

        remote_refs = pack_result.refs or {}
        symbolic_refs = pack_result.symrefs or {}

        pack_buffer.flush()
        pack_size = pack_buffer.tell()
        pack_buffer.seek(0)

        logger.debug("fetched_pack_size=%s", pack_size)

        return FetchPackReturn(
            remote_refs=remote_refs,
            symbolic_refs=symbolic_refs,
            pack_buffer=pack_buffer,
            pack_size=pack_size,
            continue_fetch_refs=self.base_repo.continue_fetch_refs(),
        )

    def get_full_snapshot(self, origin_url) -> Optional[Snapshot]:
        return snapshot_get_latest(self.storage, origin_url)

    def prepare(self) -> None:
        assert self.origin is not None
Show All 24 Lines
        if parent_snapshot is not None:
            self.statsd.constant_tags["has_parent_snapshot"] = True
            self.base_snapshots.append(parent_snapshot)

        # Increments a metric with full name 'swh_loader_git', which is useful
        # to count how many runs of the loader are with each incremental mode
        self.statsd.increment("git_total", tags={})

        self.base_repo = self.repo_representation(
            storage=self.storage,
            base_snapshots=self.base_snapshots,
            incremental=self.incremental,
            statsd=self.statsd,
            # Only used when self.repo_representation is
            # RepoRepresentationPaginated, ignored otherwise
            number_of_heads_per_packfile=self.number_of_heads_per_packfile,
        )

    def fetch_data(self) -> bool:
        continue_fetch_refs = False
        assert self.origin is not None

        try:
            fetch_info = self.fetch_pack_from_origin(
                self.origin.url, do_print_progress
            )
            continue_fetch_refs = fetch_info.continue_fetch_refs
        except (dulwich.client.HTTPUnauthorized, NotGitRepository) as e:
            raise NotFound(e)
        except GitProtocolError as e:
            # unfortunately, that kind of error is not specific to a not found
            # scenario... It depends on the value of message within the exception.
            for msg in [
                "Repository unavailable",  # e.g. DMCA takedown
                "Repository not found",
                "unexpected http resp 401",
            ]:
                if msg in e.args[0]:
                    raise NotFound(e)
            # otherwise transmit the error
            raise
        except (AttributeError, NotImplementedError, ValueError):
            # with old dulwich versions, those exception types can be raised
            # by the fetch_pack operation when encountering a repository with
            # the dumb transfer protocol, so we check whether the repository
            # supports it here to continue the loading if that is the case
            self.dumb = dumb.check_protocol(self.origin.url)
            if not self.dumb:
                raise

        if self.dumb:
            protocol = "dumb"
            self.dumb_fetcher = dumb.GitObjectsFetcher(self.origin.url, self.base_repo)
            self.dumb_fetcher.fetch_object_ids()
            remote_refs = self.dumb_fetcher.refs
            symbolic_refs = self.dumb_fetcher.head
        else:
            protocol = "smart"
            self.pack_buffer = fetch_info.pack_buffer
            self.pack_size = fetch_info.pack_size
            remote_refs = fetch_info.remote_refs
            symbolic_refs = fetch_info.symbolic_refs

        logger.debug("Protocol used for communication: %s", protocol)

        # So that the partial snapshots and the final one build the full set of
        # branches
        self.remote_refs.update(utils.filter_refs(remote_refs))
        self.symbolic_refs.update(utils.filter_refs(symbolic_refs))

        for sha1 in self.remote_refs.values():
            if sha1 in self.ref_object_types:
                continue
            self.ref_object_types[sha1] = None

        logger.info(
            "Listed %d refs for repo %s",
            len(self.remote_refs),
            self.origin.url,
            extra={
                "swh_type": "git_repo_list_refs",
                "swh_repo": self.origin.url,
                "swh_num_refs": len(self.remote_refs),
            },
        )

        return continue_fetch_refs
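
    # Contract sketch (an assumption about the base loader's run loop, which is
    # not shown in this diff): fetch_data() returning True is what triggers
    # another fetch/store round, roughly as
    #
    #     while True:
    #         more_to_fetch = self.fetch_data()
    #         self.store_data()  # may build a partial snapshot per packfile
    #         if not more_to_fetch:
    #             break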

    def save_data(self) -> None:
        """Store a pack for archival"""
        assert isinstance(self.visit_date, datetime.datetime)

        write_size = 8192
        pack_dir = self.get_save_data_path()

        pack_name = "%s.pack" % self.visit_date.isoformat()
        refs_name = "%s.refs" % self.visit_date.isoformat()

        with open(os.path.join(pack_dir, pack_name), "xb") as f:
            self.pack_buffer.seek(0)
            while True:
                r = self.pack_buffer.read(write_size)
                if not r:
                    break
                f.write(r)

        self.pack_buffer.seek(0)

        with open(os.path.join(pack_dir, refs_name), "xb") as f:
            pickle.dump(self.remote_refs, f)

    def build_partial_snapshot(self) -> Optional[Snapshot]:
        # Current implementation makes it a simple call to the existing
        # :meth:`get_snapshot`
        return self.get_snapshot()

    def store_data(self):
        """Override the default implementation so we make sure to close the
        pack_buffer if we use one between loop iterations (the dumb loader, for
        example, does not use one).

        """
        super().store_data()
        if not self.dumb:
            self.pack_buffer.close()

    def iter_objects(self, object_type: bytes) -> Iterator[ShaFile]:
        """Read all the objects of type `object_type` from the packfile"""
        if self.dumb:
            yield from self.dumb_fetcher.iter_objects(object_type)
        else:
            self.pack_buffer.seek(0)
            count = 0
            for obj in PackInflater.for_pack_data(
▲ Show 20 Lines • Show All 151 Lines • ▼ Show 20 Lines (def get_snapshot(self) -> Snapshot:)
            targets_unknown = missing
            if not targets_unknown:
                break

        if unknown_objects:
            # These objects were referenced by the server; we did not fetch
            # them, and we do not know them from the previous snapshot. This is
            # possible, as we allow partial snapshots.
            logger.warning(
                "Unknown objects referenced by remote refs: %s",
                (
                    ", ".join(
                        f"{name.decode()}: {hashutil.hash_to_hex(obj)}"
                        for name, obj in unknown_objects.items()
                    )
                ),
            )

ardumont: That warning becomes actually tedious (and possibly strenuous for
elasticsearch)... As it will eventually be resolved if we get to the bottom of
the repository, do we keep that log instruction? [1] P1194
On staging…

        utils.warn_dangling_branches(
            branches, dangling_branches, logger, self.origin.url
        )

        self.snapshot = Snapshot(branches=branches)
        return self.snapshot
▲ Show 20 Lines • Show All 44 Lines • Show Last 20 Lines |