Skip to content

Commit

Permalink
*: change flashback grammar to flashback cluster to timestamp (#37815)
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 committed Sep 14, 2022
1 parent 0f4c3e6 commit 13ac510
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 41 deletions.
10 changes: 5 additions & 5 deletions ddl/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)

tk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
dom.DDL().SetHook(originHook)

finishValue, err := infosync.GetPDScheduleConfig(context.Background())
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {
tk.MustExec("set global tidb_gc_enable = on")
tk.MustExec("set global tidb_super_read_only = off")

tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
rs, err := tk.Exec("show variables like 'tidb_super_read_only'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.Off)
Expand All @@ -214,7 +214,7 @@ func TestGlobalVariablesOnFlashback(t *testing.T) {

ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
require.NoError(t, err)
tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
require.NoError(t, err)
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], variable.On)
Expand Down Expand Up @@ -251,15 +251,15 @@ func TestCancelFlashbackCluster(t *testing.T) {
return job.SchemaState == model.StateWriteOnly
})
dom.DDL().SetHook(hook)
tk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errno.ErrCancelledDDLJob)
hook.MustCancelDone(t)

// Try canceled on StateWriteReorganization, cancel failed
hook = newCancelJobHook(t, store, dom, func(job *model.Job) bool {
return job.SchemaState == model.StateWriteReorganization
})
dom.DDL().SetHook(hook)
tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))
hook.MustCancelFailed(t)

dom.DDL().SetHook(originHook)
Expand Down
2 changes: 1 addition & 1 deletion executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func (e *DDLExec) executeFlashBackCluster(ctx context.Context, s *ast.FlashBackC
return errors.Errorf("not support flash back cluster with TiFlash stores")
}

flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, &s.AsOf)
flashbackTS, err := staleread.CalculateAsOfTsExpr(e.ctx, s.FlashbackTS)
if err != nil {
return err
}
Expand Down
20 changes: 10 additions & 10 deletions executor/recover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func TestRecoverClusterMeetError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(30*time.Second)), "Not support flashback cluster in non-TiKV env")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(30*time.Second)), "Not support flashback cluster in non-TiKV env")

ts, _ := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
flashbackTs := oracle.GetTimeFromTS(ts)
Expand All @@ -311,8 +311,8 @@ func TestRecoverClusterMeetError(t *testing.T) {
fmt.Sprintf("return(%v)", injectSafeTS)))

// Get GC safe point error.
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(30*time.Second)), "cannot set flashback timestamp to future time")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)), "can not get 'tikv_gc_safe_point'")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(30*time.Second)), "cannot set flashback timestamp to future time")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), "can not get 'tikv_gc_safe_point'")

timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
defer resetGC()
Expand All @@ -321,19 +321,19 @@ func TestRecoverClusterMeetError(t *testing.T) {
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

// out of GC safe point range.
tk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-60*60*60*time.Second)), int(variable.ErrSnapshotTooOld.Code()))
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-60*60*60*time.Second)), int(variable.ErrSnapshotTooOld.Code()))

// Flashback without super privilege.
tk.MustExec("CREATE USER 'testflashback'@'localhost';")
newTk := testkit.NewTestKit(t, store)
require.NoError(t, newTk.Session().Auth(&auth.UserIdentity{Username: "testflashback", Hostname: "localhost"}, nil, nil))
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)), int(core.ErrSpecificAccessDenied.Code()))
newTk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)), int(core.ErrSpecificAccessDenied.Code()))
tk.MustExec("drop user 'testflashback'@'localhost';")

// Flashback failed because of ddl history.
tk.MustExec("use test;")
tk.MustExec("create table t(a int);")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs), "schema version not same, have done ddl during [flashbackTS, now)")
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs), "schema version not same, have done ddl during [flashbackTS, now)")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS"))
Expand All @@ -356,7 +356,7 @@ func TestRecoverClusterWithTiFlash(t *testing.T) {
// Set GC safe point
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))

