Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/luigi.py
Show All 33 Lines | swh_<date>[_<flavor>]/ | ||||
snapshot | snapshot | ||||
... | ... | ||||
meta/ | meta/ | ||||
export.json | export.json | ||||
``stamps`` files are written after corresponding directories are written. | ``stamps`` files are written after corresponding directories are written. | ||||
Their presence indicates the corresponding directory was fully generated/copied. | Their presence indicates the corresponding directory was fully generated/copied. | ||||
This allows skipping work that was already done, while ignoring interrupted jobs. | This allows skipping work that was already done, while ignoring interrupted jobs. | ||||
They are omitted after the initial export (ie. when downloading to/from other machines). | |||||
``meta/export.json`` contains information about the dataset, for provenance tracking. | ``meta/export.json`` contains information about the dataset, for provenance tracking. | ||||
For example: | For example: | ||||
.. code-block:: json | .. code-block:: json | ||||
{ | { | ||||
"flavor": "full", | "flavor": "full", | ||||
▲ Show 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | |||||
Note that this arbitrarily divides config options between :file:`luigi.cfg` and the CLI | Note that this arbitrarily divides config options between :file:`luigi.cfg` and the CLI | ||||
for readability; but `they can be used interchangeably <https://luigi.readthedocs.io/en/stable/configuration.html#parameters-from-config-ingestion>`__ | for readability; but `they can be used interchangeably <https://luigi.readthedocs.io/en/stable/configuration.html#parameters-from-config-ingestion>`__ | ||||
""" # noqa | """ # noqa | ||||
# WARNING: do not import unnecessary things here to keep cli startup time under | # WARNING: do not import unnecessary things here to keep cli startup time under | ||||
# control | # control | ||||
import enum | import enum | ||||
from pathlib import Path | from pathlib import Path | ||||
from typing import Hashable, Iterator, List, TypeVar | from typing import Hashable, Iterator, List, TypeVar, Union | ||||
import luigi | import luigi | ||||
from swh.dataset import cli | from swh.dataset import cli | ||||
from swh.dataset.relational import MAIN_TABLES | from swh.dataset.relational import MAIN_TABLES | ||||
ObjectType = enum.Enum( # type: ignore[misc] | ObjectType = enum.Enum( # type: ignore[misc] | ||||
"ObjectType", [obj_type for obj_type in MAIN_TABLES.keys()] | "ObjectType", [obj_type for obj_type in MAIN_TABLES.keys()] | ||||
▲ Show 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | def stamps_paths(formats: List[Format], object_types: List[ObjectType]) -> List[str]: | ||||
""" | """ | ||||
return [ | return [ | ||||
f"{format_.name}/stamps/{object_type.name.lower()}" | f"{format_.name}/stamps/{object_type.name.lower()}" | ||||
for format_ in formats | for format_ in formats | ||||
for object_type in object_types | for object_type in object_types | ||||
] | ] | ||||
def _export_metadata_has_object_types( | |||||
export_metadata: Union[luigi.LocalTarget, "luigi.contrib.s3.S3Target"], | |||||
object_types: List[ObjectType], | |||||
) -> bool: | |||||
import json | |||||
with export_metadata.open() as fd: | |||||
meta = json.load(fd) | |||||
return set(meta["object_type"]) >= { | |||||
object_type.name for object_type in object_types | |||||
} | |||||
class ExportGraph(luigi.Task): | class ExportGraph(luigi.Task): | ||||
"""Exports the entire graph to the local filesystem. | """Exports the entire graph to the local filesystem. | ||||
Example invocation:: | Example invocation:: | ||||
luigi --local-scheduler --module swh.dataset.luigi ExportGraph \ | luigi --local-scheduler --module swh.dataset.luigi ExportGraph \ | ||||
--config=graph.prod.yml \ | --config=graph.prod.yml \ | ||||
--local-export-path=export/ \ | --local-export-path=export/ \ | ||||
▲ Show 20 Lines • Show All 119 Lines • ▼ Show 20 Lines | def requires(self) -> List[luigi.Task]: | ||||
local_export_path=self.local_export_path, | local_export_path=self.local_export_path, | ||||
formats=self.formats, | formats=self.formats, | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
) | ) | ||||
] | ] | ||||
def output(self) -> List[luigi.Target]: | def output(self) -> List[luigi.Target]: | ||||
"""Returns stamp and meta paths on S3.""" | """Returns stamp and meta paths on S3.""" | ||||
return self._stamps() + [self._meta()] | return [self._meta()] | ||||
def _stamps(self): | |||||
import luigi.contrib.s3 | |||||
return [ | |||||
luigi.contrib.s3.S3Target(f"{self.s3_export_path}/{path}") | |||||
for path in stamps_paths(self.formats, self.object_types) | |||||
] | |||||
def _meta(self): | def _meta(self): | ||||
import luigi.contrib.s3 | import luigi.contrib.s3 | ||||
return luigi.contrib.s3.S3Target(f"{self.s3_export_path}/meta/export.json") | return luigi.contrib.s3.S3Target(f"{self.s3_export_path}/meta/export.json") | ||||
def complete(self) -> bool: | |||||
return super().complete() and _export_metadata_has_object_types( | |||||
self._meta(), self.object_types | |||||
) | |||||
def run(self) -> None: | def run(self) -> None: | ||||
"""Copies all files: first the export itself, then stamps, then | """Copies all files: first the export itself, then :file:`meta.json`.""" | ||||
:file:`meta.json`. | |||||
""" | |||||
import os | import os | ||||
import luigi.contrib.s3 | import luigi.contrib.s3 | ||||
import tqdm | import tqdm | ||||
client = luigi.contrib.s3.S3Client() | client = luigi.contrib.s3.S3Client() | ||||
# recursively copy local files to S3, and end with stamps and export metadata | # recursively copy local files to S3, and end with stamps and export metadata | ||||
for format_ in self.formats: | for format_ in self.formats: | ||||
for object_type in self.object_types: | for object_type in self.object_types: | ||||
local_dir = self.local_export_path / format_.name / object_type.name | local_dir = self.local_export_path / format_.name / object_type.name | ||||
s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}" | s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}" | ||||
if not local_dir.exists(): | if not local_dir.exists(): | ||||
# intermediary object types (eg. origin_visit, origin_visit_status) | # intermediary object types (eg. origin_visit, origin_visit_status) | ||||
# do not have their own directory | # do not have their own directory | ||||
continue | continue | ||||
for file_ in tqdm.tqdm( | for file_ in tqdm.tqdm( | ||||
list(os.listdir(local_dir)), | list(os.listdir(local_dir)), | ||||
desc=f"Uploading {format_.name}/{object_type.name}/", | desc=f"Uploading {format_.name}/{object_type.name}/", | ||||
): | ): | ||||
client.put_multipart( | client.put_multipart( | ||||
local_dir / file_, f"{s3_dir}/{file_}", ACL="public-read" | local_dir / file_, f"{s3_dir}/{file_}", ACL="public-read" | ||||
) | ) | ||||
for stamp in stamps_paths(self.formats, self.object_types): | |||||
client.put_multipart( | |||||
self.local_export_path / stamp, | |||||
f"{self.s3_export_path}/{stamp}", | |||||
ACL="public-read", | |||||
) | |||||
client.put( | client.put( | ||||
self.local_export_path / "meta" / "export.json", | self.local_export_path / "meta" / "export.json", | ||||
self._meta().path, | self._meta().path, | ||||
ACL="public-read", | ACL="public-read", | ||||
) | ) | ||||
class DownloadFromS3(luigi.Task): | class DownloadFromS3(luigi.Task): | ||||
Show All 25 Lines | def requires(self) -> List[luigi.Task]: | ||||
local_export_path=self.local_export_path, | local_export_path=self.local_export_path, | ||||
formats=self.formats, | formats=self.formats, | ||||
object_types=self.object_types, | object_types=self.object_types, | ||||
s3_export_path=self.s3_export_path, | s3_export_path=self.s3_export_path, | ||||
) | ) | ||||
] | ] | ||||
def output(self) -> List[luigi.Target]: | def output(self) -> List[luigi.Target]: | ||||
"""Returns stamp and meta paths on S3.""" | """Returns stamp and meta paths on the local filesystem.""" | ||||
return self._stamps() + [self._meta()] | return [self._meta()] | ||||
def _stamps(self): | def complete(self) -> bool: | ||||
return [ | return super().complete() and _export_metadata_has_object_types( | ||||
luigi.LocalTarget(self.local_export_path / path) | self._meta(), self.object_types | ||||
for path in stamps_paths(self.formats, self.object_types) | ) | ||||
] | |||||
def _meta(self): | def _meta(self): | ||||
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json") | return luigi.LocalTarget(self.local_export_path / "meta" / "export.json") | ||||
def run(self) -> None: | def run(self) -> None: | ||||
"""Copies all files: first the export itself, then stamps, then | """Copies all files: first the export itself, then :file:`meta.json`.""" | ||||
:file:`meta.json`. | |||||
""" | |||||
import luigi.contrib.s3 | import luigi.contrib.s3 | ||||
import tqdm | import tqdm | ||||
client = luigi.contrib.s3.S3Client() | client = luigi.contrib.s3.S3Client() | ||||
# recursively copy local files to S3, and end with stamps and export metadata | # recursively copy local files to S3, and end with export metadata | ||||
for format_ in self.formats: | for format_ in self.formats: | ||||
for object_type in self.object_types: | for object_type in self.object_types: | ||||
local_dir = self.local_export_path / format_.name / object_type.name | local_dir = self.local_export_path / format_.name / object_type.name | ||||
s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}" | s3_dir = f"{self.s3_export_path}/{format_.name}/{object_type.name}" | ||||
files = list(client.list(s3_dir)) | files = list(client.list(s3_dir)) | ||||
if not files: | if not files: | ||||
# intermediary object types (eg. origin_visit, origin_visit_status) | # intermediary object types (eg. origin_visit, origin_visit_status) | ||||
# do not have their own directory | # do not have their own directory | ||||
continue | continue | ||||
local_dir.mkdir(parents=True, exist_ok=True) | local_dir.mkdir(parents=True, exist_ok=True) | ||||
for file_ in tqdm.tqdm( | for file_ in tqdm.tqdm( | ||||
files, | files, | ||||
desc=f"Downloading {format_.name}/{object_type.name}/", | desc=f"Downloading {format_.name}/{object_type.name}/", | ||||
): | ): | ||||
client.get( | client.get( | ||||
f"{s3_dir}/{file_}", | f"{s3_dir}/{file_}", | ||||
str(local_dir / file_), | str(local_dir / file_), | ||||
) | ) | ||||
for stamp in stamps_paths(self.formats, self.object_types): | |||||
stamp_path = self.local_export_path / stamp | |||||
stamp_path.parent.mkdir(parents=True, exist_ok=True) | |||||
client.get( | |||||
f"{self.s3_export_path}/{stamp}", | |||||
str(stamp_path), | |||||
) | |||||
export_json_path = self.local_export_path / "meta" / "export.json" | export_json_path = self.local_export_path / "meta" / "export.json" | ||||
export_json_path.parent.mkdir(exist_ok=True) | export_json_path.parent.mkdir(exist_ok=True) | ||||
client.get( | client.get( | ||||
f"{self.s3_export_path}/meta/export.json", | f"{self.s3_export_path}/meta/export.json", | ||||
self._meta().path, | self._meta().path, | ||||
) | ) | ||||
▲ Show 20 Lines • Show All 128 Lines • Show Last 20 Lines |