
    bia9                        d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ ej@                  jB                  jE                  e#      Z$e
rd dl%Z%d dl&Z%e G d dejN                               Z(ddde)e*   fdZ+	 dddde)e*   dee,   fdZ- G d de      Z. G d dej^                        Z0y)    N)Iterable)	dataclass)islice)TYPE_CHECKINGOptionalUnion)ArrowWriterParquetWriter)MAX_SHARD_SIZE)is_remote_filesystemrename)_BaseExamplesIterable)experimental)convert_file_size_to_intc                   L     e Zd ZU dZdZeej                     ed<    fdZ	 xZ
S )SparkConfigzBuilderConfig for Spark.Nfeaturesc                 "    t         |           y N)super__post_init__)self	__class__s    `/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/datasets/packaged_modules/spark/spark.pyr   zSparkConfig.__post_init__%   s        )__name__
__module____qualname____doc__r   r   datasetsFeatures__annotations__r   __classcell__r   s   @r   r   r      s%    ",0Hhx(()0   r   r   dfpyspark.sql.DataFramenew_partition_orderc                     | j                  d      j                  d|d          }|dd  D ]6  }| j                  d      j                  d|       }|j                  |      }8 |S )N*z
part_id = r      )selectwhereunion)r%   r'   df_combinedpartition_idpartition_dfs        r   _reorder_dataframe_by_partitionr1   )   su    ))C.&&4G4J3K'LMK+AB/ 6yy~++j,GH!''56 r   partition_order
state_dictc              #     K   dd l }| j                  d|j                  j                  j	                         j                  d            }|r|d   nd}t        |||d        }|j                  d      }d }|r|d   nd}	t        ||	d       D ]\  }
|
j                         }|d   }|j                  d       ||k7  r|r||dxx   dz  cc<   |}d}	|r|	dz   |d<   | d	|	 |f |	dz  }	^ y w)
Nr   r)   part_idpartition_idxT)prefetchPartitionspartition_example_idxr*   _)pysparkr+   sql	functionsspark_partition_idaliasr1   toLocalIteratorr   asDictpop)r%   r2   r3   r:   df_with_partition_idpartition_idx_startr0   rowscurr_partitionrow_idrowrow_as_dictr5   s                r   _generate_iterable_examplesrI   1   s    
 99S'++*?*?*R*R*T*Z*Z[d*ef9C*_523GYlYmInoL''4'@DN4>Z/0AFdFD) jjli(	"W$n8?+q0+$NF281*J./	6(#[00!s   C*C,c                        e Zd Z	 d	 d fdZdefdZededef fd       Zd Zde	j                  j                  dd fdZdd	ed
edd fdZedefd       Z xZS )SparkExamplesIterablec                     t         |           || _        |xs- t        | j                  j                  j                               | _        y r   )r   __init__r%   rangerddgetNumPartitionsr2   )r   r%   r2   r   s      r   rM   zSparkExamplesIterable.__init__O   s:    
 	.W%8T8T8V2Wr   returnc                 .    ddd| _         | j                   S )Nr   )r6   r8   )_state_dictr   s    r   _init_state_dictz&SparkExamplesIterable._init_state_dictX   s    -.Kr   r3   c                 "    t         |   |      S r   )r   load_state_dict)r   r3   r   s     r   rW   z%SparkExamplesIterable.load_state_dict\   s    w&z22r   c              #   v   K   t        | j                  | j                  | j                        E d {    y 7 wr   )rI   r%   r2   rS   rT   s    r   __iter__zSparkExamplesIterable.__iter__`   s(     .tww8L8LdN^N^___s   /979	generatorc                     t        t        | j                  j                  j	                                     }|j                  |       t        | j                  |      S )Nr2   )listrN   r%   rO   rP   shufflerK   )r   rZ   r2   s      r   shuffle_data_sourcesz*SparkExamplesIterable.shuffle_data_sourcesc   sA    uTWW[[%A%A%CDE/*$TWWoNNr   
num_shardsindexc                 X    | j                  |||      }t        | j                  |      S )N)r`   ra   
contiguousr\   )split_shard_indices_by_workerrK   r%   )r   r`   ra   rc   r2   s        r   shard_data_sourcesz(SparkExamplesIterable.shard_data_sourcesh   s,    <<
Z_lv<w$TWWoNNr   c                 ,    t        | j                        S r   )lenr2   rT   s    r   r`   z SparkExamplesIterable.num_shardsl   s    4''((r   r   )r%   r&   )T)r   r   r   rM   dictrU   r   rW   rY   nprandom	Generatorr_   intre   propertyr`   r#   r$   s   @r   rK   rK   N   s     X#X $   3$ 34 3 3`Obii.A.A OF] O
OS O ORi O )C ) )r   rK   c                   
    e Zd ZeZ	 	 ddddedef fdZd Zd Zde	j                  j                  j                  fd	Zd
 Zdedededeeeeeeef   f      fdZ	 	 	 ddddedeeeef      dee   fdZdddefdZ xZS )Sparkr%   r&   	cache_dirworking_dirc                     dd l }|j                  j                  j                  j	                         | _        || _        || _        t        | $  d|t        | j                  j                               d| y )Nr   )rp   config_name )r:   r;   SparkSessionbuildergetOrCreate_sparkr%   _working_dirr   rM   strsemanticHash)r   r%   rp   rq   config_kwargsr:   r   s         r   rM   zSpark.__init__t   sk     	kk..66BBD' 	
