"""Utilities for synchronizing and communication across multiple hosts."""

from __future__ import annotations

from functools import partial, lru_cache
from typing import Optional
import zlib

from typing import Any
import jax
import jax.numpy as jnp
from jax.tree_util import tree_flatten, tree_map, tree_unflatten
from jax._src import core
from jax._src.interpreters import ad
from jax._src.interpreters import batching
from jax._src.interpreters import mlir
from jax._src import array
from jax._src import sharding_impls
from jax._src.interpreters import pxla
from jax.interpreters import xla
from jax._src import pjit as pjit_lib
from jax.experimental import pjit
from jax.sharding import PartitionSpec as P
from jax._src import distributed
from jax._src.util import safe_zip
import numpy as np


def _psum(x: Any) -> Any:
  return jax.tree_util.tree_map(partial(jnp.sum, axis=0), x)


def broadcast_one_to_all(in_tree: Any, is_source: Optional[bool] = None) -> Any:
  """Broadcast data from a source host (host 0 by default) to all other hosts.

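  For illustration, a sketch that assumes this function is called on every
  process and that `multihost_utils` is this module:

  >>> step = np.int32(10) if jax.process_index() == 0 else np.int32(0)  # doctest: +SKIP
  >>> multihost_utils.broadcast_one_to_all(step)  # doctest: +SKIP

  Every process receives 10, the value contributed by process 0.
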
  Args:
    in_tree: pytree of arrays - each array *must* have the same shape across the
      hosts.
    is_source: optional bool denoting whether the caller is the source. Only
      'source host' will contribute the data for the broadcast. If None, then
      host 0 is used.

  Returns:
    A pytree matching in_tree where the leaves now all contain the data from the
    first host.
  """
  if jax.process_count() == 1:
    return jax.tree_util.tree_map(np.asarray, in_tree)

  if is_source is None:
    is_source = jax.process_index() == 0

  devices = np.array(jax.devices()).reshape(
      jax.process_count(), jax.local_device_count())
  global_mesh = jax.sharding.Mesh(devices, ('processes', 'local_devices'))
  pspec = P('processes')

  def pre_jit(x):
    # Only the source host contributes its value; every other host contributes
    # zeros, so the cross-host psum below reproduces the source value everywhere.
    if is_source:
      inp = x
    else:
      inp = np.zeros_like(x)
    inp = np.expand_dims(inp, axis=0)
    return host_local_array_to_global_array(inp, global_mesh, pspec)

  def post_jit(x):
    return np.asarray(x.addressable_data(0))

  in_tree = jax.tree_util.tree_map(pre_jit, in_tree)
  out_tree = jax.jit(_psum, out_shardings=jax.sharding.NamedSharding(
      global_mesh, P()))(in_tree)
  return jax.tree_util.tree_map(post_jit, out_tree)


def sync_global_devices(name: str):
  """Creates a barrier across all hosts/devices."""
  h = np.uint32(zlib.crc32(name.encode()))
  assert_equal(h, f"sync_global_devices name mismatch ('{name}')")


def _identity_fn(x):
  return x


def _handle_array_process_allgather(inp, tiled):
  if isinstance(inp, array.ArrayImpl) and not inp.is_fully_addressable:
    # Already a globally sharded Array: replicate it onto every device so that
    # each process can read the full value locally.
    reps = sharding_impls.GSPMDSharding.get_replicated(
        inp.sharding._device_assignment)
    out = jax.jit(_identity_fn, out_shardings=reps)(inp)
  else:
    # Host-local input: build a global Array whose leading mesh axis is the
    # process axis, then replicate it to gather every process' contribution.
    devices = np.array(jax.devices()).reshape(
        jax.process_count(), jax.local_device_count())
    global_mesh = jax.sharding.Mesh(devices, ('processes', 'local_devices'))
    pspec = P('processes')
    s = jax.sharding.NamedSharding(global_mesh, pspec)

    host_np_arr = np.asarray(inp)
    if host_np_arr.ndim == 0 or not tiled:
      host_np_arr = np.expand_dims(host_np_arr, axis=0)

    aval = core.ShapedArray(host_np_arr.shape, host_np_arr.dtype)
    global_aval = pxla.mesh_local_to_global(
        global_mesh, pxla.get_array_mapping(pspec), aval)

    bufs = [jax.device_put(host_np_arr, d) for d in jax.local_devices()]
    global_arr = array.make_array_from_single_device_arrays(
        global_aval.shape, s, bufs)
    with global_mesh:
      out = jax.jit(
          _identity_fn,
          out_shardings=jax.sharding.NamedSharding(global_mesh, P()))(global_arr)

  return np.asarray(out.addressable_data(0))


def process_allgather(in_tree: Any, tiled: bool = False) -> Any:
  """Gather data from across processes.

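  For illustration, a sketch that assumes every process runs this snippet:

  >>> local = np.arange(2) + 2 * jax.process_index()  # doctest: +SKIP
  >>> multihost_utils.process_allgather(local)  # doctest: +SKIP
  >>> multihost_utils.process_allgather(local, tiled=True)  # doctest: +SKIP

  The first call stacks the per-process values into shape
  (jax.process_count(), 2); the second concatenates them into shape
  (2 * jax.process_count(),).
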
  Args:
    in_tree: pytree of arrays - each array _must_ have the same shape across the
      hosts.
    tiled: Whether to stack or concat the output. Defaults to False i.e. stack
      into a new positional axis at index 0.

  Returns:
    Pytrees of numpy arrays.
      * If the input is a non-fully addressable jax.Array, then the data is
        fully replicated.
      * If the input is numpy array or fully addressable jax.Array, then the
        output shape is dependent on the `tiled` argument.
        If it is False, the output will be stacked; otherwise it will be
        concatenated.
      * If the input is a scalar, then the output will be stacked.
  """
  def _pjit(inp):
    return _handle_array_process_allgather(inp, tiled)
  return jax.tree_util.tree_map(_pjit, in_tree)


def assert_equal(in_tree, fail_message: str = ''):
  """Verifies that all the hosts have the same tree of values."""
  expected = broadcast_one_to_all(in_tree)
  if not jax.tree_util.tree_all(
      jax.tree_util.tree_map(lambda *x: np.all(np.equal(*x)),
                             in_tree, expected)):
    raise AssertionError(
        f'{fail_message} Expected: {expected}; got: {in_tree}.')


def reached_preemption_sync_point(step_id: int) -> bool:
  """Determine whether all hosts have reached a preemption sync step.
  When any host receives a preemption notice, the notice is propagated to all
  hosts and triggers a synchronization protocol in the background. The
  synchronization protocol calculates the maximum step ids from all hosts, and
  uses the next step id (i.e., max + 1) as the safe step to save a checkpoint.
  All hosts should continue training more steps until this method returns True,
  indicating that the `step_id` is equal to the safe step and the hosts should
  start saving a checkpoint.

  To use this API, all hosts must start training from the same step and call it
  at every training step. Example usage:

  ```
  def should_save(step_id: int) -> bool:

    # Should save an on-demand checkpoint for preemption
    if multihost_utils.reached_preemption_sync_point(step_id):
      return True

    # Should save a regular checkpoint
    return step_id - last_saved_checkpoint_step >= save_interval_steps
  ```

  Preemption notice is provided by the cluster scheduler to notify the
  application in advance before it gets evicted. By default, we use SIGTERM as
  the signal for preemption notice.


  TODO(b/230630494): Add instructions for customized preemption notice.

  Returns:
    A boolean indicating whether all hosts have reached a synchronization step
    after some hosts are preempted.

  Raises:
    RuntimeError: if the preemption sync manager has not been initialized.
  """
  if distributed.global_state.client is None:
    return False
  sync_manager = distributed.global_state.preemption_sync_manager
  if sync_manager is None:
    raise RuntimeError("Preemption sync manager has not been initialized.")
  return sync_manager.reached_sync_point(step_id)


@lru_cache()
def _flatten_pspecs(name, in_tree, pspecs_thunk):
  return pjit_lib.flatten_axis_resources(
      name, in_tree, pspecs_thunk(), tupled_args=True)


@lru_cache()
def _local_to_global_aval(local_aval, mesh, pspec):
  return pxla.mesh_local_to_global(mesh, pxla.get_array_mapping(pspec),
                                   local_aval)


@lru_cache()
def _global_to_local_aval(global_aval, mesh, pspec):
  return pxla.mesh_global_to_local(mesh, pxla.get_array_mapping(pspec),
                                   global_aval)


def host_local_array_to_global_array_impl(
    arr: Any, *, global_mesh: jax.sharding.Mesh, pspec: Any):
  if pspec is None:
    raise ValueError(
        '`None` is not a valid input to the pspecs argument. Please use '
        'jax.sharding.PartitionSpec() if you wanted to replicate your input.')
  # A non-fully-addressable Array is already global; return it unchanged.
  if isinstance(arr, array.ArrayImpl) and not arr.is_fully_addressable:
    return arr

  # Pull pmap-sharded inputs back to the host first.
  if isinstance(arr, array.ArrayImpl) and isinstance(
      arr.sharding, jax.sharding.PmapSharding):
    arr = np.array(arr)

  local_sharding = jax.sharding.NamedSharding(global_mesh.local_mesh, pspec)

  # Reuse the existing per-device buffers when the input is already laid out
  # the way the local mesh expects; otherwise slice the host-local value.
  if (isinstance(arr, array.ArrayImpl) and
      arr.sharding.is_equivalent_to(local_sharding, arr.ndim)):
    arrays = [x.data for x in arr.addressable_shards]
  else:
    arr = xla.canonicalize_dtype(arr)
    arrays = [
        arr[index]
        for d, index in local_sharding.devices_indices_map(arr.shape).items()]

  global_aval = _local_to_global_aval(
      core.ShapedArray(arr.shape, arr.dtype), global_mesh, pspec)

  return pxla.batched_device_put(
      global_aval, jax.sharding.NamedSharding(global_mesh, pspec),
      arrays, list(global_mesh.devices.flat))


def host_local_array_to_global_array(
    local_inputs: Any, global_mesh: jax.sharding.Mesh, pspecs: Any):
  """Converts a host local value to a globally sharded jax.Array.

  This function takes host-local data (which might be different
  across hosts), and populates a global array with this data, where each
  device on each host gets the appropriate slice of the data according to
  the sharding defined by the global_mesh/pspecs.

  For example:

  >>> global_mesh = jax.sharding.Mesh(jax.devices(), 'x')
  >>> pspecs = jax.sharding.PartitionSpec('x')
  >>> host_id = jax.process_index()
  >>> arr = host_local_array_to_global_array(np.arange(4) * host_id, global_mesh, pspecs)  # NB: assumes jax.local_device_count() divides 4.   # doctest: +SKIP

  The resulting array will have the shape (4 * num_processes) and will
  have distributed value of: (0, 1, 2, 3, 0, 2, 4, 6, 0, 3, 6, 9, ... ),
  where each slice np.arange(4) * host_id will be partitioned across the
  corresponding host's devices.
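
  A quick way to inspect that layout (an illustrative sketch, reusing `arr`
  from the snippet above on a multi-process runtime):

  >>> arr.shape  # doctest: +SKIP
  >>> [s.data.shape for s in arr.addressable_shards]  # doctest: +SKIP

  The first line reports (4 * jax.process_count(),); the second lists the
  per-device pieces of this host's slice.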

  Similarly:

  >>> mesh = jax.sharding.Mesh(np.array(jax.devices()).reshape(jax.process_count(), jax.local_device_count()), ['host', 'dev'])
  >>> pspecs = jax.sharding.PartitionSpec('host')
  >>> host_id = jax.process_index()
  >>> arr = host_local_array_to_global_array(np.arange(4) * host_id, mesh, pspecs)  # doctest: +SKIP

  will create the same distributed value (0, 1, 2, 3, 0, 2, 4, 6, ...),
  however each slice np.arange(4) * i will be *replicated* across corresponding
  host devices.

  On the other hand, if pspecs = PartitionSpec(), which means
  replication across all axes, then this snippet:

  >>> pspecs = jax.sharding.PartitionSpec()
  >>> arr = host_local_array_to_global_array(np.arange(4), mesh, pspecs)  # doctest: +SKIP

  will have the shape (4,) and the value (0, 1, 2, 3) will be replicated
  across all hosts and devices.

  It is undefined behavior to pass non-identical local_inputs across hosts
  when the pspec indicates data replication.

  You can use this function to transition to jax.Array. Using jax.Array with
  pjit has the same semantics of using GDA with pjit i.e. all jax.Array
  inputs to pjit should be globally shaped.

  If you are currently passing host local values to pjit, you can use this
  function to convert your host local values to global Arrays and then pass that
  to pjit.


  Example usage.

  >>> from jax.experimental import multihost_utils # doctest: +SKIP
  >>>
  >>> global_inputs = multihost_utils.host_local_array_to_global_array(host_local_inputs, global_mesh, in_pspecs) # doctest: +SKIP
  >>>
  >>> with mesh: # doctest: +SKIP
  >>>   global_out = pjitted_fun(global_inputs) # doctest: +SKIP
  >>>
  >>> host_local_output = multihost_utils.global_array_to_host_local_array(global_out, mesh, out_pspecs) # doctest: +SKIP

  Please note this function requires the global mesh to be a contiguous mesh,
  meaning that the devices that belong to each host should form a subcube in
  this mesh. To move local data to a global array with a non-contiguous mesh,
  use jax.make_array_from_callback or jax.make_array_from_single_device_arrays
  instead.

  Args:
    local_inputs: A Pytree of host local values.
    global_mesh: A jax.sharding.Mesh object. The mesh must be a contiguous mesh,
    that is all hosts' devices must form a subcube in this mesh.
    pspecs: A Pytree of jax.sharding.PartitionSpec's.

  Returns:
    A pytree of global arrays.
  """
  flat_inps, in_tree = tree_flatten(local_inputs)
  in_pspecs = _flatten_pspecs('input pspecs', in_tree,
                              pjit_lib.hashable_pytree(pspecs))
  out_flat = [
      host_local_array_to_global_array_p.bind(
          inp, global_mesh=global_mesh, pspec=in_spec)
      for inp, in_spec in safe_zip(flat_inps, in_pspecs)
  ]
  return tree_unflatten(in_tree, out_flat)


host_local_array_to_global_array_p = core.Primitive(
    'host_local_array_to_global_array')
host_local_array_to_global_array_p.def_impl(
    host_local_array_to_global_array_impl)


def ltg_abstract_eval(arr, *, global_mesh, pspec):
  return _local_to_global_aval(
      core.ShapedArray(arr.shape, arr.dtype), global_mesh, pspec)
host_local_array_to_global_array_p.def_abstract_eval(ltg_abstract_eval)

# The conversion is linear; its transpose maps the (global) cotangent back to
# a host-local value.
ad.deflinear2(host_local_array_to_global_array_p,
              lambda ct, _, **params: (
                  global_array_to_host_local_array_p.bind(ct, **params),))


def ltg_batcher(insert_axis, spmd_axis_name, axis_size, axis_name, main_type,
                vals_in, dims_in, global_mesh, pspec):
  x, = vals_in
  d, = dims_in
  new_parts = None if spmd_axis_name is None else spmd_axis_name
  new_pspec = list(pspec)
  new_pspec.insert(d, new_parts)
  new_pspec = P(*new_pspec)
  y = host_local_array_to_global_array_p.bind(
      x, global_mesh=global_mesh, pspec=new_pspec)
  return y, d
batching.spmd_axis_primitive_batchers[host_local_array_to_global_array_p] = (
    partial(ltg_batcher, False))
batching.axis_primitive_batchers[host_local_array_to_global_array_p] = (
    partial(ltg_batcher, False, None))


def _ltg_lowering(ctx, x, *, global_mesh, pspec):
  return [x]
mlir.register_lowering(host_local_array_to_global_array_p, _ltg_lowering)


def global_array_to_host_local_array_impl(
    arr: Any, *, global_mesh: jax.sharding.Mesh, pspec: Any):
  if pspec is None:
    raise ValueError(
        '`None` is not a valid input to the pspecs argument. Please use '
        'jax.sharding.PartitionSpec() if you wanted to replicate your input.')
  # A fully addressable Array is already host-local; return it unchanged.
  if isinstance(arr, array.ArrayImpl) and arr.is_fully_addressable:
    return arr

  global_sharding = jax.sharding.NamedSharding(global_mesh, pspec)
  local_sharding = jax.sharding.NamedSharding(global_mesh.local_mesh, pspec)
  local_aval = _global_to_local_aval(
      core.ShapedArray(arr.shape, arr.dtype), global_mesh, pspec)

  if isinstance(arr, array.ArrayImpl):
    # Reuse the addressable buffers when the layout already matches; otherwise
    # reshard to the target global sharding first.
    if arr.sharding.is_equivalent_to(global_sharding, arr.ndim):
      arrays = arr._arrays
    else:
      resharded_array = jax.device_put(arr, global_sharding)
      arrays = resharded_array._arrays
    return array.ArrayImpl(local_aval, local_sharding, arrays, committed=True)

  arr = xla.canonicalize_dtype(arr)
  arrays = [
      arr[index]
      for d, index in local_sharding.devices_indices_map(arr.shape).items()]
  return pxla.batched_device_put(
      local_aval, local_sharding, arrays,
      list(global_mesh.local_mesh.devices.flat))


def global_array_to_host_local_array(
    global_inputs: Any, global_mesh: jax.sharding.Mesh, pspecs: Any):
  """Converts a global `jax.Array` to a host local `jax.Array`.

  You can use this function to transition to `jax.Array`. Using `jax.Array` with
  pjit has the same semantics of using GDA with pjit i.e. all `jax.Array`
  inputs to pjit should be globally shaped and the output from pjit will also
  be globally shaped jax.Array's

  You can use this function to convert the globally shaped `jax.Array` output
  from pjit to host local values again so that the transition to jax.Array can
  be a mechanical change.

  Example usage:

  >>> from jax.experimental import multihost_utils # doctest: +SKIP
  >>>
  >>> global_inputs = multihost_utils.host_local_array_to_global_array(host_local_inputs, global_mesh, in_pspecs) # doctest: +SKIP
  >>>
  >>> with mesh: # doctest: +SKIP
  ...   global_out = pjitted_fun(global_inputs) # doctest: +SKIP
  >>>
  >>> host_local_output = multihost_utils.global_array_to_host_local_array(global_out, mesh, out_pspecs) # doctest: +SKIP
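
  As an intuition check (shapes here are hypothetical; assumes out_pspecs
  partitions the leading axis of the output across processes):

  >>> global_out.shape         # doctest: +SKIP
  >>> host_local_output.shape  # doctest: +SKIP

  If the global shape is (16, 4), the host-local shape is
  (16 // jax.process_count(), 4).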

  Args:
    global_inputs: A Pytree of global jax.Array's.
    global_mesh: A :class:`jax.sharding.Mesh` object. The mesh must be contiguous
      meaning all local devices of the host must form a subcube.
    pspecs: A Pytree of :class:`jax.sharding.PartitionSpec` objects.

  Returns:
    A Pytree of host local arrays.
  """
  flat_inps, out_tree = tree_flatten(global_inputs)
  out_pspecs = _flatten_pspecs('output pspecs', out_tree,
                               pjit_lib.hashable_pytree(pspecs))
  out_flat = [
      global_array_to_host_local_array_p.bind(
          inp, global_mesh=global_mesh, pspec=o)
      for inp, o in safe_zip(flat_inps, out_pspecs)
  ]
  return tree_unflatten(out_tree, out_flat)


global_array_to_host_local_array_p = core.Primitive(
    'global_array_to_host_local_array')
global_array_to_host_local_array_p.def_impl(
    global_array_to_host_local_array_impl)


def gtl_abstract_eval(arr, *, global_mesh, pspec):
  return _global_to_local_aval(
      core.ShapedArray(arr.shape, arr.dtype), global_mesh, pspec)
global_array_to_host_local_array_p.def_abstract_eval(gtl_abstract_eval)

# Transpose of the global-to-local conversion is the local-to-global one.
ad.deflinear2(global_array_to_host_local_array_p,
              lambda ct, _, **params: (
                  host_local_array_to_global_array_p.bind(ct, **params),))
batching.defvectorized(global_array_to_host_local_array_p)


def _gtl_lowering(ctx, x, *, global_mesh, pspec):
  return [x]
mlir.register_lowering(global_array_to_host_local_array_p, _gtl_lowering)