
    bi                        d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZmZ  ej                  e      Z G d d      ZddZd	 Zd
 ZddZd Zd Zd Zej2                  d        Zd Zd Zy)    )annotationsN)config)PANDAS_GE_300)group_split_dispatchhash_object_dispatchc                  $    e Zd ZdZddZd Zd Zy)maybe_buffered_partdz[
    If serialized, will return non-buffered partd. Otherwise returns a buffered partd
    Nc                    |xs t        j                  dd       | _        || _        t        j                  dd       | _        || _        |dd l}|j                  | _        y y )Ntemporary_directoryzdataframe.shuffle.compressionr   )r   gettempdirbuffercompression
encode_clspartdPandasBlocks)selfr   r   r   r   s        Q/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/dask/dataframe/shuffle.py__init__zmaybe_buffered_partd.__init__   sW    I&**-BD"I!::&EtL$#00DO     c                    | j                   rt        | j                  d| j                   ffS t        | j                  dffS )NF)r   r	   r   )r   s    r   
__reduce__zmaybe_buffered_partd.__reduce__   s7    <<(4??E4<<*PQQ(4??E*BCCr   c                .   dd l }t        j                  d| j                        }	 | j                  r t        |j                  | j                        nd }|j                  |      }|j                  j                  j                  |       |r ||      }| j                  r/| j                  |j!                  |j#                         |            S | j                  |      S # t        $ r*}t        dj                  | j                              |d }~ww xY w)Nr   z.partd)suffixdirzxNot able to import and load {} as compression algorithm.Please check if the library is installed and supported by Partd.)r   tempfilemkdtempr   r   getattr
compressedAttributeErrorImportErrorformatFilefilecleanup_filesappendr   r   BufferDict)r   argskwargsr   pathpartd_compressioner$   s           r   __call__zmaybe_buffered_partd.__call__%   s    xT\\B	 ## (($*:*:;  zz$

  ''-$T*D;;??5<<

d#CDD??4((  	SSYSY$$T
 	s   .C! !	D*%DD)NTN)__name__
__module____qualname____doc__r   r   r.    r   r   r	   r	      s    1D)r   r	   c                    || j                  |d      } t        | d      t        |      z  }|j                  t        j                  |dz
               S )a  
    Computes a deterministic index mapping each record to a partition.

    Identical rows are mapped to the same partition.

    Parameters
    ----------
    df : DataFrame/Series/Index
    npartitions : int
        The number of partitions to group into.
    cast_dtype : dtype, optional
        The dtype to cast to to avoid nullability issues

    Returns
    -------
    partitions : ndarray
        An array of int64 values mapping each record to a partition.
    ignore)errorsFindex   )astyper   intnpmin_scalar_type)dfnpartitions
cast_dtyperess       r   partitioning_indexrB   G   sW    &  YYz(Y3
r
/#k2B
BC ::b((;?);<==r   c                    t        |        y)Nr   )list)r)   s    r   barrierrE   d   s    Jr   c                    t        |       5  | j                  |      }t        |      dkD  r|n|cddd       S # 1 sw Y   yxY w)z/Collect partitions from partd, yield dataframesr   N)ensure_cleanup_on_exceptionr   len)ppartmetabarrier_tokenrA   s        r   collectrM   i   s;    	$Q	' -eeDk#hls- - -s	   #9Ac                z   	 |r|j                  | d      dz
  }n"t        |      |j                  | d      z
  dz
  }|rt        |      dz
  nd||dk  |t        |      dz
  k\  z  <   | j                         }t        |d|      }|d	k(  rt        |      dz
  nd||<   |S # t        t        f$ r t	        j
                  t        |       d      }| j                         }||j                            }|r|j                  | |   d      dz
  ||<   n(t        |      |j                  | |   d      z
  dz
  ||<   Y w xY w)
