Changeset View
Standalone View
swh/clearlydefined/orchestrator.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU Affero General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
from datetime import datetime | |||||
from typing import Optional | |||||
import attr | |||||
import psycopg2 | |||||
import dateutil | |||||
from swh.clearlydefined.mapping_utils import ( | |||||
AUTHORITY, | |||||
FETCHER, | |||||
get_type_of_tool, | |||||
map_row, | |||||
) | |||||
from swh.model.model import RawExtrinsicMetadata | |||||
from swh.storage.interface import StorageInterface | |||||
class Row:
    """One record read from the ``clearcode_cditem`` table.

    The three fields are stored exactly as given:

    * ``path``: the item's path; forwarded to ``map_row`` as ``id``.
    * ``metadata``: the item's content; forwarded to ``map_row`` as ``metadata``.
    * ``date``: the item's last modification date; forwarded as ``date``.
    """

    def __init__(self, path, metadata, date):
        self.path, self.metadata, self.date = path, metadata, date
def write_in_storage(
    storage: StorageInterface,
    metadata: RawExtrinsicMetadata,
) -> None:
    """Add a single raw extrinsic metadata object to ``storage``.

    ``raw_extrinsic_metadata_add`` is called with a list, so ``metadata``
    is wrapped in a one-element list before being handed over.
    """
    batch = [metadata]
    storage.raw_extrinsic_metadata_add(batch)
def init_storage(storage: StorageInterface) -> None:
    """Register the module's metadata authority and fetcher in ``storage``.

    ``AUTHORITY`` and ``FETCHER`` are copied with ``attr.evolve`` so that the
    objects sent to ``storage`` carry an empty ``metadata`` dict.
    """
    # NOTE(review, vlorentz): this belongs in the mapping, not the orchestrator.
    authority = attr.evolve(AUTHORITY, metadata={})
    storage.metadata_authority_add([authority])
    fetcher = attr.evolve(FETCHER, metadata={})
    storage.metadata_fetcher_add([fetcher])
def write_next_date(
    cursor, update_connection, previous_date: Optional[datetime], new_date: datetime
) -> None:
    """Persist ``new_date`` as the ``date`` entry of ``clearcode_env``.

    When ``previous_date`` is falsy (no earlier run was recorded) a new row
    is inserted; otherwise the existing ``date`` row is updated in place.
    The change is committed on ``update_connection`` before returning.
    """
    if previous_date:
        statement = """UPDATE clearcode_env SET value = %s WHERE key='date'"""
        params = (new_date,)
    else:
        statement = """INSERT into clearcode_env (key, value) VALUES(%s,%s)"""
        params = ("date", new_date)
    cursor.execute(statement, params)
    update_connection.commit()
# Review note (vlorentz, marked done): use constants instead of functions.
def get_last_run_date(cursor) -> Optional[datetime]:
    """Return the date recorded by the previous orchestration, if any.

    Reads the ``date`` entry of the ``clearcode_env`` table through
    ``cursor``.

    Returns:
        The stored value parsed as an ISO-8601 datetime, or ``None`` when
        the table holds no ``date`` entry (first orchestration).
    """
    cursor.execute("SELECT value FROM clearcode_env WHERE key='date';")
    rows = cursor.fetchall()
    if not rows:
        return None
    # Import the submodule explicitly: a bare ``import dateutil`` does not
    # guarantee that the ``dateutil.parser`` attribute is available.
    from dateutil.parser import isoparse

    return isoparse(rows[0][0])
# Review note (vlorentz, marked done): missing type annotation (`StorageInterface`).
def orchestrate_row(storage: StorageInterface, cursor, connection, row: Row) -> bool:
    """Map one row and store whatever metadata could be produced from it.

    ``map_row`` is called with the row's metadata, its path (as ``id``) and
    its date.

    * A falsy result means nothing in the row could be mapped: the path is
      passed to ``write_in_not_mapped`` and ``False`` is returned.
    * Otherwise the result unpacks to ``(mapping_status, metadata_list)``.
      When ``mapping_status`` is falsy the row was only partially mapped, so
      its path is passed to ``write_in_not_mapped`` as well. Every item of
      ``metadata_list`` is then written to ``storage``.

    Returns:
        ``False`` when nothing could be mapped, otherwise ``mapping_status``.
    """
    outcome = map_row(
        storage=storage, id=row.path, metadata=row.metadata, date=row.date
    )
    if not outcome:
        # Nothing in this row could be mapped.
        write_in_not_mapped(
            cursor=cursor, write_connection=connection, cd_path=row.path
        )
        return False

    mapping_status, metadata_list = outcome
    if not mapping_status:
        # Only part of the row's metadata could be mapped.
        write_in_not_mapped(
            cursor=cursor, write_connection=connection, cd_path=row.path
        )
    for item in metadata_list:
        write_in_storage(storage=storage, metadata=item)
    return mapping_status
