Golang - race condition using go-routine

I ran my program with the -race flag and it reported an issue :(

The function is the following:

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        requeue, err := run()
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}

The function is called as follows:

g.Start(func() (bool, error) {
    return install(vins, crObjectKey, releasePrefix, kFilePath, objectCli, dependenciesBroadcastingSchema, compStatus)
})

g.Start(func() (bool, error) {
    return false, uninstall(currentRelease, kFilePath, updateChartStatus)
})

The stack trace looks like this:

WARNING: DATA RACE
Read at 0x00c0001614a8 by goroutine 82:
  github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
      /Users/github.vs.sar/agm/coperator/components/tools/waitgroup.go:27 +0x84
  k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
      /Users/i88893/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:73 +0x6d

The first frame in the stack trace is my code, github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1():

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        requeue, err := run()
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}

The second frame in the stack trace is this (not my code):

go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:73 +0x6d

// Start starts f in a new goroutine in the group.
func (g *Group) Start(f func()) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done()
        f()
    }()
}

I guess (I'm not a Go expert) that this is caused by multiple goroutines reading and writing g.err concurrently, which isn't allowed without synchronization. The same applies to writing g.requeue. Any idea how to solve this?

Maybe I need to use https://pkg.go.dev/sync#RWMutex, but I'm not sure how.

UPDATE

I took @Danil's suggestion (changing the lock position): I added a mutex to the struct and take the lock in the function. Does this make sense? When I run with the race flag now, everything seems to be OK.

type WaitingErrorGroup struct {
    g       *wait.Group
    mu      sync.Mutex
    err     error
    requeue bool
}

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
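        // Run the task first so the lock is not held while it executes;
        // locking before run() would serialize all tasks.
        requeue, err := run()
        g.mu.Lock()
        defer g.mu.Unlock()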
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}

Solution 1:

You could use a channel to communicate the occurring errors and handle them.

For example, something like this.

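package main

import (
    "errors"
    "fmt"
    "sync"
)

// handleErrors starts a goroutine that prints every error received on c.
// The returned WaitGroup is done once c has been closed and drained.
func handleErrors(c chan error) *sync.WaitGroup {
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range c {
            fmt.Println(err)
        }
    }()
    return &wg
}

func main() {
    // Buffered so both senders can proceed without waiting for the reader.
    c := make(chan error, 2)
    wg := sync.WaitGroup{}

    // handleErrors(c) is called right here; only its Wait is deferred.
    // Deferred calls run in reverse order: wait for the senders,
    // close the channel, then wait for the handler to drain it.
    defer handleErrors(c).Wait()
    defer close(c)
    defer wg.Wait()

    wg.Add(2)
    go func() {
        defer wg.Done()
        c <- errors.New("error 1")
    }()
    go func() {
        defer wg.Done()
        c <- errors.New("error 2")
    }()
}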

I think using channels is more idiomatic in Go than other sync primitives such as locks. Locks are harder to get right, and they can come with a performance cost.

If one goroutine holds the lock, the other goroutines have to wait until it is released, so you introduce a bottleneck in your concurrent execution. In the above example, this is avoided by buffering the channel: even if nothing has read the messages yet, both goroutines can still send their message without blocking.
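
To see the buffering effect in isolation, here is a minimal sketch. The function name queuedWithoutReceiver is made up for this illustration. It sends into a buffered channel that has no receiver at all; with an unbuffered channel the first send would block forever.

```go
package main

import "fmt"

// queuedWithoutReceiver (hypothetical name) sends n errors into a channel
// whose buffer holds n values and reports how many are queued afterwards.
// Nothing ever receives from the channel, so every send succeeds only
// because the buffer has room for it.
func queuedWithoutReceiver(n int) int {
	c := make(chan error, n)
	for i := 0; i < n; i++ {
		c <- fmt.Errorf("error %d", i+1)
	}
	return len(c) // number of values currently sitting in the buffer
}

func main() {
	fmt.Println(queuedWithoutReceiver(2)) // prints 2
}
```

Note that the buffer only helps while it has free slots: a third send into a channel with capacity 2 would block until something receives.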

Additionally, with a lock it can happen that the lock is never released, for example if the programmer forgot to add the unlock call, which leads to a deadlock. Similar problems can occur, though, when channels are never closed.
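
Applied to the question, the same idea could look roughly like the sketch below. It is not the asker's code: taskResult, collect and collectExample are hypothetical names, the two closures only stand in for install and uninstall, and a plain sync.WaitGroup replaces the k8s wait.Group to keep the example self-contained. Each goroutine sends its result over a buffered channel, and only the calling goroutine touches the aggregated values, so no mutex is needed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// taskResult (hypothetical) carries what one task returned.
type taskResult struct {
	requeue bool
	err     error
}

// collect (hypothetical) runs every task in its own goroutine and merges
// the results in the calling goroutine only.
func collect(tasks ...func() (bool, error)) (bool, error) {
	results := make(chan taskResult, len(tasks)) // buffered: sends never block
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(run func() (bool, error)) {
			defer wg.Done()
			requeue, err := run()
			results <- taskResult{requeue: requeue, err: err}
		}(task)
	}
	wg.Wait()
	close(results) // all senders are done, so closing is safe

	var requeue bool
	var firstErr error
	for r := range results {
		if firstErr == nil {
			firstErr = r.err // keep the first non-nil error received
		}
		requeue = requeue || r.requeue
	}
	return requeue, firstErr
}

// collectExample uses stand-ins for install and uninstall from the question.
func collectExample() (bool, error) {
	return collect(
		func() (bool, error) { return true, nil },
		func() (bool, error) { return false, errors.New("uninstall failed") },
	)
}

func main() {
	requeue, err := collectExample()
	fmt.Println(requeue, err) // prints: true uninstall failed
}
```

If several tasks fail, which error counts as "first" depends on the order in which results arrive, just as it depends on scheduling in the mutex version.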

Solution 2:

The problem appears because you are trying to manipulate unsynchronized shared memory (g.err in your case) from different goroutines.

There are two different approaches to handling concurrent code in Go:

  • Synchronization primitives for sharing memory (e.g., sync.Mutex)
  • Synchronization via communicating (e.g., channels)

Your code follows the first approach (synchronization primitives for sharing memory), so to resolve the race you need to synchronize access to g.err and g.requeue.

You can use sync.Mutex or sync.RWMutex.

In your case you will have:

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
    g.g.Start(func() {
        requeue, err := run()
        
        // Lock before reading and writing g.err and unlock after
        g.mu.Lock()
        defer g.mu.Unlock()
        
        if g.err == nil {
            g.err = err
        }
        if requeue {
            g.requeue = requeue
        }
    })
}
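
For completeness, here is a self-contained sketch of the same approach using sync.RWMutex, which the question asked about. Several things in it are assumptions rather than the original code: sync.WaitGroup stands in for the k8s wait.Group, the Result and Wait methods are hypothetical additions, and the two closures are placeholders for install and uninstall. Writers take the exclusive lock, while Result takes only the read lock so that several readers can inspect the state at the same time.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// WaitingErrorGroup mirrors the type from the question; wg is a stdlib
// stand-in for *wait.Group so the sketch has no external dependencies.
type WaitingErrorGroup struct {
	wg      sync.WaitGroup
	mu      sync.RWMutex
	err     error
	requeue bool
}

// Start runs the task in a new goroutine and records its outcome.
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		requeue, err := run() // runs outside the lock, so tasks stay concurrent

		g.mu.Lock() // exclusive lock for writing
		defer g.mu.Unlock()
		if g.err == nil {
			g.err = err
		}
		if requeue {
			g.requeue = true
		}
	}()
}

// Result (hypothetical) returns the current state under the shared read lock.
func (g *WaitingErrorGroup) Result() (bool, error) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.requeue, g.err
}

// Wait (hypothetical) blocks until all started tasks have finished.
func (g *WaitingErrorGroup) Wait() (bool, error) {
	g.wg.Wait()
	return g.Result()
}

// runGroupExample starts two placeholder tasks and waits for both.
func runGroupExample() (bool, error) {
	var g WaitingErrorGroup
	g.Start(func() (bool, error) { return true, nil })
	g.Start(func() (bool, error) { return false, errors.New("uninstall failed") })
	return g.Wait()
}

func main() {
	requeue, err := runGroupExample()
	fmt.Println(requeue, err) // prints: true uninstall failed
}
```

If the results are only ever read after Wait returns, a plain sync.Mutex is enough; RWMutex pays off mainly when reads are frequent and happen while tasks are still running.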

However, according to Rob Pike, the more idiomatic way in Go is: "Don't communicate by sharing memory; share memory by communicating." (That is, use channels rather than mutexes.) This was already mentioned by @TheFool and @kostix in the comments.

It's not clear to me from the question, though, whether you are able to redesign your code to follow this idiom.