-
Notifications
You must be signed in to change notification settings - Fork 50
/
data_test.go
96 lines (87 loc) · 2.05 KB
/
data_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package astits
import (
"bytes"
"testing"
"github.com/asticode/go-astikit"
"github.com/stretchr/testify/assert"
)
// TestParseData drives parseData through four scenarios that share a single
// program map: a custom parser, a CAT packet, a payload expected to come back
// as PES data and, once the PID has been set in the program map, a payload
// expected to come back as PSI data.
func TestParseData(t *testing.T) {
	const pid = uint16(256)
	programs := newProgramMap()

	// halves wraps a payload into two packets on the same PID, cut at byte 33
	halves := func(b []byte) []*Packet {
		return []*Packet{
			{
				Header:  PacketHeader{PID: pid},
				Payload: b[:33],
			},
			{
				Header:  PacketHeader{PID: pid},
				Payload: b[33:],
			},
		}
	}

	// first builds the expected first packet: header and adaptation field of
	// the first input packet, without its payload
	first := func(pkts []*Packet) *Packet {
		return &Packet{Header: pkts[0].Header, AdaptationField: pkts[0].AdaptationField}
	}

	// A custom parser returning skip=true: its data is expected back as is
	custom := []*DemuxerData{{PID: 1}}
	parser := func(_ []*Packet) ([]*DemuxerData, bool, error) {
		return custom, true, nil
	}
	got, err := parseData([]*Packet{}, parser, programs)
	assert.NoError(t, err)
	assert.Equal(t, custom, got)

	// A CAT packet is expected to produce no data
	got, err = parseData([]*Packet{{Header: PacketHeader{PID: PIDCAT}}}, nil, programs)
	assert.NoError(t, err)
	assert.Empty(t, got)

	// PES: the PID has not been set in the program map at this point
	pkts := halves(pesWithHeaderBytes())
	got, err = parseData(pkts, nil, programs)
	assert.NoError(t, err)
	wantPES := []*DemuxerData{{
		FirstPacket: first(pkts),
		PES:         pesWithHeader(),
		PID:         pid,
	}}
	assert.Equal(t, wantPES, got)

	// PSI: the same PID is set in the program map before parsing
	programs.setUnlocked(pid, uint16(1))
	pkts = halves(psiBytes())
	got, err = parseData(pkts, nil, programs)
	assert.NoError(t, err)
	assert.Equal(t, psi.toData(first(pkts), pid), got)
}
// TestIsPSIPayload checks which PIDs in the 0-255 range isPSIPayload accepts
// with a fresh program map, then checks that PID 1 is accepted as well once
// setUnlocked(1, 0) has been called on that map.
func TestIsPSIPayload(t *testing.T) {
	programs := newProgramMap()

	// Collect every PID in [0, 255] that is reported as a PSI payload
	var got []int
	for pid := 0; pid < 256; pid++ {
		if !isPSIPayload(uint16(pid), programs) {
			continue
		}
		got = append(got, pid)
	}
	want := []int{0, 16, 17, 18, 19, 20, 30, 31}
	assert.Equal(t, want, got)

	// PID 1 was not in the list above; it must be accepted after being set
	programs.setUnlocked(uint16(1), uint16(0))
	assert.True(t, isPSIPayload(uint16(1), programs))
}
// TestIsPESPayload checks isPESPayload against two buffers produced with an
// astikit bits writer: one written from a 16-character bit string, which must
// be rejected, and one written from a 24-character bit string ending in a
// single 1, which must be accepted.
func TestIsPESPayload(t *testing.T) {
	buf := &bytes.Buffer{}
	w := astikit.NewBitsWriter(astikit.BitsWriterOptions{Writer: buf})

	// Write errors are asserted so that a failed write is reported as such
	// instead of surfacing as a misleading isPESPayload failure
	assert.NoError(t, w.Write("0000000000000001"))
	assert.False(t, isPESPayload(buf.Bytes()))

	// Start over from an empty buffer for the accepted case
	buf.Reset()
	assert.NoError(t, w.Write("000000000000000000000001"))
	assert.True(t, isPESPayload(buf.Bytes()))
}