
    bi6                        d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlZd dlm	Z	 ddl
mZmZmZ  e       r
d dlZd dlmZ  e       rd dlmZ d d	lmZ  e       rd d
lmZ  ej,                  e      Z G d d      Zedk(  r{d dlmZ  e       Zej;                          ej=                  ddgdd e             Z e de       d dl!m"Z"  e"jF                  d      jI                  d      Z%ejM                  e%       yy)    N)Optional)urlparse)nn   )is_requests_availableis_vllm_ascend_availableis_vllm_available)ConnectionError)PyNcclCommunicator)StatelessProcessGroup)PyHcclCommunicatorc                      e Zd ZdZ	 	 	 	 	 d dee   dedededef
dZd!d	ed
efdZ		 	 	 	 	 	 	 	 	 d"de
e   dedededededededee   dee   de
e
e      fdZd Zdedej                  fdZdej$                  fdZd Zd Zy)#
VLLMClientau	  
    A client class to interact with a vLLM server.

    This class provides methods to generate completions, initialize and manage weight update groups, and update model
    weights in a distributed setting. Before using it, start the vLLM server with `trl vllm-serve`.

    Args:
        base_url (`str` or `None`, *optional*, defaults to `None`):
            Base URL for the vLLM server (e.g., `"http://localhost:8000"`). If provided, `host` and `server_port` are
            ignored.
        host (`str`, *optional*, defaults to `"0.0.0.0"`):
            IP address of the vLLM server. Ignored if `base_url` is provided.
        server_port (`int`, *optional*, defaults to `8000`):
            Port number of the vLLM server. Ignored if `base_url` is provided.
        group_port (`int`, *optional*, defaults to `51216`):
            Port number for the weight update group.
        connection_timeout (`float`, *optional*, defaults to `0.0`):
            Total timeout duration in seconds to wait for the server to be up. If the server is not up after the
            timeout, a `ConnectionError` is raised.

    Examples:
        Run the vLLM server with the model `Qwen/Qwen2.5-7B`:

        ```
        $ trl vllm-serve --model Qwen/Qwen2.5-7B
        ...
        INFO:     Application startup complete.
        INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
        ```

        Use the client to generate completions and update model weights:

        ```python
        >>> from trl.extras.vllm_client import VLLMClient

        >>> client = VLLMClient()
        >>> client.generate(["Hello, AI!", "Tell me a joke"])
        [[2980, 498, 1492, 752, 448, 264, 13027, 8645, 30, 358, 2776, 4460, 311, 3270, 264, 2025],
         [911, 7988, 1251, 382, 3838, 653, 498, 1618, 4325, 879, 2581, 20027, 264, 21428, 30, 362]]

        >>> from transformers import AutoModelForCausalLM

        >>> model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-7B", device_map="cuda")
        >>> client.init_communicator()
        >>> client.update_model_params(model)
        ```

        There are several ways to initialize the client:

        ```python
        VLLMClient(base_url="http://localhost:8000")
        VLLMClient(base_url="http://192.168.1.100:8000")
        VLLMClient(host="localhost", server_port=8000)
        VLLMClient(host="192.168.1.100", server_port=8000)
        ```
    Nbase_urlhostserver_port
group_portconnection_timeoutc                    t               st        d      t               st        d      t        j                         | _        |bt        |      }t        j                  |j                        | _
        |j                  xs d}| d|j                   |j                   | _        n/|| _
        || _        d| j                   d| j                   | _        || _        | j#                  |       y )NzIrequests is not installed. Please install it with `pip install requests`.zAvLLM is not installed. Please install it with `pip install vllm`.httpz://zhttp://:)r   ImportErrorr	   requestsSessionsessionr   socketgethostbynamehostnamer   schemenetlocpathr   r   r   check_server)selfr   r   r   r   r   
