# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Loader in charge of injecting either new or existing cvs repositories to
swh-storage.
"""
from datetime import datetime
import os
import subprocess
import tempfile
import time
from typing import Iterator, List, Optional, Sequence, Tuple

from urllib3.util import parse_url

from swh.loader.core.loader import BaseLoader
from swh.loader.core.utils import clean_dangling_folders
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import (
    CHANGESET_FUZZ_SEC,
    ChangeSetKey,
    CvsConv,
    RcsKeywords,
    file_path,
)
import swh.loader.cvs.cvsclient as cvsclient
import swh.loader.cvs.rcsparse as rcsparse
from swh.loader.cvs.rlog import RlogConv
from swh.loader.exception import NotFound
from swh.model import from_disk, hashutil
from swh.model.model import (
    Content,
    Directory,
    Origin,
    Person,
    Revision,
    RevisionType,
    SkippedContent,
    Snapshot,
    SnapshotBranch,
    TargetType,
    TimestampWithTimezone,
)
from swh.storage.interface import StorageInterface
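
# The CvsLoader below is normally driven through BaseLoader.load(), which
# calls prepare_origin_visit() and prepare(), then alternates fetch_data()
# and store_data(). A minimal usage sketch (illustrative only; the in-memory
# storage backend and the origin URL are assumptions of the example, not
# requirements of this module):
#
#   from swh.storage import get_storage
#
#   storage = get_storage("memory")
#   loader = CvsLoader(storage, "rsync://cvs.example.org/cvsroot/module")
#   result = loader.load()  # e.g. {"status": "eventful"}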

DEFAULT_BRANCH = b"HEAD"

TEMPORARY_DIR_PREFIX_PATTERN = "swh.loader.cvs."


class CvsLoader(BaseLoader):
    """Swh cvs loader.

    The repository is local. The loader deals with updates to a previously
    loaded repository.

    """

    visit_type = "cvs"

    def __init__(
        self,
        storage: StorageInterface,
        url: str,
        origin_url: Optional[str] = None,
        visit_date: Optional[datetime] = None,
        cvsroot_path: Optional[str] = None,
        temp_directory: str = "/tmp",
        max_content_size: Optional[int] = None,
    ):
        super().__init__(
            storage=storage,
            logging_class="swh.loader.cvs.CvsLoader",
            max_content_size=max_content_size,
        )
        self.cvsroot_url = url
        # origin url as unique identifier for origin in swh archive
        self.origin_url = origin_url if origin_url else self.cvsroot_url
        self.temp_directory = temp_directory
        self.done = False
        self.cvs_module_name = None
        # XXX At present changeset IDs are recomputed on the fly during every visit.
        # If we were able to maintain a cache somewhere which can be indexed by a
        # cvs2gitdump.ChangeSetKey and yields an SWH revision hash we could avoid
        # doing a lot of redundant work during every visit.
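        # A hypothetical shape for such a cache (illustrative only, not
        # implemented here; assumes ChangeSetKey can serve as a mapping key):
        #
        #   self._changeset_cache: Dict[ChangeSetKey, bytes] = {}
        #   # ... populated in compute_swh_revision(), consulted before
        #   # checking out a changeset's work tree again.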
        self.cvs_changesets = None

        # remote CVS repository access (history is parsed from CVS rlog):
        self.cvsclient = None
        self.rlog_file = None

        # internal state used to store swh objects
        self._contents: List[Content] = []
        self._skipped_contents: List[SkippedContent] = []
        self._directories: List[Directory] = []
        self._revisions: List[Revision] = []
        self.swh_revision_gen = None
        # internal state, current visit
        self._last_revision = None
        self._visit_status = "full"
        self._load_status = "uneventful"
        self.visit_date = visit_date
        self.cvsroot_path = cvsroot_path
        self.snapshot = None

    def compute_swh_revision(self, k, logmsg):
        """Compute swh hash data per CVS changeset.

        Returns:
            tuple (rev, swh_directory)
            - rev: current SWH revision computed from checked out work tree
            - swh_directory: dictionary of path, swh hash data with type

        """
        # Compute SWH revision from the on-disk state
        swh_dir = from_disk.Directory.from_disk(path=os.fsencode(self.worktree_path))
        if self._last_revision:
            parents = (self._last_revision.id,)
        else:
            parents = ()
        revision = self.build_swh_revision(k, logmsg, swh_dir.hash, parents)
        self.log.debug("SWH revision ID: %s" % hashutil.hash_to_hex(revision.id))
        self._last_revision = revision
        if self._load_status == "uneventful":
            # We have an eventful load if this revision is not already
            # present in the archive
            if not self.storage.revision_get([revision.id])[0]:
                self._load_status = "eventful"
        return (revision, swh_dir)
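
    # For reference (an illustrative sketch, not part of the loader):
    # from_disk.Directory.from_disk() hashes the work tree the same way Git
    # would, e.g.
    #
    #   d = from_disk.Directory.from_disk(path=b"/tmp/worktree")
    #   d.hash  # sha1_git of the root directory, as bytes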

    def swh_hash_data_per_cvs_changeset(self):
        """Compute SWH revision and swh hash data for a CVS changeset.

        Yields:
            tuple (rev, swh_directory)
            - rev: current SWH revision computed from checked out work tree
            - swh_directory: dictionary of path, swh hash data with type

        """
        for k in self.cvs_changesets:
            tstr = time.strftime("%c", time.gmtime(k.max_time))
            self.log.info(
                "changeset from %s by %s on branch %s", tstr, k.author, k.branch
            )
            logmsg = ""
            # Check out the on-disk state of this revision
            for f in k.revs:
                rcsfile = None
                path = file_path(self.cvsroot_path, f.path)
                wtpath = os.path.join(self.worktree_path, path)
                self.log.info("rev %s of file %s" % (f.rev, f.path))
                if not logmsg:
                    rcsfile = rcsparse.rcsfile(f.path)
                    logmsg = rcsfile.getlog(k.revs[0].rev)
                if f.state == "dead":
                    # remove this file from work tree
                    try:
                        os.remove(wtpath)
                    except FileNotFoundError:
                        pass
                else:
                    # create, or update, this file in the work tree
                    if not rcsfile:
                        rcsfile = rcsparse.rcsfile(f.path)
                    rcs = RcsKeywords()
                    contents = rcs.expand_keyword(f.path, rcsfile, f.rev)
                    try:
                        outfile = open(wtpath, mode="wb")
                    except FileNotFoundError:
                        os.makedirs(os.path.dirname(wtpath))
                        outfile = open(wtpath, mode="wb")
                    outfile.write(contents)
                    outfile.close()

            (revision, swh_dir) = self.compute_swh_revision(k, logmsg)
            yield revision, swh_dir

    def swh_hash_data_per_cvs_rlog_changeset(self):
        """Compute swh hash data per CVS rlog changeset.

        Yields:
            tuple (rev, swh_directory)
            - rev: current SWH revision computed from checked out work tree
            - swh_directory: dictionary of path, swh hash data with type

        """
        for k in self.cvs_changesets:
            tstr = time.strftime("%c", time.gmtime(k.max_time))
            self.log.info(
                "changeset from %s by %s on branch %s", tstr, k.author, k.branch
            )
            logmsg = ""
            # Check out the on-disk state of this revision
            for f in k.revs:
                path = file_path(self.cvsroot_path, f.path)
                wtpath = os.path.join(self.worktree_path, path)
                self.log.info("rev %s of file %s" % (f.rev, f.path))
                if not logmsg:
                    logmsg = self.rlog.getlog(self.rlog_file, f.path, k.revs[0].rev)
                self.log.debug("f.state is %s\n" % f.state)
                if f.state == "dead":
                    # remove this file from work tree
                    try:
                        os.remove(wtpath)
                    except FileNotFoundError:
                        pass
                else:
                    dirname = os.path.dirname(wtpath)
                    try:
                        os.makedirs(dirname)
                    except FileExistsError:
                        pass
                    self.log.debug("checkout to %s\n" % wtpath)
                    fp = self.cvsclient.checkout(f.path, f.rev, dirname)
                    os.rename(fp.name, wtpath)
                    try:
                        fp.close()
                    except FileNotFoundError:
                        # Well, we have just renamed the file...
                        pass

            # TODO: prune empty directories?
            (revision, swh_dir) = self.compute_swh_revision(k, logmsg)
            yield revision, swh_dir

    def process_cvs_changesets(
        self,
    ) -> Iterator[
        Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
    ]:
        """Process CVS revisions.

        At each CVS revision, check out contents and compute swh hashes.

        Yields:
            tuple (contents, skipped-contents, directories, revision): lists of
            Content, SkippedContent and Directory objects for the checked out
            work tree, along with the corresponding Revision.

        """
        for swh_revision, swh_dir in self.swh_hash_data_per_cvs_changeset():
            # Send the associated contents/directories
            (_contents, _skipped_contents, _directories) = from_disk.iter_directory(
                swh_dir
            )
            yield _contents, _skipped_contents, _directories, swh_revision

    def process_cvs_rlog_changesets(
        self,
    ) -> Iterator[
        Tuple[List[Content], List[SkippedContent], List[Directory], Revision]
    ]:
        """Process CVS rlog revisions.

        At each CVS revision, check out contents and compute swh hashes.

        Yields:
            tuple (contents, skipped-contents, directories, revision): lists of
            Content, SkippedContent and Directory objects for the checked out
            work tree, along with the corresponding Revision.

        """
        for swh_revision, swh_dir in self.swh_hash_data_per_cvs_rlog_changeset():
            # Send the associated contents/directories
            (_contents, _skipped_contents, _directories) = from_disk.iter_directory(
                swh_dir
            )
            yield _contents, _skipped_contents, _directories, swh_revision

    def prepare_origin_visit(self):
        self.origin = Origin(
            url=self.origin_url if self.origin_url else self.cvsroot_url
        )

    def pre_cleanup(self):
        """Cleanup potential dangling files from prior runs (e.g. OOM killed
        tasks)

        """
        clean_dangling_folders(
            self.temp_directory,
            pattern_check=TEMPORARY_DIR_PREFIX_PATTERN,
            log=self.log,
        )

    def cleanup(self):
        self.log.info("cleanup")

    def fetch_cvs_repo_with_rsync(self, host, path):
        # URL *must* end with a trailing slash in order to get CVSROOT listed
        url = "rsync://%s%s/" % (host, os.path.dirname(path))
        rsync = subprocess.run(["rsync", url], capture_output=True, encoding="ascii")
        rsync.check_returncode()
        have_cvsroot = False
        have_module = False
        for line in rsync.stdout.split("\n"):
            self.log.debug("rsync server: %s" % line)
            if line.endswith(" CVSROOT"):
                have_cvsroot = True
            elif line.endswith(" %s" % self.cvs_module_name):
                have_module = True
            if have_module and have_cvsroot:
                break
        if not have_module:
            raise NotFound(
                "CVS module %s not found at %s" % (self.cvs_module_name, url)
            )
        if not have_cvsroot:
            raise NotFound("No CVSROOT directory found at %s" % url)

        rsync = subprocess.run(["rsync", "-a", url, self.cvsroot_path])
        rsync.check_returncode()
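
    # For reference, the detection above keys off a module listing such as
    # this illustrative `rsync rsync://host/dir/` output (exact columns vary
    # by rsync version and server configuration):
    #
    #   drwxr-xr-x          4,096 2020/01/01 00:00:00 CVSROOT
    #   drwxr-xr-x          4,096 2020/01/01 00:00:00 module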

    def prepare(self):
        self._last_revision = None
        self._load_status = "uneventful"
        self.swh_revision_gen = None
        if not self.cvsroot_path:
            self.cvsroot_path = tempfile.mkdtemp(
                suffix="-%s" % os.getpid(),
                prefix=TEMPORARY_DIR_PREFIX_PATTERN,
                dir=self.temp_directory,
            )
        self.worktree_path = tempfile.mkdtemp(
            suffix="-%s" % os.getpid(),
            prefix=TEMPORARY_DIR_PREFIX_PATTERN,
            dir=self.temp_directory,
        )
        url = parse_url(self.origin_url)
        self.log.debug(
            "prepare; origin_url=%s scheme=%s path=%s"
            % (self.origin_url, url.scheme, url.path)
        )
        if not url.path:
            raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)
        self.cvs_module_name = os.path.basename(url.path)
        os.mkdir(os.path.join(self.worktree_path, self.cvs_module_name))
        if url.scheme == "file":
            if not os.path.exists(url.path):
                raise NotFound
        elif url.scheme == "rsync":
            self.fetch_cvs_repo_with_rsync(url.host, url.path)

        if url.scheme == "file" or url.scheme == "rsync":
            # local CVS repository conversion
            have_rcsfile = False
            have_cvsroot = False
            for root, dirs, files in os.walk(self.cvsroot_path):
                if "CVSROOT" in dirs:
                    have_cvsroot = True
                    dirs.remove("CVSROOT")
                    continue
                for f in files:
                    filepath = os.path.join(root, f)
                    if f[-2:] == ",v":
                        try:
                            rcsfile = rcsparse.rcsfile(filepath)  # noqa: F841
                        except (Exception):
                            raise
                        else:
                            self.log.debug(
                                "Looks like we have data to convert; "
                                "found a valid RCS file at %s" % filepath
                            )
                            have_rcsfile = True
                            break
                if have_rcsfile:
                    break

            if not have_rcsfile:
                raise NotFound(
                    "Directory %s does not contain any valid RCS files"
                    % self.cvsroot_path
                )
            if not have_cvsroot:
                self.log.warning(
                    "The CVS repository at '%s' lacks a CVSROOT directory; "
                    "we might be ingesting an incomplete copy of the repository"
                    % self.cvsroot_path
                )

            # Unfortunately, there is no way to convert CVS history in an
            # iterative fashion because the data is not indexed by any kind
            # of changeset ID. We need to walk the history of each and every
            # RCS file in the repository during every visit, even if no new
            # changes will be added to the SWH archive afterwards.
            # "CVS’s repository is the software equivalent of a telephone book
            # sorted by telephone number."
            # https://corecursive.com/software-that-doesnt-suck-with-jim-blandy/
            #
            # An implicit assumption made here is that self.cvs_changesets will
            # fit into memory in its entirety. If it won't fit then the CVS walker
            # will need to be modified such that it spools the list of changesets
            # to disk instead.
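            # (A hypothetical mitigation, not implemented here: spool each
            # changeset to a temporary file while walking, e.g. with
            # pickle.dump(), and replay it with pickle.load() below instead of
            # keeping the whole sorted list in memory.)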
            cvs = CvsConv(self.cvsroot_path, RcsKeywords(), False, CHANGESET_FUZZ_SEC)
            self.log.info("Walking CVS module %s", self.cvs_module_name)
            cvs.walk(self.cvs_module_name)
            self.cvs_changesets = sorted(cvs.changesets)
            self.log.info(
                "CVS changesets found in %s: %d"
                % (self.cvs_module_name, len(self.cvs_changesets))
            )
            self.swh_revision_gen = self.process_cvs_changesets()
        elif url.scheme == "pserver" or url.scheme == "fake" or url.scheme == "ssh":
            # remote CVS repository conversion
            self.cvsclient = cvsclient.CVSClient(url)
            cvsroot_path = os.path.dirname(url.path)
            self.log.info(
                "Fetching CVS rlog from %s:%s/%s",
                url.host,
                cvsroot_path,
                self.cvs_module_name,
            )
            self.rlog = RlogConv(cvsroot_path, CHANGESET_FUZZ_SEC)
            self.rlog_file = self.cvsclient.fetch_rlog()
            self.rlog.parse_rlog(self.rlog_file)
            self.cvs_changesets = sorted(self.rlog.changesets)
            self.log.info(
                "CVS changesets found for %s: %d"
                % (self.cvs_module_name, len(self.cvs_changesets))
            )
            self.swh_revision_gen = self.process_cvs_rlog_changesets()
        else:
            raise NotFound("Invalid CVS origin URL '%s'" % self.origin_url)

    def fetch_data(self):
        """Fetch the next CVS revision."""
        try:
            data = next(self.swh_revision_gen)
        except StopIteration:
            return False
        except Exception as e:
            self.log.exception(e)
            return False  # Stopping iteration

        self._contents, self._skipped_contents, self._directories, rev = data
        self._revisions = [rev]
        return True

    def build_swh_revision(
        self, k: ChangeSetKey, logmsg: bytes, dir_id: bytes, parents: Sequence[bytes]
    ) -> Revision:
        """Given a CVS revision, build a swh revision.

        Args:
            k: changeset data
            logmsg: the changeset's log message
            dir_id: the tree's hash identifier
            parents: the revision's parent identifiers

        Returns:
            The swh revision object.

        """
        author = Person.from_fullname(k.author.encode("UTF-8"))
        date = TimestampWithTimezone.from_datetime(k.max_time)

        return Revision(
            type=RevisionType.CVS,
            date=date,
            committer_date=date,
            directory=dir_id,
            message=logmsg,
            author=author,
            committer=author,
            synthetic=True,
            extra_headers=[],
            parents=tuple(parents),
        )
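
    # Note (illustrative): CVS records only a username, so the whole author
    # string becomes the Person's fullname, e.g.
    #
    #   Person.from_fullname(b"jdoe")
    #   # -> Person(fullname=b"jdoe", name=b"jdoe", email=None)
    #
    # whereas a fullname like b"Jane Doe <jane@example.org>" would also
    # populate the name and email fields.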

    def generate_and_load_snapshot(self, revision) -> Snapshot:
        """Create a snapshot from the given revision and add it to storage.

        Args:
            revision (Revision): the last revision seen, used as the target of
                the snapshot's single branch

        Returns:
            The newly created snapshot

        """
        snap = Snapshot(
            branches={
                DEFAULT_BRANCH: SnapshotBranch(
                    target=revision.id, target_type=TargetType.REVISION
                )
            }
        )
        self.log.debug("snapshot: %s" % snap)
        self.storage.snapshot_add([snap])
        return snap

    def store_data(self):
        "Add our current CVS changeset to the archive."
        self.storage.skipped_content_add(self._skipped_contents)
        self.storage.content_add(self._contents)
        self.storage.directory_add(self._directories)
        self.storage.revision_add(self._revisions)
        self.snapshot = self.generate_and_load_snapshot(self._last_revision)
        self.log.debug("SWH snapshot ID: %s" % hashutil.hash_to_hex(self.snapshot.id))
        self.flush()
        self.loaded_snapshot_id = self.snapshot.id
        self._skipped_contents = []
        self._contents = []
        self._directories = []
        self._revisions = []

    def load_status(self):
        return {
            "status": self._load_status,
        }

    def visit_status(self):
        return self._visit_status