v18 backport: Online DDL: avoid SQL's CONVERT(...), convert programmatically if needed #16604

Merged 1 commit on Aug 15, 2024
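In brief: this backport changes Online DDL's vreplication flow so that, where a column's charset changes, the filter query selects the raw column and records a CharsetConversion for the applier to execute in Go, rather than emitting SQL convert(... using utf8mb4). The first few hunks below are new test fixtures for the vrepl suite (schema, seed data, and version-exclusion patterns); the Go changes follow. A minimal sketch of the conversion primitive the applier relies on, with illustrative charset names and input bytes:

package main

import (
	"fmt"

	"vitess.io/vitess/go/mysql/collations"
	"vitess.io/vitess/go/mysql/collations/charset"
	"vitess.io/vitess/go/mysql/collations/colldata"
)

func main() {
	// Resolve default collations for the source and target charsets,
	// the same lookups vrepl.go and replicator_plan.go perform.
	from := collations.Local().DefaultCollationForCharset("latin1")
	to := collations.Local().DefaultCollationForCharset("utf8mb4")
	// 0xE1 is 'á' in latin1; convert it programmatically instead of via SQL CONVERT().
	out, err := charset.Convert(nil, colldata.Lookup(to).Charset(), []byte{0xE1}, colldata.Lookup(from).Charset())
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s (%x)\n", out, out) // á (c3a1)
}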
@@ -0,0 +1 @@
MODIFY `t1` varchar(128) CHARACTER SET utf8mb4 NOT NULL, MODIFY `t2` varchar(128) CHARACTER SET latin2 NOT NULL, MODIFY `tutf8` varchar(128) CHARACTER SET latin1 NOT NULL
@@ -0,0 +1,19 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id int auto_increment,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (null, md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (null, 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (null, 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');
insert into onlineddl_test values (null, 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (null, 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (null, 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);

drop event if exists onlineddl_test;
@@ -0,0 +1 @@
(5.5|5.6|5.7)
@@ -0,0 +1,30 @@
drop table if exists onlineddl_test;
create table onlineddl_test (
id varchar(128) charset latin1 collate latin1_swedish_ci,
t1 varchar(128) charset latin1 collate latin1_swedish_ci,
t2 varchar(128) charset latin1 collate latin1_swedish_ci,
tutf8 varchar(128) charset utf8,
tutf8mb4 varchar(128) charset utf8mb4,
tlatin1 varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1;

insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting', 'átesting', 'átesting', 'átesting', 'átesting');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest', 'testátest', 'testátest', '🍻😀', 'átesting');

drop event if exists onlineddl_test;
delimiter ;;
create event onlineddl_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into onlineddl_test values (concat('átesting-', md5(rand())), md5(rand()), md5(rand()), md5(rand()), md5(rand()), md5(rand()));
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'testátest-binlog', 'testátest-binlog', 'testátest-binlog', '🍻😀', 'átesting-binlog');
insert into onlineddl_test values (concat('átesting-', md5(rand())), 'átesting-bnull', 'átesting-bnull', 'átesting-bnull', null, null);
end ;;
@@ -0,0 +1 @@
(5.5|5.6|5.7)
16 changes: 10 additions & 6 deletions go/vt/sqlparser/parsed_query.go
@@ -38,7 +38,7 @@ type ParsedQuery struct {
}

type bindLocation struct {
- offset, length int
+ Offset, Length int
}

// NewParsedQuery returns a ParsedQuery of the ast.
@@ -67,8 +67,8 @@ func (pq *ParsedQuery) GenerateQuery(bindVariables map[string]*querypb.BindVaria
func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*querypb.BindVariable, extras map[string]Encodable) error {
current := 0
for _, loc := range pq.bindLocations {
- buf.WriteString(pq.Query[current:loc.offset])
- name := pq.Query[loc.offset : loc.offset+loc.length]
+ buf.WriteString(pq.Query[current:loc.Offset])
+ name := pq.Query[loc.Offset : loc.Offset+loc.Length]
if encodable, ok := extras[name[1:]]; ok {
encodable.EncodeSQL(buf)
} else {
@@ -78,7 +78,7 @@ func (pq *ParsedQuery) Append(buf *strings.Builder, bindVariables map[string]*qu
}
EncodeValue(buf, supplied)
}
- current = loc.offset + loc.length
+ current = loc.Offset + loc.Length
}
buf.WriteString(pq.Query[current:])
return nil
@@ -122,7 +122,7 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field
var offsetQuery int
for i, loc := range pq.bindLocations {
col := rowInfo[i]
- buf.WriteString(pq.Query[offsetQuery:loc.offset])
+ buf.WriteString(pq.Query[offsetQuery:loc.Offset])
typ := col.typ

switch typ {
@@ -148,12 +148,16 @@ func (pq *ParsedQuery) AppendFromRow(buf *bytes2.Buffer, fields []*querypb.Field
vv.EncodeSQLBytes2(buf)
}
}
- offsetQuery = loc.offset + loc.length
+ offsetQuery = loc.Offset + loc.Length
}
buf.WriteString(pq.Query[offsetQuery:])
return nil
}

func (pq *ParsedQuery) BindLocations() []bindLocation {
return pq.bindLocations
}

// MarshalJSON is a custom JSON marshaler for ParsedQuery.
// Note that any queries longer than 512 bytes will be truncated.
func (pq *ParsedQuery) MarshalJSON() ([]byte, error) {
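For orientation: the export of bindLocation's fields and the new BindLocations() accessor above exist so that code outside the sqlparser package, such as the vreplication applier later in this diff, can walk a query's bind sites itself. A hedged sketch of that external usage:

package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/sqlparser"
)

func main() {
	stmt, err := sqlparser.Parse("select * from a where id = :id")
	if err != nil {
		panic(err)
	}
	pq := sqlparser.NewParsedQuery(stmt)
	for _, loc := range pq.BindLocations() {
		// Offset and Length are now exported, so callers can slice
		// the query text at each bind location.
		fmt.Println(pq.Query[loc.Offset : loc.Offset+loc.Length]) // prints ":id"
	}
}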
2 changes: 1 addition & 1 deletion go/vt/sqlparser/parsed_query_test.go
@@ -35,7 +35,7 @@ func TestNewParsedQuery(t *testing.T) {
pq := NewParsedQuery(stmt)
want := &ParsedQuery{
Query: "select * from a where id = :id",
- bindLocations: []bindLocation{{offset: 27, length: 3}},
+ bindLocations: []bindLocation{{Offset: 27, Length: 3}},
}
if !reflect.DeepEqual(pq, want) {
t.Errorf("GenerateParsedQuery: %+v, want %+v", pq, want)
4 changes: 2 additions & 2 deletions go/vt/sqlparser/tracked_buffer.go
@@ -285,8 +285,8 @@ func areBothISExpr(op Expr, val Expr) bool {
// tracking information for future substitutions.
func (buf *TrackedBuffer) WriteArg(prefix, arg string) {
buf.bindLocations = append(buf.bindLocations, bindLocation{
- offset: buf.Len(),
- length: len(prefix) + len(arg),
+ Offset: buf.Len(),
+ Length: len(prefix) + len(arg),
})
buf.WriteString(prefix)
buf.WriteString(arg)
15 changes: 9 additions & 6 deletions go/vt/vttablet/onlineddl/vrepl.go
@@ -511,6 +511,9 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
sb.WriteString(fmt.Sprintf("CONCAT(%s)", escapeName(name)))
case sourceCol.Type == vrepl.JSONColumnType:
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case targetCol.Type == vrepl.JSONColumnType:
// Convert any type to JSON: encode the type as utf8mb4 text
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
case sourceCol.Type == vrepl.StringColumnType:
// Check source and target charset/encoding. If needed, create
// a binlogdatapb.CharsetConversion entry (later written to vreplication)
@@ -523,19 +526,19 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
if targetCol.Type == vrepl.StringColumnType && toCollation == collations.Unknown {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", targetCol.Charset, targetCol.Name)
}

- if trivialCharset(fromCollation) && trivialCharset(toCollation) && targetCol.Type != vrepl.JSONColumnType {
+ if trivialCharset(fromCollation) && trivialCharset(toCollation) {
sb.WriteString(escapeName(name))
} else if fromCollation == toCollation {
// No need for charset conversions as both have the same collation.
sb.WriteString(escapeName(name))
} else {
// Charset conversion required:
v.convertCharset[targetName] = &binlogdatapb.CharsetConversion{
FromCharset: sourceCol.Charset,
ToCharset: targetCol.Charset,
}
sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
sb.WriteString(escapeName(name))
}
- case targetCol.Type == vrepl.JSONColumnType && sourceCol.Type != vrepl.JSONColumnType:
- // Convert any type to JSON: encode the type as utf8mb4 text
- sb.WriteString(fmt.Sprintf("convert(%s using utf8mb4)", escapeName(name)))
default:
sb.WriteString(escapeName(name))
}
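To make the vrepl.go change above concrete: for a hypothetical column t1 altered from latin1 to utf8mb4, the generated filter previously wrapped the column in convert(`t1` using utf8mb4); it now selects the bare column and records the conversion for the applier instead. A sketch of the recorded instruction (column and charset names are illustrative):

package main

import (
	"fmt"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func main() {
	// What generateFilterQuery now stores in v.convertCharset["t1"]
	// in place of an inline SQL conversion in the SELECT clause.
	conversion := &binlogdatapb.CharsetConversion{
		FromCharset: "latin1",
		ToCharset:   "utf8mb4",
	}
	fmt.Printf("SELECT `t1`, converting %s -> %s on the applier side\n",
		conversion.FromCharset, conversion.ToCharset)
}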
119 changes: 113 additions & 6 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
@@ -27,6 +27,7 @@ import (
"vitess.io/vitess/go/mysql/collations/charset"
"vitess.io/vitess/go/mysql/collations/colldata"
vjson "vitess.io/vitess/go/mysql/json"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -252,7 +253,7 @@ func (tp *TablePlan) applyBulkInsert(sqlbuffer *bytes2.Buffer, rows []*querypb.R
if i > 0 {
sqlbuffer.WriteString(", ")
}
- if err := tp.BulkInsertValues.AppendFromRow(sqlbuffer, tp.Fields, row, tp.FieldsToSkip); err != nil {
+ if err := tp.appendFromRow(sqlbuffer, row); err != nil {
return nil, err
}
}
@@ -307,6 +308,30 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
return false
}

// convertStringCharset does a charset conversion given raw data and an applicable conversion rule.
// In case of a conversion error, it returns an equivalent of MySQL error 1366, which is what you'd
// get in a failed `CONVERT()` function, e.g.:
//
// > create table tascii(v varchar(100) charset ascii);
// > insert into tascii values ('€');
// ERROR 1366 (HY000): Incorrect string value: '\xE2\x82\xAC' for column 'v' at row 1
func (tp *TablePlan) convertStringCharset(raw []byte, conversion *binlogdatapb.CharsetConversion, fieldName string) ([]byte, error) {
fromCollation := collations.Local().DefaultCollationForCharset(conversion.FromCharset)
if fromCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.FromCharset, fieldName)
}
toCollation := collations.Local().DefaultCollationForCharset(conversion.ToCharset)
if toCollation == collations.Unknown {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "character set %s not supported for column %s", conversion.ToCharset, fieldName)
}

out, err := charset.Convert(nil, colldata.Lookup(toCollation).Charset(), raw, colldata.Lookup(fromCollation).Charset())
if err != nil {
return nil, sqlerror.NewSQLError(sqlerror.ERTruncatedWrongValueForField, sqlerror.SSUnknownSQLState, "Incorrect string value: %s", err.Error())
}
return out, nil
}

// bindFieldVal returns a bind variable based on given field and value.
// Most values will just bind directly. But some values may need manipulation:
// - text values with charset conversion
@@ -315,11 +340,7 @@ func (tp *TablePlan) isOutsidePKRange(bindvars map[string]*querypb.BindVariable,
func (tp *TablePlan) bindFieldVal(field *querypb.Field, val *sqltypes.Value) (*querypb.BindVariable, error) {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && !val.IsNull() {
// Non-null string value, for which we have a charset conversion instruction
- fromCollation := collations.Local().DefaultCollationForCharset(conversion.FromCharset)
- if fromCollation == collations.Unknown {
- return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Character set %s not supported for column %s", conversion.FromCharset, field.Name)
- }
- out, err := charset.Convert(nil, charset.Charset_utf8mb4{}, val.Raw(), colldata.Lookup(fromCollation).Charset())
+ out, err := tp.convertStringCharset(val.Raw(), conversion, field.Name)
if err != nil {
return nil, err
}
@@ -486,3 +507,89 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
// Compare content only if none are null.
return v1.ToString() == v2.ToString()
}

// appendFromRow behaves like Append but takes a querypb.Row directly, assuming that
// the fields in the row are in the same order as the placeholders in this query. The fields may include generated
// columns, which are dropped by checking against skipFields before binding the variables.
// Note: there can be more fields than bind locations, since extra columns may be requested from the source if not all
// primary key columns are present in the target table, for example. Also, some values in the row may not correspond to
// values from the database on the source: sum/count for aggregation queries, for example.
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(tp.Fields), len(bindLocations))
}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
offset += row.Lengths[i]
}
}

// bind field values to locations
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ

switch typ {
case querypb.Type_TUPLE:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
case querypb.Type_JSON:
if col.length < 0 { // An SQL NULL and not an actual JSON value
buf.WriteString(sqltypes.NullStr)
} else { // A JSON value (which may be a JSON null literal value)
buf2 := row.Values[col.offset : col.offset+col.length]
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
}
buf.WriteString(vv.RawStr())
}
default:
if col.length < 0 {
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
raw := row.Values[col.offset : col.offset+col.length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
if err != nil {
return err
}
vv = sqltypes.MakeTrusted(typ, out)
} else {
vv = sqltypes.MakeTrusted(typ, raw)
}

vv.EncodeSQLBytes2(buf)
}
}
offsetQuery = loc.Offset + loc.Length
}
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
}
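The failure mode documented on convertStringCharset can be reproduced directly with the collations API: converting a character that has no representation in the target charset fails, and the new code surfaces that as MySQL error 1366. A sketch, under the assumption (implied by the error mapping above) that charset.Convert rejects unencodable input:

package main

import (
	"fmt"

	"vitess.io/vitess/go/mysql/collations"
	"vitess.io/vitess/go/mysql/collations/charset"
	"vitess.io/vitess/go/mysql/collations/colldata"
)

func main() {
	from := collations.Local().DefaultCollationForCharset("utf8mb4")
	to := collations.Local().DefaultCollationForCharset("ascii")
	// '€' has no ascii encoding, mirroring the doc comment's
	// "Incorrect string value" example.
	_, err := charset.Convert(nil, colldata.Lookup(to).Charset(), []byte("€"), colldata.Lookup(from).Charset())
	fmt.Println(err) // expected non-nil; mapped to error 1366 by convertStringCharset
}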