AnonSec Shell
Server IP : 209.38.156.173  /  Your IP : 216.73.216.122   [ Reverse IP ]
Web Server : Apache/2.4.52 (Ubuntu)
System : Linux lakekumayuhotel 5.15.0-136-generic #147-Ubuntu SMP Sat Mar 15 15:53:30 UTC 2025 x86_64
User : root ( 0)
PHP Version : 8.1.2-1ubuntu2.22
Disable Function : NONE
Domains : 2 Domains
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     [ BACKUP SHELL ]     [ JUMPING ]     [ MASS DEFACE ]     [ SCAN ROOT ]     [ SYMLINK ]     

Current File : /lib/python3/dist-packages/twisted/test/__pycache__/test_unix.cpython-310.pyc
o

�b5�@sdZddlZddlZddlZddlmZddlmZmZm	Z	m
Z
mZmZm
Z
ddlmZddlmZddlmZddlmZmZdd	lmZGd
d�dej�Zee
�ed�d�Gd
d�dej��ZGdd�dej�ZGdd�dej�Z ee
�!ed�d�Gdd�dej��Z"dS)zK
Tests for implementations of L{IReactorUNIX} and L{IReactorUNIXDatagram}.
�N)�skipIf)�address�defer�error�
interfaces�protocol�reactor�utils)�lockfile)�
networkString)�FilePath)�MyClientFactory�MyServerFactory)�unittestc@�eZdZdd�Zdd�ZdS)�FailedConnectionClientFactorycCs
||_dS�N)�onFail)�selfr�r�8/usr/lib/python3/dist-packages/twisted/test/test_unix.py�__init__�
z&FailedConnectionClientFactory.__init__cCs|j�|�dSr)r�errback)r�	connector�reasonrrr�clientConnectionFailedsz4FailedConnectionClientFactory.clientConnectionFailedN)�__name__�
__module__�__qualname__rrrrrrr�r�1This reactor does not support UNIX domain socketsc@sheZdZdZe�ed�sdZdd�Zdd�Z	dd	�Z
d
d�Zdd
�Zdd�Z
dd�Zdd�Zdd�ZdS)�UnixSocketTestsz
    Test unix sockets.
    Nr!cs����}����t��t��}�_t�|��}��|j�t	�	t	j
t	j�}��|j�|�
��|�|����fdd�}|�|�|S)z�
        The address passed to the server factory's C{buildProtocol} method and
        the address returned by the connected protocol's transport's C{getPeer}
        method match the address the client socket is bound to.
        cs0t���}���j|g���|j��|�dSr)r�UNIXAddress�assertEqual�
peerAddresses�	transport�getPeer)�proto�expected��peernamer�
serverFactoryrr�
cbConnMade;s
z1UnixSocketTests.test_peerBind.<locals>.cbConnMade)�mktemprr�Deferred�protocolConnectionMader�
listenUNIX�
addCleanup�
stopListening�socket�AF_UNIX�SOCK_STREAM�close�bind�connect�addCallback)r�filename�connMade�unixPort�
unixSocketr-rr*r�
test_peerBind*s


zUnixSocketTests.test_peerBindcs�����t�}t��}||_t��|�}��|j�t	��t��}|�_t�
���t�||g�}���fdd�}|�|�|S)z�
        L{IReactorUNIX.connectUNIX} can be used to connect a client to a server
        started with L{IReactorUNIX.listenUNIX}.
        cs6|\}}���jt���g�|j��|j��dSr)r$r%rr#r&�loseConnection)�args�serverProtocol�clientProtocol��
clientFactoryr;rrr�allConnectedTs�
z1UnixSocketTests.test_dumber.<locals>.allConnected)
r.rrr/r0rr1r2r3r
�connectUNIX�
gatherResultsr:)rr,�serverConnMader=�clientConnMade�drFrrDr�test_dumberCs
zUnixSocketTests.test_dumbercs�����t�}t��}||_tj�|dd����t�	�d��t
��t��}|�_tj��dd�t�||g�}����fdd�}|�
|���fdd	�}|�
|�|S)
z�
        A lockfile is created and locked when L{IReactorUNIX.listenUNIX} is
        called and released when the Deferred returned by the L{IListeningPort}
        provider's C{stopListening} method is called back.
        T��wantPID�.lock���checkPIDcs:|\}}���jt���g�|j��|j�����Sr)r$r%rr#r&r@r3)rArB�clientProto�rEr;rr=rr�
_portStuffys�

