
    bi                   R   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlmZmZ d dlZd dlZd dlZd dlmZ d dlZd dlmZ d dlmZ d dl m!Z" d dl#Z$d dl%m&Z& d dl$m'Z' d dl(Z(d dl)m*Z*m+Z+ d d	l,m-Z- d d
l.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZDmEZE d dlFmGZG d dlHmIZImJZJmKZKmLZLmMZMmNZNmOZO d dlPmQZQ d dlRmSZS d dlTmUZU d dlVmWZWmXZXmYZY d dlZm[Z[ d dl\m]Z]m^Z^m_Z_m`Z`maZa  eXj                  ej<                  j                        d        Zddaed Zfi Zg ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                          ej                         iZdZdZi Z G d d      Zd Z eXj                  ej                        d        Z eXj                  ej                        d        Z eXj                  ej                        d         Z eXj                  e"j                        d!        Z eXj                  e"j                        d"        Z G d# d$e7      Z G d% d&e6      Z G d' d(e7      Z	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 dCd)Zd* Z G d+ d,eEeD      Z G d- d.e      Z G d/ d0e      ZdDd1Zd2 Zd3 Zd4 Z G d5 d6      Z	 dE	 	 	 	 	 dFd7ZdEd8Zd9 Zd: Zd; Zd< Zd= Zd> Zd? Ze(j                  d@        ZdA ZdB Zy)G    )annotationsN)abstractmethod)defaultdict)cached_propertypartial)stringify_path)identity)TaskTaskRef)flatten)EQGEGTLELTNEAnd	BlockwiseExprFilterIndexLengthsLiteralOr
Projectiondetermine_column_projection)Len)_convert_to_list)BlockwiseIOPartitionsFiltered)FusedParquetIO)ParquetFunctionWrapperToParquetFunctionWrapperaggregate_row_groupsapply_filters
get_engineset_index_columnssorted_columns)_split_user_options)_is_local_fs)delayed)_tokenize_deterministicnormalize_tokentokenize)Key)applyfuncnamenatural_sort_keyparse_bytestypenamec                r    t        |       j                  | j                  | j                  | j                  ffS N)type__name__pathsizemtime_ns)fileinfos    ^/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/dask/dataframe/dask_expr/io/parquet.py_tokenize_fileinfor>   E   s5    >""%      Fc                 d    t         s*t        j                  t        j                                da y y )NT)_CPU_COUNT_SETpaset_cpu_countos	cpu_count r?   r=   _maybe_adjust_cpu_countrG   Q   s%     	( r?   __null_dask_index__
   c                  h    e Zd Z ej                         ZdZ	 d		 d
dZd Zd Z	e
d        Zd Zd Zy)FragmentWrapper)NNc                J    || _         || _        || _        d| _        || _        y)z8Wrap a pyarrow Fragment to only deserialize when needed.N)	_fragment_fragment_packed
_file_size_fs_filesystem)selffragment
filesystem	file_sizefragment_packeds        r=   __init__zFragmentWrapper.__init__w   s)    
 " /#%r?   c                @   | j                   | j                  j                  }|j                  t	        j
                  d            rd}| j                  j                  }|j                  j                  t        j                         j                        rd }| j                  xs | j                  j                  }|j                  | j                  j                        sJ | j                  d   t        |      k7  r)t        |      t        j                  |      ft         _        t         j                  d   }|| j                  j"                  | j                  j$                  n| j                  j"                  ||| j&                  f| _         d x| _        | _        y )NTr      )rO   rN   partition_expressionequalspcscalarformatread_optionspa_dsParquetFileFormatrR   rU   _filesystem_pickle_cacheidpickledumpsrK   bufferr9   rP   rQ   )rS   	part_exprpqformatfsfs_pkls        r=   packzFragmentWrapper.pack   s?     (;;I		$0 	~~,,H$$++E,C,C,E,R,RS!!>T^^%>%>B99T^^66777,,Q/2b69<>rFFLLQSDT;U8$==a@F ~~,,4 NN''..
%D! %)(4>r?   c                   | j                   | j                  \  }}}}}t        j                  j	                  |      }|(t        j                  |      }|t        j                  |<   |du rt        j                  d      }|| _	        |t        j                         }|j                  ||||      | _         d | _        y )NT)rU   r[   rV   )rN   rO   rK   _filesystemsgetre   loadsr]   r^   rQ   ra   rb   make_fragment)rS   ri   path_or_bufferfs_rawr[   rV   rj   s          r=   unpackzFragmentWrapper.unpack   s    >>! %%$ --11&9Bz\\&)79,,V4#t+')yy$ DH 224%33%9#	 4 DN !%r?   c                :    | j                          | j                  S r6   )rt   rN   rS   s    r=   rT   zFragmentWrapper.fragment   s    ~~r?   c                    t        |       j                  t        | j                  | j                  | j
                  f      fS r6   )r7   r8   r-   rT   rO   rP   rv   s    r=   __dask_tokenize__z!FragmentWrapper.__dask_tokenize__   s<    Dz""O%%%
 
 	
r?   c                N    | j                          t        d d d | j                  ffS r6   )rl   rK   rO   rv   s    r=   
__reduce__zFragmentWrapper.__reduce__   s%    		tT43H3H IIIr?   NN)returnNone)r8   
__module____qualname__weakrefWeakValueDictionaryrn   rc   rX   rl   rt   propertyrT   rx   rz   rF   r?   r=   rK   rK   s   sU    .7..0L) EI	&		&)6%8  
Jr?   rK   c                    t        t              t        kD  r?| t        vr6t        t        j	                               d   }t        j                  |       y y y Nr   )len_cached_plan_CACHED_PLAN_SIZElistkeyspop)key
key_to_pops     r=   _control_cached_planr      sF    
<,,L1H,++-.q1
$ 2I,r?   c                2    | j                   | j                  fS r6   )filesschema)dss    r=   normalize_pa_dsr      s    HHbii  r?   c                    t        |       S r6   )str)file_formats    r=   normalize_pa_file_formatr      s    {r?   c                "    | j                         S r6   )	to_stringr   s    r=   normalize_pa_schemar      s    r?   c                `    	 t        |       S # t        $ r t        t        |             cY S w xY wr6   )hash	TypeErrorreprr   s    r=   normalize_pq_schemar      s/    "F| "DL!!"s   
 --c                    	 t        |       S # t        $ rN t        t        | j                        | j                  | j
                  | j                  | j                  f      cY S w xY wr6   )r   r   r   r   num_rowsnum_row_groupsformat_versionserialized_size)metas    r=   normalize_pq_filemetadatar      sb    
Dz 
 T[[!####$$
 	
	
s   
 AA$#A$c                  0    e Zd Zg dZed        Zd Zd Zy)	ToParquet)framer9   rj   fmdengineoffsetpartition_onwrite_metadata_filename_functionwrite_kwargsappendc                     y r6   rF   rv   s    r=   _metazToParquet._meta      r?   c                     yNr{   rF   rv   s    r=   
_divisionszToParquet._divisions      r?   c                T    t        t        | j                   g| j                  dd   S NrZ   )ToParquetBarrierToParquetDataoperandsrv   s    r=   _lowerzToParquet._lower  s3    
 ]]12	
 	
r?   N)r8   r~   r   _parametersr   r   r   r   rF   r?   r=   r   r      s'    K  
r?   r   c                  B    e Zd Zej                  Zed        Zd ZddZy)r   c           
         t        | j                  | j                  | j                  | j                  | j
                  | j                  | j                  | j                        S r6   )	r#   r   r9   rj   r   r   r   r   r   rv   s    r=   io_funczToParquetData.io_func  sO    'KKIIGG$$KK	
 		
r?   c                :    d| j                   j                  dz   z  S )Nr6   rZ   )r   npartitionsrv   s    r=   r   zToParquetData._divisions+  s    $**001455r?   c                r    t        || j                  t        | j                  j                  |f      |f      S r6   )r
   r   r   r   _name)rS   nameindexs      r=   _taskzToParquetData._task.  s-    D$,,1A1A50I(JUHUUr?   Nr   r/   r   intr|   r
   )	r8   r~   r   r   r   r   r   r   r   rF   r?   r=   r   r     s*    ''K

 

