Skip to content

Commit

Permalink
fix(signals): use Injector of rxMethod instance caller if availab…
Browse files Browse the repository at this point in the history
…le (#4529)

Closes #4528
  • Loading branch information
rainerhahnekamp authored Sep 28, 2024
1 parent 4fb78f1 commit ffc1d87
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 13 deletions.
224 changes: 224 additions & 0 deletions modules/signals/rxjs-interop/spec/rx-method.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import {
Component,
createEnvironmentInjector,
EnvironmentInjector,
inject,
Injectable,
Injector,
OnInit,
signal,
} from '@angular/core';
import { TestBed } from '@angular/core/testing';
import { provideLocationMocks } from '@angular/common/testing';
import { provideRouter } from '@angular/router';
import { RouterTestingHarness } from '@angular/router/testing';
import { BehaviorSubject, pipe, Subject, tap } from 'rxjs';
import { rxMethod } from '../src';
import { createLocalService } from '../../spec/helpers';
Expand Down Expand Up @@ -231,4 +238,221 @@ describe('rxMethod', () => {
TestBed.flushEffects();
expect(counter()).toBe(4);
});

/**
* This test suite verifies that a signal or observable passed to a reactive
* method that is initialized at the ancestor injector level is tracked within
* the correct injection context and untracked at the specified time.
*
* Components use `globalSignal` or `globalObservable` from `GlobalService`
* and pass it to the reactive method. If the component is destroyed but
* signal or observable change still increases the corresponding counter,
* the internal effect or subscription is still active.
*/
describe('with instance injector', () => {
@Injectable({ providedIn: 'root' })
class GlobalService {
readonly globalSignal = signal(1);
readonly globalObservable = new BehaviorSubject(1);

globalSignalChangeCounter = 0;
globalObservableChangeCounter = 0;

readonly signalMethod = rxMethod<number>(
tap(() => this.globalSignalChangeCounter++)
);
readonly observableMethod = rxMethod<number>(
tap(() => this.globalObservableChangeCounter++)
);

incrementSignal(): void {
this.globalSignal.update((value) => value + 1);
}

incrementObservable(): void {
this.globalObservable.next(this.globalObservable.value + 1);
}
}

@Component({
selector: 'app-without-store',
template: '',
standalone: true,
})
class WithoutStoreComponent {}

function setup(WithStoreComponent: new () => unknown): GlobalService {
TestBed.configureTestingModule({
providers: [
provideRouter([
{ path: 'with-store', component: WithStoreComponent },
{
path: 'without-store',
component: WithoutStoreComponent,
},
]),
provideLocationMocks(),
],
});

return TestBed.inject(GlobalService);
}

it('tracks a signal until the component is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent {
store = inject(GlobalService);

constructor() {
this.store.signalMethod(this.store.globalSignal);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

expect(globalService.globalSignalChangeCounter).toBe(1);

globalService.incrementSignal();
TestBed.flushEffects();
expect(globalService.globalSignalChangeCounter).toBe(2);

globalService.incrementSignal();
TestBed.flushEffects();
expect(globalService.globalSignalChangeCounter).toBe(3);

await harness.navigateByUrl('/without-store');
globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(3);
});

it('tracks an observable until the component is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent {
store = inject(GlobalService);

constructor() {
this.store.observableMethod(this.store.globalObservable);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

expect(globalService.globalObservableChangeCounter).toBe(1);

globalService.incrementObservable();
expect(globalService.globalObservableChangeCounter).toBe(2);

globalService.incrementObservable();
expect(globalService.globalObservableChangeCounter).toBe(3);

await harness.navigateByUrl('/without-store');
globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(3);
});

it('tracks a signal until the provided injector is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);
injector = inject(Injector);

ngOnInit() {
this.store.signalMethod(this.store.globalSignal, {
injector: this.injector,
});
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(2);

await harness.navigateByUrl('/without-store');
globalService.incrementSignal();
TestBed.flushEffects();

expect(globalService.globalSignalChangeCounter).toBe(2);
});

it('tracks an observable until the provided injector is destroyed', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);
injector = inject(Injector);

ngOnInit() {
this.store.observableMethod(this.store.globalObservable, {
injector: this.injector,
});
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(2);

await harness.navigateByUrl('/without-store');
globalService.incrementObservable();

expect(globalService.globalObservableChangeCounter).toBe(2);
});

it('falls back to source injector when reactive method is called outside of the injection context', async () => {
@Component({
selector: 'app-with-store',
template: '',
standalone: true,
})
class WithStoreComponent implements OnInit {
store = inject(GlobalService);

ngOnInit() {
this.store.signalMethod(this.store.globalSignal);
this.store.observableMethod(this.store.globalObservable);
}
}

const globalService = setup(WithStoreComponent);
const harness = await RouterTestingHarness.create('/with-store');

expect(globalService.globalSignalChangeCounter).toBe(1);
expect(globalService.globalObservableChangeCounter).toBe(1);

await harness.navigateByUrl('/without-store');
globalService.incrementSignal();
TestBed.flushEffects();
globalService.incrementObservable();

expect(globalService.globalSignalChangeCounter).toBe(2);
expect(globalService.globalObservableChangeCounter).toBe(2);
});
});
});
49 changes: 36 additions & 13 deletions modules/signals/rxjs-interop/src/rx-method.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import {
import { isObservable, noop, Observable, Subject, Unsubscribable } from 'rxjs';

type RxMethod<Input> = ((
input: Input | Signal<Input> | Observable<Input>
input: Input | Signal<Input> | Observable<Input>,
config?: { injector?: Injector }
) => Unsubscribable) &
Unsubscribable;

Expand All @@ -23,39 +24,61 @@ export function rxMethod<Input>(
assertInInjectionContext(rxMethod);
}

const injector = config?.injector ?? inject(Injector);
const destroyRef = injector.get(DestroyRef);
const sourceInjector = config?.injector ?? inject(Injector);
const source$ = new Subject<Input>();

const sourceSub = generator(source$).subscribe();
destroyRef.onDestroy(() => sourceSub.unsubscribe());
sourceInjector.get(DestroyRef).onDestroy(() => sourceSub.unsubscribe());

const rxMethodFn = (
input: Input | Signal<Input> | Observable<Input>,
config?: { injector?: Injector }
) => {
if (isStatic(input)) {
source$.next(input);
return { unsubscribe: noop };
}

const instanceInjector =
config?.injector ?? getCallerInjector() ?? sourceInjector;

const rxMethodFn = (input: Input | Signal<Input> | Observable<Input>) => {
if (isSignal(input)) {
const watcher = effect(
() => {
const value = input();
untracked(() => source$.next(value));
},
{ injector }
{ injector: instanceInjector }
);
const instanceSub = { unsubscribe: () => watcher.destroy() };
sourceSub.add(instanceSub);

return instanceSub;
}

if (isObservable(input)) {
const instanceSub = input.subscribe((value) => source$.next(value));
sourceSub.add(instanceSub);
const instanceSub = input.subscribe((value) => source$.next(value));
sourceSub.add(instanceSub);

return instanceSub;
if (instanceInjector !== sourceInjector) {
instanceInjector
.get(DestroyRef)
.onDestroy(() => instanceSub.unsubscribe());
}

source$.next(input);
return { unsubscribe: noop };
return instanceSub;
};
rxMethodFn.unsubscribe = sourceSub.unsubscribe.bind(sourceSub);

return rxMethodFn;
}

function isStatic<T>(value: T | Signal<T> | Observable<T>): value is T {
return !isSignal(value) && !isObservable(value);
}

function getCallerInjector(): Injector | null {
try {
return inject(Injector);
} catch {
return null;
}
}

0 comments on commit ffc1d87

Please sign in to comment.