diff --git a/pkg/watcher/reconciler/dynamic/dynamic.go b/pkg/watcher/reconciler/dynamic/dynamic.go index 8f2d133e3..22aeec5e1 100644 --- a/pkg/watcher/reconciler/dynamic/dynamic.go +++ b/pkg/watcher/reconciler/dynamic/dynamic.go @@ -16,6 +16,7 @@ package dynamic import ( "context" + "encoding/json" "fmt" "time" @@ -48,6 +49,8 @@ var ( clock = clockwork.NewRealClock() ) +const LogFinalizer = "results.tekton.dev/streaming-logs" + // Reconciler implements common reconciler behavior across different Tekton Run // Object types. type Reconciler struct { @@ -81,8 +84,10 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient resultsClient: results.NewClient(rc, lc), objectClient: oc, cfg: cfg, - // Always true predicate. IsReadyForDeletionFunc: func(ctx context.Context, object results.Object) (bool, error) { + if LogsFinalizerExist(object, LogFinalizer) { + return false, nil + } return true, nil }, } @@ -114,6 +119,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error { // Update logs if enabled. if r.resultsClient.LogsClient != nil { + // Add finalizer for new object if the object has never been reconciled before + // if the object has the results log annotation then the log has been sent/streaming + annotations := o.GetAnnotations() + if _, exists := annotations[annotation.Log]; !exists { + err = r.AddFinalizer(ctx, o) + if err != nil { + return err + } + } if err := r.sendLog(ctx, o); err != nil { logger.Errorw("Error sending log", zap.String("namespace", o.GetNamespace()), @@ -342,11 +356,6 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error { zap.Error(err), ) } - logger.Debugw("Streaming log completed", - zap.String("namespace", o.GetNamespace()), - zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), - zap.String("name", o.GetName()), - ) }() } @@ -389,18 +398,18 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, } errChanRepeater := make(chan error, 100) //some stuff on the internet says buffered channels are better for GC - go func(echan <-chan error, o metav1.Object) { + + // logctx is derived from ctx. Therefore, if ctx is cancelled (either explicitly through a call to its cancel + // function or when it reaches its deadline), logctx will be cancelled automatically. + logctx, _ := context.WithTimeout(ctx, 10*time.Minute) + + go func(ctx context.Context, echan <-chan error, o metav1.Object) { select { case <-ctx.Done(): logger.Warnw("Context done streaming log", zap.String("namespace", o.GetNamespace()), zap.String("name", o.GetName()), ) - case <-time.After(10 * time.Minute): //TODO could make this time configurable, but let's see how useful it is first - logger.Warnw("10 minute timer expired streaming log", - zap.String("namespace", o.GetNamespace()), - zap.String("name", o.GetName()), - ) case writeErr := <-echan: errChanRepeater <- writeErr var gofuncerr error @@ -420,7 +429,33 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, zap.String("namespace", o.GetNamespace()), zap.String("name", o.GetName()), ) - }(errChan, o) + + err = r.RemoveFinalizer(ctx, o) + if err != nil { + logger.Errorw("Error removing finalizer", + zap.String("namespace", o.GetNamespace()), + zap.String("name", o.GetName()), + zap.Error(err), + ) + return + } + + logger.Debugw("Finalizer removed successfully", + zap.String("namespace", o.GetNamespace()), + zap.String("name", o.GetName()), + ) + + if o.GetDeletionTimestamp() != nil { + err = controller.NewRequeueImmediately() + logger.Errorw("Error requing object for deletion", + zap.String("namespace", o.GetNamespace()), + zap.String("name", o.GetName()), + zap.Error(err), + ) + + } + + }(logctx, errChan, o) // errChanRepeater receives stderr from the TaskRun containers. // This will be forwarded as combined output (stdout and stderr) @@ -430,10 +465,74 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, Err: writer, }, logChan, errChanRepeater) - logger.Debugw("Exiting streamLogs", - zap.String("namespace", o.GetNamespace()), - zap.String("name", o.GetName()), - ) + return nil +} + +func (r *Reconciler) AddFinalizer(ctx context.Context, o metav1.Object) error { + finalizers := o.GetFinalizers() + for _, f := range finalizers { + if f == LogFinalizer { + return nil + } + } + finalizers = append(finalizers, LogFinalizer) + + patch, err := finalizerPatch(o, finalizers) + if err != nil { + return fmt.Errorf("error adding results log finalizer: %w", err) + } + if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil { + return fmt.Errorf("error patching object: %w", err) + } + return nil +} + +func (r *Reconciler) RemoveFinalizer(ctx context.Context, o metav1.Object) error { + finalizers := o.GetFinalizers() + for i, f := range finalizers { + if f == LogFinalizer { + finalizers = append(finalizers[:i], finalizers[i+1:]...) + patch, err := finalizerPatch(o, finalizers) + if err != nil { + return fmt.Errorf("error removing results log finalizer: %w", err) + } + + if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil { + return fmt.Errorf("error patching object: %w", err) + } + return nil + } + } return nil + +} + +func LogsFinalizerExist(o metav1.Object, finalizer string) bool { + finalizers := o.GetFinalizers() + for _, f := range finalizers { + if f == finalizer { + return true + } + } + return false +} + +type mergePatch struct { + Metadata metadata `json:"metadata"` +} + +type metadata struct { + Finalizer []string `json:"finalizers"` +} + +func finalizerPatch(object metav1.Object, finalizers []string) ([]byte, error) { + data := mergePatch{ + Metadata: metadata{ + Finalizer: []string{}, + }, + } + data.Metadata.Finalizer = finalizers + + return json.Marshal(data) }