From 6c45182b649a12594bc8e40f4dfbd2b3020dc1b5 Mon Sep 17 00:00:00 2001 From: xieenlong Date: Fri, 29 Sep 2017 16:59:23 +0800 Subject: [PATCH] client: add AutoCloseable --- .../main/java/com/coreos/jetcd/Client.java | 3 +- .../src/main/java/com/coreos/jetcd/Watch.java | 3 +- .../jetcd/internal/impl/CloseableClient.java | 4 +- .../jetcd/internal/impl/LoadBalancerTest.java | 40 +- .../coreos/jetcd/internal/impl/WatchTest.java | 10 +- .../jetcd/internal/impl/WatchUnitTest.java | 385 +++++++++--------- jetcd-examples/jetcd-simple-ctl/pom.xml | 2 +- .../coreos/jetcd/examples/jetcdctl/Main.java | 8 +- jetcd-examples/jetcd-watch-example/pom.xml | 2 +- .../com/coreos/jetcd/examples/watch/Main.java | 25 +- 10 files changed, 220 insertions(+), 262 deletions(-) diff --git a/jetcd-core/src/main/java/com/coreos/jetcd/Client.java b/jetcd-core/src/main/java/com/coreos/jetcd/Client.java index 0b523f27f..a9242f7a3 100644 --- a/jetcd-core/src/main/java/com/coreos/jetcd/Client.java +++ b/jetcd-core/src/main/java/com/coreos/jetcd/Client.java @@ -22,7 +22,7 @@ *

