AnonSec Shell
Server IP : 209.38.156.173  /  Your IP : 216.73.216.122   [ Reverse IP ]
Web Server : Apache/2.4.52 (Ubuntu)
System : Linux lakekumayuhotel 5.15.0-136-generic #147-Ubuntu SMP Sat Mar 15 15:53:30 UTC 2025 x86_64
User : root ( 0)
PHP Version : 8.1.2-1ubuntu2.22
Disable Function : NONE
Domains : 2 Domains
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib/python3/dist-packages/twisted/internet/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     [ BACKUP SHELL ]     [ JUMPING ]     [ MASS DEFACE ]     [ SCAN ROOT ]     [ SYMLINK ]     

Current File : /lib/python3/dist-packages/twisted/internet/test/__pycache__/test_base.cpython-310.pyc
o

�bt5�@sndZddlZddlmZddlmZmZddlmZddl	m
Z
ddlmZddl
mZmZmZdd	lmZdd
lmZddlmZmZmZddlmZdd
lmZddlmZmZzddl Z!Wne"ykdZ Ynwe!Z e
ee�Gdd�d��Z#Gdd�de�Z$dd�Z%Gdd�d�Z&Gdd�de&e�Z'Gdd�de&e�Z(Gdd�de�Z)ee d�Gdd�de��Z*dS) z%
Tests for L{twisted.internet.base}.
�N)�Queue)�Any�Callable)�skipIf)�implementer)�FirstOneWins)�DelayedCall�ReactorBase�ThreadedResolver)�Deferred)�DNSLookupError)�IReactorThreads�IReactorTime�IResolverSimple)�Clock)�
ThreadPool)�SkipTest�TestCasec@sleZdZdZdd�Zdedeffdd�Zdd	�Zd
d�Z	dd
�Z
dd�Zdedeffdd�Zdd�Z
dS)�FakeReactorzl
    A fake reactor implementation which just supports enough reactor APIs for
    L{ThreadedResolver}.
    cs>t��_�jj�_t��_�j���fdd��_t��_dS)Ncs�jS�N)�_threadpool���selfr�A/usr/lib/python3/dist-packages/twisted/internet/test/test_base.py�<lambda>-sz&FakeReactor.__init__.<locals>.<lambda>)	r�_clock�	callLaterrr�start�
getThreadPoolr�_threadCallsrrrr�__init__'s

zFakeReactor.__init__�callable.cOs|j�|||f�dSr)r �put�rr"�args�kwargsrrr�callFromThread1szFakeReactor.callFromThreadcCs"|j��\}}}||i|��dSr)r �get)r�fr%r&rrr�_runThreadCalls4szFakeReactor._runThreadCallscCs|j��dSr)r�stoprrrr�_stop8szFakeReactor._stopcC�dSrrrrrr�getDelayedCalls;�zFakeReactor.getDelayedCallscCr-rrrrrr�seconds?r/zFakeReactor.secondscOr-rrr$rrr�callInThreadCr/zFakeReactor.callInThreadcCr-rr)r�sizerrr�suggestThreadPoolSizeGr/z!FakeReactor.suggestThreadPoolSizeN)�__name__�
__module__�__qualname__�__doc__r!rrr'r*r,r.r0r1r3rrrrr s
rc@�0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�ThreadedResolverTestsz(
    Tests for L{ThreadedResolver}.
    cs�d�d}d}t�}|�|j�g�g}��fdd�}|�td|�t|�}|�||f�}|�|j�|�	�|�
�|g�|�
|�g�|j�|d�|�
|jj
g�dS)	z�
        L{ThreadedResolver.getHostByName} returns a L{Deferred} which fires
        with the value returned by the call to L{socket.gethostbyname} in the
        threadpool of the reactor passed to L{ThreadedResolver.__init__}.
        z	10.0.0.17zfoo.bar.example.com�cs��|��Sr)�append��name��ip�lookedUprr�fakeGetHostByNameas
z=ThreadedResolverTests.test_success.<locals>.fakeGetHostByName�
gethostbyname�N)r�
addCleanupr,�patch�socketr
�
getHostByName�addCallbackr;r*�assertEqualr�advance�calls)rr=�timeout�reactor�
resolvedTorA�resolver�drr>r�test_successQs"z"ThreadedResolverTests.test_successcCs�d}t�}|�|j�dd�}|�td|�g}t|�}|�d|f�}|�|t�|�	|j
�|��|�t
|�d�|j�|d�|�|jjg�dS)z�
        L{ThreadedResolver.getHostByName} returns a L{Deferred} which fires a
        L{Failure} if the call to L{socket.gethostbyname} raises an exception.
        r:cSstd��)NzENOBUFS (this is a funny joke))�OSErrorr<rrrrA~�z=ThreadedResolverTests.test_failure.<locals>.fakeGetHostByNamerB�	some.namerCN)rrDr,rErFr
