diff --git a/swh/dataset/luigi.py b/swh/dataset/luigi.py --- a/swh/dataset/luigi.py +++ b/swh/dataset/luigi.py @@ -39,6 +39,7 @@ ``stamps`` files are written after corresponding directories are written. Their presence indicates the corresponding directory was fully generated/copied. This allows skipping work that was already done, while ignoring interrupted jobs. +They are omitted after the initial export (i.e. when downloading to/from other machines). ``meta/export.json`` contains information about the dataset, for provenance tracking. For example: @@ -111,7 +112,7 @@ # control import enum from pathlib import Path -from typing import Hashable, Iterator, List, TypeVar +from typing import Hashable, Iterator, List, TypeVar, Union import luigi @@ -212,6 +213,19 @@ ] +def _export_metadata_has_object_types( + export_metadata: Union[luigi.LocalTarget, "luigi.contrib.s3.S3Target"], + object_types: List[ObjectType], +) -> bool: + import json + + with export_metadata.open() as fd: + meta = json.load(fd) + return set(meta["object_type"]) <= { + object_type.name for object_type in object_types + } + + class ExportGraph(luigi.Task): """Exports the entire graph to the local filesystem. @@ -347,25 +361,20 @@ def output(self) -> List[luigi.Target]: """Returns stamp and meta paths on S3.""" - return self._stamps() + [self._meta()] - - def _stamps(self): - import luigi.contrib.s3 - - return [ - luigi.contrib.s3.S3Target(f"{self.s3_export_path}/{path}") - for path in stamps_paths(self.formats, self.object_types) - ] + return [self._meta()] def _meta(self): import luigi.contrib.s3 return luigi.contrib.s3.S3Target(f"{self.s3_export_path}/meta/export.json") + def complete(self) -> bool: + return super().complete() and _export_metadata_has_object_types( + self._meta(), self.object_types + ) + def run(self) -> None: - """Copies all files: first the export itself, then stamps, then :file:`meta.json`.
- """ + """Copies all files: first the export itself, then :file:`meta.json`.""" import os import luigi.contrib.s3 @@ -390,13 +399,6 @@ local_dir / file_, f"{s3_dir}/{file_}", ACL="public-read" ) - for stamp in stamps_paths(self.formats, self.object_types): - client.put_multipart( - self.local_export_path / stamp, - f"{self.s3_export_path}/{stamp}", - ACL="public-read", - ) - client.put( self.local_export_path / "meta" / "export.json", self._meta().path, @@ -438,28 +440,25 @@ ] def output(self) -> List[luigi.Target]: - """Returns stamp and meta paths on S3.""" - return self._stamps() + [self._meta()] + """Returns the meta path on the local filesystem.""" + return [self._meta()] - def _stamps(self): - return [ - luigi.LocalTarget(self.local_export_path / path) - for path in stamps_paths(self.formats, self.object_types) - ] + def complete(self) -> bool: + return super().complete() and _export_metadata_has_object_types( + self._meta(), self.object_types + ) def _meta(self): return luigi.LocalTarget(self.local_export_path / "meta" / "export.json") def run(self) -> None: - """Copies all files: first the export itself, then stamps, then :file:`meta.json`.
- """ + """Copies all files: first the export itself, then :file:`meta.json`.""" import luigi.contrib.s3 import tqdm client = luigi.contrib.s3.S3Client() - # recursively copy local files to S3, and end with stamps and export metadata + # recursively copy files from S3 to the local filesystem, and end with export metadata for format_ in self.formats: for object_type in self.object_types: local_dir = self.local_export_path / format_.name / object_type.name @@ -479,14 +478,6 @@ str(local_dir / file_), ) - for stamp in stamps_paths(self.formats, self.object_types): - stamp_path = self.local_export_path / stamp - stamp_path.parent.mkdir(parents=True, exist_ok=True) - client.get( - f"{self.s3_export_path}/{stamp}", - str(stamp_path), - ) - export_json_path = self.local_export_path / "meta" / "export.json" export_json_path.parent.mkdir(exist_ok=True) client.get(