6Vr?   r   c                  @    e Zd Zej                  Zed        Zd Zd Zy)r   c                     y r6   rF   rv   s    r=   r   zToParquetBarrier._meta5  r   r?   c                     yr   rF   rv   s    r=   r   zToParquetBarrier._divisions9  r   r?   c                   | j                   r| j                  }| j                  j                  d      }| j                  dft
        | j                  j                  | j                  j                         | j                  | j                  | j                  g||dfiS | j                  dfd | j                  j                         fiS )Ncompressionr   )r   r   c                     y r6   rF   xs    r=   <lambda>z)ToParquetBarrier._layer.<locals>.<lambda>N  s    r?   )r   r   r   ro   r   r0   r   write_metadatar   __dask_keys__r   rj   r9   )rS   r   r   s      r=   _layerzToParquetBarrier._layer<  s    ##[[F++//>KQKK..

002			  &kB
"  ZZOndjj6N6N6P%QRRr?   N)	r8   r~   r   r   r   r   r   r   r   rF   r?   r=   r   r   2  s*    ''K Sr?   r   c                
   ddl m} t        || j                        }|xs i }|xs g }t	        |t
              r|g}t        |      t        | j                        z
  r6t        dt        |      dt        t        | j                                    | j                  j                  dvrt        d      t	        |t
              rt        |      }t        |d      rt        |      }|j                  ||i i |	      \  }}}}t        |      d
k(  sJ d       |d   }|r|rt        d      |j!                  |      r|j#                  |      r| j$                  j'                  t(              D ]Y  }t        |j*                        j-                  d      dz   }|j-                  d      dz   }|j/                  |      sPt        d       t1        |      rB|j3                  d      d   }|j-                  d      |j-                  d      k(  rt        d      |j5                  |d       t6        j9                          | j:                  sd}| j<                  | j>                  j@                  d}|d   >tB        | j                  vr
tB        |d<   n"|rt        d      tE        jF                  d       g }dD ]"  }|| j                  vs|jI                  |       $ g }|rt        | j                        }t        | j                  j>                  jJ                        dgk(  }| jM                         } |r5| j                  D  ci c]  } | |v s| tB         }!} | jO                  |!      } t        | j                        |z
  D  cg c]  } |  }} n| jM                  d      } |	rd|	jQ                         v rt        d       |jR                  | ||f|||||||	d|\  }"}#}$}%|r|
