
    uki4                    h   d Z ddlmZ ddlZddlZddlmZmZ ddlZddl	Z	ddl
Z
ddlZddlZddlZddlmZ ddlZddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZm Z!m"Z"m#Z#m$Z%m&Z'm(Z( ddl)Z* ejV                  ejX                  d      Z- ejV                  ej\                  dd      Z.dZ/ e	j`                         Z1dZ2ddgZ3ddddddgZ4 G d de5      Z6dZ7 e
jp                  e9      Z:d%dZ;d&dZ< G d d ejz                        Z> G d! d"      Z? G d# d$e?e>      Z@y)'z(Array serialization and deserialization.    )annotationsN)CallableSequence)Any)array)distributed)sharding)typing)util)Format)_jax)tensorstore_impl)_run_deserialization_run_serializationasync_serializeasync_deserialize_TS_CONTEXT_DEFAULT_BASE_DRIVER_LimitInFlightByteszarr)driverF)r   ocdbtcheckpoint_write_successziPlease initialize the distributed system via `jax.distributed.initialize()` at the start of your program.zgs://zs3://gcs)r   
path_regexs3c                      e Zd Zy)BarrierTimeoutErrorN)__name__
__module____qualname__     m/home/cdr/jupyterlab/.venv/lib/python3.12/site-packages/jax/experimental/array_serialization/serialization.pyr   r   C   s    r#   r   zSuggestions for possible fixes:
* Check the logs to see if one or more processes failed.
* Make sure the training and checkpointing endpoints are close geographically.
* Try increasing the timeout you pass to GlobalAsyncCheckpointManager.c                8   t        | t              r/t        j                  ddj	                  t
               d|       ryydD ]  }|| v st        | |         c S  d| v r<t        D ]3  }| d   |d   k(  s|d    yt        j                  |d   | d	         s3 y y)
zDetect if user is using cloud storages.

  This can detect common defines and unable to detect some corner cases such as
  using gcsfuse.
  z^(|)TF)basekvstorer   r   path)
isinstancestrrematchjoin_REMOTE_URL_PREFIXESis_remote_storage_REMOTE_DRIVER_VALIDATIONS)tspeckeyrules      r$   r1   r1   O   s     s	xx2chh345Q7?  +c
e|uSz**+ * 	xDN	*% 88D&f6 
r#   c                    d|  S )Ntensorstore_checkpoint_r"   )r4   s    r$   _get_keyr8   l   s    "3%	((r#   c                      e Zd ZdZej
                  d        Zej
                  d        Zej
                  	 	 dd       Zej
                  	 	 d	 	 	 	 	 d	d       Z	y)
 GlobalAsyncCheckpointManagerBasea&  Interface for checkpointing GDAs asynchronously.

  This class manages the state of an ongoing asynchronous checkpoint.

  For example, say a checkpoint happens on every step. If you checkpoint on
  step 1 and after some computation the model is on checkpoint 2. But step 1's
  checkpoint hasn't finished committing to the storage layer yet. So until that
  is finished, checkpoint for step 2 will need to be blocked. Maintaining a
  class allows to maintain that state.

  Examples:

  Below is a simplified training loop:

  ```
  # Call this at the start of your program.
  jax.distributed.initialize()

  manager = GlobalAsyncCheckpointManager()

  # Restore checkpoint if available or initialize the train_state from
  # init_fn().
  train_state = manager.deserialize(...)

  while ...:
    if step % num_steps_between_checkpoints == 0:
      manager.serialize(train_state, temp_checkpoint_dir=...,
                        final_checkpoint_dir=...)
      train_state = train_step(train_state, input)
      # This is a non-blocking call.
      manager.check_for_errors()

  manager.serialize(train_state, temp_checkpoint_dir=...,
                    final_checkpoint_dir=...)
  # Wait before the end of the program for the checkpoint to finish. This is a
  # blocking call.
  manager.wait_until_finished()
  ```
  c                     y)zChecks if any errors have been raised in the child thread.

    This is a non-blocking call that can be called in the main thread.
    Nr"   selfs    r$   check_for_errorsz1GlobalAsyncCheckpointManagerBase.check_for_errors       r#   c                     y)z(Blocks until serialization has finished.Nr"   r<   s    r$   wait_until_finishedz4GlobalAsyncCheckpointManagerBase.wait_until_finished   r?   r#   c                    y)zSerializes GDAs to TensorStore.Nr"   )r=   arraystensorstore_specson_commit_callbacks       r$   	serializez*GlobalAsyncCheckpointManagerBase.serialize   r?   r#   Nc                     y)z#Deserializes GDAs from TensorStore.Nr"   )r=   	shardingsrD   global_shapesdtypess        r$   deserializez,GlobalAsyncCheckpointManagerBase.deserialize   r?   r#   )rE   zCallable[[], None])NN)rH   Sequence[sharding.Sharding]rD   Sequence[dict[str, Any]]rI   Sequence[array.Shape] | NonerJ   !Sequence[typing.DTypeLike] | None)
