Skip to content

Commit

Permalink
Merge branch 'apache:develop' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
hhy50 committed Apr 23, 2023
2 parents e76407b + af6a95f commit 7847646
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@ public class PlainPermissionChecker implements PermissionChecker {
public void check(AccessResource checkedAccess, AccessResource ownedAccess) {
PlainAccessResource checkedPlainAccess = (PlainAccessResource) checkedAccess;
PlainAccessResource ownedPlainAccess = (PlainAccessResource) ownedAccess;
if (Permission.needAdminPerm(checkedPlainAccess.getRequestCode()) && !ownedPlainAccess.isAdmin()) {

if (ownedPlainAccess.isAdmin()) {
// admin user don't need verification
return;
}
if (Permission.needAdminPerm(checkedPlainAccess.getRequestCode())) {
throw new AclException(String.format("Need admin permission for request code=%d, but accessKey=%s is not", checkedPlainAccess.getRequestCode(), ownedPlainAccess.getAccessKey()));
}

Map<String, Byte> needCheckedPermMap = checkedPlainAccess.getResourcePermMap();
Map<String, Byte> ownedPermMap = ownedPlainAccess.getResourcePermMap();

Expand All @@ -38,11 +44,6 @@ public void check(AccessResource checkedAccess, AccessResource ownedAccess) {
return;
}

if (ownedPermMap == null && ownedPlainAccess.isAdmin()) {
// If the ownedPermMap is null and it is an admin user, then return
return;
}

for (Map.Entry<String, Byte> needCheckedEntry : needCheckedPermMap.entrySet()) {
String resource = needCheckedEntry.getKey();
Byte neededPerm = needCheckedEntry.getValue();
Expand All @@ -58,7 +59,7 @@ public void check(AccessResource checkedAccess, AccessResource ownedAccess) {
continue;
}
if (!Permission.checkPermission(neededPerm, ownedPermMap.get(resource))) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
throw new AclException(String.format("No permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,14 @@ private void processAllGroup(Consumer<ProcessGroupInfo> consumer) {
if (isPop) {
String retryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
TopicConfig retryTopicConfig = topicConfigManager.selectTopicConfig(retryTopic);
int retryTopicPerm = retryTopicConfig.getPerm() & brokerConfig.getBrokerPermission();
if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopic));
} else {
consumer.accept(new ProcessGroupInfo(group, topic, true, null));
if (retryTopicConfig != null) {
int retryTopicPerm = retryTopicConfig.getPerm() & brokerConfig.getBrokerPermission();
if (PermName.isReadable(retryTopicPerm) || PermName.isWriteable(retryTopicPerm)) {
consumer.accept(new ProcessGroupInfo(group, topic, true, retryTopic));
continue;
}
}
consumer.accept(new ProcessGroupInfo(group, topic, true, null));
} else {
consumer.accept(new ProcessGroupInfo(group, topic, false, null));
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
<disruptor.version>1.2.10</disruptor.version>
<org.relection.version>0.9.11</org.relection.version>
<caffeine.version>2.9.3</caffeine.version>
<spring.version>5.3.26</spring.version>
<spring.version>5.3.27</spring.version>
<okio-jvm.version>3.0.0</okio-jvm.version>
<opentelemetry.version>1.19.0</opentelemetry.version>
<opentelemetry-exporter-prometheus.version>1.19.0-alpha</opentelemetry-exporter-prometheus.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public CompactionStore(DefaultMessageStore defaultMessageStore) {
this.compactionLogPath = Paths.get(compactionPath, COMPACTION_LOG_DIR).toString();
this.compactionCqPath = Paths.get(compactionPath, COMPACTION_CQ_DIR).toString();
this.positionMgr = new CompactionPositionMgr(compactionPath);
this.compactionThreadNum = Math.min(Runtime.getRuntime().availableProcessors(), config.getCompactionThreadNum());
this.compactionThreadNum = Math.min(Runtime.getRuntime().availableProcessors(), Math.max(1, config.getCompactionThreadNum()));

this.compactionSchedule = Executors.newScheduledThreadPool(this.compactionThreadNum,
new ThreadFactoryImpl("compactionSchedule_"));
Expand Down

0 comments on commit 7847646

Please sign in to comment.