Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add automatic subscribe/unsubscribe to signals #2663

Merged
merged 7 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
780 changes: 335 additions & 445 deletions package-lock.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ public synchronized void register(String clientSignalId,
delegate.register(clientSignalId, signal);
}

public synchronized void unsubscribe(String clientSignalId) {
var endpointMethodInfo = endpointMethods.get(clientSignalId);
if (endpointMethodInfo == null) {
return;
}
delegate.removeClientSignalToSignalMapping(clientSignalId);
endpointMethods.remove(clientSignalId);
}

public synchronized NumberSignal get(String clientSignalId)
throws EndpointInvocationException.EndpointAccessDeniedException,
EndpointInvocationException.EndpointNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public Flux<ObjectNode> subscribe(String providerEndpoint,
}

registry.register(clientSignalId, providerEndpoint, providerMethod);
return registry.get(clientSignalId).subscribe();
return registry.get(clientSignalId).subscribe()
.doFinally((event) -> registry.unsubscribe(clientSignalId));
} catch (Exception e) {
return Flux.error(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,29 @@ public void when_accessToEndpointIsAllowed_signalInstanceIsRegistered()
}
}

@Test
public void when_unsubscribedIsCalled_underlyingRegistryRemovesClientSignalToSignalMapping()
throws Exception {
NumberSignal signal = new NumberSignal();
EndpointInvoker invoker = mockEndpointInvokerThatGrantsAccess(signal);

AtomicReference<SignalsRegistry> signalsRegistry = new AtomicReference<>();
try (var dummy = Mockito.mockConstruction(SignalsRegistry.class,
(mockSignalRegistry, context) -> {
when(mockSignalRegistry.get("clientSignalId"))
.thenReturn(signal);
signalsRegistry.set(mockSignalRegistry);
})) {
SecureSignalsRegistry secureSignalsRegistry = new SecureSignalsRegistry(
invoker);
secureSignalsRegistry.register("clientSignalId", "endpoint",
"method");
secureSignalsRegistry.unsubscribe("clientSignalId");
verify(signalsRegistry.get(), times(1))
.removeClientSignalToSignalMapping("clientSignalId");
}
}

@Test
public void when_accessToEndpointIsRejected_register_throws()
throws Exception {
Expand Down
26 changes: 13 additions & 13 deletions packages/java/tests/spring/react-signals/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/java/tests/spring/react-signals/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"lit": "3.2.0",
"react": "18.3.1",
"react-dom": "18.3.1",
"react-router-dom": "^6.26.0"
"react-router-dom": "^6.26.1"
},
"devDependencies": {
"@babel/preset-react": "7.24.7",
Expand Down Expand Up @@ -117,7 +117,7 @@
"workbox-core": "7.1.0",
"workbox-precaching": "7.1.0"
},
"hash": "380838fd2237f6d269693eac27d292fa25ba2920ad38b53d8ebcfb27a20b5ace"
"hash": "8587dd57f1366294a48aaf55834f4b03439212337f749df1d38ec198ca284dec"
},
"overrides": {
"@vaadin/common-frontend": "$@vaadin/common-frontend",
Expand Down
89 changes: 71 additions & 18 deletions packages/ts/react-signals/src/FullStackSignal.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { computed, signal } from '@preact/signals-react';
import type { ConnectClient, Subscription } from '@vaadin/hilla-frontend';
import { nanoid } from 'nanoid';
import { Signal } from './core.js';
import { computed, signal, Signal } from './core.js';

const ENDPOINT = 'SignalsHandler';

Expand All @@ -22,6 +21,47 @@ export type StateEvent<T> = Readonly<{
value: T;
}>;

/**
* An abstraction of a signal that tracks the number of subscribers, and calls
* the provided `onSubscribe` and `onUnsubscribe` callbacks for the first
* subscription and the last unsubscription, respectively.
* @internal
*/
export abstract class DependencyTrackingSignal<T> extends Signal<T> {
readonly #onFirstSubscribe: () => void;
readonly #onLastUnsubscribe: () => void;

// -1 means to ignore the first subscription that is created internally in the
// FullStackSignal constructor.
#subscribeCount = -1;

protected constructor(value: T | undefined, onFirstSubscribe: () => void, onLastUnsubscribe: () => void) {
super(value);
this.#onFirstSubscribe = onFirstSubscribe;
this.#onLastUnsubscribe = onLastUnsubscribe;
}

protected S(node: unknown): void {
// @ts-expect-error: We use the protected method from the base class.
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
super.S(node);
if (this.#subscribeCount === 0) {
this.#onFirstSubscribe();
}
this.#subscribeCount += 1;
}

protected U(node: unknown): void {
// @ts-expect-error: We use the protected method from the base class.
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
super.U(node);
this.#subscribeCount -= 1;
if (this.#subscribeCount === 0) {
this.#onLastUnsubscribe();
}
}
}

/**
* An object that describes a data object to connect to the signal provider
* service.
Expand Down Expand Up @@ -93,7 +133,7 @@ class ServerConnection<T> {
*
* @internal
*/
export abstract class FullStackSignal<T> extends Signal<T> {
export abstract class FullStackSignal<T> extends DependencyTrackingSignal<T> {
/**
* The unique identifier of the signal necessary to communicate with the
* server.
Expand All @@ -118,24 +158,20 @@ export abstract class FullStackSignal<T> extends Signal<T> {
readonly #pending = signal(false);
readonly #error = signal<Error | undefined>(undefined);

// Paused at the very start to prevent the signal from sending the initial
// value to the server.
#paused = true;

constructor(value: T | undefined, config: ServerConnectionConfig) {
super(value);
super(
value,
() => this.#connect(),
() => this.#disconnect(),
);
this.server = new ServerConnection(this.id, config);

// Paused at the very start to prevent the signal from sending the initial
// value to the server.
let paused = true;

this.server.connect().onNext((event: StateEvent<T>) => {
if (event.type === StateEventType.SNAPSHOT) {
paused = true;
this.value = event.value;
paused = false;
}
});

this.subscribe((v) => {
if (!paused) {
if (!this.#paused) {
this.#pending.value = true;
this.#error.value = undefined;
this.server
Expand All @@ -153,6 +189,23 @@ export abstract class FullStackSignal<T> extends Signal<T> {
}
});

paused = false;
this.#paused = false;
}

#connect() {
this.server.connect().onNext((event: StateEvent<T>) => {
if (event.type === StateEventType.SNAPSHOT) {
this.#paused = true;
this.value = event.value;
this.#paused = false;
}
});
}

#disconnect() {
if (this.server.subscription === undefined) {
return;
}
this.server.disconnect();
}
}
Loading