AnonSec Shell
Server IP : 209.38.156.173  /  Your IP : 216.73.216.122   [ Reverse IP ]
Web Server : Apache/2.4.52 (Ubuntu)
System : Linux lakekumayuhotel 5.15.0-136-generic #147-Ubuntu SMP Sat Mar 15 15:53:30 UTC 2025 x86_64
User : root ( 0)
PHP Version : 8.1.2-1ubuntu2.22
Disable Function : NONE
Domains : 2 Domains
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     [ BACKUP SHELL ]     [ JUMPING ]     [ MASS DEFACE ]     [ SCAN ROOT ]     [ SYMLINK ]     

Current File : /lib/python3/dist-packages/twisted/test/__pycache__/test_pcp.cpython-310.pyc
o

�b�0�@sPddd�ZddlmZddlmZGdd�d�ZGdd	�d	�ZGd
d�de�ZGdd
�d
ej�Z	Gdd�d�Z
Gdd�d�ZGdd�de
ej�ZGdd�de
ej�Z
Gdd�deej�ZGdd�deej�ZGdd�dej�ZGdd�dej�ZGdd�d�ZGd d!�d!eej�ZGd"d#�d#eej�ZGd$d%�d%ej�ZGd&d'�d'ej�Zd(S))z$Revision: 1.5 $�����)�pcp)�unittestc@s(eZdZdZdd�Zdd�Zdd�ZdS)	�DummyTransportz A dumb transport to wrap around.cCs
g|_dS�N)�_writes��self�r�7/usr/lib/python3/dist-packages/twisted/test/test_pcp.py�__init__%�
zDummyTransport.__init__cCs|j�|�dSr)r�append�r
�datarrr�write(�zDummyTransport.writecCsd�|j�S)N�)�joinrr	rrr�getvalue+szDummyTransport.getvalueN)�__name__�
__module__�__qualname__�__doc__r
rrrrrrr"s
rc@s8eZdZdZdZdZdd�Zdd�Zdd�Zdd	�Z	d
S)�
DummyProducerFcCs
||_dSr)�consumer)r
rrrrr
4rzDummyProducer.__init__cCsd|_d|_dS)NTF)�resumed�pausedr	rrr�resumeProducing7s
zDummyProducer.resumeProducingcC�
d|_dS�NT)rr	rrr�pauseProducing;rzDummyProducer.pauseProducingcCr r!)�stoppedr	rrr�
stopProducing>rzDummyProducer.stopProducingN)
rrrrr#rr
rr"r$rrrrr/src@s0eZdZdZdZdZdd�Zdd�Zdd	�ZdS)
�
DummyConsumerNFTcCs||f|_dSr)�producer)r
r&�	streamingrrr�registerProducerGszDummyConsumer.registerProducercCr r!)�unregisteredr	rrr�unregisterProducerJrz DummyConsumer.unregisterProducercCr r!)�finishedr	rrr�finishMrzDummyConsumer.finish)	rrrr&r+r)r(r*r,rrrrr%Bsr%c@s"eZdZejZdd�Zdd�ZdS)�TransportInterfaceTestscC�t�|_|�|j�|_dSr)r%�
underlying�
proxyClass�	transportr	rrr�setUpT�zTransportInterfaceTests.setUpcCs|j�d�dS�Nz
some bytes)r1rr	rrr�	testWriteXrz!TransportInterfaceTests.testWriteN)rrrr�BasicProducerConsumerProxyr0r2r5rrrrr-Qsr-c@s0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�ConsumerInterfaceTestz�Test ProducerConsumerProxy as a Consumer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Consumer convincingly for the ProducingServer.
    cCs&t�|_|�|j�|_t|j�|_dSr)r%r/r0rrr&r	rrrr2eszConsumerInterfaceTest.setUpcC�"|j�|jd�|�|jj�dSr!)rr(r&�assertFalserr	rrr�testRegisterPushj�z&ConsumerInterfaceTest.testRegisterPushcC�>|j�|jd�|j��d|j_|j��|�|jj�dS�NF)rr(r&r*rrr9r	rrr�testUnregistervs


