Current File : /lib/python3/dist-packages/twisted/test/__pycache__/test_stdio.cpython-310.pyc
o

�b�1�@s�dZddlZddlZddlZddlmZddlmZmZm	Z	m
Z
mZddlm
Z
mZddlmZddlmZddlmZdd	lmZmZd
Zeej�Zej�ej�ed<Gdd
�d
e	j�Z Gdd�de�Z!dS)z�
Tests for L{twisted.internet.stdio}.

@var properEnv: A copy of L{os.environ} which has L{bytes} keys/values on POSIX
    platforms and native L{str} keys/values on Windows.
�N)�skipIf)�defer�error�protocol�reactor�stdio)�filepath�log)�
requireModule)�platform)�ConnectionLostNotifyingProtocol)�SkipTest�TestCasesxyz123abc Twisted is great!�
PYTHONPATHc@s4eZdZdZdZdd�Zdd�Zdd�Zd	d
�ZdS)�StandardIOTestProcessProtocola�
    Test helper for collecting output from a child process and notifying
    something when it exits.

    @ivar onConnection: A L{defer.Deferred} which will be called back with
    L{None} when the connection to the child process is established.

    @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the
    failure associated with the child process exiting when it exits.

    @ivar onDataReceived: A L{defer.Deferred} which will be called back with
    this instance whenever C{childDataReceived} is called, or L{None} to
    suppress these callbacks.

    @ivar data: A C{dict} mapping file descriptors to strings containing all
    bytes received from the child process on each file descriptor.
    NcCst��|_t��|_i|_dS�N)r�Deferred�onConnection�onCompletion�data��self�r�9/usr/lib/python3/dist-packages/twisted/test/test_stdio.py�__init__8s


z&StandardIOTestProcessProtocol.__init__cCs|j�d�dSr)r�callbackrrrr�connectionMade=�z,StandardIOTestProcessProtocol.connectionMadecCsB|j�|d�||j|<|jdur|jd}|_|�|�dSdS)z�
        Record all bytes received from the child process in the C{data}
        dictionary.  Fire C{onDataReceived} if it is not L{None}.
        �N)r�get�onDataReceivedr)r�name�bytes�drrr�childDataReceived@s

�z/StandardIOTestProcessProtocol.childDataReceivedcCs|j�|�dSr)rr)r�reasonrrr�processEndedJrz*StandardIOTestProcessProtocol.processEnded)	�__name__�
__module__�__qualname__�__doc__r rrr$r&rrrrr#s
rc@s�eZdZe��red�durdZdd�Zdd�Zdd	�Z	d
d�Z
dd
�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zee��d�dd��ZdS)�StandardInputOutputTests�win32processNzIOn windows, spawnProcess is not available in the absence of win32process.cOs:tjdd|tjjgt|�}tj|tj|fdti|��S)a_
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

        @param sibling: The basename of a file containing the Python program
        to run in the child process.

        @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        s-ms
twisted.test.�env)�sys�
executabler�	__class__r(�list�spawnProcess�	properEnv)r�proto�sibling�args�kwrrr�
_spawnProcessVs��z&StandardInputOutputTests._spawnProcesscs$�fdd�}�fdd�}|�||�S)Ncs��d|���dS)Nz%Process terminated with non-Failure: )�fail)�resultrrr�cbqsz4StandardInputOutputTests._requireFailure.<locals>.cbcs�|�Srr)�err)rrr�ebtsz4StandardInputOutputTests._requireFailure.<locals>.eb)�addCallbacks)rr#rr;r=r)rrr�_requireFailurepsz(StandardInputOutputTests._requireFailurecsL����t�d��t���j}���d�����fdd�}��||�S)z�
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        �Child process logging to sstdio_test_loseconncsbt���}|D]}t�d|���qWd�n1swY��d�j�|�tj�dS)NzChild logged: �)	�openr	�msg�rstrip�failIfInr�trapr�ProcessDone)r%�f�line��errorLogFile�prrrr&�s
��zBStandardInputOutputTests.test_loseConnection.<locals>.processEnded)�mktempr	rCrrr8r?�rr#r&rrJr�test_loseConnectionys	z,StandardInputOutputTests.test_loseConnectioncsf|��}t�d|�t��t���_�fdd�}�j�|�dd�}|��j|�}|�	�d|�|S)z�
        When stdin is closed and the protocol connected to it implements
        L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
        is called.
        r@cs�j}�j��|Sr)r�	transport�
