Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7437732
origin_contributors.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
origin_contributors.py
View Options
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""
Luigi tasks for contribution graph
==================================
This module contains `Luigi <https://luigi.readthedocs.io/>`_ tasks
driving the creation of the graph of contributions of people (pseudonymized
by default).
"""
# WARNING: do not import unnecessary things here to keep cli startup time under
# control
from
pathlib
import
Path
from
typing
import
Dict
,
List
,
Tuple
import
luigi
from
.compressed_graph
import
LocalGraph
from
.misc_datasets
import
TopoSort
from
.utils
import
run_script
class ListOriginContributors(luigi.Task):
    """Creates a .csv.zst file listing the contributors of each origin in a compressed
    graph.

    It runs the Java ``org.softwareheritage.graph.utils.ListOriginContributors``
    class on the compressed graph, feeding it the graph's topological order (produced
    by :class:`swh.graph.luigi.misc_datasets.TopoSort`) on stdin.
    The resulting file is expected to have an ``origin_SWHID,person_id`` header
    (this is what :class:`DeanonymizeOriginContributors` checks when reading it)."""

    local_graph_path = luigi.PathParameter()
    topological_order_path = luigi.PathParameter()
    origin_contributors_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")

    def requires(self) -> List[luigi.Task]:
        """Returns an instance of :class:`swh.graph.luigi.compressed_graph.LocalGraph`
        and :class:`swh.graph.luigi.misc_datasets.TopoSort`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            TopoSort(
                local_graph_path=self.local_graph_path,
                topological_order_path=self.topological_order_path,
                graph_name=self.graph_name,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the list of origin contributors."""
        return luigi.LocalTarget(self.origin_contributors_path)

    def run(self) -> None:
        """Runs org.softwareheritage.graph.utils.ListOriginContributors on the
        topological order, and compresses its output."""
        class_name = "org.softwareheritage.graph.utils.ListOriginContributors"
        # Both paths are single-quoted so that paths containing spaces (or other
        # shell-special characters except "'") do not break the pipeline.
        # The trailing backslashes are Python line continuations: the whole pipeline
        # ends up on a single shell line.
        script = f"""
        zstdcat '{self.topological_order_path}' \
            | java {class_name} '{self.local_graph_path}/{self.graph_name}' \
            | pv --line-mode --wait \
            | zstdmt -19
        """
        run_script(script, self.origin_contributors_path)
class ExportDeanonymizationTable(luigi.Task):
    """Exports (from swh-storage) a .csv.zst file that contains the columns:
    ``base64(sha256(full_name))``, ``base64(full_name)``, and ``escape(full_name)``.

    The first column is the anonymized full name found in :file:`graph.persons.csv.zst`
    in the compressed graph, and the latter two are the original name.

    The CSV header written by the query is ``sha256_base64,base64,escaped``."""

    storage_dsn = luigi.Parameter(
        default="service=swh",
        description="postgresql DSN of the swh-storage database to read from.",
    )
    deanonymization_table_path = luigi.PathParameter()

    def output(self) -> luigi.Target:
        """.csv.zst file that contains the table."""
        return luigi.LocalTarget(self.deanonymization_table_path)

    def run(self) -> None:
        """Runs a postgresql query over the ``person`` table to compute the
        deanonymization table, and compresses its output."""
        # NOTE: ``digest()`` comes from PostgreSQL's pgcrypto extension, which must
        # be available in the target database.
        run_script(
            f"""
            psql '{self.storage_dsn}' -c "COPY (select encode(digest(fullname, 'sha256'), 'base64') as sha256_base64, encode(fullname, 'base64') as base64, encode(fullname, 'escape') as escaped from person) TO STDOUT CSV HEADER" | zstdmt -19
            """,  # noqa
            self.deanonymization_table_path,
        )
class DeanonymizeOriginContributors(luigi.Task):
    """Generates a .csv.zst file similar to :class:`ListOriginContributors`'s,
    but with ``person_base64`` and ``person_escaped`` columns instead of
    ``person_id``.

    This assumes that :file:`graph.persons.csv.zst` is anonymized (SHA256 of names
    instead of names); which may not be true depending on how the swh-dataset export
    was configured.
    """

    local_graph_path = luigi.PathParameter()
    graph_name = luigi.Parameter(default="graph")
    origin_contributors_path = luigi.PathParameter()
    deanonymization_table_path = luigi.PathParameter()
    deanonymized_origin_contributors_path = luigi.PathParameter()

    def requires(self) -> List[luigi.Task]:
        """Returns instances of :class:`LocalGraph`, :class:`ListOriginContributors`,
        and :class:`ExportDeanonymizationTable`."""
        return [
            LocalGraph(local_graph_path=self.local_graph_path),
            ListOriginContributors(
                local_graph_path=self.local_graph_path,
                origin_contributors_path=self.origin_contributors_path,
                # Must match the graph whose persons file is read in run(), or
                # person ids would be looked up in the wrong table.
                graph_name=self.graph_name,
            ),
            ExportDeanonymizationTable(
                deanonymization_table_path=self.deanonymization_table_path,
            ),
        ]

    def output(self) -> luigi.Target:
        """.csv.zst file similar to :meth:`ListOriginContributors.output`'s,
        but with ``person_base64`` and ``person_escaped`` columns instead of
        ``person_id``"""
        return luigi.LocalTarget(self.deanonymized_origin_contributors_path)

    def run(self) -> None:
        """Loads the list of persons (``graph.persons.csv.zst`` in the graph dataset)
        and the deanonymization table in memory, then uses them to map each row
        in the original (anonymized) contributors list to the deanonymized one.

        The output is written to a ``.tmp`` file first, then atomically renamed to
        the final path once complete."""
        # TODO: .persons.csv.zst may be already deanonymized (if the swh-dataset export
        # was configured to do so); this should add support for it.
        import base64
        import csv

        import pyzstd

        # Load the deanonymization table, to map sha256(name) to base64(name)
        # and escape(name).
        # newline="" is required by the csv module: quoted fields may contain
        # literal newlines/carriage returns (PostgreSQL's 'escape' encoding leaves
        # them as-is), which universal-newline translation would otherwise alter.
        sha256_to_names: Dict[bytes, Tuple[bytes, str]] = {}
        with pyzstd.open(self.deanonymization_table_path, "rt", newline="") as fd:
            csv_reader = csv.reader(fd)
            header = next(csv_reader)
            assert header == ["sha256_base64", "base64", "escaped"], header
            for (base64_sha256_name, base64_name, escaped_name) in csv_reader:
                # b64decode (non-validating by default) discards the line breaks
                # that PostgreSQL inserts into long base64 values.
                sha256_name = base64.b64decode(base64_sha256_name)
                name = base64.b64decode(base64_name)
                sha256_to_names[sha256_name] = (name, escaped_name)

        # Combine with the list of sha256(name), to get the list of base64(name)
        # and escape(name), indexed by person id (the line number in the file).
        persons_path = self.local_graph_path / f"{self.graph_name}.persons.csv.zst"
        with pyzstd.open(persons_path, "rb") as fd:
            # .pop() removes each entry once consumed (presumably to bound memory);
            # persons missing from the table map to empty names.
            person_id_to_names: List[Tuple[bytes, str]] = [
                sha256_to_names.pop(base64.b64decode(line.strip()), (b"", ""))
                for line in fd
            ]
        # Entries left over (persons absent from this graph) are no longer needed;
        # release them before the write phase.
        del sha256_to_names

        tmp_output_path = Path(f"{self.deanonymized_origin_contributors_path}.tmp")

        # Finally, write a new table of origin_contributors, by reading the anonymized
        # table line-by-line and deanonymizing each id
        # Open temporary output for writes as CSV
        with pyzstd.open(tmp_output_path, "wt", newline="") as output_fd:
            csv_writer = csv.writer(output_fd, lineterminator="\n")
            # write header
            csv_writer.writerow(("origin_SWHID", "person_base64", "person_escaped"))
            # Open input for reads as CSV
            with pyzstd.open(
                self.origin_contributors_path, "rt", newline=""
            ) as input_fd:
                csv_reader = csv.reader(input_fd)
                header = next(csv_reader)
                assert header == ["origin_SWHID", "person_id"], header
                for (origin_swhid, person_id) in csv_reader:
                    if person_id == "null":
                        # FIXME: workaround for a bug in contribution graphs generated
                        # before 2022-12-01. Those were only used in tests and never
                        # published, so the conditional can be removed when this is
                        # productionized
                        continue
                    (name, escaped_name) = person_id_to_names[int(person_id)]
                    base64_name = base64.b64encode(name).decode("ascii")
                    csv_writer.writerow((origin_swhid, base64_name, escaped_name))

        tmp_output_path.replace(self.deanonymized_origin_contributors_path)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Tue, Apr 15, 3:23 AM (9 h, 7 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3286504
Attached To
rDGRPH Compressed graph representation
Event Timeline
Log In to Comment