AnonSec Shell
Server IP : 209.38.156.173  /  Your IP : 216.73.216.122   [ Reverse IP ]
Web Server : Apache/2.4.52 (Ubuntu)
System : Linux lakekumayuhotel 5.15.0-136-generic #147-Ubuntu SMP Sat Mar 15 15:53:30 UTC 2025 x86_64
User : root ( 0)
PHP Version : 8.1.2-1ubuntu2.22
Disable Function : NONE
Domains : 2 Domains
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/internet/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     [ BACKUP SHELL ]     [ JUMPING ]     [ MASS DEFACE ]     [ SCAN ROOT ]     [ SYMLINK ]     

Current File : /lib/python3/dist-packages/twisted/internet/test/__pycache__/test_testing.cpython-310.pyc
o

�bo?�@s�dZddlmZddlmZddlmZmZmZm	Z	m
Z
mZmZm
Z
mZddlmZmZddlmZmZmZmZddlmZddlmZGd	d
�d
e�ZGdd�de�ZGd
d�d�ZGdd�de�ZGdd�de�ZdS)z(
Tests for L{twisted.internet.testing}.
�)�verifyObject)�IPv4Address)	�IAddress�
IConnector�	IConsumer�IListeningPort�
IPushProducer�IReactorSSL�IReactorTCP�IReactorUNIX�
ITransport)�
ClientFactory�Factory)�
MemoryReactor�NonStreamingProducer�RaisingMemoryReactor�StringTransport)�namedAny)�TestCasec@s�eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd�Zd d!�Zd"d#�Zd$d%�Zd&d'�Zd(S))�StringTransportTestsz@
    Tests for L{twisted.internet.testing.StringTransport}.
    cCst�|_dS�N)r�	transport��self�r�D/usr/lib/python3/dist-packages/twisted/internet/test/test_testing.py�setUp&szStringTransportTests.setUpcCs:|�tt|j��|�tt|j��|�tt|j��dS)zq
        L{StringTransport} instances provide L{ITransport}, L{IPushProducer},
        and L{IConsumer}.
        N)�
assertTruerrrrrrrrr�test_interfaces)sz$StringTransportTests.test_interfacescCs>t�}t�}|j�||�|�|jj|�|�|jj|�dS)zz
        L{StringTransport.registerProducer} records the arguments supplied to
        it as instance attributes.
        N)�objectr�registerProducer�assertIs�producer�	streaming�rr"r#rrr�test_registerProducer2s
z*StringTransportTests.test_registerProducercCsLt�}|j�|d�|�t|jjt�d�|�|jj|�|�|jj�dS)zy
        L{StringTransport.registerProducer} raises L{RuntimeError} if a
        producer is already registered.
        TFN)	rrr �assertRaises�RuntimeErrorr!r"rr#)rr"rrr�test_disallowedRegisterProducer=s�z4StringTransportTests.test_disallowedRegisterProducercCsbt�}t�}|j�|d�|j��|�|jj�|j�|d�|�|jj|�|�|jj�dS)z�
        L{StringTransport.unregisterProducer} causes the transport to forget
        about the registered producer and makes it possible to register a new
        one.
        FTN)	rrr �unregisterProducer�assertIsNoner"r!rr#)r�oldProducer�newProducerrrr�test_unregisterProducerJs
z,StringTransportTests.test_unregisterProducercCs|�t|jj�dS)z�
        L{StringTransport.unregisterProducer} raises L{RuntimeError} if called
        when no producer is registered.
        N)r&r'rr)rrrr�test_invalidUnregisterProducerYsz3StringTransportTests.test_invalidUnregisterProducercCs|�|jjd�dS)zO
        L{StringTransport.producerState} is initially C{'producing'}.
        �	producingN)�assertEqualr�
