*: add 'endpoint hashkv' command #8351

Merged · 5 commits · Aug 7, 2017
8 changes: 6 additions & 2 deletions Documentation/op-guide/v2-migration.md
@@ -6,7 +6,7 @@ Migrating an application from the API v2 to the API v3 involves two steps: 1) mi

## Migrate client library

API v3 is different from API v2, thus application developers need to use a new client library to send requests to etcd API v3. The documentation of the client v3 is available at https://godoc.org/github.com/coreos/etcd/clientv3.

There are some notable differences between API v2 and API v3:

@@ -38,13 +38,17 @@ Second, migrate the v2 keys into v3 with the [migrate][migrate_command] (`ETCDCT

Restart the etcd members and everything should just work.

For etcd v3.3+, run `ETCDCTL_API=3 etcdctl endpoint hashkv --cluster` to ensure key-value stores are consistent post-migration.
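
A programmatic equivalent of the `--cluster` check, for reference. This is a minimal sketch and not part of this PR: the endpoint URL is an assumption, and it presumes every member exposes at least one client URL.

```go
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"http://127.0.0.1:2379"}, // assumed local cluster endpoint
        DialTimeout: 3 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // List the cluster members, then hash each member's KV store at rev=0 (all keys).
    mresp, err := cli.MemberList(ctx)
    if err != nil {
        panic(err)
    }
    var first uint32
    for i, m := range mresp.Members {
        hresp, err := cli.HashKV(ctx, m.ClientURLs[0], 0)
        if err != nil {
            panic(err)
        }
        if i == 0 {
            first = hresp.Hash
        } else if hresp.Hash != first {
            fmt.Printf("hash mismatch on %s: %d != %d\n", m.ClientURLs[0], hresp.Hash, first)
            return
        }
    }
    fmt.Println("key-value stores are consistent")
}
```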

**Warning**: When the v2 store has expiring TTL keys and the migrate command intends to preserve TTLs, the migration may be inconsistent with the last committed v2 state when run on a member whose raft index is behind the last leader's raft index.

### Online migration

If the application cannot tolerate any downtime, then it must migrate online. The implementation of online migration will vary from application to application but the overall idea is the same.

First, write application code using the v3 API. The application must support two modes: a migration mode and a normal mode. The application starts in migration mode. When running in migration mode, the application reads keys using the v3 API first, and, if it cannot find the key, it retries with the API v2. In normal mode, the application only reads keys using the v3 API. The application writes keys over the API v3 in both modes. To acknowledge a switch from migration mode to normal mode, the application watches on a switch mode key. When the switch key's value turns to `true`, the application switches over from migration mode to normal mode.
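
As an illustration of this watch-and-switch flow, here is a minimal sketch. It is not part of this PR; the switch key name `/migration/mode-switch`, the `normalMode` flag, and the `readV2` fallback helper are all hypothetical:

```go
package migration

import (
    "context"
    "sync/atomic"

    "github.com/coreos/etcd/clientv3"
)

// watchModeSwitch blocks until the (assumed) switch key's value becomes
// "true", then flips the application from migration mode to normal mode.
func watchModeSwitch(cli *clientv3.Client, normalMode *int32) {
    for wresp := range cli.Watch(context.Background(), "/migration/mode-switch") {
        for _, ev := range wresp.Events {
            if string(ev.Kv.Value) == "true" {
                atomic.StoreInt32(normalMode, 1)
                return
            }
        }
    }
}

// readKey reads via the v3 API first and, while still in migration mode,
// falls back to a hypothetical v2 read helper when the key is absent.
func readKey(cli *clientv3.Client, key string, normalMode *int32, readV2 func(string) (string, bool)) (string, bool) {
    resp, err := cli.Get(context.Background(), key)
    if err == nil && len(resp.Kvs) > 0 {
        return string(resp.Kvs[0].Value), true
    }
    if atomic.LoadInt32(normalMode) == 0 {
        return readV2(key) // migration mode: retry with the API v2
    }
    return "", false
}
```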

Second, start a background job to migrate data from the store v2 to the mvcc store by reading keys from the API v2 and writing keys to the API v3.

After finishing data migration, the background job writes `true` into the switch mode key to notify the application that it may switch modes.
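
A minimal sketch of such a background job, reusing the hypothetical `/migration/mode-switch` key from the sketch above; it is illustrative only and assumes the v2 tree fits in a single recursive `Get`:

```go
package migration

import (
    "context"

    clientv2 "github.com/coreos/etcd/client"
    "github.com/coreos/etcd/clientv3"
)

// copyTree recursively writes every leaf node of the v2 store into the v3 store.
func copyTree(ctx context.Context, n *clientv2.Node, v3cli *clientv3.Client) error {
    if !n.Dir {
        _, err := v3cli.Put(ctx, n.Key, n.Value)
        return err
    }
    for _, child := range n.Nodes {
        if err := copyTree(ctx, child, v3cli); err != nil {
            return err
        }
    }
    return nil
}

// migrateOnline replays the v2 tree into the mvcc store, then writes "true"
// into the assumed switch key so the application may leave migration mode.
func migrateOnline(ctx context.Context, v2cli clientv2.Client, v3cli *clientv3.Client) error {
    kapi := clientv2.NewKeysAPI(v2cli)
    resp, err := kapi.Get(ctx, "/", &clientv2.GetOptions{Recursive: true})
    if err != nil {
        return err
    }
    if err := copyTree(ctx, resp.Node, v3cli); err != nil {
        return err
    }
    _, err = v3cli.Put(ctx, "/migration/mode-switch", "true")
    return err
}
```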

33 changes: 33 additions & 0 deletions clientv3/integration/maintenance_test.go
@@ -23,6 +23,39 @@ import (
    "github.com/coreos/etcd/pkg/testutil"
)

func TestMaintenanceHashKV(t *testing.T) {
    defer testutil.AfterTest(t)

    clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
    defer clus.Terminate(t)

    for i := 0; i < 3; i++ {
        if _, err := clus.RandClient().Put(context.Background(), "foo", "bar"); err != nil {
            t.Fatal(err)
        }
    }

    var hv uint32
    for i := 0; i < 3; i++ {
        cli := clus.Client(i)
        // ensure writes are replicated
        if _, err := cli.Get(context.TODO(), "foo"); err != nil {
            t.Fatal(err)
        }
        hresp, err := cli.HashKV(context.Background(), clus.Members[i].GRPCAddr(), 0)
        if err != nil {
            t.Fatal(err)
        }
        if hv == 0 {
            hv = hresp.Hash
            continue
        }
        if hv != hresp.Hash {
            t.Fatalf("#%d: hash expected %d, got %d", i, hv, hresp.Hash)
        }
    }
}

func TestMaintenanceMoveLeader(t *testing.T) {
    defer testutil.AfterTest(t)

19 changes: 19 additions & 0 deletions clientv3/maintenance.go
@@ -28,6 +28,7 @@ type (
    AlarmResponse      pb.AlarmResponse
    AlarmMember        pb.AlarmMember
    StatusResponse     pb.StatusResponse
    HashKVResponse     pb.HashKVResponse
    MoveLeaderResponse pb.MoveLeaderResponse
)

@@ -50,6 +51,11 @@ type Maintenance interface {
    // Status gets the status of the endpoint.
    Status(ctx context.Context, endpoint string) (*StatusResponse, error)

    // HashKV returns a hash of the KV state at the time of the RPC.
    // If revision is zero, the hash is computed on all keys. If the revision
    // is non-zero, the hash is computed on all keys at or below the given revision.
    HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)

    // Snapshot provides a reader for a snapshot of a backend.
    Snapshot(ctx context.Context) (io.ReadCloser, error)

@@ -159,6 +165,19 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo
    return (*StatusResponse)(resp), nil
}

func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
    remote, cancel, err := m.dial(endpoint)
    if err != nil {
        return nil, toErr(ctx, err)
    }
    defer cancel()
    resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, grpc.FailFast(false))
    if err != nil {
        return nil, toErr(ctx, err)
    }
    return (*HashKVResponse)(resp), nil
}

func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
    ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, grpc.FailFast(false))
    if err != nil {
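
A quick usage sketch of the new method for review context (not part of the diff): `cli` is a connected `*clientv3.Client`, and the endpoint address and revision 42 are made-up values. Passing a non-zero revision pins the hash to a point in history, so endpoints can be compared even while writes continue.

```go
package example

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func compareHashes(cli *clientv3.Client) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // rev=0: hash every key currently in the endpoint's KV store.
    all, err := cli.HashKV(ctx, "127.0.0.1:2379", 0)
    if err != nil {
        log.Fatal(err)
    }

    // rev=42: hash only keys at or below revision 42.
    pinned, err := cli.HashKV(ctx, "127.0.0.1:2379", 42)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("hash(all)=%d hash(rev<=42)=%d\n", all.Hash, pinned.Hash)
}
```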
38 changes: 38 additions & 0 deletions e2e/ctl_v3_endpoint_test.go
@@ -15,12 +15,18 @@
package e2e

import (
"context"
"fmt"
"net/url"
"testing"
"time"

"github.com/coreos/etcd/clientv3"
)

func TestCtlV3EndpointHealth(t *testing.T) { testCtl(t, endpointHealthTest, withQuorum()) }
func TestCtlV3EndpointStatus(t *testing.T) { testCtl(t, endpointStatusTest, withQuorum()) }
func TestCtlV3EndpointHashKV(t *testing.T) { testCtl(t, endpointHashKVTest, withQuorum()) }

func endpointHealthTest(cx ctlCtx) {
    if err := ctlV3EndpointHealth(cx); err != nil {
@@ -52,3 +58,35 @@ func ctlV3EndpointStatus(cx ctlCtx) error {
    }
    return spawnWithExpects(cmdArgs, eps...)
}

func endpointHashKVTest(cx ctlCtx) {
    if err := ctlV3EndpointHashKV(cx); err != nil {
        cx.t.Fatalf("endpointHashKVTest ctlV3EndpointHashKV error (%v)", err)
    }
}

func ctlV3EndpointHashKV(cx ctlCtx) error {
    eps := cx.epc.EndpointsV3()

    // get latest hash to compare
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   eps,
        DialTimeout: 3 * time.Second,
    })
    if err != nil {
        cx.t.Fatal(err)
    }
    defer cli.Close()
    hresp, err := cli.HashKV(context.TODO(), eps[0], 0)
    if err != nil {
        cx.t.Fatal(err)
    }

    cmdArgs := append(cx.PrefixArgs(), "endpoint", "hashkv")
    var ss []string
    for _, ep := range cx.epc.EndpointsV3() {
        u, _ := url.Parse(ep)
        ss = append(ss, fmt.Sprintf("%s, %d", u.Host, hresp.Hash))
    }
    return spawnWithExpects(cmdArgs, ss...)
}
43 changes: 43 additions & 0 deletions etcdctl/README.md
@@ -641,6 +641,49 @@ Get the status for all endpoints in the cluster associated with the default endp
+------------------------+------------------+----------------+---------+-----------+-----------+------------+
```

### ENDPOINT HASHKV

ENDPOINT HASHKV fetches the hash of the key-value store of an endpoint.

#### Output

##### Simple format

Prints a humanized table of each endpoint URL and KV history hash.

##### JSON format

Prints a line of JSON encoding each endpoint URL and KV history hash.

#### Examples

Get the hash for the default endpoint:

```bash
./etcdctl endpoint hashkv
# 127.0.0.1:2379, 1084519789
```

Get the hash for the default endpoint as JSON:

```bash
./etcdctl -w json endpoint hashkv
# [{"Endpoint":"127.0.0.1:2379","Hash":{"header":{"cluster_id":14841639068965178418,"member_id":10276657743932975437,"revision":1,"raft_term":3},"hash":1084519789,"compact_revision":-1}}]
```

Get the hash for all endpoints in the cluster associated with the default endpoint:

```bash
./etcdctl -w table endpoint --cluster hashkv
+------------------------+------------+
| ENDPOINT | HASH |
+------------------------+------------+
| http://127.0.0.1:12379 | 1084519789 |
| http://127.0.0.1:22379 | 1084519789 |
| http://127.0.0.1:32379 | 1084519789 |
+------------------------+------------+
```

### ALARM \<subcommand\>

Provides alarm related commands
41 changes: 41 additions & 0 deletions etcdctl/ctlv3/command/ep_command.go
@@ -28,6 +28,7 @@ import (
)

var epClusterEndpoints bool
var epHashKVRev int64

// NewEndpointCommand returns the cobra command for "endpoint".
func NewEndpointCommand() *cobra.Command {
@@ -39,6 +40,7 @@ func NewEndpointCommand() *cobra.Command {
    ec.PersistentFlags().BoolVar(&epClusterEndpoints, "cluster", false, "use all endpoints from the cluster member list")
    ec.AddCommand(newEpHealthCommand())
    ec.AddCommand(newEpStatusCommand())
    ec.AddCommand(newEpHashKVCommand())

    return ec
}
@@ -64,6 +66,16 @@ The items in the lists are endpoint, ID, version, db size, is leader, raft term,
    }
}

func newEpHashKVCommand() *cobra.Command {
    hc := &cobra.Command{
        Use:   "hashkv",
        Short: "Prints the KV history hash for each endpoint in --endpoints",
        Run:   epHashKVCommandFunc,
    }
    hc.PersistentFlags().Int64Var(&epHashKVRev, "rev", 0, "maximum revision to hash (default: all revisions)")
    return hc
}

// epHealthCommandFunc executes the "endpoint-health" command.
func epHealthCommandFunc(cmd *cobra.Command, args []string) {
    flags.SetPflagsFromEnv("ETCDCTL", cmd.InheritedFlags())
@@ -151,6 +163,35 @@ func epStatusCommandFunc(cmd *cobra.Command, args []string) {
    }
}

type epHashKV struct {
    Ep   string             `json:"Endpoint"`
    Resp *v3.HashKVResponse `json:"HashKV"`
}

func epHashKVCommandFunc(cmd *cobra.Command, args []string) {
    c := mustClientFromCmd(cmd)

    hashList := []epHashKV{}
    var err error
    for _, ep := range endpointsFromCluster(cmd) {
        ctx, cancel := commandCtx(cmd)
        resp, serr := c.HashKV(ctx, ep, epHashKVRev)
        cancel()
        if serr != nil {
            err = serr
            fmt.Fprintf(os.Stderr, "Failed to get the hash of endpoint %s (%v)\n", ep, serr)
            continue
        }
        hashList = append(hashList, epHashKV{Ep: ep, Resp: resp})
    }

    display.EndpointHashKV(hashList)

    if err != nil {
        ExitWithError(ExitError, err)
    }
}

func endpointsFromCluster(cmd *cobra.Command) []string {
    if !epClusterEndpoints {
        endpoints, err := cmd.Flags().GetStringSlice("endpoints")
13 changes: 13 additions & 0 deletions etcdctl/ctlv3/command/printer.go
@@ -43,6 +43,7 @@ type printer interface {
    MemberList(v3.MemberListResponse)

    EndpointStatus([]epStatus)
    EndpointHashKV([]epHashKV)
    MoveLeader(leader, target uint64, r v3.MoveLeaderResponse)

    Alarm(v3.AlarmResponse)
@@ -146,6 +147,7 @@ func newPrinterUnsupported(n string) printer {
}

func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) }
func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) }
func (p *printerUnsupported) DBStatus(dbstatus)         { p.p(nil) }

func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) }
@@ -184,6 +186,17 @@ func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]stri
    return
}

func makeEndpointHashKVTable(hashList []epHashKV) (hdr []string, rows [][]string) {
    hdr = []string{"endpoint", "hash"}
    for _, h := range hashList {
        rows = append(rows, []string{
            h.Ep,
            fmt.Sprint(h.Resp.Hash),
        })
    }
    return
}

func makeDBStatusTable(ds dbstatus) (hdr []string, rows [][]string) {
    hdr = []string{"hash", "revision", "total keys", "total size"}
    rows = append(rows, []string{
9 changes: 9 additions & 0 deletions etcdctl/ctlv3/command/printer_fields.go
@@ -146,6 +146,15 @@ func (p *fieldsPrinter) EndpointStatus(eps []epStatus) {
    }
}

func (p *fieldsPrinter) EndpointHashKV(hs []epHashKV) {
    for _, h := range hs {
        p.hdr(h.Resp.Header)
        fmt.Printf("\"Endpoint\" : %q\n", h.Ep)
        fmt.Println(`"Hash" :`, h.Resp.Hash)
        fmt.Println()
    }
}

func (p *fieldsPrinter) Alarm(r v3.AlarmResponse) {
    p.hdr(r.Header)
    for _, a := range r.Alarms {
1 change: 1 addition & 0 deletions etcdctl/ctlv3/command/printer_json.go
@@ -29,6 +29,7 @@ func newJSONPrinter() printer {
}

func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) }
func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) }
func (p *jsonPrinter) DBStatus(r dbstatus)         { printJSON(r) }

func printJSON(v interface{}) {
7 changes: 7 additions & 0 deletions etcdctl/ctlv3/command/printer_simple.go
@@ -136,6 +136,13 @@ func (s *simplePrinter) EndpointStatus(statusList []epStatus) {
    }
}

func (s *simplePrinter) EndpointHashKV(hashList []epHashKV) {
    _, rows := makeEndpointHashKVTable(hashList)
    for _, row := range rows {
        fmt.Println(strings.Join(row, ", "))
    }
}

func (s *simplePrinter) DBStatus(ds dbstatus) {
    _, rows := makeDBStatusTable(ds)
    for _, row := range rows {
10 changes: 10 additions & 0 deletions etcdctl/ctlv3/command/printer_table.go
@@ -44,6 +44,16 @@ func (tp *tablePrinter) EndpointStatus(r []epStatus) {
    table.SetAlignment(tablewriter.ALIGN_RIGHT)
    table.Render()
}
func (tp *tablePrinter) EndpointHashKV(r []epHashKV) {
    hdr, rows := makeEndpointHashKVTable(r)
    table := tablewriter.NewWriter(os.Stdout)
    table.SetHeader(hdr)
    for _, row := range rows {
        table.Append(row)
    }
    table.SetAlignment(tablewriter.ALIGN_RIGHT)
    table.Render()
}
func (tp *tablePrinter) DBStatus(r dbstatus) {
    hdr, rows := makeDBStatusTable(r)
    table := tablewriter.NewWriter(os.Stdout)