# Copyright (C) 2018-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import itertools
import json
import math
import operator
import re
from collections import Counter, defaultdict
from typing import Any, Dict, List, Optional

from swh.core.collections import SortedList
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.model import SHA1_SIZE
from swh.storage.utils import get_partition_bounds_bytes

from . import MAPPING_NAMES, check_id_duplicates
from .exc import IndexerStorageArgumentException
from .interface import PagedResult, Sha1

SHA1_DIGEST_SIZE = 160


def _transform_tool(tool):
    return {
        "id": tool["id"],
        "name": tool["tool_name"],
        "version": tool["tool_version"],
        "configuration": tool["tool_configuration"],
    }


def check_id_types(data: List[Dict[str, Any]]):
    """Checks that all elements of the list have an 'id' whose type is 'bytes'."""
    if not all(isinstance(item.get("id"), bytes) for item in data):
        raise IndexerStorageArgumentException("identifiers must be bytes.")
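
# Editor's sketch (not part of the upstream module): check_id_types accepts
# byte identifiers and rejects anything else, e.g.:
#
#     check_id_types([{"id": b"\x00" * 20}])  # passes silently
#     check_id_types([{"id": "not-bytes"}])   # raises
#     #   IndexerStorageArgumentException("identifiers must be bytes.")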


class SubStorage:
    """Implements common missing/get/add logic for each indexer type."""

    def __init__(self, tools):
        self._tools = tools
        self._sorted_ids = SortedList[bytes, bytes]()
        self._data = {}  # map (id_, tool_id) -> metadata_dict
        self._tools_per_id = defaultdict(set)  # map id_ -> Set[tool_id]
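
    # Editor's note (not upstream code): `tools` is shared by reference with
    # the owning IndexerStorage, so tools registered later through
    # indexer_configuration_add() are immediately visible to every SubStorage.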

    def missing(self, ids):
        """List data missing from storage.

        Args:
            ids (iterable): dictionaries with keys:

                - **id** (bytes): sha1 identifier
                - **indexer_configuration_id** (int): tool used to compute
                  the results

        Yields:
            missing sha1s

        """
        for id_ in ids:
            tool_id = id_["indexer_configuration_id"]
            id_ = id_["id"]
            if tool_id not in self._tools_per_id.get(id_, set()):
                yield id_
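
    # Editor's sketch (not upstream code), assuming `sub` is a SubStorage
    # and using dummy values:
    #
    #     query = [{"id": b"\x00" * 20, "indexer_configuration_id": 1}]
    #     list(sub.missing(query))  # -> [b"\x00" * 20] until add() stores it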

    def get(self, ids):
        """Retrieve data per id.

        Args:
            ids (iterable): sha1 checksums

        Yields:
            dict: dictionaries with the following keys:

                - **id** (bytes)
                - **tool** (dict): tool used to compute metadata
                - arbitrary data (as provided to `add`)

        """
        for id_ in ids:
            for tool_id in self._tools_per_id.get(id_, set()):
                key = (id_, tool_id)
                yield {
                    "id": id_,
                    "tool": _transform_tool(self._tools[tool_id]),
                    **self._data[key],
                }

    def get_all(self):
        yield from self.get(self._sorted_ids)

    def get_partition(
        self,
        indexer_configuration_id: int,
        partition_id: int,
        nb_partitions: int,
        page_token: Optional[str] = None,
        limit: int = 1000,
    ) -> PagedResult[Sha1]:
        """Retrieve ids of content within the partition partition_id,
        bound by limit.

        Args:
            **indexer_configuration_id**: The tool used to index data
            **partition_id**: index of the partition to fetch
            **nb_partitions**: total number of partitions to split into
            **page_token**: opaque token used for pagination
            **limit**: Limit result (default to 1000)

        Raises:
            IndexerStorageArgumentException if limit is None

        Returns:
            PagedResult of Sha1. If next_page_token is None, there is no more
            data to fetch.

        """
        if limit is None:
            raise IndexerStorageArgumentException("limit should not be None")
        (start, end) = get_partition_bounds_bytes(
            partition_id, nb_partitions, SHA1_SIZE
        )
        if page_token:
            start = hash_to_bytes(page_token)
        if end is None:
            end = b"\xff" * SHA1_SIZE

        next_page_token: Optional[str] = None
        ids: List[Sha1] = []
        sha1s = (sha1 for sha1 in self._sorted_ids.iter_from(start))
        for counter, sha1 in enumerate(sha1s):
            if sha1 > end:
                break
            if counter >= limit:
                next_page_token = hash_to_hex(sha1)
                break
            ids.append(sha1)

        assert len(ids) <= limit
        return PagedResult(results=ids, next_page_token=next_page_token)
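
    # Editor's sketch (not upstream code): typical pagination over a single
    # partition, assuming `sub` is a SubStorage, `tool_id` an existing tool
    # id, and `process` a hypothetical callback:
    #
    #     page_token = None
    #     while True:
    #         page = sub.get_partition(tool_id, 0, 1, page_token=page_token)
    #         process(page.results)
    #         if page.next_page_token is None:
    #             break
    #         page_token = page.next_page_token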

    def add(self, data: List[Dict], conflict_update: bool) -> int:
        """Add data not present in storage.

        Args:
            data (iterable): dictionaries with keys:

                - **id**: sha1
                - **indexer_configuration_id**: tool used to compute the
                  results
                - arbitrary data

            conflict_update (bool): Flag to determine if we want to overwrite
                (true) or skip duplicates (false)

        Returns:
            the number of items actually added

        """
        data = list(data)
        check_id_duplicates(data)
        count = 0
        for item in data:
            item = item.copy()
            tool_id = item.pop("indexer_configuration_id")
            id_ = item.pop("id")
            data_item = item
            if not conflict_update and tool_id in self._tools_per_id.get(
                id_, set()
            ):
                # Duplicate, should not be updated
                continue
            key = (id_, tool_id)
            self._data[key] = data_item
            self._tools_per_id[id_].add(tool_id)
            count += 1
            if id_ not in self._sorted_ids:
                self._sorted_ids.add(id_)
        return count
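
    # Editor's sketch (not upstream code): with conflict_update=False a second
    # add() for the same (id, tool) pair is skipped; dummy values:
    #
    #     row = {"id": b"\x00" * 20, "indexer_configuration_id": 1, "v": 1}
    #     sub.add([row], conflict_update=False)             # -> 1
    #     sub.add([dict(row, v=2)], conflict_update=False)  # -> 0, keeps v=1
    #     sub.add([dict(row, v=2)], conflict_update=True)   # -> 1, stores v=2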

    def add_merge(
        self, new_data: List[Dict], conflict_update: bool, merged_key: str
    ) -> int:
        """Add data while merging the list stored under `merged_key` with the
        existing one (when conflict_update is False) instead of replacing it.
        """
        added = 0
        all_subitems: List
        for new_item in new_data:
            id_ = new_item["id"]
            tool_id = new_item["indexer_configuration_id"]
            if conflict_update:
                all_subitems = []
            else:
                existing = list(self.get([id_]))
                all_subitems = [
                    old_subitem
                    for existing_item in existing
                    if existing_item["tool"]["id"] == tool_id
                    for old_subitem in existing_item[merged_key]
                ]
            for new_subitem in new_item[merged_key]:
                if new_subitem not in all_subitems:
                    all_subitems.append(new_subitem)
            added += self.add(
                [
                    {
                        "id": id_,
                        "indexer_configuration_id": tool_id,
                        merged_key: all_subitems,
                    }
                ],
                conflict_update=True,
            )
            if id_ not in self._sorted_ids:
                self._sorted_ids.add(id_)
        return added
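
    # Editor's sketch (not upstream code): successive add_merge() calls union
    # the sub-lists when conflict_update is False, e.g. with
    # merged_key="licenses":
    #
    #     sub.add_merge([{"id": id_, "indexer_configuration_id": 1,
    #                     "licenses": ["GPL-3.0"]}], False, "licenses")
    #     sub.add_merge([{"id": id_, "indexer_configuration_id": 1,
    #                     "licenses": ["MIT"]}], False, "licenses")
    #     # the stored list is now ["GPL-3.0", "MIT"]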

    def delete(self, entries: List[Dict]) -> int:
        """Delete entries and return the number of entries deleted."""
        deleted = 0
        for entry in entries:
            (id_, tool_id) = (entry["id"], entry["indexer_configuration_id"])
            key = (id_, tool_id)
            if tool_id in self._tools_per_id[id_]:
                self._tools_per_id[id_].remove(tool_id)
            if key in self._data:
                deleted += 1
                del self._data[key]
        return deleted


class IndexerStorage:
    """In-memory SWH indexer storage."""

    def __init__(self):
        self._tools = {}
        self._mimetypes = SubStorage(self._tools)
        self._languages = SubStorage(self._tools)
        self._content_ctags = SubStorage(self._tools)
        self._licenses = SubStorage(self._tools)
        self._content_metadata = SubStorage(self._tools)
        self._revision_intrinsic_metadata = SubStorage(self._tools)
        self._origin_intrinsic_metadata = SubStorage(self._tools)

    def check_config(self, *, check_write):
        return True

    def content_mimetype_missing(self, mimetypes):
        yield from self._mimetypes.missing(mimetypes)

    def content_mimetype_get_partition(
        self,
        indexer_configuration_id: int,
        partition_id: int,
        nb_partitions: int,
        page_token: Optional[str] = None,
        limit: int = 1000,
    ) -> PagedResult[Sha1]:
        return self._mimetypes.get_partition(
            indexer_configuration_id, partition_id, nb_partitions, page_token, limit
        )

    def content_mimetype_add(
        self, mimetypes: List[Dict], conflict_update: bool = False
    ) -> Dict[str, int]:
        check_id_types(mimetypes)
        added = self._mimetypes.add(mimetypes, conflict_update)
        return {"content_mimetype:add": added}

    def content_mimetype_get(self, ids):
        yield from self._mimetypes.get(ids)

    def content_language_missing(self, languages):
        yield from self._languages.missing(languages)

    def content_language_get(self, ids):
        yield from self._languages.get(ids)

    def content_language_add(
        self, languages: List[Dict], conflict_update: bool = False
    ) -> Dict[str, int]:
        check_id_types(languages)
        added = self._languages.add(languages, conflict_update)
        return {"content_language:add": added}

    def content_ctags_missing(self, ctags):
        yield from self._content_ctags.missing(ctags)

    def content_ctags_get(self, ids):
        for item in self._content_ctags.get(ids):
            for item_ctags_item in item["ctags"]:
                yield {"id": item["id"], "tool": item["tool"], **item_ctags_item}
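
    # Editor's note (not upstream code): a stored row holding n ctags entries
    # is flattened into n yielded dicts, each repeating the "id" and "tool"
    # of its parent row.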

    def content_ctags_add(
        self, ctags: List[Dict], conflict_update: bool = False
    ) -> Dict[str, int]:
        check_id_types(ctags)
        added = self._content_ctags.add_merge(ctags, conflict_update, "ctags")
        return {"content_ctags:add": added}

    def content_ctags_search(self, expression, limit=10, last_sha1=None):
        nb_matches = 0
        for ((id_, tool_id), item) in sorted(self._content_ctags._data.items()):
            if id_ <= (last_sha1 or bytes(0 for _ in range(SHA1_DIGEST_SIZE))):
                continue
            for ctags_item in item["ctags"]:
                if ctags_item["name"] != expression:
                    continue
                nb_matches += 1
                yield {
                    "id": id_,
                    "tool": _transform_tool(self._tools[tool_id]),
                    **ctags_item,
                }
                if nb_matches >= limit:
                    return
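
    # Editor's sketch (not upstream code): searching for a symbol name yields
    # one flat dict per matching ctags entry, e.g.:
    #
    #     list(storage.content_ctags_search("main", limit=10))
    #     # -> [{"id": ..., "tool": {...}, "name": "main", ...}, ...]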

    def content_fossology_license_get(self, ids):
        # Rewrites the output of SubStorage.get from the old format to
        # the new one. SubStorage.get should be updated once all other
        # *_get methods use the new format.
        # See: https://forge.softwareheritage.org/T1433
        res = {}
        for d in self._licenses.get(ids):
            res.setdefault(d.pop("id"), []).append(d)
        for (id_, facts) in res.items():
            yield {id_: facts}

    def content_fossology_license_add(
        self, licenses: List[Dict], conflict_update: bool = False
    ) -> Dict[str, int]:
        check_id_types(licenses)
        added = self._licenses.add_merge(licenses, conflict_update, "licenses")
        return {"fossology_license_add:add": added}

    def content_fossology_license_get_partition(
        self,
        indexer_configuration_id: int,
        partition_id: int,
        nb_partitions: int,
        page_token: Optional[str] = None,
        limit: int = 1000,
    ) -> PagedResult[Sha1]:
        return self._licenses.get_partition(
            indexer_configuration_id, partition_id, nb_partitions, page_token, limit
        )

    def content_metadata_missing(self, metadata):
        yield from self._content_metadata.missing(metadata)

    def content_metadata_get(self, ids):
        yield from self._content_metadata.get(ids)

    def content_metadata_add(
        self, metadata: List[Dict], conflict_update: bool = False
    ) -> Dict[str, int]:
        check_id_types(metadata)
        added = self._content_metadata.add(metadata, conflict_update)
        return {"content_metadata:add": added}

    def revision_intrinsic_metadata_missing(self, metadata):
        yield from self._revision_intrinsic_metadata.missing(metadata)

    def revision_intrinsic_metadata_get(self, ids):
        yield from self._revision_intrinsic_metadata.get(ids)

    def revision_intrinsic_metadata_add(
        self, metadata: List[Dict], conflict_update: bool = False
    ) -> Dict[str, int]:
        check_id_types(metadata)
        added = self._revision_intrinsic_metadata.add(metadata, conflict_update)
        return {"revision_intrinsic_metadata:add": added}

    def revision_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
        deleted = self._revision_intrinsic_metadata.delete(entries)
        return {"revision_intrinsic_metadata:del": deleted}

    def origin_intrinsic_metadata_get(self, ids):
        yield from self._origin_intrinsic_metadata.get(ids)

    def origin_intrinsic_metadata_add(
        self, metadata: List[Dict], conflict_update: bool = False
    ) -> Dict[str, int]:
        added = self._origin_intrinsic_metadata.add(metadata, conflict_update)
        return {"origin_intrinsic_metadata:add": added}

    def origin_intrinsic_metadata_delete(self, entries: List[Dict]) -> Dict:
        deleted = self._origin_intrinsic_metadata.delete(entries)
        return {"origin_intrinsic_metadata:del": deleted}

    def origin_intrinsic_metadata_search_fulltext(self, conjunction, limit=100):
        # A very crude fulltext search implementation, but that's enough
        # to work on English metadata
        tokens_re = re.compile("[a-zA-Z0-9]+")
        search_tokens = list(itertools.chain(*map(tokens_re.findall, conjunction)))

        def rank(data):
            # Tokenize the metadata
            text = json.dumps(data["metadata"])
            text_tokens = tokens_re.findall(text)
            text_token_occurences = Counter(text_tokens)

            # Count the number of occurrences of search tokens in the text
            score = 0
            for search_token in search_tokens:
                if text_token_occurences[search_token] == 0:
                    # Search token is not in the text.
                    return 0
                score += text_token_occurences[search_token]

            # Normalize according to the text's length
            return score / math.log(len(text_tokens))

        results = [
            (rank(data), data)
            for data in self._origin_intrinsic_metadata.get_all()
        ]
        results = [(rank_, data) for (rank_, data) in results if rank_ > 0]
        results.sort(
            key=operator.itemgetter(0),  # Don't try to order 'data'
            reverse=True,
        )
        for (rank_, result) in results[:limit]:
            yield result
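
    # Editor's worked example (not upstream code): for a conjunction of one
    # token that occurs 3 times in a metadata document of 100 tokens, rank()
    # returns 3 / math.log(100), i.e. about 0.65; if any search token is
    # absent, the document scores 0 and is filtered out before sorting.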

    def origin_intrinsic_metadata_search_by_producer(
        self, page_token="", limit=100, ids_only=False, mappings=None, tool_ids=None
    ):
        assert isinstance(page_token, str)
        nb_results = 0
        if mappings is not None:
            mappings = frozenset(mappings)
        if tool_ids is not None:
            tool_ids = frozenset(tool_ids)
        origins = []

        # we go to limit+1 to check whether we should add next_page_token in
        # the response
        for entry in self._origin_intrinsic_metadata.get_all():
            if entry["id"] <= page_token:
                continue
            if nb_results >= (limit + 1):
                break
            if mappings is not None and mappings.isdisjoint(entry["mappings"]):
                continue
            if tool_ids is not None and entry["tool"]["id"] not in tool_ids:
                continue
            origins.append(entry)
            nb_results += 1

        result = {}
        if len(origins) > limit:
            origins = origins[:limit]
            result["next_page_token"] = origins[-1]["id"]
        if ids_only:
            origins = [origin["id"] for origin in origins]
        result["origins"] = origins
        return result

    def origin_intrinsic_metadata_stats(self):
        mapping_count = {m: 0 for m in MAPPING_NAMES}
        total = non_empty = 0
        for data in self._origin_intrinsic_metadata.get_all():
            total += 1
            if set(data["metadata"]) - {"@context"}:
                non_empty += 1
            for mapping in data["mappings"]:
                mapping_count[mapping] += 1
        return {"per_mapping": mapping_count, "total": total, "non_empty": non_empty}

    def indexer_configuration_add(self, tools):
        inserted = []
        for tool in tools:
            tool = tool.copy()
            id_ = self._tool_key(tool)
            tool["id"] = id_
            self._tools[id_] = tool
            inserted.append(tool)
        return inserted

    def indexer_configuration_get(self, tool):
        return self._tools.get(self._tool_key(tool))

    def _tool_key(self, tool):
        return hash(
            (
                tool["tool_name"],
                tool["tool_version"],
                json.dumps(tool["tool_configuration"], sort_keys=True),
            )
        )
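

if __name__ == "__main__":
    # Editor's smoke-test sketch, not part of the upstream module; it only
    # assumes the swh imports at the top of this file resolve.
    storage = IndexerStorage()

    # Register a tool; the returned copy carries the generated "id".
    tool = storage.indexer_configuration_add(
        [{"tool_name": "file", "tool_version": "5.38", "tool_configuration": {}}]
    )[0]

    # Store one mimetype row for a dummy 20-byte sha1 and read it back.
    sha1 = bytes(20)
    summary = storage.content_mimetype_add(
        [
            {
                "id": sha1,
                "mimetype": "text/plain",
                "encoding": "utf-8",
                "indexer_configuration_id": tool["id"],
            }
        ]
    )
    assert summary == {"content_mimetype:add": 1}
    print(list(storage.content_mimetype_get([sha1])))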