Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more e2e tests #112

Merged
merged 3 commits into from
Sep 5, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 197 additions & 77 deletions roc/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ type e2e struct {
Sender *Sender
}

func newE2E(t *testing.T) *e2e {
type e2eParams struct {
clockSource ClockSource
fecEncoding FecEncoding
sourceURI string
repairURI string
}

func newE2E(t *testing.T, params e2eParams) *e2e {
var (
err error
e e2e
Expand All @@ -32,17 +39,22 @@ func newE2E(t *testing.T) *e2e {
require.NotNil(t, e.Context)

// create receiver
e.Receiver, err = OpenReceiver(e.Context, makeReceiverConfig())
receiverConfig := makeReceiverConfig()
receiverConfig.ClockSource = params.clockSource
e.Receiver, err = OpenReceiver(e.Context, receiverConfig)
require.NoError(t, err)
require.NotNil(t, e.Receiver)

// create sender
e.Sender, err = OpenSender(e.Context, makeSenderConfig())
senderConfig := makeSenderConfig()
senderConfig.ClockSource = params.clockSource
senderConfig.FecEncoding = params.fecEncoding
e.Sender, err = OpenSender(e.Context, senderConfig)
require.NoError(t, err)
require.NotNil(t, e.Sender)

// create source endpoint
sourceEndpoint, err := ParseEndpoint("rtp+rs8m://127.0.0.1:0")
sourceEndpoint, err := ParseEndpoint(params.sourceURI)
require.NoError(t, err)
require.NotNil(t, sourceEndpoint)

Expand All @@ -51,23 +63,28 @@ func newE2E(t *testing.T) *e2e {
require.NoError(t, err)
require.NotEmpty(t, sourceEndpoint.Port)

// create repair endpoint
repairEndpoint, err := ParseEndpoint("rs8m://127.0.0.1:0")
require.NoError(t, err)
require.NotNil(t, repairEndpoint)
var repairEndpoint *Endpoint
if params.repairURI != "" {
// create repair endpoint
repairEndpoint, err = ParseEndpoint(params.repairURI)
require.NoError(t, err)
require.NotNil(t, repairEndpoint)

// bind receiver to repair endpoint
err = e.Receiver.Bind(SlotDefault, InterfaceAudioRepair, repairEndpoint)
require.NoError(t, err)
require.NotEmpty(t, repairEndpoint.Port)
// bind receiver to repair endpoint
err = e.Receiver.Bind(SlotDefault, InterfaceAudioRepair, repairEndpoint)
require.NoError(t, err)
require.NotEmpty(t, repairEndpoint.Port)
}

// connect sender to receiver source endpoint
err = e.Sender.Connect(SlotDefault, InterfaceAudioSource, sourceEndpoint)
require.NoError(t, err)

// connect sender to receiver repair endpoint
err = e.Sender.Connect(SlotDefault, InterfaceAudioRepair, repairEndpoint)
require.NoError(t, err)
if params.repairURI != "" {
// connect sender to receiver repair endpoint
err = e.Sender.Connect(SlotDefault, InterfaceAudioRepair, repairEndpoint)
require.NoError(t, err)
}

return &e
}
Expand All @@ -86,78 +103,181 @@ func (e *e2e) close(t *testing.T) {
}

func TestEnd2End_Default(t *testing.T) {
e := newE2E(t)
defer e.close(t)

samplesCnt := 100
testSamples := make([]float32, samplesCnt)
for i := 0; i < samplesCnt/NumChannels; i++ {
testSamples[i*NumChannels] = float32(i+1) / 100
testSamples[i*NumChannels+1] = -float32(i+1) / 100
tests := []struct {
name string
params e2eParams
}{
{
name: "default",
params: e2eParams{
clockSource: ClockExternal,
fecEncoding: FecEncodingDisable,
sourceURI: "rtp://127.0.0.1:0"},
},
{
name: "fec",
params: e2eParams{
clockSource: ClockExternal,
fecEncoding: FecEncodingRs8m,
sourceURI: "rtp+rs8m://127.0.0.1:0",
repairURI: "rs8m://127.0.0.1:0"},
},
}

var interval = time.Second / time.Duration(44100/samplesCnt)
sendTicker := time.NewTicker(interval)
defer sendTicker.Stop()

receiveTicker := time.NewTicker(interval)
defer receiveTicker.Stop()

endChan := make(chan struct{})
var wait sync.WaitGroup
wait.Add(1)
go func() {
for {
select {
case <-sendTicker.C:
err := e.Sender.WriteFloats(testSamples)
if err != nil {
t.Fail()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := newE2E(t, tt.params)
defer e.close(t)

samplesCnt := 100
testSamples := generateTestSamples(samplesCnt)

var interval = time.Second / time.Duration(44100/samplesCnt)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Let's use sample rate from config, instead of 44100?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

sendTicker := time.NewTicker(interval)
defer sendTicker.Stop()

receiveTicker := time.NewTicker(interval)
defer receiveTicker.Stop()

endChan := make(chan struct{})
var wait sync.WaitGroup
wait.Add(1)
go func() {
for {
select {
case <-sendTicker.C:
err := e.Sender.WriteFloats(testSamples)
if err != nil {
t.Fail()
}
case <-endChan:
wait.Done()
return
}
}
}()

validationState := validationState{}
samples := make([]float32, samplesCnt)
for validationState.nonZeroSamplesCount < 10000 {
select {
case <-receiveTicker.C:
err := e.Receiver.ReadFloats(samples)
if err != nil {
t.Fail()
}
validateSamples(t, &validationState, samples, samplesCnt)
}
case <-endChan:
wait.Done()
return
}
}
}()

nonZeroSamplesCount := 0
prevL := 0
prevR := 0
samples := make([]float32, samplesCnt)
for nonZeroSamplesCount < 10000 {
select {
case <-receiveTicker.C:
err := e.Receiver.ReadFloats(samples)
if err != nil {
t.Fail()
}
samplesStr := samplesToString(samples)
for i := 0; i < len(samples); i += NumChannels {
valueL := int(math.Round(float64(samples[i] * 100)))
valueR := int(math.Round(float64(samples[i+1] * 100)))

if valueL == 0 { // packet loss or streaming not started yet
assert.Equal(t, 0, valueR)
} else {
nonZeroSamplesCount++
assert.Equal(t, valueL, -valueR)
if prevL != 0 {
require.Equal(t, valueL, prevL%(samplesCnt/NumChannels)+1,
"prevL: %d, valueL: %d, index: %d, samples: %s", prevL, valueL, i, samplesStr)
require.Equal(t, valueR, prevR%(samplesCnt/NumChannels)-1,
"prevR: %d, valueR: %d, index: %d, samples: %s", prevR, valueR, i+1, samplesStr)

endChan <- struct{}{}
wait.Wait()
})
}

}

func TestEnd2End_Blocking(t *testing.T) {
tests := []struct {
name string
params e2eParams
}{
{
name: "default",
params: e2eParams{
clockSource: ClockInternal,
fecEncoding: FecEncodingDisable,
sourceURI: "rtp://127.0.0.1:0"},
},
{
name: "fec",
params: e2eParams{
clockSource: ClockInternal,
fecEncoding: FecEncodingRs8m,
sourceURI: "rtp+rs8m://127.0.0.1:0",
repairURI: "rs8m://127.0.0.1:0"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := newE2E(t, tt.params)
defer e.close(t)

samplesCnt := 100
testSamples := generateTestSamples(samplesCnt)

endChan := make(chan struct{})
var wait sync.WaitGroup
wait.Add(1)
go func() {
for {
select {
case <-endChan:
wait.Done()
return
default:
err := e.Sender.WriteFloats(testSamples)
if err != nil {
t.Fail()
}
}
}
prevL = valueL
prevR = valueR
}()

validationState := validationState{}
samples := make([]float32, samplesCnt)
for validationState.nonZeroSamplesCount < 10000 {
err := e.Receiver.ReadFloats(samples)
if err != nil {
t.Fail()
}
validateSamples(t, &validationState, samples, samplesCnt)
}
}

endChan <- struct{}{}
wait.Wait()
})
}

}

func generateTestSamples(samplesCnt int) []float32 {
testSamples := make([]float32, samplesCnt)
for i := 0; i < samplesCnt/NumChannels; i++ {
testSamples[i*NumChannels] = float32(i+1) / 100
testSamples[i*NumChannels+1] = -float32(i+1) / 100
}
return testSamples
}

type validationState struct {
nonZeroSamplesCount int
prevL int
prevR int
}

endChan <- struct{}{}
wait.Wait()
func validateSamples(t *testing.T, state *validationState, samples []float32, samplesCnt int) {
samplesStr := samplesToString(samples)
for i := 0; i < len(samples); i += NumChannels {
valueL := int(math.Round(float64(samples[i] * 100)))
valueR := int(math.Round(float64(samples[i+1] * 100)))

if valueL == 0 { // packet loss or streaming not started yet
assert.Equal(t, 0, valueR)
} else {
state.nonZeroSamplesCount++
assert.Equal(t, valueL, -valueR)
if state.prevL != 0 {
require.Equal(t, valueL, state.prevL%(samplesCnt/NumChannels)+1,
"prevL: %d, valueL: %d, index: %d, samples: %s", state.prevL, valueL, i, samplesStr)
require.Equal(t, valueR, state.prevR%(samplesCnt/NumChannels)-1,
"prevR: %d, valueR: %d, index: %d, samples: %s", state.prevR, valueR, i+1, samplesStr)
}
}
state.prevL = valueL
state.prevR = valueR
}
}

func samplesToString(samples []float32) string {
Expand Down