"""
Tests for (new code in) L{twisted.application.internet}.

@var AT_LEAST_ONE_ATTEMPT: At least enough seconds for L{ClientService} to make
    one attempt.
"""

import pickle

from zope.interface import implementer
from zope.interface.verify import verifyClass

from twisted.application import internet
from twisted.application.internet import (
    ClientService,
    StreamServerEndpointService,
    TimerService,
)
from twisted.internet import task
from twisted.internet.defer import CancelledError, Deferred
from twisted.internet.interfaces import (
    IFileDescriptorReceiver,
    IHalfCloseableProtocol,
    IListeningPort,
    IStreamClientEndpoint,
    IStreamServerEndpoint,
)
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.task import Clock
from twisted.logger import formatEvent, globalLogPublisher
from twisted.python.failure import Failure
from twisted.test.proto_helpers import StringTransport
from twisted.trial.unittest import SynchronousTestCase, TestCase


def fakeTargetFunction():
    """
    A fake target function for testing TimerService which does nothing.
    """
    pass


@implementer(IStreamServerEndpoint)
class FakeServer:
    """
    In-memory implementation of L{IStreamServerEndpoint}.

    @ivar result: The L{Deferred} resulting from the call to C{listen}, after
        C{listen} has been called.

    @ivar factory: The factory passed to C{listen}.

    @ivar cancelException: The exception to errback C{self.result} when it is
        cancelled.

    @ivar port: The L{IListeningPort} which C{listen}'s L{Deferred} will fire
        with.

    @ivar listenAttempts: The number of times C{listen} has been invoked.

    @ivar failImmediately: If set, the exception to fail the L{Deferred}
        returned from C{listen} before it is returned.
    """

    result = None
    factory = None
    failImmediately = None
    cancelException = CancelledError()
    listenAttempts = 0

    def __init__(self):
        self.port = FakePort()

    def listen(self, factory):
        """
        Return a Deferred and store it for future use.  (Implementation of
        L{IStreamServerEndpoint}).

        @param factory: the factory to listen with

        @return: a L{Deferred} stored in L{FakeServer.result}
        """
        self.listenAttempts += 1
        self.factory = factory
        # Cancelling the Deferred errbacks it with the configured exception.
        self.result = Deferred(canceller=lambda d: d.errback(self.cancelException))
        if self.failImmediately is not None:
            self.result.errback(self.failImmediately)
        return self.result

    def startedListening(self):
        """
        Test code should invoke this method after causing C{listen} to be
        invoked in order to fire the L{Deferred} previously returned from
        C{listen}.
        """
        self.result.callback(self.port)

    def stoppedListening(self):
        """
        Test code should invoke this method after causing C{stopListening} to
        be invoked on the port fired from the L{Deferred} returned from
        C{listen} in order to cause the L{Deferred} returned from
        C{stopListening} to fire.
        """
        self.port.deferred.callback(None)