Skip to content

Commit

Permalink
added proper heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
JeremyTheocharis committed Jul 17, 2024
1 parent 207b089 commit c4c67dc
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
39 changes: 25 additions & 14 deletions opcua_plugin/opcua.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,28 +676,31 @@ func (g *OPCUAInput) ReadBatchSubscribe(ctx context.Context) (service.MessageBat

case _, ok := <-ctx.Done():
if !ok {
g.Log.Errorf("timeout channel was closed")
g.Log.Warnf("subscribe timeout")
return nil, nil, nil
} else {
// Timeout occurred
g.Log.Error("Timeout waiting for response from g.subNotifyChan")
g.Log.Warnf("Timeout waiting for response from g.subNotifyChan")
return nil, nil, nil
}
return nil, nil, errors.New("timeout waiting for response")
}
}

func (g *OPCUAInput) ReadBatch(ctx context.Context) (msgs service.MessageBatch, ackFunc service.AckFunc, err error) {
if g.SubscribeEnabled {
msgs, ackFunc, err = g.ReadBatchSubscribe(ctx)
// Wait for maximum 3 seconds for a response from the subscription channel
// So that this never gets stuck
ctxSubscribe, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()

msgs, ackFunc, err = g.ReadBatchSubscribe(ctxSubscribe)
} else {
msgs, ackFunc, err = g.ReadBatchPull(ctx)
}

if err != nil {
g.Log.Error("Received error in ReadBatch: " + err.Error())
return nil, nil, err
}

// Heartbeat logic
var newMsg *service.Message

if g.UseHeartbeat {
// loop through all messages and check if they are heartbeat messages
// if they are, set the last heartbeat message received to the current time
Expand All @@ -708,13 +711,13 @@ func (g *OPCUAInput) ReadBatch(ctx context.Context) (msgs service.MessageBatch,

// if the user subscribed manually to the current time, duplicate it and put it into a new topic
if g.HeartbeatManualSubscribed {
newMsg := msg.DeepCopy()
g.Log.Infof("Got heartbeat message. Duplicating it to a new message.")
newMsg = msg.DeepCopy()
newMsg.MetaSet("opcua_tag_group", "heartbeat")
newMsg.MetaSet("opcua_tag_name", "CurrentTime")
newMsg.MetaSet("opcua_heartbeat_message", "")
msgs = append(msgs, newMsg)

} else { // otherwise rename it
g.Log.Infof("Got heartbeat message. Renaming it.")
msg.MetaSet("opcua_tag_group", "heartbeat")
msg.MetaSet("opcua_tag_name", "CurrentTime")
}
Expand All @@ -726,21 +729,22 @@ func (g *OPCUAInput) ReadBatch(ctx context.Context) (msgs service.MessageBatch,
// benthos will automatically reconnect
if g.LastHeartbeatMessageReceived.Load() < uint32(time.Now().Unix()-10) {
g.Log.Error("Did not receive a heartbeat message for more than 10 seconds. Closing the connection to prevent stale data.")
fmt.Printf("DId not receive a heartbeat message for more than 10 seconds. Closing the connection to prevent stale data.")
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
}

}

return
}

func (g *OPCUAInput) Close(ctx context.Context) error {
g.Log.Errorf("Closing OPC UA client...")
if g.Client != nil {
_ = g.Client.Close(ctx)
g.Client = nil
}

g.Log.Infof("OPC UA client closed!")
return nil
}

Expand Down Expand Up @@ -785,6 +789,13 @@ func (g *OPCUAInput) Connect(ctx context.Context) error {
var endpoints []*ua.EndpointDescription
var err error

defer func() {
if err != nil {
g.Log.Warnf("Connect failed with %v, waiting 5 seconds before retrying to prevent overloading the server", err)
time.Sleep(5 * time.Second)
}
}()

// Step 1: Retrieve all available endpoints from the OPC UA server
// Iterate through DiscoveryURLs until we receive a list of all working endpoints including their potential security modes, etc.

Expand Down
2 changes: 1 addition & 1 deletion opcua_plugin/opcua_simulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1197,7 +1197,7 @@ var _ = Describe("Test Against Microsoft OPC UA simulator", Serial, func() {
ctx3, cancel3 := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel3()
messageBatch, _, err = input.ReadBatch(ctx3)
Expect(err).To(Equal(context.DeadlineExceeded))
Expect(err).NotTo(HaveOccurred())

Expect(len(messageBatch)).To(Equal(0))

Expand Down

0 comments on commit c4c67dc

Please sign in to comment.