producerStaterrrr�test_initialProducerState`sz.StringTransportTests.test_initialProducerStatecC�|j��|�|jjd�dS)zy
        L{StringTransport.pauseProducing} changes the C{producerState} of the
        transport to C{'paused'}.
        �pausedN)r�pauseProducingr0r1rrrr�test_pauseProducingf�
z(StringTransportTests.test_pauseProducingcCs(|j��|j��|�|jjd�dS)z}
        L{StringTransport.resumeProducing} changes the C{producerState} of the
        transport to C{'producing'}.
        r/N)rr5�resumeProducingr0r1rrrr�test_resumeProducingns

z)StringTransportTests.test_resumeProducingcCr3)z{
        L{StringTransport.stopProducing} changes the C{'producerState'} of the
        transport to C{'stopped'}.
        �stoppedN)r�
stopProducingr0r1rrrr�test_stopProducingwr7z'StringTransportTests.test_stopProducingcC�|j��|�t|jj�dS)zu
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport has been stopped.
        N)rr;r&r'r5rrrr� test_stoppedTransportCannotPauser7z5StringTransportTests.test_stoppedTransportCannotPausecCr=)zv
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport has been stopped.
        N)rr;r&r'r8rrrr�!test_stoppedTransportCannotResume�r7z6StringTransportTests.test_stoppedTransportCannotResumecCr=)zz
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        N)r�loseConnectionr&r'r5rrrr�&test_disconnectingTransportCannotPause�r7z;StringTransportTests.test_disconnectingTransportCannotPausecCr=)z{
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        N)rr@r&r'r8rrrr�'test_disconnectingTransportCannotResume�r7z<StringTransportTests.test_disconnectingTransportCannotResumecCs*|�|jj�|j��|�|jj�dS)zv
        L{StringTransport.loseConnection} toggles the C{disconnecting} instance
        variable to C{True}.
        N)�assertFalser�
disconnectingr@rrrrr�$test_loseConnectionSetsDisconnecting�s
z9StringTransportTests.test_loseConnectionSetsDisconnectingcCst�}|�t|���|�dS)z�
        If a host address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getHost}.
        N)rr!r�getHost�r�addressrrr�test_specifiedHostAddress�sz.StringTransportTests.test_specifiedHostAddresscCs t�}|�t|d���|�dS)z�
        If a peer address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getPeer}.
        )�peerAddressN)rr!r�getPeerrGrrr�test_specifiedPeerAddress�sz.StringTransportTests.test_specifiedPeerAddresscC�t���}|�|t�dS)z�
        If no host address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getHost}.
        N)rrF�assertIsInstancerrGrrr�test_defaultHostAddress��
z,StringTransportTests.test_defaultHostAddresscCrM)z�
        If no peer address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getPeer}.
        N)rrKrNrrGrrr�test_defaultPeerAddress�rPz,StringTransportTests.test_defaultPeerAddressN)�__name__�
__module__�__qualname__�__doc__rrr%r(r-r.r2r6r9r<r>r?rArBrErIrLrOrQrrrrr!s*	
		rc@s@eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dS)�ReactorTestszA
    Tests for L{MemoryReactor} and L{RaisingMemoryReactor}.
    cC�(t�}tt|�tt|�tt|�dS)zt
        L{MemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        N)rrr
r	r)r�
memoryReactorrrr�test_memoryReactorProvides��

z'ReactorTests.test_memoryReactorProvidescCrW)z{
        L{RaisingMemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        N)rrr
r	r)r�raisingReactorrrr�test_raisingReactorProvides�rZz(ReactorTests.test_raisingReactorProvidescCs�t�}|�ddt��|�ddt�d�fD]}tt|�|��}tt|�|�|j	d�|�|j
d�q|�dt��}tt|�|��}tt|�|�|jd�dS)a
        L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
        L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
        C{getDestination} method returns an L{IAddress} with attributes which
        reflect the values passed.
        ztest.example.comi� Ns
/fake/path)
r�
connectTCPr
�
connectSSLrr�getDestinationrr0�host�port�connectUNIX�name)rrX�	connectorrHrrr�test_connectDestination�s�



z$ReactorTests.test_connectDestinationcCs�t�}|�dt��|�dt�d�fD]}tt|�|��}tt|�|�|j	d�|�|j
d�q|�dt��}tt|�|��}tt|�|�|jd�dS)a�
        L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
        L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
        C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
        will have a default host of C{'0.0.0.0'}, and a port that reflects the
        value passed, and C{listenUNIX} will have a name that reflects the path
        passed.
        i2 Nz0.0.0.0s/path/to/socket)
r�	listenTCPr�	listenSSLrrrFrr0r`ra�
listenUNIXrc)rrXrarHrrr�test_listenDefaultHost�s	�



z#ReactorTests.test_listenDefaultHostcC�Pt�}t�}|�|�|�|�|�|��|g�|�|�|�|��g�dS)z>
        Adding, removing, and listing readers works.
        N)rr�	addReaderr0�
getReaders�removeReader)r�reader�reactorrrr�test_readers�


zReactorTests.test_readerscCrj)z>
        Adding, removing, and listing writers works.
        N)rr�	addWriterr0�
getWriters�removeWriter)r�writerrorrr�test_writers"rqzReactorTests.test_writersN)
rRrSrTrUrYr\rerirprvrrrrrV�s

rVc@s0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�TestConsumerzP
    A very basic test consumer for use with the NonStreamingProducerTests.
    cCsg|_d|_d|_dSr)�writesr"�producerStreamingrrrr�__init__8s
zTestConsumer.__init__cCs||_||_dS)z�
        Registers a single producer with this consumer. Just keeps track of it.

        @param producer: The producer to register.
        @param streaming: Whether the producer is a streaming one or not.
        N�r"ryr$rrrr =s