z$ConsumerInterfaceTest.testUnregistercCr<r=)rr(r&r,rrr9r	rrr�
testFinish�s


z ConsumerInterfaceTest.testFinishN)rrrrr2r:r>r?rrrrr7\sr7c@sHeZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dS)�ProducerInterfaceTestz�Test ProducerConsumerProxy as a Producer.

    Normally we have ProducingServer -> ConsumingTransport.

    If I am to go between (Server -> Shaper -> Transport), I have to
    play the role of Producer convincingly for the ConsumingTransport.
    cCr.r)r%rr0r&r	rrrr2�r3zProducerInterfaceTest.setUpcCs|�|jjd|j�dS)Nr)�assertEqualrr&r	rrr�testRegistersProducer�sz+ProducerInterfaceTest.testRegistersProducercC�,|j��|j�d�|�|j��d�dS)N�yakkity yakz*Paused producer should not have sent data.)r&r"rr9rrr	rrr�	testPause��


�zProducerInterfaceTest.testPausecCs6|j��|j��|j�d�|�|j��d�dS)NrD)r&r"rrrArrr	rrr�
testResume�s

z ProducerInterfaceTest.testResumecCs.|j��|j��|�t|jj�dd�dS)Nrz Resume triggered an empty write.)r&r"rrA�lenrrr	rrr�testResumeNoEmptyWrite�s


�z,ProducerInterfaceTest.testResumeNoEmptyWritecCs6|j��|j�d�|j��|�|j��d�dS)Nzbuffer this)r&r"rrrArrr	rrr�testResumeBuffer�s

z&ProducerInterfaceTest.testResumeBuffercCrC)NrDz+Stopped producer should not have sent data.)r&r$rr9rrr	rrr�testStop�rFzProducerInterfaceTest.testStopN)rrrrr2rBrErGrIrJrKrrrrr@�sr@c@�eZdZejZdS)�PCP_ConsumerInterfaceTestsN�rrrrr6r0rrrrrM��
rMc@rL)�PCPII_ConsumerInterfaceTestsN�rrrr�ProducerConsumerProxyr0rrrrrP�rOrPc@rL)�PCP_ProducerInterfaceTestsNrNrrrrrS�rOrSc@rL)�PCPII_ProducerInterfaceTestsNrQrrrrrT�rOrTc@s&eZdZdZejZdd�Zdd�ZdS)�ProducerProxyTestszAProducer methods on me should be relayed to the Producer I proxy.cCs,|�d�|_t|j�|_|j�|jd�dSr!)r0�proxyr�parentProducerr(r	rrrr2�szProducerProxyTests.setUpcC�|j��|�|jj�dSr)rVr$�
assertTruerWr#r	rrrrK��
zProducerProxyTests.testStopN)	rrrrrr6r0r2rKrrrrrU�s
rUc@s6eZdZdZejZdd�Zdd�Zdd�Z	dd	�Z
d
S)�ConsumerProxyTestszAConsumer methods on me should be relayed to the Consumer I proxy.cCr.r)r%r/r0rr	rrrr2�r3zConsumerProxyTests.setUpcCs"|j�d�|�|j��d�dSr4)rrrAr/rr	rrrr5�szConsumerProxyTests.testWritecCrXr)rr,rYr/r+r	rrrr?�rZzConsumerProxyTests.testFinishcCrXr)rr*rYr/r)r	rrrr>�rZz!ConsumerProxyTests.testUnregisterN)rrrrrr6r0r2r5r?r>rrrrr[�sr[c@s4eZdZdd�Zdd�Zdd�Zdd�Zd	d
�ZdS)�PullProducerTestcCs6t�|_|�|j�|_t|j�|_|j�|jd�dSr!)r%r/r0rVrrWr(r	rrrr2�szPullProducerTest.setUpcCs"|j�d�|�|j��d�dS)N�helloz+Pulling Consumer got data before it pulled.)rVrr9r/rr	rrr�testHoldWrites�s
�zPullProducerTest.testHoldWritescCs,|j�d�|j��|�|j��d�dS)Nr])rVrrrAr/rr	rrr�testPulls
zPullProducerTest.testPullcCsX|j�d�|j�d�|j��t|jj�}|�|dd|f�|�|j��d�dS)Nzhello �sunshine�z(Pull resulted in %d writes instead of 1.zhello sunshine)rVrrrHr/rrAr)r
�nwritesrrr�testMergeWritess
�z PullProducerTest.testMergeWritescCs,|j��|j�d�|�|j��d�dS)Nr)rVrrrAr/rr	rrr�
testLateWrites
zPullProducerTest.testLateWriteN)rrrr2r^r_rcrdrrrrr\�s
r\c@�eZdZGdd�dej�ZdS)�PCP_PullProducerTestsc@�eZdZdZdS)z PCP_PullProducerTests.proxyClassFN�rrr�iAmStreamingrrrrr0�r0NrNrrrrrf�rfc@re)�PCPII_PullProducerTestsc@rg)z"PCPII_PullProducerTests.proxyClassFNrhrrrrr0rjr0NrQrrrrrlrkrlc@sFeZdZdZejZdd�Zdd�Zdd�Z	dd	�Z
d
d�Zdd
�ZdS)�BufferedConsumerTestsz=As a consumer, ask the producer to pause after too much data.cC�>t�|_|�|j�|_d|j_t|j�|_|j�|jd�dS)N�dT�r%r/r0rV�
bufferSizerrWr(r	rrrr2)�
zBufferedConsumerTests.setUpcCr8r=)rVr(rWrYrr	rrr�testRegisterPull1r;z&BufferedConsumerTests.testRegisterPullcCrXr)rVr"r9rWrr	rrr�testPauseIntercept6rZz(BufferedConsumerTests.testPauseInterceptcCs&|j��|j��|�|jj�dSr)rVr"rr9rWrr	rrr�testResumeIntercept:s

