
    bih                        d dl mZ d dlZd dlZd dlZd dlmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ  G d d	      Zd
 ZddZddZd Zd Zd Zd Z	 	 	 	 	 	 ddZd Zd ZddZy)    )annotationsN)expand_paths_if_neededget_fs_token_pathsstringify_path)AbstractFileSystem)config)_is_local_fs)natural_sort_keyparse_bytesc                      e Zd ZdZed        Zed        Ze	 dd       Ze	 	 	 	 dd       Zed        Z	edd       Z
ed	        Zed
        Zy)Enginez8The API necessary to provide a new Parquet reader/writerc                   ||j                  dd      }nd|v rt        d      |}|dv rt        |d|      \  }}}||||fS t        |t              st        d|       |rt        d	|       t        |t
        t        t        f      r&|st        d
      |D 	cg c]  }	t        |	       }}	nt        |      g}t        |dd|d      }||D 	cg c]  }	|j                  |	       c}	||fS c c}	w c c}	w )a  Extract filesystem object from urlpath or user arguments

        This classmethod should only be overridden for engines that need
        to handle filesystem implementations other than ``fsspec``
        (e.g. ``pyarrow.fs.S3FileSystem``).

        Parameters
        ----------
        urlpath: str or List[str]
            Source directory for data, or path(s) to individual parquet files.
        filesystem: "fsspec" or fsspec.AbstractFileSystem
            Filesystem backend to use. Default is "fsspec"
        dataset_options: dict
            Engine-specific dataset options.
        open_file_options: dict
            Options to be used for file-opening at read time.
        storage_options: dict
            Options to be passed on to the file-system backend.

        Returns
        -------
        fs: Any
            A global filesystem object to be used for metadata
            processing and file-opening by the engine.
        paths: List[str]
            List of data-source paths.
        dataset_options: dict
            Engine-specific dataset options.
        open_file_options: dict
            Options to be used for file-opening at read time.
        NfsfsspeczPCannot specify a filesystem argument if the 'fs' dataset option is also defined.)Nr   rb)modestorage_optionsz4Expected fsspec.AbstractFileSystem or 'fsspec'. Got zUCannot specify storage_options when an explicit filesystem object is specified. Got: zempty urlpath sequence   )pop