Nright)sider9   int32)dtype   r   valueslast)	searchsortedrH   	TypeError
ValueErrorr<   emptynotnaisnar   )s	divisions	ascendingna_position
partitionsnot_nulldivisions_notnanass           r   set_partitions_prerd   p   s_   "///@1DJY)*@*@*@*QQTUUJ" (IQ 
Q:Y!1C#CDE &&(C
#x
%C,76,Ac)nq(qJsO- z" XXc!fG4
779#IOO$56,,Qx[w,G!K x 
 I!..q{.IJ x s   ;B BD:9D:c                   t        |       si | fS t        |t              r|g}|r.|d   dk(  r&| |d      j                  t        j
                        }n=t        |r| |   n| d      t        |      z  j                  t        j
                        }|j                         dz   }t        | |||      }|| j                  d d fS )Nr   _partitionsFr7   r9   ignore_index)rH   
isinstancestrr:   r<   rQ   r   r;   maxr   iloc)r>   colsrh   npartsindnresult2s          r   shuffle_group_2rr      s    r72v$vQ=(ak  * !TDrG#f+U
&
 	 		AA"2sALIGBGGBQKr   c                "    | \  }}||v r||   S |S Nr3   )g_headigheads       r   shuffle_group_getry      s     GAtAvtr   c                D   t        |t              r|g}|r|d   dk(  r	| |d      }n)t        |r| |   n| d      }|r||k7  r|t        |      z  }t	        j
                  |dz        }t        ri nddi}	 ||z  j                  |fi |	||z  z  |z  }t        | |||      S )a  Splits dataframe into groups

    The group is determined by their final partition, and which stage we are in
    in the shuffle

    Parameters
    ----------
    df: DataFrame
    cols: str or list
        Column name(s) on which to split the dataframe. If ``cols`` is not
        "_partitions", hashing will be used to determine target partition
    stage: int
        We shuffle dataframes with many partitions we in a few stages to avoid
        a quadratic number of tasks.  This number corresponds to which stage
        we're in, starting from zero up to some small integer
    k: int
        Desired number of splits from this dataframe
    npartition: int
        Total number of output partitions for the full dataframe
    nfinal: int
        Total number of output partitions after repartitioning

    Returns
    -------
    out: Dict[int, DataFrame]
        A dictionary mapping integers in {0..k} to dataframes such that the
        hash values of ``df[col]`` are well partitioned.
    r   rf   Fr7   rS   copyrg   )	ri   rj   r   r;   r<   r=   r   r:   r   )
r>   rm   stagekr?   rh   nfinalro   typr*   s
             r   shuffle_groupr      s    : $vQ=(ak"t2d85If+F#C


[1_
-C !RvuoF
$3
$
$S
3F
3q%x
?!
CCCFFr   c              #     K   	 d y# t         $ r7 	 | j                           # t         $ r t        j                  d       Y  w xY ww xY ww)zEnsure a partd.File is cleaned up.

    We have several tasks referring to a `partd.File` instance. We want to
    ensure that the file is cleaned up if and only if there's an exception
    in the tasks using the `partd.File`.
    Nz1ignoring exception in ensure_cleanup_on_exception)	Exceptiondroplogger	exception)rI   s    r   rG   rG      sU     	 	RFFH 	  	RPQ	Rs9   A	 A	A	$A	AA	AA		Ac                D    || j                   v r| j                  |      S | S rt   )r8   r   r>   r8   s     r   drop_overlapr      s     "bhh.2775>6B6r   c                ^    || j                   v r| j                  |g   S | j                         S rt   )r8   loc_constructorr   s     r   get_overlapr      s)    #rxx/2665'?FR__5FFr   rt   )TrU   )
__future__r   
contextlibloggingr   numpyr<   daskr   dask.dataframe._compatr   dask.dataframe.dispatchr   r   	getLoggerr/   r   r	   rB   rE   rM   rd   rr   ry   r   contextmanagerrG   r   r   r3   r   r   <module>r      s    "      0 N			8	$/) /)n>:
-> &,G^  &7Gr   