
    bi,                        d Z ddlZddlmZ ddlmZmZmZmZm	Z	m
Z
mZmZmZmZ ddlZddlZddlZddlZddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lm Z m!Z!m"Z" ddl#m$Z$  ejJ                  d      Z&eee'ef      Z(g dZ) G d de       Z*edejV                  dejX                  de
ejV                  e'f   fd       Z-edejV                  ddde
ejV                  df   fd       Z-dejV                  de	ejX                     de
ejV                  e	e'   f   fdZ-de'dz  dejV                  dejX                  dejX                  de	ejX                     de	ejX                     de
ejV                  ejX                  ejX                  e	ejX                     e	ejX                     f   fdZ.dee   dee'ee   f   fdZ/de(dee'ee   f   fdZ0d e	e(   de1fd!Z2d"e1dejf                  fd#Z4dd$d%e	e   d&e	eeee   f      d'e	e   d(e5d)e6d e	e(   d*e6d+e1d,e	e6   d-e	e!   de"fd.Z7d%e	e   d&e	eeee   f      d'e	e   d(e5d)e6d+e1d e	e(   de!fd/Z8y)0z)Copyright 2019-2025, XGBoost contributors    N)Sequence)
AnyCallableDictListOptionalTupleTypeVarUnioncastoverload)	dataframe   )
collective)FeatureNames)concatimport_cupy)DataIterDMatrixQuantileDMatrix)
is_on_cudaz[xgboost.dask])labelweightbase_marginqidlabel_lower_boundlabel_upper_boundc                        e Zd ZdZ	 	 	 ddee   dee   deeeee   f      dee   deee      ddf fd	Z	d
e
dee   fdZdefdZddZdedefdZ xZS )DaskPartitionIterz.A data iterator for the `DaskQuantileDMatrix`.Ndatafeature_namesfeature_typesfeature_weightskwargsreturnc           	      F   t         t        d       f}|| _        t        D ]7  }t	        | ||j                  |d              t        t        | |      |      r7J  || _        || _	        || _
        t        | j                  t               sJ d| _        t        | 5  d       y )Nr   T)release_data)r   type_datametasetattrget
isinstancegetattr_feature_names_feature_types_feature_weights_itersuper__init__)	selfr    r!   r"   r#   r$   typesk	__class__s	           L/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/xgboost/dask/data.pyr4   zDaskPartitionIter.__init__1   s     4:&
 	7AD!VZZ401gdA.666	7
 ,+ /$**h///
d+    attrc                 N    t        | |      t        | |      | j                     S y N)r.   r2   )r5   r;   s     r9   _getzDaskPartitionIter._getJ   s(    4*4&tzz22r:   c                 4    | j                   | j                     S )z5Utility function for obtaining current batch of data.)r)   r2   r5   s    r9   r    zDaskPartitionIter.dataO   s    zz$**%%r:   c                     d| _         y)zReset the iteratorr   N)r2   r@   s    r9   resetzDaskPartitionIter.resetS   s	    
r:   
input_datac           	      8   | j                   t        | j                        k(  ryt        D ci c]  }|| j	                  |       }} |d| j                         d| j                  | j                  | j                  d| | xj                   dz  c_         yc c}w )zYield next batch of dataFN)r    groupr!   r"   r#      T )	r2   lenr)   r*   r>   r    r/   r0   r1   )r5   rC   r7   r$   s       r9   nextzDaskPartitionIter.nextW   s    ::TZZ(+/0a!TYYq\/00 	
---- 11	
 	
 	

a
 1s   B)NNN)r%   N)__name__
__module____qualname____doc__r   r   r   r   r   r4   strr>   r    rB   r   boolrI   __classcell__)r8   s   @r9   r   r   .   s    8
 159=)-,3i,  -,  c49n 56	,
 "#, 49%, 
,2 # 
&c &x D r:   r   dfcolr%   c                      y r=   rG   rQ   rR   s     r9   _add_columnrU   j   s    ORr:   c                      y r=   rG   rT   s     r9   rU   rU   n   s    KNr:   c                     || |fS d}|j                    d| }|| j                  v r%|dz  }|j                    d| }|| j                  v r% | j                  di ||i} | |fS )Nr   _rF   rG   )namecolumnsassign)rQ   rR   trailsuids       r9   rU   rU   r   s     {3wFXXJax
 C

!
!F8$ 
 
	 c3Z	 Bs7Nr:   devicer   ysample_weightr   c                    t        |      \  }t        |      \  }t        |      \  }t        |      \  }	| | dk(  rdnd}
