Changeset View
Changeset View
Standalone View
Standalone View
swh/dataset/luigi.py
Show First 20 Lines • Show All 56 Lines • ▼ Show 20 Lines | .. code-block:: json | ||||
"formats": [ | "formats": [ | ||||
"edges", | "edges", | ||||
"orc" | "orc" | ||||
], | ], | ||||
"object_type": [ | "object_type": [ | ||||
"origin", | "origin", | ||||
"origin_visit" | "origin_visit" | ||||
], | ], | ||||
"privileged": false, | |||||
"hostname": "desktop5", | "hostname": "desktop5", | ||||
"privileged": false | "tool": { | ||||
"name": "swh.dataset", | |||||
"version": "0.3.2" | |||||
} | |||||
} | } | ||||
Running all on staging | Running all on staging | ||||
---------------------- | ---------------------- | ||||
An easy way to run it (e.g. on the staging database) is to have these config | An easy way to run it (e.g. on the staging database) is to have these config | ||||
files: | files: | ||||
▲ Show 20 Lines • Show All 204 Lines • ▼ Show 20 Lines | def _meta(self): | ||||
return luigi.LocalTarget(self.local_export_path / "meta" / "export.json") | return luigi.LocalTarget(self.local_export_path / "meta" / "export.json") | ||||
def run(self) -> None: | def run(self) -> None: | ||||
"""Runs the full export, then writes stamps, then :file:`meta.json`.""" | """Runs the full export, then writes stamps, then :file:`meta.json`.""" | ||||
import datetime | import datetime | ||||
import json | import json | ||||
import socket | import socket | ||||
import pkg_resources | |||||
from swh.core import config | from swh.core import config | ||||
# we are about to overwrite files, so remove any existing stamp | # we are about to overwrite files, so remove any existing stamp | ||||
for output in self.output(): | for output in self.output(): | ||||
if output.exists(): | if output.exists(): | ||||
output.remove() | output.remove() | ||||
conf = config.read(self.config_file) | conf = config.read(self.config_file) | ||||
Show All 21 Lines | def run(self) -> None: | ||||
meta = { | meta = { | ||||
"flavor": "full", | "flavor": "full", | ||||
"export_start": start_date.isoformat(), | "export_start": start_date.isoformat(), | ||||
"export_end": end_date.isoformat(), | "export_end": end_date.isoformat(), | ||||
"brokers": conf["journal"]["brokers"], | "brokers": conf["journal"]["brokers"], | ||||
"prefix": conf["journal"]["prefix"], | "prefix": conf["journal"]["prefix"], | ||||
"formats": [format_.name for format_ in self.formats], | "formats": [format_.name for format_ in self.formats], | ||||
"object_type": [object_type.name for object_type in self.object_types], | "object_type": [object_type.name for object_type in self.object_types], | ||||
"hostname": socket.getfqdn(), | |||||
"privileged": conf["journal"].get("privileged"), | "privileged": conf["journal"].get("privileged"), | ||||
"hostname": socket.getfqdn(), | |||||
"tool": { | |||||
"name": "swh.dataset", | |||||
"version": pkg_resources.get_distribution("swh.dataset").version, | |||||
}, | |||||
} | } | ||||
with self._meta().open("w") as fd: | with self._meta().open("w") as fd: | ||||
json.dump(meta, fd, indent=4) | json.dump(meta, fd, indent=4) | ||||
class UploadToS3(luigi.Task): | class UploadToS3(luigi.Task): | ||||
"""Uploads a local dataset export to S3; creating automatically if it does | """Uploads a local dataset export to S3; creating automatically if it does | ||||
not exist. | not exist. | ||||
▲ Show 20 Lines • Show All 339 Lines • Show Last 20 Lines |