Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport: VStreamer: improve representation of integers in json data types (2b43fd7dfb804b421f71705eedab7c043675b648) #169

Merged
merged 1 commit into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ require (
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.8.1
github.com/spyzhov/ajson v0.4.2
github.com/stretchr/testify v1.7.1
github.com/tchap/go-patricia v2.2.6+incompatible
github.com/tebeka/selenium v0.9.9
Expand Down Expand Up @@ -118,6 +117,7 @@ require (
require (
github.com/bndr/gotabulate v1.1.2
github.com/hashicorp/go-version v1.6.0
github.com/spyzhov/ajson v0.8.0
golang.org/x/exp v0.0.0-20230811145659-89c5cff77bcb
modernc.org/sqlite v1.20.3
)
Expand Down Expand Up @@ -202,3 +202,5 @@ require (
modernc.org/token v1.0.1 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.0.3 // indirect
)

replace github.com/spyzhov/ajson v0.8.0 => github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c h1:Y/4qcogoZA2WUtLWMk/yXfJSpaIG3mK3r9Lw4kaARL4=
github.com/rohit-nayak-ps/ajson v0.7.2-0.20230316112806-97deb03d883c/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down Expand Up @@ -695,8 +697,6 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.8.1 h1:Kq1fyeebqsBfbjZj4EL7gj2IO0mMaiyjYUWcUsl2O44=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/spyzhov/ajson v0.4.2 h1:JMByd/jZApPKDvNsmO90X2WWGbmT2ahDFp73QhZbg3s=
github.com/spyzhov/ajson v0.4.2/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
Expand Down
196 changes: 147 additions & 49 deletions go/mysql/binlog_event_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (jh *BinlogJSON) register(typ jsonDataType, Plugin jsonPlugin) {
func (jh *BinlogJSON) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
Plugin := jh.plugins[typ]
if Plugin == nil {
return nil, fmt.Errorf("Plugin not found for type %d", typ)
return nil, fmt.Errorf("plugin not found for type %d", typ)
}
return Plugin.getNode(typ, data, pos)
}
Expand Down Expand Up @@ -316,59 +316,157 @@ type intPlugin struct {

var _ jsonPlugin = (*intPlugin)(nil)

func (ih intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64) {
func (ipl intPlugin) getVal(typ jsonDataType, data []byte, pos int) (value int64) {
var val uint64
var val2 float64
size := ih.sizes[typ]
var val2 int64
size := ipl.sizes[typ]
for i := 0; i < size; i++ {
val = val + uint64(data[pos+i])<<(8*i)
}
switch typ {
case jsonInt16:
val2 = float64(int16(val))
case jsonUint16:
val2 = float64(uint16(val))
val2 = int64(int16(val))
case jsonInt32:
val2 = float64(int32(val))
case jsonUint32:
val2 = float64(uint32(val))
val2 = int64(int32(val))
case jsonInt64:
val2 = float64(int64(val))
case jsonUint64:
val2 = float64(val)
case jsonDouble:
val2 = math.Float64frombits(val)
val2 = int64(val)
}
return val2
}

func (ih intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := ih.getVal(typ, data, pos)
node = ajson.NumericNode("", val)
func (ipl intPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := ipl.getVal(typ, data, pos)
node = ajson.IntegerNode("", val)
return node, nil
}

func newIntPlugin() *intPlugin {
ih := &intPlugin{
ipl := &intPlugin{
info: &jsonPluginInfo{
name: "Int",
types: []jsonDataType{jsonInt64, jsonInt32, jsonInt16, jsonUint16, jsonUint32, jsonUint64, jsonDouble},
types: []jsonDataType{jsonInt64, jsonInt32, jsonInt16},
},
sizes: make(map[jsonDataType]int),
}
ipl.sizes = map[jsonDataType]int{
jsonInt64: 8,
jsonInt32: 4,
jsonInt16: 2,
}
for _, typ := range ipl.info.types {
binlogJSON.register(typ, ipl)
}
return ipl
}

//endregion

//region uint plugin

func init() {
newUintPlugin()
}

type uintPlugin struct {
info *jsonPluginInfo
sizes map[jsonDataType]int
}

var _ jsonPlugin = (*uintPlugin)(nil)

func (upl uintPlugin) getVal(typ jsonDataType, data []byte, pos int) (value uint64) {
var val uint64
var val2 uint64
size := upl.sizes[typ]
for i := 0; i < size; i++ {
val = val + uint64(data[pos+i])<<(8*i)
}
switch typ {
case jsonUint16:
val2 = uint64(uint16(val))
case jsonUint32:
val2 = uint64(uint32(val))
case jsonUint64:
val2 = val
}
return val2
}

func (upl uintPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := upl.getVal(typ, data, pos)
node = ajson.UnsignedIntegerNode("", val)
return node, nil
}

func newUintPlugin() *uintPlugin {
upl := &uintPlugin{
info: &jsonPluginInfo{
name: "Uint",
types: []jsonDataType{jsonUint16, jsonUint32, jsonUint64},
},
sizes: make(map[jsonDataType]int),
}
ih.sizes = map[jsonDataType]int{
upl.sizes = map[jsonDataType]int{
jsonUint64: 8,
jsonInt64: 8,
jsonUint32: 4,
jsonInt32: 4,
jsonUint16: 2,
jsonInt16: 2,
}
for _, typ := range upl.info.types {
binlogJSON.register(typ, upl)
}
return upl
}

//endregion

//region float plugin

func init() {
newFloatPlugin()
}

type floatPlugin struct {
info *jsonPluginInfo
sizes map[jsonDataType]int
}

var _ jsonPlugin = (*floatPlugin)(nil)

func (flp floatPlugin) getVal(typ jsonDataType, data []byte, pos int) (value float64) {
var val uint64
var val2 float64
size := flp.sizes[typ]
for i := 0; i < size; i++ {
val = val + uint64(data[pos+i])<<(8*i)
}
switch typ {
case jsonDouble:
val2 = math.Float64frombits(val)
}
return val2
}

func (flp floatPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := flp.getVal(typ, data, pos)
node = ajson.NumericNode("", val)
return node, nil
}

func newFloatPlugin() *floatPlugin {
fp := &floatPlugin{
info: &jsonPluginInfo{
name: "Float",
types: []jsonDataType{jsonDouble},
},
sizes: make(map[jsonDataType]int),
}
fp.sizes = map[jsonDataType]int{
jsonDouble: 8,
}
for _, typ := range ih.info.types {
binlogJSON.register(typ, ih)
for _, typ := range fp.info.types {
binlogJSON.register(typ, fp)
}
return ih
return fp
}

//endregion
Expand All @@ -385,7 +483,7 @@ type literalPlugin struct {

var _ jsonPlugin = (*literalPlugin)(nil)

func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (lpl literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
val := jsonDataLiteral(data[pos])
switch val {
case jsonNullLiteral:
Expand All @@ -401,14 +499,14 @@ func (lh literalPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *a
}

func newLiteralPlugin() *literalPlugin {
lh := &literalPlugin{
lpl := &literalPlugin{
info: &jsonPluginInfo{
name: "Literal",
types: []jsonDataType{jsonLiteral},
},
}
binlogJSON.register(jsonLiteral, lh)
return lh
binlogJSON.register(jsonLiteral, lpl)
return lpl
}

//endregion
Expand All @@ -427,7 +525,7 @@ var _ jsonPlugin = (*opaquePlugin)(nil)

// other types are stored as catch-all opaque types: documentation on these is scarce.
// we currently know about (and support) date/time/datetime/decimal.
func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (opl opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
dataType := data[pos]
start := 3 // account for length of stored value
end := start + 8 // all currently supported opaque data types are 8 bytes in size
Expand Down Expand Up @@ -484,14 +582,14 @@ func (oh opaquePlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
}

func newOpaquePlugin() *opaquePlugin {
oh := &opaquePlugin{
opl := &opaquePlugin{
info: &jsonPluginInfo{
name: "Opaque",
types: []jsonDataType{jsonOpaque},
},
}
binlogJSON.register(jsonOpaque, oh)
return oh
binlogJSON.register(jsonOpaque, opl)
return opl
}

//endregion
Expand All @@ -508,22 +606,22 @@ type stringPlugin struct {

var _ jsonPlugin = (*stringPlugin)(nil)

func (sh stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (spl stringPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
size, pos := readVariableLength(data, pos)
node = ajson.StringNode("", string(data[pos:pos+size]))

return node, nil
}

func newStringPlugin() *stringPlugin {
sh := &stringPlugin{
spl := &stringPlugin{
info: &jsonPluginInfo{
name: "String",
types: []jsonDataType{jsonString},
},
}
binlogJSON.register(jsonString, sh)
return sh
binlogJSON.register(jsonString, spl)
return spl
}

//endregion
Expand All @@ -542,7 +640,7 @@ var _ jsonPlugin = (*arrayPlugin)(nil)

// arrays are stored thus:
// | type_identifier(one of [2,3]) | elem count | obj size | list of offsets+lengths of values | actual values |
func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (apl arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
jlog("JSON Array %s, len %d", jsonDataTypeToString(uint(typ)), len(data))
var nodes []*ajson.Node
var elem *ajson.Node
Expand All @@ -565,15 +663,15 @@ func (ah arrayPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajs
}

func newArrayPlugin() *arrayPlugin {
ah := &arrayPlugin{
apl := &arrayPlugin{
info: &jsonPluginInfo{
name: "Array",
types: []jsonDataType{jsonSmallArray, jsonLargeArray},
},
}
binlogJSON.register(jsonSmallArray, ah)
binlogJSON.register(jsonLargeArray, ah)
return ah
binlogJSON.register(jsonSmallArray, apl)
binlogJSON.register(jsonLargeArray, apl)
return apl
}

//endregion
Expand All @@ -592,7 +690,7 @@ var _ jsonPlugin = (*objectPlugin)(nil)

// objects are stored thus:
// | type_identifier(0/1) | elem count | obj size | list of offsets+lengths of keys | list of offsets+lengths of values | actual keys | actual values |
func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
func (opl objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *ajson.Node, err error) {
jlog("JSON Type is %s, len %d", jsonDataTypeToString(uint(typ)), len(data))

// "large" decides number of bytes used to specify element count and total object size: 4 bytes for large, 2 for small
Expand Down Expand Up @@ -640,15 +738,15 @@ func (oh objectPlugin) getNode(typ jsonDataType, data []byte, pos int) (node *aj
}

func newObjectPlugin() *objectPlugin {
oh := &objectPlugin{
opl := &objectPlugin{
info: &jsonPluginInfo{
name: "Object",
types: []jsonDataType{jsonSmallObject, jsonLargeObject},
},
}
binlogJSON.register(jsonSmallObject, oh)
binlogJSON.register(jsonLargeObject, oh)
return oh
binlogJSON.register(jsonSmallObject, opl)
binlogJSON.register(jsonLargeObject, opl)
return opl
}

//endregion
Loading
Loading