t        j                  j                  d|
i      5  j	                         |   j                  d      j                  j                         j                  j                  |<   j                  |      j                  |      |   j                         }|j                  j                         j                  j                         }t!        |      }t#        ||d   d	z   gz         }j%                  |d
|      j	                         ddd       |   }|   }fd||	fD        \  }}||||	fD cg c]  }||	 }}j'                  |d	      j	                         ||||fS # 1 sw Y   axY wc c}w )zA function to prevent query group from being scattered to different
    workers. Please see the tutorial in the document for the implication for not having
    partition boundary based on query groups.

    Ncpup2ptaskszdataframe.shuffle.methodcategory)byrF   F)drop	divisionsc              3   `   K   | ]%  }|t        t        j                  |         nd  ' y wr=   )r   ddSeries).0r]   rQ   s     r9   	<genexpr>z!no_group_split.<locals>.<genexpr>   s.      "BECORYY3 ="s   +.)axis)rU   daskconfigsetpersistastypecatas_knowncodessort_valuesgroupbycountindexcomputevaluestolistsortedtuple	set_indexrh   )r^   rQ   r   r_   r`   r   qid_uidy_uidw_uidbm_uidshufflecntdivr]   uidss    `             r9   no_group_splitr      s     b#&KBB"IBB.IBR-JB ~5egG	4g>	? ZZ\k((488AACGGMM7^^w^'jj!'*002ii!((//1SkC3r7Q;-'(\\  
 ')	 	" W+C
5	A"JOQW"M; $UE6:NCcoCNDN	A		&	&	(BsA}k113 . Os   *C<GGGGr$   c                       j                  d      }|J t        |      }t        |d         rddlm nddlm dt        dt        t        t        f   f fddt        dt        j                  ffd}t        |      D cg c]
  } ||       }}t        |      }|j                  j                  r S t        j!                  d	t#        j$                         |       t        |      }t        |      r&t'               }|j)                  |j                        }	nt+        j(                  |j                        }	|j,                  |	ddf   }t/        |d
      r|j,                  |	ddf   }n	||	ddf   } j1                  d|gi       t3        |j4                        D ]"  \  }}
|
 v sJ  j1                  |
||
   gi       $  S c c}w )z>Sort worker-local data by query ID for learning to rank tasks.r    Nr   )	DataFrameir%   c           
          dt         t        t              dt         t           f fd}t        D ci c]  }| |j                  |d             }}|j                         D ci c]  \  }}|	|| }}}|S c c}w c c}}w )zDReturn a dictionary containing all the meta info and all partitions.r;   r%   c                     | |    S y r=   rG   )r;   r   s    r9   r>   z0sort_data_by_qid.<locals>.get_dict.<locals>._get   s    Awr:   N)r   r   r   listr*   r,   items)r   r>   rY   data_optr7   vr    r$   s   `      r9   get_dictz"sort_data_by_qid.<locals>.get_dict   s    	xS	* 	x~ 	
 DHH4D$vzz$566HH!)!1CAQ]1CC ICs    A<'
B2Bc                 $     |       } |      S r=   rG   )r   r    r   r   s     r9   map_fnz sort_data_by_qid.<locals>.map_fn   s    {r:   a  [r%d]: Sorting data with %d partitions for ranking. This is a costly operation and will increase the memory usage significantly. To avoid this warning, sort the data based on qid before passing it into XGBoost. Alternatively, you can use set the `allow_group_split` to False.iloc)r,   rH   r   cudfr   pandasintr   rN   r   pdranger   r   is_monotonic_increasingLOGGERwarningcollget_rankr   argsortnpr   hasattrupdate	enumeraterZ   )r$   
data_partsn_partsr   r   
meta_partsdfqdfxcp
sorted_idxcr   r   s   `          @@r9   sort_data_by_qidr      s   F#J!!!*oG*Q- "$C DdO # ",,  &+7^4&)4J4

C
ww&&
NN	T 	" 
C#]ZZ(
ZZ(

((:q=
!CsFhhz1}%*a- 
MM6C5/"#++& %1F{{q3q6(m$% MS 5s   Glist_of_partsc                      t         t              sJ i dt        dt        dd f fd}t	               D ]"  \  }} ||d       t
        D ]  } |||        $ j                  dd       }|t        di S )Nr   rY   r%   c                 j    ||    v r	|    |   }nd }||vrg |<   |   j                  |       y y r=   )append)r   rY   partr   results      r9   r   z!_get_worker_parts.<locals>.append  sR    =## #D)DD6!!t4L% r:   r    r   rG   )r-   r   r   rN   r   r*   r,   r   )r   r   r   rX   r7   r   r   s   `     @r9   _get_worker_partsr      s    mT***#%F&# &S &T & -( 1q& 	A1aL	
 **UD
