swh/dataset/cli.py
(First 92 lines not shown.)

@click.option(
    …
    ),
)
@click.pass_context
def export_graph(
    ctx, export_path, export_id, formats, exclude, object_types, processes, margin
):
"""Export the Software Heritage graph as an edge dataset.""" | """Export the Software Heritage graph as an edge dataset.""" | ||||
from importlib import import_module | from importlib import import_module | ||||
import logging | |||||
import resource | |||||
import uuid | import uuid | ||||
from swh.dataset.journalprocessor import ParallelJournalProcessor | from swh.dataset.journalprocessor import ParallelJournalProcessor | ||||
logger = logging.getLogger(__name__) | |||||
config = ctx.obj["config"] | config = ctx.obj["config"] | ||||
if not export_id: | if not export_id: | ||||
export_id = str(uuid.uuid4()) | export_id = str(uuid.uuid4()) | ||||
if object_types: | if object_types: | ||||
object_types = {o.strip() for o in object_types.split(",")} | object_types = {o.strip() for o in object_types.split(",")} | ||||
invalid_object_types = object_types - set(MAIN_TABLES.keys()) | invalid_object_types = object_types - set(MAIN_TABLES.keys()) | ||||
if invalid_object_types: | if invalid_object_types: | ||||
            (11 lines not shown)

    for f in export_formats:
        …
                option_name="formats", message=f"{f} is not an available format."
            )
    # Enforce order (from origin to contents) to reduce the number of holes
    # in the graph.
    object_types = [
        obj_type for obj_type in MAIN_TABLES.keys() if obj_type in object_types
    ]
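For illustration, a minimal sketch of the reordering this comprehension performs, assuming MAIN_TABLES is an insertion-ordered dict going from origins down to contents (the key set below is hypothetical and reduced for the example):

    # Hypothetical, reduced MAIN_TABLES for illustration only:
    MAIN_TABLES = {"origin": ..., "snapshot": ..., "revision": ...,
                   "directory": ..., "content": ...}
    object_types = {"content", "origin"}  # unordered, as parsed from the CLI
    ordered = [t for t in MAIN_TABLES if t in object_types]
    print(ordered)  # ['origin', 'content']: origins are exported before contents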
    # ParallelJournalProcessor opens 256 LevelDBs in total. Depending on the
    # number of processes, this can exceed the maximum number of file
    # descriptors (the soft limit defaults to 1024 on Debian), so let's
    # increase it.
    (soft, hard) = resource.getrlimit(resource.RLIMIT_NOFILE)
    nb_shards = 256  # TODO: make this configurable or detect nb of kafka partitions
    open_fds_per_shard = 61  # estimated with plyvel==1.3.0 and libleveldb1d==1.22-3
    spare = 1024  # for everything other than LevelDB
    want_fd = nb_shards * open_fds_per_shard + spare
    if hard < want_fd:
        logger.warning(
            "Hard limit of open file descriptors (%d) is lower than ideal (%d)",
            hard,
            want_fd,
        )
    if soft < want_fd:
        want_fd = min(want_fd, hard)
        logger.info(
            "Soft limit of open file descriptors (%d) is too low, increasing to %d",
            soft,
            want_fd,
        )
        resource.setrlimit(resource.RLIMIT_NOFILE, (want_fd, hard))
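For context, the target derived from the constants above works out as follows (simple arithmetic, shown only to make the gap with the default limit concrete):

    # 256 shards * 61 fds per shard + 1024 spare:
    want_fd = 256 * 61 + 1024
    print(want_fd)  # 16640, over sixteen times Debian's default soft limit of 1024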
    def importcls(clspath):
        """Resolve a "module:ClassName" string to the class object."""
        mod, cls = clspath.split(":")
        m = import_module(mod)
        return getattr(m, cls)
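For example, importcls turns a "module:ClassName" path into the class itself; here with a standard-library path (purely illustrative, not one of the real AVAILABLE_EXPORTERS entries):

    cls = importcls("collections:OrderedDict")  # illustrative stdlib path
    assert cls.__name__ == "OrderedDict"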
    exporter_cls = dict(
        (fmt, importcls(clspath))
        for (fmt, clspath) in AVAILABLE_EXPORTERS.items()
    )

(Remaining 132 lines of the changeset not shown.)
olasd, in an inline comment on the logger.info() call above: The logging is inconsistent with the actual new value of the limit, which may end up surprising people.
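A minimal sketch of one way to keep the message and the applied limit in sync (illustrative only; whether the revision was updated exactly this way is not shown here): clamp want_fd to the hard limit first, and log the clamped value that is actually handed to setrlimit().

    # Illustrative sketch, not the committed change: always log the exact
    # value passed to setrlimit().
    if soft < want_fd:
        new_soft = min(want_fd, hard)  # setrlimit() raises ValueError if soft > hard
        logger.info(
            "Soft limit of open file descriptors (%d) is too low, increasing to %d",
            soft,
            new_soft,
        )
        resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))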