ROX-28530: use the new collector iservice for process information #14652
base: master
Conversation
Skipping CI for Draft Pull Request.
Images are ready for the commit at 332ef39. To use with deploy scripts, first …
Codecov Report — Attention: Patch coverage is …

```
@@            Coverage Diff             @@
##           master   #14652      +/-   ##
==========================================
- Coverage   48.98%   48.96%    -0.02%
==========================================
  Files        2549     2550        +1
  Lines      187114   187215      +101
==========================================
+ Hits        91651    91668       +17
- Misses      88212    88292       +80
- Partials     7251     7255        +4
```
/test all

Force-pushed from c4eaff1 to 627fe7b.

/test all

/retest
```go
func newService(queue chan *sensor.ProcessSignal) Service {
	return &serviceImpl{
		queue: queue,
	}
}
```
Anti-pattern warning: this object writes to a queue that is provided from the outside. That makes it unclear who should close the channel, and when.
Solution: the queue should be created in this object's constructor and returned as read-only. This object should then be responsible for closing the channel.
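A minimal sketch of that ownership model, assuming the `Service` interface, `serviceImpl` struct, and `maxBufferSize` constant from the surrounding diff; the `Queue` accessor and `Stop` shutdown hook are hypothetical names:

```go
func newService() Service {
	return &serviceImpl{
		// The service creates the queue, so it is the channel's only writer.
		queue: make(chan *sensor.ProcessSignal, maxBufferSize),
	}
}

// Queue exposes the channel as receive-only: consumers can read from it,
// but cannot write to it or close it.
func (s *serviceImpl) Queue() <-chan *sensor.ProcessSignal {
	return s.queue
}

// Stop closes the channel in exactly one place, from its only writer.
func (s *serviceImpl) Stop() {
	close(s.queue)
}
```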
```go
type Component interface {
	common.SensorComponent

	GetReceiver() chan *sensor.ProcessSignal
}
```
Let's always make it read-only for external actors.
```diff
- GetReceiver() chan *sensor.ProcessSignal
+ GetReceiver() <-chan *sensor.ProcessSignal
```
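For context, the receive-only type makes misuse a compile-time error rather than a runtime bug; a small illustration:

```go
// consume can range over the channel, but any attempt to send on it or
// close it is rejected by the compiler.
func consume(c <-chan int) {
	for v := range c {
		_ = v
	}
	// c <- 1   // compile error: cannot send to receive-only channel
	// close(c) // compile error: cannot close receive-only channel
}
```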
```go
	common.SensorComponent

	processPipeline Pipeline
	indicators      chan *message.ExpiringMessage
```
I can't see any writes to this channel here, so let's make it read only.
```diff
- indicators chan *message.ExpiringMessage
+ indicators <-chan *message.ExpiringMessage
```
```go
	cmp := &componentImpl{
		processPipeline: pipeline,
		indicators:      indicators,
		receiver:        make(chan *sensor.ProcessSignal, maxBufferSize),
	}
```
Why do we need the buffer here?
It looks like we are only reading from this channel, so I'm wondering what the purpose of this channel is.
sensor/kubernetes/sensor/sensor.go (outdated)
```go
	var signalSrv signalService.Service
	if cfg.signalServiceAuthFuncOverride != nil && cfg.localSensor {
		signalSrv = signalService.NewService(signalCmp.GetReceiver(),
```
Okay, so you create a channel in `signalCmp` only to pass it to another object so that it can write to it? This is not a good idea. We must always make sure that:
- There is exactly one writer of the channel. That writer is responsible for creating and closing the channel, and always returns it to the outside as a read-only channel.
- There can be many readers of the channel, and none of them should attempt to write to it or close it.
- The writer and readers must be easy to identify.
For more details, check https://www.oreilly.com/library/view/concurrency-in-go/9781491941294/ch04.html
As I explained offline, my initial approach was to have a single channel owned by the component, reading the messages coming from collector through both the old and new services. Thanks for taking the time to explain why that would be a dangerous practice in Go. I've changed it so that the services each own a channel, and those are passed into the component for handling.
Overall this looks good. I shared a few thoughts; let me know what you think.
sensor/common/collector/service.go (outdated)
```go
}

func NewService(opts ...Option) Service {
	return newService()
}
```
The options need to be passed here and handled in `newService`. Otherwise `WithAuthFuncOverride` will be ignored and some integration tests will fail.
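A sketch of how the options could be threaded through, assuming `Option` is a functional option over `*serviceImpl` (the exact shape in the PR may differ):

```go
type Option func(*serviceImpl)

func NewService(opts ...Option) Service {
	return newService(opts...)
}

func newService(opts ...Option) Service {
	s := &serviceImpl{
		queue: make(chan *sensor.ProcessSignal, maxBufferSize),
	}
	// Apply every option so that settings such as WithAuthFuncOverride
	// are honored instead of silently dropped.
	for _, opt := range opts {
		opt(s)
	}
	return s
}
```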
```go
type componentImpl struct {
	common.SensorComponent

	processPipeline Pipeline
	indicators      <-chan *message.ExpiringMessage
	signalMessages  <-chan *storage.ProcessSignal
	processMessages <-chan *sensor.ProcessSignal
	writer          io.Writer

	stopper concurrency.Stopper
}
```
Would it make sense to merge this new component with the `Pipeline`? I feel like this new component only acts as a bridge between the service and the `Pipeline` component.
The whole code for processing processes (haha) is messy and confusing. From what I can tell, a long long time ago in a galaxy far away, signals were (or were going to be) the generic way for stackrox to handle events, so there are all these abstractions for signals by themselves, but the only signal that ever got implemented was for processes. The reason I mention this is that we have the same situation in collector: there are a bunch of abstract classes (essentially interfaces) for signals, but they were only ever implemented for processes as well. I'm not a huge fan of this approach of "writing the abstraction first"; IMO it leads to overcomplicated code that can very well end up not being used, or used just once, like in our case.
AFAICT, this is the only implementation of the `Pipeline` interface in `sensor/common/signal/component/pipeline.go` (and gopls agrees with me):
```go
type Pipeline struct {
```
So, at this point, I'm not 100% sure how I would handle this. My instinct tells me to move the entire code under `sensor/common/processsignal/` into `sensor/common/signal` and ditch the interface altogether, but that might be messy. Another possibility would be to move the service and component into `sensor/common/processsignal/` and rename the directory to something like `process`, also dropping the interface. WDYT?
I'd go with the second one: move everything to something like `sensor/common/process` (maybe `processindicator`), remove the interface, and probably have only the `SensorComponent` as the handler for processes. So, as sketched below:
- collectorService: handles the connection with collector.
- processIndicatorComponent: handles the processIndicator enrichment.
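One possible shape for that split; all names here are illustrative, not from the PR:

```go
// collectorService owns the gRPC stream from collector. It creates the
// signal channel, is its only writer, and closes it on shutdown.
type collectorService interface {
	ProcessSignals() <-chan *storage.ProcessSignal
	Stop()
}

// processIndicatorComponent consumes those signals and performs the
// process-indicator enrichment before forwarding results to central.
type processIndicatorComponent interface {
	common.SensorComponent
}
```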
```go
	return &storage.ProcessSignal{
		Id:           signal.Id,
		ContainerId:  signal.ContainerId,
		Time:         signal.CreationTime,
		Name:         signal.Name,
		Args:         signal.Args,
		ExecFilePath: signal.ExecFilePath,
		Pid:          signal.Pid,
		Uid:          signal.Uid,
		Gid:          signal.Gid,
		Scraped:      signal.Scraped,
		LineageInfo:  lineage,
	}
```
Probably out of scope for this PR, and maybe this entire effort, but as a general rule I would try to move away from using `storage` protos in the API.
I agree. I would argue this falls under this issue, which is outside the scope of the current epic: https://issues.redhat.com/browse/ROX-28532
We'll try to get that prioritized and implemented in the near future. I also have some cleanups to do soon, and I can probably get this done then.
```go
func newService() Service {
	return &serviceImpl{
		queue: make(chan *sensor.ProcessSignal, maxBufferSize),
	}
}
```
We should add metrics to this buffer. Also, consider using `ScaleSizeOnNonDefault` and an environment variable to configure this max size.
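A rough sketch of the environment-variable part using only the standard library; the variable name is made up, and the real change should presumably go through stackrox's own settings machinery and the `ScaleSizeOnNonDefault` helper mentioned above:

```go
import (
	"os"
	"strconv"
)

// bufferSize returns the queue capacity, overridable via a (hypothetical)
// environment variable, falling back to the compiled-in maxBufferSize.
func bufferSize() int {
	if v := os.Getenv("ROX_PROCESS_SIGNAL_BUFFER_SIZE"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			return n
		}
	}
	return maxBufferSize
}
```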
sensor/common/signal/singleton.go (outdated)
```diff
- queue: make(chan *v1.Signal, maxBufferSize),
  indicators:      indicators,
  processPipeline: pipeline,
+ queue: make(chan *storage.ProcessSignal, maxBufferSize),
```
Not 100% sure why we had `queue: make(chan *v1.Signal, maxBufferSize)` before, because I believe it was not used 🤷, but since `queue: make(chan *storage.ProcessSignal, maxBufferSize)` is now used, I'd add metrics and make it configurable with `ScaleSizeOnNonDefault` and an env variable.
Asking for both this and the `service_impl.go` comment: could you provide an example of the metrics you'd like added? I'm not 100% sure how to look for those in the sensor code.
Sure! Here's how we do it for the `BufferedStream`. Basically, we track how many items were added, removed, and dropped from the buffer.
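As an illustration of that idea with `prometheus/client_golang` (the metric names and the `enqueue` helper are made up for this sketch; the real code should mirror the `BufferedStream` metrics):

```go
import "github.com/prometheus/client_golang/prometheus"

var (
	bufferAdded = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "sensor_process_signal_buffer_added_total",
		Help: "Items enqueued into the process signal buffer.",
	})
	bufferDropped = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "sensor_process_signal_buffer_dropped_total",
		Help: "Items dropped because the process signal buffer was full.",
	})
)

func init() {
	prometheus.MustRegister(bufferAdded, bufferDropped)
}

// enqueue adds a signal without blocking, counting adds and drops so the
// buffer's behavior is observable.
func (s *serviceImpl) enqueue(sig *sensor.ProcessSignal) {
	select {
	case s.queue <- sig:
		bufferAdded.Inc()
	default:
		bufferDropped.Inc()
	}
}
```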
Force-pushed from 2c0619b to 291f3a2.
Force-pushed from 9bf9d85 to ffa406e.
Force-pushed from ffa406e to 332ef39.
Description
This PR enables sensor to handle process signals coming from collector through the new internal service. In order to achieve this:
In addition to the changes described above, a couple of functions for translating between signal types have been added, so that the new type can be converted into the type central expects. To avoid this translation we could update the internal service between sensor and central to also use the new type, moving the translation into central itself before dumping to the DB, but that would require additional effort for compatibility between sensor and central, which has not been discussed yet. Should this change be needed, it will be addressed in a follow-up PR.
Sibling PR stackrox/collector#2063
User-facing documentation
Testing and quality
Automated testing
How I validated my change
Ran collector and sensor with the new changes and checked that the process information is available in the Risk tab.