Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/luigi.py
Show First 20 Lines • Show All 91 Lines • ▼ Show 20 Lines | |||||
.. code-block: yaml | .. code-block: yaml | ||||
:caption: luigi.cfg | :caption: luigi.cfg | ||||
[ExportGraph] | [ExportGraph] | ||||
config=graph.staging.yml | config=graph.staging.yml | ||||
processes=16 | processes=16 | ||||
[RunAll] | [RunExportAll] | ||||
formats=edges,orc | formats=edges,orc | ||||
s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/ | s3_athena_output_location=s3://vlorentz-test2/tmp/athena-output/ | ||||
And run this command, for example:: | And run this command, for example:: | ||||
luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunAll \ | luigi --log-level INFO --local-scheduler --module swh.dataset.luigi RunExportAll \ | ||||
--UploadToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \ | --UploadExportToS3-local-export-path=/poolswh/softwareheritage/2022-11-09_staging/ \ | ||||
--s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \ | --s3-export-path=s3://vlorentz-test2/vlorentz_2022-11-09_staging/ \ | ||||
--athena-db-name=vlorentz_20221109_staging | --athena-db-name=vlorentz_20221109_staging | ||||
Note that this arbitrarily divides config options between :file:`luigi.cfg` and the CLI | Note that this arbitrarily divides config options between :file:`luigi.cfg` and the CLI | ||||
for readability; but `they can be used interchangeably <https://luigi.readthedocs.io/en/stable/configuration.html#parameters-from-config-ingestion>`__ | for readability; but `they can be used interchangeably <https://luigi.readthedocs.io/en/stable/configuration.html#parameters-from-config-ingestion>`__ | ||||
""" # noqa | """ # noqa | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
▲ Show 20 Lines • Show All 220 Lines • ▼ Show 20 Lines | def run(self) -> None: | ||||
"name": "swh.dataset", | "name": "swh.dataset", | ||||
"version": pkg_resources.get_distribution("swh.dataset").version, | "version": pkg_resources.get_distribution("swh.dataset").version, | ||||
}, | }, | ||||
} | } | ||||
with self._meta().open("w") as fd: | with self._meta().open("w") as fd: | ||||
json.dump(meta, fd, indent=4) | json.dump(meta, fd, indent=4) | ||||
class UploadToS3(luigi.Task): | class UploadExportToS3(luigi.Task): | ||||
"""Uploads a local dataset export to S3; creating automatically if it does | """Uploads a local dataset export to S3; creating automatically if it does | ||||
not exist. | not exist. | ||||
Example invocation:: | Example invocation:: | ||||
luigi --local-scheduler --module swh.dataset.luigi UploadToS3 \ | luigi --local-scheduler --module swh.dataset.luigi UploadExportToS3 \ | ||||
--config=graph.prod.yml \ | |||||
--local-export-path=export/ \ | --local-export-path=export/ \ | ||||
--formats=edges \ | --formats=edges \ | ||||
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 | --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 | ||||
""" | """ | ||||
local_export_path = PathParameter(is_dir=True, create=True, significant=False) | local_export_path = PathParameter(is_dir=True, create=True, significant=False) | ||||
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | ||||
object_types = luigi.EnumListParameter( | object_types = luigi.EnumListParameter( | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def run(self) -> None: | ||||
client.put( | client.put( | ||||
self.local_export_path / "meta" / "export.json", | self.local_export_path / "meta" / "export.json", | ||||
self._meta().path, | self._meta().path, | ||||
ACL="public-read", | ACL="public-read", | ||||
) | ) | ||||
class DownloadFromS3(luigi.Task): | class DownloadExportFromS3(luigi.Task): | ||||
"""Downloads a local dataset export from S3. | """Downloads a local dataset export from S3. | ||||
This performs the inverse operation of :class:`UploadToS3` | This performs the inverse operation of :class:`UploadExportToS3` | ||||
Example invocation:: | Example invocation:: | ||||
luigi --local-scheduler --module swh.dataset.luigi DownloadFromS3 \ | luigi --local-scheduler --module swh.dataset.luigi DownloadExportFromS3 \ | ||||
--config=graph.prod.yml \ | |||||
--local-export-path=export/ \ | --local-export-path=export/ \ | ||||
--formats=edges \ | --formats=edges \ | ||||
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 | --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 | ||||
""" | """ | ||||
local_export_path = PathParameter(is_dir=True, create=True) | local_export_path = PathParameter(is_dir=True, create=True) | ||||
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | ||||
object_types = luigi.EnumListParameter( | object_types = luigi.EnumListParameter( | ||||
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists | enum=ObjectType, default=list(ObjectType), batch_method=merge_lists | ||||
) | ) | ||||
s3_export_path = S3PathParameter(significant=False) | s3_export_path = S3PathParameter(significant=False) | ||||
def requires(self) -> List[luigi.Task]: | def requires(self) -> List[luigi.Task]: | ||||
"""Returns a :class:`ExportGraph` task that writes local files at the | """Returns a :class:`ExportGraph` task that writes local files at the | ||||
expected location.""" | expected location.""" | ||||
return [ | return [ | ||||
UploadToS3( | UploadExportToS3( | ||||
local_export_path=self.local_export_path, | |||||
formats=self.formats, | formats=self.formats, | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
s3_export_path=self.s3_export_path, | s3_export_path=self.s3_export_path, | ||||
) | ) | ||||
] | ] | ||||
def output(self) -> List[luigi.Target]: | def output(self) -> List[luigi.Target]: | ||||
"""Returns stamp and meta paths on the local filesystem.""" | """Returns stamp and meta paths on the local filesystem.""" | ||||
Show All 39 Lines | def run(self) -> None: | ||||
client.get( | client.get( | ||||
f"{self.s3_export_path}/meta/export.json", | f"{self.s3_export_path}/meta/export.json", | ||||
self._meta().path, | self._meta().path, | ||||
) | ) | ||||
class LocalExport(luigi.Task): | class LocalExport(luigi.Task): | ||||
"""Task that depends on a local dataset being present -- either directly from | """Task that depends on a local dataset being present -- either directly from | ||||
:class:`ExportGraph` or via :class:`DownloadFromS3`. | :class:`ExportGraph` or via :class:`DownloadExportFromS3`. | ||||
""" | """ | ||||
local_export_path = PathParameter(is_dir=True) | local_export_path = PathParameter(is_dir=True) | ||||
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | ||||
object_types = luigi.EnumListParameter( | object_types = luigi.EnumListParameter( | ||||
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists | enum=ObjectType, default=list(ObjectType), batch_method=merge_lists | ||||
) | ) | ||||
export_task_type = luigi.TaskParameter( | export_task_type = luigi.TaskParameter( | ||||
default=DownloadFromS3, | default=DownloadExportFromS3, | ||||
significant=False, | significant=False, | ||||
description="""The task used to get the dataset if it is not present. | description="""The task used to get the dataset if it is not present. | ||||
Should be either ``swh.dataset.luigi.ExportGraph`` or | Should be either ``swh.dataset.luigi.ExportGraph`` or | ||||
``swh.dataset.luigi.DownloadFromS3``.""", | ``swh.dataset.luigi.DownloadExportFromS3``.""", | ||||
) | ) | ||||
def requires(self) -> List[luigi.Task]: | def requires(self) -> List[luigi.Task]: | ||||
"""Returns an instance of either :class:`ExportGraph` or :class:`DownloadFromS3` | """Returns an instance of either :class:`ExportGraph` or | ||||
depending on the value of :attr:`export_task_type`.""" | :class:`DownloadExportFromS3` depending on the value of | ||||
:attr:`export_task_type`.""" | |||||
if issubclass(self.export_task_type, ExportGraph): | if issubclass(self.export_task_type, ExportGraph): | ||||
return [ | return [ | ||||
ExportGraph( | ExportGraph( | ||||
local_export_path=self.local_export_path, | local_export_path=self.local_export_path, | ||||
formats=self.formats, | formats=self.formats, | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
) | ) | ||||
] | ] | ||||
elif issubclass(self.export_task_type, DownloadFromS3): | elif issubclass(self.export_task_type, DownloadExportFromS3): | ||||
return [ | return [ | ||||
DownloadFromS3( | DownloadExportFromS3( | ||||
local_export_path=self.local_export_path, | local_export_path=self.local_export_path, | ||||
formats=self.formats, | formats=self.formats, | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
) | ) | ||||
] | ] | ||||
else: | else: | ||||
raise ValueError( | raise ValueError( | ||||
f"Unexpected export_task_type: {self.export_task_type.__name__}" | f"Unexpected export_task_type: {self.export_task_type.__name__}" | ||||
▲ Show 20 Lines • Show All 63 Lines • ▼ Show 20 Lines | def __init__(self, *args, **kwargs): | ||||
if not self.s3_export_path.replace("-", "").endswith(f"/{self.athena_db_name}"): | if not self.s3_export_path.replace("-", "").endswith(f"/{self.athena_db_name}"): | ||||
raise ValueError( | raise ValueError( | ||||
f"S3 export path ({self.s3_export_path}) does not match " | f"S3 export path ({self.s3_export_path}) does not match " | ||||
f"Athena database name ({self.athena_db_name})" | f"Athena database name ({self.athena_db_name})" | ||||
) | ) | ||||
def requires(self) -> List[luigi.Task]: | def requires(self) -> List[luigi.Task]: | ||||
"""Returns the corresponding :class:`UploadToS3` instance, with ORC as only | """Returns the corresponding :class:`UploadExportToS3` instance, | ||||
format.""" | with ORC as only format.""" | ||||
return [ | return [ | ||||
UploadToS3( | UploadExportToS3( | ||||
formats=[Format.orc], # type: ignore[attr-defined] | formats=[Format.orc], # type: ignore[attr-defined] | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
s3_export_path=self.s3_export_path, | s3_export_path=self.s3_export_path, | ||||
) | ) | ||||
] | ] | ||||
def output(self) -> List[luigi.Target]: | def output(self) -> List[luigi.Target]: | ||||
"""Returns an instance of :class:`AthenaDatabaseTarget`.""" | """Returns an instance of :class:`AthenaDatabaseTarget`.""" | ||||
return [AthenaDatabaseTarget(self.athena_db_name)] | return [AthenaDatabaseTarget(self.athena_db_name)] | ||||
def run(self) -> None: | def run(self) -> None: | ||||
"""Creates tables from the ORC dataset.""" | """Creates tables from the ORC dataset.""" | ||||
from swh.dataset.athena import create_tables | from swh.dataset.athena import create_tables | ||||
create_tables( | create_tables( | ||||
self.athena_db_name, | self.athena_db_name, | ||||
self.s3_export_path, | self.s3_export_path, | ||||
output_location=self.s3_athena_output_location, | output_location=self.s3_athena_output_location, | ||||
replace=True, | replace=True, | ||||
) | ) | ||||
class RunAll(luigi.Task): | class RunExportAll(luigi.Task): | ||||
"""Runs both the S3 and Athena export. | """Runs both the S3 and Athena export. | ||||
Example invocation:: | Example invocation:: | ||||
luigi --local-scheduler --module swh.dataset.luigi RunAll \ | luigi --local-scheduler --module swh.dataset.luigi RunExportAll \ | ||||
--ExportGraph-config=graph.staging.yml \ | --ExportGraph-config=graph.staging.yml \ | ||||
--ExportGraph-processes=12 \ | --ExportGraph-processes=12 \ | ||||
--UploadToS3-local-export-path=/tmp/export_2022-11-08_staging/ \ | --UploadExportToS3-local-export-path=/tmp/export_2022-11-08_staging/ \ | ||||
--formats=edges \ | --formats=edges \ | ||||
--s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \ | --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08 \ | ||||
--athena-db-name=swh_20221108 \ | --athena-db-name=swh_20221108 \ | ||||
--object-types=origin,origin_visit \ | --object-types=origin,origin_visit \ | ||||
--s3-athena-output-location=s3://softwareheritage/graph/tmp/athena | --s3-athena-output-location=s3://softwareheritage/graph/tmp/athena | ||||
""" | """ | ||||
formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists) | ||||
object_types = luigi.EnumListParameter( | object_types = luigi.EnumListParameter( | ||||
enum=ObjectType, default=list(ObjectType), batch_method=merge_lists | enum=ObjectType, default=list(ObjectType), batch_method=merge_lists | ||||
) | ) | ||||
s3_export_path = S3PathParameter() | s3_export_path = S3PathParameter() | ||||
s3_athena_output_location = S3PathParameter() | s3_athena_output_location = S3PathParameter() | ||||
athena_db_name = luigi.Parameter() | athena_db_name = luigi.Parameter() | ||||
def requires(self) -> List[luigi.Task]: | def requires(self) -> List[luigi.Task]: | ||||
# CreateAthena depends on UploadToS3(formats=[edges]), so we need to | # CreateAthena depends on UploadExportToS3(formats=[edges]), so we need to | ||||
# explicitly depend on UploadToS3(formats=self.formats) here, to also | # explicitly depend on UploadExportToS3(formats=self.formats) here, to also | ||||
# export the formats requested by the user. | # export the formats requested by the user. | ||||
return [ | return [ | ||||
CreateAthena( | CreateAthena( | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
s3_export_path=self.s3_export_path, | s3_export_path=self.s3_export_path, | ||||
s3_athena_output_location=self.s3_athena_output_location, | s3_athena_output_location=self.s3_athena_output_location, | ||||
athena_db_name=self.athena_db_name, | athena_db_name=self.athena_db_name, | ||||
), | ), | ||||
UploadToS3( | UploadExportToS3( | ||||
formats=self.formats, | formats=self.formats, | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
s3_export_path=self.s3_export_path, | s3_export_path=self.s3_export_path, | ||||
), | ), | ||||
] | ] | ||||
def complete(self) -> bool: | def complete(self) -> bool: | ||||
# Dependencies perform their own completeness check, and this task | # Dependencies perform their own completeness check, and this task | ||||
# does no work itself | # does no work itself | ||||
return False | return False |