Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9344870
lister.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
lister.py
View Options
# Copyright (C) 2018-2021 the Software Heritage developers
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from dataclasses import asdict, dataclass
import json
import logging
from typing import Any, Dict, Iterator, List, Optional

import iso8601
import requests
from tenacity.before_sleep import before_sleep_log

from swh.lister import USER_AGENT
from swh.lister.pattern import CredentialsType, Lister
from swh.lister.utils import throttling_retry
from swh.scheduler.interface import SchedulerInterface
from swh.scheduler.model import ListedOrigin
# Module-level logger named after this module; it is also passed to the retry
# decorator wrapping NpmLister.page_request.
logger = logging.getLogger(__name__)
@dataclass
class NpmListerState:
    """Persistent state of the npm lister, kept between incremental listings."""

    # Highest sequence number recorded so far from the npm changes feed;
    # ``None`` as long as no incremental listing stored one.
    last_seq: Optional[int] = None
class NpmLister(Lister[NpmListerState, List[Dict[str, Any]]]):
    """
    List all packages hosted on the npm registry.

    The lister is based on the npm replication API powered by a
    CouchDB database (https://docs.couchdb.org/en/stable/api/database/).

    Args:
        scheduler: a scheduler instance
        page_size: number of packages info to return per page when querying npm API,
            must be at least 1
        incremental: defines if incremental listing should be used, in that case
            only modified or new packages since last incremental listing operation
            will be returned, otherwise all packages will be listed in lexicographical
            order
        credentials: credentials forwarded as is to the base lister

    Raises:
        ValueError: if ``page_size`` is lower than 1

    """

    LISTER_NAME = "npm"
    INSTANCE = "npm"

    API_BASE_URL = "https://replicate.npmjs.com"
    API_INCREMENTAL_LISTING_URL = f"{API_BASE_URL}/_changes"
    API_FULL_LISTING_URL = f"{API_BASE_URL}/_all_docs"
    PACKAGE_URL_TEMPLATE = "https://www.npmjs.com/package/{package_name}"

    def __init__(
        self,
        scheduler: SchedulerInterface,
        page_size: int = 1000,
        incremental: bool = False,
        credentials: CredentialsType = None,
    ):
        if page_size < 1:
            # In full listing mode a page size of 0 turns into a limit of 1: each
            # page would then only contain the startkey package, which is dropped
            # by get_pages, and the same page would be requested forever.
            raise ValueError(f"page_size must be a positive integer, got {page_size}")

        super().__init__(
            scheduler=scheduler,
            credentials=credentials,
            url=self.API_INCREMENTAL_LISTING_URL
            if incremental
            else self.API_FULL_LISTING_URL,
            instance=self.INSTANCE,
        )

        self.page_size = page_size
        if not incremental:
            # in full listing mode, first package in each page corresponds to the one
            # provided as the startkey query parameter value, so we increment the page
            # size by one to avoid double package processing
            self.page_size += 1
        self.incremental = incremental

        self.session = requests.Session()
        self.session.headers.update(
            {"Accept": "application/json", "User-Agent": USER_AGENT}
        )

    def state_from_dict(self, d: Dict[str, Any]) -> NpmListerState:
        """Build a :class:`NpmListerState` from its dict representation."""
        return NpmListerState(**d)

    def state_to_dict(self, state: NpmListerState) -> Dict[str, Any]:
        """Convert a :class:`NpmListerState` into a plain dict."""
        return asdict(state)

    def request_params(self, last_package_id: str) -> Dict[str, Any]:
        """Build the query parameters of a listing request.

        Args:
            last_package_id: value sent as ``since`` in incremental mode and as
                ``startkey`` in full listing mode

        Returns:
            the query parameters to send to the listing URL
        """
        # include package JSON document to get its last update date
        params: Dict[str, Any] = {"limit": self.page_size, "include_docs": "true"}
        if self.incremental:
            params["since"] = last_package_id
        else:
            params["startkey"] = last_package_id
        return params

    # NOTE(review): the conditions triggering a retry are defined by
    # swh.lister.utils.throttling_retry, not visible here; a WARNING is logged
    # before each retry sleep.
    @throttling_retry(before_sleep=before_sleep_log(logger, logging.WARNING))
    def page_request(self, last_package_id: str) -> requests.Response:
        """Fetch one page of results from the listing URL.

        Args:
            last_package_id: pagination marker, see :meth:`request_params`

        Returns:
            the HTTP response; any status other than 200 is logged as a warning

        Raises:
            requests.HTTPError: if the response has an error status code
        """
        params = self.request_params(last_package_id)
        logger.debug("Fetching URL %s with params %s", self.url, params)
        response = self.session.get(self.url, params=params)
        if response.status_code != 200:
            logger.warning(
                "Unexpected HTTP status code %s on %s: %s",
                response.status_code,
                response.url,
                response.content,
            )
        response.raise_for_status()
        return response

    def get_pages(self) -> Iterator[List[Dict[str, Any]]]:
        """Yield pages of package entries until the listing is exhausted.

        In incremental mode, listing resumes from the ``last_seq`` value of the
        lister state when one is set, from ``"0"`` otherwise. In full listing mode,
        listing starts from the empty string key.

        Yields:
            non-empty lists of entries, taken from the ``results`` key of the
            response in incremental mode and from its ``rows`` key otherwise
        """
        last_package_id: str = "0" if self.incremental else '""'
        if (
            self.incremental
            and self.state is not None
            and self.state.last_seq is not None
        ):
            last_package_id = str(self.state.last_seq)

        while True:
            response = self.page_request(last_package_id)

            data = response.json()
            page = data["results"] if self.incremental else data["rows"]

            if not page:
                break

            # a page shorter than the requested limit is the last one
            is_last_page = len(page) < self.page_size

            if self.incremental or is_last_page:
                yield page
            else:
                # the trailing package is only used as the next startkey, it will
                # come back as the first entry of the next page
                yield page[:-1]

            if is_last_page:
                break

            if self.incremental:
                last_package_id = str(page[-1]["seq"])
            else:
                # startkey is sent as a JSON string: encode it with json.dumps so
                # that quotes and backslashes in a package id are escaped
                last_package_id = json.dumps(page[-1]["id"], ensure_ascii=False)

    def get_origins_from_page(
        self, page: List[Dict[str, Any]]
    ) -> Iterator[ListedOrigin]:
        """Convert a page of Npm repositories into a list of ListedOrigin.

        Entries without an attached document or without any published version
        are skipped. The last update date of an origin is the publication date
        of the version pointed by the ``latest`` dist-tag, when available.
        """
        assert self.lister_obj.id is not None

        for package in page:
            # entries with a missing or null document cannot be inspected
            doc = package.get("doc") or {}

            # no source code to archive here
            if not doc.get("versions", {}):
                continue

            package_name = doc["name"]
            package_latest_version = doc.get("dist-tags", {}).get("latest", "")

            last_update = None
            release_dates = doc.get("time", {})
            if package_latest_version in release_dates:
                last_update = iso8601.parse_date(release_dates[package_latest_version])

            yield ListedOrigin(
                lister_id=self.lister_obj.id,
                url=self.PACKAGE_URL_TEMPLATE.format(package_name=package_name),
                visit_type="npm",
                last_update=last_update,
            )

    def commit_page(self, page: List[Dict[str, Any]]):
        """Update the currently stored state using the latest listed page.

        Only incremental listing tracks a sequence number; the state is updated
        when the ``seq`` value of the last entry of the page is greater than the
        stored one.
        """
        # an empty page carries no sequence number to record
        if not self.incremental or not page:
            return

        last_seq = page[-1]["seq"]
        if self.state.last_seq is None or last_seq > self.state.last_seq:
            self.state.last_seq = last_seq

    def finalize(self):
        """Flag the lister as updated when incremental listing made progress.

        The ``last_seq`` value of the current state is compared with the one of
        the state returned by ``get_state_from_scheduler``; ``self.updated`` is
        set to ``True`` when the latter is unset or lower.
        """
        # NOTE(review): ``updated`` is presumably read by the base Lister to decide
        # whether the state must be saved - confirm in swh.lister.pattern
        if self.incremental and self.state.last_seq is not None:
            scheduler_state = self.get_state_from_scheduler()

            if (
                scheduler_state.last_seq is None
                or self.state.last_seq > scheduler_state.last_seq
            ):
                self.updated = True
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Jul 4, 2:52 PM (4 d, 8 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3241728
Attached To
rDLS Listers
Event Timeline
Log In to Comment