[Feature][RayCluster]: introduce RayClusterSuspending and RayClusterSuspended conditions (#2403)
rueian authored Oct 8, 2024
1 parent 35e913a commit b5f14f1
Showing 6 changed files with 307 additions and 49 deletions.
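For context, the suspend flow introduced here is driven entirely by the CRD field: a client only flips .Spec.Suspend, and the operator walks the cluster through the new Suspending and Suspended conditions. A minimal controller-runtime sketch of that trigger; the helper name and error handling are illustrative and not part of this commit:

package example

import (
    "context"

    "k8s.io/apimachinery/pkg/types"
    "sigs.k8s.io/controller-runtime/pkg/client"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// suspendRayCluster flips .Spec.Suspend to true. The operator then sets the
// RayClusterSuspending condition, deletes all Pods, and finally reports
// RayClusterSuspended once every Pod is gone.
func suspendRayCluster(ctx context.Context, c client.Client, namespace, name string) error {
    cluster := &rayv1.RayCluster{}
    if err := c.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, cluster); err != nil {
        return err
    }
    suspend := true
    cluster.Spec.Suspend = &suspend
    return c.Update(ctx, cluster)
}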
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1/raycluster_types.go
@@ -186,6 +186,10 @@ const (
    HeadPodReady RayClusterConditionType = "HeadPodReady"
    // RayClusterReplicaFailure is added in a RayCluster when one of its pods fails to be created or deleted.
    RayClusterReplicaFailure RayClusterConditionType = "ReplicaFailure"
    // RayClusterSuspending is set to true when a user sets .Spec.Suspend to true, ensuring the atomicity of the suspend operation.
    RayClusterSuspending RayClusterConditionType = "RayClusterSuspending"
    // RayClusterSuspended is set to true when all Pods belonging to a suspending RayCluster are deleted. Note that RayClusterSuspending and RayClusterSuspended cannot both be true at the same time.
    RayClusterSuspended RayClusterConditionType = "RayClusterSuspended"
)

// HeadInfo gives info about head
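Because both new values are ordinary metav1.Condition entries, downstream code (tests, waiters, other controllers) can read them with the standard apimachinery helpers. A minimal sketch; the helper name is illustrative only:

package example

import (
    "k8s.io/apimachinery/pkg/api/meta"

    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

// isFullySuspended reports whether the suspend operation has completed:
// RayClusterSuspended is true and the transient RayClusterSuspending is not.
func isFullySuspended(cluster *rayv1.RayCluster) bool {
    suspending := meta.IsStatusConditionTrue(cluster.Status.Conditions, string(rayv1.RayClusterSuspending))
    suspended := meta.IsStatusConditionTrue(cluster.Status.Conditions, string(rayv1.RayClusterSuspended))
    return suspended && !suspending
}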
112 changes: 99 additions & 13 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -210,10 +210,24 @@ func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common
    return pods, nil
}

func (r *RayClusterReconciler) validateRayClusterStatus(instance *rayv1.RayCluster) error {
    suspending := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspending))
    suspended := meta.IsStatusConditionTrue(instance.Status.Conditions, string(rayv1.RayClusterSuspended))
    if suspending && suspended {
        return errstd.New("invalid RayCluster State: rayv1.RayClusterSuspending and rayv1.RayClusterSuspended conditions should not be both true")
    }
    return nil
}

func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1.RayCluster) (ctrl.Result, error) {
    var reconcileErr error
    logger := ctrl.LoggerFrom(ctx)

    if err := r.validateRayClusterStatus(instance); err != nil {
        logger.Error(err, "The RayCluster status is invalid")
        return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
    }

    // Please do NOT modify `originalRayClusterInstance` in the following code.
    originalRayClusterInstance := instance.DeepCopy()

@@ -340,10 +354,11 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
    // Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
    newInstance, calculateErr := r.calculateStatus(ctx, instance, reconcileErr)
    var updateErr error
    var inconsistent bool
    if calculateErr != nil {
        logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
    } else {
        updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
        inconsistent, updateErr = r.updateRayClusterStatus(ctx, originalRayClusterInstance, newInstance)
    }

    // Return error based on order.
@@ -355,7 +370,10 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
    } else {
        err = updateErr
    }
    if err != nil {
    // If the custom resource's status is updated, requeue the reconcile key.
    // Without this behavior, atomic operations such as the suspend operation would need to wait for `RAYCLUSTER_DEFAULT_REQUEUE_SECONDS` to delete Pods
    // after the condition rayv1.RayClusterSuspending is set to true.
    if err != nil || inconsistent {
        return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
    }

@@ -614,8 +632,11 @@ func (r *RayClusterReconciler) reconcileHeadlessService(ctx context.Context, ins
func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv1.RayCluster) error {
    logger := ctrl.LoggerFrom(ctx)

    // if RayCluster is suspended, delete all pods and skip reconcile
    if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
    // if RayCluster is suspending, delete all pods and skip reconcile
    suspendStatus := utils.FindRayClusterSuspendStatus(instance)
    statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
    if suspendStatus == rayv1.RayClusterSuspending ||
        (!statusConditionGateEnabled && instance.Spec.Suspend != nil && *instance.Spec.Suspend) {
        if _, err := r.deleteAllPods(ctx, common.RayClusterAllPodsAssociationOptions(instance)); err != nil {
            r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeletePod),
                "Failed deleting Pods due to suspension for RayCluster %s/%s, %v",
@@ -629,6 +650,16 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
        return nil
    }

    if statusConditionGateEnabled {
        if suspendStatus == rayv1.RayClusterSuspended {
            return nil // stop reconcilePods because the cluster is suspended.
        }
        // (suspendStatus != rayv1.RayClusterSuspending) is always true here because it has been checked above.
        if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
            return nil // stop reconcilePods because the cluster is going to suspend.
        }
    }

    // check if all the pods exist
    headPods := corev1.PodList{}
    if err := r.List(ctx, &headPods, common.RayClusterHeadPodsAssociationOptions(instance).ToListOptions()...); err != nil {
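The branching above boils down to three outcomes: delete every Pod while the cluster is suspending, skip Pod reconciliation once it is suspended (or once the user has asked to suspend while the feature gate is enabled), and otherwise reconcile normally. A condensed sketch of that decision; the helper and enum below are illustrative only and do not exist in the controller, and it assumes FindRayClusterSuspendStatus returns a RayClusterConditionType that is empty when neither condition is true:

package example

import rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

type podAction string

const (
    deleteAllPods podAction = "delete-all-pods" // suspend in progress: remove every Pod
    skipReconcile podAction = "skip-reconcile"  // already suspended, or about to suspend
    reconcilePods podAction = "reconcile"       // normal Pod reconciliation
)

// decidePodAction mirrors the gating at the top of reconcilePods.
func decidePodAction(suspendStatus rayv1.RayClusterConditionType, gateEnabled bool, specSuspend *bool) podAction {
    wantSuspend := specSuspend != nil && *specSuspend
    if suspendStatus == rayv1.RayClusterSuspending || (!gateEnabled && wantSuspend) {
        return deleteAllPods
    }
    if gateEnabled && (suspendStatus == rayv1.RayClusterSuspended || wantSuspend) {
        return skipReconcile
    }
    return reconcilePods
}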
@@ -1177,7 +1208,8 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
    // Deep copy the instance, so we don't mutate the original object.
    newInstance := instance.DeepCopy()

    if features.Enabled(features.RayClusterStatusConditions) {
    statusConditionGateEnabled := features.Enabled(features.RayClusterStatusConditions)
    if statusConditionGateEnabled {
        if reconcileErr != nil {
            if reason := utils.RayClusterReplicaFailureReason(reconcileErr); reason != "" {
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
@@ -1218,8 +1250,8 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
        newInstance.Status.State = rayv1.Ready //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
    }

    // Check if the head node is running and ready by checking the head pod's status.
    if features.Enabled(features.RayClusterStatusConditions) {
    // Check if the head node is running and ready by checking the head pod's status or if the cluster has been suspended.
    if statusConditionGateEnabled {
        headPod, err := common.GetRayClusterHeadPod(ctx, r, newInstance)
        if err != nil {
            return nil, err
@@ -1237,9 +1269,10 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
            meta.SetStatusCondition(&newInstance.Status.Conditions, headPodReadyCondition)
        }

        if !meta.IsStatusConditionTrue(newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned)) {
        suspendStatus := utils.FindRayClusterSuspendStatus(newInstance)
        if !meta.IsStatusConditionTrue(newInstance.Status.Conditions, string(rayv1.RayClusterProvisioned)) && suspendStatus != rayv1.RayClusterSuspended {
            // RayClusterProvisioned indicates whether all Ray Pods are ready when the RayCluster is first created.
            // Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time.
            // Note RayClusterProvisioned StatusCondition will not be updated after all Ray Pods are ready for the first time, unless the cluster has been suspended.
            if utils.CheckAllPodsRunning(ctx, runtimePods) {
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                    Type: string(rayv1.RayClusterProvisioned),
@@ -1257,6 +1290,53 @@ func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *ra
            }
        }

        if suspendStatus == rayv1.RayClusterSuspending {
            if len(runtimePods.Items) == 0 {
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                    Type:    string(rayv1.RayClusterProvisioned),
                    Status:  metav1.ConditionFalse,
                    Reason:  rayv1.RayClusterPodsProvisioning,
                    Message: "RayCluster has been suspended",
                })
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                    Type:   string(rayv1.RayClusterSuspending),
                    Reason: string(rayv1.RayClusterSuspending),
                    Status: metav1.ConditionFalse,
                })
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                    Type:   string(rayv1.RayClusterSuspended),
                    Reason: string(rayv1.RayClusterSuspended),
                    Status: metav1.ConditionTrue,
                })
            }
        } else if suspendStatus == rayv1.RayClusterSuspended {
            if instance.Spec.Suspend != nil && !*instance.Spec.Suspend {
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                    Type:   string(rayv1.RayClusterSuspended),
                    Reason: string(rayv1.RayClusterSuspended),
                    Status: metav1.ConditionFalse,
                })
            }
        } else {
            meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                Type:   string(rayv1.RayClusterSuspended),
                Reason: string(rayv1.RayClusterSuspended),
                Status: metav1.ConditionFalse,
            })
            if instance.Spec.Suspend != nil && *instance.Spec.Suspend {
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                    Type:   string(rayv1.RayClusterSuspending),
                    Reason: string(rayv1.RayClusterSuspending),
                    Status: metav1.ConditionTrue,
                })
            } else {
                meta.SetStatusCondition(&newInstance.Status.Conditions, metav1.Condition{
                    Type:   string(rayv1.RayClusterSuspending),
                    Reason: string(rayv1.RayClusterSuspending),
                    Status: metav1.ConditionFalse,
                })
            }
        }
    }

    if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
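Taken together, the block above implements a small state machine: a Suspending cluster becomes Suspended only after its last Pod is gone, a Suspended cluster stays that way until .Spec.Suspend is explicitly set back to false, and otherwise the Suspending condition simply tracks .Spec.Suspend. A compact restatement; the function is illustrative only, ignores the RayClusterProvisioned bookkeeping, and uses an empty return value to mean that neither condition is true:

package example

import rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

// nextSuspendState returns which suspend-related condition is true after one
// pass of calculateStatus, given the condition currently true ("" for neither),
// the desired .Spec.Suspend, and whether all Pods have been deleted.
func nextSuspendState(current rayv1.RayClusterConditionType, specSuspend *bool, podsGone bool) rayv1.RayClusterConditionType {
    switch current {
    case rayv1.RayClusterSuspending:
        if podsGone {
            return rayv1.RayClusterSuspended // all Pods deleted: Suspending -> Suspended
        }
        return rayv1.RayClusterSuspending // keep waiting for Pod deletion
    case rayv1.RayClusterSuspended:
        if specSuspend != nil && !*specSuspend {
            return "" // user resumed the cluster: clear Suspended
        }
        return rayv1.RayClusterSuspended
    default:
        if specSuspend != nil && *specSuspend {
            return rayv1.RayClusterSuspending // begin the atomic suspend
        }
        return ""
    }
}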
@@ -1359,6 +1439,9 @@ func (r *RayClusterReconciler) updateHeadInfo(ctx context.Context, instance *ray
    if headPod != nil {
        instance.Status.Head.PodIP = headPod.Status.PodIP
        instance.Status.Head.PodName = headPod.Name
    } else {
        instance.Status.Head.PodIP = ""
        instance.Status.Head.PodName = ""
    }

    ip, name, err := r.getHeadServiceIPAndName(ctx, instance)
@@ -1514,17 +1597,20 @@ func (r *RayClusterReconciler) reconcileAutoscalerRoleBinding(ctx context.Contex
    return nil
}

func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) error {
// updateRayClusterStatus updates the RayCluster status if it is inconsistent with the old status and returns a bool to indicate the inconsistency.
// We rely on the returned bool to requeue the reconciliation for atomic operations, such as suspending a RayCluster.
func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, originalRayClusterInstance, newInstance *rayv1.RayCluster) (bool, error) {
    logger := ctrl.LoggerFrom(ctx)
    if !r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
        return nil
    inconsistent := r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status)
    if !inconsistent {
        return inconsistent, nil
    }
    logger.Info("updateRayClusterStatus", "name", originalRayClusterInstance.Name, "old status", originalRayClusterInstance.Status, "new status", newInstance.Status)
    err := r.Status().Update(ctx, newInstance)
    if err != nil {
        logger.Info("Error updating status", "name", originalRayClusterInstance.Name, "error", err, "RayCluster", newInstance)
    }
    return err
    return inconsistent, err
}

// sumGPUs sums the GPUs in the given resource list.