diff --git a/pkg/watcher/reconciler/dynamic/dynamic.go b/pkg/watcher/reconciler/dynamic/dynamic.go
index 1eddb2683..8a02fce6e 100644
--- a/pkg/watcher/reconciler/dynamic/dynamic.go
+++ b/pkg/watcher/reconciler/dynamic/dynamic.go
@@ -16,6 +16,7 @@ package dynamic
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"time"
 
@@ -48,6 +49,8 @@ var (
 	clock = clockwork.NewRealClock()
 )
 
+const LogFinalizer = "results.tekton.dev/streaming-logs"
+
 // Reconciler implements common reconciler behavior across different Tekton Run
 // Object types.
 type Reconciler struct {
@@ -81,8 +84,10 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient
 		resultsClient: results.NewClient(rc, lc),
 		objectClient:  oc,
 		cfg:           cfg,
-		// Always true predicate.
 		IsReadyForDeletionFunc: func(ctx context.Context, object results.Object) (bool, error) {
+			if LogsFinalizerExist(object, LogFinalizer) {
+				return false, nil
+			}
 			return true, nil
 		},
 	}
@@ -114,6 +119,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
 
 	// Update logs if enabled.
 	if r.resultsClient.LogsClient != nil {
+		// Add a finalizer for a new object if it has never been reconciled before.
+		// If the object already has the results log annotation, the log has been sent or is streaming.
+		annotations := o.GetAnnotations()
+		if _, exists := annotations[annotation.Log]; !exists {
+			err = r.AddFinalizer(ctx, o)
+			if err != nil {
+				return err
+			}
+		}
 		if err := r.sendLog(ctx, o); err != nil {
 			logger.Errorw("Error sending log",
 				zap.String("namespace", o.GetNamespace()),
@@ -342,11 +356,6 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
 				zap.Error(err),
 			)
 		}
-		logger.Debugw("Streaming log completed",
-			zap.String("namespace", o.GetNamespace()),
-			zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
-			zap.String("name", o.GetName()),
-		)
 	}()
 }
 
@@ -388,19 +397,65 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
 		return fmt.Errorf("error reading from tkn reader: %w", err)
 	}
 
-	errChanRepeater := make(chan error)
-	go func(echan <-chan error, o metav1.Object) {
-		writeErr := <-echan
-		errChanRepeater <- writeErr
+	errChanRepeater := make(chan error, 100)
 
-		_, err := writer.Flush()
-		if err != nil {
-			logger.Error(err)
+	// logctx is derived from ctx. Therefore, if ctx is cancelled (either explicitly through a call to its cancel
+	// function or when it reaches its deadline), logctx will be cancelled automatically.
+	// TODO: Implement a configurable timeout based on user feedback analysis.
+	logctx, _ := context.WithTimeout(ctx, 10*time.Minute)
+
+	go func(ctx context.Context, echan <-chan error, o metav1.Object) {
+		defer close(errChanRepeater)
+		select {
+		case <-ctx.Done():
+			logger.Warnw("Context done streaming log",
+				zap.String("namespace", o.GetNamespace()),
+				zap.String("name", o.GetName()),
+			)
+		case writeErr := <-echan:
+			errChanRepeater <- writeErr
 		}
+		var gofuncerr error
+		_, gofuncerr = writer.Flush()
+		if gofuncerr != nil {
+			logger.Error(gofuncerr)
+		}
+		logger.Info("Flush in streamLogs done",
+			zap.String("namespace", o.GetNamespace()),
+			zap.String("name", o.GetName()),
+		)
 		if err = logsClient.CloseSend(); err != nil {
 			logger.Error(err)
 		}
-	}(errChan, o)
+		logger.Info("Gofunc in streamLogs done",
+			zap.String("namespace", o.GetNamespace()),
+			zap.String("name", o.GetName()),
+		)
+		err = r.RemoveFinalizer(ctx, o)
+		if err != nil {
+			logger.Errorw("Error removing finalizer",
+				zap.String("namespace", o.GetNamespace()),
+				zap.String("name", o.GetName()),
+				zap.Error(err),
+			)
+			return
+		}
+
+		logger.Info("Finalizer removed successfully",
+			zap.String("namespace", o.GetNamespace()),
+			zap.String("name", o.GetName()),
+		)
+
+		if o.GetDeletionTimestamp() != nil {
+			err = controller.NewRequeueImmediately()
+			logger.Errorw("Error requeuing object for deletion",
+				zap.String("namespace", o.GetNamespace()),
+				zap.String("name", o.GetName()),
+				zap.Error(err),
+			)
+		}
+
+	}(logctx, errChan, o)
 
 	// errChanRepeater receives stderr from the TaskRun containers.
 	// This will be forwarded as combined output (stdout and stderr)
@@ -410,5 +465,86 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
 		Err: writer,
 	}, logChan, errChanRepeater)
 
+	logger.Info("Exiting streaming logs",
+		zap.String("namespace", o.GetNamespace()),
+		zap.String("name", o.GetName()),
+	)
+	return nil
+}
+
+func (r *Reconciler) AddFinalizer(ctx context.Context, o metav1.Object) error {
+	if ownerReferences := o.GetOwnerReferences(); len(ownerReferences) > 0 {
+		// Do not add the finalizer if the object is owned by a PipelineRun object.
+		for _, or := range ownerReferences {
+			if or.Kind == "PipelineRun" {
+				return nil
+			}
+		}
+	}
+	finalizers := o.GetFinalizers()
+	for _, f := range finalizers {
+		if f == LogFinalizer {
+			return nil
+		}
+	}
+	finalizers = append(finalizers, LogFinalizer)
+
+	patch, err := finalizerPatch(finalizers)
+	if err != nil {
+		return fmt.Errorf("error adding results log finalizer: %w", err)
+	}
+
+	if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
+		return fmt.Errorf("error patching object: %w", err)
+	}
 	return nil
 }
+
+func (r *Reconciler) RemoveFinalizer(ctx context.Context, o metav1.Object) error {
+	finalizers := o.GetFinalizers()
+	for i, f := range finalizers {
+		if f == LogFinalizer {
+			finalizers = append(finalizers[:i], finalizers[i+1:]...)
+			patch, err := finalizerPatch(finalizers)
+			if err != nil {
+				return fmt.Errorf("error removing results log finalizer: %w", err)
+			}
+
+			if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
+				return fmt.Errorf("error patching object: %w", err)
+			}
+			return nil
+		}
+	}
+	return nil
+
+}
+
+func LogsFinalizerExist(o metav1.Object, finalizer string) bool {
+	finalizers := o.GetFinalizers()
+	for _, f := range finalizers {
+		if f == finalizer {
+			return true
+		}
+	}
+	return false
+}
+
+type mergePatch struct {
+	Metadata metadata `json:"metadata"`
+}
+
+type metadata struct {
+	Finalizer []string `json:"finalizers"`
+}
+
+func finalizerPatch(finalizers []string) ([]byte, error) {
+	data := mergePatch{
+		Metadata: metadata{
+			Finalizer: []string{},
+		},
+	}
+	data.Metadata.Finalizer = finalizers
+
+	return json.Marshal(data)
+}
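
For illustration only (not part of the patch): a minimal, standalone sketch of the JSON merge patch that AddFinalizer and RemoveFinalizer send through objectClient.Patch. The mergePatch/metadata types mirror the ones added in the diff; finalizerPatch is collapsed to an equivalent one-liner here, and the main function and printed output are assumptions added purely for demonstration.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors of the types added in the patch, copied here so the sketch compiles on its own.
type metadata struct {
	Finalizer []string `json:"finalizers"`
}

type mergePatch struct {
	Metadata metadata `json:"metadata"`
}

// Equivalent of the patch's finalizerPatch helper: marshal the full finalizer list
// under metadata.finalizers.
func finalizerPatch(finalizers []string) ([]byte, error) {
	return json.Marshal(mergePatch{Metadata: metadata{Finalizer: finalizers}})
}

func main() {
	// AddFinalizer patches the object with its existing finalizers plus the log finalizer.
	addPatch, _ := finalizerPatch([]string{"results.tekton.dev/streaming-logs"})
	fmt.Println(string(addPatch)) // {"metadata":{"finalizers":["results.tekton.dev/streaming-logs"]}}

	// RemoveFinalizer patches with the finalizer filtered out; an empty, non-nil slice
	// marshals to [], clearing metadata.finalizers on the object.
	removePatch, _ := finalizerPatch([]string{})
	fmt.Println(string(removePatch)) // {"metadata":{"finalizers":[]}}
}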