ValueErrorr   
isinstancer   listtuplesetr   r   _strip_protocol)
clsurlpath
filesystemdataset_optionsopen_file_optionsr   r   _pathsus
             Z/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/dask/dataframe/io/parquet/utils.pyextract_filesystemzEngine.extract_filesystem   sH   T  $$T84B& ;  B!!-dOLB5 uo/@@@ b"45 J2$O   !<<K;LN 
 'D%#56$%=>>6=>>!,>>)'23*7D!RFE0561##A&6!	  ? 7s   C0C5c                     y)Nz256 MiB )r   s    r$   default_blocksizezEngine.default_blocksizek   s        c                    t               )a  Read a single piece of a Parquet dataset into a Pandas DataFrame

        This function is called many times in individual tasks

        Parameters
        ----------
        fs: FileSystem
        piece: object
            This is some token that is returned by Engine.read_metadata.
            Typically it represents a row group in a Parquet dataset
        columns: List[str]
            List of column names to pull out of that row group
        index: str, List[str], or False
            The index name(s).
        use_nullable_dtypes: boolean
            Whether to use pandas nullable dtypes (like "string" or "Int64")
            where appropriate when reading parquet files.
        dtype_backend: {"numpy_nullable", "pyarrow"}
            Whether to use pandas nullable dtypes (like "string" or "Int64")
            where appropriate when reading parquet files.
        convert_string: boolean
            Whether to use pyarrow strings when reading parquet files.
        **kwargs:
            Includes `"kwargs"` values stored within the `parts` output
            of `engine.read_metadata`. May also include arguments to be
            passed to the backend (if stored under a top-level `"read"` key).

        Returns
        -------
        A Pandas DataFrame
        NotImplementedError)r   r   piececolumnsindexuse_nullable_dtypeskwargss          r$   read_partitionzEngine.read_partitiono   s    F "##r)   Nc                    t         )a#  Perform engine-specific initialization steps for this dataset

        Parameters
        ----------
        df: dask.dataframe.DataFrame
        fs: FileSystem
        path: str
            Destination directory for data.  Prepend with protocol like ``s3://``
            or ``hdfs://`` for remote data.
        append: bool
            If True, may use existing metadata (if any) and perform checks
            against the new data being stored.
        partition_on: List(str)
            Column(s) to use for dataset partitioning in parquet.
        ignore_divisions: bool
            Whether or not to ignore old divisions when appending.  Otherwise,
            overlapping divisions will lead to an error being raised.
        division_info: dict
            Dictionary containing the divisions and corresponding column name.
        **kwargs: dict
            Other keyword arguments (including `index_cols`)

        Returns
        -------
        tuple:
            engine-specific instance
            list of filenames, one per partition
        r+   )	r   dfr   pathappendpartition_onignore_divisionsdivision_infor1   s	            r$   initialize_writezEngine.initialize_write   s    P "!r)   c                    t         )a
  
        Output a partition of a dask.DataFrame. This will correspond to
        one output file, unless partition_on is set, in which case, it will
        correspond to up to one file in each sub-directory.

        Parameters
        ----------
        df: dask.dataframe.DataFrame
        path: str
            Destination directory for data.  Prepend with protocol like ``s3://``
            or ``hdfs://`` for remote data.
        fs: FileSystem
        filename: str
        partition_on: List(str)
            Column(s) to use for dataset partitioning in parquet.
        return_metadata : bool
            Whether to return list of instances from this write, one for each
            output file. These will be passed to write_metadata if an output
            metadata file is requested.
        **kwargs: dict
            Other keyword arguments (including `fmd` and `index_cols`)

        Returns
        -------
        List of metadata-containing instances (if `return_metadata` is `True`)
        or empty list
        r+   )r   r4   r5   r   filenamer7   return_metadatar1   s           r$   write_partitionzEngine.write_partition   s
    > "!r)   c                    t               )a;  
        Write the shared metadata file for a parquet dataset.

        Parameters
        ----------
        parts: List
            Contains metadata objects to write, of the type undrestood by the
            specific implementation
        meta: non-chunk metadata
            Details that do not depend on the specifics of each chunk write,
            typically the schema and pandas metadata, in a format the writer
            can use.
        fs: FileSystem
        path: str
            Output file to write to, usually ``"_metadata"`` in the root of
            the output dataset
        append: boolean
            Whether or not to consolidate new metadata with existing (True)
            or start from scratch (False)
        **kwargs: dict
            Other keyword arguments (including `compression`)
        r+   )r   partsmetar   r5   r6   r1   s          r$   write_metadatazEngine.write_metadata   s    0 "##r)   c                    t               )a  
        Collect parquet metadata from a file and set the file_path.

        Parameters
        ----------
        path: str
            Parquet-file path to extract metadata from.
        fs: FileSystem
        file_path: str
            Relative path to set as `file_path` in the metadata.

        Returns
        -------
        A metadata object.  The specific type should be recognized
        by the aggregate_metadata method.
        r+   )r   r5   r   	file_paths       r$   collect_file_metadatazEngine.collect_file_metadata   s    $ "##r)   c                    t               )a  
        Aggregate a list of metadata objects and optionally
        write out the final result as a _metadata file.

        Parameters
        ----------
        meta_list: list
            List of metadata objects to be aggregated into a single
            metadata object, and optionally written to disk. The
            specific element type can be engine specific.
        fs: FileSystem
        out_path: str or None
            Directory to write the final _metadata file. If None
            is specified, the aggregated metadata will be returned,
            and nothing will be written to disk.

        Returns
        -------
        If out_path is None, an aggregate metadata object is returned.
        Otherwise, None is returned.
        r+   )r   	meta_listr   out_paths       r$   aggregate_metadatazEngine.aggregate_metadata  s    . "##r)   F)FNFN)__name__
__module____qualname____doc__classmethodr%   r(   r2   r:   r>   rB   rE   rI   r'   r)   r$   r   r      s    BV Vp   <A"$ "$H  '" '"R " "@ $ $2 $ $& $ $r)   r   c                   | du}|du}| t        |      } nt        | t              r| g} nt        |       } ||}n+|du rg }||z   }nt        |t              r|g}nt        |      }|r|s|}|D cg c]	  }||vs| }}||fS |r|s| }|D cg c]	  }||vs| }}||fS |r/|r-| }|}t        |      j	                  |      rt        d      ||fS |}|}||fS c c}w c c}w )aD  Normalize user and file-provided column and index names

    Parameters
    ----------
    user_columns : None, str or list of str
    data_columns : list of str
    user_index : None, str, or list of str
    data_index : list of str

    Returns
    -------
    column_names : list of str
    index_names : list of str
    NFz3Specified index and column names must not intersect)r   r   strr   intersectionr   )	user_columnsdata_columns
