test_journal_client.py
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import Counter
from typing import Dict, Optional
from urllib.parse import urlparse

from hypothesis import given
from hypothesis.strategies import lists
import msgpack

from swh.counters.journal_client import (
    process_journal_messages,
    process_origins,
    process_releases,
    process_revisions,
)
from swh.counters.redis import Redis
from swh.model.hashutil import hash_to_bytes
from swh.model.hypothesis_strategies import origins
from swh.model.model import (
    ObjectType,
    Origin,
    Person,
    Release,
    Revision,
    RevisionType,
    Timestamp,
    TimestampWithTimezone,
)
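

# Fixed epoch timestamp, reused as the committer date of the test revisions below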
DATE = TimestampWithTimezone(
    timestamp=Timestamp(seconds=0, microseconds=0), offset=0, negative_utc=False
)
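

# Helpers building the dict form of model objects, used below to craft msgpack
# journal messages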
def _create_release(author_fullname: Optional[str]) -> Dict:
    """Use Release.to_dict to be sure the field's name used to retrieve
    the author is correct"""
    author = None
    if author_fullname:
        author = Person(
            fullname=bytes(author_fullname, "utf-8"), name=None, email=None
        )

    release = Release(
        name=b"Release",
        message=b"Message",
        target=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
        target_type=ObjectType.CONTENT,
        synthetic=True,
        author=author,
    )

    return release.to_dict()


def _create_revision(author_fullname: str, committer_fullname: str) -> Dict:
    """Use Revision.to_dict to be sure the names of the fields used to retrieve
    the author and the committer are correct"""
    revision = Revision(
        committer_date=DATE,
        date=None,
        type=RevisionType.GIT,
        parents=(),
        directory=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"),
        synthetic=True,
        message=None,
        author=Person(fullname=bytes(author_fullname, "utf-8"), name=None, email=None),
        committer=Person(
            fullname=bytes(committer_fullname, "utf-8"), name=None, email=None
        ),
    )

    return revision.to_dict()
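

# Journal-style fixtures: msgpack-encoded messages keyed by object id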
RELEASES = {
    rel["id"]: msgpack.dumps(rel)
    for rel in [
        _create_release(author_fullname="author 1"),
        _create_release(author_fullname="author 2"),
        _create_release(author_fullname=None),
    ]
}

RELEASES_AUTHOR_FULLNAMES = {b"author 1", b"author 2"}
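
# Four revisions crossing two author fullnames with two committer fullnames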
REVISIONS = {
    rev["id"]: msgpack.dumps(rev)
    for rev in [
        _create_revision(author_fullname="author 1", committer_fullname="committer 1"),
        _create_revision(author_fullname="author 2", committer_fullname="committer 2"),
        _create_revision(author_fullname="author 2", committer_fullname="committer 1"),
        _create_revision(author_fullname="author 1", committer_fullname="committer 2"),
    ]
}

REVISIONS_AUTHOR_FULLNAMES = {b"author 1", b"author 2"}
REVISIONS_COMMITTER_FULLNAMES = {b"committer 1", b"committer 2"}
REVISIONS_PERSON_FULLNAMES = REVISIONS_AUTHOR_FULLNAMES | REVISIONS_COMMITTER_FULLNAMES
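

# Each journal message should add one to the counter of its collection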
def test_journal_client_all_keys(local_redis_host):
    redis = Redis(host=local_redis_host)

    keys = {
        "coll1": {b"key1": b"value1", b"key2": b"value2"},
        "coll2": {b"key3": b"value3", b"key4": b"value4", b"key5": b"value5"},
    }

    process_journal_messages(messages=keys, counters=redis)

    assert redis.get_counts(redis.get_counters()) == {b"coll1": 2, b"coll2": 3}
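

# The "person" counter should deduplicate authors and committers by fullname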
def test_journal_client_process_revisions(local_redis_host):
    redis = Redis(host=local_redis_host)

    process_revisions(REVISIONS, redis)

    assert redis.get_counts(redis.get_counters()) == {
        b"person": len(REVISIONS_PERSON_FULLNAMES)
    }
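

# Only the two distinct author fullnames count; the authorless release adds nothing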
def test_journal_client_process_releases(local_redis_host):
    redis = Redis(host=local_redis_host)

    process_releases(RELEASES, redis)

    assert redis.get_counts(redis.get_counters()) == {
        b"person": len(RELEASES_AUTHOR_FULLNAMES)
    }
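

# Releases without an author should leave the counters empty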
def test_journal_client_process_releases_without_authors(local_redis_host):
    releases = {
        rel["id"]: msgpack.dumps(rel)
        for rel in [
            _create_release(author_fullname=None),
            _create_release(author_fullname=None),
        ]
    }

    redis = Redis(host=local_redis_host)

    process_releases(releases, redis)

    assert redis.get_counts(redis.get_counters()) == {}
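

# Origins should be counted per network location (netloc) of their URL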
def test_journal_client_process_origins(local_redis_host, redisdb):
    # hypothesis does not play well with pytest function scoped fixtures
    # so we use an inner test function as workaround
    @given(lists(origins()))
    def inner(origins):
        origins_ = {
            msgpack.dumps(origin.to_dict()): msgpack.dumps(origin.to_dict())
            for origin in origins
        }

        # explicitly flush redis db content due to hypothesis use
        redisdb.flushall()

        redis = Redis(host=local_redis_host)

        process_origins(origins_, redis)

        expected_counts = Counter(
            [
                f"origin_netloc:{urlparse(origin.url).netloc}".encode()
                for origin in set(origins)
            ]
        )

        assert redis.get_counts(redis.get_counters()) == expected_counts

    inner()
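

# *.googlecode.com origins should be collapsed under the single googlecode.com netloc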
def test_journal_client_process_googlecode_origins(local_redis_host):
    origins = [
        Origin(url="https://foo.googlecode.com"),
        Origin(url="https://bar.googlecode.com"),
    ]
    origins_ = {
        msgpack.dumps(origin.to_dict()): msgpack.dumps(origin.to_dict())
        for origin in origins
    }

    redis = Redis(host=local_redis_host)

    process_origins(origins_, redis)

    assert redis.get_counts(redis.get_counters()) == {
        b"origin_netloc:googlecode.com": 2
    }