The implementation may throw unchecked ConnectException or AuthFailedException on * initialization (or when invoking *Client methods if configured to initialize lazily). */ -public interface Client { +public interface Client extends AutoCloseable { Auth getAuthClient(); @@ -36,6 +36,7 @@ public interface Client { Watch getWatchClient(); + @Override void close(); static ClientBuilder builder() { diff --git a/jetcd-core/src/main/java/com/coreos/jetcd/Watch.java b/jetcd-core/src/main/java/com/coreos/jetcd/Watch.java index f0d19ec38..41664c6f1 100644 --- a/jetcd-core/src/main/java/com/coreos/jetcd/Watch.java +++ b/jetcd-core/src/main/java/com/coreos/jetcd/Watch.java @@ -48,11 +48,12 @@ public interface Watch extends CloseableClient { /** * Interface of Watcher. */ - interface Watcher { + interface Watcher extends AutoCloseable { /** * closes this watcher and all its resources. **/ + @Override void close(); /** diff --git a/jetcd-core/src/main/java/com/coreos/jetcd/internal/impl/CloseableClient.java b/jetcd-core/src/main/java/com/coreos/jetcd/internal/impl/CloseableClient.java index 40aed0ffb..2f581f76a 100644 --- a/jetcd-core/src/main/java/com/coreos/jetcd/internal/impl/CloseableClient.java +++ b/jetcd-core/src/main/java/com/coreos/jetcd/internal/impl/CloseableClient.java @@ -16,10 +16,12 @@ package com.coreos.jetcd.internal.impl; -public interface CloseableClient { +public interface CloseableClient extends AutoCloseable { + /** * close the client and release its resources. */ + @Override default void close() { // noop } diff --git a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/LoadBalancerTest.java b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/LoadBalancerTest.java index e79cbdd8d..a03afc258 100644 --- a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/LoadBalancerTest.java +++ b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/LoadBalancerTest.java @@ -35,17 +35,15 @@ public class LoadBalancerTest { @Test public void testPickFirstBalancerFactory() throws Exception { - Client client = Client.builder() - .endpoints(TestConstants.endpoints) - .loadBalancerFactory(PickFirstBalancerFactory.getInstance()) - .build(); + try (Client client = Client.builder() + .endpoints(TestConstants.endpoints) + .loadBalancerFactory(PickFirstBalancerFactory.getInstance()) + .build(); - KV kv = client.getKVClient(); + KV kv = client.getKVClient()) { + PutResponse response; + long lastMemberId = 0; - PutResponse response; - long lastMemberId = 0; - - try { for (int i = 0; i < TestConstants.endpoints.length * 2; i++) { response = kv.put(TestUtil.randomByteSequence(), TestUtil.randomByteSequence()).get(); @@ -55,26 +53,21 @@ public void testPickFirstBalancerFactory() throws Exception { assertThat(response.getHeader().getMemberId()).isEqualTo(lastMemberId); } - } finally { - kv.close(); - client.close(); } } @Test public void testRoundRobinLoadBalancerFactory() throws Exception { - Client client = Client.builder() - .endpoints(TestConstants.endpoints) - .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) - .build(); - - KV kv = client.getKVClient(); - PutResponse response; - long lastMemberId = 0; - long differences = 0; + try (Client client = Client.builder() + .endpoints(TestConstants.endpoints) + .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()) + .build(); + KV kv = client.getKVClient()) { + PutResponse response; + long lastMemberId = 0; + long differences = 0; - try { for (int i = 0; i < TestConstants.endpoints.length; i++) { response = kv.put(TestUtil.randomByteSequence(), TestUtil.randomByteSequence()).get(); @@ -86,9 +79,6 @@ public void testRoundRobinLoadBalancerFactory() throws Exception { } assertThat(differences).isNotEqualTo(lastMemberId); - } finally { - kv.close(); - client.close(); } } } diff --git a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchTest.java b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchTest.java index c217cd5e2..cb0b51f5d 100644 --- a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchTest.java +++ b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchTest.java @@ -62,16 +62,13 @@ public void tearDown() { public void testWatchOnPut() throws ExecutionException, InterruptedException { ByteSequence key = ByteSequence.fromString(TestUtil.randomString()); ByteSequence value = ByteSequence.fromString("value"); - Watcher watcher = watchClient.watch(key); - try { + try (Watcher watcher = watchClient.watch(key)) { kvClient.put(key, value).get(); WatchResponse response = watcher.listen(); assertThat(response.getEvents().size()).isEqualTo(1); assertThat(response.getEvents().get(0).getEventType()).isEqualTo(EventType.PUT); assertThat(response.getEvents().get(0).getKeyValue().getKey()).isEqualTo(key); - } finally { - watcher.close(); } } @@ -80,16 +77,13 @@ public void testWatchOnDelete() throws ExecutionException, InterruptedException ByteSequence key = ByteSequence.fromString(TestUtil.randomString()); ByteSequence value = ByteSequence.fromString("value"); kvClient.put(key, value).get(); - Watcher watcher = watchClient.watch(key); - try { + try (Watcher watcher = watchClient.watch(key)) { kvClient.delete(key); WatchResponse response = watcher.listen(); assertThat(response.getEvents().size()).isEqualTo(1); WatchEvent event = response.getEvents().get(0); assertThat(event.getEventType()).isEqualTo(EventType.DELETE); assertThat(Arrays.equals(event.getKeyValue().getKey().getBytes(), key.getBytes())).isTrue(); - } finally { - watcher.close(); } } } diff --git a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchUnitTest.java b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchUnitTest.java index 94a48c0ef..f895349be 100644 --- a/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchUnitTest.java +++ b/jetcd-core/src/test/java/com/coreos/jetcd/internal/impl/WatchUnitTest.java @@ -105,34 +105,33 @@ public void testCreateWatcherAfterClientClosed() { @Test public void testWatchOnSendingWatchCreateRequest() { - Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT); - // expects a WatchCreateRequest is created. - verify(this.requestStreamObserverMock, timeout(100).times(1)) - .onNext(argThat(hasCreateKey(KEY))); - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT)) { + // expects a WatchCreateRequest is created. + verify(this.requestStreamObserverMock, timeout(100).times(1)) + .onNext(argThat(hasCreateKey(KEY))); + } } @Test public void testWatcherListenOnResponse() throws InterruptedException { - Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT); - WatchResponse createdResponse = WatchResponse.newBuilder() - .setCreated(true) - .setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - - WatchResponse putResponse = WatchResponse - .newBuilder() - .setWatchId(0) - .addEvents(Event.newBuilder().setType(EventType.PUT).build()).build(); - responseObserverRef.get().onNext(putResponse); - - com.coreos.jetcd.watch.WatchResponse actualResponse = watcher.listen(); - assertThat(actualResponse.getEvents().size()).isEqualTo(1); - assertThat(actualResponse.getEvents().get(0).getEventType()) - .isEqualTo(WatchEvent.EventType.PUT); - - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT)) { + WatchResponse createdResponse = WatchResponse.newBuilder() + .setCreated(true) + .setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + + WatchResponse putResponse = WatchResponse + .newBuilder() + .setWatchId(0) + .addEvents(Event.newBuilder().setType(EventType.PUT).build()).build(); + responseObserverRef.get().onNext(putResponse); + + com.coreos.jetcd.watch.WatchResponse actualResponse = watcher.listen(); + assertThat(actualResponse.getEvents().size()).isEqualTo(1); + assertThat(actualResponse.getEvents().get(0).getEventType()) + .isEqualTo(WatchEvent.EventType.PUT); + } } @Test @@ -180,210 +179,198 @@ public void testWatcherListenOnWatchClientClose() throws InterruptedException { @Test public void testWatcherListenForMultiplePuts() throws InterruptedException { - Watcher watcher = watchClient.watch(KEY); - WatchResponse createdResponse = WatchResponse.newBuilder() - .setCreated(true) - .setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - - WatchResponse putResponse = WatchResponse - .newBuilder() - .setWatchId(0) - .addEvents(Event.newBuilder() - .setType(EventType.PUT) - .setKv(KeyValue.newBuilder() - .setModRevision(2) - .build()) - .build()) - .build(); - responseObserverRef.get().onNext(putResponse); - - com.coreos.jetcd.watch.WatchResponse actualResponse = watcher.listen(); - assertEqualOnWatchResponses(actualResponse, - new com.coreos.jetcd.watch.WatchResponse(putResponse)); - - putResponse = WatchResponse - .newBuilder() - .setWatchId(0) - .addEvents(Event.newBuilder() - .setType(EventType.PUT) - .setKv(KeyValue.newBuilder() - .setModRevision(3) - .build()) - .build()) - .build(); - responseObserverRef.get().onNext(putResponse); - - actualResponse = watcher.listen(); - assertEqualOnWatchResponses(actualResponse, - new com.coreos.jetcd.watch.WatchResponse(putResponse)); + try (Watcher watcher = watchClient.watch(KEY)) { + WatchResponse createdResponse = WatchResponse.newBuilder() + .setCreated(true) + .setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + + WatchResponse putResponse = WatchResponse + .newBuilder() + .setWatchId(0) + .addEvents(Event.newBuilder() + .setType(EventType.PUT) + .setKv(KeyValue.newBuilder() + .setModRevision(2) + .build()) + .build()) + .build(); + responseObserverRef.get().onNext(putResponse); + + com.coreos.jetcd.watch.WatchResponse actualResponse = watcher.listen(); + assertEqualOnWatchResponses(actualResponse, + new com.coreos.jetcd.watch.WatchResponse(putResponse)); + + putResponse = WatchResponse + .newBuilder() + .setWatchId(0) + .addEvents(Event.newBuilder() + .setType(EventType.PUT) + .setKv(KeyValue.newBuilder() + .setModRevision(3) + .build()) + .build()) + .build(); + responseObserverRef.get().onNext(putResponse); + + actualResponse = watcher.listen(); + assertEqualOnWatchResponses(actualResponse, + new com.coreos.jetcd.watch.WatchResponse(putResponse)); - watcher.close(); + } } @Test public void testWatcherDelete() throws InterruptedException { - Watcher watcher = watchClient.watch(KEY); - - WatchResponse createdResponse = WatchResponse - .newBuilder() - .setCreated(true) - .setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - - WatchResponse deleteResponse = WatchResponse - .newBuilder() - .setWatchId(0) - .addEvents(Event - .newBuilder() - .setType(EventType.DELETE) - .build()) - .build(); - responseObserverRef.get().onNext(deleteResponse); - - com.coreos.jetcd.watch.WatchResponse actualResponse = watcher.listen(); - assertEqualOnWatchResponses(actualResponse, - new com.coreos.jetcd.watch.WatchResponse(deleteResponse)); - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY)) { + WatchResponse createdResponse = WatchResponse + .newBuilder() + .setCreated(true) + .setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + + WatchResponse deleteResponse = WatchResponse + .newBuilder() + .setWatchId(0) + .addEvents(Event + .newBuilder() + .setType(EventType.DELETE) + .build()) + .build(); + responseObserverRef.get().onNext(deleteResponse); + + com.coreos.jetcd.watch.WatchResponse actualResponse = watcher.listen(); + assertEqualOnWatchResponses(actualResponse, + new com.coreos.jetcd.watch.WatchResponse(deleteResponse)); + } } @Test public void testWatchOnUnrecoverableConnectionIssue() { - Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT); - - WatchResponse createdResponse = WatchResponse.newBuilder().setCreated(true).setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - - // connection error causes client to release all resources including all watchers. - responseObserverRef.get() - .onError(Status.ABORTED.withDescription("connection error").asRuntimeException()); - // expects connection error to propagate to active listener. - - assertThatExceptionOfType(EtcdException.class) - .isThrownBy(watcher::listen) - .withMessageContaining("connection error"); - - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT)) { + WatchResponse createdResponse = WatchResponse.newBuilder().setCreated(true).setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + + // connection error causes client to release all resources including all watchers. + responseObserverRef.get() + .onError(Status.ABORTED.withDescription("connection error").asRuntimeException()); + // expects connection error to propagate to active listener. + + assertThatExceptionOfType(EtcdException.class) + .isThrownBy(watcher::listen) + .withMessageContaining("connection error"); + } } @Test public void testWatchOnRecoverableConnectionIssue() { - Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT); - - WatchResponse createdResponse = WatchResponse.newBuilder() - .setCreated(true) - .setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - // expects a WatchCreateRequest is created. - verify(this.requestStreamObserverMock, timeout(100).times(1)) - .onNext(argThat(hasCreateKey(KEY))); - - // connection error causes client to release all resources including all watchers. - responseObserverRef.get() - .onError( - Status.UNAVAILABLE.withDescription("Temporary connection issue").asRuntimeException()); - // resets mock call counter. - Mockito.reset(this.requestStreamObserverMock); - - // expects re-send WatchCreateRequest. - verify(this.requestStreamObserverMock, timeout(1000).times(1)) - .onNext(argThat(hasCreateKey(KEY))); - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY, WatchOption.DEFAULT)) { + WatchResponse createdResponse = WatchResponse.newBuilder() + .setCreated(true) + .setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + // expects a WatchCreateRequest is created. + verify(this.requestStreamObserverMock, timeout(100).times(1)) + .onNext(argThat(hasCreateKey(KEY))); + + // connection error causes client to release all resources including all watchers. + responseObserverRef.get() + .onError( + Status.UNAVAILABLE.withDescription("Temporary connection issue").asRuntimeException()); + // resets mock call counter. + Mockito.reset(this.requestStreamObserverMock); + + // expects re-send WatchCreateRequest. + verify(this.requestStreamObserverMock, timeout(1000).times(1)) + .onNext(argThat(hasCreateKey(KEY))); + } } @Test public void testWatcherCreateOnCompactionError() { - Watcher watcher = watchClient.watch(KEY); - - WatchResponse createdResponse = WatchResponse - .newBuilder() - .setCreated(true) - .setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - - WatchResponse compactedResponse = WatchResponse - .newBuilder() - .setCompactRevision(2) - .build(); - responseObserverRef.get().onNext(compactedResponse); - - assertThatThrownBy(watcher::listen) - .isInstanceOf(CompactedException.class) - .hasFieldOrPropertyWithValue("compactedRevision", 2L); - - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY)) { + WatchResponse createdResponse = WatchResponse + .newBuilder() + .setCreated(true) + .setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + + WatchResponse compactedResponse = WatchResponse + .newBuilder() + .setCompactRevision(2) + .build(); + responseObserverRef.get().onNext(compactedResponse); + + assertThatThrownBy(watcher::listen) + .isInstanceOf(CompactedException.class) + .hasFieldOrPropertyWithValue("compactedRevision", 2L); + } } @Test public void testWatcherCreateOnCancellationWithNoReason() { - Watcher watcher = watchClient.watch(KEY); - - WatchResponse createdResponse = WatchResponse - .newBuilder() - .setCreated(true) - .setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - - WatchResponse canceledReponse = WatchResponse - .newBuilder() - .setCanceled(true) - .build(); - responseObserverRef.get().onNext(canceledReponse); - - assertThatExceptionOfType(EtcdException.class) - .isThrownBy(watcher::listen) - .withMessageContaining("etcdserver: mvcc: required revision is a future revision"); - - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY)) { + WatchResponse createdResponse = WatchResponse + .newBuilder() + .setCreated(true) + .setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + + WatchResponse canceledReponse = WatchResponse + .newBuilder() + .setCanceled(true) + .build(); + responseObserverRef.get().onNext(canceledReponse); + + assertThatExceptionOfType(EtcdException.class) + .isThrownBy(watcher::listen) + .withMessageContaining("etcdserver: mvcc: required revision is a future revision"); + } } @Test public void testWatcherCreateOnCancellationWithReason() { - Watcher watcher = watchClient.watch(KEY); - - WatchResponse createdResponse = WatchResponse - .newBuilder() - .setCreated(true) - .setWatchId(0) - .build(); - responseObserverRef.get().onNext(createdResponse); - - WatchResponse canceledResponse = WatchResponse - .newBuilder() - .setCanceled(true) - .setCancelReason("bad reason") - .build(); - responseObserverRef.get().onNext(canceledResponse); - - assertThatExceptionOfType(EtcdException.class) - .isThrownBy(watcher::listen) - .withMessageContaining(canceledResponse.getCancelReason()); - - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY)) { + WatchResponse createdResponse = WatchResponse + .newBuilder() + .setCreated(true) + .setWatchId(0) + .build(); + responseObserverRef.get().onNext(createdResponse); + + WatchResponse canceledResponse = WatchResponse + .newBuilder() + .setCanceled(true) + .setCancelReason("bad reason") + .build(); + responseObserverRef.get().onNext(canceledResponse); + + assertThatExceptionOfType(EtcdException.class) + .isThrownBy(watcher::listen) + .withMessageContaining(canceledResponse.getCancelReason()); + } } @Test public void testWatcherCreateOnInvalidWatchID() { - Watcher watcher = watchClient.watch(KEY); - - WatchResponse createdResponse = WatchResponse - .newBuilder() - .setCreated(true) - .setWatchId(-1) - .build(); - responseObserverRef.get().onNext(createdResponse); - - assertThatExceptionOfType(EtcdException.class) - .isThrownBy(watcher::listen) - .withMessageContaining("etcd server failed to create watch id"); - - watcher.close(); + try (Watcher watcher = watchClient.watch(KEY)) { + WatchResponse createdResponse = WatchResponse + .newBuilder() + .setCreated(true) + .setWatchId(-1) + .build(); + responseObserverRef.get().onNext(createdResponse); + + assertThatExceptionOfType(EtcdException.class) + .isThrownBy(watcher::listen) + .withMessageContaining("etcd server failed to create watch id"); + } } // return a ArgumentMatcher that checks if the captured WatchRequest has same key. diff --git a/jetcd-examples/jetcd-simple-ctl/pom.xml b/jetcd-examples/jetcd-simple-ctl/pom.xml index 6502f815e..e63b6bf6c 100644 --- a/jetcd-examples/jetcd-simple-ctl/pom.xml +++ b/jetcd-examples/jetcd-simple-ctl/pom.xml @@ -30,7 +30,7 @@ com.coreos jetcd-core - 0.0.1 + 0.1.0-SNAPSHOT diff --git a/jetcd-examples/jetcd-simple-ctl/src/main/java/com/coreos/jetcd/examples/jetcdctl/Main.java b/jetcd-examples/jetcd-simple-ctl/src/main/java/com/coreos/jetcd/examples/jetcdctl/Main.java index 6e23b8285..719bb1db3 100644 --- a/jetcd-examples/jetcd-simple-ctl/src/main/java/com/coreos/jetcd/examples/jetcdctl/Main.java +++ b/jetcd-examples/jetcd-simple-ctl/src/main/java/com/coreos/jetcd/examples/jetcdctl/Main.java @@ -52,10 +52,9 @@ public static void main(String[] args) { return; } - Client client = Client.builder() - .endpoints(main.endpoints.split(",")) - .build(); - try { + try (Client client = Client.builder() + .endpoints(main.endpoints.split(",")) + .build()) { switch (parsedCmd) { case "get": getCmd.get(client); @@ -71,6 +70,5 @@ public static void main(String[] args) { LOGGER.error(parsedCmd + " Error {}", e); System.exit(1); } - client.close(); } } \ No newline at end of file diff --git a/jetcd-examples/jetcd-watch-example/pom.xml b/jetcd-examples/jetcd-watch-example/pom.xml index 276ee3f9f..34a4d99ad 100644 --- a/jetcd-examples/jetcd-watch-example/pom.xml +++ b/jetcd-examples/jetcd-watch-example/pom.xml @@ -30,7 +30,7 @@ com.coreos jetcd-core - 0.0.1 + 0.1.0-SNAPSHOT diff --git a/jetcd-examples/jetcd-watch-example/src/main/java/com/coreos/jetcd/examples/watch/Main.java b/jetcd-examples/jetcd-watch-example/src/main/java/com/coreos/jetcd/examples/watch/Main.java index bb9ae44a3..17657ffb5 100644 --- a/jetcd-examples/jetcd-watch-example/src/main/java/com/coreos/jetcd/examples/watch/Main.java +++ b/jetcd-examples/jetcd-watch-example/src/main/java/com/coreos/jetcd/examples/watch/Main.java @@ -40,15 +40,9 @@ public static void main(String[] args) throws Exception { .build() .parse(args); - Client client = null; - Watch watch = null; - Watch.Watcher watcher = null; - - try { - client = Client.builder().endpoints(cmd.endpoints).build(); - watch = client.getWatchClient(); - watcher = watch.watch(ByteSequence.fromString(cmd.key)); - + try (Client client = Client.builder().endpoints(cmd.endpoints).build(); + Watch watch = client.getWatchClient(); + Watch.Watcher watcher = watch.watch(ByteSequence.fromString(cmd.key))) { for (int i = 0; i < cmd.maxEvents; i++) { LOGGER.info("Watching for key={}", cmd.key); WatchResponse response = watcher.listen(); @@ -66,17 +60,8 @@ public static void main(String[] args) throws Exception { } } } catch (Exception e) { - if (watcher != null) { - watcher.close(); - } - - if (watch != null) { - watch.close(); - } - - if (client != null) { - client.close(); - } + LOGGER.error("Watching Error {}", e); + System.exit(1); } }