Skip to content
This repository has been archived by the owner on Sep 11, 2024. It is now read-only.

Live location sharing - Stop publishing location to beacons with consecutive errors #8194

Merged
merged 4 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions src/stores/OwnBeaconStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ const isOwnBeacon = (beacon: Beacon, userId: string): boolean => beacon.beaconIn
export enum OwnBeaconStoreEvent {
LivenessChange = 'OwnBeaconStore.LivenessChange',
MonitoringLivePosition = 'OwnBeaconStore.MonitoringLivePosition',
WireError = 'WireError',
}

const MOVING_UPDATE_INTERVAL = 2000;
const STATIC_UPDATE_INTERVAL = 30000;

const BAIL_AFTER_CONSECUTIVE_ERROR_COUNT = 2;

type OwnBeaconStoreState = {
beacons: Map<string, Beacon>;
beaconWireErrors: Map<string, Beacon>;
Expand All @@ -65,9 +68,11 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
public readonly beacons = new Map<string, Beacon>();
public readonly beaconsByRoomId = new Map<Room['roomId'], Set<string>>();
/**
* Track over the wire errors for beacons
* Track over the wire errors for published positions
* Counts consecutive wire errors per beacon
* Reset on successful publish of location
*/
public readonly beaconWireErrors = new Map<string, Error>();
public readonly beaconWireErrorCounts = new Map<string, number>();
private liveBeaconIds = [];
private locationInterval: number;
private geolocationError: GeolocationError | undefined;
Expand Down Expand Up @@ -106,7 +111,7 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
this.beacons.clear();
this.beaconsByRoomId.clear();
this.liveBeaconIds = [];
this.beaconWireErrors.clear();
this.beaconWireErrorCounts.clear();
}

protected async onReady(): Promise<void> {
Expand All @@ -125,6 +130,25 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
return !!this.getLiveBeaconIds(roomId).length;
}

/**
* If a beacon has failed to publish position
* past the allowed consecutive failure count (BAIL_AFTER_CONSECUTIVE_ERROR_COUNT)
* Then consider it to have an error
*/
public hasWireError(beaconId: string): boolean {
return this.beaconWireErrorCounts.get(beaconId) >= BAIL_AFTER_CONSECUTIVE_ERROR_COUNT;
}

public resetWireError(beaconId: string): void {
this.incrementBeaconWireErrorCount(beaconId, false);

// always publish to all live beacons together
// instead of just one that was changed
// to keep lastPublishedTimestamp simple
// and extra published locations don't hurt
this.publishCurrentLocationToBeacons();
}