DGG0023	
 	
r   c                    | j                   fd}| j                  j                  j                  dd      j	                  d      ry | j                   ro| j                  j
                  j                  t        d      d      j                  |      j                         }t        j                  j                  |d         ry t        d      )Nc                     t        j                  d       t         j                  j                  dt	        j
                         j                  z         }t        |d       |gS )NT)exist_okfs_testa)osmakedirspathjoinuuiduuid4hexopen)context
probe_filerp   s     r   create_cache_and_write_probez?Spark._validate_cache_dir.<locals>.create_cache_and_write_probe   sL     KK	D1iTZZ\=M=M1MNJ S!<r   zspark.master localr*   r   ztWhen using Dataset.from_spark on a multi-node cluster, the driver and all workers should be able to access cache_dir)
_cache_dirrx   confget
startswithsparkContextparallelizerN   mapPartitionscollectr   r   isfile
ValueError)r   r   proberp   s      @r   _validate_cache_dirzSpark._validate_cache_dir   s     OO		  ;;3>>wG
 ??((44U1XqAOOPlmuuw  ww~~eAh' C
 	
r   c                 V    t        j                  | j                  j                        S )N)r   )r    DatasetInfoconfigr   rT   s    r   _infozSpark._info   s    ##T[[-A-ABBr   
dl_managerc                 `    t        j                  t         j                  j                        gS )N)name)r    SplitGeneratorSplitTRAIN)r   r   s     r   _split_generatorszSpark._split_generators   s     ''X^^-A-ABCCr   c                    dd l }d }| j                  j                         }|dk  r|nd}| j                  j                  |      j	                  d      j                  |d      j                  |j                  j                  j                  d      j                  d            j                         d   j                  |z  }||z  }||kD  r9t        |t        ||z              }| j                  j	                  |      | _        y y )Nr   c              3   v   K   | D ]0  }t         j                  j                  d|j                  gi       2 y w)Nbatch_bytes)paRecordBatchfrom_pydictnbytes)itbatchs     r   get_arrow_batch_sizez=Spark._repartition_df_if_needed.<locals>.get_arrow_batch_size   s7      Rnn00-%,,1PQQRs   79d   r*   zbatch_bytes: longr   sample_bytes)r:   r%   countlimitrepartition
mapInArrowaggr;   r<   sumr>   r   r   minrl   )	r   max_shard_sizer:   r   df_num_rowssample_num_rowsapprox_bytes_per_rowapprox_total_sizenew_num_partitionss	            r   _repartition_df_if_neededzSpark._repartition_df_if_needed   s    	R ggmmo)4);+ GGMM/*[^Z,.ABS&&**=9??OPWYq	
 \ 	 1;>~-!$[#6G.6X2Y!Zgg))*<=DG .r   fpathfile_formatr   rQ   c           	   #   2  	
K   dd l 	|dk(  rt        nt        | j                  rGt        j
                  j                  | j                  t        j
                  j                              n|dk(  | j                  j                  | j                  | j                  j                  
	
f	d}| j                  j                  |d      j                  d      j!                  	j"                  j$                  j'                  d      j)                  d      	j"                  j$                  j'                  d      j)                  d	      	j"                  j$                  j+                  d      j)                  d
      	j"                  j$                  j-                  d      j)                  d            j/                         }|D ]>  }|j0                  |j2                  |j4                  |j6                  |j8                  ff @ y w)Nr   parquetc           	   3   D  	K    j                          j                         }t        | d       }|)t        j                  j                  |gdgdggg d      S d} j                  d|d      j                  d|d            }t        j                  j                  |g      }|j                  |       | D ]  }|j                  k\  r|j                         \  }}|j                          t        j                  j                  |g|g|ggg d       |dz  } |j                  j                  d|d      j                  d|d            }t        j                  j                  |g      }|j                  |        |j                  dkD  rN|j                         \  }}|j                          t        j                  j                  |g|g|ggg d       k7  rt        j                  t        j                   j#                              D ]r  }	t        j                   j%                  t        j                   j#                        t        j                   j'                  |	            }
t(        j+                  |	|
       t y y w)	Nr   )task_idnum_examples	num_bytes)namesSSSSS05dTTTTT)r   r   writer_batch_sizestorage_optionsembed_local_filesr*   )TaskContexttaskAttemptIdnextr   r   from_arraysreplaceTablefrom_batcheswrite_table
_num_bytesfinalizeclose	_featuresr   listdirr   dirnamer   basenameshutilmove)r   r   first_batchshard_idwritertabler   r   r   filedestr   r   r   r   r:   r   working_fpathr   writer_classs              r   write_arrowz0Spark._prepare_split_single.<locals>.write_arrow   sx    )g))+99;Gr4.K"~~11YaS)B 2   H!!"**7xnFNNw[bcfZgi"3 /"3F HH));-8Eu% *!-&2C2C~2U.4oo.?+L)LLN..44!\NYK@F 5   MH)!'!1!1*227xnNVVW^cjknboq*;(7*;F --ug6""5)#*&   1$*0//*;'inn00Y<B 1  
 %JJrww}'EF ,D77<<(>@P@PQU@VWDKKd+, &s   JJ z2task_id: long, num_examples: long, num_bytes: longr   r   total_num_examplesr   total_num_bytesr`   shard_lengths)r:   r
   r	   ry   r   r   r   r   r   r   _writer_batch_size_fsr   r%   r   groupByr   r;   r<   r   r>   r   collect_listr   r   r   r   r`   r   )r   r   r   r   r   statsrG   r   r   r:   r   r   r   r   s    ` `   @@@@@@@r   _prepare_split_singlezSpark._prepare_split_single   s     	(3y(@}kTXTeTeT%6%68H8H8OPkp'94 ;;'' 33((222	, 2	,j GG{,`aWYS%%)).9??@TU%%))+6<<=NO%%++K8>>|L%%22>BHHY	 WY 	  	pC++ 6 68K8KS^^]`]n]nooo	ps   HHsplit_generatorzdatasets.SplitGeneratornum_procc                    | j                          t        |xs t              }| j                  |       t	        | j
                         }|rt        j                  j                  nt        j                  }d}| j                   d|j                   | d| }	 || j                  |	      d}
d}dg }g }| j                  ||      D ]E  \  }}|\  }}}}|dkD  s|
|z  }
||z  }|z  |j                  ||f       |j                  |       G |
|j                  _        ||j                  _        t$        j'                  d d       dkD  r||j                  _        | j
                  dt*        d	t*        d
t*        ffdg }d}t-        t/        |            D ]3  }||   \  }}t-        |      D ]  }|j                  |||g       |dz  } 5 | j0                  j2                  j5                  |t/        |            j7                  fd      j9                          y d}|d   d   }| j;                  j=                  d|d      j=                  d|d      j=                  |d             y )Nz-TTTTT-SSSSS-of-NNNNN-.r   z	Renaming z shards.r*   r   r   global_shard_idc           	          t        j                  d|d      j                  d| d      j                  d|d      j                  dd             y )Nr   r   r   zTTTTT-SSSSSNNNNN)r   r   )r   r   r   r   fstotal_shardss      r   _rename_shardz+Spark._prepare_split.<locals>._rename_shardO  s^    
 MM'hs^=EEgRYZ]Q^`MM-OC3HJRRSZ_klo^prr   c                      |  S r   rt   )argsr   s    r   <lambda>z&Spark._prepare_split.<locals>.<lambda>a  s    S`bfSg r   r   r   r   r   )r   r   r   r   r   r   r   r   r   	posixpathr   _output_dirr   appendextend
split_infor   r   loggerdebugr   rl   rN   rg   rx   r   r   mapr   _renamer   )r   r   r   r   r   kwargsis_local	path_joinSUFFIXfnamer   r   task_id_and_num_shardsall_shard_lengthsr   contentr   r   r`   r   r   r   ir   r   r   r   r   s                           @@@@r   _prepare_splitzSpark._prepare_split  s    	  "1.2RNS&&~6+DHH55$,BGGLL)..	(99+Q334VHAk]K$**E2!# $ : :5+~ ^ 	8GW 1}"l2"9,
*&--w
.CD!((7	8 3E""//>"", 	yh78!7HO&&4 B			 "%	 DO3567 )&<Q&?# %j 1 )HKK(O DE#q(O))
 KK$$00s4yAEEFghppr H,Q/2GLLg(39AA'gVY]\fb)r   c                 ,    t        | j                        S r   )rK   r%   )r   r   s     r    _get_examples_iterable_for_splitz&Spark._get_examples_iterable_for_splitk  s     %TWW--r   )NN)arrowNN)r   r   r   r   BUILDER_CONFIG_CLASSrz   rM   r   r   r    downloaddownload_managerDownloadManagerr   r   rl   r   tupleboolr   r   r   r  rK   r  r#   r$   s   @r   ro   ro   q   s   &
 	
#
 
 	
&
BCDH,=,=,N,N,^,^ D>2RpRp Rp 	Rp
 
%T5e#445	6Rpn #48"&N2N N !sCx1	N
 3-N`.2. 
.r   ro   r   )1r   r   r   collections.abcr   dataclassesr   	itertoolsr   typingr   r   r   numpyri   pyarrowr   r    datasets.arrow_writerr	   r
   datasets.configr   datasets.filesystemsr   r   datasets.iterable_datasetr   datasets.utilsr   datasets.utils.py_utilsr   utilslogging
get_loggerr   r  r:   pyspark.sqlBuilderConfigr   r]   rl   r1   rh   rI   rK   DatasetBuilderro   rt   r   r   <module>r.     s    	   $ !  1 1    < * < ' < 
			*	*8	4  (((    (? VZ[^V_  "&#Y : )1  )F~.H## ~.r   