Skip to content

Commit

Permalink
fix: added exception for prosys
Browse files Browse the repository at this point in the history
  • Loading branch information
JeremyTheocharis committed Sep 18, 2024
1 parent 6821dc9 commit 8144e6c
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 63 deletions.
183 changes: 152 additions & 31 deletions opcua_plugin/opcua.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ type OPCUAInput struct {
HeartbeatManualSubscribed bool
HeartbeatNodeId *ua.NodeID
Subscription *opcua.Subscription
ServerInfo ServerInfo
}

// UpdateNodePaths updates the node paths to use the nodeID instead of the browseName
Expand Down Expand Up @@ -576,6 +577,44 @@ func (g *OPCUAInput) createMessageFromValue(dataValue *ua.DataValue, nodeDef Nod
return message
}

func (g *OPCUAInput) Read(ctx context.Context, req *ua.ReadRequest) (*ua.ReadResponse, error) {
resp, err := g.Client.Read(ctx, req)
if err != nil {
g.Log.Errorf("Read failed: %s", err)
// if the error is StatusBadSessionIDInvalid, the session has been closed, and we need to reconnect.
switch {
case errors.Is(err, ua.StatusBadSessionIDInvalid):
_ = g.Close(ctx)
return nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadCommunicationError):
_ = g.Close(ctx)
return nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadConnectionClosed):
_ = g.Close(ctx)
return nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadTimeout):
_ = g.Close(ctx)
return nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadConnectionRejected):
_ = g.Close(ctx)
return nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadServerNotConnected):
_ = g.Close(ctx)
return nil, service.ErrNotConnected
}

// return error and stop executing this function.
return nil, err
}

if !errors.Is(resp.Results[0].Status, ua.StatusOK) {
g.Log.Errorf("Status not OK: %v", resp.Results[0].Status)
return nil, fmt.Errorf("status not OK: %v", resp.Results[0].Status)
}

return resp, nil
}

func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
if g.Client == nil {
return nil, nil, errors.New("client is nil")
Expand All @@ -601,37 +640,9 @@ func (g *OPCUAInput) ReadBatchPull(ctx context.Context) (service.MessageBatch, s
TimestampsToReturn: ua.TimestampsToReturnBoth,
}

resp, err := g.Client.Read(ctx, req)
resp, err := g.Read(ctx, req)
if err != nil {
g.Log.Errorf("Read failed: %s", err)
// if the error is StatusBadSessionIDInvalid, the session has been closed, and we need to reconnect.
switch {
case errors.Is(err, ua.StatusBadSessionIDInvalid):
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadCommunicationError):
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadConnectionClosed):
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadTimeout):
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadConnectionRejected):
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
case errors.Is(err, ua.StatusBadServerNotConnected):
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
}

// return error and stop executing this function.
return nil, nil, err
}

if !errors.Is(resp.Results[0].Status, ua.StatusOK) {
g.Log.Errorf("Status not OK: %v", resp.Results[0].Status)
}

// Create a message with the node's path as the metadata
Expand Down Expand Up @@ -745,7 +756,11 @@ func (g *OPCUAInput) ReadBatch(ctx context.Context) (msgs service.MessageBatch,
_ = g.Close(ctx)
return nil, nil, service.ErrNotConnected
} else {
g.Log.Warn("No heartbeat message (ServerTime) received for over 10 seconds. This can be normal for certain OPC UA servers (e.g., Prosys OPC UA Simulation). Other messages are being received; continuing operations.")
if g.ServerInfo.ManufacturerName == "Prosys OPC Ltd." {
g.Log.Info("No heartbeat message (ServerTime) received for over 10 seconds. This is normal for your Prosys OPC UA server. Other messages are being received; continuing operations. ")
} else {
g.Log.Warn("No heartbeat message (ServerTime) received for over 10 seconds. Other messages are being received; continuing operations.")
}
}
}

Expand Down Expand Up @@ -1026,7 +1041,17 @@ func (g *OPCUAInput) Connect(ctx context.Context) error {
}

g.Log.Infof("Connected to %s", g.Endpoint)
g.Log.Infof("Please note that browsing large node trees can take a long time (around 5 nodes per second)")

// Get OPC UA server information
serverInfo, err := g.GetOPCUAServerInformation(ctx)
if err != nil {
g.Log.Warnf("Failed to get OPC UA server information: %s", err)
} else {
g.Log.Infof("OPC UA Server Information: %v+", serverInfo)
g.ServerInfo = serverInfo
}

g.Log.Infof("Please note that browsing large node trees can take some time")

g.Client = c

Expand All @@ -1040,6 +1065,102 @@ func (g *OPCUAInput) Connect(ctx context.Context) error {
return nil
}

type ServerInfo struct {
ManufacturerName string
ProductName string
SoftwareVersion string
}