zTestConsumer.registerProducercCsd|_d|_dS)zC
        Forget the producer we had previously registered.
        Nr{rrrrr)Gs
zTestConsumer.unregisterProducercCs|j�|�dS)zz
        Some data was written to the consumer: stores it for later use.

        @param data: The data to write.
        N)rx�append)r�datarrr�writeNszTestConsumer.writeN)rRrSrTrUrzr r)r~rrrrrw3s
rwc@s eZdZdZdd�Zdd�ZdS)�NonStreamingProducerTestszF
    Tests for the L{NonStreamingProducer} to validate behaviour.
    cCs�t�}t|�}|�|d�|�|j|�|�|j|�|�|j�td�D]}|�	�q%gd�}|�
|j�|�
|j�|�
|j�|�|j|�|�
t|j	�dS)z�
        When the L{NonStreamingProducer} has resumeProducing called 10 times,
        it writes the counter each time and then fails.
        F�
)
�0�1�2�3�4�5�6�7�8�9N)rwrr r!r"�consumerrCry�ranger8r*r0rxr&r')rr�r"�_�expectedWritesrrr�test_producesOnly10Times\s
z2NonStreamingProducerTests.test_producesOnly10TimescCs4t�}t|�}|�|d�|��|�t|j�dS)zb
        When the L{NonStreamingProducer} is paused, it raises a
        L{RuntimeError}.
        FN)rwrr r8r&r'r5)rr�r"rrr�test_cannotPauseProductionvs
z4NonStreamingProducerTests.test_cannotPauseProductionN)rRrSrTrUr�r�rrrrrWsrc@s�eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd�Zd S)!�DeprecationTestsz8
    Deprecations in L{twisted.test.proto_helpers}.
    cCsdd|j��}|�|g�}|�t|dd�|�dt|��|�||dd�|�|t|��dS)Nztwisted.internet.testing.r�category��message)rR�
flushWarningsr0�DeprecationWarning�len�assertInr!r)r�test�obj�new_path�warningsrrr�helper�szDeprecationTests.helpercC�ddlm}|�|j|�dS)Nr)�AccumulatingProtocol)�twisted.test.proto_helpersr�r��test_accumulatingProtocol)rr�rrrr���z*DeprecationTests.test_accumulatingProtocolcCr�)Nr)�LineSendingProtocol)r�r�r��test_lineSendingProtocol)rr�rrrr��r�z)DeprecationTests.test_lineSendingProtocolcCr�)Nr)�FakeDatagramTransport)r�r�r��test_fakeDatagramTransport)rr�rrrr��r�z+DeprecationTests.test_fakeDatagramTransportcCr�)Nr)r)r�rr��test_stringTransport)rrrrrr��r�z%DeprecationTests.test_stringTransportcCr�)Nr)� StringTransportWithDisconnection)r�r�r��%test_stringTransportWithDisconnection)rr�rrrr��s�z6DeprecationTests.test_stringTransportWithDisconnectioncCr�)Nr)�StringIOWithoutClosing)r�r�r��test_stringIOWithoutClosing)rr�rrrr��r�z,DeprecationTests.test_stringIOWithoutClosingcCr�)Nr)�_FakeConnector)r�r�r��test__fakeConnector)rr�rrrr��r�z$DeprecationTests.test__fakeConnectorcCr�)Nr)�	_FakePort)r�r�r��test__fakePort)rr�rrrr��r�zDeprecationTests.test__fakePortcCr�)Nr)r)r�rr��test_memoryReactor)rrrrrr��r�z#DeprecationTests.test_memoryReactorcCr�)Nr)�MemoryReactorClock)r�r�r��test_memoryReactorClock)rr�rrrr��r�z(DeprecationTests.test_memoryReactorClockcCr�)Nr)r)r�rr��test_raisingMemoryReactor)rrrrrr��r�z*DeprecationTests.test_raisingMemoryReactorcCr�)Nr)r)r�rr��test_nonStreamingProducer)rrrrrr��r�z*DeprecationTests.test_nonStreamingProducercCr�)Nr)�waitUntilAllDisconnected)r�r�r��test_waitUntilAllDisconnected)rr�rrrr��r�z.DeprecationTests.test_waitUntilAllDisconnectedcCr�)Nr)�EventLoggingObserver)r�r�r��test_eventLoggingObserver)rr�rrrr��r�z*DeprecationTests.test_eventLoggingObserverN)rRrSrTrUr�r�r�r�r�r�r�r�r�r�r�r�r�r�r�rrrrr��s"r�N) rU�zope.interface.verifyr�twisted.internet.addressr�twisted.internet.interfacesrrrrrr	r
rr�twisted.internet.protocolr
r�twisted.internet.testingrrrr�twisted.python.reflectr�twisted.trial.unittestrrrVrwrr�rrrr�<module>s,)j$.

Anon7 - 2022
AnonSec Team