|$}
|ltU        |      st        d      tW        | jX                        D &cg c]  }& ||&|"z          }'}&t        t        |'            t        |'      k  rt        d      tZ        j\                  j_                  di       }(d |(vr"t1        |      st[        j`                  d!"      })ntc        jd                         })tE        jf                         5  tE        jh                  d#d$tj        %       |)5   |tm        | |||#||"||
|to        jp                  |||	d&|%      |            }*ddd       ddd       |r *jr                  d'i |}*|ju                  |       *S c c} w c c} w c c}&w # 1 sw Y   GxY w# 1 sw Y   KxY w)(u9  Store Dask.dataframe to Parquet files

    Notes
    -----
    Each partition will be written to a separate file.

    Parameters
    ----------
    df : dask.dataframe.DataFrame
    path : string or pathlib.Path
        Destination directory for data.  Prepend with protocol like ``s3://``
        or ``hdfs://`` for remote data.
    compression : string or dict, default 'snappy'
        Either a string like ``"snappy"`` or a dictionary mapping column names
        to compressors like ``{"name": "gzip", "values": "snappy"}``. Defaults
        to ``"snappy"``.
    write_index : bool, default True
        Whether or not to write the index. Defaults to True.
    append : bool, default False
        If False (default), construct data-set from scratch. If True, add new
        row-group(s) to an existing data-set. In the latter case, the data-set
        must exist, and the schema must match the input data.
    overwrite : bool, default False
        Whether or not to remove the contents of `path` before writing the dataset.
        The default is False.  If True, the specified path must correspond to
        a directory (but not the current working directory).  This option cannot
        be set to True if `append=True`.
        NOTE: `overwrite=True` will remove the original data even if the current
        write operation fails.  Use at your own risk.
    ignore_divisions : bool, default False
        If False (default) raises error when previous divisions overlap with
        the new appended divisions. Ignored if append=False.
    partition_on : list, default None
        Construct directory-based partitioning by splitting on these fields'
        values. Each dask partition will result in one or more datafiles,
        there will be no global groupby.
    storage_options : dict, default None
        Key/value pairs to be passed on to the file-system backend, if any.
    custom_metadata : dict, default None
        Custom key/value metadata to include in all footer metadata (and
        in the global "_metadata" file, if applicable).  Note that the custom
        metadata may not contain the reserved b"pandas" key.
    write_metadata_file : bool or None, default None
        Whether to write the special ``_metadata`` file. If ``None`` (the
        default), a ``_metadata`` file will only be written if ``append=True``
        and the dataset already has a ``_metadata`` file.
    compute : bool, default True
        If ``True`` (default) then the result is computed immediately. If
        ``False`` then a ``dask.dataframe.Scalar`` object is returned for
        future computation.
    compute_kwargs : dict, default True
        Options to be passed in to the compute method
    schema : pyarrow.Schema, dict, "infer", or None, default "infer"
        Global schema to use for the output dataset. Defaults to "infer", which
        will infer the schema from the dask dataframe metadata. This is usually
        sufficient for common schemas, but notably will fail for ``object``
        dtype columns that contain things other than strings. These columns
        will require an explicit schema be specified. The schema for a subset
        of columns can be overridden by passing in a dict of column names to
        pyarrow types (for example ``schema={"field": pa.string()}``); columns
        not present in this dict will still be automatically inferred.
        Alternatively, a full ``pyarrow.Schema`` may be passed, in which case
        no schema inference will be done. Passing in ``schema=None`` will
        disable the use of a global file schema - each written file may use a
        different schema dependent on the dtypes of the corresponding
        partition.
    name_function : callable, default None
        Function to generate the filename for each output partition.
        The function should accept an integer (partition index) as input and
        return a string which will be used as the filename for the corresponding
        partition. Should preserve the lexicographic order of partitions.
        If not specified, files will created using the convention
        ``part.0.parquet``, ``part.1.parquet``, ``part.2.parquet``, ...
        and so on for each partition in the DataFrame.
    filesystem: "fsspec", "arrow", or fsspec.AbstractFileSystem backend to use.
    **kwargs :
        Extra options to be passed on to the specific backend.

    Examples
    --------
    >>> df = dd.read_csv(...)  # doctest: +SKIP
    >>> df.to_parquet('/path/to/output/', ...)  # doctest: +SKIP

    By default, files will be created in the specified output directory using the
    convention ``part.0.parquet``, ``part.1.parquet``, ``part.2.parquet``, ... and so on for
    each partition in the DataFrame. To customize the names of each file, you can use the
    ``name_function=`` keyword argument. The function passed to ``name_function`` will be
    used to generate the filename for each partition and should expect a partition's index
    integer as input and return a string which will be used as the filename for the corresponding
    partition. Strings produced by ``name_function`` must preserve the order of their respective
    partition indices.

    For example:

    >>> name_function = lambda x: f"data-{x}.parquet"
    >>> df.to_parquet('/path/to/output/', name_function=name_function)  # doctest: +SKIP

    will result in the following files being created::

        /path/to/output/
            ├── data-0.parquet
            ├── data-1.parquet
            ├── data-2.parquet
            └── ...

    See Also
    --------
    read_parquet: Read parquet data to dask.dataframe
    r   )new_collection)r   r   z2Partitioning on non-existent column. partition_on=z
 .columns=>   emptystringz/parquet doesn't support non-string column namesr   )rU   dataset_optionsopen_file_optionsstorage_optionsrZ   zonly one pathz3Cannot use both `overwrite=True` and `append=True`!/zICannot overwrite a path that you are reading from in the same task graph..z;Cannot clear the contents of the current working directory!T	recursive)	divisionsr   Nz:Index must have a name if __null_dask_index__ is a column.zfIf read back by Dask, column named __null_dask_index__ will be set to the index (and renamed to None).)r   level_0columns)drops   pandaszUser-defined key/value metadata (custom_metadata) can not contain a b'pandas' key.  This key is reserved by Pandas, and overwriting the corresponding value can render the entire dataset unreadable.)r   ignore_divisionsr   division_info
index_colsr   custom_metadataz7``name_function`` must be a callable with one argument.z0``name_function`` must produce unique filenames.r   retries   )r   ignorezDask annotations )messagecategory)r   r   rF   );$dask.dataframe.dask_expr._collectionr   _set_parquet_enginer   
isinstancer   setr   
ValueErrorr   inferred_typer&   hasattrr   extract_filesystemr   existsisdirexprfind_operationsReadParquetr9   rstrip
startswithr*   expand_pathrmr   clearknown_divisionsr   r   r   
NONE_LABELwarningswarnr   namesreset_indexrenamer   initialize_writecallableranger   daskconfigro   annotate
contextlibnullcontextcatch_warningsfilterwarningsUserWarningr   toolzmergecomputeinvalidate_cache)+dfr9   r   write_indexr   	overwriter   r   r   r   r   r  compute_kwargsr   r   rU   r   kwargsr   rj   _paths_read_opread_path_with_slashwrite_path_with_slashworking_dirr   reserved_namesr   r   	real_cols
none_indexcrename_columnsi_offsetr   metadata_file_existsextra_write_kwargsi	filenamesr   ctxouts+                                              r=   
to_parquetr0  Q  sp   B D RXX>F#)rN%2L,$$~
<3rzz?*  -s4

3C/DF
 	
 
zz'::JKK&#F#tVd#00' 1 B1 v;!,_,!9DRSS99T?rxx~ 7722;? '*7<<'8'?'?'Ds'J$(,C(83(>%'223HI$7 	 B nnS1!4;;s#{'9'9#'>>$U 
 EE$$E' 	  #%,,FMV$ RZZ'$.M&!L  MMB N$ (rzz!!!$'( J

O	"((..../D69
^^57ZZW1CVamWNW>2B!$RZZ9!<=Aa=
= ^^^&9(<(<(>>)
 	
 ?Vf>U>U

? )!#'? ?;Hc');  %-2  &VWW:?:OPQ]1x<0P	Ps9~Y/OPP ++//-4K#L,<mmA&$$&		 	 	" 1K	
  	  '!KK+6/> + %C		8 ckk+N+
 Jw X=N Q$	 		 s<   	U	U	UUU)#4UU)U&	"U))U2c                   g d }| j                  |        |rj                  t        j                         t        j                  d      ij
                         j                  t        j                         t        j                  t        j                               ij
                         j                  t        j                         t        j                  t        j                               ij
                         d }j                  |       |dk(  r j                  t        j
                         n|dk(  rj                  |       fd}t              dkD  r|S y )Nc                    | t        j                         k(  rt        j                  d      S t        j                  |       S )Npyarrow)rB   r   pdStringDtype
ArrowDtype)pyarrow_dtypes    r=   pyarrow_type_mapperz3_determine_type_mapper.<locals>.pyarrow_type_mapper  s0     BIIK'>>),,==//r?   r3  c                l    t         j                  j                  |       rt        j                  |       S y r6   )rB   types
is_decimalr4  r6  )r7   s    r=   _convert_decimal_typez5_determine_type_mapper.<locals>._convert_decimal_type  s&    xx""4(}}T**r?   numpy_nullablec                0    D ]  } ||       }||c S  y)zBTry all type mappers in order, starting from the user type mapper.NrF   )r7  type_converterconverted_typetype_mapperss      r=   default_types_mapperz4_determine_type_mapper.<locals>.default_types_mapper  s(    * 	&N+M:N)%%	&r?   r   )r   rB   r   r4  r5  ro   date32r6  date64PYARROW_NULLABLE_DTYPE_MAPPINGr   )user_types_mapperdtype_backendpyarrow_strings_enabledr8  r<  rB  rA  s         @r=   _determine_type_mapperrI    s    L0 $-. RYY["..*CDHHIRYY["--		*DEIIJRYY["--		*DEIIJ	
 	12 ((:>>?	)	#/0& <1## r?   c                       e Zd ZdZdZdZ fdZ fdZed        Z	e
d        Zd Ze
d        Zed	        Z fd
Ze
d        Zed        Zed        Z xZS )r   FTc                    t         |   ||      xrl t        |j                  t        t
        t        t        t        t        t        t        f      xr, t        j                  | |j                        j                  d uS r6   )super_filter_passthrough_availabler   	predicater   r   r   r   r   r   r   r   _DNFextract_pq_filters_filters)rS   parent
dependents	__class__s      r=   rM  z)ReadParquet._filter_passthrough_available  sf    G1&*E UF,,r2r2r2sB.OPU''f.>.>?HHPTT	
r?   c                &   t        |t              rjt        | ||      }| j                  D cg c]	  }||v s| }}t	        |      t	        | j                        k(  ry t        | j                  |dd            S t        |t              rt        | !  ||      S t        |t              r}| j                  ||      rkt        j                  | |j                        }|j                  ?| j                  d|j                  | j!                  d            j#                         i      S t        |t$              r| j'                         }|rt)        |      S t        |t*              r'| j'                         }|rt)        t-        |            S y y c c}w )NF)r   _seriesfilters)r   r   r   r   r   substitute_parametersr   rL  _simplify_upr   rM  rO  rP  rN  rQ  combineoperandto_list_tupler   _get_lengthsr   r   sum)rS   rR  rS  r   colrW  _lengthsrT  s          r=   rY  zReadParquet._simplify_up  sk   fe$1$
KG&*llEscWnsEGE7|s4<<00**w5+QR  fj)7'
;;ff%$*L*LJ+
 --dF4D4DEG+11!7?? LL3$'-/  fg&((*Hx((fc"((*Hs8}--  #; Fs
   	FFc                |    | j                  d      }|t        | j                  j                        S t	        |      S )Nr   )r[  r   r   r   r   )rS   columns_operands     r=   r   zReadParquet.columns   s6    ,,y1"

**++#O44r?   c                     y)Nread_parquetrF   rv   s    r=   	_funcnamezReadParquet._funcname  s    r?   c                    | j                   s;t        t        t        |             | j                  g| j
                  d d  | _         | j                   S )NrL   )_determ_tokenr,   r1   r7   checksumr   rv   s    r=   rx   zReadParquet.__dask_tokenize__  sK    !!!8d$dmm"6:mmCR6H"D !!!r?   c                :    | j                   dz   | j                  z   S )N-)re  deterministic_tokenrv   s    r=   r   zReadParquet._name  s    ~~#d&>&>>>r?   c                     | j                   d   S )Nrh  _dataset_inforv   s    r=   rh  zReadParquet.checksum  s    !!*--r?   c                N    | j                   |   dk(  r|S t        | 	  |||      S )N_dataset_info_cache)r   rL   _tree_repr_argument_construction)rS   r,  opheaderrT  s       r=   rq  z,ReadParquet._tree_repr_argument_construction  s0    A"77Mw72vFFr?   c                    | j                   d   }t        | j                  d            }| j                  rt	        |      dkD  sJ ||d      S |||   S |S )N	base_metar   r   )rn  r   r[  rV  r   )rS   r   r   s      r=   r   zReadParquet._meta"  sb    !!+."4<<	#:;<<w<!###
## = r?   c                    t         r6   )NotImplementedErrorrv   s    r=   r   zReadParquet._divisions-  s    !!r?   c           	         | j                  d      yt        t        | j                  d   j                        dz
  d      }t        t        t        | j                  d                  |z  d      S )Nr   rZ   r   MbP?)r[  maxr   rn  r  r   )rS   nr_original_columnss     r=   _fusion_compression_factorz&ReadParquet._fusion_compression_factor1  si    <<	"*!#d&8&8&B&H&H"IA"MqQ i!89:=PPRW
 	
r?   )r8   r~   r   _pickle_functools_cache_absorb_projections_filter_passthroughrM  rY  r   r   r   re  rx   r   rh  rq  r   r   r   r|  __classcell__)rT  s   @r=   r   r     s    #
$.L 5 5  " ? ? . .G   " " 
 
r?   r   c                  .   e Zd Zg dZdddddddddddddddZdZdZed        Zed        Z	ddZ
dd	Zdd
Zed        Zed        Zd Zed        Zed        ZddZd Zd Zd Zed        Zed        Zed        ZddZed        Zed        Zy)ReadParquetPyarrowFS)r9   r   rW  
categoriesr   r   rU   ignore_metadata_filecalculate_divisionsarrow_to_pandasrH  r  _partitionsrV  rp  NTF)r   rW  r  r   r   rU   r  r  r  rH  r  r  rV  rp  c                ,    t        | j                        S r6   )_normalize_and_strip_protocolr9   rv   s    r=   normalized_pathz$ReadParquetPyarrowFS.normalized_path`  s    ,TYY77r?   c                6   | j                  d      }t        |t        j                  j                        r|S t
        j                  j                  | j                        d   }| j                  x}r'd|v ri nd|j                  i} t        |      di ||}|S )NrU   r   regionrF   )r[  r   rB   rj   
