Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/luigi.py
Show First 20 Lines • Show All 398 Lines • ▼ Show 20 Lines | def run(self) -> None: | ||||
client.put( | client.put( | ||||
self.local_export_path / "meta" / "export.json", | self.local_export_path / "meta" / "export.json", | ||||
self._meta().path, | self._meta().path, | ||||
ACL="public-read", | ACL="public-read", | ||||
) | ) | ||||
class DownloadFromS3(luigi.Task):
    """Downloads a local dataset export from S3.

    This performs the inverse operation of :class:`UploadToS3`

    Example invocation::

        luigi --local-scheduler --module swh.dataset.luigi DownloadFromS3 \
                --config=graph.prod.yml \
                --local-export-path=export/ \
                --formats=edges \
                --s3-export-path=s3://softwareheritage/graph/swh_2022-11-08
    """

    local_export_path = PathParameter(is_dir=True, create=True)
    formats = luigi.EnumListParameter(enum=Format, batch_method=merge_lists)
    object_types = luigi.EnumListParameter(
        enum=ObjectType, default=list(ObjectType), batch_method=merge_lists
    )
    # significant=False: if two tasks are called with different values for this
    # parameter but equal values for all others, Luigi considers one of them
    # redundant and does not run it.
    s3_export_path = S3PathParameter(significant=False)

    def requires(self) -> List[luigi.Task]:
        """Returns the :class:`UploadToS3` task configured with the same local
        path, formats, object types and S3 path as this task."""
        # Luigi tasks cannot be triggered remotely: this requirement means the
        # upload task runs if it is properly configured, and the whole workflow
        # fails if it is not.
        return [
            UploadToS3(
                local_export_path=self.local_export_path,
                formats=self.formats,
                object_types=self.object_types,
                s3_export_path=self.s3_export_path,
            )
        ]

    def output(self) -> List[luigi.Target]:
        """Returns the local stamp targets followed by the local
        :file:`meta/export.json` target."""
        return self._stamps() + [self._meta()]

    def _stamps(self):
        """Returns one local target per stamp file expected for the requested
        formats and object types."""
        return [
            luigi.LocalTarget(self.local_export_path / path)
            for path in stamps_paths(self.formats, self.object_types)
        ]

    def _meta(self):
        """Returns the local target of the export metadata file."""
        return luigi.LocalTarget(self.local_export_path / "meta" / "export.json")

    def run(self) -> None:
        """Copies all files from S3 to the local export directory: first the
        export itself, then stamps, then :file:`meta/export.json`.
        """
        import luigi.contrib.s3
        import tqdm

        client = luigi.contrib.s3.S3Client()

        # recursively copy S3 files to the local directory, and end with stamps
        # and export metadata
        for format_ in self.formats:
            for object_type in self.object_types:
                local_dir = self.local_export_path / format_.name / object_type.name
                s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}"
                files = list(client.list(s3_dir))
                if not files:
                    # intermediary object types (eg. origin_visit, origin_visit_status)
                    # do not have their own directory
                    continue
                local_dir.mkdir(parents=True, exist_ok=True)
                for file_ in tqdm.tqdm(
                    files,
                    desc=f"Downloading {format_.name}/{object_type.name}/",
                ):
                    local_file = local_dir / file_
                    # listed keys are relative to s3_dir and may contain
                    # sub-directories, so make sure the parent exists locally
                    local_file.parent.mkdir(parents=True, exist_ok=True)
                    client.get(
                        f"{s3_dir}/{file_}",
                        str(local_file),
                    )

        for stamp in stamps_paths(self.formats, self.object_types):
            stamp_path = self.local_export_path / stamp
            stamp_path.parent.mkdir(parents=True, exist_ok=True)
            client.get(
                f"{self.s3_export_path}/{stamp}",
                str(stamp_path),
            )

        # same location as the target returned by _meta(); downloaded last
        export_json_path = self.local_export_path / "meta" / "export.json"
        export_json_path.parent.mkdir(parents=True, exist_ok=True)
        client.get(
            f"{self.s3_export_path}/meta/export.json",
            str(export_json_path),
        )
class AthenaDatabaseTarget(luigi.Target): | class AthenaDatabaseTarget(luigi.Target): | ||||
"""Target for the existence of a database on Athena.""" | """Target for the existence of a database on Athena.""" | ||||
def __init__(self, name: str): | def __init__(self, name: str): | ||||
self.name = name | self.name = name | ||||
def exists(self) -> bool: | def exists(self) -> bool: | ||||
import boto3 | import boto3 | ||||
▲ Show 20 Lines • Show All 120 Lines • Show Last 20 Lines |
What does that do?