Skip to content

Commit

Permalink
timeseries: fix memory leaks involving CardObserver and shareReplay (t…
Browse files Browse the repository at this point in the history
…ensorflow#5254)

This addresses 2 non-trivial sources of memory leaks observed when
filtering tags in the Time Series dashboard.

a) CardObserver used to track Elements that were destroyed via
ngOnDestroy and stored them in a `Set<Element>`. Now with a WeakSet
instead, we cover cases when the browser's IntersectionObserverEntry
is not handled for some reason.

b) The rxjs operator "shareReplay" has some known gotchas [1], [2].
When more observables subscribe to a source containing
`shareReplay(1)` in its operator chain, the source remains subscribed
even after the other observables unsubscribe. This change uses the
`takeUntil(ngUnsubscribe$)` pattern to ensure any component using a
`shareReplay` will not keep it subscribed after the component is gone.
Alternatively we could use `refCount`, but this opts for the `takeUntil`
pattern already used widely in the codebase.

Manually checked that the dashboard still operates properly, and in
Angular's dev mode, using a specific demo logdir, adding and clearing a
"." tag filter has these characteristics:

Before this change: 5.8 MB, 5.5K DOM nodes added each cycle
After this change: 0.05 MB, <10 DOM nodes added each cycle

Googlers, see b/196996936

[1] ReactiveX/rxjs#5034
[2] ReactiveX/rxjs#5931
  • Loading branch information
