diff --git a/pkg/helper.go b/pkg/helper.go index 300b30b..f527d15 100644 --- a/pkg/helper.go +++ b/pkg/helper.go @@ -308,9 +308,8 @@ func writeToBlob(ctx context.Context, et time.Time, timeKey string, filter Filte metrics.SyncEvent.Add(float64(items)) metrics.SyncEventWithLabel.With(prometheus.Labels{"type": filter.Name}).Add(float64(items)) } - writerMutex.Lock() - delete(timeKeyWriters, timeKey) - writerMutex.Unlock() + + timeKeyWritersMap.Delete(timeKey) close(ch) log.Info().Msg("writer stopped") @@ -405,8 +404,7 @@ func loadToBigQueryJob(job Job, log zerolog.Logger, blobs []string, filter Filte } var ( - timeKeyWriters = make(map[string]chan *pubsub.Message) - writerMutex sync.Mutex + timeKeyWritersMap sync.Map ) func WaitAndGoogleStorageSync(ctx context.Context, wg *sync.WaitGroup, job Job, eventChannel chan *pubsub.Message, loadToBigQuery bool) { @@ -496,14 +494,12 @@ func WaitAndGoogleStorageSync(ctx context.Context, wg *sync.WaitGroup, job Job, timeKey := fmt.Sprintf("%s-%s", filter.Name, et.Format("2006-01-02-15-04")) timeKey = timeKey[:len(timeKey)-1] //10 minute buckets - wr, ok := timeKeyWriters[timeKey] + wr, ok := timeKeyWritersMap.Load(timeKey) if !ok { - writerMutex.Lock() wr = writeToBlob(ctx, et, timeKey, filter, job, storageClient, bulkSize, loadToBigQuery) - timeKeyWriters[timeKey] = wr - writerMutex.Unlock() + timeKeyWritersMap.Store(timeKey, wr) } - wr <- mx + wr.(chan *pubsub.Message) <- mx it++ } }(filter)