parsed_urlr   s           Q/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/trl/extras/vllm_client.py__init__zVLLMClient.__init__f   s     %&ijj "abb'')!(+J,,Z-@-@ADI&&0&F%hc**;*;)<Z__<MNDMDI*D%dii[$2B2B1CDDM$,-    total_timeoutretry_intervalc                 (   | j                    d}t        j                         }	 	 t        j                  |      }|j                  dk(  r8d|j
                  v r|j
                  d   | _        t        j                  d       yt        j                  d	| d
       t        j                  |       # t        j                  j                  $ rC}t        j                         |z
  }||k\  rt        d| j                    d| d      |Y d}~d}~ww xY w)a  
        Check server availability with retries on failure, within a total timeout duration. If the server is not up
        after the total timeout duration, raise a `ConnectionError`.

        Args:
            retry_interval (`float`, *optional*, defaults to `2.0`):
                Interval in seconds between retries.
            total_timeout (`float`, *optional*, defaults to `0.0`):
                Total timeout duration in seconds.
        z/health/   zX-Forwarded-ForzServer is up!Nz$The vLLM server can't be reached at z after zF seconds. Make sure the server is running by running `trl vllm-serve`.z"Server is not up yet. Retrying in z seconds...)r   timer   getstatus_codeheadersr   loggerinfo
exceptionsRequestExceptionr
   sleep)r#   r(   r)   url
start_timeresponseexcelapsed_times           r%   r"   zVLLMClient.check_server   s    x(YY[
 #<<, ''3.(H,<,<<$,$4$45F$G	KK0 KK<^<LKXYJJ~&)  &&77 #yy{Z7=0)>t}}oWUbTc dR R  1s   B1 1D9DDpromptsnrepetition_penaltytemperaturetop_ptop_kmin_p
max_tokensguided_decoding_regexgeneration_kwargsreturnc                    | j                    d}| j                  j                  ||||||||||	|
xs i d
      }|j                  dk(  r|j	                         d   S t        d|j                   d|j                         )a  
        Generates model completions for the provided prompts.

        Args:
            prompts (`list[str]`):
                List of text prompts for which the model will generate completions.
            n (`int`, *optional*, defaults to `1`):
                Number of completions to generate for each prompt.
            repetition_penalty (`float`, *optional*, defaults to `1.0`):
                Parameter for repetition penalty. 1.0 means no penalty.
            temperature (`float`, *optional*, defaults to `1.0`):
                Temperature parameter for sampling. Higher values increase diversity.
            top_p (`float`, *optional*, defaults to `1.0`):
                Top-p sampling parameter.`1.0` means no truncation.
            top_k (`int`, *optional*, defaults to `-1`):
                Top-k sampling parameter. `-1` means no truncation.
            min_p (`float`, *optional*, defaults to `0.0`):
                Minimum probability for sampling.
            max_tokens (`int`, *optional*, defaults to `16`):
                Maximum number of tokens to generate for each prompt.
            guided_decoding_regex (`str` or `None`, *optional*, defaults to `None`):
                Regular expression to guide the decoding process.
            generation_kwargs (`dict` or `None`, *optional*, defaults to `None`):
                Additional generation parameters to pass to the vLLM `SamplingParams`. This can include parameters like
                `seed`, `frequency_penalty`, etc. If it contains keys that conflict with the other parameters, they
                will override them.

        Returns:
            `list[list[int]]`:
                List of lists of token IDs representing the model-generated completions for each prompt.
        z
/generate/)
r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   jsonr+   completion_idsRequest failed: , )r   r   postr.   rG   	Exceptiontext)r#   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   r5   r7   s                r%   generatezVLLMClient.generate   s    X z*<<$$"&8*()>%6%<" % 
 3&==?#344.x/C/C.DBx}}oVWWr'   c                    | j                    d}t        j                  |      }|j                  dk(  r|j	                         d   }n%t        d|j                   d|j                         |dz   }|| _        | j                    d}| j                  j                  |d| j                  |d	
      }|j                  dk7  r%t        d|j                   d|j                         t        j                  d       t        j                  | j                  | j                  | j                  |      }t!        |d      | _        t%        j&                  | j(                         y)zg
        Initializes the weight update group in a distributed setup for model synchronization.
        z/get_world_size/r+   