FileSystempa_fsfrom_urir9   r   r  r7   )rS   fs_inputrj   r   r  s        r=   rj   zReadParquetPyarrowFS.fsd  s    <<-h 0 01O!!**4995a8B"&"6"666'?:299@UT"X::/:Ir?   c                    | j                         }t        j                  | j                  d         |   }|D cg c]  }t        t        |          }}t        |      S c c}w )a  Return an approximation of a single files statistics.

        This is determined by sampling a few files and averaging their statistics.

        Fields
        ------
        num_rows: avg
        num_row_groups: avg
        serialized_size: avg
        columns: list
            A list of all column statistics where individual fields are also
            averaged.


        Example
        -------
        {
            'num_rows': 1991129,
            'num_row_groups': 2.3333333333333335,
            'serialized_size': 6256.666666666667,
            'total_byte_size': 118030095,
            'columns': [
                {'total_compressed_size': 6284162.333333333,
                'total_uncompressed_size': 6347380.333333333,
                'path_in_schema': 'l_orderkey'},
                {'total_compressed_size': 9423516.333333334,
                'total_uncompressed_size': 9423063.333333334,
                'path_in_schema': 'l_partkey'},
                {'total_compressed_size': 9405796.666666666,
                'total_uncompressed_size': 9405346.666666666,
                'path_in_schema': 'l_suppkey'},
                ...
            ]
        }

        Returns
        -------
        dict
        	all_files)sample_statisticsnparrayrn  _STATS_CACHEr.   _combine_stats)rS   idxsfiles_to_considerfinfostatss        r=   approx_statisticsz&ReadParquetPyarrowFS.approx_statisticsq  s_    P %%'HHT%7%7%DEdK<MN5huo.NNe$$ Os   A c           	     $   || j                   d   }|| j                  }t        j                  j	                  ddi      5  t        t        j                  t        ||                  }d d d        D ]  \  }}|t        |<    y # 1 sw Y   xY w)Nr  z,distributed.diagnostics.computations.nframesr   )	rn  fragments_unsortedr  r  r   r   r  _collect_statistics_planr  )rS   r   	fragmentstoken_statstokenr  s         r=   load_statisticsz$ReadParquetPyarrowFS.load_statistics  s    =&&{3E//I [[__LaPQ 	!5eYGHK	 ( 	(LE5"'L	(		 	s   )BBc                   | j                   }t        j                  | j                  d         }t        j                  d dd      } ||      }|j                         }t        |      }t        ||z  d      }g }	g }
g }t        d||      D ]I  }||   }||k  r|nd}|j                  |       |	j                  ||          |
j                  ||          K | j                  |	|
       |S )a  Sample statistics from the dataset.

        Sample N file statistics from the dataset. The files are chosen by
        sorting all files based on their binary file size and picking
        equidistant sampling points.

        In the special case of n=3 this corresponds to min/median/max.

        Returns
        -------
        ixs: list[int]
            The indices of files that were sampled
        r  c                    | j                   S r6   )r:   r   s    r=   r   z8ReadParquetPyarrowFS.sample_statistics.<locals>.<lambda>  s
    !&& r?   rZ   )ninnoutr   )r  r  r  rn  
frompyfuncargsortr   rz  r  r   r  )rS   nfragsfinfosgetsizefinfo_size_arrfinfo_argsortnfragsstepsizefinfos_sampledfrags_samplesixsr,  sort_ixs                 r=   r  z&ReadParquetPyarrowFS.sample_statistics  s     ''$,,[9:-- 0aa@ &..0Uv{A&q&(+ 	1A#A&G!(6!1gqGJJw!!&/2  w0	1 	^];
r?   c                    | j                          | j                  d   D cg c]  }t        t        |          c}S c c}w )zParquet statstics for every file in the dataset.
        The statistics do not include all the metadata that is stored in the
        file but only a subset. See also `_extract_stats`.
        r  )r  rn  r  r.   )rS   r  s     r=   raw_statisticsz#ReadParquetPyarrowFS.raw_statistics  sA     	7;7I7I+7V
.3L%)
 	
 
s   >c                ,    t        | j                        S )a  Aggregate statistics for every partition in the dataset.

        These statistics aggregated the row group statistics to partition level
        such that min/max/total_compressed_size/etc. corresponds to the entire
        partition instead of individual row groups.
        )_aggregate_statistics_to_filer  rv   s    r=   aggregated_statisticsz*ReadParquetPyarrowFS.aggregated_statistics  s     -T-@-@AAr?   c                T    | j                   st        d | j                  D              S y )Nc              3  &   K   | ]	  }|d      yw)r   NrF   ).0r  s     r=   	<genexpr>z4ReadParquetPyarrowFS._get_lengths.<locals>.<genexpr>  s     Suz*S   )rW  tupler  rv   s    r=   r]  z!ReadParquetPyarrowFS._get_lengths  s%    ||S8R8RSSS r?   c                   | j                  d      x}r|S i }| j                  }	 g }|D ]  }t        j                  |d      }|j	                  | j
                  j                  |      D cg c]5  }|j                  t        j
                  j                  j                  k(  r|7 c}        	 d}d }	d }
| j                  st        |d       }|d   j                  j!                  d      r|j#                         }t%        |      }	t'        j(                  |j*                  | j
                  	      }
d|d
<   |
j-                         D cg c]  }t/        || j
                         c}|d<   d gt1        |d         z  |d<   |	(t%        |      }	|D cg c]  }|j2                   c}|d<   |	|d<   |
dd lm}  |j8                  |D cg c]  }|j*                   c}| j
                  | j:                        }
d|d
<   |
j<                  D cg c]  }t/        || j
                         c}|d<   ||d<   |
j>                  |d<   |
j>                  jA                         jC                         |d<   || _"        |S c c}w # t        t        f$ r2 |D cg c]  }| j
                  j                  |       nc c}w }}Y w xY wc c}w c c}w c c}w c c}w )Nrp  Tr   Fc                8    | j                   j                  d      S )N	_metadata)	base_nameendswithr   s    r=   r   z4ReadParquetPyarrowFS._dataset_info.<locals>.<lambda>  s    )=)=k)J r?   r   rL   r  rU   using_metadata_filer  
file_sizesrh  r   )rU   rW  r  r   ru  )#r[  r  r  FileSelectorextendrj   get_file_infor7   rB   FileTypeFileNotADirectoryErrorFileNotFoundErrorr  sortedr  r  r   r.   ra   parquet_datasetr9   get_fragmentsrK   r   r:   pyarrow.parquetparquetParquetDatasetrW  r  r   empty_table	to_pandasrp  )rS   rvdataset_infopath_normalizedr  r9   dataset_selectorr  metadata_filerh  datasetfragfipqs                 r=   rn  z"ReadParquetPyarrowFS._dataset_info  s   34424I..
	R I' #(#5#5dd#K    &*WW%:%:;K%L! ::)<)<<  ((JI }&&//< )#M2//!&&#ww 7;23?F?T?T?V-7;OD$''2-[) /3Vc,{:S6T-T\*	*H<E)Fb"'')FL&#+Z ?('b'' $--R-77G 38L./;B;L;L)37dgg.)L% )2L%!(X$+NN$>$>$@$J$J$L[!#/ o #$56 	RAPQ..t4QQIQ	R(- *G .
)sH   AI= +:I8
%	I= 
KK	KK8I= =J>"J32J>=J>c                    | j                   r8| j                  ,| j                  j                  }t        | j                  |      S t        dgt        | j                        dz   z        dfS )a  If enabled, compute the divisions from the collected statistics.
        If divisions are possible to set, the second argument will be the
        argsort of the fragments such that the divisions are correct.

        Returns
        -------
        divisions
        argsort
        NrZ   )r  r   r   _divisions_from_statisticsr  r  r   r  )rS   
