Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/cvs/rlog.py
Show First 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | |||||
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||||
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | # OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||||
import calendar | import calendar | ||||
import re | import re | ||||
import time | import time | ||||
from typing import BinaryIO, Dict, List, NamedTuple, Optional, Tuple | from typing import BinaryIO, Dict, List, NamedTuple, Optional, Tuple | ||||
from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ChangeSetKey, file_path | from swh.loader.cvs.cvs2gitdump.cvs2gitdump import ChangeSetKey | ||||
# There is no known encoding of path names in CVS. The actual encoding used | # There is no known encoding of path names in CVS. The actual encoding used | ||||
# will depend on the CVS server's operating system and perhaps even the | # will depend on the CVS server's operating system and perhaps even the | ||||
# underlying filesystem used to host a CVS repository. | # underlying filesystem used to host a CVS repository. | ||||
# It is even conceivable that a given repository may use multiple encodings, | # It is even conceivable that a given repository may use multiple encodings, | ||||
# e.g. due to migrations of the repository between different servers over time. | # e.g. due to migrations of the repository between different servers over time. | ||||
# | # | ||||
# This issue also affects the CVS network protocol which is communicating | # This issue also affects the CVS network protocol which is communicating | ||||
Show All 24 Lines | def __init__(self, cvsroot_path: str, fuzzsec: int) -> None: | ||||
self.tags: Dict[str, ChangeSetKey] = dict() | self.tags: Dict[str, ChangeSetKey] = dict() | ||||
self.offsets: Dict[str, Dict[str, int]] = dict() | self.offsets: Dict[str, Dict[str, int]] = dict() | ||||
def _process_rlog_revisions( | def _process_rlog_revisions( | ||||
self, | self, | ||||
path: str, | path: str, | ||||
taginfo: Dict[bytes, bytes], | taginfo: Dict[bytes, bytes], | ||||
revisions: Dict[str, revtuple], | revisions: Dict[str, revtuple], | ||||
logmsgs: Dict[str, Optional[bytes]] | logmsgs: Dict[str, Optional[bytes]], | ||||
) -> None: | ) -> None: | ||||
""" Convert RCS revision history of a file into self.changesets items """ | """ Convert RCS revision history of a file into self.changesets items """ | ||||
rtags: Dict[str, List[str]] = dict() | rtags: Dict[str, List[str]] = dict() | ||||
# RCS and CVS represent branches by adding digits to revision numbers. | # RCS and CVS represent branches by adding digits to revision numbers. | ||||
# And CVS assigns special meaning to certain revision number ranges. | # And CVS assigns special meaning to certain revision number ranges. | ||||
# | # | ||||
# Revision numbers on the main branch have only two digits: | # Revision numbers on the main branch have only two digits: | ||||
# | # | ||||
▲ Show 20 Lines • Show All 111 Lines • ▼ Show 20 Lines | ) -> None: | ||||
a = c | a = c | ||||
self.changesets[a] = a | self.changesets[a] = a | ||||
if k in rtags: | if k in rtags: | ||||
for t in rtags[k]: | for t in rtags[k]: | ||||
if t not in self.tags or self.tags[t].max_time < a.max_time: | if t not in self.tags or self.tags[t].max_time < a.max_time: | ||||
self.tags[t] = a | self.tags[t] = a | ||||
def parse_rlog(self, fp: BinaryIO) -> None: | def parse_rlog(self, fp: BinaryIO) -> None: | ||||
self.changesets = dict() | |||||
self.tags = dict() | |||||
self.offsets = dict() | |||||
eof = None | eof = None | ||||
while eof != _EOF_LOG and eof != _EOF_ERROR: | while eof != _EOF_LOG and eof != _EOF_ERROR: | ||||
filename, branch, taginfo, lockinfo, errmsg, eof = _parse_log_header(fp) | filename, branch, taginfo, lockinfo, errmsg, eof = _parse_log_header(fp) | ||||
revisions: Dict[str, revtuple] = {} | revisions: Dict[str, revtuple] = {} | ||||
logmsgs: Dict[str, Optional[bytes]] = {} | logmsgs: Dict[str, Optional[bytes]] = {} | ||||
path = "" | path = "" | ||||
if filename: | if filename: | ||||
# There is no known encoding of filenames in CVS. | # There is no known encoding of filenames in CVS. | ||||
# Attempt to decode the path with our list of known encodings. | # Attempt to decode the path with our list of known encodings. | ||||
# If none of them work, forcefully decode the path assuming | # If none of them work, forcefully decode the path assuming | ||||
# the final path encoding provided in the list. | # the final path encoding provided in the list. | ||||
for i, e in enumerate(path_encodings): | for i, e in enumerate(path_encodings): | ||||
try: | try: | ||||
how = "ignore" if i == len(path_encodings) - 1 else "strict" | how = "ignore" if i == len(path_encodings) - 1 else "strict" | ||||
fname = filename.decode(e, how) | fname = filename.decode(e, how) | ||||
break | break | ||||
except UnicodeError: | except UnicodeError: | ||||
pass | pass | ||||
path = file_path(self.cvsroot_path, fname) | path = fname | ||||
elif not eof: | elif not eof: | ||||
raise ValueError("No filename found in rlog header") | raise ValueError("No filename found in rlog header") | ||||
while not eof: | while not eof: | ||||
off = fp.tell() | off = fp.tell() | ||||
rev, logmsg, eof = _parse_log_entry(fp) | rev, logmsg, eof = _parse_log_entry(fp) | ||||
if rev: | if rev: | ||||
revisions[rev[0]] = rev | revisions[rev[0]] = rev | ||||
logmsgs[rev[0]] = logmsg | logmsgs[rev[0]] = logmsg | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | |||||
_re_cvsnt_error = re.compile( | _re_cvsnt_error = re.compile( | ||||
rb"^(?:cvs rcsfile\: |cvs \[rcsfile aborted\]: )" | rb"^(?:cvs rcsfile\: |cvs \[rcsfile aborted\]: )" | ||||
rb"(?:\`(.*,v)' |" | rb"(?:\`(.*,v)' |" | ||||
rb"cannot open (.*,v)\: |(.*,v)\: |)" | rb"cannot open (.*,v)\: |(.*,v)\: |)" | ||||
rb"(.*)$" | rb"(.*)$" | ||||
) | ) | ||||
def _parse_log_header(fp: BinaryIO) -> Tuple[ | def _parse_log_header( | ||||
fp: BinaryIO, | |||||
) -> Tuple[ | |||||
bytes, bytes, Dict[bytes, bytes], Dict[bytes, bytes], bytes, Optional[bytes] | bytes, bytes, Dict[bytes, bytes], Dict[bytes, bytes], bytes, Optional[bytes] | ||||
]: | ]: | ||||
"""Parse an RCS/CVS log header. | """Parse an RCS/CVS log header. | ||||
fp is a file (pipe) opened for reading the log information. | fp is a file (pipe) opened for reading the log information. | ||||
On entry, fp should point to the start of a log entry. | On entry, fp should point to the start of a log entry. | ||||
On exit, fp will have consumed the separator line between the header and | On exit, fp will have consumed the separator line between the header and | ||||
the first revision log. | the first revision log. | ||||
▲ Show 20 Lines • Show All 186 Lines • Show Last 20 Lines |