# swh/clearlydefined/orchestrator.py
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from datetime import datetime
from typing import Dict, Optional, Tuple

import attr
import psycopg2

from swh.clearlydefined.mapping_utils import get_type_of_tool, map_row
from swh.model.identifiers import parse_swhid
from swh.model.model import (
    MetadataAuthority,
    MetadataAuthorityType,
    MetadataFetcher,
    MetadataTargetType,
    Origin,
    RawExtrinsicMetadata,
)
def write_in_storage(
    storage,
    data: Tuple[str, MetadataTargetType, Optional[Origin], Dict],
    date: datetime,
) -> None:
    """
    Build a RawExtrinsicMetadata object from ``data`` and insert it into
    the raw extrinsic metadata table of the given swh storage.

    ``data`` is a 4-tuple: (target SWHID string, target type,
    origin or None, metadata dict).
    """
    # NOTE(review): constructing the metadata object arguably belongs in
    # the mapping layer, not in the orchestrator.
    swhid, target_type, origin, metadata_dict = data
    record = RawExtrinsicMetadata(
        type=target_type,
        target=parse_swhid(swhid),
        discovery_date=date,
        # Strip the metadata fields: storage only needs the identity of
        # the authority/fetcher here, not their metadata payloads.
        authority=attr.evolve(get_metadata_authority(), metadata=None),
        fetcher=attr.evolve(get_metadata_fetcher(), metadata=None),
        format="json",
        origin=origin.url if isinstance(origin, Origin) else None,
        metadata=json.dumps(metadata_dict).encode("utf-8"),
    )
    storage.raw_extrinsic_metadata_add([record])
def get_metadata_authority() -> MetadataAuthority:
    """
    Return the MetadataAuthority under which ClearlyDefined metadata is
    recorded in swh storage.
    """
    # NOTE(review): this could be a module-level constant rather than a
    # function.
    return MetadataAuthority(
        type=MetadataAuthorityType.DEPOSIT_CLIENT,
        url="https://clearlydefined.io/",
        metadata={},
    )
def get_metadata_fetcher() -> MetadataFetcher:
    """
    Return the MetadataFetcher describing this loader (name and version
    of swh-clearlydefined).
    """
    # NOTE(review): this could be a module-level constant rather than a
    # function.
    return MetadataFetcher(
        name="swh-clearlydefined",
        version="0.0.1",
        metadata={},
    )