// GetOPCUAServerInformation retrieves the server information from the OPC UA server
// It is available as i=2295
func (g *OPCUAInput) GetOPCUAServerInformation(ctx context.Context) (ServerInfo, error) {

if g.Client == nil {
return ServerInfo{}, errors.New("client is nil")
}
// Fetch ManufacturerName node from i=2263
manufacturerNameNodeID := ua.NewNumericNodeID(0, 2263)
productNameNodeID := ua.NewNumericNodeID(0, 2261)
softwareVersionNodeID := ua.NewNumericNodeID(0, 2264)

nodeChan := make(chan NodeDef, 3)
var wg sync.WaitGroup
errChan := make(chan error, 3)

wg.Add(3)
go browse(ctx, g.Client.Node(manufacturerNameNodeID), "", 0, g.Log, manufacturerNameNodeID.String(), nodeChan, errChan, &wg)
go browse(ctx, g.Client.Node(productNameNodeID), "", 0, g.Log, productNameNodeID.String(), nodeChan, errChan, &wg)
go browse(ctx, g.Client.Node(softwareVersionNodeID), "", 0, g.Log, softwareVersionNodeID.String(), nodeChan, errChan, &wg)
wg.Wait()

close(nodeChan)
close(errChan)

if len(errChan) > 0 {
return ServerInfo{}, <-errChan
}

var nodeList []NodeDef
for node := range nodeChan {
nodeList = append(nodeList, node)
}

if len(nodeList) != 3 {
g.Log.Warn("Could not find OPC UA Server Information")
return ServerInfo{}, errors.New("could not find OPC UA Server Information")
}

var nodesToRead []*ua.ReadValueID
for _, node := range nodeList {
nodesToRead = append(nodesToRead, &ua.ReadValueID{
NodeID: node.NodeID,
})
}

req := &ua.ReadRequest{
MaxAge: 2000,
NodesToRead: nodesToRead,
TimestampsToReturn: ua.TimestampsToReturnBoth,
}

resp, err := g.Read(ctx, req)
if err != nil {
g.Log.Errorf("Read failed: %s", err)
return ServerInfo{}, err
}

if len(resp.Results) != 3 {
g.Log.Errorf("Expected 3 results, got %d", len(resp.Results))
return ServerInfo{}, errors.New("expected 3 results")
}

serverInfo := ServerInfo{}

for i, node := range nodeList {
value := resp.Results[i]
if value == nil || value.Value == nil {
g.Log.Warnf("Received nil in item structure for OPC UA Server Information")
}

message := g.createMessageFromValue(value, node)
if message != nil {
messageBytes, err := message.AsBytes()
if err != nil {
return ServerInfo{}, err
}

if node.NodeID.IntID() == 2263 {
serverInfo.ManufacturerName = string(messageBytes)
} else if node.NodeID.IntID() == 2261 {
serverInfo.ProductName = string(messageBytes)
} else if node.NodeID.IntID() == 2264 {
serverInfo.SoftwareVersion = string(messageBytes)
}
}
}
return serverInfo, nil
}

func (g *OPCUAInput) BrowseAndSubscribeIfNeeded(ctx context.Context) error {

// Create a slice to store the detected nodes
Expand Down
78 changes: 46 additions & 32 deletions opcua_plugin/opcua_simulator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,56 @@ var _ = Describe("Test Against Prosys Simulator", func() {

var endpoint string

Describe("YAML Configuration", func() {
BeforeEach(func() {
endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI")
BeforeEach(func() {
endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI")

// Check if environment variables are set
if endpoint == "" {
Skip("Skipping test: environment variables not set")

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-s7-plc

BeforeEach 09/18/24 15:03:57.501

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-s7-plc

BeforeEach 09/18/24 15:03:57.501

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-s7-plc

BeforeEach 09/18/24 15:03:57.505

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-s7-plc

BeforeEach 09/18/24 15:03:57.514

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-opcua-plc

BeforeEach 09/18/24 15:03:48.57

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-opcua-plc

BeforeEach 09/18/24 15:03:48.577

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-opcua-plc

BeforeEach 09/18/24 15:03:48.584

Check notice on line 32 in opcua_plugin/opcua_simulator_test.go

View workflow job for this annotation

GitHub Actions / go-test-opcua-plc

BeforeEach 09/18/24 15:03:48.584
return
}

})

Describe("OPC UA Server Information", func() {

It("should connect to the server and retrieve server information", func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

var nodeIDStrings = []string{"ns=3;i=1003"}
parsedNodeIDs := ParseNodeIDs(nodeIDStrings)

// Check if environment variables are set
if endpoint == "" {
Skip("Skipping test: environment variables not set")
return
input := &OPCUAInput{
Endpoint: endpoint,
Username: "",
Password: "",
NodeIDs: parsedNodeIDs,
SecurityMode: "None",
SecurityPolicy: "None",
}

// Attempt to connect
err := input.Connect(ctx)
Expect(err).NotTo(HaveOccurred())

serverInformation, err := input.GetOPCUAServerInformation(ctx)
Expect(err).NotTo(HaveOccurred())

GinkgoWriter.Printf("Server Information: \n")
GinkgoWriter.Printf("ManufacturerName: %s\n", serverInformation.ManufacturerName)
GinkgoWriter.Printf("ProductName: %s\n", serverInformation.ProductName)
GinkgoWriter.Printf("SoftwareVersion: %s\n", serverInformation.SoftwareVersion)

// Close connection
if input.Client != nil {
err = input.Close(ctx)
Expect(err).NotTo(HaveOccurred())
}
})
})

Describe("YAML Configuration", func() {

When("using a yaml and stream builder", func() {

Expand Down Expand Up @@ -67,7 +106,6 @@ opcua:
var count int64
err = builder.AddConsumerFunc(func(c context.Context, m *service.Message) error {
atomic.AddInt64(&count, 1)
GinkgoWriter.Printf("Received message: %+v\n", m)
return err
})

Expand Down Expand Up @@ -99,16 +137,6 @@ opcua:

var endpoint string

BeforeEach(func() {
endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI")

// Check if environment variables are set
if endpoint == "" {
Skip("Skipping test: environment variables not set")
return
}

})
It("should read data correctly", func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -153,20 +181,6 @@ opcua:

Describe("Secure (SignAndEncrypt/Basic256Sha256) Connect", func() {

var endpoint string

BeforeEach(func() {
Skip("Skipping test: prosys will reject all unknown certificates")

endpoint = os.Getenv("TEST_PROSYS_ENDPOINT_URI")

// Check if environment variables are set
if endpoint == "" {
Skip("Skipping test: environment variables not set")
return
}

})
It("should read data correctly", func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down

0 comments on commit 8144e6c

Please sign in to comment.