Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/luigi.py
Show First 20 Lines • Show All 480 Lines • ▼ Show 20 Lines | def run(self) -> None: | ||||
export_json_path = self.local_export_path / "meta" / "export.json" | export_json_path = self.local_export_path / "meta" / "export.json" | ||||
export_json_path.parent.mkdir(exist_ok=True) | export_json_path.parent.mkdir(exist_ok=True) | ||||
client.get( | client.get( | ||||
f"{self.s3_export_path}/meta/export.json", | f"{self.s3_export_path}/meta/export.json", | ||||
self._meta().path, | self._meta().path, | ||||
) | ) | ||||
class LocalExport(luigi.Task): | |||||
"""Task that depends on a local dataset being present -- either directly from | |||||
:class:`ExportGraph` or via :class:`DownloadFromS3`. | |||||
""" | |||||
local_export_path = PathParameter(is_dir=True) | |||||
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | |||||
object_types = luigi.EnumListParameter( | |||||
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists | |||||
) | |||||
export_task_type = luigi.TaskParameter( | |||||
default=DownloadFromS3, | |||||
significant=False, | |||||
description="""The task used to get the dataset if it is not present. | |||||
Should be either ``swh.dataset.luigi.ExportGraph`` or | |||||
ardumont: typo here, maybe this is more correct? | |||||
``swh.dataset.luigi.DownloadFromS3``.""", | |||||
) | |||||
def requires(self) -> List[luigi.Task]: | |||||
"""Returns an instance of either :class:`ExportGraph` or :class:`DownloadFromS3` | |||||
depending on the value of :attr:`export_task_type`.""" | |||||
if issubclass(self.export_task_type, ExportGraph): | |||||
return [ | |||||
ExportGraph( | |||||
local_export_path=self.local_export_path, | |||||
formats=self.formats, | |||||
object_types=self.object_types, | |||||
) | |||||
] | |||||
elif issubclass(self.export_task_type, DownloadFromS3): | |||||
return [ | |||||
DownloadFromS3( | |||||
local_export_path=self.local_export_path, | |||||
formats=self.formats, | |||||
object_types=self.object_types, | |||||
) | |||||
] | |||||
else: | |||||
raise ValueError( | |||||
f"Unexpected export_task_type: {self.export_task_type.__name__}" | |||||
) | |||||
def output(self) -> List[luigi.Target]: | |||||
"""Returns stamp and meta paths on the local filesystem.""" | |||||
return [self._meta()] | |||||
def _meta(self): | |||||
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json") | |||||
def complete(self) -> bool: | |||||
return super().complete() and _export_metadata_has_object_types( | |||||
self._meta(), self.object_types | |||||
) | |||||
class AthenaDatabaseTarget(luigi.Target): | class AthenaDatabaseTarget(luigi.Target): | ||||
"""Target for the existence of a database on Athena.""" | """Target for the existence of a database on Athena.""" | ||||
def __init__(self, name: str): | def __init__(self, name: str): | ||||
self.name = name | self.name = name | ||||
def exists(self) -> bool: | def exists(self) -> bool: | ||||
import boto3 | import boto3 | ||||
▲ Show 20 Lines • Show All 120 Lines • Show Last 20 Lines |
typo here, maybe this is more correct?