Skip to content

Commit

Permalink
refactor join logic (pingcap#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Mar 7, 2022
1 parent d4ef9af commit 6d9b0ef
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 23 deletions.
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4264,7 +4264,7 @@ func (ds *DataSource) ExtractFD() *fd.FDSet {
}
}
// handle the datasource conditions (maybe pushed down from upper layer OP)
if ds.allConds != nil {
if len(ds.allConds) != 0 {
// extract the not null attributes from selection conditions.
notnullColsUniqueIDs := extractNotNullFromConds(ds.allConds, ds)

Expand Down
57 changes: 56 additions & 1 deletion planner/functional_dependency/extract_fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ func TestFDSet_ExtractFD(t *testing.T) {
}

func TestFDSet_ExtractFDForApply(t *testing.T) {
t.Parallel()
ass := assert.New(t)

store, clean := testkit.CreateMockStore(t)
Expand Down Expand Up @@ -317,3 +316,59 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
ass.Equal(tt.fd, plannercore.FDToString(p.(plannercore.LogicalPlan)), comment)
}
}

// TestFDSet_MakeOuterJoin builds and logically optimizes outer-join queries, then checks both the
// resulting plan shape and the functional dependencies extracted from the plan.
func TestFDSet_MakeOuterJoin(t *testing.T) {
	ass := assert.New(t)

	store, clean := testkit.CreateMockStore(t)
	defer clean()
	par := parser.New()
	par.SetParserConfig(parser.ParserConfig{EnableWindowFunction: true, EnableStrictDoubleTypeCheck: true})

	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("CREATE TABLE X (a INT PRIMARY KEY, b INT, c INT, d INT, e INT)")
	tk.MustExec("CREATE UNIQUE INDEX uni ON X (b, c)")
	tk.MustExec("CREATE TABLE Y (m INT, n INT, p INT, q INT, PRIMARY KEY (m, n))")

	// Every case must carry a complete SQL statement together with its expected plan and FD strings.
	tests := []struct {
		sql  string
		best string
		fd   string
	}{
		{
			sql:  "select * from X left outer join (select *, p+q from Y) Y1 ON true",
			best: "Join{DataScan(X)->DataScan(Y)->Projection}->Projection",
			fd:   "{(1)-->(2-5), (2,3)~~>(1,4,5), (6,7)-->(8,9,11), (8,9)~~>(11), (1,6,7)-->(2-5,8,9,11)} >>> {(1)-->(2-5), (2,3)~~>(1,4,5), (6,7)-->(8,9,11), (8,9)~~>(11), (1,6,7)-->(2-5,8,9,11)}",
		},
	}

	ctx := context.TODO()
	is := testGetIS(ass, tk.Session())
	for i, tt := range tests {
		comment := fmt.Sprintf("case:%v sql:%s", i, tt.sql)
		stmt, err := par.ParseOneStmt(tt.sql, "", "")
		// The assertions are soft (they do not stop the test), so skip the case explicitly
		// instead of continuing with a nil statement or plan.
		if !ass.Nil(err, comment) {
			continue
		}
		// Reset the ID allocators so the column IDs in the expected FD strings are stable per case.
		tk.Session().GetSessionVars().PlanID = 0
		tk.Session().GetSessionVars().PlanColumnID = 0
		err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
		if !ass.Nil(err, comment) {
			continue
		}
		tk.Session().PrepareTSFuture(ctx)
		builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
		// Build the logical plan.
		p, err := builder.Build(ctx, stmt)
		if !ass.Nil(err, comment) {
			continue
		}
		p, err = plannercore.LogicalOptimizeTest(ctx, builder.GetOptFlag(), p.(plannercore.LogicalPlan))
		if !ass.Nil(err, comment) {
			continue
		}
		ass.Equal(tt.best, plannercore.ToString(p), comment)
		// Extract the FDs for every operator of the optimized plan.
		p.(plannercore.LogicalPlan).ExtractFD()
		ass.Equal(tt.fd, plannercore.FDToString(p.(plannercore.LogicalPlan)), comment)
	}
}
151 changes: 131 additions & 20 deletions planner/functional_dependency/fd_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,8 @@ func (s *FDSet) MakeCartesianProduct(rhs *FDSet) {
s.fdEdges = append(s.fdEdges, fd)
}
}
// todo: add strict FD: (left key + right key) -> all cols.
// maintain a key?
}

// MakeApply maintain the FD relationship between outer and inner table after Apply OP is done.
Expand All @@ -506,43 +508,152 @@ func (s *FDSet) MakeApply(inner *FDSet) {
}

