
    bi/                        d Z ddlmZ ddlZddlZddlZddlZddl	m
Z
 ddlmZ ddlmZ ddlmZmZ ddlmZmZ dd	lmZ d
 Zd Zd ZddZddZd Zd Zd Zd Zd Z y)a	  
Algorithms that Involve Multiple DataFrames
===========================================

The pandas operations ``concat``, ``join``, and ``merge`` combine multiple
DataFrames.  This module contains analogous algorithms in the parallel case.

There are two important cases:

1.  We combine along a partitioned index
2.  We combine along an unpartitioned index or other column

In the first case we know which partitions of each dataframe interact with
which others.  This lets us be significantly more clever and efficient.

In the second case each partition from one dataset interacts with all
partitions from the other.  We handle this through a shuffle operation.

Partitioned Joins
-----------------

In the first case where we join along a partitioned index we proceed in the
following stages.

1.  Align the partitions of all inputs to be the same.  This involves a call
    to ``dd.repartition`` which will split up and concat existing partitions as
    necessary.  After this step all inputs have partitions that align with
    each other.  This step is relatively cheap.
    See the function ``align_partitions``.
2.  Remove unnecessary partitions based on the type of join we perform (left,
    right, inner, outer).  We can do this at the partition level before any
    computation happens.  We'll do it again on each partition when we call the
    in-memory function.  See the function ``require``.
3.  Embarrassingly parallel calls to ``pd.concat``, ``pd.join``, or
    ``pd.merge``.  Now that the data is aligned and unnecessary blocks have
    been removed we can rely on the fast in-memory Pandas join machinery to
    execute joins per-partition.  We know that all intersecting records exist
    within the same partition


Hash Joins via Shuffle
----------------------

When we join along an unpartitioned index or along an arbitrary column any
partition from one input might interact with any partition in another.  In
this case we perform a hash-join by shuffling data in each input by that
column.  This results in new inputs with the same partition structure cleanly
separated along that column.

We proceed with hash joins in the following stages:

1.  Shuffle each input on the specified column.  See the function
    ``dd.shuffle``.
2.  Perform embarrassingly parallel join across shuffled inputs.
    )annotationsN)is_dtype_equal)methods)_concat)group_split_dispatchhash_object_dispatch)partitioning_indexshuffle_group)
asciitablec          	        |^}}|j                  dd      }|j                  dd      }|j                  j                  }|j                  d      j                  }||D ]  }	d }
d }|	| v r| |	   }
nP|	|j                  d      k(  r<|r:t        | j                  j                  t        j                        r| j                  }
|	|v r||	   }nP|	|j                  d      k(  r<|r:t        |j                  j                  t        j                        r|j                  }d}|
A|?t        j                  |
j                  d      |j                  d      g      j                  }|
Tt        |
t        j                        r|
j                  |      | _        n# | j                  di |	|
j                  |      i} |[t        |t        j                        r|j                  |      |_         |j                  di |	|j                  |      i} t        |      r|d   d	k(  s|j                  d
      d	k(  rPt        |t        j                  t        j                  f      r&|j!                         }t        |      rd|d<   nd|d
<    | j"                  |g|i |}t        |       dk(  r||j                     }t        |      dk(  r"| |j                  j                  |      |_        |S )N
left_indexFright_indexcategory)includeright_onleft_onr   leftsemihowinner )getindexdtypeselect_dtypescolumns
isinstancepdCategoricalDtyper   union_categoricalsastypeIndexassignlen	DataFrameSeriesdrop_duplicatesmerge)lhsresult_metaargskwargsrhsr   r   empty_index_dtypecategorical_columnscolleftrightr   outs                 O/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/dask/dataframe/multi.pymerge_chunkr4   M   s    JC$L%0J**]E2K#))//%33J3GOO&& 	CCDEcz3x

:..:ciioor/B/BC99DczC

