# Copyright (C) 2021-2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from dataclasses import asdict, dataclass
from datetime import datetime, timezone
import logging
import re
from typing import Any, Dict, Iterator, Optional
from urllib.parse import urljoin

from bs4 import BeautifulSoup
import lxml.etree
import requests
from tenacity.before_sleep import before_sleep_log

from swh.core.github.utils import GitHubSession
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin

from .. import USER_AGENT
from ..pattern import CredentialsType, Lister

logger = logging.getLogger(__name__)

RepoPage = Dict[str, Any]

SUPPORTED_SCM_TYPES = ("git", "svn", "hg", "cvs", "bzr")


@dataclass
class MavenListerState:
    """State of the MavenLister"""

    last_seen_doc: int = -1
    """Last doc ID ingested during an incremental pass"""

    last_seen_pom: int = -1
    """Last doc ID related to a pom and ingested during an incremental pass"""


class MavenLister(Lister[MavenListerState, RepoPage]):
    """List origins from a Maven repository.

    Maven Central provides artifacts for Java builds.
    It includes POM files and source archives, which we download to get
    the source code of artifacts and links to their scm repository.

    This lister yields origins of types: git/svn/hg or whatever the artifacts
    use as repository type, plus maven types for the maven loader (tgz, jar).
    """

    LISTER_NAME = "maven"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        url: str,
        index_url: Optional[str] = None,
        instance: Optional[str] = None,
        credentials: CredentialsType = None,
        incremental: bool = True,
    ):
        """Lister class for Maven repositories.

        Args:
            url: main URL of the Maven repository, i.e. url of the base index
                used to fetch maven artifacts. For Maven Central use
                https://repo1.maven.org/maven2/
            index_url: the URL to download the exported text indexes from.
                Would typically be a local host running the export docker image.
                See README.md in this directory for more information.
            instance: name of the maven instance. Defaults to url's network
                location if unset.
            incremental: bool, defaults to True. Defines if incremental listing
                is activated or not.

        """
        self.BASE_URL = url
        self.INDEX_URL = index_url
        self.incremental = incremental

        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            url=url,
            instance=instance,
        )

        self.session = requests.Session()
        self.session.headers.update(
            {
                "Accept": "application/json",
                "User-Agent": USER_AGENT,
            }
        )

        self.jar_origins: Dict[str, ListedOrigin] = {}

        self.github_session = GitHubSession(
            credentials=self.credentials, user_agent=USER_AGENT
        )
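        # Note: the GitHub session is only used by get_scm() below to rewrite
        # github.com URLs to their canonical form.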

    def state_from_dict(self, d: Dict[str, Any]) -> MavenListerState:
        return MavenListerState(**d)

    def state_to_dict(self, state: MavenListerState) -> Dict[str, Any]:
        return asdict(state)

    @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
    def page_request(self, url: str, params: Dict[str, Any]) -> requests.Response:

        logger.info("Fetching URL %s with params %s", url, params)

        response = self.session.get(url, params=params)
        if response.status_code != 200:
            logger.warning(
                "Unexpected HTTP status code %s on %s: %s",
                response.status_code,
                response.url,
                response.content,
            )
        response.raise_for_status()

        return response

    def get_pages(self) -> Iterator[RepoPage]:
        """Retrieve and parse exported maven indexes to
        identify all pom files and src archives.
        """

        # Example of returned RepoPage's:
        # [
        #     {
        #         "type": "maven",
        #         "url": "https://maven.xwiki.org/..-5.4.2-sources.jar",
        #         "time": 1626109619335,
        #         "gid": "org.xwiki.platform",
        #         "aid": "xwiki-platform-wikistream-events-xwiki",
        #         "version": "5.4.2",
        #     },
        #     {
        #         "type": "scm",
        #         "url": "scm:git:git://github.com/openengsb/openengsb-framework.git",
        #         "project": "openengsb-framework",
        #     },
        #     ...
        # ]

        # Download the main text index file.
        logger.info("Downloading computed index from %s.", self.INDEX_URL)
        assert self.INDEX_URL is not None
        response = requests.get(self.INDEX_URL, stream=True)
        if response.status_code != 200:
            logger.error("Index %s not found, stopping", self.INDEX_URL)
        response.raise_for_status()

        # Prepare regexes to parse index exports.

        # Parse doc id.
        # Example line: "doc 13"
        re_doc = re.compile(r"^doc (?P<doc>\d+)$")

        # Parse gid, aid, version, classifier, extension.
        # Example line: "    value al.aldi|sprova4j|0.1.0|sources|jar"
        re_val = re.compile(
            r"^\s{4}value (?P<gid>[^|]+)\|(?P<aid>[^|]+)\|(?P<version>[^|]+)\|"
            + r"(?P<classifier>[^|]+)\|(?P<ext>[^|]+)$"
        )

        # Parse last modification time.
        # Example line: "    value jar|1626109619335|14316|2|2|0|jar"
        re_time = re.compile(
            r"^\s{4}value ([^|]+)\|(?P<mtime>[^|]+)\|([^|]+)\|([^|]+)\|([^|]+)"
            + r"\|([^|]+)\|([^|]+)$"
        )
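
        # Putting the example lines above together, a single indexed artifact
        # spans several lines of the export, e.g. (values taken from the
        # examples above; the layout is an approximation):
        #
        #   doc 13
        #       value al.aldi|sprova4j|0.1.0|sources|jar
        #       value jar|1626109619335|14316|2|2|0|jar
        #
        # re_doc opens a new document, re_val carries the artifact coordinates,
        # and re_time carries its last-modification timestamp in milliseconds.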

        # Read file line by line and process it
        out_pom: Dict = {}
        jar_src: Dict = {}
        doc_id: int = 0

        jar_src["doc"] = None
        url_src = None

        iterator = response.iter_lines(chunk_size=1024)
        for line_bytes in iterator:
            # Read the index text export and get URLs and SCMs.
            line = line_bytes.decode(errors="ignore")
            m_doc = re_doc.match(line)
            if m_doc is not None:
                doc_id = int(m_doc.group("doc"))
                # jar_src["doc"] contains the id of the current document, whatever
                # its type (scm or jar).
                jar_src["doc"] = doc_id
            else:
                m_val = re_val.match(line)
                if m_val is not None:
                    (gid, aid, version, classifier, ext) = m_val.groups()
                    ext = ext.strip()
                    path = "/".join(gid.split("."))
                    if classifier == "NA" and ext.lower() == "pom":
                        # In incremental mode, we don't record any line that is
                        # before our last recorded doc id.
                        if (
                            self.incremental
                            and self.state
                            and self.state.last_seen_pom
                            and self.state.last_seen_pom >= doc_id
                        ):
                            continue
                        url_path = f"{path}/{aid}/{version}/{aid}-{version}.{ext}"
                        url_pom = urljoin(
                            self.BASE_URL,
                            url_path,
                        )
                        out_pom[url_pom] = doc_id
                    elif (
                        classifier.lower() == "sources" or ("src" in classifier)
                    ) and ext.lower() in ("zip", "jar"):
                        url_path = (
                            f"{path}/{aid}/{version}/{aid}-{version}-{classifier}.{ext}"
                        )
                        url_src = urljoin(self.BASE_URL, url_path)
                        jar_src["gid"] = gid
                        jar_src["aid"] = aid
                        jar_src["version"] = version
                else:
                    m_time = re_time.match(line)
                    if m_time is not None and url_src is not None:
                        time = m_time.group("mtime")
                        jar_src["time"] = int(time)
                        artifact_metadata_d = {
                            "type": "maven",
                            "url": url_src,
                            **jar_src,
                        }
                        logger.debug(
                            "* Yielding jar %s: %s", url_src, artifact_metadata_d
                        )
                        yield artifact_metadata_d
                        url_src = None

        logger.info("Found %s poms.", len(out_pom))

        # Now fetch pom files and scan them for scm info.

        logger.info("Fetching poms..")
        for pom in out_pom:
            try:
                response = self.page_request(pom, {})
                parsed_pom = BeautifulSoup(response.content, "xml")
                project = parsed_pom.find("project")
                if project is None:
                    continue
                scm = project.find("scm")
                if scm is not None:
                    connection = scm.find("connection")
                    if connection is not None:
                        artifact_metadata_d = {
                            "type": "scm",
                            "doc": out_pom[pom],
                            "url": connection.text,
                        }
                        logger.debug(
                            "* Yielding pom %s: %s", pom, artifact_metadata_d
                        )
                        yield artifact_metadata_d
                    else:
                        logger.debug("No scm.connection in pom %s", pom)
                else:
                    logger.debug("No scm in pom %s", pom)
            except requests.HTTPError:
                logger.warning(
                    "POM info page could not be fetched, skipping project '%s'",
                    pom,
                )
            except lxml.etree.Error as error:
                logger.info("Could not parse POM %s XML: %s.", pom, error)

    def get_scm(self, page: RepoPage) -> Optional[ListedOrigin]:
        """Retrieve scm origin out of the page information. Only called when the
        type of the page is scm.

        Try and detect an scm/vcs repository. Note that the official format is
        scm:{type}:git://example.org/{user}/{repo}.git, but some projects directly
        put the repo url (without the "scm:type" prefix), so we have to check the
        content to extract the type and url properly.

        Raises:
            AssertionError when the type of the page is not 'scm'

        Returns:
            ListedOrigin with proper canonical scm url (for github) if any is found,
            None otherwise.

        """
        assert page["type"] == "scm"
        visit_type: Optional[str] = None
        url: Optional[str] = None
        m_scm = re.match(r"^scm:(?P<type>[^:]+):(?P<url>.*)$", page["url"])
        if m_scm is None:
            return None

        scm_type = m_scm.group("type")
        if scm_type and scm_type in SUPPORTED_SCM_TYPES:
            url = m_scm.group("url")
            visit_type = scm_type
        elif page["url"].endswith(".git"):
            # str.lstrip() strips a *set of characters*, not a prefix, so test
            # for the "scm:" prefix explicitly before removing it.
            url = page["url"]
            if url.startswith("scm:"):
                url = url[len("scm:") :]
            visit_type = "git"
        else:
            return None

        if url and visit_type == "git":
            # Non-github urls will be returned as is, github ones will be
            # canonical ones
            url = self.github_session.get_canonical_url(url)

        if not url:
            return None

        assert visit_type is not None
        assert self.lister_obj.id is not None
        return ListedOrigin(
            lister_id=self.lister_obj.id,
            url=url,
            visit_type=visit_type,
        )

    def get_origins_from_page(self, page: RepoPage) -> Iterator[ListedOrigin]:
        """Convert a page of Maven repositories into a list of ListedOrigins."""
        if page["type"] == "scm":
            listed_origin = self.get_scm(page)
            if listed_origin:
                yield listed_origin
        else:
            # Origin is gathering source archives:
            last_update_dt = None
            last_update_iso = ""
            try:
                last_update_seconds = str(page["time"])[:-3]
                last_update_dt = datetime.fromtimestamp(int(last_update_seconds))
                last_update_dt = last_update_dt.astimezone(timezone.utc)
            except (OverflowError, ValueError):
                logger.warning("- Failed to convert datetime %s.", last_update_seconds)
            if last_update_dt:
                last_update_iso = last_update_dt.isoformat()

            # Origin URL will target the page holding sources for all versions of
            # an artifactId (package name) inside a groupId (namespace)
            path = "/".join(page["gid"].split("."))
            origin_url = urljoin(self.BASE_URL, f"{path}/{page['aid']}")

            artifact = {
                **{k: v for k, v in page.items() if k != "doc"},
                "time": last_update_iso,
                "base_url": self.BASE_URL,
            }

            if origin_url not in self.jar_origins:
                # Create a ListedOrigin instance if we did not see that origin yet
                assert self.lister_obj.id is not None
                jar_origin = ListedOrigin(
                    lister_id=self.lister_obj.id,
                    url=origin_url,
                    visit_type=page["type"],
                    last_update=last_update_dt,
                    extra_loader_arguments={"artifacts": [artifact]},
                )
                self.jar_origins[origin_url] = jar_origin
            else:
                # Otherwise update the list of source artifacts for that origin
                jar_origin = self.jar_origins[origin_url]
                artifacts = jar_origin.extra_loader_arguments["artifacts"]
                if artifact not in artifacts:
                    artifacts.append(artifact)

            if (
                jar_origin.last_update
                and last_update_dt
                and last_update_dt > jar_origin.last_update
            ):
                jar_origin.last_update = last_update_dt

            if not self.incremental or (
                self.state and page["doc"] > self.state.last_seen_doc
            ):
                # Yield the origin with its updated source artifacts. Multiple
                # ListedOrigin instances for the same origin URL, each with a
                # different artifacts list, may be sent to the scheduler; it
                # deduplicates them and upserts the latest one in the database.
                yield jar_origin
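
    # For the first "maven"-type RepoPage shown in get_pages(), the resulting
    # jar origin would carry roughly (a sketch; the time value is the sample
    # timestamp 1626109619335 ms converted to UTC):
    #
    #   extra_loader_arguments={"artifacts": [{
    #       "type": "maven",
    #       "url": "https://maven.xwiki.org/..-5.4.2-sources.jar",
    #       "time": "2021-07-12T17:06:59+00:00",
    #       "gid": "org.xwiki.platform",
    #       "aid": "xwiki-platform-wikistream-events-xwiki",
    #       "version": "5.4.2",
    #       "base_url": <self.BASE_URL>,
    #   }]}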

    def commit_page(self, page: RepoPage) -> None:
        """Update currently stored state using the latest listed doc.

        Note: this is a noop for full listing mode

        """
        if self.incremental and self.state:
            # We need to differentiate the two state counters according
            # to the type of origin.
            if page["type"] == "maven" and page["doc"] > self.state.last_seen_doc:
                self.state.last_seen_doc = page["doc"]
            elif page["type"] == "scm" and page["doc"] > self.state.last_seen_pom:
                self.state.last_seen_doc = page["doc"]
                self.state.last_seen_pom = page["doc"]
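
    # Hypothetical state progression: a "maven" page with doc 42 advances
    # last_seen_doc to 42; an "scm" page with doc 37 then advances
    # last_seen_pom to 37 (and resets last_seen_doc to 37 as well, since the
    # scm branch updates both counters). The next incremental run then skips
    # poms up to doc 37 in get_pages().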

    def finalize(self) -> None:
        """Finalize the lister state: set `updated` if any progress has been made.

        Note: this is a noop for full listing mode

        """
        if self.incremental and self.state:
            last_seen_doc = self.state.last_seen_doc
            last_seen_pom = self.state.last_seen_pom

            scheduler_state = self.get_state_from_scheduler()
            if last_seen_doc and last_seen_pom:
                if (scheduler_state.last_seen_doc < last_seen_doc) or (
                    scheduler_state.last_seen_pom < last_seen_pom
                ):
                    self.updated = True