Changeset View
Changeset View
Standalone View
Standalone View
swh/clearlydefined/orchestrator.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
from typing import Optional | from typing import List, Optional | ||||
import attr | import attr | ||||
import psycopg2 | |||||
import dateutil | import dateutil | ||||
import psycopg2 | |||||
from swh.clearlydefined.mapping_utils import ( | from swh.clearlydefined.mapping_utils import ( | ||||
AUTHORITY, | AUTHORITY, | ||||
FETCHER, | FETCHER, | ||||
MappingStatus, | |||||
get_type_of_tool, | get_type_of_tool, | ||||
map_row, | map_row, | ||||
) | ) | ||||
from swh.model.model import RawExtrinsicMetadata | from swh.model.model import RawExtrinsicMetadata | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
class Row: | class Row: | ||||
▲ Show 20 Lines • Show All 52 Lines • ▼ Show 20 Lines | def get_last_run_date(cursor) -> Optional[datetime]: | ||||
cursor.execute("SELECT value FROM clearcode_env WHERE key='date';") | cursor.execute("SELECT value FROM clearcode_env WHERE key='date';") | ||||
rows = cursor.fetchall() | rows = cursor.fetchall() | ||||
if len(rows) < 1: | if len(rows) < 1: | ||||
return None | return None | ||||
date = rows[0][0] | date = rows[0][0] | ||||
return dateutil.parser.isoparse(date) | return dateutil.parser.isoparse(date) | ||||
def orchestrate_row(storage: StorageInterface, cursor, connection, row: Row) -> bool: | def write_data_from_list( | ||||
storage: StorageInterface, metadata_list: List[RawExtrinsicMetadata] | |||||
): | |||||
""" | |||||
Take list of RawExtrinsicMetadata and | |||||
write in storage | |||||
""" | |||||
for data in metadata_list: | |||||
write_in_storage(storage=storage, metadata=data) | |||||
def orchestrate_row( | |||||
storage: StorageInterface, cursor, connection, row: Row | |||||
) -> Optional[bool]: | |||||
""" | """ | ||||
Take storage, cursor, connection, row as input | Take storage, cursor, connection, row as input | ||||
and if able to completely map that row then write | and if able to completely map that row then write | ||||
data in storage, else store the ID in unmapped_data | data in storage, else store the ID in unmapped_data | ||||
table and return mapping_status of that row | table and return True if that row is fully mapped, | ||||
False for partial or no mapping, None if the row is ignored | |||||
vlorentz: It does not return a `MappingStatus` | |||||
""" | """ | ||||
able_to_be_mapped = map_row( | able_to_be_mapped = map_row( | ||||
metadata=row.metadata, id=row.path, date=row.date, storage=storage | metadata=row.metadata, id=row.path, date=row.date, storage=storage | ||||
) | ) | ||||
if not able_to_be_mapped: | |||||
mapping_status, metadata_list = able_to_be_mapped | |||||
if mapping_status == MappingStatus.IGNORE: | |||||
return None | |||||
Done · Inline Actions · vlorentz: don't need the `pass` | |||||
elif mapping_status == MappingStatus.UNMAPPED: | |||||
# This is the case when none of the row's metadata could be mapped | # This is the case when none of the row's metadata could be mapped | ||||
write_in_not_mapped( | write_in_not_mapped( | ||||
cd_path=row.path, cursor=cursor, write_connection=connection | cd_path=row.path, cursor=cursor, write_connection=connection | ||||
) | ) | ||||
write_data_from_list(storage=storage, metadata_list=metadata_list) | |||||
return False | return False | ||||
else: | else: | ||||
# This is a case when partial metadata of that row is able to be mapped | # This is a case when partial metadata of that row is able to be mapped | ||||
mapping_status, metadata_list = able_to_be_mapped | write_data_from_list(storage=storage, metadata_list=metadata_list) | ||||
if not mapping_status: | return True | ||||
write_in_not_mapped( | |||||
cd_path=row.path, cursor=cursor, write_connection=connection | |||||
) | |||||
for data in metadata_list: | |||||
write_in_storage(storage=storage, metadata=data) | |||||
return mapping_status | |||||
def map_previously_unmapped_data(storage: StorageInterface, cursor, connection) -> None: | def map_previously_unmapped_data(storage: StorageInterface, cursor, connection) -> None: | ||||
""" | """ | ||||
Take storage, cursor, connection as input and map previously | Take storage, cursor, connection as input and map previously | ||||
unmapped data | unmapped data | ||||
""" | """ | ||||
cursor.execute("SELECT path FROM unmapped_data ;") | cursor.execute("SELECT path FROM unmapped_data ;") | ||||
▲ Show 20 Lines • Show All 94 Lines • Show Last 20 Lines |
It does not return a MappingStatus