9--+ciioor/B/BCIIEEE$522[[,ell:.FG%  dBHH- $E 2CI$#**AT[[-?'@AC eRXX. %U 3CI$#**BU\\%-@'ABC?	CB 4yT!W
*fjj.?:.McBLL"))45%%'C4y!Q 'u
#))C
)$
)&
)C
 3x1}+%%&
 3x1}*6II$$%67	J    c           	         t        |t              s|g}t        |t              s|g}t         fd|D              rt        fd|D              rt        ||      D cg c]L  \  }}t	         j
                  |   j
                  |         s!||f j
                  |   j
                  |   fN }}}|r1t        d|      }t        j                  dj                  |             yyyyc c}}w )zEChecks for merge column dtype mismatches and throws a warning (#4574)c              3  :   K   | ]  }|j                   v   y wNr   ).0r/   r0   s     r3   	<genexpr>z&warn_dtype_mismatch.<locals>.<genexpr>   s     
233$,,
2   c              3  :   K   | ]  }|j                   v   y wr8   r9   )r:   r/   r1   s     r3   r;   z&warn_dtype_mismatch.<locals>.<genexpr>   s      ;!$u}};r<   )zMerge columnsz
left dtypezright dtypezrMerging dataframes with merge column data type mismatches: 
{}
Cast dtypes explicitly to avoid unexpected results.N)
r   listallzipr   dtypesr   warningswarnformat)r0   r1   r   r   loro
dtype_mismcol_tbs   ``      r3   warn_dtype_mismatchrI      s     gt$)h%:

2'
22s ;(0; 8
 gx0
B!$++b/5<<3CD "Xt{{2R(89

 
 >
F MM0 &. 82
s    AC*c                   g }t        |       dz
  t        |      dz
  }}d\  }}|dz   |k  r*||dz      | |   k  r|dz  }|dz   |k  r||dz      | |   k  rg }||k  rt        dt        |dz
  |            }|dk\  r||   | |   kD  r||   nd}	|dz   |k  r2||dz      | |dz      k  s||dz      | |dz      k(  r||dz
  k(  r||dz      nd}
|j                  ||	|
f       |dz   |k(  s|dz   |k  r||dz      | |dz      k\  r|dz   n|}|dz   |k(  s|dz   |k  r| |dz      ||dz      k\  r|dz   n|}||kD  r|j                  |       g }n'||dz
  k(  r||   | |   kD  r|j                  |       	 |S ||}}||k  r|S )zoReturns which partitions to pair for the merge_asof algorithm and the
    bounds on which to split them up
       )r   r   N)r#   maxminappend)LRresultnmijJ	partitionlowerupperi1j1s                r3   pair_partitionsr]      s    Fq6A:s1vzqADAq
a%!)!a%AaD(	Q a%!)!a%AaD(
A
a%3q1ua=)	Q1Q4!A$;!D 1uqy1q5Aa!eH$!a%Aa!eH(<a!e a!eH 	 	 	
)UE*+!eqjQUQY1QU8qQx3GQUa!eqjQUQY1QU8qQx3GQUa6MM!A!a%ZAbEAaDLMM! M 21) a%, Mr5   c                t   g }||j                  |       |j                  |       ||j                  |       t        j                  |      }t        j                  | |fi |}|j                  j
                  | j                  j
                  k7  r%| j                  j
                  |j                  _        |S )zDmerge_asof but potentially adding rows to the beginning/end of right)rO   r   concat
merge_asofr   name)r0   r1   prevnextr+   framesframerR   s           r3   merge_asof_paddedrf      s    Fd
MM%dIIfE]]41&1F||DJJOO+ JJOOMr5   c                    t        t        t        t         |                   dk7  rt        d      t	        j
                  | d|      S )NrK   z,Concatenated DataFrames of different lengths)axisignore_order)r#   setmap
ValueErrorr   r_   )dfsri   s     r3   concat_and_checkrn      s9    
3s3}!#GHH>>#ALAAr5   c                h     d t        |t              rt         fd|D              S   |      S )z
    Test whether ``columns_or_index`` contains a reference
    to the index of ``df

    This is the local (non-collection) version of
    ``dask.core.DataFrame._contains_index_name``.
    c                    | j                   j                  d uxrS t        j                  |      xs t	        |t
              xr* || j                   j                  k(  xr |t        | dd      vS )Nr   r   )r   ra   npisscalarr   tuplegetattr)xkeys     r3   _is_index_level_referencez7_contains_index_name.<locals>._is_index_level_reference   sa    GGLL$ 5S!;ZU%;5qww||#5 71i44		
