diff --git a/cmd/root.go b/cmd/root.go index 65d9ee1875..6b047e3e46 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -38,9 +38,8 @@ var ( log = util.NewLogger("main") cfgFile string - ignoreEmpty = "" // ignore empty keys - ignoreLogs = []string{"log"} // ignore log messages, including warn/error - ignoreMqtt = []string{"log", "auth", "releaseNotes"} // excessive size may crash certain brokers + ignoreLogs = []string{"log"} // ignore log messages, including warn/error + ignoreMqtt = []string{"log", "auth", "releaseNotes"} // excessive size may crash certain brokers ) // rootCmd represents the base command when called without any subcommands @@ -139,8 +138,11 @@ func runRoot(cmd *cobra.Command, args []string) { cache := util.NewCache() go cache.Run(pipe.NewDropper(ignoreLogs...).Pipe(tee.Attach())) - // create web server + // web socket hub socketHub := server.NewSocketHub() + go socketHub.Run(tee.Attach(), cache) + + // web server httpd := server.NewHTTPd(fmt.Sprintf(":%d", conf.Network.Port), socketHub) // metrics @@ -153,9 +155,6 @@ func runRoot(cmd *cobra.Command, args []string) { httpd.Router().PathPrefix("/debug/").Handler(http.DefaultServeMux) } - // publish to UI - go socketHub.Run(pipe.NewDropper(ignoreEmpty).Pipe(tee.Attach()), cache) - // setup values channel valueChan := make(chan util.Param) go tee.Run(valueChan) @@ -193,13 +192,13 @@ func runRoot(cmd *cobra.Command, args []string) { // setup database if err == nil && conf.Influx.URL != "" { - configureInflux(conf.Influx, site, pipe.NewDropper(append(ignoreLogs, ignoreEmpty)...).Pipe(tee.Attach())) + configureInflux(conf.Influx, site, pipe.NewDropper(ignoreLogs...).Pipe(tee.Attach())) } // setup mqtt publisher if err == nil && conf.Mqtt.Broker != "" { publisher := server.NewMQTT(strings.Trim(conf.Mqtt.Topic, "/")) - go publisher.Run(site, pipe.NewDropper(append(ignoreMqtt, ignoreEmpty)...).Pipe(tee.Attach())) + go publisher.Run(site, pipe.NewDropper(ignoreMqtt...).Pipe(tee.Attach())) } // announce on mDNS @@ -213,9 +212,9 @@ func runRoot(cmd *cobra.Command, args []string) { } // setup messaging - var pushChan chan push.Event + var pushChan chan<- push.Event if err == nil { - pushChan, err = configureMessengers(conf.Messaging, valueChan, cache) + pushChan, err = configureMessengers(conf.Messaging, cache) } // run shutdown functions on stop diff --git a/cmd/setup.go b/cmd/setup.go index 134b43444b..c34ffad9bd 100644 --- a/cmd/setup.go +++ b/cmd/setup.go @@ -539,7 +539,7 @@ func configureEEBus(conf map[string]interface{}) error { } // setup messaging -func configureMessengers(conf messagingConfig, valueChan chan util.Param, cache *util.Cache) (chan push.Event, error) { +func configureMessengers(conf messagingConfig, cache *util.Cache) (chan<- push.Event, error) { messageChan := make(chan push.Event, 1) messageHub, err := push.NewHub(conf.Events, cache) @@ -555,7 +555,7 @@ func configureMessengers(conf messagingConfig, valueChan chan util.Param, cache messageHub.Add(impl) } - go messageHub.Run(messageChan, valueChan) + go messageHub.Run(messageChan, cache.Attach()) return messageChan, nil } diff --git a/push/hub.go b/push/hub.go index 5ac51ded73..df7bcb3df3 100644 --- a/push/hub.go +++ b/push/hub.go @@ -72,10 +72,12 @@ func (h *Hub) apply(ev Event, tmpl string) (string, error) { } // Run is the Hub's main publishing loop -func (h *Hub) Run(events <-chan Event, valueChan chan util.Param) { +func (h *Hub) Run(events <-chan Event, valueChan <-chan util.Param) { log := util.NewLogger("push") for ev := range events { + fmt.Println("event", ev) + if len(h.sender) == 0 { continue } @@ -85,11 +87,6 @@ func (h *Hub) Run(events <-chan Event, valueChan chan util.Param) { continue } - // let cache catch up, refs https://github.com/evcc-io/evcc/pull/445 - flushC := util.Flusher() - valueChan <- util.Param{Val: flushC} - <-flushC - title, err := h.apply(ev, definition.Title) if err != nil { log.ERROR.Printf("invalid title template for %s: %v", ev.Event, err) @@ -102,12 +99,12 @@ func (h *Hub) Run(events <-chan Event, valueChan chan util.Param) { continue } - for _, sender := range h.sender { - if strings.TrimSpace(msg) != "" { + if strings.TrimSpace(msg) != "" { + for _, sender := range h.sender { go sender.Send(title, msg) - } else { - log.DEBUG.Printf("did not send empty message template for %s: %v", ev.Event, err) } + } else { + log.DEBUG.Printf("did not send empty message template for %s: %v", ev.Event, err) } } } diff --git a/server/influxdb.go b/server/influxdb.go index 4ccb18c4eb..2e40d344e5 100644 --- a/server/influxdb.go +++ b/server/influxdb.go @@ -12,7 +12,7 @@ import ( "github.com/evcc-io/evcc/core/site" "github.com/evcc-io/evcc/util" influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api/write" + influxapi "github.com/influxdata/influxdb-client-go/v2/api" influxlog "github.com/influxdata/influxdb-client-go/v2/log" ) @@ -30,11 +30,10 @@ type InfluxConfig struct { // Influx is a influx publisher type Influx struct { sync.Mutex - log *util.Logger - clock clock.Clock - client influxdb2.Client - org string - database string + log *util.Logger + clock clock.Clock + client influxdb2.Client + writer influxapi.WriteAPI } // NewInfluxClient creates new publisher for influx @@ -52,28 +51,32 @@ func NewInfluxClient(url, token, org, user, password, database string) *Influx { // handle error logging in writer influxlog.Log = nil + writer := client.WriteAPI(org, database) + + // log errors + go func() { + for err := range writer.Errors() { + // log async as we're part of the logging loop + go log.ERROR.Println(err) + } + }() + return &Influx{ - log: log, - clock: clock.New(), - client: client, - org: org, - database: database, + log: log, + clock: clock.New(), + client: client, + writer: writer, } } -// pointWriter is the minimal interface for influxdb2 api.Writer -type pointWriter interface { - WritePoint(point *write.Point) -} - // writePoint asynchronously writes a point to influx -func (m *Influx) writePoint(writer pointWriter, key string, fields map[string]any, tags map[string]string) { +func (m *Influx) writePoint(key string, fields map[string]any, tags map[string]string) { m.log.TRACE.Printf("write %s=%v (%v)", key, fields, tags) - writer.WritePoint(influxdb2.NewPoint(key, tags, fields, m.clock.Now())) + m.writer.WritePoint(influxdb2.NewPoint(key, tags, fields, m.clock.Now())) } // writeComplexPoint asynchronously writes a point to influx -func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags map[string]string) { +func (m *Influx) writeComplexPoint(param util.Param, tags map[string]string) { fields := make(map[string]any) switch val := param.Val.(type) { @@ -124,7 +127,7 @@ func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags ma fields["value"] = v tags["id"] = strconv.Itoa(i + 1) - m.writePoint(writer, key, fields, tags) + m.writePoint(key, fields, tags) } } } @@ -132,21 +135,11 @@ func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags ma return } - m.writePoint(writer, param.Key, fields, tags) + m.writePoint(param.Key, fields, tags) } // Run Influx publisher func (m *Influx) Run(site site.API, in <-chan util.Param) { - writer := m.client.WriteAPI(m.org, m.database) - - // log errors - go func() { - for err := range writer.Errors() { - // log async as we're part of the logging loop - go m.log.ERROR.Println(err) - } - }() - // add points to batch for async writing for param := range in { tags := make(map[string]string) @@ -159,7 +152,7 @@ func (m *Influx) Run(site site.API, in <-chan util.Param) { } } - m.writeComplexPoint(writer, param, tags) + m.writeComplexPoint(param, tags) } m.client.Close() diff --git a/util/cache.go b/util/cache.go index 2818d4c344..225d92d107 100644 --- a/util/cache.go +++ b/util/cache.go @@ -3,22 +3,14 @@ package util import ( "fmt" "sync" + "time" ) // Cache is a data store type Cache struct { sync.Mutex - val map[string]Param -} - -// flush is the value type used as parameter for flushing the cache. -// Flushing is implemented by closing the channel. At this time, it is guaranteed -// that the cache has catched up processing all pending messages. -type flush chan struct{} - -// Flusher returns a new flush channel -func Flusher() flush { - return make(flush) + val map[string]Param + recv chan Param } // NewCache creates cache @@ -28,16 +20,21 @@ func NewCache() *Cache { } } +// Attach creates a new receiver channel and attaches it to the tee +func (c *Cache) Attach() <-chan Param { + if c.recv != nil { + panic("cache already attached") + } + + c.recv = make(chan Param, 16) + return c.recv +} + // Run adds input channel's values to cache func (c *Cache) Run(in <-chan Param) { log := NewLogger("cache") for p := range in { - if flushC, ok := p.Val.(flush); ok { - close(flushC) - continue - } - key := p.Key if p.Loadpoint != nil { key = fmt.Sprintf("lp-%d/%s", *p.Loadpoint+1, key) @@ -45,6 +42,15 @@ func (c *Cache) Run(in <-chan Param) { log.TRACE.Printf("%s: %v", key, p.Val) c.Add(p.UniqueID(), p) + + // send downstream after adding to cache + if c.recv != nil { + select { + case c.recv <- p: + case <-time.After(time.Second): + fmt.Println("blocked: cache", p) + } + } } } diff --git a/util/tee.go b/util/tee.go index 7bbae57dbb..b7f9dc42d7 100644 --- a/util/tee.go +++ b/util/tee.go @@ -1,15 +1,12 @@ package util import ( + "fmt" "reflect" "sync" + "time" ) -// TeeAttacher allows attaching a listener to a tee -type TeeAttacher interface { - Attach() <-chan Param -} - // Tee distributes parameters to subscribers type Tee struct { mu sync.Mutex @@ -41,10 +38,14 @@ func (t *Tee) Run(in <-chan Param) { } } - t.mu.Lock() - for _, recv := range t.recv { - recv <- msg + for i, recv := range t.recv { + t.mu.Lock() + select { + case recv <- msg: + case <-time.After(time.Second): + fmt.Println("blocked: tee", i, msg) + } + t.mu.Unlock() } - t.mu.Unlock() } }