user_index
data_indexspecified_columnsspecified_indexindex_namesxcolumn_namess	            r$   _normalize_index_columnsr\   '  sO    %D0 ,OL)	L#	&$~L)
	u	 
!L0	J	$ \
*%
0 !#/Ha1K3GHH$ $$# 
? $",FQ0EqFF $$ 
. $ |))+6RSS $$ $ $$% I
 Gs   0	C:C	C C c                P    t        | t              } t        | ||      \  }}| ||fS )N)key)root)sortedr
   _analyze_paths)	file_listr   r_   basefnss        r$   _sort_and_analyze_pathsre   e  s.    y&67Iy"48ID#dCr)   c                   fd}| D cg c]  } ||      j                  d       }}|du rW|d   dd |D ]>  }t        |      dz
  }t        t        |            D ]  \  }\  }	}
|	|
k7  s|} n d| @ t              n> ||      j                  d      t              t	        fd|D              sJ d	       g }|D ]%  }|j                  dj                  |d              ' dj                        |fS c c}w )
zConsolidate list of file-paths into parquet relative paths

    Note: This function was mostly copied from dask/fastparquet to
    use in ArrowEngine`.c                    	fd}d}| rm| d   rh| d   d   dk(  rd}t        |       } | d   dd  | d<   nD	j                  dk(  r5| d   dd  j                  d      r| d   dd }t        |       } | d   dd  | d<   g }t        |       D ],  \  }}|j	                   |||      j                  d             . g }|D ]j  }|d	k(  r	|d
k(  rL|r+|d   d
k(  r|j                  |       *|j                          ;|rt        d      |j                  |       Z|j                  |       l |s
|r|}|S d	}|S |dj                  |      z   }|S )Nc                    |j                  j                  d      }|dk(  ry|d   dk(  r|d d }| dkD  r|d   dk(  r|dd  }|S )N/ .r   r   )replacesep)ipr   s     r$   _scrubz2_analyze_paths.<locals>._join_path.<locals>._scrubr  sZ     		"&&#&ABwu|crF1u1abEHr)   rj   r   ri   r   \z:/   rk   z..rl   zcan not get parent of root)
r   rn   
startswith	enumerateextendsplitr6   r   	Exceptionjoin)
r5   rq   
abs_prefix	_scrubbedro   rp   simplersjoinedr   s
            r$   
_join_pathz"_analyze_paths.<locals>._join_pathq  s}   
	 
DGAwqzS  
Dzq'!"+Q4DGABK$:$:4$@!!WQq\
Dzq'!"+Q	dO 	6DAqVAq\//45	6 	"ACxdr{d*q)#$@AANN1%q!	"  #
     388G#45Fr)   ri   Fr   Nrl   r   c              3  .   K   | ]  }|d  k(    y w)Nr'   ).0rp   basepathls     r$   	<genexpr>z!_analyze_paths.<locals>.<genexpr>  s!      
"#AbqEX
s   z(All paths must begin with the given root)rw   lenru   zipallr6   ry   )rb   r   r_   r   fnpath_parts_list
path_partsjk	base_part	path_partout_listr   r   s    `          @@r$   ra   ra   k  sJ   4l <EERz"~++C0EOEu}"1%cr*) 	$JJ!#A-6s8Z7P-Q ))Iy	)A  |H	$ Md#))#.M 
'6
 
 	65	6 
 H% 

HHZ^$	

 	 / Fs   Dc                   t        |      dk  ri S t        |      dk(  rt        |      dk(  sJ |d   S t        |      dkD  rQt        j                  |      }| |d   j                         |d   j	                         |d   j                         g d}n| |d   d   d|d   d   g d}d}t        |      dkD  rt        j                  |      }t        |      D ]   \  }}|dz  }	|V|d   |	   }
|d   |	dz      }|d   |	dz      }|
|k(  r|r|d	   j                  d
|i       J|d	   j                  ||
||d       d|j                  dd|	f   j                         j                         }
|j                  dd|	dz   f   j                         j                         }|j                  dd|	dz   f   j                         }|
|k(  r|r|d	   j                  d
|i       |d	   j                  ||
||d       # |S )z~Utility to aggregate the statistics for N row-groups
    into a single dictionary.

    Used by `Engine._construct_parts`
    r   r   num-rowstotal_byte_size)file_path_0r   znum-row-groupsr   r.   Nrs      r.   