!C
!+F+Mr:   partsc                    | t        | d   j                  d            }nd}t        t        j                  t        j                  |gt
        j                        t        j                  j                        d         }|S )Nr   r    F)dtype)
r   r,   rO   r   	allreducer   arrayint32OpMAX)r   is_cudas     r9   _get_is_cudar     s]    U1X\\&124>>"((G9BHH"Etww{{STUVWGNr:   r   c                 n    | rt               }|j                  d      }|S t        j                  d      }|S )N)r   r   )r   emptyr   )r   r   r   s      r9   _make_emptyr      s6    ]  L  Lr:   )refr!   r"   r#   missingnthreadmax_binenable_categoricalmax_quantile_batchesr   c        
   	         t        j                         }
t        |      }|;t        j	                  d|
j
                         t        t        |      | |||	||      S t        di t        |      || |d}t        |||||	||      S )NzWorker %s has an empty DMatrix.)r!   r"   r   r   r   r   )r"   r!   r#   )r   r   r   r   r   r   rG   )
distributed
get_workerr   r   r   addressr   r   r   r   )r!   r"   r#   r   r   r   r   r   r   r   workerr   its                r9   _create_quantile_dmatrixr   )  s     ##%F5!G}8&..I ''1!5
 	
 
 

E
"
##'	
B 
-1 r:   c                    t        j                         }|}t        |      }	|?d|j                   d}
t        j                  |
       t        t        |	      | ||      }|S t        d      }dt        t        |      dt        |   fd}t        |      }i }|j                         D ]  \  }} ||      }|||<    t        d	i ||| ||||d}|S )
zdGet data that local to worker from DaskDMatrix.

    Returns
    -------
    A DMatrix object.

    zWorker z has an empty DMatrix.)r!   r"   r   Tr    r%   c                 >    t        d | D              ry t        |       S )Nc              3   $   K   | ]  }|d u  
 y wr=   rG   )rm   r   s     r9   rn   z:_create_dmatrix.<locals>.concat_or_none.<locals>.<genexpr>x  s     -tt|-s   )anyr   )r    s    r9   concat_or_nonez'_create_dmatrix.<locals>.concat_or_nonew  s    ---d|r:   )r   r!   r"   r   r   r#   rG   )r   r   r   r   r   r   r   r   r
   r   r   r   r   )r!   r"   r#   r   r   r   r   r   r   r   msgXyr   r   unzipped_dictconcated_dictkeyvaluer   s                      r9   _create_dmatrixr   U  s   " ##%FM5!G''=>s ''1	
 	AXhqk2 x{ 
 &m4M$&M#))+ 
U5!c 
 


##-'
B Ir:   )9rM   loggingcollections.abcr   typingr   r   r   r   r   r	   r
   r   r   r   rp   r   numpyr   r   r   r   rk    r   r   _typingr   compatr   r   corer   r   r   r    r   	getLoggerr   rN   
_DataPartsr*   r   r   rl   rU   r   r   r   rO   r   ndarrayr   floatr   r   r   rG   r:   r9   <module>r      s@   /  $         ! " ( 5 5 			+	,$sCx.!
9 9x 
 RBLL Rryy RU2<<;L5M R 
 R 
 NBLL Nt NbllD6H0I N 
 N
#BII.
2<<#&' 02$J02
02 
02 
yy	02
 BII&02 "))$02 LL"))RYY(;Xbii=PP02fEtCy ET#tCy.-A EPZ Dd3i4H 2,   "** ( ")L)) E#tCy.12) c]	)
 ) ) J) ) ) #3-) 
'	) )X6L)6 E#tCy.126 c]	6
 6 6 6 J6 6r:   