def init_tables(cursor, connection) -> None:
    """
    Create the orchestrator's bookkeeping tables (unmapped_data and
    last_run_date) if they do not exist yet, and commit.
    """
    # NOTE(review): schema initialization should eventually live in a
    # sql/30-schema.sql file, as done in swh-storage, together with a
    # dbversion table for migrations.
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS unmapped_data (path VARCHAR PRIMARY KEY);
    CREATE TABLE IF NOT EXISTS last_run_date (time TIMESTAMPTZ);"""
    )
    connection.commit()
def init_storage(storage) -> None:
    """
    Register the ClearlyDefined metadata authority and fetcher with the
    given swh storage.
    """
    # NOTE(review): ``storage`` should be annotated with
    # swh.storage.interface.StorageInterface — confirm and import it.
    storage.metadata_authority_add([get_metadata_authority()])
    storage.metadata_fetcher_add([get_metadata_fetcher()])
def write_next_date(
    cursor,
    update_connection,
    previous_date: Optional[datetime],
    new_date: datetime,
) -> None:
    """
    Persist the checkpoint date of the current run in the last_run_date
    table.

    When ``previous_date`` is None (first run), insert ``new_date`` as a
    fresh row; otherwise update the existing row holding
    ``previous_date`` to ``new_date``.  Commits on ``update_connection``.

    NOTE(review): the table is expected to hold at most one row; a
    key/value settings table would be more future-proof.
    """
    if previous_date is None:
        # First orchestration: no row to update yet.
        cursor.execute("INSERT into last_run_date VALUES(%s)", (new_date,))
    else:
        cursor.execute(
            "UPDATE last_run_date SET time= %s WHERE time= %s;",
            (
                new_date,
                previous_date,
            ),
        )
    update_connection.commit()
def get_last_run_date(cursor) -> Optional[datetime]:
    """
    Return the date of the previous orchestration run, read from the
    last_run_date table, or None when no run has been recorded yet.
    """
    cursor.execute("SELECT * FROM last_run_date;")
    rows = cursor.fetchall()
    if not rows:
        # First orchestration: no checkpoint recorded yet.
        return None
    # NOTE(review): assumes the table holds at most one row; any extra
    # rows are silently ignored.
    return rows[0][0]
def orchestor_row(storage, cursor, connection, row) -> None:
    """
    Map a single clearcode_cditem row and write the results into swh
    storage, recording the row's path in unmapped_data when the mapping
    is missing or incomplete.

    ``row`` is a clearcode_cditem tuple; ``row[0]`` is the
    ClearlyDefined path and ``row[2]`` the last-modified date used as
    discovery date.
    """
    mapped = map_row(row=row, storage=storage)
    if not mapped:
        # Nothing could be mapped at all: remember the path for a retry.
        write_in_not_mapped(
            cd_path=row[0], cursor=cursor, write_connection=connection
        )
        return
    # ``mapped`` is a (status, data_list) pair; a falsy status means the
    # mapping is only partial, so the path is kept for a retry, but the
    # data that *was* mapped is still written out below.
    mapping_status, data_list = mapped
    if not mapping_status:
        write_in_not_mapped(
            cd_path=row[0], cursor=cursor, write_connection=connection
        )
    for data in data_list:
        write_in_storage(storage=storage, data=data, date=row[2])
def map_previously_unmapped_data(storage, cursor, connection) -> None:
    """
    Retry mapping the rows recorded in the unmapped_data table.

    For each stored path, delete it from unmapped_data and re-run the
    orchestration of the matching clearcode_cditem row within the same
    transaction, committing only afterwards.  The original code
    committed the DELETE *before* mapping, so a crash in between would
    permanently mark the row as handled; here the deletion only becomes
    durable once the row has actually been re-processed (if the row is
    still unmappable, orchestor_row re-inserts it in the same
    transaction).
    """
    cursor.execute("SELECT * FROM unmapped_data ;")
    rows = cursor.fetchall()
    for row in rows:
        cd_path = row[0]
        # Deletion stays uncommitted until the row has been processed.
        cursor.execute("DELETE FROM unmapped_data WHERE path=%s", (cd_path,))
        cursor.execute("SELECT * FROM clearcode_cditem WHERE path=%s;", (cd_path,))
        unmapped_row = cursor.fetchall()[0]
        orchestor_row(
            storage=storage,
            row=unmapped_row,
            cursor=cursor,
            connection=connection,
        )
        connection.commit()
def write_in_not_mapped(cursor, write_connection, cd_path) -> None:
    """
    Record ``cd_path`` in the unmapped_data table so it can be retried
    by a later run; a path that is already present is left untouched.

    Uses a single upsert (``ON CONFLICT DO NOTHING`` on the path primary
    key) instead of the original SELECT-then-INSERT, which took two
    round-trips and was racy under concurrent writers.
    """
    cursor.execute(
        "INSERT INTO unmapped_data (path) VALUES (%s) ON CONFLICT (path) DO NOTHING;",
        (cd_path,),
    )
    write_connection.commit()
def read_from_clearcode_and_write_in_swh(
    storage, cursor, connection, date: Optional[datetime]
) -> None:
    """
    Read rows from the clearcode_cditem table and orchestrate every row
    whose last_modified_date is newer than ``date`` (all rows when
    ``date`` is None), then checkpoint the newest date seen so the next
    run resumes from there.

    Fixes relative to the original:
    - the WHERE clause used ``<`` which contradicted this docstring and
      re-read already-processed rows instead of the new ones;
    - an empty result set raised IndexError on ``rows[0][2]``;
    - the checkpoint was written *before* processing, so a crash mid-run
      would skip the unprocessed rows on the next run.
    """
    if date:
        cursor.execute(
            "SELECT * FROM clearcode_cditem "
            "WHERE last_modified_date > %s "
            "ORDER BY last_modified_date DESC;",
            (date,),
        )
    else:
        cursor.execute(
            "SELECT * FROM clearcode_cditem ORDER BY last_modified_date DESC;"
        )
    rows = cursor.fetchall()
    if not rows:
        # Nothing new since the last run; keep the old checkpoint.
        return
    for row in rows:
        # Fossology rows are skipped (not handled by the mapping).
        if get_type_of_tool(row[0]) != "fossology":
            orchestor_row(
                storage=storage, cursor=cursor, connection=connection, row=row
            )
    # Rows are ordered DESC, so rows[0][2] is the newest modified date.
    # Checkpoint only after processing succeeded.
    write_next_date(
        cursor=cursor,
        update_connection=connection,
        previous_date=date,
        new_date=rows[0][2],
    )
def orchestrator(storage, clearcode_dsn: str) -> None:
    """
    Entry point: connect to the clearcode database identified by
    ``clearcode_dsn`` and copy ClearlyDefined metadata into the raw
    extrinsic metadata of the given swh storage.
    """
    connection = psycopg2.connect(dsn=clearcode_dsn)
    cursor = connection.cursor()
    # Make sure the bookkeeping tables and the authority/fetcher exist.
    init_tables(cursor=cursor, connection=connection)
    init_storage(storage=storage)
    # First retry the rows that previous runs could not map...
    map_previously_unmapped_data(
        storage=storage, cursor=cursor, connection=connection
    )
    # ...then process everything newer than the last recorded run date.
    last_date = get_last_run_date(cursor=cursor)
    read_from_clearcode_and_write_in_swh(
        storage=storage, cursor=cursor, connection=connection, date=last_date
    )
# NOTE(review): building RawExtrinsicMetadata belongs in the mapping
# layer, not in the orchestrator.