z0UnixSocketTests.test_pidFile.<locals>._portStuffcs��t��d�d�dS)NrO�locked)�assertFalser
�isLocked��ignored)r;rrr�_check�sz,UnixSocketTests.test_pidFile.<locals>._check)r.rrr/r0rr1�
assertTruer
rXr
rGrHr:)rr,rIrJrKrUr[rrTr�test_pidFilebs 


zUnixSocketTests.test_pidFilecsR|���t��tj��dd�}|jtjtj��dd���fdd�}|���|�S)z�
        L{IReactorUNIX.listenUNIX} raises L{error.CannotListenError} if passed
        the name of a file on which a server is already listening.
        TrMcstj��dd�}|��S�NTrM)rr1r3)�ignr=�r;r,rr�stoppedListening�sz<UnixSocketTests.test_socketLocking.<locals>.stoppedListening)	r.rrr1�assertRaisesr�CannotListenErrorr3r:)rr=rarr`r�test_socketLocking�s�z"UnixSocketTests.test_socketLockingcCsj|��|_td|jf�}dttj�tj���	�ji}ttj
��	�j}tj|dd|f|d�}|�
|�|S)Nzmfrom twisted.internet import protocol, reactor
reactor.listenUNIX(%r, protocol.ServerFactory(),wantPID=True)
s
PYTHONPATHs-us-c)�env)r.r;rr�os�pathsep�join�sys�path�asBytesMode�
executabler	�getProcessValuer:)r�callback�sourcere�pyExerKrrr�_uncleanSocketTest�s
��
z"UnixSocketTests._uncleanSocketTestc��fdd�}��|�S)a"
        If passed C{True} for the C{wantPID} parameter, a server can be started
        listening with L{IReactorUNIX.listenUNIX} when passed the name of a
        file on which a previous server which has not exited cleanly has been
        listening using the C{wantPID} option.
        cstj�jt�dd�}|��Sr^)rr1r;rr3)r_�p�rrr�ranStupidChild�szGUnixSocketTests.test_uncleanServerSocketLocking.<locals>.ranStupidChild�rq�rrurrtr�test_uncleanServerSocketLocking�s
z/UnixSocketTests.test_uncleanServerSocketLockingcrr)z�
        If passed C{True} for the C{checkPID} parameter, a client connection
        attempt made with L{IReactorUNIX.connectUNIX} fails with
        L{error.BadFileError}.
        cs0t��}t|�}tj�j|dd���|tj�S)NTrQ)	rr/rrrGr;�
assertFailurer�BadFileError)r_rK�frtrrru�szCUnixSocketTests.test_connectToUncleanServer.<locals>.ranStupidChildrvrwrrtr�test_connectToUncleanServer�s
z+UnixSocketTests.test_connectToUncleanServerc�p���}t�||��d��d|�d�}��t��|���t��|�t��j�}���fdd�}|�	|�|S)z~
        Test the C{__str__} and C{__repr__} implementations of a UNIX port when
        used with the given factory.
        �<� on �>c�0d��d�}��t��|���t��|�dS�Nr~z (not listening)>�r$�repr�str�r_�unconnectedString��factoryNamerr=rrra��z3UnixSocketTests._reprTest.<locals>.stoppedListening)
r.rr1r$r�r�r�
maybeDeferredr3r:)rr,r�r;�connectedStringrKrarr�r�	_reprTest��
zUnixSocketTests._reprTestcC�(Gdd�d�}|�|t�|�|�d�S)a!
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIX.listenUNIX} contains the name of the new-style factory
        class being used and the filename on which the port is listening or
        indicates that the port is not listening.
        c@r)zEUnixSocketTests.test_reprWithNewStyleFactory.<locals>.NewStyleFactorycS�dSrrrtrrr�doStart��zMUnixSocketTests.test_reprWithNewStyleFactory.<locals>.NewStyleFactory.doStartcSr�rrrtrrr�doStop�r�zLUnixSocketTests.test_reprWithNewStyleFactory.<locals>.NewStyleFactory.doStopN)rrrr�r�rrrr�NewStyleFactory�r r�z&twisted.test.test_unix.NewStyleFactory��assertIsInstance�typer�)rr�rrr�test_reprWithNewStyleFactory��
�z,UnixSocketTests.test_reprWithNewStyleFactory)rrr�__doc__r�IReactorUNIXr�skipr?rLr]rdrqrxr|r�r�rrrrr"s,r"c@s8eZdZdZZdZdd�Zdd�Zdd�Zd	d
�Z	dS)�ClientProtoFNcC�t��|_t��|_dSr)rr/�deferredStarted�deferredGotBackrtrrrr�
zClientProto.__init__cC�
d|_dS�NT��stoppedrtrrr�stopProtocol	rzClientProto.stopProtocolcC�d|_|j�d�dSr���startedr�rnrtrrr�
startProtocol�zClientProto.startProtocolcCs||_|j�d�dSr)�gotbackr�rn)r�datarrr�datagramReceivedr�zClientProto.datagramReceived)
rrrr�r�r�rr�r�r�rrrrr�sr�c@s<eZdZdZZdZZdd�Zdd�Zdd�Z	d	d
�Z
dS)�ServerProtoFNcCr�r)rr/r��deferredGotWhatrtrrrrr�zServerProto.__init__cCr�r�r�rtrrrr�rzServerProto.stopProtocolcCr�r�r�rtrrrr� r�zServerProto.startProtocolcCs*||_|j�d|�||_|j�d�dS)N�hi back)�gotfromr&�write�gotwhatr�rn)rr��addrrrrr�$szServerProto.datagramReceived)rrrr�r�r�r�rr�r�r�rrrrr�sr�z3This reactor does not support UNIX datagram socketsc@s0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�DatagramUnixSocketTestsz%
    Test datagram UNIX sockets.
    cs��������}t��t��t�|��}��|j�tj|��d�}��|j�t�	�j
�j
g�}��fdd�}����fdd�}|�|�|�|�|S)zf
        Test that a datagram can be sent to and received by a server and vice
        versa.
        )�bindAddresscs�j�d�t��j�jg�S)N�hi)r&r�rrHr�r�rY)�cp�sprrr�Dsz4DatagramUnixSocketTests.test_exchange.<locals>.writecs.��d�j�����j���d�j�dS)Nr�r�)r$r�r�r�rY��
clientaddrr�rr�rr�_cbTestExchangeHsz>DatagramUnixSocketTests.test_exchange.<locals>._cbTestExchange)r.r�r�r�listenUNIXDatagramr2r3�connectUNIXDatagramrrHr�r:)r�
serveraddr�s�crKr�r�rr�r�
test_exchange4s

z%DatagramUnixSocketTests.test_exchangecCsD|��}t�}t�||�}|�tjtj||�|��t�	|�dS)z�
        L{IReactorUNIXDatagram.listenUNIXDatagram} raises
        L{error.CannotListenError} if the unix socket specified is already in
        use.
        N)
r.r�rr�rbrrcr3rf�unlink)rr�rsr�rrr�test_cannotListenQsz)DatagramUnixSocketTests.test_cannotListencr})z�
        Test the C{__str__} and C{__repr__} implementations of a UNIX datagram
        port when used with the given protocol.
        r~rr�cr�r�r�r���protocolNamerr=rrranr�z;DatagramUnixSocketTests._reprTest.<locals>.stoppedListening)
r.rr�r$r�r�rr�r3r:)r�serverProtor�r;r��stopDeferredrarr�rr�`r�z!DatagramUnixSocketTests._reprTestcCr�)a2
        The two string representations of the L{IListeningPort} returned by
        L{IReactorUNIXDatagram.listenUNIXDatagram} contains the name of the
        new-style protocol class being used and the filename on which the port
        is listening or indicates that the port is not listening.
        c@r)zODatagramUnixSocketTests.test_reprWithNewStyleProtocol.<locals>.NewStyleProtocolcSr�rr)rr&rrr�makeConnectionr�z^DatagramUnixSocketTests.test_reprWithNewStyleProtocol.<locals>.NewStyleProtocol.makeConnectioncSr�rrrtrrrr��r�zVDatagramUnixSocketTests.test_reprWithNewStyleProtocol.<locals>.NewStyleProtocol.doStopN)rrrr�r�rrrr�NewStyleProtocol~r r�z'twisted.test.test_unix.NewStyleProtocolr�)rr�rrr�test_reprWithNewStyleProtocolvr�z5DatagramUnixSocketTests.test_reprWithNewStyleProtocolN)rrrr�r�r�r�r�rrrrr�+sr�)#r�rfr4rirr�twisted.internetrrrrrrr	�twisted.pythonr
�twisted.python.compatr�twisted.python.filepathr�twisted.test.test_tcpr
r�
twisted.trial�
ClientFactoryrr��TestCaser"�ConnectedDatagramProtocolr��DatagramProtocolr��IReactorUNIXDatagramr�rrrr�<module>s2$�`�

Anon7 - 2022
AnonSec Team