public getLiveBeaconIds(roomId?: string): string[] {
if (!roomId) {
return this.liveBeaconIds;
Expand Down Expand Up @@ -202,6 +226,13 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
* State management
*/

/**
* Live beacon ids that do not have wire errors
*/
private get healthyLiveBeaconIds() {
return this.liveBeaconIds.filter(beaconId => !this.hasWireError(beaconId));
}

private initialiseBeaconState = () => {
const userId = this.matrixClient.getUserId();
const visibleRooms = this.matrixClient.getVisibleRooms();
Expand Down Expand Up @@ -399,7 +430,7 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
*/
private publishLocationToBeacons = async (position: TimedGeoUri) => {
this.lastPublishedPositionTimestamp = Date.now();
await Promise.all(this.liveBeaconIds.map(beaconId =>
await Promise.all(this.healthyLiveBeaconIds.map(beaconId =>
this.sendLocationToBeacon(this.beacons.get(beaconId), position)),
);
};
Expand All @@ -412,10 +443,37 @@ export class OwnBeaconStore extends AsyncStoreWithClient<OwnBeaconStoreState> {
private sendLocationToBeacon = async (beacon: Beacon, { geoUri, timestamp }: TimedGeoUri) => {
const content = makeBeaconContent(geoUri, timestamp, beacon.beaconInfoId);
try {
console.log(beacon.beaconInfoId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug logging

await this.matrixClient.sendEvent(beacon.roomId, M_BEACON.name, content);
this.incrementBeaconWireErrorCount(beacon.identifier, false);
} catch (error) {
logger.error(error);
this.beaconWireErrors.set(beacon.identifier, error);
this.incrementBeaconWireErrorCount(beacon.identifier, true);
}
};

/**
* Manage beacon wire error count
* - clear count for beacon when not error
* - increment count for beacon when is error
* - emit if beacon error count crossed threshold
*/
private incrementBeaconWireErrorCount = (beaconId: string, isError: boolean): void => {
const hadError = this.hasWireError(beaconId);

if (isError) {
// increment error count
this.beaconWireErrorCounts.set(
beaconId,
(this.beaconWireErrorCounts.get(beaconId) ?? 0) + 1,
);
} else {
// clear any error count
this.beaconWireErrorCounts.delete(beaconId);
}

if (this.hasWireError(beaconId) !== hadError) {
this.emit(OwnBeaconStoreEvent.WireError, beaconId);
}
};
}
134 changes: 132 additions & 2 deletions test/stores/OwnBeaconStore-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ describe('OwnBeaconStore', () => {
geolocation = mockGeolocation();
mockClient.getVisibleRooms.mockReturnValue([]);
mockClient.unstable_setLiveBeacon.mockClear().mockResolvedValue({ event_id: '1' });
mockClient.sendEvent.mockClear().mockResolvedValue({ event_id: '1' });
mockClient.sendEvent.mockReset().mockResolvedValue({ event_id: '1' });
jest.spyOn(global.Date, 'now').mockReturnValue(now);
jest.spyOn(OwnBeaconStore.instance, 'emit').mockRestore();
jest.spyOn(logger, 'error').mockRestore();
Expand Down Expand Up @@ -696,7 +696,7 @@ describe('OwnBeaconStore', () => {
});
});

describe('sending positions', () => {
describe('publishing positions', () => {
it('stops watching position when user has no more live beacons', async () => {
// geolocation is only going to emit 1 position
geolocation.watchPosition.mockImplementation(
Expand Down Expand Up @@ -825,6 +825,136 @@ describe('OwnBeaconStore', () => {
});
});

describe('when publishing position fails', () => {
beforeEach(() => {
geolocation.watchPosition.mockImplementation(
watchPositionMockImplementation([0, 1000, 3000, 3000, 3000]),
);

// eat expected console error logs
jest.spyOn(logger, 'error').mockImplementation(() => { });
});

// we need to advance time and then flush promises
// individually for each call to sendEvent
// otherwise the sendEvent doesn't reject/resolve and update state
// before the next call
// advance and flush every 1000ms
// until given ms is 'elapsed'
const advanceAndFlushPromises = async (timeMs: number) => {
while (timeMs > 0) {
jest.advanceTimersByTime(1000);
await flushPromisesWithFakeTimers();
timeMs -= 1000;
}
};

it('continues publishing positions after one publish error', async () => {
// fail to send first event, then succeed
mockClient.sendEvent.mockRejectedValueOnce(new Error('oups')).mockResolvedValue({ event_id: '1' });
makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
// wait for store to settle
await flushPromisesWithFakeTimers();

await advanceAndFlushPromises(50000);

// called for each position from watchPosition
expect(mockClient.sendEvent).toHaveBeenCalledTimes(5);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false);
});

it('continues publishing positions when a beacon fails intermittently', async () => {
// every second event rejects
// meaning this beacon has more errors than the threshold
// but they are not consecutive
mockClient.sendEvent
.mockRejectedValueOnce(new Error('oups'))
.mockResolvedValueOnce({ event_id: '1' })
.mockRejectedValueOnce(new Error('oups'))
.mockResolvedValueOnce({ event_id: '1' })
.mockRejectedValueOnce(new Error('oups'));

makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
const emitSpy = jest.spyOn(store, 'emit');
// wait for store to settle
await flushPromisesWithFakeTimers();

await advanceAndFlushPromises(50000);

// called for each position from watchPosition
expect(mockClient.sendEvent).toHaveBeenCalledTimes(5);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false);
expect(emitSpy).not.toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);
});

it('stops publishing positions when a beacon fails consistently', async () => {
// always fails to send events
mockClient.sendEvent.mockRejectedValue(new Error('oups'));
makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
const emitSpy = jest.spyOn(store, 'emit');
// wait for store to settle
await flushPromisesWithFakeTimers();

// 5 positions from watchPosition in this period
await advanceAndFlushPromises(50000);

// only two allowed failures
expect(mockClient.sendEvent).toHaveBeenCalledTimes(2);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(true);
expect(emitSpy).toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);
});

it('restarts publishing a beacon after resetting wire error', async () => {
// always fails to send events
mockClient.sendEvent.mockRejectedValue(new Error('oups'));
makeRoomsWithStateEvents([
alicesRoom1BeaconInfo,
]);
const store = await makeOwnBeaconStore();
const emitSpy = jest.spyOn(store, 'emit');
// wait for store to settle
await flushPromisesWithFakeTimers();

// 3 positions from watchPosition in this period
await advanceAndFlushPromises(4000);

// only two allowed failures
expect(mockClient.sendEvent).toHaveBeenCalledTimes(2);
expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(true);
expect(emitSpy).toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);

// reset emitSpy mock counts to asser on wireError again
emitSpy.mockClear();
store.resetWireError(alicesRoom1BeaconInfo.getType());

expect(store.hasWireError(alicesRoom1BeaconInfo.getType())).toBe(false);

// 2 more positions from watchPosition in this period
await advanceAndFlushPromises(10000);

// 2 from before, 2 new ones
expect(mockClient.sendEvent).toHaveBeenCalledTimes(4);
expect(emitSpy).toHaveBeenCalledWith(
OwnBeaconStoreEvent.WireError, alicesRoom1BeaconInfo.getType(),
);
});
});

it('publishes subsequent positions', async () => {
// modern fake timers + debounce + promises are not friends
// just testing that positions are published
Expand Down