diff --git a/swh/provenance/postgresql/provenancedb_base.py b/swh/provenance/postgresql/provenancedb_base.py --- a/swh/provenance/postgresql/provenancedb_base.py +++ b/swh/provenance/postgresql/provenancedb_base.py @@ -102,57 +102,33 @@ self.read_cache[table][sha1] = date return dates - def insert_all(self): + def insert_entity(self, entity): # Perform insertions with cached information - if self.write_cache["content"]: + if self.write_cache[entity]: psycopg2.extras.execute_values( self.cursor, - """ - LOCK TABLE ONLY content; - INSERT INTO content(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,content.date) + f""" + LOCK TABLE ONLY {entity}; + INSERT INTO {entity}(sha1, date) VALUES %s + ON CONFLICT (sha1) DO + UPDATE SET date=LEAST(EXCLUDED.date,{entity}.date) """, - self.write_cache["content"].items(), + self.write_cache[entity].items(), ) - self.write_cache["content"].clear() + self.write_cache[entity].clear() - if self.write_cache["directory"]: - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY directory; - INSERT INTO directory(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,directory.date) - """, - self.write_cache["directory"].items(), - ) - self.write_cache["directory"].clear() - - if self.write_cache["revision"]: - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY revision; - INSERT INTO revision(sha1, date) VALUES %s - ON CONFLICT (sha1) DO - UPDATE SET date=LEAST(EXCLUDED.date,revision.date) - """, - self.write_cache["revision"].items(), - ) - self.write_cache["revision"].clear() - - # Relations should come after ids for elements were resolved - if self.write_cache["content_early_in_rev"]: - self.insert_location("content", "revision", "content_early_in_rev") - - if self.write_cache["content_in_dir"]: - self.insert_location("content", "directory", "content_in_dir") + def insert_all(self): + # First insert entities + 
self.insert_entity("content") + self.insert_entity("directory") + self.insert_entity("revision") - if self.write_cache["directory_in_rev"]: - self.insert_location("directory", "revision", "directory_in_rev") + # Relations should come after ids for entities have been resolved + self.insert_relation("content", "revision", "content_early_in_rev") + self.insert_relation("content", "directory", "content_in_dir") + self.insert_relation("directory", "revision", "directory_in_rev") + # TODO: this should be updated when origin-revision layer gets properly updated. # if self.write_cache["revision_before_rev"]: # psycopg2.extras.execute_values( # self.cursor, @@ -164,7 +140,7 @@ # self.write_cache["revision_before_rev"], # ) # self.write_cache["revision_before_rev"].clear() - + # # if self.write_cache["revision_in_org"]: # psycopg2.extras.execute_values( # self.cursor, diff --git a/swh/provenance/postgresql/provenancedb_with_path.py b/swh/provenance/postgresql/provenancedb_with_path.py --- a/swh/provenance/postgresql/provenancedb_with_path.py +++ b/swh/provenance/postgresql/provenancedb_with_path.py @@ -94,64 +94,65 @@ (directory.id, revision.id, normalize(path)) ) - def insert_location(self, src0_table, src1_table, dst_table): - """Insert location entries in `dst_table` from the write_cache + def insert_relation(self, src, dst, relation): + """Insert entries in `relation` from the write_cache Also insert missing location entries in the 'location' table. 
""" - # TODO: find a better way of doing this; might be doable in a coupls of - # SQL queries (one to insert missing entries in the location' table, - # one to insert entries in the dst_table) - - # Resolve src0 ids - src0_sha1s = tuple(set(sha1 for (sha1, _, _) in self.write_cache[dst_table])) - fmt = ",".join(["%s"] * len(src0_sha1s)) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN ({fmt})""", - src0_sha1s, - ) - src0_values = dict(self.cursor.fetchall()) - - # Resolve src1 ids - src1_sha1s = tuple(set(sha1 for (_, sha1, _) in self.write_cache[dst_table])) - fmt = ",".join(["%s"] * len(src1_sha1s)) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({fmt})""", - src1_sha1s, - ) - src1_values = dict(self.cursor.fetchall()) - - # insert missing locations - locations = tuple(set((loc,) for (_, _, loc) in self.write_cache[dst_table])) - psycopg2.extras.execute_values( - self.cursor, - """ - LOCK TABLE ONLY location; - INSERT INTO location(path) VALUES %s - ON CONFLICT (path) DO NOTHING - """, - locations, - ) - # fetch location ids - fmt = ",".join(["%s"] * len(locations)) - self.cursor.execute( - f"SELECT path, id FROM location WHERE path IN ({fmt})", - locations, - ) - loc_ids = dict(self.cursor.fetchall()) - - # Insert values in dst_table - rows = [ - (src0_values[sha1_src], src1_values[sha1_dst], loc_ids[loc]) - for (sha1_src, sha1_dst, loc) in self.write_cache[dst_table] - ] - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {dst_table}; - INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - self.write_cache[dst_table].clear() + if self.write_cache[relation]: + # TODO: find a better way of doing this; might be doable in a couple of + # SQL queries (one to insert missing entries in the 'location' table, + # one to insert entries in the relation) + + # Resolve src ids + src_sha1s = tuple(set(sha1 for (sha1, _, _) in self.write_cache[relation])) + fmt = 
",".join(["%s"] * len(src_sha1s)) + self.cursor.execute( + f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({fmt})""", + src_sha1s, + ) + src_values = dict(self.cursor.fetchall()) + + # Resolve dst ids + dst_sha1s = tuple(set(sha1 for (_, sha1, _) in self.write_cache[relation])) + fmt = ",".join(["%s"] * len(dst_sha1s)) + self.cursor.execute( + f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({fmt})""", + dst_sha1s, + ) + dst_values = dict(self.cursor.fetchall()) + + # insert missing locations + locations = tuple(set((loc,) for (_, _, loc) in self.write_cache[relation])) + psycopg2.extras.execute_values( + self.cursor, + """ + LOCK TABLE ONLY location; + INSERT INTO location(path) VALUES %s + ON CONFLICT (path) DO NOTHING + """, + locations, + ) + # fetch location ids + fmt = ",".join(["%s"] * len(locations)) + self.cursor.execute( + f"SELECT path, id FROM location WHERE path IN ({fmt})", + locations, + ) + loc_ids = dict(self.cursor.fetchall()) + + # Insert values in relation + rows = [ + (src_values[sha1_src], dst_values[sha1_dst], loc_ids[loc]) + for (sha1_src, sha1_dst, loc) in self.write_cache[relation] + ] + psycopg2.extras.execute_values( + self.cursor, + f""" + LOCK TABLE ONLY {relation}; + INSERT INTO {relation} VALUES %s + ON CONFLICT DO NOTHING + """, + rows, + ) + self.write_cache[relation].clear() diff --git a/swh/provenance/postgresql/provenancedb_without_path.py b/swh/provenance/postgresql/provenancedb_without_path.py --- a/swh/provenance/postgresql/provenancedb_without_path.py +++ b/swh/provenance/postgresql/provenancedb_without_path.py @@ -100,41 +100,42 @@ ): self.write_cache["directory_in_rev"].add((directory.id, revision.id)) - def insert_location(self, src0_table, src1_table, dst_table): - # Resolve src0 ids - src0_values = dict().fromkeys( - map(operator.itemgetter(0), self.write_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src0_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src0_table} WHERE sha1 IN 
({values})""", - tuple(src0_values), - ) - src0_values = dict(self.cursor.fetchall()) + def insert_relation(self, src, dst, relation): + if self.write_cache[relation]: + # Resolve src ids + src_values = dict().fromkeys( + map(operator.itemgetter(0), self.write_cache[relation]) + ) + values = ", ".join(itertools.repeat("%s", len(src_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {src} WHERE sha1 IN ({values})""", + tuple(src_values), + ) + src_values = dict(self.cursor.fetchall()) - # Resolve src1 ids - src1_values = dict().fromkeys( - map(operator.itemgetter(1), self.write_cache[dst_table]) - ) - values = ", ".join(itertools.repeat("%s", len(src1_values))) - self.cursor.execute( - f"""SELECT sha1, id FROM {src1_table} WHERE sha1 IN ({values})""", - tuple(src1_values), - ) - src1_values = dict(self.cursor.fetchall()) + # Resolve dst ids + dst_values = dict().fromkeys( + map(operator.itemgetter(1), self.write_cache[relation]) + ) + values = ", ".join(itertools.repeat("%s", len(dst_values))) + self.cursor.execute( + f"""SELECT sha1, id FROM {dst} WHERE sha1 IN ({values})""", + tuple(dst_values), + ) + dst_values = dict(self.cursor.fetchall()) - # Insert values in dst_table - rows = map( - lambda row: (src0_values[row[0]], src1_values[row[1]]), - self.write_cache[dst_table], - ) - psycopg2.extras.execute_values( - self.cursor, - f""" - LOCK TABLE ONLY {dst_table}; - INSERT INTO {dst_table} VALUES %s - ON CONFLICT DO NOTHING - """, - rows, - ) - self.write_cache[dst_table].clear() + # Insert values in relation + rows = map( + lambda row: (src_values[row[0]], dst_values[row[1]]), + self.write_cache[relation], + ) + psycopg2.extras.execute_values( + self.cursor, + f""" + LOCK TABLE ONLY {relation}; + INSERT INTO {relation} VALUES %s + ON CONFLICT DO NOTHING + """, + rows, + ) + self.write_cache[relation].clear()