world_sizerI   rJ      z/init_communicator/0.0.0.0)r   portrP   rF   g?)r   rS   rankrP   r   )deviceN)r   r   r-   r.   rG   rL   rM   rT   r   rK   r   r,   r4   r   creater   r   pynccl_commatexitregisterclose_communicator)r#   r5   r7   vllm_world_sizerP   pgs         r%   init_communicatorzVLLMClient.init_communicator   s8   
 /0<<$3&&mmol;O.x/C/C.DBx}}oVWW$q(
#	 23<<$$S	4??jt/u$v3&.x/C/C.DBx}}oVWW
 	

3 #))tyytUYU^U^kuv-b; 	//0r'   nameweightsc                    t        |j                        t        |j                        }}| j                   d}| j
                  j                  ||||d      }|j                  dk7  r%t        d|j                   d|j                         | j                  j                  || j                         | j                  j                  j                          y)	a0  
        Updates a specific named parameter in the model and broadcasts it to other processes.

        Args:
            name (`str`):
                Name of the layer whose weights are being updated.
            weights (`torch.Tensor`):
                Tensor containing the updated weights.
        z/update_named_param/)r^   dtypeshaperF   r+   rI   rJ   )srcN)strra   tuplerb   r   r   rK   r.   rL   rM   rW   	broadcastrT   groupbarrier)r#   r^   r_   ra   rb   r5   r7   s          r%   update_named_paramzVLLMClient.update_named_param	  s     7==)5+?u34<<$$SuW\/]$^3&.x/C/C.DBx}}oVWW 	""7		":&&(r'   modelc                 l    |j                         D ]!  \  }}| j                  ||j                         # y)z
        Updates all parameters of the given model by calling `update_named_param` for each parameter in the model.

        Args:
            model (`nn.Module`):
                Model whose parameters (weights/biases) are to be updated.
        N)named_parametersri   data)r#   rj   r^   params       r%   update_model_paramszVLLMClient.update_model_params  s4     !113 	6KD%##D%**5	6r'   c                     | j                    d}| j                  j                  |      }|j                  dk7  r%t	        d|j                   d|j
                         y)z8
        Resets the prefix cache for the model.
        z/reset_prefix_cache/r+   rI   rJ   N)r   r   rK   r.   rL   rM   r#   r5   r7   s      r%   reset_prefix_cachezVLLMClient.reset_prefix_cache)  s`     34<<$$S)3&.x/C/C.DBx}}oVWW 'r'   c                     | j                    d}	 | j                  j                  |      }|j                  dk7  r%t	        d|j                   d|j
                         y# t        $ r Y yw xY w)zW
        Closes the weight update group and cleans up the communication group.
        z/close_communicator/r+   rI   rJ   N)r   r   rK   r.   rL   rM   r
   rq   s      r%   rZ   zVLLMClient.close_communicator2  s|     34	\||((-H
 ##s*"283G3G2H8==/ Z[[ +	  		s   A! !	A-,A-)NrR   i@  i          )rt   g       @)	rQ         ?ru   ru   rt      NN)__name__
__module____qualname____doc__r   rd   intfloatr&   r"   listdictrN   r]   torchTensorri   r   Modulero   rr   rZ    r'   r%   r   r   ,   sN   7v #'$'.3-. . 	.
 . ".8"'% "'u "'N $' /3,0?Xc?X ?X "	?X
 ?X ?X ?X ?X ?X  (}?X $D>?X 
d3i?XB 1D)s )U\\ )(
6 
6X\r'   r   __main__)SamplingParamsz
Hello, AI!zTell me a joke       )r;   rA   sampling_paramsz
Responses:)AutoModelForCausalLMzQwen/Qwen2.5-7Bcuda)'rX   loggingr   r,   typingr   urllib.parser   r   r   import_utilsr   r   r	   r   r
   ,vllm.distributed.device_communicators.pyncclr   vllm.distributed.utilsr   3vllm_ascend.distributed.device_communicators.pyhcclr   	getLoggerrx   r0   r   vllmr   clientr]   rN   	responsesprinttransformersr   from_pretrainedtorj   ro   r   r'   r%   <module>r      s         !   ] ] ( O<!p 
		8	$S\ S\n z#\F
 /? @ARTftfvwI	,	" 20 001BCFFvNE
u% r'   