Skip to content

Commit e4dfadb

Browse files
committed
fix store handling of many pids per container, add tests
1 parent 1ef6055 commit e4dfadb

File tree

6 files changed

+1569
-43
lines changed

6 files changed

+1569
-43
lines changed

pkg/components/kube/store.go

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,15 @@ type Store struct {
8282

8383
metadataNotifier meta.Notifier
8484

85-
containerIDs map[string]*container.Info
85+
containerIDs maps.Map2[string, uint32, *container.Info]
8686

8787
// stores container info by PID. It is only required for
8888
// deleting entries in namespaces and podsByContainer when DeleteProcess is called
8989
containerByPID map[uint32]*container.Info
9090

9191
// a single namespace will point to any container inside the pod
9292
// but we don't care which one
93-
namespaces map[uint32]*container.Info
93+
namespaces maps.Map2[uint32, uint32, *container.Info]
9494

9595
// container ID to pod matcher
9696
podsByContainer map[string]*CachedObjMeta
@@ -126,8 +126,8 @@ func NewStore(kubeMetadata meta.Notifier, resourceLabels ResourceLabels, service
126126

127127
db := &Store{
128128
log: log,
129-
containerIDs: map[string]*container.Info{},
130-
namespaces: map[uint32]*container.Info{},
129+
containerIDs: maps.Map2[string, uint32, *container.Info]{},
130+
namespaces: maps.Map2[uint32, uint32, *container.Info]{},
131131
podsByContainer: map[string]*CachedObjMeta{},
132132
containerByPID: map[uint32]*container.Info{},
133133
objectMetaByIP: map[string]*CachedObjMeta{},
@@ -197,13 +197,19 @@ func (s *Store) cacheResourceMetadata(meta *informer.ObjectMeta) *CachedObjMeta
197197
func (s *Store) On(event *informer.Event) error {
198198
switch event.Type {
199199
case informer.EventType_CREATED:
200+
// go func() {
201+
// time.Sleep(10 * time.Second)
200202
s.addObjectMeta(event.Resource)
203+
s.Notify(event)
204+
// }()
201205
case informer.EventType_UPDATED:
202206
s.updateObjectMeta(event.Resource)
207+
s.Notify(event)
203208
case informer.EventType_DELETED:
204209
s.deleteObjectMeta(event.Resource)
210+
s.Notify(event)
205211
}
206-
s.Notify(event)
212+
//s.Notify(event)
207213
return nil
208214
}
209215

@@ -221,8 +227,8 @@ func (s *Store) AddProcess(pid uint32) {
221227

222228
s.access.Lock()
223229
defer s.access.Unlock()
224-
s.namespaces[ifp.PIDNamespace] = &ifp
225-
s.containerIDs[ifp.ContainerID] = &ifp
230+
s.namespaces.Put(ifp.PIDNamespace, pid, &ifp)
231+
s.containerIDs.Put(ifp.ContainerID, pid, &ifp)
226232
s.containerByPID[pid] = &ifp
227233
}
228234

@@ -234,8 +240,8 @@ func (s *Store) DeleteProcess(pid uint32) {
234240
return
235241
}
236242
delete(s.containerByPID, pid)
237-
delete(s.namespaces, info.PIDNamespace)
238-
delete(s.containerIDs, info.ContainerID)
243+
s.namespaces.Delete(info.PIDNamespace, pid)
244+
s.containerIDs.Delete(info.ContainerID, pid)
239245
}
240246

241247
func (s *Store) addObjectMeta(meta *informer.ObjectMeta) {
@@ -279,9 +285,11 @@ func (s *Store) unlockedAddObjectMeta(meta *informer.ObjectMeta) {
279285
for _, c := range meta.Pod.Containers {
280286
s.podsByContainer[c.Id] = cmeta
281287
// TODO: make sure we can handle when the containerIDs is set after this function is triggered
282-
info, ok := s.containerIDs[c.Id]
288+
infos, ok := s.containerIDs[c.Id]
283289
if ok {
284-
s.namespaces[info.PIDNamespace] = info
290+
for pid, info := range infos {
291+
s.namespaces.Put(info.PIDNamespace, pid, info)
292+
}
285293
}
286294
s.containersByOwner.Put(oID, c.Id, c)
287295
}
@@ -315,10 +323,14 @@ func (s *Store) unlockedDeleteObjectMeta(meta *informer.ObjectMeta) {
315323
s.log.Debug("deleting pod from store",
316324
"ips", meta.Ips, "pod", meta.Name, "namespace", meta.Namespace, "containers", meta.Pod.Containers)
317325
for _, c := range meta.Pod.Containers {
318-
info, ok := s.containerIDs[c.Id]
326+
infos, ok := s.containerIDs[c.Id]
319327
if ok {
320-
delete(s.containerIDs, c.Id)
321-
delete(s.namespaces, info.PIDNamespace)
328+
s.containerIDs.DeleteAll(c.Id)
329+
for _, info := range infos {
330+
s.namespaces.DeleteAll(info.PIDNamespace)
331+
// delete all needs to be done only once, we could alternatively delete one by one pid
332+
break
333+
}
322334
}
323335
delete(s.podsByContainer, c.Id)
324336
s.containersByOwner.Delete(oID, c.Id)
@@ -345,14 +357,19 @@ func (s *Store) PodByContainerID(cid string) *CachedObjMeta {
345357
func (s *Store) PodContainerByPIDNs(pidns uint32) (*CachedObjMeta, string) {
346358
s.access.RLock()
347359
defer s.access.RUnlock()
348-
if info, ok := s.namespaces[pidns]; ok {
349-
if om, ok := s.podsByContainer[info.ContainerID]; ok {
350-
oID := fetchOwnerID(om.Meta)
351-
containerName := ""
352-
if containerInfo, ok := s.containersByOwner.Get(oID, info.ContainerID); ok {
353-
containerName = containerInfo.Name
360+
if infos, ok := s.namespaces[pidns]; ok {
361+
for _, info := range infos {
362+
if om, ok := s.podsByContainer[info.ContainerID]; ok {
363+
oID := fetchOwnerID(om.Meta)
364+
containerName := ""
365+
if containerInfo, ok := s.containersByOwner.Get(oID, info.ContainerID); ok {
366+
containerName = containerInfo.Name
367+
}
368+
return om, containerName
354369
}
355-
return om, containerName
370+
// we break here, the namespace is the same for all pids in the container
371+
// we need to check one only
372+
break
356373
}
357374
}
358375
return nil, ""

0 commit comments

Comments
 (0)