closeStdin)�ignoredr#�rLrr�cbBytes�s
zAStandardInputOutputTests.test_readConnectionLost.<locals>.cbBytescSs|�tj�dSr)rFrrG�r%rrrr&�rzFStandardInputOutputTests.test_readConnectionLost.<locals>.processEndedsstdio_test_halfclose)
rMr	rCrrrr �addCallbackr?r8)rrKrTr&r#rrSr�test_readConnectionLost�s
z0StandardInputOutputTests.test_readConnectionLostc
s^t��z�j�dtdd�Wnty }ztt|���d}~ww��fdd�}���j|�S)z�
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        sstdio_test_lastwriteT)�usePTYNcs2���jd�t�d�j�d��|�tj�dS)z�
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            rAz	Received z) from child, did not find expected bytes.N)�
assertTruer�endswith�UNIQUE_LAST_WRITE_STRINGrFrrGrU�rLrrrr&�s
�zEStandardInputOutputTests.test_lastWriteReceived.<locals>.processEnded)rr8r[�
ValueErrorr
�strr?r)r�er&rr\r�test_lastWriteReceived�s
���z/StandardInputOutputTests.test_lastWriteReceivedc�2t���j}���d���fdd�}��||�S)z�
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        sstdio_test_hostpeercs6�jd��\}}��|���|�|�tj�dS)NrA)r�
splitlinesrYrFrrG)r%�host�peerr\rrr&�s

z?StandardInputOutputTests.test_hostAndPeer.<locals>.processEnded�rrr8r?rNrr\r�test_hostAndPeer�s
z)StandardInputOutputTests.test_hostAndPeercra)z�
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        sstdio_test_writec�"���jdd�|�tj�dS�NrAsok!��assertEqualrrFrrGrUr\rrr&��z9StandardInputOutputTests.test_write.<locals>.processEndedrerNrr\r�
test_write��
z#StandardInputOutputTests.test_writecra)z�
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        sstdio_test_writeseqcrgrhrirUr\rrr&�rkzAStandardInputOutputTests.test_writeSequence.<locals>.processEndedrerNrr\r�test_writeSequence�rmz+StandardInputOutputTests.test_writeSequencecCsV|��}t|d��}td�D]
}|�d|f�qWd�|S1s$wY|S)N�wbi�%d
)rMrB�range�write)r�junkPath�junkFile�irrr�	_junkPaths�
��z"StandardInputOutputTests._junkPathcsdt���j}g�ttd�������fdd�����d���j�������fdd�}��||�S)z�
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        �dcs<�r��d���f����d�t�d�d�dSdS)Nrp���g{�G�z�?)�append�poprrr�	callLater)�ign)r�proc�toWrite�writtenrrrs
�z>StandardInputOutputTests.test_producer.<locals>.connectionMadesstdio_test_producercs>���jdd�������dt��f�|�tj�dS)NrArz*Connection lost with %d writes left to go.)rjr�join�assertFalse�lenrFrrGrU)rLrr~rrrr&s
�z<StandardInputOutputTests.test_producer.<locals>.processEnded)rrr1rqr8rrVr?rNr)rrLr}rr~rr�
test_producersz&StandardInputOutputTests.test_producercs>t���j}�������d�����fdd�}��||�S)z�
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        sstdio_test_consumercsPt�d��}���jd|���Wd�n1swY|�tj�dS)N�rbrA)rBrjr�readrFrrG)r%rH�rsrLrrrr&2s�z<StandardInputOutputTests.test_consumer.<locals>.processEnded)rrrvr8r?rNrr�r�
test_consumer&sz&StandardInputOutputTests.test_consumerzpStandardIO does not accept stdout as an argument to Windows.  Testing redirection to a file is therefore harder.cs�t��}t|�}t��������d��_}��|j	�t
|��d�}t�
�s@t��\}}��tj	|���tj	|�||d<tj|fi|���d�t�������fdd��t�d������fdd	�}|�|�|S)
aE
        If L{StandardIO} is created with a file descriptor which refers to a
        normal file (ie, a file from the filesystem), L{StandardIO.write}
        writes bytes to that file.  In particular, it does not immediately
        consider the file closed or call its protocol's C{connectionLost}
        method.
        ro)�stdout�stdin�cs@�D]}|�kr���dS��d|f�t�d��dS)N�%dr)�loseConnectionrrrr{)�value)�
connection�count�howMany�spinrrr�]szAStandardInputOutputTests.test_normalFileStandardOut.<locals>.spinrcs<��t���d������d�dd�t��D���dS)NrArcss�|]}d|fVqdS)r�Nr)�.0rurrr�	<genexpr>ms�zVStandardInputOutputTests.test_normalFileStandardOut.<locals>.cbLost.<locals>.<genexpr>)rj�next�
getContentr�rqrU)r�r��pathrrr�cbLostjs�zCStandardInputOutputTests.test_normalFileStandardOut.<locals>.cbLost)rrrr�FilePathrMrB�normal�
addCleanup�close�dict�filenor�	isWindows�os�piper�
StandardIO�	itertoolsr�rr{rV)r�
onConnLostr4r��kwargs�r�wr�r)r�r�r�r�rr�r�test_normalFileStandardOut9s&
	
z3StandardInputOutputTests.test_normalFileStandardOut)r'r(r)rr�r
�skipr8r?rOrWr`rfrlrnrvr�r�rr�rrrrr+Ns(�	'�r+)"r*r�r�r.�unittestr�twisted.internetrrrrr�twisted.pythonrr	�twisted.python.reflectr
�twisted.python.runtimer�twisted.test.test_tcpr�twisted.trial.unittestr
rr[r��environr3�pathsepr�r��ProcessProtocolrr+rrrr�<module>s 
+
