-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
V2 Control Protocol #29
Conversation
We need to decide on how we want to merge this into the repository. I was thinking about making it |
@blakerouse I think we should work the same as our other project, something like:
|
This will work but I've usually seen public proto packages versioned like Go packages, where v1 and v2 coexist together in the tree until one obsoletes the other. Like Go modules, this will make the import path change between versions. You could use Here is Google Cloud Storage using this approach: https://github.com/googleapis/googleapis/tree/master/google/storage |
@cmacknz good point, but at the same time we have a limited set of users, so I am not sure having a v2 worth it. I believe this would reduce our flexibility to make changes in repo/testing and so on. |
I am +1 for using main for v2. |
Fair point, go with whatever makes development easier. |
@jlind23 Had a good idea in our 1-1 to instead of making a whole v2, that we instead try to make this work with the existing v1 instead of a hard breaking change. He had some good points of allowing things like Endpoint Security and APM server to migrate slower to v2. I think this is a valid point and something that we could do in this protocol. The change only adds 1 field to The new additional functions are not streaming protocols that are even required for processes to even use, so if they don't use them that is fine and not breaking. The largest change is the |
@blakerouse The v1,v2 I am worried to run in a hybrid mode, wouldn't that bring more complexity while debugging? |
@ph we will be aware when agent will be running in V2 mode, we just have to properly log it then. |
@ph Really we have 2 different ways we could implement it.
The open question is really do we want to support existing v1 clients when operating as v2, if so #2. Or we don't need too and #1 could be the answer. Or option #3, start with #1 and if we see an issue that requires us to do #2 then we implement the translation layer. |
elastic-agent-client.proto
Outdated
// | ||
// Transactional store is provided to allow multiple key operations to occur before a commit to ensure consistent | ||
// state when multiple keys make up the state of an units persistent state. | ||
rpc StoreBeginTxn(StoreBeginTxnRequest) returns (StoreBeginTxnResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question here: must all the store methods be called within a transaction, or could the Get|Set|Delete be called alone?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Must be in a transaction.
Co-authored-by: Anderson Queiroz <[email protected]>
Co-authored-by: Anderson Queiroz <[email protected]>
Co-authored-by: Anderson Queiroz <[email protected]>
@blakerouse I think what you are proposing in CheckV2, is the appropriate path. 1 to see if we need to support 2. I am considering that most processes would be fine with the new protocol because they are beats. The endpoint is a bit different, but and they do not need to adopt the Artifact service right away.
|
elastic-agent-client.proto
Outdated
// | ||
// Transactional store is provided to allow multiple key operations to occur before a commit to ensure consistent | ||
// state when multiple keys make up the state of an units persistent state. | ||
rpc BeginTxn(StoreBeginTxnRequest) returns (StoreBeginTxnResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe rename BeginTxn -> BeginTx and other Txns to Txs?
similar to postgres go library for example BeginTx
https://github.com/lib/pq/blob/1ef134dc0e0dcdf8097ca347580c15df72ef786e/conn_go18.go#L58
or mysql
https://github.com/go-sql-driver/mysql/blob/bcc459a906419e2890a50fc2c99ea6dd927a88f2/connection.go#L471
"Tx" is more common abbreviation for transactions in other db libaries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any handling for example for the broken connection after the transaction was started and left uncommitted? @blakerouse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aleksmaus I would expect the method call to return an error when the method is called?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
depending on the storage backend/library and transaction implementation details, not closing the transaction could leave the transaction locks on the database/tables/rows. should probably be coded to have transaction in the scope of the connection or something and the connection is broken the transaction should be auto rolled back. thinking if these kind of cases. don't completely know if this is relevant for our particular implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aleksmaus the state of the connection will be tracked on the server-side (aka. Elastic Agent), when the connection is lost then the transaction will be discarded server-side. At that point that transaction is broken and they client will need to handle the error and create a new transaction
I also think Elastic Agent should place a hard timeout on transactions, as it would not be good to keep them open for a long period of time. They really should be short in time.
I will go through and adjust it all from "Txn" abbr. to "Tx".
pkg/client/client_v2.go
Outdated
cfgLock sync.RWMutex | ||
obsLock sync.RWMutex | ||
|
||
kickChan chan struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe rename kickChan ->kickCh etc for other chan vars. This naming convention seems to be more common in https://github.com/golang/go
pkg/client/client_v2.go
Outdated
kickChan chan struct{} | ||
errChan chan error | ||
unitsChan chan UnitChanged | ||
unitsLock sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe rename unitsLock -> unitsMu. This naming convention seems to be more common in https://github.com/golang/go
elastic-agent-client.proto
Outdated
// | ||
// Transactional store is provided to allow multiple key operations to occur before a commit to ensure consistent | ||
// state when multiple keys make up the state of an units persistent state. | ||
rpc BeginTxn(StoreBeginTxnRequest) returns (StoreBeginTxnResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any handling for example for the broken connection after the transaction was started and left uncommitted? @blakerouse
// Response from `SetKey`. | ||
message StoreSetKeyResponse { | ||
// Empty at the moment, defined for possibility of adding future return values. | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I see a TTL maybe or to indicate single-use key.
// | ||
// `txn_id` must be an ID of a transaction that was started with `READ_WRITE`. | ||
// | ||
// Does not error in the case that a key does not exist. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, align with Go's expected behavior.
What's the procedure look like for additions to this protocol in the future? If the changes are purely additive to the control protocol, can things be added to this spec in, say, 8.4 timeline? We are still looking to add a new feature/API to Fleet Server for our next use case, but we don't have a full picture of what's being sent over the wire (aside from, its a big file. Not sure if streamed or pre-chunked. Metadata format, etc) I assume you want this merged without waiting for us? Or is it much preferred to hammer out something on our side and send it in here |
|
||
message LogMessageResponse { | ||
// Empty at the moment, defined for possibility of adding future return values. | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for forward change.
pkg/client/client_v2.go
Outdated
for { | ||
expected, err := checkinClient.Recv() | ||
if err != nil { | ||
if err != io.EOF { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would fail the linter.
if err != io.EOF { | |
if !errors.Is(err, io.EOF) { |
return | ||
} | ||
|
||
t := time.NewTicker(c.minCheckTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on our morning discussion this would behavior weirdlky with sleep or hibernate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After looking at the golang I do not believe this to be an issue. The ticker will keep ticking, on hibernate it will just pickup where it left off because it uses the monoclock. The ticker also handles a slow reader so it never adds more than one to the channel so until the next is removed from the channel it will not add another one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the verification!
pkg/client/client_v2.go
Outdated
defer checkinWrite.Done() | ||
|
||
if err := c.sendObserved(checkinClient); err != nil { | ||
if err != io.EOF { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if err != io.EOF { | |
if !errors.Is(err, io.EOF) { |
} | ||
} | ||
} | ||
c.units = c.units[:i] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've almost missed the resize.
Token: c.token, | ||
Id: action.Id, | ||
Status: proto.ActionResponse_FAILED, | ||
Result: ActionTypeUnknown, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Result: ActionErrUnitNotFound, | ||
} | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this mean that If an agent receive an osquery
and it receive an osquery action and the unit isn't runnning we would silently ignore the event?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, not we would report back upstream, ok I think its fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a few questions in the PR, and so far it looks good to me.
I've been having a few difficulties concerning the Keystore API and implementation, but I find it hard without playing in the context of the agent.
Looking at other comments concerning renaming txn
to tx
I believe is a good thing to do.
Result: ActionErrUnmarshableParams, | ||
} | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for adding this, +1 don't trust anything.
go func() { | ||
time.Sleep(100 * time.Millisecond) | ||
change.Unit.UpdateState(UnitStateStopped, "Stopped", nil) | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make the "Healthy", "Stopping", "Stopped" and other a public constant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For testing make it constant? For the library, no I would prefer they provide more detail than that. Like "Connected to elasticsearch(https://localhost:8200)" or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK good point.
pkg/client/client_v2_test.go
Outdated
defer m.Unlock() | ||
|
||
if !connected { | ||
return fmt.Errorf("server never recieved valid token") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return fmt.Errorf("server never recieved valid token") | |
return fmt.Errorf("server never received a valid token") |
units = append(units, change.Unit) | ||
unitsLock.Unlock() | ||
default: | ||
panic("not implemented") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's use testing's t.Fatal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its in a seperate goroutine, go vet
will not allow this.
call to (*T).Fatal from a non-test goroutine
pkg/client/client_v2_test.go
Outdated
defer m.Unlock() | ||
|
||
if !gotInit { | ||
return fmt.Errorf("server never recieved valid token") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return fmt.Errorf("server never recieved valid token") | |
return fmt.Errorf("server never received valid token") |
"github.com/elastic/elastic-agent-client/v7/pkg/utils" | ||
) | ||
|
||
// CheckinMinimumTimeout is the amount of time the client must send a new checkin even if the status has not changed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above discussion concerning hibernation and sleep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not an issue with the usage of ticker.
// where the action type is unknown. | ||
var ActionTypeUnknown = utils.JSONMustMarshal(map[string]string{ | ||
"error": "action type unknown", | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably have a concrete type implementing MarshalJSON
instead of creating JSON fragment, something like:
ActionTypeErr{ error: "action-type unknown" }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do.
unitsLock.Unlock() | ||
default: | ||
panic("not implemented") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
t.Fatal
s.store = append(s.store, unitStore) | ||
} | ||
return unitStore | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, we have a full in-memory store :D
|
||
expLock sync.RWMutex | ||
exp UnitState | ||
config string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be the string representation of the yaml, this seems asking for having his own type? We we can validate the presence of some fields depending on the unit or that we have valid yaml.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sadly we can have a yaml.RawMessage
because of the way indent works and could break the configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it YAML, JSON, or something else. The point of the client is that it doesn't know and doesn't care. So its best to keep it just as a string representation that could then be decoded.
@cmacknz Can you review this too, this directly impact the data plane? |
@aleksmaus @ph Made the requested changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Let's get another approval from @elastic/elastic-agent-data-plane
@@ -25,6 +41,8 @@ message ConnInfo { | |||
bytes peer_cert = 5; | |||
// Peer private key. | |||
bytes peer_key = 6; | |||
// Allowed services that spawned process can use. (only used in V2) | |||
repeated ConnInfoServices services = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is it expected that processes do with the services list? Is the intent that each process checks to make sure the services it needs are actually available?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct. A process can only use the services that it is given, if it tries to Elastic Agent will return an error back to the caller.
} | ||
|
||
// Errors returns channel of errors that occurred during communication. | ||
func (c *clientV2) Errors() <-chan error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you document that failing to read Errors()
or UnitChanges()
will cause the client to block internally?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is documented on the interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, I expect we will have more feedback once we actually try to use the new V2 client :)
@cmacknz I also change when we try the real world. |
Agreed on real-world testing, I'm honestly kinda planning with the assumption that a lot of the APIs are gonna change over time as we actually use it. |
* Move httpcommon from libbeat * updates to satisfy linter
V2 Control Protocol
This is a proposal on the new design of the control protocol.
Features
Backward Compatibility
This adds a new
CheckinV2
that is to be used when the Elastic Agent is operating in V2 mode. The originalCheckin
will be deprecated at some point in the future (once all tools have migrated off).Closes elastic/elastic-agent#215