Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F8395161
test_unixsocketendpoint.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
2 KB
Subscribers
None
test_unixsocketendpoint.py
View Options
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
try
:
import
unittest2
as
unittest
except
ImportError
:
import
unittest
# noqa
import
time
import
subprocess
import
logging
from
cassandra.cluster
import
ExecutionProfile
,
EXEC_PROFILE_DEFAULT
from
cassandra.connection
import
UnixSocketEndPoint
from
cassandra.policies
import
WhiteListRoundRobinPolicy
,
RoundRobinPolicy
from
tests
import
notwindows
from
tests.integration
import
use_single_node
,
TestCluster
log
=
logging
.
getLogger
()
log
.
setLevel
(
'DEBUG'
)
UNIX_SOCKET_PATH
=
'/tmp/cass.sock'
def
setup_module
():
use_single_node
()
class
UnixSocketWhiteListRoundRobinPolicy
(
WhiteListRoundRobinPolicy
):
def
__init__
(
self
,
hosts
):
self
.
_allowed_hosts
=
self
.
_allowed_hosts_resolved
=
tuple
(
hosts
)
RoundRobinPolicy
.
__init__
(
self
)
@notwindows
class
UnixSocketTest
(
unittest
.
TestCase
):
@classmethod
def
setUpClass
(
cls
):
log
.
debug
(
"Starting socat..."
)
cls
.
proc
=
subprocess
.
Popen
(
[
'socat'
,
'UNIX-LISTEN:
%s
,fork'
%
UNIX_SOCKET_PATH
,
'TCP:localhost:9042'
],
stdout
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
STDOUT
)
time
.
sleep
(
1
)
if
cls
.
proc
.
poll
()
is
not
None
:
for
line
in
cls
.
proc
.
stdout
.
readlines
():
log
.
debug
(
"socat: "
+
line
.
decode
(
'utf-8'
))
raise
Exception
(
"Error while starting socat. Return code:
%d
"
%
cls
.
proc
.
returncode
)
lbp
=
UnixSocketWhiteListRoundRobinPolicy
([
UNIX_SOCKET_PATH
])
ep
=
ExecutionProfile
(
load_balancing_policy
=
lbp
)
endpoint
=
UnixSocketEndPoint
(
UNIX_SOCKET_PATH
)
cls
.
cluster
=
TestCluster
(
contact_points
=
[
endpoint
],
execution_profiles
=
{
EXEC_PROFILE_DEFAULT
:
ep
})
@classmethod
def
tearDownClass
(
cls
):
cls
.
cluster
.
shutdown
()
cls
.
proc
.
terminate
()
def
test_unix_socket_connection
(
self
):
s
=
self
.
cluster
.
connect
()
s
.
execute
(
'select * from system.local'
)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Jun 4 2025, 7:35 PM (10 w, 22 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3399250
Attached To
rPCASS python-cassandra-driver
Event Timeline
Log In to Comment