null_count)nameminmaxr   )r   pd	DataFramesumcountru   r6   ilocdropnar   r   )rD   file_row_group_statsfile_row_group_column_statsstat_col_indicesdf_rgsr}   df_colsindr   ro   minvalmaxvalr   s                r$   _aggregate_statsr     sN     1$		(	)Q	.'(A---#A&& #$q(\\"67F(":.224"("4":":"<#)*;#<#@#@#BA  )03J?"##7#:;L#MA *+a/ll#>?G"#34  	ICaA4Q7:4Q7A>8;AEB
V#
iL''z(BCiL''$(#)#)*4	 !ad+22488: aQh/668<<>$\\!QU(3779
V#
iL''z(BCiL''$(#)#)*4	3 	B r)   c	           	        g }	g }
|rt        |      }d}|j                         D ]  \  }}t        |      }|rdgt        t	        |||            z   }nt        t	        |||            }|D ]s  }||z   }|du r|r	|dk(  r|}d}||z
  }|dkD  r|}||| } |||fi |}|5|	j                  |       | sIt        |||   || ||   || |      }|
j                  |       u  |	|
fS |j                         D ]K  \  }} |||fi |}||	j                  |       | s't        |||   ||   |      }|
j                  |       M |	|
fS )Nr   T)intitemsr   r   ranger6   r   )gather_statisticssplit_row_groupsaggregation_depthfile_row_groupsr   r   r   make_part_funcmake_part_kwargsr@   statsresidualr<   
row_groupsrow_group_count_rgsro   i_end	_residualrg_listpartstats                         r$   _row_groups_to_partsr     s    EE /0$3$9$9$; #	' Hj!*oOsT%/CS"TUUE(O=MNO ',,$,AF (#$ % 7I 1}#,$Qu-% '
 <T"$+ ,X6q?3H=aF(	D LL&9'#	'p %<' %4$9$9$; 	# Hj! #D
 |LL '(2/9$	 T"#	#& %<r)   c                    | }t        | t              r1| |v rt        |      |j                  |       z
  }|S t	        |  d      |S )Nz) is not a recognized directory partition.)r   rQ   r   r/   r   )aggregate_filespartition_namesr   s      r$   _get_aggregation_depthr   a  si      (/3'o- !$O 47L7L8 ! 	 "##LM  r)   c                Z    | (dt        |      rdndz   }t        j                  |d      S | S )Nz%dataframe.parquet.metadata-task-size-localremoter   )r	   r   get)metadata_task_sizer   
config_strs      r$   _set_metadata_task_sizer     s;     ! =#B'GX

 zz*a((r)   c           	     |   | xs i j                         } | j                  di       j                         }|si }d| vr}|j                  d      dk(  r?| j                  dd      | d<   |j                  ||||j                  d|      d       || fS | j                  d|      | d<   | j                  d	d
      | d	<   || fS )Nprecache_optionsopen_file_funcmethodparquet
cache_typer@   engine)metadatar.   r   r   r   r   )copyr   r   update)r    r   r.   r   default_enginedefault_cacheallow_precacher   s           r$   _process_open_file_optionsr     s     +0b668(,,-?DIIK 00)Y6.?.C.Cg/l+ ## (&",.228^L	 ...	 /@.C.Cm/l+ ):(=(=fd(Kf%...r)   c                 n   | j                         }d|v rt        j                  dt               i |j	                  di       j                         |j	                  di       j                         }|j	                  di       j                         }|j	                  di       j                         }||||fS )Nfilez^Passing user options with the 'file' argument is now deprecated. Please use 'dataset' instead.datasetreadr    )r   warningswarnFutureWarningr   )r1   user_kwargsr   read_optionsr    s        r$   _split_user_optionsr     s     ++-K-	

//&"
%
*
*
,
//)R
(
-
-
/O ??62.335L#(;R@EEG	 r)   c                    |r|du s!t        |      dkD  r|s|j                  |      rd} t        |       S |sd} t        |       S )NTr   F)r   rR   bool)r   	blocksizer   r   filter_columnsstat_columnss         r$   _set_gather_statisticsr     s]     
'4/ !A%*;&&|4 ! !""  "!""r)   c                Z    | r)t        |      }|st        j                  |       d|z  kD  ryy)Nr   adaptiveF)r   npr   )row_group_sizesr   r   s      r$   _infer_split_row_groupsr     s.    	*	bff_5IE r)   rJ   )NNNN	readaheadT) 
__future__r   r   numpyr   pandasr   fsspec.corer   r   r   fsspec.specr   daskr   dask.dataframe.io.utilsr	   
dask.utilsr
   r   r   r\   re   ra   r   r   r   r   r   r   r   r   r'   r)   r$   <module>r      s    "    R R *  0 4U$ U$p;%| VrL^K\B  #/L6#Br)   