index_names     r=   _division_from_statsz)ReadParquetPyarrowFS._division_from_stats6  s]     ##

(>J-d.H.H*UUdVs4#:#:;a?@A4GGr?   c                @    t        d | j                  d   D              S )z:Whether all statistics have been fetched from remote storec              3  >   K   | ]  }t        |      t        v   y wr6   )r.   r  )r  r  s     r=   r  z<ReadParquetPyarrowFS.all_statistics_known.<locals>.<genexpr>H  s      
05HUO|+
s   r  )allrn  rv   s    r=   all_statistics_knownz)ReadParquetPyarrowFS.all_statistics_knownF  s'     
9=9K9KK9X
 
 	
r?   c                     | j                   d   S r   r  rv   s    r=   _fragment_sort_indexz)ReadParquetPyarrowFS._fragment_sort_indexL      ((++r?   c                     | j                   d   S r   r  rv   s    r=   r   zReadParquetPyarrowFS._divisionsO  r  r?   c                z    | j                   dk\  ry t        |t              ry |j                  | t        |             S r   )r|  r   r!   
substitute)rS   rR  s     r=   _tune_upzReadParquetPyarrowFS._tune_upR  s8    **a/fn-  ~d';<<r?   c                t    | j                         | j                  | j                            S | j                  S )zReturn all fragments in the dataset after filtering in the order as
        expected by the divisions.

        See also
        --------
        ReadParquetPyarrowFS.fragments_unsorted
        )r  r  rv   s    r=   r  zReadParquetPyarrowFS.fragmentsY  s9     $$&2**4+D+D+FGG&&&r?   c                   | j                   t        j                  | j                         }| j                  d   D cg c]  }|j                   }}t
        j                  j                  || j                  d   |d   j                  | j                        }t        j                  t        |j                  |                  S t        j                  | j                  d   D cg c]  }|j                   c}      S c c}w c c}w )zAll fragments in the dataset after filtering.

        No guarantees on ordering. This is ordered as the files are listed.

        See also
        --------
        ReadParquetPyarrowFS.fragments
        r  r   r   )r_   rU   )filter)rW  r  filters_to_expressionrn  rT   r3  r  FileSystemDatasetr_   rj   r  r  r   r  )rS   filter_expressionr  r  r   s        r=   r  z'ReadParquetPyarrowFS.fragments_unsortedf  s     <<# " 8 8 F/3/A/A+/NOtT]]OEO22""8,Qx77	 3 B 88D!1!19J!1!KLMMxx43E3Ek3RS4STT P Ts   D'Dc                2   | j                         }d}d}| j                  d      xs | j                  }|d   D ]  }||d   z  }|d   |v s||d   z  } t        t        j
                  j                  d            }t        ||      }t        ||z  d      S )Nr   r   total_uncompressed_sizepath_in_schemaz(dataframe.parquet.minimum-partition-sizery  )r  r[  r   r3   r  r  ro   rz  )rS   approx_statstotal_uncompressedafter_projectioncol_opr_  min_sizes          r=   r|  z/ReadParquetPyarrowFS._fusion_compression_factor|  s    --/i(8DLL	* 	CC#&?"@@#$. C(A$BB 	C
 KKOOFG
 !!3X>#&88%@@r?   c                b   | j                   j                         }| j                  j                  }| j                  | j                  j                  }| j                  d   j                         }|r(|t        |j                        }|j                  |       t        |t        j                  t        d t        j                  t        | j                  |   | j                        | j                   ||      || j"                  | j$                  j'                  d      | j(                  d      S )Nr   r  )fragment_wrapperrW  r   r   rG  T)r  r  rG  rH  _data_producer)r   copyr   r   rn  remove_metadatar   r  r   r
   r  _table_to_pandas_fragment_to_tablerK   r  rj   rW  r  r  ro   rH  )rS   r   r   r   r  r   s         r=   _filtered_taskz#ReadParquetPyarrowFS._filtered_task  s    ,,##%ZZ__
::!J##H-==?v||,NN:&  11$77!0NN5)dgg" 	 " 00++///:$($@$@#
 	
r?   c                X   t                t        | t              r| j                  }n| }t        |t              rt        j                  |      }|j                  |||dt        j                  j                  dt        j                  t        d      t        d                  d      S )Ni Tz4 MiBz	32.00 MiB)hole_size_limitrange_size_limit)
pre_buffercache_options)r   r   r  
batch_sizefragment_scan_optionsuse_threads)rG   r   rK   rT   r   r  r  to_tablerB   r  ParquetFragmentScanOptionsCacheOptionsr3   )r  rW  r   r   rT   s        r=   r  z'ReadParquetPyarrowFS._fragment_to_table  s    !&8'00H'Hgt$..w7G   ""$**"G"G oo$/$8%0%= #H # ) ! 
 	
r?   c           	     &   |i }n|j                         }|j                  dd         | j                  d	t        |j                  dd       ||      |j	                  dd      |j	                  dd      d|ddi}||j                  |      }|S )
Nignore_metadatatypes_mapper)rF  rG  rH  r  Fself_destructT)r  r  r  rF   )r  r   r  rI  ro   	set_index)tabler  r  rG  rH  r  s         r=   r  z%ReadParquetPyarrowFS._table_to_pandas  s     " O-224O-t4U__ 

/"1"5"5nd"K+(?
 (++M5A)--otD

 

 !

 !j)B	r?   )r|   dictr{   )   r|   boolr   )r8   r~   r   r   	_defaultsr~  r  r   r  rj   r  r  r  r  r  r]  rn  r  r  r  r   r  r  r   r  r|  r  staticmethodr  r  rF   r?   r=   r  r  ;  s`   K$  $$#'#I  8 8 
 
+%Z( D 
 
 B BT
 K KZ H H
,,= 
' 
' U U* A A 
@ 
 
>  r?   r  c                      e Zd ZdZg dZi ddddddddddd	d
dd
dddddddddddddddddidddd
dddZed        Zd Zed        Z	d%d Z
ed!        Zed"        Zd&d#Zed$        Zy)'ReadParquetFSSpeczRead a parquet dataset)r9   r   rW  r  r   r   r  r  metadata_task_sizesplit_row_groups	blocksizeaggregate_filesparquet_file_extensionrU   r   r  r  rV  rp  _pq_length_statsr   NrW  r  r   r   r  Fr  r!  r"  inferr#  defaultr$  r%  )z.parqz.parquetz.pqrU   fsspecr   r3  r  rG  r  rV  )rp  r&  c                ^    | j                  d      }t        |t              rt        |      S |S )Nr   )r[  r   r   r&   )rS   _engines     r=   r   zReadParquetFSSpec.engine  s*    ,,x(gs#g&&r?   c                     | j                   d   S )Nr   )_planrv   s    r=   r   zReadParquetFSSpec._divisions  s    zz+&&r?   c                D   | j                  d      x}r|S t        di | j                  xs i \  }}}}| j                  j	                  | j
                  | j                  ||| j                        \  }}}}||d<   t        |t              }d}| j                  d      }	|	d}|	rt        |	t              r|	g}
n|	}
| j                  }| j                  dv rC| j                  dk(  r6t        | j                  d	      r| j                  j                         }nd
}nd }||| j                   |
| j"                  | j$                  | j                  || j&                  | j(                  | j*                  | j,                  ||d|f} | j                  j.                  | }g }g }|d   rVt        | j
                  t0              rt3        d | j
                  D              g}n-| j
                  |j4                  z   dz   g}n|d   j6                  }|D ]"  }|j9                  |j;                  |             $ t=        |      |d<   | j                  j?                  |      }|d   }
t        |
t              r|
gn|
}
tA        ||
d |      \  }}
}|jB                  jD                  tF        k(  rd |jB                  _"        ||d<   |
|d<   ||d<   | j"                  |d<   t=        |      }|tH        vr| j                  jK                  |      \  }}}tM        ||      \  }}tO        |||      \  }}| j$                  r|rtQ        ||| j$                        \  }}tS        ||tU        |            }d}tU        |      dk  rd}|g}d}tW        |       |||||dtH        |<   tH        |   |d<   || _,        |S )Nrp  r   r  Fr   T)r'  adaptiver(  default_blocksize128MiB)readr  has_metadata_filec              3  D   K   | ]  }|j                  d       s|  yw)r  N)r  )r  r9   s     r=   r  z2ReadParquetFSSpec._dataset_info.<locals>.<genexpr>g  s     R$t}}[7QRs     r  r   rh  ru  all_columnsr     r{   )r   parts
statisticsr   common_kwargsplanrF   )-r[  r)   r  r   r   r9   rU   r   r  r2   r   r   r#  r"  r   r0  r  r  rW  r$  r  r!  r%  _collect_dataset_infor   nextsepr   r   rh  r.   _create_dd_metar'   r   r   r  r   _construct_collection_plan_align_statistics_aggregate_row_groupsr%   _calculate_divisionsr   r   rp  )rS   r  r   r`   r   other_optionsrj   pathsauto_index_allowedindex_operandr   r#  argsr  rh  files_for_checksumfiler   r5  dataset_tokenr7  r  r9  r   r   s                            r=   rn  zReadParquetFSSpec._dataset_info   s   34424I  64;;#4"6	
 9=8V8VIIOO  9