z)BufferedConsumerTests.testResumeInterceptcCsT|j��|�|jjd�|j�d�|�|jjd�|j�d�|�|jj�dS)zMake sure I say "when." zdon't pause yet�3xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxN)rVr"r9rWrrrYr	rrr�testTriggerPauseBs
z&BufferedConsumerTests.testTriggerPausecCsR|j��|j�d�|�|jjd�|j��|�|jjd�|�|jj�dS)z3Make sure I resumeProducing when my buffer empties.�fxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxzshould be pausedzProducer should have resumed.N)	rVr"rrYrWrrr9�producerPausedr	rrr�testTriggerResumeMs

z'BufferedConsumerTests.testTriggerResumeN)
rrrrrrRr0r2rsrtrurwrzrrrrrm$srmc@s6eZdZGdd�dej�Zdd�Zdd�Zdd�Zd	S)
�BufferedPullTestsc@seZdZdZdd�ZdS)zBufferedPullTests.proxyClassFcCs$tj�||dd��tt|�d�S)Nro)rrR�_writeSomeData�minrHrrrrr|]sz+BufferedPullTests.proxyClass._writeSomeDataN)rrrrir|rrrrr0Zsr0cCrn)NroFrpr	rrrr2arrzBufferedPullTests.setUpcCs$d|j_|j��|�|jj�dSr=)rWrrVrrYr	rrr�testResumePullis
z BufferedPullTests.testResumePullcCs>|j��|j�d�|�|j��d�|�|jjdg�dS)N�idatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatum�ddatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatumdatum�datum)rVrrrAr/r�_bufferr	rrr�testLateWriteBufferingps
z(BufferedPullTests.testLateWriteBufferingN)	rrrrrRr0r2r~r�rrrrr{Ys
r{N)�__version__�twisted.protocolsr�
twisted.trialrrrr%�TestCaser-r7r@rMrPrSrTrUr[r\rfrlrmr{rrrr�<module>s(
.2%5

Anon7 - 2022
AnonSec Team