def map_previously_unmapped_data(storage: StorageInterface, cursor, connection) -> None:
    """Retry every path currently listed in ``unmapped_data``.

    For each recorded path, the matching ``clearcode_cditem`` row is loaded
    and passed to ``orchestrate_row``. When that call returns a truthy
    status, the path is deleted from ``unmapped_data`` and the deletion is
    committed on ``connection``.

    A path that has no matching ``clearcode_cditem`` row is skipped and left
    in ``unmapped_data`` (previously this raised ``IndexError`` and aborted
    the whole run). If several rows match, only the first one is used.
    """
    cursor.execute("SELECT path FROM unmapped_data ;")
    # Materialise the paths up front: the same cursor is reused in the loop.
    pending_paths = [record[0] for record in cursor.fetchall()]
    for cd_path in pending_paths:
        cursor.execute(
            """SELECT path,content,last_modified_date FROM
            clearcode_cditem WHERE path=%s;""",
            (cd_path,),
        )
        matches = cursor.fetchall()
        if not matches:
            # Stale entry: nothing to map for this path right now.
            continue
        path, content, last_modified_date = matches[0]
        fully_mapped = orchestrate_row(
            storage=storage,
            row=Row(path=path, metadata=content, date=last_modified_date),
            cursor=cursor,
            connection=connection,
        )
        if fully_mapped:
            cursor.execute("DELETE FROM unmapped_data WHERE path=%s", (cd_path,))
            connection.commit()
def write_in_not_mapped(cursor, write_connection, cd_path: str) -> None:
    """Record ``cd_path`` in the ``unmapped_data`` table.

    The insert does nothing when the path is already present
    (``ON CONFLICT (path) DO NOTHING``). The transaction is committed on
    ``write_connection`` in either case.
    """
    insert_unmapped = (
        "INSERT INTO unmapped_data (path) VALUES (%s) ON CONFLICT (path) DO NOTHING;"
    )
    cursor.execute(insert_unmapped, (cd_path,))
    write_connection.commit()
def read_from_clearcode_and_write_in_swh(
    storage: StorageInterface, cursor, connection, date: Optional[datetime]
) -> None:
    """Orchestrate the ``clearcode_cditem`` rows modified after ``date``.

    Args:
        storage: destination handed to ``orchestrate_row``.
        cursor: cursor on the clearcode database.
        connection: connection used to commit the new checkpoint.
        date: checkpoint stored by the previous run; ``None`` on the first
            run, in which case every row is read.

    Rows are read newest first. Rows whose tool type is ``"fossology"`` are
    skipped; every other row goes through ``orchestrate_row``. Once all rows
    have been handled, the newest ``last_modified_date`` is stored as the
    next checkpoint.
    """
    if date is not None:
        # Strictly newer than the checkpoint: older rows were handled by
        # earlier runs (the original ``<`` selected exactly the wrong side).
        cursor.execute(
            "SELECT path,content,last_modified_date FROM clearcode_cditem "
            "WHERE last_modified_date > %s "
            "ORDER BY last_modified_date DESC;",
            (date,),
        )
    else:
        cursor.execute(
            """SELECT path,content,last_modified_date FROM clearcode_cditem
            ORDER BY last_modified_date DESC;"""
        )
    rows = cursor.fetchall()
    if not rows:
        return

    for path, content, last_modified_date in rows:
        if get_type_of_tool(path).value == "fossology":
            continue
        orchestrate_row(
            storage=storage,
            cursor=cursor,
            connection=connection,
            row=Row(path=path, metadata=content, date=last_modified_date),
        )

    # Rows are sorted newest first, so the first one carries the checkpoint.
    # It is written only after every row was handled: if the process dies
    # mid-run, the next run retries these rows instead of skipping them.
    write_next_date(
        cursor=cursor,
        update_connection=connection,
        previous_date=date,
        new_date=rows[0][2],
    )
def orchestrator(storage: StorageInterface, clearcode_dsn: str) -> None:
    """Copy data from the clearcode database into SWH raw extrinsic metadata.

    Args:
        storage: SWH storage that receives the metadata.
        clearcode_dsn: DSN passed to ``psycopg2.connect`` to reach the
            clearcode database.

    Steps, in order: register the authority and fetcher in ``storage``,
    retry the paths left in ``unmapped_data`` by earlier runs, then process
    the rows selected from the last recorded run date.
    """
    connection = psycopg2.connect(dsn=clearcode_dsn)
    try:
        cursor = connection.cursor()
        try:
            init_storage(storage=storage)
            map_previously_unmapped_data(
                storage=storage, cursor=cursor, connection=connection
            )
            date = get_last_run_date(cursor=cursor)
            read_from_clearcode_and_write_in_swh(
                storage=storage, cursor=cursor, connection=connection, date=date
            )
        finally:
            cursor.close()
    finally:
        # Always release the database connection, even when a step fails.
        connection.close()
this belongs in the mapping, not the orchestrator.