Skip to content

Commit

Permalink
first full version (untested)
Browse files Browse the repository at this point in the history
  • Loading branch information
JeremyTheocharis committed Aug 13, 2024
1 parent 936349e commit 5b2c462
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 84 deletions.
208 changes: 151 additions & 57 deletions modbus_plugin/modbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"math"
"net"
"net/url"
"reflect"
"regexp"
"strconv"
"time"

Expand Down Expand Up @@ -71,6 +73,9 @@ type ModbusDataItemWithAddress struct {
// It holds the configuration necessary to establish a connection with a Modbus PLC,
type ModbusInput struct {

// Benthos
TimeBetweenReads time.Duration // The time between two reads of a Modbus device. Useful if you want to read the device every x seconds. Defaults to 1s. Not to be confused with TimeBetweenRequests.

// Standard
Controller string // e.g., "tcp://localhost:502"
TransmissionMode string // Can be "TCP" (default), "RTUOverTCP", "ASCIIOverTCP"\
Expand Down Expand Up @@ -103,6 +108,7 @@ type ModbusInput struct {
PauseAfterConnect time.Duration // PauseAfterConnect is the pause after connect delays the first request by the specified time. This might be necessary for (slow) devices.
OneRequestPerField bool // OneRequestPerField sends each field in a separate request. This might be necessary for some devices. see https://github.com/influxdata/telegraf/issues/12071.
ReadCoilsStartingAtZero bool // ReadCoilsStartingAtZero reads coils starting at address 0 instead of 1. This might be necessary for some devices. See https://github.com/influxdata/telegraf/issues/8905
TimeBetweenRequests time.Duration // TimeBetweenRequests is the time between two requests to the same device. Useful to avoid flooding the device. Defaults to 0s. Not to be confused with TimeBetweenReads.

// StringRegisterLocation is the String byte-location in registers AFTER byte-order conversion.
// Some device (e.g. EM340) place the string byte in only the upper
Expand Down Expand Up @@ -152,6 +158,7 @@ var errAddressOverflow = errors.New("address overflow")
var ModbusConfigSpec = service.NewConfigSpec().
Summary("Creates an input that reads data from Modbus devices. Created & maintained by the United Manufacturing Hub. About us: www.umh.app").
Description("This input plugin enables Benthos to read data directly from Modbus devices using the Modbus protocol.").
Field(service.NewDurationField("timeBetweenReads").Description("The time between two reads of a Modbus device. Useful if you want to read the device every x seconds. Not to be confused with TimeBetweenRequests.").Default("1s")).
Field(service.NewStringField("controller").Description("The Modbus controller address, e.g., 'tcp://localhost:502'").Default("tcp://localhost:502")).
Field(service.NewStringField("transmissionMode").Description("Transmission mode: 'TCP', 'RTUOverTCP', or 'ASCIIOverTCP'").Default("TCP")).
Field(service.NewIntField("slaveID").Description("Slave ID of the Modbus device").Default(1)).
Expand All @@ -165,7 +172,8 @@ var ModbusConfigSpec = service.NewConfigSpec().
service.NewDurationField("pauseAfterConnect").Description("Pause after connect to delay the first request").Default("0s"),
service.NewBoolField("oneRequestPerField").Description("Send each field in a separate request").Default(false),
service.NewBoolField("readCoilsStartingAtZero").Description("Read coils starting at address 0 instead of 1").Default(false),
service.NewStringField("stringRegisterLocation").Description("String byte-location in registers: 'lower', 'upper', or empty for both").Default("")).
service.NewStringField("stringRegisterLocation").Description("String byte-location in registers: 'lower', 'upper', or empty for both").Default(""),
service.NewDurationField("timeBetweenRequests").Description("imeBetweenRequests is the time between two requests to the same device. Useful to avoid flooding the device. Not to be confused with TimeBetweenReads.").Default("0s")).
Description("Modbus workarounds. Required by some devices to work correctly. Should be left alone by default and must not be changed unless necessary.")).
Field(service.NewObjectListField("addresses",
service.NewStringField("name").Description("Field name"),
Expand Down Expand Up @@ -442,7 +450,7 @@ func newModbusInput(conf *service.ParsedConfig, mgr *service.Resources) (service
m.Log.Infof("Got %d request(s) touching %d coil registers for %d fields (slave %d)",
len(m.requestSet.coil), nCoilRegs, nCoilFields, m.SlaveID)

// Now setup the modbus client
// Now set up the modbus client
u, err := url.Parse(m.Controller)
if err != nil {
return nil, err
Expand All @@ -457,15 +465,15 @@ func newModbusInput(conf *service.ParsedConfig, mgr *service.Resources) (service
switch m.TransmissionMode {
case "", "auto", "TCP":
handler := modbus.NewTCPClientHandler(host + ":" + port)
handler.Timeout = time.Duration(m.Timeout)
handler.Timeout = m.Timeout
m.handler = handler
case "RTUoverTCP":
handler := modbus.NewRTUOverTCPClientHandler(host + ":" + port)
handler.Timeout = time.Duration(m.Timeout)
handler.Timeout = m.Timeout
m.handler = handler
case "ASCIIoverTCP":
handler := modbus.NewASCIIOverTCPClientHandler(host + ":" + port)
handler.Timeout = time.Duration(m.Timeout)
handler.Timeout = m.Timeout
m.handler = handler
default:
return nil, fmt.Errorf("invalid transmission mode %q for %q", m.TransmissionMode, u.Scheme)
Expand Down Expand Up @@ -631,7 +639,7 @@ func (m *ModbusInput) newTag(item ModbusDataItemWithAddress) (modbusTag, error)
func (m *ModbusInput) Connect(ctx context.Context) error {
err := m.handler.Connect()
if err != nil {
m.Log.Errorf("Failed to connect to Modbus device at %s: %v", m.TcpDevice, err)
m.Log.Errorf("Failed to connect to Modbus device at %s: %v", m.Controller, err)
return err
}

Expand All @@ -646,59 +654,39 @@ func (m *ModbusInput) Connect(ctx context.Context) error {

func (m *ModbusInput) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
if m.handler == nil {
return nil, nil, fmt.Errorf("Modbus client is not initialized")
return nil, nil, fmt.Errorf("modbus client is not initialized")
}

msgs := make(service.MessageBatch, 0)

m.Log.Debugf("Reading slave %d for %s...", m.SlaveID, m.Controller)
if err := m.readSlaveData(m.SlaveID, m.requestSet); err != nil {
msgBatch, err := m.readSlaveData(m.SlaveID, m.requestSet)
if err != nil {
m.Log.Errorf("slave %d encountered an error: %v", m.SlaveID, err)
var mbErr *modbus.Error
if !errors.As(err, &mbErr) || mbErr.ExceptionCode != modbus.ExceptionCodeServerDeviceBusy {
m.Log.Errorf("slave %d encountered an error: %v", m.SlaveID, mbErr)
return nil, nil, err
}
}
timestamp := time.Now()

tags := map[string]string{
"name": m.Name,
"type": cCoils,
"slave_id": strconv.Itoa(int(slaveID)),
}
m.collectFields(acc, timestamp, tags, requests.coil)

tags["type"] = cDiscreteInputs
m.collectFields(acc, timestamp, tags, requests.discrete)

tags["type"] = cHoldingRegisters
m.collectFields(acc, timestamp, tags, requests.holding)

tags["type"] = cInputRegisters
m.collectFields(acc, timestamp, tags, requests.input)

time.Sleep(time.Second)

return msgs, func(ctx context.Context, err error) error {
return msgBatch, func(ctx context.Context, err error) error {
return nil
}, nil
}

func (m *ModbusInput) readSlaveData(slaveID byte, requests requestSet) error {
func (m *ModbusInput) readSlaveData(slaveID byte, requests requestSet) (msgBatch service.MessageBatch, err error) {
m.handler.SetSlave(slaveID)

for retry := 0; retry < m.BusyRetries; retry++ {
err := m.gatherTags(requests)
msgBatch, err = m.gatherTags(requests)
if err == nil {
// Reading was successful
return nil
return msgBatch, nil
}

// Exit in case a non-recoverable error occurred
var mbErr *modbus.Error
if !errors.As(err, &mbErr) || mbErr.ExceptionCode != modbus.ExceptionCodeServerDeviceBusy {
return err
return nil, err
}

// Wait some time and try again reading the slave.
Expand All @@ -708,23 +696,108 @@ func (m *ModbusInput) readSlaveData(slaveID byte, requests requestSet) error {
return m.gatherTags(requests)
}

func (m *ModbusInput) createMessageFromValue(item modbusTag, v []byte) *service.Message {
func (m *ModbusInput) createMessageFromValue(item modbusTag, rawValue []byte, registerName string) *service.Message {

value := item.converter(rawValue)

b := make([]byte, 0)
var tagType string

switch v := value.(type) {
case float32:
b = append(b, []byte(strconv.FormatFloat(float64(v), 'f', -1, 32))...)
tagType = "number"
case float64:
b = append(b, []byte(strconv.FormatFloat(v, 'f', -1, 64))...)
tagType = "number"
case string:
b = append(b, []byte(v)...)
tagType = "string"
case bool:
b = append(b, []byte(strconv.FormatBool(v))...)
tagType = "bool"
case int:
b = append(b, []byte(strconv.Itoa(v))...)
tagType = "number"
case int8:
b = append(b, []byte(strconv.FormatInt(int64(v), 10))...)
tagType = "number"
case int16:
b = append(b, []byte(strconv.FormatInt(int64(v), 10))...)
tagType = "number"
case int32:
b = append(b, []byte(strconv.FormatInt(int64(v), 10))...)
tagType = "number"
case int64:
b = append(b, []byte(strconv.FormatInt(v, 10))...)
tagType = "number"
case uint:
b = append(b, []byte(strconv.FormatUint(uint64(v), 10))...)
tagType = "number"
case uint8:
b = append(b, []byte(strconv.FormatUint(uint64(v), 10))...)
tagType = "number"
case uint16:
b = append(b, []byte(strconv.FormatUint(uint64(v), 10))...)
tagType = "number"
case uint32:
b = append(b, []byte(strconv.FormatUint(uint64(v), 10))...)
tagType = "number"
case uint64:
b = append(b, []byte(strconv.FormatUint(v, 10))...)
tagType = "number"
default:
m.Log.Errorf("Unknown type %T for item %s: %v", v, item.name, v)
}

if b == nil {
m.Log.Errorf("Could not create benthos message as payload is empty for item %s in register %s: %v", item.name, registerName, b)
return nil
}

// Store the original datatype as metadata
originalDataType := reflect.TypeOf(value).String()

message := service.NewMessage(item.converter(v))
message := service.NewMessage(b)
message.MetaSet("modbus_tag_name", sanitize(item.name)) // This is the tag name without special characters
message.MetaSet("modbus_tag_name_original", item.name) // This is the tag name without any changes
message.MetaSet("modbus_tag_datatype", originalDataType) // This is the original data type in Modbus
message.MetaSet("modbus_tag_datatype_json", tagType) // This is the data type for JSONs. Either number, bool or string
message.MetaSet("modbus_tag_address", strconv.Itoa(int(item.address))) // This is the address of the tag
message.MetaSet("modbus_tag_length", strconv.Itoa(int(item.length))) // This is the length of the tag
message.MetaSet("modbus_tag_register", registerName) // This is the register where the tag is located

return message
}

func (m *ModbusInput) gatherTags(requests requestSet) error {
if err := m.gatherRequestsCoil(requests.coil); err != nil {
return err
func sanitize(s string) string {
re := regexp.MustCompile(`[^a-zA-Z0-9_-]`)
return re.ReplaceAllString(s, "_")
}

func (m *ModbusInput) gatherTags(requests requestSet) (service.MessageBatch, error) {
msgBatchCoil, err := m.gatherRequestsCoil(requests.coil)
if err != nil {
return nil, err
}
if err := m.gatherRequestsDiscrete(requests.discrete); err != nil {
return err
msgBatchDiscrete, err := m.gatherRequestsDiscrete(requests.discrete)
if err != nil {
return nil, err
}
if err := m.gatherRequestsHolding(requests.holding); err != nil {
return err
msgBatchHolding, err := m.gatherRequestsHolding(requests.holding)
if err != nil {
return nil, err
}
msgBatchInput, err := m.gatherRequestsInput(requests.input)
if err != nil {
return nil, err
}
return m.gatherRequestsInput(requests.input)

msgBatch := append(msgBatchCoil, msgBatchDiscrete...)
msgBatch = append(msgBatch, msgBatchHolding...)
msgBatch = append(msgBatch, msgBatchInput...)

return msgBatch, nil
}

func (m *ModbusInput) gatherRequestsCoil(requests []request) (service.MessageBatch, error) {
Expand All @@ -749,7 +822,7 @@ func (m *ModbusInput) gatherRequestsCoil(requests []request) (service.MessageBat
//request.fields[i].value = field.converter([]byte{v})
m.Log.Debugf(" field %s with bit %d @ byte %d: %v --> %v", field.name, bit, idx, v, request.fields[i].value)

message := m.createMessageFromValue(field, v)
message := m.createMessageFromValue(field, []byte{v}, "coil")
if message != nil {
msgs = append(msgs, message)
}
Expand All @@ -759,12 +832,14 @@ func (m *ModbusInput) gatherRequestsCoil(requests []request) (service.MessageBat
return msgs, nil
}

func (m *ModbusInput) gatherRequestsDiscrete(requests []request) error {
func (m *ModbusInput) gatherRequestsDiscrete(requests []request) (service.MessageBatch, error) {
msgs := service.MessageBatch{}

for _, request := range requests {
m.Log.Debugf("trying to read discrete@%v[%v]...", request.address, request.length)
bytes, err := m.client.ReadDiscreteInputs(request.address, request.length)
if err != nil {
return err
return nil, err
}

m.Log.Debugf("got discrete@%v[%v]: %v", request.address, request.length, bytes)
Expand All @@ -776,20 +851,27 @@ func (m *ModbusInput) gatherRequestsDiscrete(requests []request) error {
bit := offset % 8

v := (bytes[idx] >> bit) & 0x01
request.fields[i].value = field.converter([]byte{v})
//request.fields[i].value = field.converter([]byte{v})
m.Log.Debugf(" field %s with bit %d @ byte %d: %v --> %v", field.name, bit, idx, v, request.fields[i].value)

message := m.createMessageFromValue(field, []byte{v}, "discrete")
if message != nil {
msgs = append(msgs, message)
}
}

}
return nil
return msgs, nil
}

func (m *ModbusInput) gatherRequestsHolding(requests []request) error {
func (m *ModbusInput) gatherRequestsHolding(requests []request) (service.MessageBatch, error) {
msgs := service.MessageBatch{}

for _, request := range requests {
m.Log.Debugf("trying to read holding@%v[%v]...", request.address, request.length)
bytes, err := m.client.ReadHoldingRegisters(request.address, request.length)
if err != nil {
return err
return nil, err
}

m.Log.Debugf("got holding@%v[%v]: %v", request.address, request.length, bytes)
Expand All @@ -801,20 +883,27 @@ func (m *ModbusInput) gatherRequestsHolding(requests []request) error {
length := 2 * uint32(field.length) // field length is in registers a 16bit

// Convert the actual value
request.fields[i].value = field.converter(bytes[offset : offset+length])
//request.fields[i].value = field.converter(bytes[offset : offset+length])
m.Log.Debugf(" field %s with offset %d with len %d: %v --> %v", field.name, offset, length, bytes[offset:offset+length], request.fields[i].value)

message := m.createMessageFromValue(field, bytes[offset:offset+length], "holding")
if message != nil {
msgs = append(msgs, message)
}
}

}
return nil
return msgs, nil
}

func (m *ModbusInput) gatherRequestsInput(requests []request) error {
func (m *ModbusInput) gatherRequestsInput(requests []request) (service.MessageBatch, error) {
msgs := service.MessageBatch{}

for _, request := range requests {
m.Log.Debugf("trying to read input@%v[%v]...", request.address, request.length)
bytes, err := m.client.ReadInputRegisters(request.address, request.length)
if err != nil {
return err
return nil, err
}

m.Log.Debugf("got input@%v[%v]: %v", request.address, request.length, bytes)
Expand All @@ -826,12 +915,17 @@ func (m *ModbusInput) gatherRequestsInput(requests []request) error {
length := 2 * uint32(field.length) // field length is in registers a 16bit

// Convert the actual value
request.fields[i].value = field.converter(bytes[offset : offset+length])
//request.fields[i].value = field.converter(bytes[offset : offset+length])
m.Log.Debugf(" field %s with offset %d with len %d: %v --> %v", field.name, offset, length, bytes[offset:offset+length], request.fields[i].value)

message := m.createMessageFromValue(field, bytes[offset:offset+length], "holding")
if message != nil {
msgs = append(msgs, message)
}
}

}
return nil
return msgs, nil
}

func (m *ModbusInput) Close(ctx context.Context) error {
Expand Down
Loading

0 comments on commit 5b2c462

Please sign in to comment.