r5   c              3  0   K   | ]  } |        y wr8   r   )r:   rS   rw   dfs     r3   r;   z'_contains_index_name.<locals>.<genexpr>	  s     N,R3Ns   )r   r>   any)ry   columns_or_indexrw   s   ` @r3   _contains_index_namer|      s5    
 "D)N=MNNN(-=>>r5   c                    d }t        |t              r|n|g}|D cg c]  } || |      s| }}| |   }t        | |      r|j                  | j                        }|S c c}w )a'  
    Returns a DataFrame with columns corresponding to each
    column or index level in columns_or_index.  If included,
    the column corresponding to the index level is named _index.

    This is the local (non-collection) version of
    ``dask.core.DataFrame._select_columns_or_index``.
    c                p    t        j                  |      xs t        |t              xr || j                  v S r8   )rq   rr   r   rs   r   )ry   rv   s     r3   _is_column_label_referencez<_select_columns_or_index.<locals>._is_column_label_reference  s+    C :JsE$:Qrzz@QQr5   )_index)r   r>   r|   r"   r   )ry   r{   r   rS   column_namesselected_dfs         r3   _select_columns_or_indexr     s{    R
 ''7>EUDV   0U!3MbRS3TAULU\"KB 01!(((9 Vs
   A!A!c           	         t        |t              rt        j                  |      }t        |t              s)t
        j                  j                  j                  |      rt        |t              r|gn
t        |      }t        |      }|j                  t        | j                              |k(  r| |   }i }|j                  j                         D ]B  \  }}t
        j                  j                  j                  |      s0t         j"                  ||<   D |st%        | |   d      }nt%        | |   j'                  |      d      }||z  }t)        | ||d      S ddlm}	 t        ||	      st/        | |      }i }|j                  j                         D ]B  \  }}t
        j                  j                  j                  |      s0t         j"                  ||<   D |sd}t1        |||      }
| j3                  |
      }t5        |d	gd||d|      S )
z
    Split-by-hash a DataFrame into `nsplits` groups.

    Hashing will be performed on the columns or index specified by `on`.
    F)r   )ignore_indexr   )	FrameBaseN)
cast_dtype)_partitionsr   )r   bytespickleloadsstrr   apitypesis_list_liker>   rj   intersectionr   rA   itemsis_numeric_dtyperq   float64r   r    r   $dask.dataframe.dask_expr._collectionr   r   r	   r"   r
   )ry   onnsplitsnsetorA   r/   r   indr   
partitionsdf2s               r3   _split_partitionr   *  s    "e\\""cbffll77;  C(bTd2h2wS_-52AFhhnn. -
U66<<007"$**F3K- *2b6?*2b6==+@N-C'CuMM ?b)$%b"-Fiioo' %
U66<<((/**F3K% #BFCJ
))
)
+C		 r5   c                @    t        | d      }d|j                  v r|d= |S )z0Concat and remove temporary "_partitions" columnFr   )r   r   )rm   ry   s     r3   _concat_wrapperr   b  s'    	e	B

"}Ir5   c                     t        | i |j                         D ci c]-  \  }}|t        |t              rt	        j
                  |      n|/ c}}S c c}}w r8   )r4   r   r   r   r   r   )r*   r+   kvs       r3   _merge_chunk_wrapperr   j  sU    	 KQ,,.
BF!QA*Q"6v||AA=
 
s   2A)NN)F)!__doc__
__future__r   r   rB   numpyrq   pandasr   pandas.api.typesr   dask.dataframer   dask.dataframe.corer   dask.dataframe.dispatchr   r   dask.dataframe.shuffler	   r
   dask.dataframe.utilsr   r4   rI   r]   rf   rn   r|   r   r   r   r   r   r5   r3   <module>r      sf   6p #     + " ' N D +CLH!H,B?.85pr5   