Skip to content
This repository has been archived by the owner on Sep 3, 2020. It is now read-only.

Added token refresh #34

Merged
merged 7 commits into from
Jun 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 134 additions & 95 deletions pktfwd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,14 @@ func (m *Manager) run() error {
func (m *Manager) handler(runStart time.Time) (err error) {
// First, we'll handle the case when the user wants to end the program
c := make(chan os.Signal)
defer close(c)
signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGABRT)

// We'll start the routines, and attach them a context
bgCtx, cancel := context.WithCancel(context.Background())
defer cancel()
var routinesErr = make(chan error)
go m.startRoutines(bgCtx, routinesErr, runStart)
routinesErr := m.startRoutines(bgCtx, runStart)
defer close(routinesErr)

// Finally, we'll listen to the different issues
select {
Expand Down Expand Up @@ -128,64 +129,74 @@ func (m *Manager) setBootTime(bootTime time.Time) {
m.uplinkPollingRate = stableUplinkPollingRate
}

func (m *Manager) uplinkRoutine(bgCtx context.Context, errc chan error, runStart time.Time) {
m.ctx.Info("Waiting for uplink packets")
for {
packets, err := wrapper.Receive()
if err != nil {
errc <- errors.Wrap(err, "Uplink packets retrieval error")
return
}
if len(packets) == 0 { // Empty payload => we sleep, then reiterate.
time.Sleep(m.uplinkPollingRate)
continue
}

m.ctx.WithField("NbPackets", len(packets)).Info("Received uplink packets")
if !m.foundBootTime {
// First packets received => find concentrator boot time
err = m.findConcentratorBootTime(packets, runStart)
func (m *Manager) uplinkRoutine(bgCtx context.Context, runStart time.Time) chan error {
errC := make(chan error)
go func() {
m.ctx.Info("Waiting for uplink packets")
defer close(errC)
for {
packets, err := wrapper.Receive()
if err != nil {
m.ctx.WithError(err).Warn("Error when computing concentrator boot time - using packet forwarder run start time")
m.setBootTime(runStart)
errC <- errors.Wrap(err, "Uplink packets retrieval error")
return
}
if len(packets) == 0 { // Empty payload => we sleep, then reiterate.
time.Sleep(m.uplinkPollingRate)
continue
}
}

validPackets := wrapUplinkPayload(m.ctx, packets, m.ignoreCRC, m.netClient.GatewayID())
m.statusMgr.HandledRXBatch(len(validPackets), len(packets))
if len(validPackets) == 0 {
// Packets received, but with invalid CRC - ignoring
time.Sleep(m.uplinkPollingRate)
continue
}
m.ctx.WithField("NbPackets", len(packets)).Info("Received uplink packets")
if !m.foundBootTime {
// First packets received => find concentrator boot time
err = m.findConcentratorBootTime(packets, runStart)
if err != nil {
m.ctx.WithError(err).Warn("Error when computing concentrator boot time - using packet forwarder run start time")
m.setBootTime(runStart)
}
}

validPackets := wrapUplinkPayload(m.ctx, packets, m.ignoreCRC, m.netClient.GatewayID())
m.statusMgr.HandledRXBatch(len(validPackets), len(packets))
if len(validPackets) == 0 {
// Packets received, but with invalid CRC - ignoring
time.Sleep(m.uplinkPollingRate)
continue
}

m.ctx.WithField("NbValidPackets", len(validPackets)).Info("Received valid packets - sending them to the back-end")
m.netClient.SendUplinks(validPackets)
m.ctx.WithField("NbValidPackets", len(validPackets)).Info("Received valid packets - sending them to the back-end")
m.netClient.SendUplinks(validPackets)

select {
case <-bgCtx.Done():
errc <- nil
return
default:
continue
select {
case <-bgCtx.Done():
errC <- nil
return
default:
continue
}
}
}
}()
return errC
}

func (m *Manager) gpsRoutine(bgCtx context.Context, errC chan error) {
m.ctx.Info("Starting GPS update routine")
for {
select {
case <-bgCtx.Done():
return
default:
// The GPS time reference and coordinates are updated at `gpsUpdateRate`
err := wrapper.UpdateGPSData(m.ctx)
if err != nil {
errC <- errors.Wrap(err, "GPS update error")
func (m *Manager) gpsRoutine(bgCtx context.Context) chan error {
errC := make(chan error)
go func() {
m.ctx.Info("Starting GPS update routine")
defer close(errC)
for {
select {
case <-bgCtx.Done():
return
default:
// The GPS time reference and coordinates are updated at `gpsUpdateRate`
err := wrapper.UpdateGPSData(m.ctx)
if err != nil {
errC <- errors.Wrap(err, "GPS update error")
}
}
}
}
}()
return errC
}

func (m *Manager) downlinkRoutine(bgCtx context.Context) {
Expand All @@ -205,57 +216,85 @@ func (m *Manager) downlinkRoutine(bgCtx context.Context) {
}
}

func (m *Manager) statusRoutine(bgCtx context.Context, errC chan error) {
for {
select {
case <-time.After(statusRoutineSleepRate):
rtt, err := m.netClient.Ping()
if err != nil {
errC <- errors.Wrap(err, "Network server health check error")
continue
}

status, err := m.statusMgr.GenerateStatus(rtt)
if err != nil {
errC <- errors.Wrap(err, "Gateway status computation error")
func (m *Manager) statusRoutine(bgCtx context.Context) chan error {
errC := make(chan error)
go func() {
defer close(errC)
for {
select {
case <-time.After(statusRoutineSleepRate):
rtt, err := m.netClient.Ping()
if err != nil {
errC <- errors.Wrap(err, "Network server health check error")
return
}

status, err := m.statusMgr.GenerateStatus(rtt)
if err != nil {
errC <- errors.Wrap(err, "Gateway status computation error")
return
}

err = m.netClient.SendStatus(*status)
if err != nil {
errC <- errors.Wrap(err, "Gateway status transmission error")
return
}
case <-bgCtx.Done():
return

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idem

}
}
}()
return errC
}

err = m.netClient.SendStatus(*status)
if err != nil {
errC <- errors.Wrap(err, "Gateway status transmission error")
return
}
case <-bgCtx.Done():
return
func (m *Manager) networkRoutine(bgCtx context.Context) chan error {
errC := make(chan error)
go func() {
defer close(errC)
if err := m.netClient.RefreshRoutine(bgCtx); err != nil {
errC <- errors.Wrap(err, "Couldn't refresh account server token")
}
}
}()
return errC
}

func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime time.Time) {
var errC = make(chan error)
upCtx, upCancel := context.WithCancel(bgCtx)
downCtx, downCancel := context.WithCancel(bgCtx)
statsCtx, statsCancel := context.WithCancel(bgCtx)
gpsCtx, gpsCancel := context.WithCancel(bgCtx)

go m.uplinkRoutine(upCtx, errC, runTime)
go m.downlinkRoutine(downCtx)
go m.statusRoutine(statsCtx, errC)
if m.isGPS {
go m.gpsRoutine(gpsCtx, errC)
}
select {
case routineErr := <-errC:
err <- routineErr
close(errC)
case <-bgCtx.Done():
err <- nil
}
upCancel()
gpsCancel()
downCancel()
statsCancel()
func (m *Manager) startRoutines(bgCtx context.Context, runTime time.Time) chan error {
err := make(chan error)
go func() {
upCtx, upCancel := context.WithCancel(bgCtx)
downCtx, downCancel := context.WithCancel(bgCtx)
statusCtx, statusCancel := context.WithCancel(bgCtx)
gpsCtx, gpsCancel := context.WithCancel(bgCtx)
networkCtx, networkCancel := context.WithCancel(bgCtx)

go m.downlinkRoutine(downCtx)
uplinkErrors := m.uplinkRoutine(upCtx, runTime)
statusErrors := m.statusRoutine(statusCtx)
networkErrors := m.networkRoutine(networkCtx)
var gpsErrors chan error
if m.isGPS {
gpsErrors = m.gpsRoutine(gpsCtx)
}
select {
case uplinkError := <-uplinkErrors:
err <- errors.Wrap(uplinkError, "Uplink routine error")
case statusError := <-statusErrors:
err <- errors.Wrap(statusError, "Status routine error")
case networkError := <-networkErrors:
err <- errors.Wrap(networkError, "Network routine error")
case gpsError := <-gpsErrors:
err <- errors.Wrap(gpsError, "GPS routine error")
case <-bgCtx.Done():
err <- nil
}
upCancel()
gpsCancel()
downCancel()
statusCancel()
networkCancel()
}()
return err
}

func (m *Manager) shutdown() error {
Expand Down
Loading