diff --git a/swh/dataset/luigi.py b/swh/dataset/luigi.py --- a/swh/dataset/luigi.py +++ b/swh/dataset/luigi.py @@ -97,14 +97,14 @@ config=graph.staging.yml processes=16 - [RunAll] + [RunExportAll] formats=edges,orc s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/ And run this command, for example:: - luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunAll \ - --UploadToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \ + luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunExportAll \ + --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \ --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \ --athena-db-name=vlorentz_20221109_staging @@ -341,14 +341,13 @@ json.dump(meta, fd, indent=4) -class UploadToS3(luigi.Task): +class UploadExportToS3(luigi.Task): """Uploads a local dataset export to S3; creating automatically if it does not exist. Example invocation:: - luigi --local-scheduler --module swh.dataset.luigi UploadToS3 \ - --config=graph.prod.yml \ + luigi --local-scheduler --module swh.dataset.luigi UploadExportToS3 \ --local-export-path=export/ \ --formats=edges \ --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 @@ -419,15 +418,14 @@ ) -class DownloadFromS3(luigi.Task): +class DownloadExportFromS3(luigi.Task): """Downloads a local dataset export from S3. 
- This performs the inverse operation of :class:`UploadToS3` + This performs the inverse operation of :class:`UploadExportToS3` Example invocation:: - luigi --local-scheduler --module swh.dataset.luigi DownloadFromS3 \ - --config=graph.prod.yml \ + luigi --local-scheduler --module swh.dataset.luigi DownloadExportFromS3 \ --local-export-path=export/ \ --formats=edges \ --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 @@ -444,8 +442,7 @@ """Returns a :class:`ExportGraph` task that writes local files at the expected location.""" return [ - UploadToS3( - local_export_path=self.local_export_path, + UploadExportToS3( formats=self.formats, object_types=self.object_types, s3_export_path=self.s3_export_path, @@ -501,7 +498,7 @@ class LocalExport(luigi.Task): """Task that depends on a local dataset being present -- either directly from - :class:`ExportGraph` or via :class:`DownloadFromS3`. + :class:`ExportGraph` or via :class:`DownloadExportFromS3`. """ local_export_path = PathParameter(is_dir=True) @@ -510,16 +507,17 @@ enum=ObjectType, default=list(ObjectType), batch_method=merge_lists ) export_task_type = luigi.TaskParameter( - default=DownloadFromS3, + default=DownloadExportFromS3, significant=False, description="""The task used to get the dataset if it is not present. 
Should be either ``swh.dataset.luigi.ExportGraph`` or - ``swh.dataset.luigi.DownloadFromS3``.""", + ``swh.dataset.luigi.DownloadExportFromS3``.""", ) def requires(self) -> List[luigi.Task]: - """Returns an instance of either :class:`ExportGraph` or :class:`DownloadFromS3` - depending on the value of :attr:`export_task_type`.""" + """Returns an instance of either :class:`ExportGraph` or + :class:`DownloadExportFromS3` depending on the value of + :attr:`export_task_type`.""" if issubclass(self.export_task_type, ExportGraph): return [ @@ -529,9 +527,9 @@ object_types=self.object_types, ) ] - elif issubclass(self.export_task_type, DownloadFromS3): + elif issubclass(self.export_task_type, DownloadExportFromS3): return [ - DownloadFromS3( + DownloadExportFromS3( local_export_path=self.local_export_path, formats=self.formats, object_types=self.object_types, @@ -611,10 +609,10 @@ ) def requires(self) -> List[luigi.Task]: - """Returns the corresponding :class:`UploadToS3` instance, with ORC as only - format.""" + """Returns the corresponding :class:`UploadExportToS3` instance, + with ORC as only format.""" return [ - UploadToS3( + UploadExportToS3( formats=[Format.orc], # type: ignore[attr-defined] object_types=self.object_types, s3_export_path=self.s3_export_path, @@ -637,15 +635,15 @@ ) -class RunAll(luigi.Task): +class RunExportAll(luigi.Task): """Runs both the S3 and Athena export. 
Example invocation:: - luigi --local-scheduler --module swh.dataset.luigi RunAll \ + luigi --local-scheduler --module swh.dataset.luigi RunExportAll \ --ExportGraph-config=graph.staging.yml \ --ExportGraph-processes=12 \ - --UploadToS3-local-export-path=/tmp/export_2022-11-08_staging/ \ + --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \ --formats=edges \ --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \ --athena-db-name=swh_20221108 \ @@ -662,8 +660,8 @@ athena_db_name = luigi.Parameter() def requires(self) -> List[luigi.Task]: - # CreateAthena depends on UploadToS3(formats=[edges]), so we need to - # explicitly depend on UploadToS3(formats=self.formats) here, to also + # CreateAthena depends on UploadExportToS3(formats=[orc]), so we need to + # explicitly depend on UploadExportToS3(formats=self.formats) here, to also # export the formats requested by the user. return [ CreateAthena( @@ -672,7 +670,7 @@ s3_athena_output_location=self.s3_athena_output_location, athena_db_name=self.athena_db_name, ), - UploadToS3( + UploadExportToS3( formats=self.formats, object_types=self.object_types, s3_export_path=self.s3_export_path,