Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/cvs/cvsclient.py
# Copyright (C) 2015-2021 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
"""Minimal CVS client implementation | """Minimal CVS client implementation | ||||
""" | """ | ||||
▲ Show 20 Lines • Show All 71 Lines • ▼ Show 20 Lines | def scramble_password(password): | ||||
return "".join(s) | return "".join(s) | ||||
class CVSProtocolError(Exception): | class CVSProtocolError(Exception): | ||||
pass | pass | ||||
class CVSClient: | class CVSClient: | ||||
def connect_pserver(self, hostname, port, auth): | def connect_pserver(self, hostname, port, username, password): | ||||
if port is None: | if port is None: | ||||
port = CVS_PSERVER_PORT | port = CVS_PSERVER_PORT | ||||
if auth is None: | if username is None: | ||||
raise NotFound( | raise NotFound( | ||||
"Username and password are required for " | "Username is required for " | ||||
"a pserver connection: %s" % EXAMPLE_PSERVER_URL | |||||
) | |||||
try: | |||||
user = auth.split(":")[0] | |||||
password = auth.split(":")[1] | |||||
except IndexError: | |||||
raise NotFound( | |||||
"Username and password are required for " | |||||
"a pserver connection: %s" % EXAMPLE_PSERVER_URL | "a pserver connection: %s" % EXAMPLE_PSERVER_URL | ||||
) | ) | ||||
try: | try: | ||||
self.socket = socket.create_connection((hostname, port)) | self.socket = socket.create_connection((hostname, port)) | ||||
except ConnectionRefusedError: | except ConnectionRefusedError: | ||||
raise NotFound("Could not connect to %s:%s", hostname, port) | raise NotFound("Could not connect to %s:%s", hostname, port) | ||||
scrambled_password = scramble_password(password) | # use empty password if it is None | ||||
scrambled_password = scramble_password(password or "") | |||||
request = "BEGIN AUTH REQUEST\n%s\n%s\n%s\nEND AUTH REQUEST\n" % ( | request = "BEGIN AUTH REQUEST\n%s\n%s\n%s\nEND AUTH REQUEST\n" % ( | ||||
self.cvsroot_path, | self.cvsroot_path, | ||||
user, | username, | ||||
scrambled_password, | scrambled_password, | ||||
) | ) | ||||
print("Request: %s\n" % request) | print("Request: %s\n" % request) | ||||
self.socket.sendall(request.encode("UTF-8")) | self.socket.sendall(request.encode("UTF-8")) | ||||
response = self.conn_read_line() | response = self.conn_read_line() | ||||
if response != b"I LOVE YOU\n": | if response != b"I LOVE YOU\n": | ||||
raise NotFound( | raise NotFound( | ||||
"pserver authentication failed for %s:%s: %s" | "pserver authentication failed for %s:%s: %s" | ||||
% (hostname, port, response) | % (hostname, port, response) | ||||
) | ) | ||||
def connect_ssh(self, hostname, port, auth): | def connect_ssh(self, hostname, port, username): | ||||
command = ["ssh"] | command = ["ssh"] | ||||
if auth is not None: | if username is not None: | ||||
# Assume 'auth' contains only a user name. | # Assume 'auth' contains only a user name. | ||||
# We do not support password authentication with SSH since the | # We do not support password authentication with SSH since the | ||||
# anoncvs user is usually granted access without a password. | # anoncvs user is usually granted access without a password. | ||||
command += ["-l", "%s" % auth] | command += ["-l", "%s" % username] | ||||
if port is not None: | if port is not None: | ||||
command += ["-p", "%d" % port] | command += ["-p", "%d" % port] | ||||
# accept new SSH hosts keys upon first use; changed host keys | # accept new SSH hosts keys upon first use; changed host keys | ||||
# will require intervention | # will require intervention | ||||
command += ["-o", "StrictHostKeyChecking=accept-new"] | command += ["-o", "StrictHostKeyChecking=accept-new"] | ||||
# disable interactive prompting | # disable interactive prompting | ||||
command += ["-o", "BatchMode=yes"] | command += ["-o", "BatchMode=yes"] | ||||
# disable further option processing by adding '--' | # disable further option processing by adding '--' | ||||
command += ["--"] | command += ["--"] | ||||
command += ["%s" % hostname, "cvs", "server"] | command += ["%s" % hostname, "cvs", "server"] | ||||
# use non-buffered I/O to match behaviour of self.socket | # use non-buffered I/O to match behaviour of self.socket | ||||
self.ssh = subprocess.Popen( | self.ssh = subprocess.Popen( | ||||
command, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE | command, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE | ||||
) | ) | ||||
def connect_fake(self, hostname, port, auth): | def connect_fake(self): | ||||
command = ["cvs", "server"] | command = ["cvs", "server"] | ||||
# use non-buffered I/O to match behaviour of self.socket | # use non-buffered I/O to match behaviour of self.socket | ||||
self.ssh = subprocess.Popen( | self.ssh = subprocess.Popen( | ||||
command, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE | command, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE | ||||
) | ) | ||||
def conn_read_line(self, require_newline=True): | def conn_read_line(self, require_newline=True): | ||||
if len(self.linebuffer) != 0: | if len(self.linebuffer) != 0: | ||||
▲ Show 20 Lines • Show All 55 Lines • ▼ Show 20 Lines | def conn_close(self): | ||||
"Could not terminate " "ssh program: %s" % e | "Could not terminate " "ssh program: %s" % e | ||||
) | ) | ||||
def __init__(self, url): | def __init__(self, url): | ||||
""" | """ | ||||
Connect to a CVS server at the specified URL and perform the initial | Connect to a CVS server at the specified URL and perform the initial | ||||
CVS protocol handshake. | CVS protocol handshake. | ||||
""" | """ | ||||
self.hostname = url.host | self.hostname = url.hostname | ||||
self.cvsroot_path = os.path.dirname(url.path) | self.cvsroot_path = os.path.dirname(url.path) | ||||
self.cvs_module_name = os.path.basename(url.path) | self.cvs_module_name = os.path.basename(url.path) | ||||
self.socket = None | self.socket = None | ||||
self.ssh = None | self.ssh = None | ||||
self.linebuffer = list() | self.linebuffer = list() | ||||
self.incomplete_line = b"" | self.incomplete_line = b"" | ||||
if url.scheme == "pserver": | if url.scheme == "pserver": | ||||
self.connect_pserver(url.host, url.port, url.auth) | self.connect_pserver(url.hostname, url.port, url.username, url.password) | ||||
elif url.scheme == "ssh": | elif url.scheme == "ssh": | ||||
self.connect_ssh(url.host, url.port, url.auth) | self.connect_ssh(url.hostname, url.port, url.username) | ||||
elif url.scheme == "fake": | elif url.scheme == "fake": | ||||
self.connect_fake(url.host, url.port, url.auth) | self.connect_fake() | ||||
else: | else: | ||||
raise NotFound("Invalid CVS origin URL '%s'" % url) | raise NotFound("Invalid CVS origin URL '%s'" % url) | ||||
# we should have a connection now | # we should have a connection now | ||||
assert self.socket or self.ssh | assert self.socket or self.ssh | ||||
self.conn_write_str( | self.conn_write_str( | ||||
"Root %s\nValid-responses %s\nvalid-requests\n" | "Root %s\nValid-responses %s\nvalid-requests\n" | ||||
▲ Show 20 Lines • Show All 185 Lines • Show Last 20 Lines |