5E?$5 ->()u"23"W- !%Zs;"OE!ENN	  $99~~*4;;(;< $ = = ?I (I I OO$$LL!!  %%##''$*  
& 9t{{88$?+,$))T*R$))RR&" '+ii"&&&8;&F%G"!-d!3!9!9& 	/D OOBKK-.		/
 $,H#5Z  {{**<8W%%eS1u#4%1$
 e[ ::??j("DJJO$([! %W&1]#.2.F.F*+ .,*.++*P*P+'E5-
 -UE:LE5 1|LLE5 ||,UE4<<Hu -UL#e*MIE9~!(	 /#&!.+L'  ,M:V#/ r?   c                    t        || j                  | j                  d   |   d      }| j                  r)t        |t        j
                  || j                  d         S |S )Nr7  T)r  r   )r
   _io_funcr-  rV  operatorgetitemr   )rS   r   r   tsks       r=   r  z ReadParquetFSSpec._filtered_task  sP    4

7(;E(BSWX<<h..T\\!_EE
r?   c                    | j                   d   rt        S | j                  }t        | j                  |d   |d   | j
                  |d   |d   d   i | j                   d         S )Nr   rj   ru  r   r  rG  r9  )r-  r	   rn  r"   r   r   )rS   r  s     r=   rL  zReadParquetFSSpec._io_func  sm    ::gO))%KK%LL!"?3JJ'	
 		
r?   c                     | j                   d   S )Nr:  rm  rv   s    r=   r-  zReadParquetFSSpec._plan  s    !!&))r?   c                l      j                   s't         fdt         j                        D              S y)z7Return known partition lengths using parquet statisticsc              3  \   K   | ]#  \  }}j                   r|j                  v r| % y wr6   	_filteredr  )r  r,  lengthrS   s      r=   r  z1ReadParquetFSSpec._get_lengths.<locals>.<genexpr>  s1      Av~~d.>.>)> s   ),N)rW  r  	enumerater&  rv   s   `r=   r]  zReadParquetFSSpec._get_lengths  s4    || !*4+@+@!A  
 r?   c                      j                   d   r*t         fdt         j                   d         D              S t        d t               D              S )z6Ensure that partition-length statistics are up to dater8  c              3  b   K   | ]&  \  }}j                   r|j                  v r|d     ( ywnum-rowsNrT  )r  r,  statrS   s      r=   r  z5ReadParquetFSSpec._pq_length_stats.<locals>.<genexpr>  s6      At~~d.>.>)> Z s   ,/c              3  &   K   | ]	  }|d      ywrZ  rF   )r  r\  s     r=   r  z5ReadParquetFSSpec._pq_length_stats.<locals>.<genexpr>  s     Sdj)Sr  )r-  r  rW  _collect_pq_statisticsrv   s   `r=   r&  z"ReadParquetFSSpec._pq_length_stats  sR     ::l# (L)AB   S6LT6RSSSr?   r   )r|   ztuple | None)r8   r~   r   __doc__r   r  r   r   r   rn  r  rL  r-  r]  r   r&  rF   r?   r=   r   r     sf    K,44 	d 		
 	4 	u 	 	d 	G 	Y 	4 	!"> 	h 	) 	?D)  	t!" 	5#$  $ 'I,  ' F FP 
 
 * * T Tr?   r   c                    | dk(  rt        d      | Q| t        |      j                  d      d   dk(  s#t        j                  j                  dd      dk(  r
ddlm} |} | S d	} | S )
Nfastparquetz#Fastparquet engine is not supportedr   r   cudfzdataframe.backendpandas)
CudfEnginer3  )rw  r4   splitr  r  ro   dask_cudf.io.parquetrd  )r   r   rd  s      r=   r   r     so    !"GHH~$!5!5c!:1!=!G[[__0(;vE7F M FMr?   c           
         |rt        |       t        |      k7  rg }|rCt        t        t        | |      D cg c]  \  }}|d   dkD  r||f c}}       }|xs g g g\  } }| |fS c c}}w )Nr[  r   )r   r   zip)r7  r8  partr  results        r=   r@  r@    s     c%jC
O3
 (+5*'=#eZ(1, 5M
 #.r2hz*s   A%c                    |d   du r|d   nd }|d   }|d   }|d   }|r%|s|r!t        |      dkD  rt        | |||||      \  } }| |fS )Nr"  Tr#  rj   aggregation_depthrZ   )r   r$   )r7  r8  r  r#  r"  rj   rl  s          r=   rA  rA    s     &22D%E%M[!SW  $$67	d	B$%89)c2B.Ca.G 4z9.>DU!E: *r?   c                    d }| rc|j                  dd      rQ|j                  dd       }|d   }|rt        |      dk(  r|nd }|dur"|r t        | |      D ]  }|d   |v s|d   } n |xs d	|dz   z  S )
Ngather_statisticsFr  r   rZ   r   r   r   r6   )ro   r   r(   )r8  r  r   r   r  r   process_columnssorted_column_infos           r=   rB  rB    s    Il&&':EB*../DdKW%#(SZ1_%$u,/&4O' " &f-6 2; ?I 3;?33r?   c                      e Zd ZU dZ G d de      Z G d de      Zded<   ddZdd	Z	dd
Z
edd       ZddZedd       Zy)rO  z/Manage filters in Disjunctive Normal Form (DNF)c                      e Zd ZdZddZy)_DNF._OrzFrozen set of disjunctionsc                    d }| D cg c],  }t        |d      r ||j                               n ||      . c}S c c}w )Nc                t    t        | t              r&| r$t        | d   t        t        f      rt        |       S | gS r   )r   r  r   )vals    r=   _maybe_listz+_DNF._Or.to_list_tuple.<locals>._maybe_list6  s1    c5)cjQ%QU6W9$ur?   r\  r   r\  )rS   rw  rv  s      r=   r\  z_DNF._Or.to_list_tuple4  sR        sO4   1 1 34$S)*  s   1<Nr|   r   r8   r~   r   r_  r\  rF   r?   r=   _Orrs  1  s
    (	r?   r{  c                      e Zd ZdZddZy)	_DNF._AndzFrozen set of conjunctionsc                &    t        d | D              S )Nc              3  X   K   | ]"  }t        |d       r|j                         n| $ yw)r\  Nrx  )r  rv  s     r=   r  z*_DNF._And.to_list_tuple.<locals>.<genexpr>I  s/       (/sO'D!!##Ms   (*)r  rv   s    r=   r\  z_DNF._And.to_list_tupleG  s       r?   Nry  rz  rF   r?   r=   _Andr}  D  s
    (	r?   r  z_And | _Or | NonerQ  c                0    | j                  |      | _        y r6   )	normalizerQ  )rS   rW  s     r=   rX   z_DNF.__init__P  s    w/r?   c                6    | j                   j                         S r6   )rQ  r\  rv   s    r=   r\  z_DNF.to_list_tupleS  s    }}**,,r?   c                ,    t        | j                        S r6   )r  rQ  rv   s    r=   __bool__z_DNF.__bool__V  s    DMM""r?   c                    |sd}|S t        |t              rHt        |d   t              r|n|g} j                  |D cg c]  } j                  |       c}      }|S t        |t              rBt        |d   t              rt        d       j                   j                  |f      f      }|S t        | j                        r j                   fd|D              }|S t        | j                        rpg }t        j                  |D cg c]  } j                  |       c} D ])  }|j                   j                  d |D                     +  j                  |      }|S t        t        |       d      c c}w c c}w )z9Convert raw filters to the `_Or(_And)` DNF representationNr   z0filters must be List[Tuple] or List[List[Tuple]]c              3  N   K   | ]  }j                  |      D ]  }|   y wr6   )r  )r  eseclss      r=   r  z!_DNF.normalize.<locals>.<genexpr>f  s&     LA3==;KLRRLRLs   "%c              3  .   K   | ]  }|D ]  }|   y wr6   rF   )r  r  r  s      r=   r  z!_DNF.normalize.<locals>.<genexpr>j  s     %@Qa%@b%@b%@s   z not a supported type for _DNF)r   r   r{  r  r  r   	itertoolsproductr  r   r7   )r  rW  rj  conjunctionsconjunctiontotalr  r'  s   `       r=   r  z_DNF.normalizeY  sl    F" ! &&0T&B7	LWW|Tchh{3TUF  ''!*e, RSSWWchhz245F  )WWLLLF  *E&&7(Kaq)9(KL BSXX%@!%@@ABWWU^F  tG}o-KLMM U )Ls   FFc                L   t        |t              st        |      }t        |t              sJ | j                  |j                  }t        |      S |j                  | j                  }t        |      S | j                  | j                  |j                  g      }t        |      S )z Combine with another _DNF object)r   rO  rQ  r  )rS   otherrj  s      r=   rZ  z_DNF.combinep  s    %&KE%&&&== ^^F
 F|	 ^^#]]F F| YYu~~>?FF|r?   c           	        d }t        |t        t        t        t        t
        t        f      rt        |j                  t              st        |j                  t              rn|j                  j                  j                  |j                  k(  rA|j                  }|j                  j                  d   }|j                  }|||f}t-        |      S t        |j                  t              smt        |j                  t              rR|j                  j                  j                  |j                  k(  r$t        t        t        t        t        t        t        t        i}|}|j                  ||      j                  }|j                  j                  d   }|j                  }|||f}t-        |      S t        |t         t"        f      r| j%                  ||j                        j&                  }| j%                  ||j                        j&                  }	|rC|	rAt        |t               r| j)                  ||	g      }t-        |      S | j+                  ||	g      }t-        |      S r   )r   r   r   r   r   r   r   rightr   leftr   r   r   _operator_reprr   ro   r   r   rP  rQ  r  r{  rO  )
