diff --git a/swh/dataset/luigi.py b/swh/dataset/luigi.py
--- a/swh/dataset/luigi.py
+++ b/swh/dataset/luigi.py
@@ -486,6 +486,75 @@
         )
 
 
+class LocalExport(luigi.Task):
+    """Task that depends on a local dataset being present -- either directly from
+    :class:`ExportGraph` or via :class:`DownloadFromS3`.
+
+    The task is complete only if the metadata file returned by :meth:`output`
+    exists and passes the object-type check done in :meth:`complete`.
+    """
+
+    local_export_path = PathParameter(is_dir=True)
+    formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
+    object_types = luigi.EnumListParameter(
+        enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
+    )
+    export_task_type = luigi.TaskParameter(
+        default=DownloadFromS3,
+        significant=False,
+        description="""The task used to get the dataset if it is not present.
+        Should be either ``swh.dataset.luigi.ExportGraph`` or
+        ``swh.dataset.luigi.DownloadFromS3``.""",
+    )
+
+    def requires(self) -> List[luigi.Task]:
+        """Returns an instance of either :class:`ExportGraph` or :class:`DownloadFromS3`
+        depending on the value of :attr:`export_task_type`.
+
+        The required task is given this task's ``local_export_path``, ``formats``
+        and ``object_types``.
+
+        Raises:
+            ValueError: if :attr:`export_task_type` is a subclass of neither
+                :class:`ExportGraph` nor :class:`DownloadFromS3`.
+        """
+        # Candidates are tried in order. A subclass of one of them is mapped to
+        # that base class: the subclass itself is not instantiated.
+        for task_cls in (ExportGraph, DownloadFromS3):
+            if issubclass(self.export_task_type, task_cls):
+                return [
+                    task_cls(
+                        local_export_path=self.local_export_path,
+                        formats=self.formats,
+                        object_types=self.object_types,
+                    )
+                ]
+        raise ValueError(
+            f"Unexpected export_task_type: {self.export_task_type.__name__}"
+        )
+
+    def output(self) -> List[luigi.Target]:
+        """Returns the export's ``meta/export.json`` file on the local filesystem."""
+        return [self._meta()]
+
+    def _meta(self) -> luigi.LocalTarget:
+        """Local target for the export's ``meta/export.json`` metadata file."""
+        return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")
+
+    def complete(self) -> bool:
+        """Returns whether the metadata file exists and passes the object-type
+        check for :attr:`object_types`.
+
+        On top of the existence check done by :meth:`luigi.Task.complete`, the
+        metadata must be accepted by ``_export_metadata_has_object_types``;
+        presumably this keeps an export made for other object types from being
+        reused.
+        """
+        return super().complete() and _export_metadata_has_object_types(
+            self._meta(), self.object_types
+        )
+
+
 class AthenaDatabaseTarget(luigi.Target):
     """Target for the existence of a database on Athena."""
 