Skip to content

Commit

Permalink
Add serial binary communication (#653)
Browse files Browse the repository at this point in the history
* update bufferflow_timedraw as bufferflow_timed

* remove old commands

* remove utf8 decoding with timedraw buffer type

* binary support (WIP)

* use switch case

* fixed test deps

* socketio test connection is working 🎉 (with the correct python-socketio version)

* add callback to capture returned message, add new test for serial

* fix tests: "socketio.exceptions.ConnectionError: Connection refused by the server"

* minor optimizations: data and buf are already an array of bytes

* enhanced a bit how the logic of the serial works

* enhance a lot test on serial communication (with different buffer types)

The tests should be skipped on the CI (no board connected)

* update and enhance commands output (the space in front of `<` and `>` is required) 🤷‍♂️

* increased sleeptime, remove harcoded message[i]: should work on different systems

* generalize the tests

* Apply suggestions from code review

Co-authored-by: per1234 <[email protected]>

* add sketch used for testing

* Fix panic closing closed channel

* apply suggestions

* Partially revert #e80400b7ddbbc2e8f34f1e6701b55102c3a99289

* 🧹(cleanup) and 🛠️(refactoring) of bufferflow stuff

* extract code in helper function and uniform the code

reintroduce the closing of input channel (it's required)

* optimize the handling of data coming from the serial port

* uniform default bufferflow and 🧹

* forgot to fix this in #621

* apply suggestions from code review ✨

* remove timedbinary: it's the same as timedraw except for the casting

* Escape html commands string

* forgot to remove timed_binary

* remove useless id field (was unused)

* remove useless channel done & other stuff

* make sendNoBuf more general: will be used later 😏

* add `sendraw` command to send base64 encoded bytes, add tests (for send raw and for open/close port)

* forgot to skip test_sendraw_serial on CI

* update comments

* refactor tests

* remove BlockUntilReady because it was unused

Co-authored-by: per1234 <[email protected]>
Co-authored-by: Silvano Cerza <[email protected]>
  • Loading branch information
3 people committed Aug 18, 2021
1 parent d759c46 commit 056c22e
Show file tree
Hide file tree
Showing 14 changed files with 644 additions and 482 deletions.
46 changes: 2 additions & 44 deletions bufferflow.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,7 @@
package main

import (
//"log"
//"time"
)

var availableBufferAlgorithms = []string{"default", "timed", "timedraw"}

type BufferMsg struct {
Cmd string
Port string
TriggeringResponse string
//Desc string
//Desc string
}

type Bufferflow interface {
Init()
BlockUntilReady(cmd string, id string) (bool, bool) // implement this method
//JustQueue(cmd string, id string) bool // implement this method
OnIncomingData(data string) // implement this method
ClearOutSemaphore() // implement this method
BreakApartCommands(cmd string) []string // implement this method
Pause() // implement this method
Unpause() // implement this method
SeeIfSpecificCommandsShouldSkipBuffer(cmd string) bool // implement this method
SeeIfSpecificCommandsShouldPauseBuffer(cmd string) bool // implement this method
SeeIfSpecificCommandsShouldUnpauseBuffer(cmd string) bool // implement this method
SeeIfSpecificCommandsShouldWipeBuffer(cmd string) bool // implement this method
SeeIfSpecificCommandsReturnNoResponse(cmd string) bool // implement this method
ReleaseLock() // implement this method
IsBufferGloballySendingBackIncomingData() bool // implement this method
Close() // implement this method
}

/*data packets returned to client*/
type DataCmdComplete struct {
Cmd string
Id string
P string
BufSize int `json:"-"`
D string `json:"-"`
}

type DataPerLine struct {
P string
D string
OnIncomingData(data string) // implement this method
Close() // implement this method
}
81 changes: 31 additions & 50 deletions bufferflow_default.go
Original file line number Diff line number Diff line change
@@ -1,71 +1,52 @@
package main

import (
"encoding/json"

log "github.com/sirupsen/logrus"
)

type BufferflowDefault struct {
Name string
Port string
port string
output chan<- []byte
input chan string
done chan bool
}

var ()
func NewBufferflowDefault(port string, output chan<- []byte) *BufferflowDefault {
return &BufferflowDefault{
port: port,
output: output,
input: make(chan string),
done: make(chan bool),
}
}

func (b *BufferflowDefault) Init() {
log.Println("Initting default buffer flow (which means no buffering)")
go b.consumeInput()
}

func (b *BufferflowDefault) BlockUntilReady(cmd string, id string) (bool, bool) {
//log.Printf("BlockUntilReady() start\n")
return true, false
func (b *BufferflowDefault) consumeInput() {
Loop:
for {
select {
case data := <-b.input:
m := SpPortMessage{b.port, data}
message, _ := json.Marshal(m)
b.output <- message
case <-b.done:
break Loop //this is required, a simple break statement would only exit the innermost switch statement
}
}
close(b.input) // close the input channel at the end of the computation
}

func (b *BufferflowDefault) OnIncomingData(data string) {
//log.Printf("OnIncomingData() start. data:%v\n", data)
}

// Clean out b.sem so it can truly block
func (b *BufferflowDefault) ClearOutSemaphore() {
}

func (b *BufferflowDefault) BreakApartCommands(cmd string) []string {
return []string{cmd}
}

func (b *BufferflowDefault) Pause() {
return
}

func (b *BufferflowDefault) Unpause() {
return
}

func (b *BufferflowDefault) SeeIfSpecificCommandsShouldSkipBuffer(cmd string) bool {
return false
}

func (b *BufferflowDefault) SeeIfSpecificCommandsShouldPauseBuffer(cmd string) bool {
return false
}

func (b *BufferflowDefault) SeeIfSpecificCommandsShouldUnpauseBuffer(cmd string) bool {
return false
}

func (b *BufferflowDefault) SeeIfSpecificCommandsShouldWipeBuffer(cmd string) bool {
return false
}

func (b *BufferflowDefault) SeeIfSpecificCommandsReturnNoResponse(cmd string) bool {
return false
}

func (b *BufferflowDefault) ReleaseLock() {
}

func (b *BufferflowDefault) IsBufferGloballySendingBackIncomingData() bool {
return false
b.input <- data
}

func (b *BufferflowDefault) Close() {
b.done <- true
close(b.done)
}
129 changes: 41 additions & 88 deletions bufferflow_timed.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,104 +8,57 @@ import (
)

type BufferflowTimed struct {
Name string
Port string
Output chan []byte
Input chan string
done chan bool
ticker *time.Ticker
port string
output chan<- []byte
input chan string
done chan bool
ticker *time.Ticker
sPort string
bufferedOutput string
}

var (
bufferedOutput string
sPort string
)
func NewBufferflowTimed(port string, output chan<- []byte) *BufferflowTimed {
return &BufferflowTimed{
port: port,
output: output,
input: make(chan string),
done: make(chan bool),
ticker: time.NewTicker(16 * time.Millisecond),
sPort: "",
bufferedOutput: "",
}
}

func (b *BufferflowTimed) Init() {
log.Println("Initting timed buffer flow (output once every 16ms)")
bufferedOutput = ""
sPort = ""

go func() {
b.ticker = time.NewTicker(16 * time.Millisecond)
b.done = make(chan bool)
Loop:
for {
select {
case data := <-b.Input:
bufferedOutput = bufferedOutput + data
sPort = b.Port
case <-b.ticker.C:
if bufferedOutput != "" {
m := SpPortMessage{sPort, bufferedOutput}
buf, _ := json.Marshal(m)
// data is now encoded in base64 format
// need a decoder on the other side
b.Output <- []byte(buf)
bufferedOutput = ""
sPort = ""
}
case <-b.done:
break Loop
go b.consumeInput()
}

func (b *BufferflowTimed) consumeInput() {
Loop:
for {
select {
case data := <-b.input: // use the buffer and append data to it
b.bufferedOutput = b.bufferedOutput + data
b.sPort = b.port
case <-b.ticker.C: // after 16ms send the buffered output message
if b.bufferedOutput != "" {
m := SpPortMessage{b.sPort, b.bufferedOutput}
buf, _ := json.Marshal(m)
b.output <- buf
// reset the buffer and the port
b.bufferedOutput = ""
b.sPort = ""
}
case <-b.done:
break Loop //this is required, a simple break statement would only exit the innermost switch statement
}

close(b.Input)

}()

}

func (b *BufferflowTimed) BlockUntilReady(cmd string, id string) (bool, bool) {
//log.Printf("BlockUntilReady() start\n")
return true, false
}
close(b.input)
}

func (b *BufferflowTimed) OnIncomingData(data string) {
b.Input <- data
}

// Clean out b.sem so it can truly block
func (b *BufferflowTimed) ClearOutSemaphore() {
}

func (b *BufferflowTimed) BreakApartCommands(cmd string) []string {
return []string{cmd}
}

func (b *BufferflowTimed) Pause() {
return
}

func (b *BufferflowTimed) Unpause() {
return
}

func (b *BufferflowTimed) SeeIfSpecificCommandsShouldSkipBuffer(cmd string) bool {
return false
}

func (b *BufferflowTimed) SeeIfSpecificCommandsShouldPauseBuffer(cmd string) bool {
return false
}

func (b *BufferflowTimed) SeeIfSpecificCommandsShouldUnpauseBuffer(cmd string) bool {
return false
}

func (b *BufferflowTimed) SeeIfSpecificCommandsShouldWipeBuffer(cmd string) bool {
return false
}

func (b *BufferflowTimed) SeeIfSpecificCommandsReturnNoResponse(cmd string) bool {
return false
}

func (b *BufferflowTimed) ReleaseLock() {
}

func (b *BufferflowTimed) IsBufferGloballySendingBackIncomingData() bool {
return true
b.input <- data
}

func (b *BufferflowTimed) Close() {
Expand Down
Loading

0 comments on commit 056c22e

Please sign in to comment.