r  pq_exprpredicate_exprrQ  rr  columnvalueflipr  r  s
             r=   rP  z_DNF.extract_pq_filters}  s   nr2r2r2&>?~33T:~22J?"''--33w}}D#22',,44Q7&,,"B.. H~+ ~22D9~22J?"''--33w}}D BBBB7#XXb"%44'--55a8&++"B. H~ b	2))'>3F3FGPPD**7N4H4HIRREnc2"xxu6H H~  #wwe}5HH~r?   N)rW   _And | _Or | list | tuple | Noner|   r}   ry  r  )rW  r  )r  z'_DNF | _And | _Or | list | tuple | Noner|   rO  )r  r   r  r   r|   rO  )r8   r~   r   r_  	frozensetr{  r  __annotations__rX   r\  r  classmethodr  rZ  rP  rF   r?   r=   rO  rO  .  sa    9i &y   0-#  , " "r?   rO  c                   |rt        |t              st        dt        |       d      | j                  j
                  j                  ht        | j                        z  }t        |      j                  |      st        d| d|       | j                  d   rg S | j                  j                  }t        | j                  d         D cg c]!  \  }}| j                  r|| j                  v r|# }}}t!        t#        |      rdnd      }|rt%        t              }|D ]D  }t        |t&              r|gn|D ]*  }	|	j)                  d	      d
   }
||
   j+                  |	       , F t        |j-                               }t/        t0              }t3        j4                  t7        d
t9        |      |      D cg c]=  } |t        t;        j<                  ||||z    D cg c]  }||   	 c}       ||      ? c}}      d
   }t        t;        j<                  |       S t1        |||      S c c}}w c c}w c c}}w )z+Collect Parquet statistic for dataset pathsz#Expected columns to be a list, got r   zcolumns=z must be a subset of r   r7  F   piecer   r   )r   r   r   r7   r   r   r   r   r   issubsetr-  rL  rj   rW  rU  r  r   r*   r   r  ro   r   r   r+   _read_partition_stats_groupr  r  r  r   r  chain)r   r   allowedrj   r,  ri  r7  parallelgroupspr9   
group_keysfunckrj  s                  r=   r^  r^    s*    '4(B4=/QRSTT::##(()C,==7|$$W-xy0EgYOPPzz'	 
		B !G!45At~~d&6&6!6 	E  L,5"5H
 T" 	'D)$5dV4 'uuW~a(t##A&'	' &++-(
 23 q#j/8<  !1;AH1MNAfQiN
 #
  IOOV,-- +5"gFFO6 O	s   &H1;%H<
 H7,H<
7H<
c                F    dd}| D cg c]  } ||||       c}S c c}w )z)Parse the statistics for a group of filesc                   t        | t              s| g} i }d}|xs g }| D ]  }|d   }|d   }|d   d gk(  rd n|d   }|j                  |d      5 }	t        j                  |	      j
                  }
d d d        |t        t        
j                              }|D ]  }
j                  |      }||j                  z  }t        |j                        D ]  }|j                  |      }|j                  }||v s%|j                  s2|j                  j                  sI||v rYt        ||   d   |j                  j                        ||   d<   t!        ||   d   |j                  j                         ||   d<   |j                  j                  |j                  j                   d||<      |j#                         D cg c]  }|||   d   ||   d   d	 }}||d
S # 1 sw Y   uxY wc c}w )Nr   r  rZ   none)default_cacheminrz  r  rz  )r   r  rz  )r[  r   )r   r   openr  ParquetFilemetadatar  r   	row_groupr   num_columnsr  r  r8  has_min_maxr  rz  r   )ri  rj   r   column_statsr   r  r  r9   
row_groupsfmdrgr  r,  r_  r   column_stats_lists                    r=   _read_partition_statsz:_read_partition_stats_group.<locals>._read_partition_stats  s    $%6D-R 	"AgJE8D!&qdV!3qJV4 0^^A&//0!!%(9(9":;
  "LL,	I...y445 "A#**1-C--Dw>>cnn.H.H#|3<?$0$6u$=s~~?Q?Q="T 25 9 =@$0$6u$=s~~?Q?Q="T 25 9
 ,/>>+=+=+.>>+=+=6"T 2""	"J %))+
 	 #D)%0#D)%0
 
 %1BCCG0 06
s    G$G1$G.	r   r6   rF   )r7  rj   r   r  ri  s        r=   r  r    s+    1Dh JOO!$G<OOOs   c                    t        | t        t        f      s| g} g }| D ]T  }ddg}|D ])  }|j                  |d      }t	        |      dkD  s$|d   } n |j                  |j                  d             V |S )Nz://z::rZ   r   )r   r   r  re  r   r   r   )r9   rj  r  protocol_separatorsr=  re  s         r=   r  r    s    dT5M*vF %$dm& 	CGGCOE5zA~!H		
 	ahhsm$% Mr?   c                   d}| d   }t        |d         D ]  \  }}|d   |k(  s|} n t        d| d      d }g }| D ]1  }|d   |   d   d   }	|d   |   d   d	   }
