Changeset View
Changeset View
Standalone View
Standalone View
swh/clearlydefined/read.py
- This file was added.
# Copyright (C) 2017-2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU Affero General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import gzip | |||||
import psycopg2 | |||||
from swh.clearlydefined.mapping_utils import map_harvest | |||||
from swh.clearlydefined.mapping_utils import map_definition | |||||
def write_next_date(previous_date, new_date, clearcode_dsn): | |||||
update_connection = psycopg2.connect(dsn=clearcode_dsn) | |||||
cur = update_connection.cursor() | |||||
cur.execute( | |||||
"UPDATE last_run_date SET time= %s WHERE time= %s;", | |||||
( | |||||
new_date, | |||||
previous_date, | |||||
), | |||||
) | |||||
update_connection.commit() | |||||
def get_last_run_date(clearcode_dsn): | |||||
read_connection = psycopg2.connect(dsn=clearcode_dsn) | |||||
cur = read_connection.cursor() | |||||
cur.execute("SELECT * FROM last_run_date;") | |||||
row = cur.fetchall()[0] | |||||
date = row[0] | |||||
return date | |||||
def map_row(row, swh_dsn): | |||||
cd_path = row[0] | |||||
list_cd_path = cd_path.split("/") | |||||
metadata_string = gzip.decompress(row[1]).decode() | |||||
if metadata_string == "": | |||||
return False | |||||
if len(list_cd_path) == 9: | |||||
tool = list_cd_path[7] | |||||
return map_harvest(tool=tool, metadata_string=metadata_string, dsn=swh_dsn) | |||||
if len(list_cd_path) == 6: | |||||
return map_definition(metadata_string=metadata_string, dsn=swh_dsn) | |||||
def map_previously_unmapped_data(clearcode_dsn, swh_dsn): | |||||
read_connection = psycopg2.connect(dsn=clearcode_dsn) | |||||
cur = read_connection.cursor() | |||||
cur.execute("SELECT * FROM unmapped_data ;") | |||||
rows = cur.fetchall() | |||||
for row in rows: | |||||
cd_path = row[0] | |||||
cur.execute("SELECT * FROM clearcode_cditem WHERE path=%s;", (cd_path,)) | |||||
unmapped_row = cur.fetchall()[0] | |||||
if map_row(row=unmapped_row, swh_dsn=swh_dsn): | |||||
cur.execute("DELETE * FROM unmapped_data WHERE path= %s;", (cd_path,)) | |||||
read_connection.commit() | |||||
def write_in_not_mapped(cd_path, clearcode_dsn): | |||||
write_connection = psycopg2.connect(dsn=clearcode_dsn) | |||||
cur = write_connection.cursor() | |||||
cur.execute("INSERT INTO unmapped_data (path) VALUES (%s):", (cd_path,)) | |||||
write_connection.commit() | |||||
def read_from_clearcode_and_write_in_swh(clearcode_dsn, swh_dsn, date=None): | |||||
read_connection = psycopg2.connect(dsn=clearcode_dsn) | |||||
cur = read_connection.cursor() | |||||
cur.execute("SELECT * FROM clearcode_cditem ORDER BY last_modified_date DESC;") | |||||
rows = cur.fetchall() | |||||
if date: | |||||
new_date = rows[0][2] | |||||
write_next_date( | |||||
previous_date=date, new_date=new_date, clearcode_dsn=clearcode_dsn | |||||
) | |||||
for row in rows: | |||||
cd_path = row[0] | |||||
if date and row[2] <= date: | |||||
return | |||||
mapped = map_row(row=row, swh_dsn=swh_dsn) | |||||
if not mapped: | |||||
write_in_not_mapped(cd_path=cd_path, clearcode_dsn=clearcode_dsn) | |||||
def main(clearcode_dsn, swh_dsn): | |||||
map_previously_unmapped_data(clearcode_dsn=clearcode_dsn, swh_dsn=swh_dsn) | |||||
date = get_last_run_date(clearcode_dsn=clearcode_dsn) | |||||
read_from_clearcode_and_write_in_swh( | |||||
clearcode_dsn=clearcode_dsn, swh_dsn=swh_dsn, date=date | |||||
) |