rG�
assertFailurerrHr;r*rI�lenrrJrK�rrLrMrA�
failedWithrOrPrrr�test_failuretsz"ThreadedResolverTests.test_failurecs�d}t�}|�|j�t���fdd�}|�td|�g}t|�}|�d|f�}|�|t	�|�
|j�|j�
|d�|�|g�|j�
d�|�t|�d���td��dS)	z�
        If L{socket.gethostbyname} does not complete before the specified
        timeout elapsed, the L{Deferred} returned by
        L{ThreadedResolver.getHostByName} fails with L{DNSLookupError}.
        �
cs����r)r(r<��resultrrrA�rSz=ThreadedResolverTests.test_timeout.<locals>.fakeGetHostByNamerBrTrCzThe I/O was errorfulN)rrDr,rrErFr
rGrUrrHr;rrJrIrVr#�IOErrorrWrr[r�test_timeout�s z"ThreadedResolverTests.test_timeoutcsg�tt�G�fdd�d��}Gdd�dt�}|�}|�}|�|�tt��}|j�|d�|j�|d�|j�|d�|j�|d�|j�|d�|�t	��d	�|�t
tt���t
gd	�|�d�d
�|�d�d�|�d�d
�|�d�d�|�d�d�dS)zm
        L{ThreadedResolver.getHostByName} is passed L{str}, encoded using IDNA
        if required.
        cseZdZd�fdd�	ZdS)zAThreadedResolverTests.test_resolverGivenStr.<locals>.FakeResolverrcs��|�t�Sr)r;r)rr=�timeouts�rKrrrG�s
zOThreadedResolverTests.test_resolverGivenStr.<locals>.FakeResolver.getHostByNameN)r)r4r5r6rGrr`rr�FakeResolver�srac@seZdZdd�ZdS)zFThreadedResolverTests.test_resolverGivenStr.<locals>.JustEnoughReactorcSr-rrrrrr�installWaker��zSThreadedResolverTests.test_resolverGivenStr.<locals>.JustEnoughReactor.installWakerN)r4r5r6rbrrrr�JustEnoughReactor�srdzexample.exampleuvääntynyt.exampleuрф.examplezxn----7sbb4ac0ad0be6cf.xn--p1ai�rrCzxn--vntynyt-5waa.example�zxn--p1ai.example��N)rrr	�installResolverrr�nameResolver�resolveHostNamerIrV�list�map�type�str)rrard�fakerM�recrr`r�test_resolverGivenStr�s(

z+ThreadedResolverTests.test_resolverGivenStrN)r4r5r6r7rQrYr^rrrrrrr9Ls#!r9cC�dS)z8
    Function used by L{DelayedCallTests.test_str}.
    Nrrrrr�nothing��rtc@s`eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zdd�Z
dS)�DelayedCallMixin�
    L{DelayedCall}
    cCs dd�}t|dd�di||d�S)z�
        Get a L{DelayedCall} instance at a given C{time}.

        @param time: The absolute time at which the returned L{DelayedCall}
            will be scheduled.
        cSr-rr)�callrrr�noop�rcz0DelayedCallMixin._getDelayedCallAt.<locals>.noopcSr-rrrrrrr�ruz4DelayedCallMixin._getDelayedCallAt.<locals>.<lambda>rN)r)r�timeryrrr�_getDelayedCallAt�sz"DelayedCallMixin._getDelayedCallAtcCs|�d�|_|�d�|_dS)zb
        Create two L{DelayedCall} instanced scheduled to run at different
        times.
        rrCN)r{�zero�onerrrr�setUp�szDelayedCallMixin.setUpc	Cs:tdtdddidddd��}|�t|�dt|�f�dS)	��
        The string representation of a L{DelayedCall} instance, as returned by
        L{str}, includes the unsigned id of the instance, as well as its state,
        the function to be called, and the function arguments.
        ��rg�AreNcSrs�Ng�?rrrrrr�ruz+DelayedCallMixin.test_str.<locals>.<lambda>z?<DelayedCall 0x%x [10.5s] called=0 cancelled=0 nothing(3, A=5)>)rrtrIro�id�r�dcrrr�test_str�s��zDelayedCallMixin.test_strc	Cs4tdtdddidddd��}|�t|�t|��dS)z�
        The string representation of a L{DelayedCall} instance, as returned by
        {repr}, is identical to that returned by L{str}.
        �
)�r��	NcSrs)Ng�������?rrrrrrruz,DelayedCallMixin.test_repr.<locals>.<lambda>)rrtrIro�reprr�rrr�	test_reprszDelayedCallMixin.test_reprcCsJ|j|j}}|�||k�|�||k�|�||k�|�||k�dS)z�
        For two instances of L{DelayedCall} C{a} and C{b}, C{a < b} is true
        if and only if C{a} is scheduled to run before C{b}.
        N�r|r}�
assertTrue�assertFalse�rr|r}rrr�test_lt�
zDelayedCallMixin.test_ltcCsJ|j|j}}|�||k�|�||k�|�||k�|�||k�dS)z�
        For two instances of L{DelayedCall} C{a} and C{b}, C{a <= b} is true
        if and only if C{a} is scheduled to run before C{b} or at the same
        time as C{b}.
        Nr�r�rrr�test_le�
zDelayedCallMixin.test_lecCsJ|j|j}}|�||k�|�||k�|�||k�|�||k�dS)z�
        For two instances of L{DelayedCall} C{a} and C{b}, C{a > b} is true
        if and only if C{a} is scheduled to run after C{b}.
        Nr�r�rrr�test_gt"r�zDelayedCallMixin.test_gtcCsJ|j|j}}|�||k�|�||k�|�||k�|�||k�dS)z�
        For two instances of L{DelayedCall} C{a} and C{b}, C{a > b} is true
        if and only if C{a} is scheduled to run after C{b} or at the same
        time as C{b}.
        Nr�r�rrr�test_ge-r�zDelayedCallMixin.test_gecCs:|�|j|jk�|�|j|jk�|�|j|jk�dS)zD
        A L{DelayedCall} instance is only equal to itself.
        N)r�r|r}r�rrrr�test_eq9�zDelayedCallMixin.test_eqcCs:|�|j|jk�|�|j|jk�|�|j|jk�dS)zM
        A L{DelayedCall} instance is not equal to any other object.
        N)r�r|r}r�rrrr�test_neCr�zDelayedCallMixin.test_neN)r4r5r6r7r{r~r�r�r�r�r�r�r�r�rrrrrv�s


rvc@� eZdZdZdd�Zdd�ZdS)�DelayedCallNoDebugTestsrwcC�|�tdd�t�|�dS)z!
        Turn debug off.
        �debugFN�rErrvr~rrrrr~S�zDelayedCallNoDebugTests.setUpc	C�>tdtdddidddd��}d�t|��}|�t|�|�dS)	rr�r�r�reNcSrsr�rrrrrr`ruz2DelayedCallNoDebugTests.test_str.<locals>.<lambda>zA<DelayedCall 0x{:x} [10.5s] called=0 cancelled=0 nothing(3, A=5)>)rrt�formatr�rIro)rr��expectedrrrr�Zs

