Skip to content

Commit

Permalink
Use a single thread to watch all event executors (#3884)
Browse files Browse the repository at this point in the history
Signed-off-by: Jörg Sautter <[email protected]>
  • Loading branch information
joerg1985 authored Nov 24, 2023
1 parent bdb1e55 commit ae117f6
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class EventHandler implements AutoCloseable {
private final Map<String, EventFactory> typedEventFactories;

private final Map<Class<? extends EventSubscriber>, ExecutorRecord> executors = new HashMap<>();
private final ScheduledExecutorService watcher;

/**
* Create a new event handler.
Expand All @@ -64,21 +65,21 @@ public EventHandler(final Map<String, Set<EventSubscriber>> typedEventSubscriber
final Map<String, EventFactory> typedEventFactories) {
this.typedEventSubscribers = typedEventSubscribers;
this.typedEventFactories = typedEventFactories;
watcher = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("eventwatcher"));
}

private synchronized ExecutorRecord createExecutorRecord(Class<? extends EventSubscriber> subscriber) {
return new ExecutorRecord(
Executors.newSingleThreadExecutor(new NamedThreadFactory("eventexecutor-" + executors.size())),
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("eventwatcher-" + executors.size())),
new AtomicInteger());
}

@Override
public void close() {
executors.values().forEach(r -> {
r.executor.shutdownNow();
r.watcher.shutdownNow();
});
watcher.shutdownNow();
}

public void handleEvent(org.osgi.service.event.Event osgiEvent) {
Expand Down Expand Up @@ -154,13 +155,13 @@ private synchronized void dispatchEvent(final Set<EventSubscriber> eventSubscrib
logger.trace("Delegate event to subscriber ({}).", eventSubscriber.getClass());
ExecutorRecord executorRecord = Objects.requireNonNull(
executors.computeIfAbsent(eventSubscriber.getClass(), this::createExecutorRecord));
int queueSize = executorRecord.count().incrementAndGet();
int queueSize = executorRecord.count.incrementAndGet();
if (queueSize > EVENT_QUEUE_WARN_LIMIT) {
logger.warn("The queue for a subscriber of type '{}' exceeds {} elements. System may be unstable.",
eventSubscriber.getClass(), EVENT_QUEUE_WARN_LIMIT);
}
CompletableFuture.runAsync(() -> {
ScheduledFuture<?> logTimeout = executorRecord.watcher().schedule(
ScheduledFuture<?> logTimeout = watcher.schedule(
() -> logger.warn("Dispatching event to subscriber '{}' takes more than {}ms.",
eventSubscriber, EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS),
EVENTSUBSCRIBER_EVENTHANDLING_MAX_MS, TimeUnit.MILLISECONDS);
Expand All @@ -171,13 +172,13 @@ private synchronized void dispatchEvent(final Set<EventSubscriber> eventSubscrib
EventSubscriber.class.getName(), ex.getMessage(), ex);
}
logTimeout.cancel(false);
}, executorRecord.executor()).thenRun(executorRecord.count::decrementAndGet);
}, executorRecord.executor).thenRun(executorRecord.count::decrementAndGet);
} else {
logger.trace("Skip event subscriber ({}) because of its filter.", eventSubscriber.getClass());
}
}
}

private record ExecutorRecord(ExecutorService executor, ScheduledExecutorService watcher, AtomicInteger count) {
private record ExecutorRecord(ExecutorService executor, AtomicInteger count) {
}
}

0 comments on commit ae117f6

Please sign in to comment.