[WIP] finalizer approach to fix race condition due to pruning in results watcher #703

Status: Open · wants to merge 4 commits into base: main · showing changes from all commits
164 changes: 150 additions & 14 deletions pkg/watcher/reconciler/dynamic/dynamic.go
@@ -16,6 +16,7 @@ package dynamic

import (
"context"
"encoding/json"
"fmt"
"time"

@@ -48,6 +49,8 @@ var (
	clock = clockwork.NewRealClock()
)

const LogFinalizer = "results.tekton.dev/streaming-logs"

// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
@@ -81,8 +84,10 @@ func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient
		resultsClient: results.NewClient(rc, lc),
		objectClient:  oc,
		cfg:           cfg,
		// Ready for deletion unless the streaming-logs finalizer is still present.
		IsReadyForDeletionFunc: func(ctx context.Context, object results.Object) (bool, error) {
			if LogsFinalizerExist(object, LogFinalizer) {
Contributor commented:
I would think that this is not needed, as it is redundant with the finalizer pattern.

I would expect the resource to be marked as being deleted, but the deletion to be deferred by k8s until the last finalizer has been removed.

I could definitely be wrong though 😉

ramessesii2 (Member, Author) commented on Feb 6, 2024:

You're right, deletion is deferred if the finalizer is there.
So, prior to this change, we could see the following message while debugging even if the resource was not deleted properly (due to the finalizer), which is misleading; more importantly, we then go on to execute AfterDeletion.

				return false, nil
			}
			return true, nil
		},
	}
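To make the finalizer contract discussed in this thread concrete, here is a minimal hypothetical sketch (handleDeletion and finishLogStream are illustrative names, not the PR's API): Kubernetes sets deletionTimestamp as soon as a delete is requested, but only removes the object once its finalizers list is empty.

// Hypothetical sketch of the finalizer contract, not the PR's actual code.
// finishLogStream stands in for completing the log stream before cleanup.
func handleDeletion(ctx context.Context, r *Reconciler, o results.Object) error {
	if o.GetDeletionTimestamp() == nil {
		return nil // not being deleted
	}
	// Marked for deletion: k8s keeps the object around until every
	// finalizer has been removed.
	if LogsFinalizerExist(o, LogFinalizer) {
		if err := finishLogStream(ctx, o); err != nil {
			return err
		}
		// Dropping the last finalizer lets the API server delete the object.
		return r.RemoveFinalizer(ctx, o)
	}
	return nil
}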
@@ -114,6 +119,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {

	// Update logs if enabled.
	if r.resultsClient.LogsClient != nil {
		// Add the finalizer for a new object if it has never been reconciled before.
		// If the object already has the results log annotation, the log has been sent or is streaming.
		annotations := o.GetAnnotations()
		if _, exists := annotations[annotation.Log]; !exists {
			err = r.AddFinalizer(ctx, o)
			if err != nil {
				return err
			}
		}
		if err := r.sendLog(ctx, o); err != nil {
			logger.Errorw("Error sending log",
				zap.String("namespace", o.GetNamespace()),
@@ -342,11 +356,6 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
				zap.Error(err),
			)
		}
		logger.Debugw("Streaming log completed",
ramessesii2 (Member, Author) commented:
This asynchronous function never reaches this point if the logs stream successfully, but we do see this message when there is an error while streaming logs.

zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)
}()
}

@@ -388,19 +397,65 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
		return fmt.Errorf("error reading from tkn reader: %w", err)
	}

	errChanRepeater := make(chan error)
	go func(echan <-chan error, o metav1.Object) {
		writeErr := <-echan
		errChanRepeater <- writeErr
	errChanRepeater := make(chan error, 100)

		_, err := writer.Flush()
		if err != nil {
			logger.Error(err)
	// logctx is derived from ctx. Therefore, if ctx is cancelled (either explicitly through a call to its cancel
	// function or when it reaches its deadline), logctx will be cancelled automatically.
	// TODO: Implement configurable timeout based on user feedback analysis.
	logctx, _ := context.WithTimeout(ctx, 10*time.Minute)
ramessesii2 marked this conversation as resolved.
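As a side note on the cancellation comment above, a runnable sketch using only the standard library (the 10-minute figure mirrors the PR) shows that a child context created with context.WithTimeout is cancelled as soon as its parent is cancelled:

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	parent, cancel := context.WithCancel(context.Background())
	// The child gets its own deadline but also inherits cancellation
	// from the parent, just like logctx inherits from ctx.
	child, _ := context.WithTimeout(parent, 10*time.Minute)

	cancel() // cancel the parent...

	<-child.Done()           // ...and the child unblocks immediately
	fmt.Println(child.Err()) // prints "context canceled"
}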

	go func(ctx context.Context, echan <-chan error, o metav1.Object) {
		defer close(errChanRepeater)
Contributor commented:
good use of defer @ramessesii2 @sayan-biswas

where are we closing the original logChan and errChan from reader.Read()?

ramessesii2 (Member, Author) replied:

logChan and errChan are the receiving ends. When the channels are no longer used, they will be garbage collected.
This is a new subject for me; my reference is https://stackoverflow.com/questions/8593645/is-it-ok-to-leave-a-channel-open

Contributor replied:

yeah, there was a bit of back and forth on that thread @ramessesii2, but yes, I think that is the end interpretation.

good find / research @ramessesii2

that said, given the exchanges I just saw with @pmacik today in Slack, I should probably make a more thorough pass of the changes here ... admittedly I only focused on the mem leak bit initially.

will do that today

thanks

Contributor commented:

ok @ramessesii2 @sayan-biswas I think I looked at this too quickly yesterday

the close of the errChanRepeater should not be in the goroutine that blocks on it!!

move the defer close(errChanRepeater) to immediately after the declaration of errChanRepeater in your current line 400

ramessesii2 (Member, Author) replied on Feb 12, 2024:

The reason why I'm deferring close(errChanRepeater) in this goroutine is that it's important to tell the receiving goroutine that all data has been sent, which happens here:

tknlog.NewWriter(logType, true).Write(&cli.Stream{
	Out: writer,
	Err: writer,
}, logChan, errChanRepeater)

otherwise the code above will never finish executing if we don't close errChanRepeater first. I confirmed this as well while debugging.
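A runnable sketch of this point, using only the standard library (the for-range loop stands in for the tknlog writer): a consumer draining a channel only returns once the producer closes it.

package main

import "fmt"

func main() {
	errs := make(chan error, 100)

	go func() {
		defer close(errs) // tells the consumer that all data has been sent
		errs <- fmt.Errorf("stderr line from a container")
	}()

	// Stands in for tknlog.NewWriter(...).Write(...), which drains the
	// channel; without close(errs) this loop would never terminate.
	for err := range errs {
		fmt.Println(err)
	}
	fmt.Println("writer exits only after close(errs)")
}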

Contributor replied:
yep we are good here per the thread below

		select {
		case <-ctx.Done():
			logger.Warnw("Context done streaming log",
				zap.String("namespace", o.GetNamespace()),
				zap.String("name", o.GetName()),
			)
		case writeErr := <-echan:
Contributor commented:
to close the loop @ramessesii2 on your comment https://github.com/tektoncd/results/pull/703/files#r1486049142: if you are only closing the channel in the goroutine, what unblocks this call?

are we always hitting the case <-ctx.Done() path?

ramessesii2 (Member, Author) replied:

Hey Gabe,
We don't hit ctx.Done() most of the time. Actually, I've never encountered it, because of the 10 min timeout.

"case writeErr := <-echan, what unblocks this call?"

My understanding is based on the fact that even if echan is closed by the tkn reader, reading from a closed channel will always succeed, returning the zero value.
We could run into a problem if echan is nil: reading from or writing to a nil channel causes the code to hang forever. But since that check was not in the original Results code, I assume we expect a non-nil channel from tknReader.
I'll address this edge case in the next iteration; I missed it.
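Both behaviors described here can be checked with a short, runnable standard-library sketch: a receive from a closed channel returns immediately with the zero value, while a receive from a nil channel blocks forever.

package main

import "fmt"

func main() {
	ch := make(chan error)
	close(ch)

	// Receiving from a closed channel never blocks: it yields the zero
	// value, and ok == false reports that the channel was closed.
	err, ok := <-ch
	fmt.Println(err, ok) // prints "<nil> false"

	// By contrast, a receive from a nil channel blocks forever:
	//   var nilCh chan error
	//   <-nilCh // deadlocks
}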

Contributor replied:

neither the tekton reader nor the writer is closing this channel, based on the code I've seen @ramessesii2

can you provide a code link, other than this goroutine, where this channel is being closed?

Contributor replied:

nevermind I found it :-)

ramessesii2 (Member, Author) replied:

sure, the flow is tknReader.read() -> readPipelineLog() -> finally landing in readAvailablePipelineLogs() for our case, since we only stream logs once PipelineRuns/TaskRuns have reached a specific closed state -> close echan (errC)

Contributor replied:

not sure how the original echan could ever be nil, but in general defensive programming is good :-)

I'll wait for your update and then we can go from there - thanks

			errChanRepeater <- writeErr
		}
		var gofuncerr error
		_, gofuncerr = writer.Flush()
		if gofuncerr != nil {
			logger.Error(gofuncerr)
		}
		logger.Info("Flush in streamLogs done",
			zap.String("namespace", o.GetNamespace()),
			zap.String("name", o.GetName()),
		)
		if err = logsClient.CloseSend(); err != nil {
			logger.Error(err)
		}
	}(errChan, o)
		logger.Info("Gofunc in streamLogs done",
			zap.String("namespace", o.GetNamespace()),
			zap.String("name", o.GetName()),
		)
		err = r.RemoveFinalizer(ctx, o)
		if err != nil {
			logger.Errorw("Error removing finalizer",
				zap.String("namespace", o.GetNamespace()),
				zap.String("name", o.GetName()),
				zap.Error(err),
			)
			return
		}

		logger.Info("Finalizer removed successfully",
			zap.String("namespace", o.GetNamespace()),
			zap.String("name", o.GetName()),
		)

		if o.GetDeletionTimestamp() != nil {
			err = controller.NewRequeueImmediately()
			logger.Errorw("Error requeuing object for deletion",
				zap.String("namespace", o.GetNamespace()),
				zap.String("name", o.GetName()),
				zap.Error(err),
			)
		}

	}(logctx, errChan, o)

	// errChanRepeater receives stderr from the TaskRun containers.
	// This will be forwarded as combined output (stdout and stderr)

@@ -410,5 +465,86 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
		Err: writer,
	}, logChan, errChanRepeater)

logger.Info("Exiting streaming logs",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)
return nil
}

func (r *Reconciler) AddFinalizer(ctx context.Context, o metav1.Object) error {
	if ownerReferences := o.GetOwnerReferences(); len(ownerReferences) > 0 {
		// do not add the finalizer if the object is owned by a PipelineRun object
		for _, or := range ownerReferences {
			if or.Kind == "PipelineRun" {
				return nil
			}
		}
	}
	finalizers := o.GetFinalizers()
	for _, f := range finalizers {
		if f == LogFinalizer {
			return nil
		}
	}
	finalizers = append(finalizers, LogFinalizer)

	patch, err := finalizerPatch(finalizers)
	if err != nil {
		return fmt.Errorf("error adding results log finalizer: %w", err)
	}

	if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
		return fmt.Errorf("error patching object: %w", err)
	}
	return nil
}

func (r *Reconciler) RemoveFinalizer(ctx context.Context, o metav1.Object) error {
	finalizers := o.GetFinalizers()
	for i, f := range finalizers {
		if f == LogFinalizer {
			finalizers = append(finalizers[:i], finalizers[i+1:]...)
			patch, err := finalizerPatch(finalizers)
			if err != nil {
				return fmt.Errorf("error removing results log finalizer: %w", err)
			}

			if err = r.objectClient.Patch(ctx, o.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}); err != nil {
				return fmt.Errorf("error patching object: %w", err)
			}
			return nil
		}
	}
	return nil
}

func LogsFinalizerExist(o metav1.Object, finalizer string) bool {
	finalizers := o.GetFinalizers()
	for _, f := range finalizers {
		if f == finalizer {
			return true
		}
	}
	return false
}

type mergePatch struct {
	Metadata metadata `json:"metadata"`
}

type metadata struct {
	Finalizer []string `json:"finalizers"`
}

func finalizerPatch(finalizers []string) ([]byte, error) {
	data := mergePatch{
		Metadata: metadata{
			Finalizer: []string{},
		},
	}
	data.Metadata.Finalizer = finalizers

	return json.Marshal(data)
}
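For illustration (not part of the diff): finalizerPatch produces a JSON merge patch whose metadata.finalizers field replaces the object's entire finalizers list, which is why AddFinalizer and RemoveFinalizer always send the complete list rather than a delta. A runnable sketch with the same types:

package main

import (
	"encoding/json"
	"fmt"
)

type mergePatch struct {
	Metadata metadata `json:"metadata"`
}

type metadata struct {
	Finalizer []string `json:"finalizers"`
}

func main() {
	// Same shape as the PR's finalizerPatch helper.
	b, _ := json.Marshal(mergePatch{Metadata: metadata{
		Finalizer: []string{"results.tekton.dev/streaming-logs"},
	}})
	fmt.Println(string(b))
	// Output: {"metadata":{"finalizers":["results.tekton.dev/streaming-logs"]}}
}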