�z DelayedCallNoDebugTests.test_strN�r4r5r6r7r~r�rrrrr�N�r�c@r�)�DelayedCallDebugTestsrwcCr�)z 
        Turn debug on.
        r�TNr�rrrrr~mr�zDelayedCallDebugTests.setUpc	Cr�)	rr�r�r�reNcSrsr�rrrrrrzruz0DelayedCallDebugTests.test_str.<locals>.<lambda>z\<DelayedCall 0x{:x} \[10.5s\] called=0 cancelled=0 nothing\(3, A=5\)

traceback at creation:)rrtr�r��assertRegexro)rr��expectedRegexprrrr�ts

�zDelayedCallDebugTests.test_strNr�rrrrr�hr�r�c@seZdZdZdd�ZdS)�TestSpySignalCapturingReactorza
    Subclass of ReactorBase to capture signals delivered to the
    reactor for inspection.
    cCrs)z*
        Required method, unused.
        Nrrrrrrb�ruz*TestSpySignalCapturingReactor.installWakerN)r4r5r6r7rbrrrrr��sr�zsignal module not availablec@r8)�ReactorBaseSignalTestszE
    Tests to exercise ReactorBase's signal exit reporting path.
    cCst�}|�d|j�dS)zI
        The default value of the _exitSignal attribute is None.
        N)r��assertIs�_exitSignal�rrMrrr�test_exitSignalDefaultsToNone�sz4ReactorBaseSignalTests.test_exitSignalDefaultsToNonecC�(t�}|�tjd�|�tj|j�dS)zn
        ReactorBase's SIGINT handler saves the value of SIGINT to the
        _exitSignal attribute.
        N)r��sigInt�signal�SIGINT�assertEqualsr�r�rrr�test_captureSIGINT��z)ReactorBaseSignalTests.test_captureSIGINTcCr�)zp
        ReactorBase's SIGTERM handler saves the value of SIGTERM to the
        _exitSignal attribute.
        N)r��sigTermr��SIGTERMr�r�r�rrr�test_captureSIGTERM�r�z*ReactorBaseSignalTests.test_captureSIGTERMcCs:ttd�s	td��t�}|�tjd�|�tj|j�dS)zr
        ReactorBase's SIGBREAK handler saves the value of SIGBREAK to the
        _exitSignal attribute.
        �SIGBREAKz$signal module does not have SIGBREAKN)�hasattrr�rr��sigBreakr�r�r�r�rrr�test_captureSIGBREAK�s

z+ReactorBaseSignalTests.test_captureSIGBREAKN)r4r5r6r7r�r�r�r�rrrrr��s		r�)+r7rF�queuer�typingrr�unittestr�zope.interfacer�twisted.internet._resolverr�twisted.internet.baserr	r
�twisted.internet.deferr�twisted.internet.errorr�twisted.internet.interfacesr
rr�twisted.internet.taskr�twisted.python.threadpoolr�twisted.trial.unittestrrr��_signal�ImportErrorrr9rtrvr�r�r�r�rrrr�<module>s>�+r



Anon7 - 2022
AnonSec Team