From 6602de0d49da7423d5ed96971764bba8a15f7b12 Mon Sep 17 00:00:00 2001 From: Dirkjan Bussink Date: Mon, 21 Oct 2024 12:03:50 +0200 Subject: [PATCH] Track type for id column for messages This tracks the explicit type for the id column so that we can use that properly when binding variables. If we don't bind the right type, we end up depending on implicit MySQL type casting behavior which isn't something we should do. Making it explicit here is much more robust. Signed-off-by: Dirkjan Bussink --- go/vt/vttablet/tabletserver/messager/message_manager.go | 8 ++++++-- go/vt/vttablet/tabletserver/schema/load_table.go | 8 ++++++++ go/vt/vttablet/tabletserver/schema/schema.go | 4 ++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index a1fc27187a3..7a217fb63b7 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -236,6 +236,9 @@ type messageManager struct { ackQuery *sqlparser.ParsedQuery postponeQuery *sqlparser.ParsedQuery purgeQuery *sqlparser.ParsedQuery + + // idType is the type of the id column in the message table. + idType sqltypes.Type } // newMessageManager creates a new message manager. @@ -259,6 +262,7 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos purgeTicks: timer.NewTimer(table.MessageInfo.PollInterval), postponeSema: postponeSema, messagesPending: true, + idType: table.MessageInfo.IDType, } mm.cond.L = &mm.mu @@ -856,7 +860,7 @@ func (mm *messageManager) GenerateAckQuery(ids []string) (string, map[string]*qu } for _, id := range ids { idbvs.Values = append(idbvs.Values, &querypb.Value{ - Type: querypb.Type_VARBINARY, + Type: mm.idType, Value: []byte(id), }) } @@ -874,7 +878,7 @@ func (mm *messageManager) GeneratePostponeQuery(ids []string) (string, map[strin } for _, id := range ids { idbvs.Values = append(idbvs.Values, &querypb.Value{ - Type: querypb.Type_VARBINARY, + Type: mm.idType, Value: []byte(id), }) } diff --git a/go/vt/vttablet/tabletserver/schema/load_table.go b/go/vt/vttablet/tabletserver/schema/load_table.go index 6022f8724eb..2a2be33a208 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table.go +++ b/go/vt/vttablet/tabletserver/schema/load_table.go @@ -157,6 +157,14 @@ func loadMessageInfo(ta *Table, comment string, collationEnv *collations.Environ ta.MessageInfo.Fields = getDefaultMessageFields(ta.Fields, hiddenCols) } + ta.MessageInfo.IDType = sqltypes.VarBinary + for _, field := range ta.MessageInfo.Fields { + if field.Name == "id" { + ta.MessageInfo.IDType = field.Type + break + } + } + return nil } diff --git a/go/vt/vttablet/tabletserver/schema/schema.go b/go/vt/vttablet/tabletserver/schema/schema.go index 4b3d9c88fb5..e800477da3b 100644 --- a/go/vt/vttablet/tabletserver/schema/schema.go +++ b/go/vt/vttablet/tabletserver/schema/schema.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" @@ -125,6 +126,9 @@ type MessageInfo struct { // MaxBackoff specifies the longest duration message manager // should wait before rescheduling a message MaxBackoff time.Duration + + // IDType specifies the type of the ID column + IDType sqltypes.Type } // NewTable creates a new Table.