Golang - race condition using goroutines
I ran my program with the race flag (-race) and it found an issue :(
The function is the following:
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
	g.g.Start(func() {
		requeue, err := run()
		if g.err == nil {
			g.err = err
		}
		if requeue {
			g.requeue = requeue
		}
	})
}
The function is called as follows:
g.Start(func() (bool, error) {
	return install(vins, crObjectKey, releasePrefix, kFilePath, objectCli, dependenciesBroadcastingSchema, compStatus)
})
g.Start(func() (bool, error) {
	return false, uninstall(currentRelease, kFilePath, updateChartStatus)
})
The stack trace looks like this:
WARNING: DATA RACE
Read at 0x00c0001614a8 by goroutine 82:
  github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1()
      /Users/github.vs.sar/agm/coperator/components/tools/waitgroup.go:27 +0x84
  k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1()
      /Users/i88893/go/pkg/mod/k8s.io/apimachinery@<version>/pkg/util/wait/wait.go:73 +0x6d
The first frame in the stack trace, github.vs.sar/agm/coperator/components/tools.(*WaitingErrorGroup).Start.func1(), is the closure inside my Start function shown above (my code).
The second frame in the stack trace is this (not my code):
go/pkg/mod/k8s.io/apimachinery@<version>/pkg/util/wait/wait.go:73 +0x6d
// Start starts f in a new goroutine in the group.
func (g *Group) Start(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f()
	}()
}
I guess (I'm not a Go expert) that this is related to reading and writing g.err from multiple goroutines concurrently, which isn't allowed without synchronization. The same goes for writing g.requeue.
Any idea how to solve this?
Maybe I need to use https://pkg.go.dev/sync#RWMutex, but I'm not sure how...
UPDATE
I took @Danil's suggestion (changing the lock position) and changed the code as follows: I added a mutex to the struct and a lock in the function. Does it make sense? Now when I run with the race flag, everything seems to be OK.
type WaitingErrorGroup struct {
	g       *wait.Group
	mu      sync.Mutex
	err     error
	requeue bool
}

func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
	g.g.Start(func() {
		g.mu.Lock()
		defer g.mu.Unlock()
		requeue, err := run()
		if g.err == nil {
			g.err = err
		}
		if requeue {
			g.requeue = requeue
		}
	})
}
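One caveat with the UPDATE: because g.mu.Lock() is called before run(), the lock is held for the whole duration of run(). The goroutines in the group therefore take turns instead of running in parallel; the race detector is satisfied, but install and uninstall no longer overlap. The timing sketch below compares holding the lock around the work with taking it only around the shared-field update. The helper elapsed and the 50 ms sleep standing in for run() are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// work is the simulated duration of one run() call (an assumption for the demo).
const work = 50 * time.Millisecond

// elapsed starts n goroutines that each sleep for work and then increment a
// shared counter under mu. With lockAroundWork the lock is held during the
// sleep, mirroring the UPDATE; otherwise only the increment is locked.
// It returns the wall-clock time taken and the final counter value.
func elapsed(n int, lockAroundWork bool) (time.Duration, int) {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		counter int
	)
	start := time.Now()
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if lockAroundWork {
				mu.Lock()
				defer mu.Unlock()
				time.Sleep(work) // "run()" executes while holding the lock
				counter++
				return
			}
			time.Sleep(work) // "run()" executes without the lock
			mu.Lock()
			counter++
			mu.Unlock()
		}()
	}
	wg.Wait() // all increments happen before this returns
	return time.Since(start), counter
}

func main() {
	serial, _ := elapsed(2, true)
	parallel, _ := elapsed(2, false)
	fmt.Println("lock taken before run():", serial)   // at least 2*work
	fmt.Println("lock taken after run(): ", parallel) // typically close to 1*work
}
```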
Solution 1:
You could use a channel to communicate the errors as they occur and handle them in one place.
For example, something like this:
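package main

import (
	"errors"
	"fmt"
	"sync"
)

// handleErrors drains c in a background goroutine until c is closed.
func handleErrors(c chan error) *sync.WaitGroup {
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range c {
			fmt.Println(err)
		}
	}()
	return &wg
}

func main() {
	c := make(chan error, 2)
	wg := sync.WaitGroup{}
	// handleErrors(c) is evaluated right here, so the handler starts now.
	// Deferred calls run in reverse order: wait for the senders,
	// then close the channel, then wait for the handler to drain it.
	defer handleErrors(c).Wait()
	defer close(c)
	defer wg.Wait()
	wg.Add(2)
	go func() {
		defer wg.Done()
		c <- errors.New("error 1")
	}()
	go func() {
		defer wg.Done()
		c <- errors.New("error 2")
	}()
}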
I think using channels is more idiomatic in Go than other synchronization primitives such as locks. Locks are harder to get right, and they can come with a performance cost.
If one goroutine holds the lock, the other goroutines have to wait until it is released, so you introduce a bottleneck into your concurrent execution. In the above example, this is avoided by buffering the channel: even if nothing has read a message yet, both goroutines can still send theirs without blocking.
Additionally, when using a lock it can happen that the lock is never released, for example because the programmer forgot the Unlock call, which leads to a deadlock. That said, similar problems can occur when channels are not closed.
Solution 2:
The problem appears because you are trying to manipulate unsynchronized shared memory (g.err in your case) from different goroutines.
You have two different approaches to handling concurrent code in Go:
- Synchronization primitives for sharing memory (e.g., sync.Mutex)
- Synchronization via communicating (e.g., channels)
It seems that your code follows the first approach (synchronization primitives for sharing memory), so to resolve the error you need to synchronize access to g.err.
You can use sync.Mutex or sync.RWMutex for this.
In your case you will have:
func (g *WaitingErrorGroup) Start(run func() (bool, error)) {
	g.g.Start(func() {
		requeue, err := run()
		// Lock before reading and writing g.err and unlock after
		g.mu.Lock()
		defer g.mu.Unlock()
		if g.err == nil {
			g.err = err
		}
		if requeue {
			g.requeue = requeue
		}
	})
}
However, according to Rob Pike, the more idiomatic way in Go is "Don't communicate by sharing memory; share memory by communicating."
(In other words, use channels rather than mutexes.) @TheFool and @kostix already mentioned this
in the comments.
But it's not clear to me from the question whether you are able to redesign your code to follow this idiom.