tk.MustContainErrMsg(fmt.Sprintf("flashback cluster as of timestamp '%s'", time.Now().Add(0-30*time.Second)),
tk.MustContainErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", time.Now().Add(0-30*time.Second)),
"not support flash back cluster with TiFlash stores")

require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS"))
Expand Down Expand Up @@ -387,20 +387,20 @@ func TestFlashbackWithSafeTs(t *testing.T) {
}{
{
name: "5 seconds ago to now, safeTS 5 secs ago",
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs),
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs),
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 5 secs ago",
// Add flashbackTs.Add(-500*time.Millisecond) to avoid flashback time range overlapped.
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)),
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs.Add(-500*time.Millisecond)),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(10 * time.Second)),
compareWithSafeTS: -1,
},
{
name: "5 seconds ago to now, safeTS 10 secs ago",
sql: fmt.Sprintf("flashback cluster as of timestamp '%s'", flashbackTs),
sql: fmt.Sprintf("flashback cluster to timestamp '%s'", flashbackTs),
injectSafeTS: oracle.GoTimeToTS(flashbackTs.Add(-10 * time.Second)),
compareWithSafeTS: 1,
},
Expand Down
13 changes: 6 additions & 7 deletions parser/ast/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4006,14 +4006,14 @@ func (n *RecoverTableStmt) Accept(v Visitor) (Node, bool) {
type FlashBackClusterStmt struct {
ddlNode

AsOf AsOfClause
FlashbackTS ExprNode
}

// Restore implements Node interface
func (n *FlashBackClusterStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("FLASHBACK CLUSTER ")
if err := n.AsOf.Restore(ctx); err != nil {
return errors.Annotate(err, "An error occurred while splicing FlashBackClusterStmt.Asof")
ctx.WriteKeyWord("FLASHBACK CLUSTER TO TIMESTAMP ")
if err := n.FlashbackTS.Restore(ctx); err != nil {
return errors.Annotate(err, "An error occurred while splicing FlashBackClusterStmt.FlashbackTS")
}
return nil
}
Expand All @@ -4024,13 +4024,12 @@ func (n *FlashBackClusterStmt) Accept(v Visitor) (Node, bool) {
if skipChildren {
return v.Leave(newNode)
}

n = newNode.(*FlashBackClusterStmt)
node, ok := n.AsOf.Accept(v)
node, ok := n.FlashbackTS.Accept(v)
if !ok {
return n, false
}
n.AsOf = *node.(*AsOfClause)
n.FlashbackTS = node.(ExprNode)
return v.Leave(n)
}

Expand Down
10 changes: 4 additions & 6 deletions parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@ var (
57376: 646, // character (658x)
57473: 647, // match (650x)
57437: 648, // index (646x)
57542: 649, // to (568x)
57542: 649, // to (569x)
57360: 650, // all (554x)
46: 651, // '.' (549x)
57362: 652, // analyze (533x)
Expand All @@ -1502,7 +1502,7 @@ var (
57373: 669, // cascade (484x)
57503: 670, // read (484x)
57513: 671, // restrict (484x)
57347: 672, // asof (483x)
57347: 672, // asof (482x)
57383: 673, // create (480x)
57422: 674, // foreign (480x)
57424: 675, // fulltext (480x)
Expand Down Expand Up @@ -11246,7 +11246,7 @@ var (
{2382, 2382},
{563: 6766, 659: 6767, 1134: 6775},
{2383, 2383},
{672: 6782},
{649: 6782},
// 4245
{2: 3129, 2961, 2996, 2841, 2877, 2998, 2768, 10: 2814, 2769, 2900, 3015, 3008, 3365, 3360, 2880, 3164, 2882, 2856, 2800, 2803, 2792, 2825, 2884, 2885, 2992, 2879, 3016, 3121, 3120, 2767, 2878, 2881, 2892, 2832, 2836, 2888, 3001, 2847, 2928, 2765, 2766, 2927, 3000, 2764, 3013, 2973, 50: 3084, 2846, 2849, 3067, 3064, 3056, 3068, 3071, 3072, 3069, 3073, 3074, 3070, 3063, 3075, 3058, 3059, 3062, 3065, 3066, 3076, 3368, 2914, 2850, 3043, 3042, 3044, 3039, 3038, 3045, 3040, 3041, 2842, 2958, 3028, 3092, 3026, 3093, 3133, 3027, 2854, 2922, 3216, 3220, 3208, 3219, 3221, 3211, 3217, 3218, 3222, 3215, 2783, 2917, 3369, 3362, 3358, 2777, 3381, 3025, 3014, 2812, 3364, 3379, 3380, 3378, 3374, 3017, 3018, 3019, 3020, 3021, 3022, 3024, 3370, 2855, 2851, 2943, 2947, 2948, 2949, 2950, 2938, 2967, 3010, 2969, 2827, 2785, 2968, 2939, 3089, 2919, 2959, 2822, 2875, 3034, 2896, 2786, 2791, 2802, 2817, 3357, 2826, 3029, 2899, 2844, 2941, 2858, 2866, 2772, 2918, 2801, 2821, 3196, 2831, 3078, 3168, 2955, 2864, 3372, 2894, 3166, 2835, 2843, 2865, 3079, 2776, 2794, 3361, 2815, 2807, 2893, 2828, 3032, 3048, 2976, 3085, 3086, 3050, 2913, 3087, 3006, 3163, 3114, 3046, 2845, 2946, 3367, 3366, 3004, 2903, 2761, 2787, 2908, 2798, 2799, 2910, 2806, 2816, 2819, 3057, 2869, 2971, 3165, 2937, 2906, 2966, 3009, 2895, 3031, 3116, 2853, 3126, 3127, 3005, 3095, 3054, 3096, 2915, 2977, 2775, 3144, 3097, 3100, 2781, 3080, 3101, 3377, 2788, 2979, 3146, 3103, 2975, 2796, 3105, 2988, 3012, 2999, 2797, 3150, 3107, 3136, 3007, 2810, 3037, 3203, 3363, 2820, 2823, 2989, 3035, 3155, 3030, 3156, 2983, 3109, 3108, 3033, 3090, 2920, 3382, 3110, 3111, 2924, 2981, 3112, 3088, 2839, 2840, 2954, 3060, 2956, 3169, 3113, 3002, 3003, 2944, 2848, 2985, 3117, 2763, 3178, 2984, 3185, 3186, 3187, 3188, 3190, 3189, 3191, 3192, 3193, 3128, 2861, 2986, 3213, 3212, 2867, 2758, 2759, 3036, 3053, 2770, 3055, 3081, 2762, 2773, 2774, 3098, 3099, 2778, 2965, 2779, 2780, 2952, 3091, 3373, 3102, 2897, 2784, 2789, 2790, 3104, 3106, 2909, 3151, 2911, 2804, 2805, 2921, 2809, 2972, 3197, 2811, 2982, 2916, 2890, 3123, 2990, 3011, 2974, 2905, 3157, 2960, 2978, 3023, 2902, 2991, 2883, 3047, 2886, 2887, 3383, 2923, 2830, 2852, 3130, 3198, 2833, 2994, 2997, 3049, 3083, 3131, 3094, 2933, 2934, 2940, 3161, 3134, 3162, 3135, 3061, 3137, 2964, 2901, 3115, 2995, 2953, 3122, 3119, 3118, 3170, 2980, 3082, 2993, 3182, 3125, 2962, 2857, 3206, 3194, 2862, 2891, 2898, 2963, 3132, 2970, 3386, 2872, 3139, 3140, 3359, 3141, 3142, 3143, 3199, 3145, 3147, 3148, 3149, 2808, 2957, 3200, 2926, 3152, 2813, 3207, 3387, 3154, 3392, 3391, 3384, 3209, 3210, 3159, 3158, 2829, 3160, 3167, 2932, 2837, 2838, 3077, 2951, 3375, 3376, 3385, 2945, 2873, 2987, 2904, 2907, 3201, 3174, 3175, 3176, 3177, 3202, 3388, 3172, 3173, 2925, 3124, 3389, 3390, 3195, 3179, 3180, 3181, 3214, 3371, 661: 3914, 2756, 2757, 2755, 736: 6778},
{2386, 2386, 649: 6780, 1217: 6779},
Expand Down Expand Up @@ -12531,9 +12531,7 @@ yynewstate:
case 144:
{
parser.yyVAL.statement = &ast.FlashBackClusterStmt{
AsOf: ast.AsOfClause{
TsExpr: ast.NewValueExpr(yyS[yypt-0].ident, "", ""),
},
FlashbackTS: ast.NewValueExpr(yyS[yypt-0].ident, "", ""),
}
}
case 145:
Expand Down
6 changes: 2 additions & 4 deletions parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -2573,12 +2573,10 @@ RecoverTableStmt:
*
*******************************************************************/
FlashbackClusterStmt:
"FLASHBACK" "CLUSTER" asof "TIMESTAMP" stringLit
"FLASHBACK" "CLUSTER" "TO" "TIMESTAMP" stringLit
{
$$ = &ast.FlashBackClusterStmt{
AsOf: ast.AsOfClause{
TsExpr: ast.NewValueExpr($5, "", ""),
},
FlashbackTS: ast.NewValueExpr($5, "", ""),
}
}

Expand Down
6 changes: 3 additions & 3 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3236,9 +3236,9 @@ func TestDDL(t *testing.T) {
{"flashback table t TO t1", true, "FLASHBACK TABLE `t` TO `t1`"},

// for flashback cluster
{"flashback cluster as of timestamp '2021-05-26 16:45:26'", true, "FLASHBACK CLUSTER AS OF TIMESTAMP '2021-05-26 16:45:26'"},
{"flashback cluster as of timestamp TIDB_BOUNDED_STALENESS(DATE_SUB(NOW(), INTERVAL 3 SECOND), NOW())", false, ""},
{"flashback cluster as of timestamp DATE_SUB(NOW(), INTERVAL 3 SECOND)", false, ""},
{"flashback cluster to timestamp '2021-05-26 16:45:26'", true, "FLASHBACK CLUSTER TO TIMESTAMP '2021-05-26 16:45:26'"},
{"flashback cluster to timestamp TIDB_BOUNDED_STALENESS(DATE_SUB(NOW(), INTERVAL 3 SECOND), NOW())", false, ""},
{"flashback cluster to timestamp DATE_SUB(NOW(), INTERVAL 3 SECOND)", false, ""},

// for remove partitioning
{"alter table t remove partitioning", true, "ALTER TABLE `t` REMOVE PARTITIONING"},
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3213,7 +3213,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
case *ast.BeginStmt:
readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
if raw.AsOf != nil {
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf)
startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf.TsExpr)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion sessiontxn/staleread/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func parseAndValidateAsOf(sctx sessionctx.Context, asOf *ast.AsOfClause) (uint64
return 0, nil
}

ts, err := CalculateAsOfTsExpr(sctx, asOf)
ts, err := CalculateAsOfTsExpr(sctx, asOf.TsExpr)
if err != nil {
return 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions sessiontxn/staleread/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
)

// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
func CalculateAsOfTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) {
tsVal, err := expression.EvalAstExpr(sctx, asOfClause.TsExpr)
func CalculateAsOfTsExpr(sctx sessionctx.Context, tsExpr ast.ExprNode) (uint64, error) {
tsVal, err := expression.EvalAstExpr(sctx, tsExpr)
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/brietest/flashback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestFlashback(t *testing.T) {
fmt.Sprintf("return(%v)", injectSafeTS)))

tk.MustExec("insert t values (4), (5), (6)")
tk.MustExec(fmt.Sprintf("flashback cluster as of timestamp '%s'", oracle.GetTimeFromTS(ts)))
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)))

tk.MustExec("admin check table t")
require.Equal(t, tk.MustQuery("select max(a) from t").Rows()[0][0], "3")
Expand Down

0 comments on commit 13ac510

Please sign in to comment.