Skip to content
This repository has been archived by the owner on Sep 3, 2020. It is now read-only.

Commit

Permalink
Merge pull request #42 from TheThingsNetwork/develop
Browse files Browse the repository at this point in the history
Packet forwarder release v2.0.2
  • Loading branch information
johanstokking authored Jun 6, 2017
2 parents 2ac53a3 + 1074390 commit b64203b
Show file tree
Hide file tree
Showing 12 changed files with 290 additions and 174 deletions.
2 changes: 1 addition & 1 deletion .make/halv1/build.make
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ else
endif

# Build the HAL
hal.build: lora_gateway/libloragw/libloragw.a
hal.build: lora_gateway/libloragw/inc/$(PLATFORM).h lora_gateway/libloragw/libloragw.a

### library.cfg configuration file processing

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ The configuration file generated by `packet-forwarder configure` is stored at `$
* `--verbose` or `-v`: Show debugging information (optional)
* `--downlink-send-margin`: Change downlink send margin, in milliseconds (optional ; [see documentation](docs/IMPLEMENTATION/DOWNLINKS.md))
* `--gps-path`: Set GPS path to enable GPS support (optional ; default: empty)
* `--ignore-crc`: Ignore CRC check, and send uplink packets upstream even if they are CRC-invalid.

## <a name="contribute"></a>Contributing

Expand Down
7 changes: 7 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ var startCmd = &cobra.Command{
}
}

ignoreCRC := config.GetBool("ignore-crc")
if ignoreCRC {
ctx.Warn("CRC check disabled, packets with invalid CRC will be sent upstream")
}

ttnConfig := &pktfwd.TTNConfig{
ID: config.GetString("id"),
Key: config.GetString("key"),
Expand All @@ -67,6 +72,7 @@ var startCmd = &cobra.Command{
Router: config.GetString("router"),
Version: config.GetString("version"),
DownlinksSendMargin: time.Duration(config.GetInt64("downlink-send-margin")) * time.Millisecond,
IgnoreCRC: ignoreCRC,
}

conf, err := pktfwd.FetchConfig(ctx, ttnConfig)
Expand All @@ -92,6 +98,7 @@ func init() {
startCmd.PersistentFlags().String("run-trace", "", "File to which write the runtime trace of the packet forwarder. Can later be read with `go tool trace <trace_file>`.")
startCmd.PersistentFlags().Int("reset-pin", 0, "GPIO pin associated to the reset pin of the board")
startCmd.PersistentFlags().BoolP("verbose", "v", false, "Show debug logs")
startCmd.PersistentFlags().Bool("ignore-crc", false, "Send packets upstream even if CRC validation is incorrect")

viper.BindPFlags(startCmd.PersistentFlags())

Expand Down
2 changes: 1 addition & 1 deletion docs/INSTALL_INSTRUCTIONS/IMST_RPI.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ To follow this manual, you must have a Raspberry Pi with an IMST ic880a board, c

## Download and run

1. Download the [Raspberry Pi + IMST build](https://ttnreleases.blob.core.windows.net/packet_forwarder/master/imst-rpi-pktfwd.zip) of the packet forwarder.
1. Download the [Raspberry Pi + IMST build](https://ttnreleases.blob.core.windows.net/packet-forwarder/master/imst-rpi-pktfwd.tar.gz) of the packet forwarder.

2. Configure the packet forwarder:

Expand Down
2 changes: 1 addition & 1 deletion docs/INSTALL_INSTRUCTIONS/KERLINK.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Before installing the TTN Packet Forwarder, we recommend **updating the Station

*Note: Before installing the new packet forwarder, make sure you removed any other packet forwarder installed on your Kerlink IoT Station. If you don't have any important files stored on the disk, the safest way to make sure of that is to update the Station to the latest firmware available, which will reset the file system in the process.*

1. Download the [Kerlink build](https://ttnreleases.blob.core.windows.net/packet_forwarder/master/kerlink-iot-station-pktfwd.zip) of the packet forwarder.
1. Download the [Kerlink build](https://ttnreleases.blob.core.windows.net/packet-forwarder/master/kerlink-iot-station-pktfwd.tar.gz) of the packet forwarder.

2. In the folder, you will find several files: a `create-package.sh` script and a binary file, that we will call `packet-forwarder`.

Expand Down
2 changes: 1 addition & 1 deletion docs/INSTALL_INSTRUCTIONS/MULTITECH.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

*Note: Before installing the new packet forwarder, make sure you removed any other packet forwarder installed on your Multitech Conduit.*

1. Download the [Multitech Conduit package](https://ttnreleases.blob.core.windows.net/packet_forwarder/master/multitech-conduit-pktfwd.tar.gz) of the packet forwarder.
1. Download the [Multitech Conduit package](https://ttnreleases.blob.core.windows.net/packet-forwarder/master/multitech-conduit-pktfwd.tar.gz) of the packet forwarder.

2. In the archive, you will find an `create-package.sh` file, a `multitech-installer.sh`, as well as the executable binary. Execute the `create-package.sh` file, with the binary as a first argument:

Expand Down
235 changes: 137 additions & 98 deletions pktfwd/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Manager struct {
bootTimeSetters multipleBootTimeSetter
foundBootTime bool
isGPS bool
ignoreCRC bool
downlinksSendMargin time.Duration
}

Expand All @@ -55,6 +56,7 @@ func NewManager(ctx log.Interface, conf util.Config, netClient NetworkClient, gp
// At the beginning, until we get our first uplinks, we keep a high polling rate to the concentrator
uplinkPollingRate: initUplinkPollingRate,
downlinksSendMargin: runConfig.DownlinksSendMargin,
ignoreCRC: runConfig.IgnoreCRC,
}
}

Expand All @@ -77,13 +79,14 @@ func (m *Manager) run() error {
func (m *Manager) handler(runStart time.Time) (err error) {
// First, we'll handle the case when the user wants to end the program
c := make(chan os.Signal)
defer close(c)
signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGABRT)

// We'll start the routines, and attach them a context
bgCtx, cancel := context.WithCancel(context.Background())
defer cancel()
var routinesErr = make(chan error)
go m.startRoutines(bgCtx, routinesErr, runStart)
routinesErr := m.startRoutines(bgCtx, runStart)
defer close(routinesErr)

// Finally, we'll listen to the different issues
select {
Expand Down Expand Up @@ -126,67 +129,74 @@ func (m *Manager) setBootTime(bootTime time.Time) {
m.uplinkPollingRate = stableUplinkPollingRate
}

func (m *Manager) uplinkRoutine(bgCtx context.Context, errc chan error, runStart time.Time) {
m.ctx.Info("Waiting for uplink packets")
for {
packets, err := wrapper.Receive()
if err != nil {
errc <- errors.Wrap(err, "Uplink packets retrieval error")
return
}
if len(packets) == 0 { // Empty payload => we sleep, then reiterate.
time.Sleep(m.uplinkPollingRate)
continue
}

m.ctx.WithField("NbPackets", len(packets)).Info("Received uplink packets")
if !m.foundBootTime {
// First packets received => find concentrator boot time
err = m.findConcentratorBootTime(packets, runStart)
func (m *Manager) uplinkRoutine(bgCtx context.Context, runStart time.Time) chan error {
errC := make(chan error)
go func() {
m.ctx.Info("Waiting for uplink packets")
defer close(errC)
for {
packets, err := wrapper.Receive()
if err != nil {
m.ctx.WithError(err).Warn("Error when computing concentrator boot time - using packet forwarder run start time")
m.setBootTime(runStart)
errC <- errors.Wrap(err, "Uplink packets retrieval error")
return
}
if len(packets) == 0 { // Empty payload => we sleep, then reiterate.
time.Sleep(m.uplinkPollingRate)
continue
}
}

validPackets, err := wrapUplinkPayload(packets, m.netClient.GatewayID())
if err != nil {
continue
}
m.statusMgr.HandledRXBatch(len(validPackets), len(packets))
if len(validPackets) == 0 {
m.ctx.Warn("Packets received, but with invalid CRC - ignoring")
time.Sleep(m.uplinkPollingRate)
continue
}
m.ctx.WithField("NbPackets", len(packets)).Info("Received uplink packets")
if !m.foundBootTime {
// First packets received => find concentrator boot time
err = m.findConcentratorBootTime(packets, runStart)
if err != nil {
m.ctx.WithError(err).Warn("Error when computing concentrator boot time - using packet forwarder run start time")
m.setBootTime(runStart)
}
}

validPackets := wrapUplinkPayload(m.ctx, packets, m.ignoreCRC, m.netClient.GatewayID())
m.statusMgr.HandledRXBatch(len(validPackets), len(packets))
if len(validPackets) == 0 {
// Packets received, but with invalid CRC - ignoring
time.Sleep(m.uplinkPollingRate)
continue
}

m.ctx.WithField("NbValidPackets", len(validPackets)).Info("Received valid packets - sending them to the back-end")
m.netClient.SendUplinks(validPackets)
m.ctx.WithField("NbValidPackets", len(validPackets)).Info("Sending valid uplink packets")
m.netClient.SendUplinks(validPackets)

select {
case <-bgCtx.Done():
errc <- nil
return
default:
continue
select {
case <-bgCtx.Done():
errC <- nil
return
default:
continue
}
}
}
}()
return errC
}

func (m *Manager) gpsRoutine(bgCtx context.Context, errC chan error) {
m.ctx.Info("Starting GPS update routine")
for {
select {
case <-bgCtx.Done():
return
default:
// The GPS time reference and coordinates are updated at `gpsUpdateRate`
err := wrapper.UpdateGPSData(m.ctx)
if err != nil {
errC <- errors.Wrap(err, "GPS update error")
func (m *Manager) gpsRoutine(bgCtx context.Context) chan error {
errC := make(chan error)
go func() {
m.ctx.Info("Starting GPS update routine")
defer close(errC)
for {
select {
case <-bgCtx.Done():
return
default:
// The GPS time reference and coordinates are updated at `gpsUpdateRate`
err := wrapper.UpdateGPSData(m.ctx)
if err != nil {
errC <- errors.Wrap(err, "GPS update error")
}
}
}
}
}()
return errC
}

func (m *Manager) downlinkRoutine(bgCtx context.Context) {
Expand All @@ -206,57 +216,86 @@ func (m *Manager) downlinkRoutine(bgCtx context.Context) {
}
}

func (m *Manager) statusRoutine(bgCtx context.Context, errC chan error) {
for {
select {
case <-time.After(statusRoutineSleepRate):
rtt, err := m.netClient.Ping()
if err != nil {
errC <- errors.Wrap(err, "Network server health check error")
continue
}

status, err := m.statusMgr.GenerateStatus(rtt)
if err != nil {
errC <- errors.Wrap(err, "Gateway status computation error")
func (m *Manager) statusRoutine(bgCtx context.Context) chan error {
errC := make(chan error)
go func() {
defer close(errC)
for {
select {
case <-time.After(statusRoutineSleepRate):
rtt, err := m.netClient.Ping()
m.ctx.WithField("RTT", rtt).Debug("Ping to the router successful")
if err != nil {
errC <- errors.Wrap(err, "Network server health check error")
return
}

status, err := m.statusMgr.GenerateStatus(rtt)
if err != nil {
errC <- errors.Wrap(err, "Gateway status computation error")
return
}

err = m.netClient.SendStatus(*status)
if err != nil {
errC <- errors.Wrap(err, "Gateway status transmission error")
return
}
case <-bgCtx.Done():
return
}
}
}()
return errC
}

err = m.netClient.SendStatus(*status)
if err != nil {
errC <- errors.Wrap(err, "Gateway status transmission error")
return
}
case <-bgCtx.Done():
return
func (m *Manager) networkRoutine(bgCtx context.Context) chan error {
errC := make(chan error)
go func() {
defer close(errC)
if err := m.netClient.RefreshRoutine(bgCtx); err != nil {
errC <- errors.Wrap(err, "Couldn't refresh account server token")
}
}
}()
return errC
}

func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime time.Time) {
var errC = make(chan error)
upCtx, upCancel := context.WithCancel(bgCtx)
downCtx, downCancel := context.WithCancel(bgCtx)
statsCtx, statsCancel := context.WithCancel(bgCtx)
gpsCtx, gpsCancel := context.WithCancel(bgCtx)

go m.uplinkRoutine(upCtx, errC, runTime)
go m.downlinkRoutine(downCtx)
go m.statusRoutine(statsCtx, errC)
if m.isGPS {
go m.gpsRoutine(gpsCtx, errC)
}
select {
case routineErr := <-errC:
err <- routineErr
close(errC)
case <-bgCtx.Done():
err <- nil
}
upCancel()
gpsCancel()
downCancel()
statsCancel()
func (m *Manager) startRoutines(bgCtx context.Context, runTime time.Time) chan error {
err := make(chan error)
go func() {
upCtx, upCancel := context.WithCancel(bgCtx)
downCtx, downCancel := context.WithCancel(bgCtx)
statusCtx, statusCancel := context.WithCancel(bgCtx)
gpsCtx, gpsCancel := context.WithCancel(bgCtx)
networkCtx, networkCancel := context.WithCancel(bgCtx)

go m.downlinkRoutine(downCtx)
uplinkErrors := m.uplinkRoutine(upCtx, runTime)
statusErrors := m.statusRoutine(statusCtx)
networkErrors := m.networkRoutine(networkCtx)
var gpsErrors chan error
if m.isGPS {
gpsErrors = m.gpsRoutine(gpsCtx)
}
select {
case uplinkError := <-uplinkErrors:
err <- errors.Wrap(uplinkError, "Uplink routine error")
case statusError := <-statusErrors:
err <- errors.Wrap(statusError, "Status routine error")
case networkError := <-networkErrors:
err <- errors.Wrap(networkError, "Network routine error")
case gpsError := <-gpsErrors:
err <- errors.Wrap(gpsError, "GPS routine error")
case <-bgCtx.Done():
err <- nil
}
upCancel()
gpsCancel()
downCancel()
statusCancel()
networkCancel()
}()
return err
}

func (m *Manager) shutdown() error {
Expand Down
Loading

0 comments on commit b64203b

Please sign in to comment.