// MakeOuterJoin generates the fdSet of the outer join.
//
// We always take the left as the row-supplying side, and right side as the null-supplying side. (swap it if not)
// As we know, the outer join would generate null extended rows compared with inner join.
// So we cannot directly do the same thing with the inner join. This function deals with the special cases of the outer join.
//
// Knowledge:
// 1: a filter condition on lhs columns neither removes lhs rows nor rejects null rows (every left row always appears in the result)
// 2: a filter condition on rhs columns won't remove NULL rows even though the filter has a not-null attribute on it (null-appending happens after the filter)
//
// Notification:
// 1: the original FDs from the left side (row-supplying) are preserved over the filtered result of the outer join: a left
// row may be duplicated by multiple matches, but the duplicates are the same left row (this doesn't violate the FD definition).
//
// 2: the original FDs from the right side (null-supplying) may not be valid anymore over the filtered result of the outer join.
//
// <1> a strict FD may be weakened to a lax one. But if at least one non-NULL column is part of the determinant, the
// strict FD can be preserved.
// a b | c d e
// ------+----------------
// 1 1 | 1 NULL 1
// 1 2 | NULL NULL NULL
// 2 1 | NULL NULL NULL
// left join with (a,b) * (c,d,e) on (a=c and b=1), if there is a strict FD {d} -> {e} on the rhs. After null values
// are supplied, {d} -> {e} is degraded to a lax one {d} ~~> {e}, as you can see: the original NULL and the supplied NULL
// in column d determine different dependents, NULL -> 1 and NULL -> NULL, which breaks the strict FD definition.
//
// Only when the determinant contains at least one not-null column, for example c here, can an FD like {c,d} -> {e} survive
// the left join, because you cannot find two identical keys, one from the original rows and the other from the
// supplied rows.
//
// for a lax FD, the supplied rows of null values don't affect the lax FD itself. So it can be kept.
//
// <2> constant FDs should be removed, since null values may be substituted for some unmatched left rows. The column is not a
// constant anymore.
//
// <3> equivalence FDs should be removed, since a substituted null value is not equal to another substituted null value.
//
// 3: the newly added FD from filters, should take some considerations as below:
//
// <1> strict/lax FD: join key filter conditions can not produce new strict/lax FD yet (knowledge: 1&2).
//
// <2> constant FD from the join conditions is only used for the matching mechanism judgement.
// a b | c d
// -------+---------
// 1 1 | 1 1
// 1 2 | NULL NULL
// left join with (a,b) * (c,d) on (a=c and d=1), some rhs rows will be substituted with null values, and the constant FD
// {d=1} on the rhs is lost.
//
// a b | c d
// -------+---------
// 1 1 | 1 1
// 1 2 | NULL NULL
// left join with (a,b) * (c,d) on (a=c and b=1), the condition only lets the first row match; the other lhs rows are still
// kept and appended with null values. So the constant FD {b=1} from the condition is not applicable to all lhs rows.
//
// above all: constant FD are lost
//
// <3> equivalence FD: let's treat an equivalence FD from the join equality conditions as a bidirectional strict FD, of which
// we only keep the direction rhs ~~> lhs.
// a b | c d e
// ------+----------------
// 1 1 | 1 NULL 1
// 1 2 | NULL NULL NULL
// 2 1 | NULL NULL NULL
// left join with (a,b) * (c,d,e) on (a=c and b=1). The join equality condition derives a new FD {ac} == {ac}.
// However, since there are supplied null values in the c column, {ac} == {ac} is no longer guaranteed, and neither is {a} -> {c},
// because the same determinant key {1} can point to different dependents {1} & {NULL}. In the other direction, an FD like {c} -> {a}
// is degraded to the corresponding lax one.
//
// 4: the newly formed FD {left key, right key} -> {all columns} is preserved in spite of the null-supplied rows.
//
func (s *FDSet) MakeOuterJoin(innerFDs, filterFDs *FDSet, outerCols, innerCols FastIntSet) {
// NOTE(review): this span comes from a rendered diff and interleaves removed and added lines without +/- markers;
// reconcile it against the committed file before changing any code here.
//
// Record the left PK and right PK before s is modified; they are used for Rule #4 at the end.
leftPK, ok1 := s.FindPrimaryKey()
rightPK, ok2 := innerFDs.FindPrimaryKey()

for _, edge := range innerFDs.fdEdges {
// We don't maintain the equiv edges and lax edges currently.
if edge.equiv || !edge.strict {
// Rule #2.2: constant FDs are removed from the right side of the left join.
if edge.isConstant() {
continue
}
// If one of the columns on the left side of the inner child's functional dependency is not null, this FD
// can be retained.
// Rule #2.3: equivalence FDs are removed from the right side of the left join.
if edge.equiv {
continue
}
// Rule #2.1: lax FDs can be kept after the left join.
if !edge.strict {
s.addFunctionalDependency(edge.from, edge.to, edge.strict, edge.equiv)
continue
}
// Rule #2.1: a strict FD is kept when its determinant contains a not-null column; otherwise it is downgraded to a lax one.
//
// If one of the columns on the left side of the inner child's functional dependency is not null, this FD can be retained.
// This is because the outer join generates null-extended rows. So if at least one column of the left side
// is not null, we can guarantee that the original rows and the generated rows never share the same determinant value.
// So the null-extended rows would not break the original functional dependency.
if edge.from.SubsetOf(innerFDs.NotNullCols) {
s.addFunctionalDependency(edge.from, edge.to, edge.strict, edge.equiv)
} else if edge.from.SubsetOf(filterFDs.NotNullCols) {
// If we can make sure the filters of the join filter out all nulls of this FD's left side
// and this FD is from the join's inner child, this FD can be retained.
// This is because the outer join filters out the null values. The generated null-extended rows would not
// find the same row among the original rows of the inner child. So it won't break the original functional dependency.
if edge.from.Intersects(innerFDs.NotNullCols) {
// At least one determinant column is not null, so the strict FD is kept.
// According to knowledge #2, we can't make use of the right filter's not-null attribute.
s.addFunctionalDependency(edge.from, edge.to, edge.strict, edge.equiv)
} else {
// Otherwise, the strict FD is downgraded to a lax one.
s.addFunctionalDependency(edge.from, edge.to, false, edge.equiv)
}
}
for _, edge := range filterFDs.fdEdges {
// We don't maintain the equiv edges and the lax edges currently.
if edge.equiv || !edge.strict {
// Rule #3.2: constant FDs are removed from the right side of the left join.
if edge.isConstant() {
continue
}
if edge.from.SubsetOf(innerCols) && edge.to.SubsetOf(innerCols) && edge.from.SubsetOf(filterFDs.NotNullCols) {
// The functional dependency generated from the join filter is retained if it meets the following conditions:
// 1. All columns of this functional dependency are columns from the inner side.
// 2. The join keys can filter out the null values from the left side of the FD.
// This is the same as the cases above. If the join filters can filter out the null values of the FD's left side,
// we won't find an identical row between the original rows of the inner side and the generated null-extended rows.
s.addFunctionalDependency(edge.from, edge.to, edge.strict, edge.equiv)
// Rule #3.3: we only keep the lax FDs pointing from the right side to the left side.
if edge.equiv {
// equivalence: {superset} --> {superset}, either the `from` or the `to` side is ok here.
laxFDFrom := edge.from.Intersection(innerCols)
laxFDTo := edge.from.Intersection(outerCols)
// Break the equivalence superset down and add one lax FD per (inner column, outer column) pair.
for i, ok := laxFDFrom.Next(0); ok; i, ok = laxFDFrom.Next(i + 1) {
for j, ok := laxFDTo.Next(0); ok; j, ok = laxFDTo.Next(j + 1) {
s.addFunctionalDependency(NewFastIntSet(i), NewFastIntSet(j), false, false)
}
}
}
// Rule #3.1: filters won't produce any strict/lax FDs.
}
// Rule #4: add the new FD {left key + right key} -> {all columns} when both keys were found.
if ok1 && ok2 {
s.addFunctionalDependency(leftPK.Union(*rightPK), outerCols.Union(innerCols), true, false)
}
}

// FindPrimaryKey scans the FD edges in order and returns a copy of the determinant of the first strict,
// non-equivalence FD whose strict closure covers every column of the set.
// It returns (nil, false) when no such FD exists.
func (s FDSet) FindPrimaryKey() (*FastIntSet, bool) {
	everyCol := s.AllCols()
	// No key column is maintained yet, so each strict FD is probed as a key candidate.
	for _, edge := range s.fdEdges {
		if !edge.strict || edge.equiv {
			continue
		}
		reachable := s.closureOfStrict(edge.from)
		if !everyCol.SubsetOf(reachable) {
			continue
		}
		// Hand back a copy so the caller never aliases the edge's own determinant set.
		key := NewFastIntSet()
		key.CopyFrom(edge.from)
		return &key, true
	}
	return nil, false
}

func (s FDSet) AllCols() FastIntSet {
Expand Down
2 changes: 1 addition & 1 deletion planner/functional_dependency/fd_graph_ported_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestFuncDeps_ColsAreKey(t *testing.T) {
loj = *abcde
loj.MakeCartesianProduct(mnpq)
loj.AddConstants(NewFastIntSet(3))
loj.MakeOuterJoin(nil, &FDSet{}, preservedCols, nullExtendedCols)
loj.MakeOuterJoin(&FDSet{}, &FDSet{}, preservedCols, nullExtendedCols)
loj.AddEquivalence(NewFastIntSet(1), NewFastIntSet(10))

testcases := []struct {
Expand Down

0 comments on commit 6d9b0ef

Please sign in to comment.