|j                  |	|
f       3 g }t        j                  |      }|j                         }||   }|j                  st        d gt        |       d
z   z        d fS |D ]  \  }	}
|j                  |	       |
} |j                  |       t        |      |fS )NrL   r   r   r  zIndex column z not found in statisticsr8  r  rz  rZ   )	rW  r   r   r4  Seriesr  is_monotonic_increasingr  r   )aggregated_statsr  col_ixpeak_rgixr_  last_maxminmax
file_statsfile_minfile_maxr   r  sorted_minmaxs                 r=   r  r  .  sZ   Fq!GWY/0 
C J.F

 J<'?@
 	
 HF& ,
i(0>uEi(0>uEx*+	,
 IYYvFnnG7OM00dVs#34q89:D@@+ (" XW$$r?   c                V   g d}g d}g d}g d}i }|D ]
  }| |   ||<    g x|d<   }| d   D ]x  }i }	|j                  |	       |D ]
  }||   |	|<    g |	d<   |d   D ]G  }
i }|	d   j                  |       |D ]
  }|
|   ||<    i |d<   |
d   3|D ]  }|
d   |   |d   |<    I z |S )zTake the raw file statistics as returned by pyarrow (as a dict) and
    filter it to what we care about. The full stats are a bit too verbose and we
    don't need all of it.)r   r   r   )r   total_byte_sizesorting_columns)
num_valuestotal_compressed_sizer  r  )r  rz  
null_countr  distinct_countr  r   r8  )r   )originalfile_level_statsrg_statscol_meta	col_statsr/  r   rgsr  rg_outr_  col_outs               r=   _extract_statsr  N  s1   
 IH
HI C  #TND	#  C|$ F

6 	$Dd8F4L	$yi= 		FCG9$$W-  * #D	*$&GL!< (! F.1,.?.E%d+F		FF" Jr?   c                    i }| D ]9  }|j                         D ]$  \  }}||vr|g||<   ||   j                  |       & ; i }|j                         D ]$  \  }}|j                  |      }|s ||      ||<   & |S r6   )itemsr   ro   )dicts	agg_funcsrj  dr  vresult2aggs           r=   
_agg_dictsr    s    F $GGI 	$DAqCq	q	  #		$$ G  1mmAQGAJ  Nr?   c                    g }d}	 g }|j                  |       	 | D ]  }|j                  ||           	 |dz  }6# t        $ r |j                          Y nw xY w|D cg c]  }t        ||       c}S c c}w )Nr   rZ   )r   
IndexErrorr   r  )colsagg_colsrZ  r,  innerr_  r'  s          r=   _aggregate_columnsr    s    G	A
u	 %SV$%
 	
Q   	KKM	 .55Jq(#555s   < AAA5c                `    | D cg c]  }||	 } }t        |       dkD  r ||       S d S c c}w r   )r   )r   r  ys      r=   _get_min_max_valuer    s7    'q'A'!fqj47*d* 	(s   ++c                4   d d d}t         t         t        t        |      d d}t         t         t        t        |      d}g }| D ]M  }|j	                         }|j                  |       |j                  t        |j                  d	      |             O |S )
z'Aggregate RG information to file level.c                "    t        | t              S r6   )r  r  r   s    r=   r   z/_aggregate_statistics_to_file.<locals>.<lambda>      +As3 r?   c                "    t        | t              S r6   )r  rz  r   s    r=   r   z/_aggregate_statistics_to_file.<locals>.<lambda>  r  r?   r  )r  c                4    t        |       j                         S r6   r   r   r   s    r=   r   z/_aggregate_statistics_to_file.<locals>.<lambda>      CFJJL r?   )r  r  r8  r  r  )r   r  r   r  )r^  r   r  r  r  r   updater   )r  	agg_statsr  agg_funcr  	file_stats         r=   r  r    s     43I
 "%#&jI>0	H -AH
  L	NN$		*IMM,$?JK	L
 r?   c                    t         j                  d        t        j                  t        fd| D              d      d   S )Nc                    t        | d   t              r| d   j                  | d<   | d   t        | d   j                  j                               fS )NrZ   r   )r   rK   rT   r  r  to_dict)token_fragments    r=   _collect_statisticsz/_gather_statistics.<locals>._collect_statistics  sO    nQ'9 .q 1 : :N1a .1B1K1K1S1S1U"VVVr?   c              3  .   K   | ]  } |        y wr6   rF   )r  r  r  s     r=   r  z%_gather_statistics.<locals>.<genexpr>  s     94 &9s   	threading)	schedulerr   )r  r+   r  r   )r  r  s    @r=   _gather_statisticsr    sF    	\\W W
 <<9599[	 	r?   c                    g }t        | |      D ],  \  }}t        |      x}t        vs|j                  ||f       . t        j
                  j                  d|      D cg c]  }t        |       c}S c c}w )zHCollect statistics for a list of files and their corresponding fragments   )rh  r.   r  r   r  	itertoolzpartition_allr  )
file_infosr  
to_collectr  r  r  batchs          r=   r  r    s~     J:y1 -te_$E\9udm,-
 __222zB 	5!  s    A5c                   t         j                  t         j                  d d}t        t        |       t         j                  t         j                  t         j                  t         j                  t	        t
        |      d      S )zCombine multiple file-level statistics into a single dict of metrics that
    represent the average values of the parquet statisticsc                4    t        |       j                         S r6   r  r   s    r=   r   z _combine_stats.<locals>.<lambda>  r  r?   )r  r  r  r  )r   r   r   r  r   )r8  meanr  r  r   r  )r  r  s     r=   r  r    s_     ",#-??0H
 %e,"(oo))1HE	
	 	r?   )snappyTFFFNNNNTNr'  NNNr{   r6   )r   r   r   zlist | Noner|   zlist[dict] | None)
__future__r   r  r  rM  rD   re   r8  r  r   abcr   collectionsr   	functoolsr   r   numpyr  rc  r4  r3  rB   pyarrow.computer  r]   pyarrow.datasetr  ra   
pyarrow.fsrj   r  r  r  r  tlzr  fsspec.utilsr   r	   r  dask._task_specr
   r   	dask.corer   dask.dataframe.dask_expr._exprr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   $dask.dataframe.dask_expr._reductionsr   dask.dataframe.dask_expr._utilr   dask.dataframe.dask_expr.ior   r    dask.dataframe.dask_expr.io.ior!   dask.dataframe.io.parquet.corer"   r#   r$   r%   r&   r'   r(   dask.dataframe.io.parquet.utilsr)   dask.dataframe.io.utilsr*   dask.delayedr+   dask.tokenizer,   r-   r.   dask.typingr/   
dask.utilsr0   r1   r2   r3   r4   registerFileInfor>   rA   rG   r  int8	Int8Dtypeint16
Int16Dtypeint32
Int32Dtypeint64
Int64Dtypeuint8
UInt8Dtypeuint16UInt16Dtypeuint32UInt32Dtypeuint64UInt64Dtypebool_BooleanDtyper   r5  float32Float32Dtypefloat64Float64DtyperE  r  r   r   rK   r   Datasetr   
FileFormatr   Schemar   ParquetSchemar   FileMetaDatar   r   r   r   r0  rI  r   r  r   r   r@  rA  rB  rO  r^  r  r  r  r  r  r  r  r  r  r  r  rF   r?   r=   <module>rF     s   "    	      # .          '   )     $ 5 ; G 9   @ 0   L L  O O "%%..) *   BGGI|r||~BHHJBHHJBHHJBHHJBIIK!BIIK!BIIK!BHHJ!BIIK!BJJL/"//#BJJL/"//#"  #
 VJ VJr% %--(! )! %**+ , "))$ % "**+" ," "//*
 +
$
 
>VI V.St SD #K\
,$^j
$k j
Zi; iXsT sTv"&"4.r rv /39G
9G +9G9Gx7Pt %@.b 6 +
6 		 		
r?   