From 3804e3c8d90484195cf02ea7c7d59ea08b92313c Mon Sep 17 00:00:00 2001 From: Eric Gourlaouen Date: Mon, 22 May 2017 10:15:42 +0200 Subject: [PATCH 1/5] Added token refresh --- pktfwd/manager.go | 12 +++++- pktfwd/network.go | 101 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 87 insertions(+), 26 deletions(-) diff --git a/pktfwd/manager.go b/pktfwd/manager.go index 9a559431..65c57390 100644 --- a/pktfwd/manager.go +++ b/pktfwd/manager.go @@ -233,16 +233,25 @@ func (m *Manager) statusRoutine(bgCtx context.Context, errC chan error) { } } +func (m *Manager) networkRoutine(bgCtx context.Context, errC chan error) { + err := m.netClient.RefreshRoutine(bgCtx) + if err != nil { + errC <- errors.Wrap(err, "Couldn't refresh account server token") + } +} + func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime time.Time) { - var errC = make(chan error) + var errC = make(chan error, 4) upCtx, upCancel := context.WithCancel(bgCtx) downCtx, downCancel := context.WithCancel(bgCtx) statsCtx, statsCancel := context.WithCancel(bgCtx) gpsCtx, gpsCancel := context.WithCancel(bgCtx) + networkCtx, networkCancel := context.WithCancel(bgCtx) go m.uplinkRoutine(upCtx, errC, runTime) go m.downlinkRoutine(downCtx) go m.statusRoutine(statsCtx, errC) + go m.networkRoutine(networkCtx, errC) if m.isGPS { go m.gpsRoutine(gpsCtx, errC) } @@ -257,6 +266,7 @@ func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime t gpsCancel() downCancel() statsCancel() + networkCancel() } func (m *Manager) shutdown() error { diff --git a/pktfwd/network.go b/pktfwd/network.go index 6b9d07cd..17b1e082 100644 --- a/pktfwd/network.go +++ b/pktfwd/network.go @@ -3,6 +3,7 @@ package pktfwd import ( + "context" "fmt" "math" "sync" @@ -19,7 +20,10 @@ import ( "google.golang.org/grpc" ) -const uplinksBufferSize = 32 +const ( + tokenRefreshMargin = -2 * time.Minute + uplinksBufferSize = 32 +) type TTNConfig struct { ID string @@ -41,10 +45,12 @@ type TTNClient struct { downlinkStream router.DownlinkStream statusStream router.GatewayStatusStream account *account.Account - id string + runConfig TTNConfig connected bool - streamsMutex sync.Mutex + streamsMutex *sync.Mutex + routerMutex *sync.Mutex token string + tokenExpiry time.Time frequencyPlan string // Communication between internal goroutines stopDownlinkQueue chan bool @@ -64,10 +70,11 @@ type NetworkClient interface { Ping() (time.Duration, error) DefaultLocation() *account.AntennaLocation Stop() + RefreshRoutine(ctx context.Context) error } func (c *TTNClient) GatewayID() string { - return c.id + return c.runConfig.ID } type RouterHealthCheck struct { @@ -93,7 +100,7 @@ func connectToRouter(ctx log.Interface, discoveryClient discovery.Client, router var announcement = *routerAccess - ctx.Info("Connecting to router...") + ctx.WithField("RouterID", router).Info("Connecting to router...") return announcement.Dial() } @@ -109,7 +116,7 @@ func reconnectionDelay(tries uint) time.Duration { return time.Duration(math.Exp(float64(tries)/2.0)) * time.Second } -func (c *TTNClient) tryMainRouterReconnection(gw account.Gateway, discoveryClient discovery.Client, gatewayID string) { +func (c *TTNClient) tryMainRouterReconnection(gw account.Gateway, discoveryClient discovery.Client) { tries := uint(0) for { select { @@ -127,17 +134,19 @@ func (c *TTNClient) tryMainRouterReconnection(gw account.Gateway, discoveryClien } c.ctx.Info("Connection to main router successful") - c.uplinkMutex.Lock() - c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(routerConn), gatewayID, c.token), true) - c.uplinkMutex.Unlock() + c.routerMutex.Lock() c.currentRouterConn = routerConn - c.downlinkStreamChange <- true + c.routerMutex.Unlock() + c.refreshClients() break } } func (c *TTNClient) Ping() (time.Duration, error) { - return connectionHealthCheck(c.currentRouterConn) + c.routerMutex.Lock() + t, err := connectionHealthCheck(c.currentRouterConn) + c.routerMutex.Unlock() + return t, err } func (c *TTNClient) getLowestLatencyRouter(discoveryClient discovery.Client, fallbackRouters []account.GatewayRouter) (*grpc.ClientConn, error) { @@ -193,12 +202,12 @@ func (c *TTNClient) getLowestLatencyRouterFromAnnouncements(discoveryClient disc return routerConn, nil } -func (c *TTNClient) getRouterClient(ctx log.Interface, ttnConfig TTNConfig) (router.RouterClient, error) { - ctx.WithField("Address", ttnConfig.DiscoveryServer).Info("Connecting to TTN discovery server") - discoveryClient, err := discovery.NewClient(ttnConfig.DiscoveryServer, &discovery.Announcement{ +func (c *TTNClient) getRouterClient(ctx log.Interface) (router.RouterClient, error) { + ctx.WithField("Address", c.runConfig.DiscoveryServer).Info("Connecting to TTN discovery server") + discoveryClient, err := discovery.NewClient(c.runConfig.DiscoveryServer, &discovery.Announcement{ ServiceName: "ttn-packet-forwarder", - ServiceVersion: ttnConfig.Version, - Id: c.id, + ServiceVersion: c.runConfig.Version, + Id: c.runConfig.ID, }, func() string { return "" }) if err != nil { return nil, err @@ -208,7 +217,7 @@ func (c *TTNClient) getRouterClient(ctx log.Interface, ttnConfig TTNConfig) (rou defer discoveryClient.Close() var routerConn *grpc.ClientConn - if ttnConfig.Router == "" { + if c.runConfig.Router == "" { gw, err := c.account.FindGateway(c.GatewayID()) if err != nil { return nil, errors.Wrap(err, "Couldn't fetch the gateway information from the account server") @@ -241,18 +250,20 @@ func (c *TTNClient) getRouterClient(ctx log.Interface, ttnConfig TTNConfig) (rou } defer func() { // Wait for the function to be finished, to protect `c.currentRouterConn` - go c.tryMainRouterReconnection(gw, discoveryClient, c.GatewayID()) + go c.tryMainRouterReconnection(gw, discoveryClient) }() } } else { - routerConn, err = connectToRouter(ctx, discoveryClient, ttnConfig.Router) + routerConn, err = connectToRouter(ctx, discoveryClient, c.runConfig.Router) if err != nil { return nil, errors.Wrap(err, "Couldn't connect to user-specified router") } ctx.Info("Connected to router") } + c.routerMutex.Lock() c.currentRouterConn = routerConn + c.routerMutex.Unlock() return router.NewRouterClient(routerConn), nil } @@ -296,35 +307,72 @@ func (c *TTNClient) queueDownlinks() { } } -func (c *TTNClient) fetchAccountServerInfo(gatewayID string) error { - gw, err := c.account.FindGateway(gatewayID) +func (c *TTNClient) fetchAccountServerInfo() error { + gw, err := c.account.FindGateway(c.runConfig.ID) if err != nil { return errors.Wrap(err, "Account server error") } c.antennaLocation = gw.AntennaLocation c.token = gw.Token.AccessToken + c.tokenExpiry = gw.Token.Expiry c.frequencyPlan = gw.FrequencyPlan + c.ctx.WithField("TokenExpiry", c.tokenExpiry).Info("Refreshed account server information") return nil } +func (c *TTNClient) refreshAccountInfo() error { + c.account = account.NewWithKey(c.runConfig.AuthServer, c.runConfig.Key) + return c.fetchAccountServerInfo() +} + +func (c *TTNClient) refreshClients() { + c.uplinkMutex.Lock() + c.routerMutex.Lock() + c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.currentRouterConn), c.runConfig.ID, c.token), true) + c.routerMutex.Unlock() + c.uplinkMutex.Unlock() + c.downlinkStreamChange <- true +} + +func (c *TTNClient) RefreshRoutine(ctx context.Context) error { + for { + refreshTime := c.tokenExpiry.Add(tokenRefreshMargin) + c.ctx.Debugf("Preparing to update network clients at %v", refreshTime) + select { + case <-time.After(refreshTime.Sub(time.Now())): + err := c.refreshAccountInfo() + if err != nil { + return err + } + + c.refreshClients() + c.ctx.Debug("Refreshed network connection") + case <-ctx.Done(): + return nil + } + } +} + func CreateNetworkClient(ctx log.Interface, ttnConfig TTNConfig) (NetworkClient, error) { var client = &TTNClient{ - account: account.NewWithKey(ttnConfig.AuthServer, ttnConfig.Key), ctx: ctx, - id: ttnConfig.ID, + runConfig: ttnConfig, downlinkQueue: make(chan *router.DownlinkMessage), uplinkQueue: make(chan *router.UplinkMessage, uplinksBufferSize), + routerMutex: &sync.Mutex{}, + streamsMutex: &sync.Mutex{}, stopDownlinkQueue: make(chan bool), stopUplinkQueue: make(chan bool), } - err := client.fetchAccountServerInfo(ttnConfig.ID) + // Get the first token + err := client.refreshAccountInfo() if err != nil { return nil, err } // Getting a RouterClient object - routerClient, err := client.getRouterClient(ctx, ttnConfig) + routerClient, err := client.getRouterClient(ctx) if err != nil { return nil, err } @@ -409,4 +457,7 @@ func (c *TTNClient) Stop() { default: break } + c.streamsMutex.Lock() + c.disconnectOfStreams() + c.streamsMutex.Unlock() } From 1566f8f0321619baba1e6ad2402e51ae71487535 Mon Sep 17 00:00:00 2001 From: Eric Gourlaouen Date: Mon, 22 May 2017 14:32:39 +0200 Subject: [PATCH 2/5] Introduced routerChanges channel for network changes --- pktfwd/manager.go | 29 ++++++--- pktfwd/network.go | 153 +++++++++++++++++++++++++--------------------- 2 files changed, 103 insertions(+), 79 deletions(-) diff --git a/pktfwd/manager.go b/pktfwd/manager.go index 65c57390..5c8cece1 100644 --- a/pktfwd/manager.go +++ b/pktfwd/manager.go @@ -241,24 +241,37 @@ func (m *Manager) networkRoutine(bgCtx context.Context, errC chan error) { } func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime time.Time) { - var errC = make(chan error, 4) + uplinkErrors := make(chan error) + defer close(uplinkErrors) + statusErrors := make(chan error) + defer close(statusErrors) + networkErrors := make(chan error) + defer close(networkErrors) + gpsErrors := make(chan error) + defer close(gpsErrors) + upCtx, upCancel := context.WithCancel(bgCtx) downCtx, downCancel := context.WithCancel(bgCtx) statsCtx, statsCancel := context.WithCancel(bgCtx) gpsCtx, gpsCancel := context.WithCancel(bgCtx) networkCtx, networkCancel := context.WithCancel(bgCtx) - go m.uplinkRoutine(upCtx, errC, runTime) + go m.uplinkRoutine(upCtx, uplinkErrors, runTime) go m.downlinkRoutine(downCtx) - go m.statusRoutine(statsCtx, errC) - go m.networkRoutine(networkCtx, errC) + go m.statusRoutine(statsCtx, statusErrors) + go m.networkRoutine(networkCtx, networkErrors) if m.isGPS { - go m.gpsRoutine(gpsCtx, errC) + go m.gpsRoutine(gpsCtx, gpsErrors) } select { - case routineErr := <-errC: - err <- routineErr - close(errC) + case uplinkError := <-uplinkErrors: + err <- errors.Wrap(uplinkError, "Uplink routine error") + case statusError := <-statusErrors: + err <- errors.Wrap(statusError, "Status routine error") + case networkError := <-networkErrors: + err <- errors.Wrap(networkError, "Network routine error") + case gpsError := <-gpsErrors: + err <- errors.Wrap(gpsError, "GPS routine error") case <-bgCtx.Done(): err <- nil } diff --git a/pktfwd/network.go b/pktfwd/network.go index 17b1e082..cd7f92fc 100644 --- a/pktfwd/network.go +++ b/pktfwd/network.go @@ -37,21 +37,21 @@ type TTNConfig struct { } type TTNClient struct { - antennaLocation *account.AntennaLocation - currentRouterConn *grpc.ClientConn - ctx log.Interface - uplinkStream router.UplinkStream - uplinkMutex sync.Mutex - downlinkStream router.DownlinkStream - statusStream router.GatewayStatusStream - account *account.Account - runConfig TTNConfig - connected bool - streamsMutex *sync.Mutex - routerMutex *sync.Mutex - token string - tokenExpiry time.Time - frequencyPlan string + antennaLocation *account.AntennaLocation + routerConn *grpc.ClientConn + ctx log.Interface + uplinkStream router.UplinkStream + uplinkMutex sync.Mutex + downlinkStream router.DownlinkStream + statusStream router.GatewayStatusStream + account *account.Account + runConfig TTNConfig + connected bool + networkMutex *sync.Mutex + streamsMutex *sync.Mutex + token string + tokenExpiry time.Time + frequencyPlan string // Communication between internal goroutines stopDownlinkQueue chan bool stopUplinkQueue chan bool @@ -59,6 +59,7 @@ type TTNClient struct { downlinkStreamChange chan bool downlinkQueue chan *router.DownlinkMessage uplinkQueue chan *router.UplinkMessage + routerChanges chan func(c *TTNClient) error } type NetworkClient interface { @@ -133,19 +134,19 @@ func (c *TTNClient) tryMainRouterReconnection(gw account.Gateway, discoveryClien continue } + c.routerChanges <- func(t *TTNClient) error { + t.routerConn = routerConn + return nil + } c.ctx.Info("Connection to main router successful") - c.routerMutex.Lock() - c.currentRouterConn = routerConn - c.routerMutex.Unlock() - c.refreshClients() break } } func (c *TTNClient) Ping() (time.Duration, error) { - c.routerMutex.Lock() - t, err := connectionHealthCheck(c.currentRouterConn) - c.routerMutex.Unlock() + c.networkMutex.Lock() + t, err := connectionHealthCheck(c.routerConn) + c.networkMutex.Unlock() return t, err } @@ -202,7 +203,7 @@ func (c *TTNClient) getLowestLatencyRouterFromAnnouncements(discoveryClient disc return routerConn, nil } -func (c *TTNClient) getRouterClient(ctx log.Interface) (router.RouterClient, error) { +func (c *TTNClient) getRouterClient(ctx log.Interface) error { ctx.WithField("Address", c.runConfig.DiscoveryServer).Info("Connecting to TTN discovery server") discoveryClient, err := discovery.NewClient(c.runConfig.DiscoveryServer, &discovery.Announcement{ ServiceName: "ttn-packet-forwarder", @@ -210,7 +211,7 @@ func (c *TTNClient) getRouterClient(ctx log.Interface) (router.RouterClient, err Id: c.runConfig.ID, }, func() string { return "" }) if err != nil { - return nil, err + return err } ctx.Info("Connected to discovery server - getting router address") @@ -220,7 +221,7 @@ func (c *TTNClient) getRouterClient(ctx log.Interface) (router.RouterClient, err if c.runConfig.Router == "" { gw, err := c.account.FindGateway(c.GatewayID()) if err != nil { - return nil, errors.Wrap(err, "Couldn't fetch the gateway information from the account server") + return errors.Wrap(err, "Couldn't fetch the gateway information from the account server") } if gw.Router.ID != "" { @@ -236,35 +237,33 @@ func (c *TTNClient) getRouterClient(ctx log.Interface) (router.RouterClient, err routers, err := discoveryClient.GetAll("router") if err != nil { ctx.WithError(err).Error("Couldn't retrieve routers") - return nil, err + return err } routerConn, err = c.getLowestLatencyRouterFromAnnouncements(discoveryClient, routers) if err != nil { - return nil, errors.Wrap(err, "Couldn't figure out the lowest latency router") + return errors.Wrap(err, "Couldn't figure out the lowest latency router") } } else { routerConn, err = c.getLowestLatencyRouter(discoveryClient, fallbackRouters) if err != nil { - return nil, errors.Wrap(err, "Couldn't figure out the lowest latency router") + return errors.Wrap(err, "Couldn't figure out the lowest latency router") } } defer func() { - // Wait for the function to be finished, to protect `c.currentRouterConn` + // Wait for the function to be finished, to protect `c.routerConn` go c.tryMainRouterReconnection(gw, discoveryClient) }() } } else { routerConn, err = connectToRouter(ctx, discoveryClient, c.runConfig.Router) if err != nil { - return nil, errors.Wrap(err, "Couldn't connect to user-specified router") + return errors.Wrap(err, "Couldn't connect to user-specified router") } ctx.Info("Connected to router") } - c.routerMutex.Lock() - c.currentRouterConn = routerConn - c.routerMutex.Unlock() - return router.NewRouterClient(routerConn), nil + c.routerConn = routerConn + return nil } func (c *TTNClient) Downlinks() <-chan *router.DownlinkMessage { @@ -291,7 +290,9 @@ func (c *TTNClient) queueUplinks() { func (c *TTNClient) queueDownlinks() { c.ctx.Info("Downlinks queuing routine started") + c.streamsMutex.Lock() downlinkStreamChannel := c.downlinkStream.Channel() + c.streamsMutex.Unlock() for { select { case <-c.stopDownlinkQueue: @@ -302,12 +303,15 @@ func (c *TTNClient) queueDownlinks() { c.ctx.Info("Received downlink packet") c.downlinkQueue <- downlink case <-c.downlinkStreamChange: + c.streamsMutex.Lock() downlinkStreamChannel = c.downlinkStream.Channel() + c.streamsMutex.Unlock() } } } func (c *TTNClient) fetchAccountServerInfo() error { + c.account = account.NewWithKey(c.runConfig.AuthServer, c.runConfig.Key) gw, err := c.account.FindGateway(c.runConfig.ID) if err != nil { return errors.Wrap(err, "Account server error") @@ -320,32 +324,18 @@ func (c *TTNClient) fetchAccountServerInfo() error { return nil } -func (c *TTNClient) refreshAccountInfo() error { - c.account = account.NewWithKey(c.runConfig.AuthServer, c.runConfig.Key) - return c.fetchAccountServerInfo() -} - -func (c *TTNClient) refreshClients() { - c.uplinkMutex.Lock() - c.routerMutex.Lock() - c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.currentRouterConn), c.runConfig.ID, c.token), true) - c.routerMutex.Unlock() - c.uplinkMutex.Unlock() - c.downlinkStreamChange <- true -} - func (c *TTNClient) RefreshRoutine(ctx context.Context) error { for { refreshTime := c.tokenExpiry.Add(tokenRefreshMargin) c.ctx.Debugf("Preparing to update network clients at %v", refreshTime) select { case <-time.After(refreshTime.Sub(time.Now())): - err := c.refreshAccountInfo() - if err != nil { - return err + c.routerChanges <- func(t *TTNClient) error { + if err := t.fetchAccountServerInfo(); err != nil { + return errors.Wrap(err, "Couldn't update account server info") + } + return nil } - - c.refreshClients() c.ctx.Debug("Refreshed network connection") case <-ctx.Done(): return nil @@ -355,29 +345,36 @@ func (c *TTNClient) RefreshRoutine(ctx context.Context) error { func CreateNetworkClient(ctx log.Interface, ttnConfig TTNConfig) (NetworkClient, error) { var client = &TTNClient{ - ctx: ctx, - runConfig: ttnConfig, - downlinkQueue: make(chan *router.DownlinkMessage), - uplinkQueue: make(chan *router.UplinkMessage, uplinksBufferSize), - routerMutex: &sync.Mutex{}, - streamsMutex: &sync.Mutex{}, - stopDownlinkQueue: make(chan bool), - stopUplinkQueue: make(chan bool), + ctx: ctx, + runConfig: ttnConfig, + downlinkQueue: make(chan *router.DownlinkMessage), + uplinkQueue: make(chan *router.UplinkMessage, uplinksBufferSize), + networkMutex: &sync.Mutex{}, + streamsMutex: &sync.Mutex{}, + stopDownlinkQueue: make(chan bool), + stopUplinkQueue: make(chan bool), + downlinkStreamChange: make(chan bool), + routerChanges: make(chan func(c *TTNClient) error), } + client.networkMutex.Lock() + defer client.networkMutex.Unlock() + // Get the first token - err := client.refreshAccountInfo() + err := client.fetchAccountServerInfo() if err != nil { return nil, err } - // Getting a RouterClient object - routerClient, err := client.getRouterClient(ctx) + // Updating with the initial RouterConn + err = client.getRouterClient(ctx) if err != nil { return nil, err } - client.connectToStreams(router.NewRouterClientForGateway(routerClient, ttnConfig.ID, client.token), false) + client.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(client.routerConn), client.runConfig.ID, client.token)) + + go client.watchRouterChanges() go client.queueDownlinks() go client.queueUplinks() @@ -385,15 +382,28 @@ func CreateNetworkClient(ctx log.Interface, ttnConfig TTNConfig) (NetworkClient, return client, nil } -func (c *TTNClient) connectToStreams(routerClient router.RouterClientForGateway, force bool) { +func (c *TTNClient) watchRouterChanges() { + for { + select { + case routerChange := <-c.routerChanges: + if routerChange == nil { // Channel closed, shutting network client down + return + } + c.networkMutex.Lock() + if err := routerChange(c); err != nil { + c.ctx.WithError(err).Warn("Couldn't operate network client change") + } + c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.routerConn), c.runConfig.ID, c.token)) + c.networkMutex.Unlock() + c.downlinkStreamChange <- true + } + } +} + +func (c *TTNClient) connectToStreams(routerClient router.RouterClientForGateway) { c.streamsMutex.Lock() defer c.streamsMutex.Unlock() if c.connected { - if !force { - // If the client is already connected and the new routerClient doesn't want to force - // connection change, the function is dropped - return - } c.disconnectOfStreams() } c.uplinkStream = router.NewMonitoredUplinkStream(routerClient) @@ -457,6 +467,7 @@ func (c *TTNClient) Stop() { default: break } + close(c.routerChanges) c.streamsMutex.Lock() c.disconnectOfStreams() c.streamsMutex.Unlock() From 3d01e3da773af428f8e64be06ca07bcec50b54b5 Mon Sep 17 00:00:00 2001 From: Eric Gourlaouen Date: Tue, 23 May 2017 08:40:38 +0200 Subject: [PATCH 3/5] Routines now returning an error channel --- pktfwd/manager.go | 193 ++++++++++++++++++++++++---------------------- pktfwd/network.go | 9 ++- 2 files changed, 105 insertions(+), 97 deletions(-) diff --git a/pktfwd/manager.go b/pktfwd/manager.go index 5c8cece1..20454a04 100644 --- a/pktfwd/manager.go +++ b/pktfwd/manager.go @@ -126,67 +126,75 @@ func (m *Manager) setBootTime(bootTime time.Time) { m.uplinkPollingRate = stableUplinkPollingRate } -func (m *Manager) uplinkRoutine(bgCtx context.Context, errc chan error, runStart time.Time) { - m.ctx.Info("Waiting for uplink packets") - for { - packets, err := wrapper.Receive() - if err != nil { - errc <- errors.Wrap(err, "Uplink packets retrieval error") - return - } - if len(packets) == 0 { // Empty payload => we sleep, then reiterate. - time.Sleep(m.uplinkPollingRate) - continue - } - - m.ctx.WithField("NbPackets", len(packets)).Info("Received uplink packets") - if !m.foundBootTime { - // First packets received => find concentrator boot time - err = m.findConcentratorBootTime(packets, runStart) +func (m *Manager) uplinkRoutine(bgCtx context.Context, runStart time.Time) chan error { + errC := make(chan error) + go func() { + m.ctx.Info("Waiting for uplink packets") + for { + packets, err := wrapper.Receive() if err != nil { - m.ctx.WithError(err).Warn("Error when computing concentrator boot time - using packet forwarder run start time") - m.setBootTime(runStart) + errC <- errors.Wrap(err, "Uplink packets retrieval error") + return + } + if len(packets) == 0 { // Empty payload => we sleep, then reiterate. + time.Sleep(m.uplinkPollingRate) + continue } - } - validPackets, err := wrapUplinkPayload(packets, m.netClient.GatewayID()) - if err != nil { - continue - } - m.statusMgr.HandledRXBatch(len(validPackets), len(packets)) - if len(validPackets) == 0 { - m.ctx.Warn("Packets received, but with invalid CRC - ignoring") - time.Sleep(m.uplinkPollingRate) - continue - } + m.ctx.WithField("NbPackets", len(packets)).Info("Received uplink packets") + if !m.foundBootTime { + // First packets received => find concentrator boot time + err = m.findConcentratorBootTime(packets, runStart) + if err != nil { + m.ctx.WithError(err).Warn("Error when computing concentrator boot time - using packet forwarder run start time") + m.setBootTime(runStart) + } + } - m.ctx.WithField("NbValidPackets", len(validPackets)).Info("Received valid packets - sending them to the back-end") - m.netClient.SendUplinks(validPackets) + validPackets, err := wrapUplinkPayload(packets, m.netClient.GatewayID()) + if err != nil { + continue + } + m.statusMgr.HandledRXBatch(len(validPackets), len(packets)) + if len(validPackets) == 0 { + m.ctx.Warn("Packets received, but with invalid CRC - ignoring") + time.Sleep(m.uplinkPollingRate) + continue + } - select { - case <-bgCtx.Done(): - errc <- nil - return - default: - continue + m.ctx.WithField("NbValidPackets", len(validPackets)).Info("Received valid packets - sending them to the back-end") + m.netClient.SendUplinks(validPackets) + + select { + case <-bgCtx.Done(): + errC <- nil + return + default: + continue + } } - } + }() + return errC } -func (m *Manager) gpsRoutine(bgCtx context.Context, errC chan error) { - m.ctx.Info("Starting GPS update routine") - for { - select { - case <-bgCtx.Done(): - return - default: - // The GPS time reference and coordinates are updated at `gpsUpdateRate` - err := wrapper.UpdateGPSData(m.ctx) - if err != nil { - errC <- errors.Wrap(err, "GPS update error") +func (m *Manager) gpsRoutine(bgCtx context.Context) chan error { + errC := make(chan error) + go func() { + m.ctx.Info("Starting GPS update routine") + for { + select { + case <-bgCtx.Done(): + return + default: + // The GPS time reference and coordinates are updated at `gpsUpdateRate` + err := wrapper.UpdateGPSData(m.ctx) + if err != nil { + errC <- errors.Wrap(err, "GPS update error") + } } } - } + }() + return errC } func (m *Manager) downlinkRoutine(bgCtx context.Context) { @@ -206,62 +214,61 @@ func (m *Manager) downlinkRoutine(bgCtx context.Context) { } } -func (m *Manager) statusRoutine(bgCtx context.Context, errC chan error) { - for { - select { - case <-time.After(statusRoutineSleepRate): - rtt, err := m.netClient.Ping() - if err != nil { - errC <- errors.Wrap(err, "Network server health check error") - continue - } - - status, err := m.statusMgr.GenerateStatus(rtt) - if err != nil { - errC <- errors.Wrap(err, "Gateway status computation error") - return - } - - err = m.netClient.SendStatus(*status) - if err != nil { - errC <- errors.Wrap(err, "Gateway status transmission error") +func (m *Manager) statusRoutine(bgCtx context.Context) chan error { + errC := make(chan error) + go func() { + for { + select { + case <-time.After(statusRoutineSleepRate): + rtt, err := m.netClient.Ping() + if err != nil { + errC <- errors.Wrap(err, "Network server health check error") + continue + } + + status, err := m.statusMgr.GenerateStatus(rtt) + if err != nil { + errC <- errors.Wrap(err, "Gateway status computation error") + return + } + + err = m.netClient.SendStatus(*status) + if err != nil { + errC <- errors.Wrap(err, "Gateway status transmission error") + return + } + case <-bgCtx.Done(): return } - case <-bgCtx.Done(): - return } - } + }() + return errC } -func (m *Manager) networkRoutine(bgCtx context.Context, errC chan error) { - err := m.netClient.RefreshRoutine(bgCtx) - if err != nil { - errC <- errors.Wrap(err, "Couldn't refresh account server token") - } +func (m *Manager) networkRoutine(bgCtx context.Context) chan error { + errC := make(chan error) + go func() { + if err := m.netClient.RefreshRoutine(bgCtx); err != nil { + errC <- errors.Wrap(err, "Couldn't refresh account server token") + } + }() + return errC } func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime time.Time) { - uplinkErrors := make(chan error) - defer close(uplinkErrors) - statusErrors := make(chan error) - defer close(statusErrors) - networkErrors := make(chan error) - defer close(networkErrors) - gpsErrors := make(chan error) - defer close(gpsErrors) - upCtx, upCancel := context.WithCancel(bgCtx) downCtx, downCancel := context.WithCancel(bgCtx) - statsCtx, statsCancel := context.WithCancel(bgCtx) + statusCtx, statusCancel := context.WithCancel(bgCtx) gpsCtx, gpsCancel := context.WithCancel(bgCtx) networkCtx, networkCancel := context.WithCancel(bgCtx) - go m.uplinkRoutine(upCtx, uplinkErrors, runTime) go m.downlinkRoutine(downCtx) - go m.statusRoutine(statsCtx, statusErrors) - go m.networkRoutine(networkCtx, networkErrors) + uplinkErrors := m.uplinkRoutine(upCtx, runTime) + statusErrors := m.statusRoutine(statusCtx) + networkErrors := m.networkRoutine(networkCtx) + var gpsErrors chan error if m.isGPS { - go m.gpsRoutine(gpsCtx, gpsErrors) + gpsErrors = m.gpsRoutine(gpsCtx) } select { case uplinkError := <-uplinkErrors: @@ -278,7 +285,7 @@ func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime t upCancel() gpsCancel() downCancel() - statsCancel() + statusCancel() networkCancel() } diff --git a/pktfwd/network.go b/pktfwd/network.go index cd7f92fc..9ee210e0 100644 --- a/pktfwd/network.go +++ b/pktfwd/network.go @@ -145,8 +145,8 @@ func (c *TTNClient) tryMainRouterReconnection(gw account.Gateway, discoveryClien func (c *TTNClient) Ping() (time.Duration, error) { c.networkMutex.Lock() + defer c.networkMutex.Unlock() t, err := connectionHealthCheck(c.routerConn) - c.networkMutex.Unlock() return t, err } @@ -392,10 +392,11 @@ func (c *TTNClient) watchRouterChanges() { c.networkMutex.Lock() if err := routerChange(c); err != nil { c.ctx.WithError(err).Warn("Couldn't operate network client change") + } else { + c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.routerConn), c.runConfig.ID, c.token)) + c.downlinkStreamChange <- true } - c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.routerConn), c.runConfig.ID, c.token)) c.networkMutex.Unlock() - c.downlinkStreamChange <- true } } } @@ -469,6 +470,6 @@ func (c *TTNClient) Stop() { } close(c.routerChanges) c.streamsMutex.Lock() + defer c.streamsMutex.Unlock() c.disconnectOfStreams() - c.streamsMutex.Unlock() } From fce2d62af55f65e40882aa2fe97fffddcdf6cf46 Mon Sep 17 00:00:00 2001 From: Eric Gourlaouen Date: Fri, 26 May 2017 07:44:57 +0200 Subject: [PATCH 4/5] Closing channels at the end of routine --- pktfwd/manager.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pktfwd/manager.go b/pktfwd/manager.go index a223d432..cbb86351 100644 --- a/pktfwd/manager.go +++ b/pktfwd/manager.go @@ -156,6 +156,7 @@ func (m *Manager) uplinkRoutine(bgCtx context.Context, runStart time.Time) chan validPackets, err := wrapUplinkPayload(packets, m.ignoreCRC, m.netClient.GatewayID()) if err != nil { continue + } m.statusMgr.HandledRXBatch(len(validPackets), len(packets)) if len(validPackets) == 0 { @@ -170,6 +171,7 @@ func (m *Manager) uplinkRoutine(bgCtx context.Context, runStart time.Time) chan select { case <-bgCtx.Done(): errC <- nil + close(errC) return default: continue @@ -186,6 +188,7 @@ func (m *Manager) gpsRoutine(bgCtx context.Context) chan error { for { select { case <-bgCtx.Done(): + close(errC) return default: // The GPS time reference and coordinates are updated at `gpsUpdateRate` @@ -219,13 +222,14 @@ func (m *Manager) downlinkRoutine(bgCtx context.Context) { func (m *Manager) statusRoutine(bgCtx context.Context) chan error { errC := make(chan error) go func() { + defer close(errC) for { select { case <-time.After(statusRoutineSleepRate): rtt, err := m.netClient.Ping() if err != nil { errC <- errors.Wrap(err, "Network server health check error") - continue + return } status, err := m.statusMgr.GenerateStatus(rtt) @@ -252,6 +256,8 @@ func (m *Manager) networkRoutine(bgCtx context.Context) chan error { go func() { if err := m.netClient.RefreshRoutine(bgCtx); err != nil { errC <- errors.Wrap(err, "Couldn't refresh account server token") + close(errC) + return } }() return errC From d268eefc0f4e2fd904932e6ba83b9f42b135711d Mon Sep 17 00:00:00 2001 From: Eric Gourlaouen Date: Mon, 29 May 2017 12:14:19 +0200 Subject: [PATCH 5/5] Adding more & deferred channel closings --- pktfwd/manager.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pktfwd/manager.go b/pktfwd/manager.go index cbb86351..0c51d78c 100644 --- a/pktfwd/manager.go +++ b/pktfwd/manager.go @@ -95,6 +95,8 @@ func (m *Manager) handler(runStart time.Time) (err error) { m.ctx.Error("Program ended after one of the network links failed") } + close(c) + close(routinesErr) return err } @@ -132,6 +134,7 @@ func (m *Manager) uplinkRoutine(bgCtx context.Context, runStart time.Time) chan errC := make(chan error) go func() { m.ctx.Info("Waiting for uplink packets") + defer close(errC) for { packets, err := wrapper.Receive() if err != nil { @@ -171,7 +174,6 @@ func (m *Manager) uplinkRoutine(bgCtx context.Context, runStart time.Time) chan select { case <-bgCtx.Done(): errC <- nil - close(errC) return default: continue @@ -185,10 +187,10 @@ func (m *Manager) gpsRoutine(bgCtx context.Context) chan error { errC := make(chan error) go func() { m.ctx.Info("Starting GPS update routine") + defer close(errC) for { select { case <-bgCtx.Done(): - close(errC) return default: // The GPS time reference and coordinates are updated at `gpsUpdateRate` @@ -254,10 +256,9 @@ func (m *Manager) statusRoutine(bgCtx context.Context) chan error { func (m *Manager) networkRoutine(bgCtx context.Context) chan error { errC := make(chan error) go func() { + defer close(errC) if err := m.netClient.RefreshRoutine(bgCtx); err != nil { errC <- errors.Wrap(err, "Couldn't refresh account server token") - close(errC) - return } }() return errC