r   r    r!   __doc__abcabstractmethodr>   rA   rF   rK   r"   r#   r$   r:   r:   p   s    &P   3 3 *$6* *  AE>B.%=.!=. <. .r#   r:   c                  :    e Zd Zd	dZd Zd Zd Zd Zd Zd
dZ	y)AsyncManagerc                0   || _         | j                   dz  | _        d | _        d | _        d | _        t        j                         dkD  r)t        j                  j                  t        t              t        j                  j                  | _        d | _        y )Ni     )_timeout_secs_timeout_in_ms_commit_futures_thread
_exceptionjaxprocess_countr   global_stateclient
ValueError_DISTRIBUTED_SYSTEM_MSG_client_count)r=   timeout_secss     r$   __init__zAsyncManager.__init__   s{    %D,,t3DDDLDO
Q;#;#;#B#B#J.//++22DLDKr#   c                ~    | j                   1| j                   j                         rt        j                  d       y y y )NzPlease add `.wait_until_finished()` in the main thread before your program finishes because there is a possibility of losing errors raised if the this class is deleted before writing is completed.)rZ   is_aliveloggerwarningr<   s    r$   __del__zAsyncManager.__del__   s6    ||DLL$9$9$;nn K L %<r#   c                   	 t        j                         }t        j                         }t        j	                  d|       t        j
                         }| j                  D ]  }|j                           t        j	                  d|       d }|dkD  rv| j                  J t        | j                        }t        j	                  d||       | j                  j                  || j                         t        j	                  d|       |dk(  rz| j                  %| j                          t        j	                  d       |dkD  rD| j                  J | j                  j                  |t               t        j	                  d|       t         j                   j#                  d	t        j
                         |z
         y # t$        $ r}|| _        Y d }~y d }~ww xY w)
Nz/Starting commit to storage layer by process: %sz3Finished committing to storage layer by process: %srV   z)Key used for barrier is %s for process %sz*Finished waiting at barrier for process %sr   z$on_commit_callback successfully ran!z1Process 0 successfully set key %s in the kv storez//jax/checkpoint/write/async/thread_duration_sec)r\   process_indexr]   rh   infotimerY   resultrb   r8   rc   wait_at_barrierrX   _on_commit_callbackkey_value_set_CHECKPOINT_SUCCESS
monitoringrecord_event_duration_secs	Exceptionr[   )r=   current_processr]   thread_start_timefuturekey_for_barrieres          r$   _thread_funczAsyncManager._thread_func   s   &))+o'')mkkC"$))+(( &kkG"$ o		||''' #4;;/?#_	6$$_d6I6IJ@#	% 
A	##/

"
"
$
++<
=1)
))
,,
$
$_6I
J
++I%' 
nn//
;
))+)
)+  doos   F:F= =	GGGc                    t        t              | _        || _        t	        j
                  | j                        | _        | j                  j                          y )N)target)	next_module_unique_countrc   rq   	threadingThreadr|   rZ   start)r=   rE   s     r$   _start_async_commitz AsyncManager._start_async_commit   s?    +,DK1D##4+<+<=DLLLr#   c                    | j                   e| j                   }d | _         t        |t        j                        r6dt	        |      v r)t        dj                  t	        |      t        g            |y )Nz$DEADLINE_EXCEEDED: Barrier timed out
)r[   r+   r   JaxRuntimeErrorr,   r   r/   _BARRIER_TIMED_OUT_MSG)r=   	exceptions     r$   r>   zAsyncManager.check_for_errors   sg    "//ido
Y 4 4
5
0C	N
B!IIs9~'=>?A 	Ao #r#   c                   | j                   6| j                   j                          d | _         t        j                  d       | j	                          t        j                  d       t        j                         dkD  rm| j                  `| j                  J t        | j                        }| j                  j                  || j                         t        j                  d|       y y y )NzThread joined successfullyz!Error check finished successfullyrV   z<blocking_key_value_get on key %s was successfully completed.)rZ   r/   rh   rm   r>   r\   r]   rc   rb   r8   blocking_key_value_getrX   )r=   get_keys     r$   rA   z AsyncManager.wait_until_finished  s    ||
lldlkk./
KK34
Q4;;#:\\%%% %g
ll))'43F3FGkk  ') $;r#   c                    || _         y N)rY   )r=   futuress     r$   _add_futureszAsyncManager._add_futures  s
    "Dr#   N)i,  )r   zSequence[ts.Future])
r   r    r!   re   rj   r|   r   r>   rA   r   r"   r#   r$   rT   rT      s'    L'R	)$#r#   rT   c                      e Zd ZdZddd	 	 	 ddZddd	 	 	 	 	 	 	 d	dZ	 	 	 d
	 	 	 	 	 	 	 ddZ	 	 	 d
	 	 	 	 	 	 	 	 	 ddZy)GlobalAsyncCheckpointManagerz1Responsible for serializing GDAs via TensorStore.NrE   transactionc                   t         j                  d       | j                          g fd}t        j                   |              | j                         | j                  |       y)a  Serializes Arrays or Arrays via TensorStore asynchronously.

    TensorStore writes to a storage layer in 2 steps:
    *  Reading/copying from the source after which the source can be modified.
         * Returns a copy future.
    *  Writing/committing to the storage layer.
         * Returns a commit future.

    In asynchronous mode, the serialization waits for the commit future to
    finish in a separate thread allowing other computation to proceed.

    Args:
      arrays: Arrays or Arrays that should be serialized.
      tensorstore_specs: TensorStore specs that are used to serialize GDAs or
        Arrays.
      on_commit_callback: This callback will be executed after all processes
        have finished writing their checkpoints to disk. Filesystems where
        atomic rename operations are supported, you can rename from the
        temporary directory to the final directory. On GCS, you write to the
        final directory directly and in `on_commit_callback` you write a success
        file indicating that the serialization was successful because GCS does
        not support atomic rename operations.
      transaction: Optional TensorStore transaction to use.
    z-Waiting for previous serialization to finish.c                    K   t         j                  j                  fd      } t        j                  |   d {   S 7 w)Nc                6    t        j                  | |      S )N)commit_futurer   )ts_implr   )arr_inptensorstore_speccommit_futuresr   s     r$   <lambda>zQGlobalAsyncCheckpointManager.serialize.<locals>._run_serializer.<locals>.<lambda>B  s    G,C,C*%	- r#   )r\   	tree_utiltree_mapasynciogather)future_writerrC   r   rD   r   s    r$   _run_serializerz?GlobalAsyncCheckpointManager.serialize.<locals>._run_serializer@  s?     mm,, 
	m >>=1111s   ;AAAN)rh   rm   rA   r   runr   r   )r=   rC   rD   rE   r   r   r   s    `` ` @r$   rF   z&GlobalAsyncCheckpointManager.serialize  sY    @ KK?@+-N2 KK!"n% 	/0r#   c               t    t         j                  j                  t        |      }| j	                  ||||      S )Nr   )r\   treemapget_tensorstore_specrF   )r=   rC   pathsrE   r   tspecss         r$   serialize_with_pathsz1GlobalAsyncCheckpointManager.serialize_with_pathsT  s;     XX\\.6F>>-	   r#   c                T    | j                          t        j                  |||||      S r   )rA   r   r   )r=   rH   rD   rI   rJ   concurrent_gbs         r$   rK   z(GlobalAsyncCheckpointManager.deserialized  s1    
 	''$mV]L Lr#   c                t    t         j                  j                  t        |      }| j	                  |||||      S r   )r\   r   r   r   rK   )r=   rH   r   rI   rJ   r   r   s          r$   deserialize_with_pathsz3GlobalAsyncCheckpointManager.deserialize_with_pathsm  s7     XX\\.6FIv}f)+ +r#   )rE   Callable[[], None] | Noner   ts_impl.Transaction | None)rC   zSequence[jax.Array]r   Sequence[str]rE   r   r   r   )NN    )
rH   z$Sequence[sharding.Sharding | Format]rD   rM   rI   rN   rJ   rO   r   int)
rH   rL   r   r   rI   rN   rJ   rO   r   r   )r   r    r!   rP   rF   r   rK   r   r"   r#   r$   r   r     s    9 7;0471
 471 .71| 7;04! 
 4 .$ AE>B')	L%=L!=L <L "%	L 5926+2++ 2+ 0	+
 +r#   r   )r3   zdict[str, Any] | strreturnbool)r4   r   )ArP   
__future__r   rQ   r   collections.abcr   r   	functools	itertoolsloggingr-   r   rn   r
   r   r\   jax._srcr   r   r	   r   jax._src.layoutr   jax._src.libr   $jax.experimental.array_serializationr   r   5jax.experimental.array_serialization.tensorstore_implr   run_deserializationr   run_serializationr   r   r   
TS_CONTEXTr   _DEFAULT_DRIVERr   tensorstoretspartial_get_tensorstore_metadata_get_metadatar   rs   countr   ra   r0   r2   rv   r   r   	getLoggerr   rh   r1   r8   	StrictABCr:   rT   r   r"   r#   r$   <module>r      sH   / " 
  .    	    
       "  LB B B
  "	!!'"C"C)/1(y(()E)E06eE 
 1 &y( C   ) D)4( 
) M  
		8	$:)>.t~~ >.Bd# d#N]+<1Q ]+r#   