psybuzz authored and yatbear committed Mar 27, 2023
1 parent a36fd98 commit 2450040
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type CardObserverCallback = (
export class CardObserver {
private intersectionObserver?: IntersectionObserver;
private intersectionCallback?: CardObserverCallback;
private readonly destroyedTargets = new Set<Element>();
private readonly destroyedTargets = new WeakSet<Element>();

/**
* Buffer determines how far a card can be, beyond the root's bounding rect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ export class ImageCardContainer implements CardRenderer, OnInit, OnDestroy {

const selectCardMetadata$ = this.store.select(getCardMetadata, this.cardId);
const cardMetadata$ = selectCardMetadata$.pipe(
takeUntil(this.ngUnsubscribe),
filter((cardMetadata) => {
return !!cardMetadata && this.isImageCardMetadata(cardMetadata);
}),
Expand All @@ -174,6 +175,7 @@ export class ImageCardContainer implements CardRenderer, OnInit, OnDestroy {
this.store.select(getCardTimeSeries, this.cardId),
]);
const timeSeries$ = metadataAndSeries$.pipe(
takeUntil(this.ngUnsubscribe),
map(([cardMetadata, runToSeries]) => {
const runId = cardMetadata.runId;
if (!runToSeries || !runToSeries.hasOwnProperty(runId)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import {
EventEmitter,
Input,
OnInit,
OnDestroy,
Output,
} from '@angular/core';
import {Store} from '@ngrx/store';
import {combineLatest, from, Observable, of} from 'rxjs';
import {combineLatest, from, Observable, of, Subject} from 'rxjs';
import {
combineLatestWith,
debounceTime,
Expand All @@ -32,6 +33,7 @@ import {
shareReplay,
startWith,
switchMap,
takeUntil,
} from 'rxjs/operators';

import {State} from '../../../app_state';
Expand Down Expand Up @@ -140,7 +142,7 @@ function areSeriesEqual(
],
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class ScalarCardContainer implements CardRenderer, OnInit {
export class ScalarCardContainer implements CardRenderer, OnInit, OnDestroy {
constructor(private readonly store: Store<State>) {}

// Angular Component constructor for DataDownload dialog. It is customizable for
Expand Down Expand Up @@ -196,6 +198,8 @@ export class ScalarCardContainer implements CardRenderer, OnInit {

showFullSize = false;

private readonly ngUnsubscribe = new Subject<void>();

private isScalarCardMetadata(
cardMetadata: CardMetadata
): cardMetadata is ScalarCardMetadata {
Expand Down Expand Up @@ -226,6 +230,7 @@ export class ScalarCardContainer implements CardRenderer, OnInit {
const nonNullRunsToScalarSeries$ = this.store
.select(getCardTimeSeries, this.cardId)
.pipe(
takeUntil(this.ngUnsubscribe),
filter((runToSeries) => Boolean(runToSeries)),
map((runToSeries) => runToSeries as RunToSeries<PluginType.SCALARS>),
shareReplay(1)
Expand Down Expand Up @@ -282,6 +287,7 @@ export class ScalarCardContainer implements CardRenderer, OnInit {
combineLatestWith(
this.store.select(getMetricsScalarPartitionNonMonotonicX)
),
takeUntil(this.ngUnsubscribe),
map<[PartialSeries[], boolean], PartitionedSeries[]>(
([normalizedSeries, enablePartition]) => {
if (enablePartition) return partitionSeries(normalizedSeries);
Expand Down Expand Up @@ -481,6 +487,11 @@ export class ScalarCardContainer implements CardRenderer, OnInit {
this.isPinned$ = this.store.select(getCardPinnedState, this.cardId);
}

ngOnDestroy() {
this.ngUnsubscribe.next();
this.ngUnsubscribe.complete();
}

private getRunDisplayName(runId: string): Observable<string> {
return combineLatest([
this.store.select(getExperimentIdForRunId, {runId}),
Expand Down
14 changes: 11 additions & 3 deletions tensorboard/webapp/metrics/views/main_view/card_grid_container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import {
Component,
Input,
OnChanges,
OnDestroy,
SimpleChanges,
} from '@angular/core';
import {Store} from '@ngrx/store';
import {selectors as settingsSelectors} from '../../../settings';
import {BehaviorSubject, combineLatest, Observable, of} from 'rxjs';
import {map, shareReplay, switchMap, tap} from 'rxjs/operators';
import {BehaviorSubject, combineLatest, Observable, of, Subject} from 'rxjs';
import {map, shareReplay, switchMap, takeUntil, tap} from 'rxjs/operators';

import {State} from '../../../app_state';
import {getMetricsTagGroupExpansionState} from '../../../selectors';
Expand Down Expand Up @@ -52,7 +53,7 @@ const ITEMS_COLLAPSED_CLIP_SIZE = 3;
`,
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class CardGridContainer implements OnChanges {
export class CardGridContainer implements OnChanges, OnDestroy {
// groupName must be non-null if the group should be collapse/expand-able.
@Input() groupName: string | null = null;
@Input() cardIdsWithMetadata!: CardIdWithMetadata[];
Expand All @@ -61,6 +62,7 @@ export class CardGridContainer implements OnChanges {
private readonly groupName$ = new BehaviorSubject<string | null>(null);
readonly pageIndex$ = new BehaviorSubject<number>(0);
private readonly items$ = new BehaviorSubject<CardIdWithMetadata[]>([]);
private readonly ngUnsubscribe = new Subject<void>();

readonly numPages$ = combineLatest([
this.items$,
Expand Down Expand Up @@ -112,6 +114,7 @@ export class CardGridContainer implements OnChanges {
this.pageIndex$,
this.numPages$,
]).pipe(
takeUntil(this.ngUnsubscribe),
tap(([pageIndex, numPages]) => {
// Cycle in the Observable but only loops when pageIndex is not
// valid and does not repeat more than once.
Expand Down Expand Up @@ -158,6 +161,11 @@ export class CardGridContainer implements OnChanges {
}
}

ngOnDestroy() {
this.ngUnsubscribe.next();
this.ngUnsubscribe.complete();
}

onPageIndexChanged(newIndex: number) {
this.pageIndex$.next(newIndex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ export class RunsTableContainer implements OnInit, OnDestroy {
})
);
this.allUnsortedRunTableItems$ = rawAllUnsortedRunTableItems$.pipe(
takeUntil(this.ngUnsubscribe),
shareReplay(1)
);
this.allItemsLength$ = this.allUnsortedRunTableItems$.pipe(
Expand All @@ -303,7 +304,7 @@ export class RunsTableContainer implements OnInit, OnDestroy {

const getFilteredItems$ = this.getFilteredItems$(
this.allUnsortedRunTableItems$
).pipe(shareReplay(1));
).pipe(takeUntil(this.ngUnsubscribe), shareReplay(1));

this.filteredItemsLength$ = getFilteredItems$.pipe(
map((items) => items.length)
Expand Down

0 comments on commit 2450040

Please sign in to comment.