From 09c0c8ce78670aa34d11f028510162d9f300e497 Mon Sep 17 00:00:00 2001 From: "fujian.zfj" Date: Tue, 30 Nov 2021 20:19:07 +0800 Subject: [PATCH 1/2] typo int readme[ecosystem] --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index eb20e798ea9..79b316c2987 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ It offers a variety of features: * A variety of cross language clients, such as Java, [C/C++](https://github.com/apache/rocketmq-client-cpp), [Python](https://github.com/apache/rocketmq-client-python), [Go](https://github.com/apache/rocketmq-client-go), [Node.js](https://github.com/apache/rocketmq-client-nodejs) * Pluggable transport protocols, such as TCP, SSL, AIO * Built-in message tracing capability, also support opentracing -* Versatile big-data and streaming ecosytem integration +* Versatile big-data and streaming ecosystem integration * Message retroactivity by time or offset * Reliable FIFO and strict ordered messaging in the same queue * Efficient pull and push consumption model From 80693a26196fc65dface90d875b53c154f2b5437 Mon Sep 17 00:00:00 2001 From: zfj Date: Fri, 10 Mar 2023 11:01:59 +0800 Subject: [PATCH 2/2] fix unexpected state from slave --- .../ha/autoswitch/AutoSwitchHAClient.java | 24 +++++++++---------- .../ha/autoswitch/AutoSwitchHAConnection.java | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java index b95d3814aac..49a59e25139 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java @@ -332,21 +332,21 @@ private void handshakeWithMaster() throws IOException { } } - private boolean reportSlaveOffset(final long offsetToReport) throws IOException { + private boolean reportSlaveOffset(HAConnectionState currentState, final long offsetToReport) throws IOException { this.transferHeaderBuffer.position(0); this.transferHeaderBuffer.limit(TRANSFER_HEADER_SIZE); - this.transferHeaderBuffer.putInt(this.currentState.ordinal()); + this.transferHeaderBuffer.putInt(currentState.ordinal()); this.transferHeaderBuffer.putLong(offsetToReport); this.transferHeaderBuffer.flip(); return this.haWriter.write(this.socketChannel, this.transferHeaderBuffer); } - private boolean reportSlaveMaxOffset() throws IOException { + private boolean reportSlaveMaxOffset(HAConnectionState currentState) throws IOException { boolean result = true; final long maxPhyOffset = this.messageStore.getMaxPhyOffset(); if (maxPhyOffset > this.currentReportedOffset) { this.currentReportedOffset = maxPhyOffset; - result = reportSlaveOffset(this.currentReportedOffset); + result = reportSlaveOffset(currentState, this.currentReportedOffset); } return result; } @@ -369,11 +369,11 @@ public boolean connectMaster() throws IOException { return this.socketChannel != null; } - private boolean transferFromMaster() throws IOException { + private boolean transferFromMaster(HAConnectionState currentState) throws IOException { boolean result; if (isTimeToReportOffset()) { LOGGER.info("Slave report current offset {}", this.currentReportedOffset); - result = reportSlaveOffset(this.currentReportedOffset); + result = reportSlaveOffset(currentState, this.currentReportedOffset); if (!result) { return false; } @@ -386,7 +386,7 @@ private boolean transferFromMaster() throws IOException { return false; } - return this.reportSlaveMaxOffset(); + return this.reportSlaveMaxOffset(currentState); } @Override @@ -415,7 +415,7 @@ public void run() { handshakeWithMaster(); continue; case TRANSFER: - if (!transferFromMaster()) { + if (!transferFromMaster(HAConnectionState.TRANSFER)) { closeMasterAndWait(); continue; } @@ -445,7 +445,7 @@ public void run() { /** * Compare the master and slave's epoch file, find consistent point, do truncate. */ - private boolean doTruncate(List masterEpochEntries, long masterEndOffset) throws IOException { + private boolean doTruncate(List masterEpochEntries, long masterEndOffset, HAConnectionState currentState) throws IOException { if (this.epochCache.getEntrySize() == 0) { // If epochMap is empty, means the broker is a new replicas LOGGER.info("Slave local epochCache is empty, skip truncate log"); @@ -475,7 +475,7 @@ private boolean doTruncate(List masterEpochEntries, long masterEndOf changeCurrentState(HAConnectionState.TRANSFER); this.currentReportedOffset = truncateOffset; } - if (!reportSlaveMaxOffset()) { + if (!reportSlaveMaxOffset(currentState)) { LOGGER.error("AutoSwitchHAClient report max offset to master failed"); return false; } @@ -534,7 +534,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) { byteBufferRead.position(readSocketPos); AutoSwitchHAClient.this.processPosition += bodySize; LOGGER.info("Receive handshake, masterMaxPosition {}, masterEpochEntries:{}, try truncate log", masterOffset, epochEntries); - if (!doTruncate(epochEntries, masterOffset)) { + if (!doTruncate(epochEntries, masterOffset, HAConnectionState.HANDSHAKE)) { waitForRunning(1000 * 2); LOGGER.error("AutoSwitchHAClient truncate log failed in handshake state"); return false; @@ -573,7 +573,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) { haService.updateConfirmOffset(Math.min(confirmOffset, messageStore.getMaxPhyOffset())); - if (!reportSlaveMaxOffset()) { + if (!reportSlaveMaxOffset(HAConnectionState.TRANSFER)) { LOGGER.error("AutoSwitchHAClient report max offset to master failed"); return false; } diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java index 57f9e9619d9..8f79b55a9b3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java @@ -363,7 +363,7 @@ protected boolean processReadResult(ByteBuffer byteBufferRead) { break; default: LOGGER.error("Current state illegal {}", currentState); - break; + return false; } if (!slaveState.equals(currentState)) {