Skip to content

Commit

Permalink
Develop (#23)
Browse files Browse the repository at this point in the history
 * 3.5.0
 * add monitor.join method for setting the client joined event
 * add `monitor.leave` method for setting the client left event
 * update README.ms
 * fix typo occured in storage `totalOutboundPacketsReceived` property
 * fix typo occurred in storage `deltaOutboundPacketsReceived`
 * add `kind` property to inboundRtpEntry
 * add `kind` property to OutboundRtpEntry
 * fix using wrong calculated fields for RTT in peer connection entry
 * make sfuStreamId and sfuSinkId association 'faster' by using high level map for pending associations
  • Loading branch information
balazskreith authored Mar 10, 2024
1 parent 8458da8 commit f1698d6
Show file tree
Hide file tree
Showing 12 changed files with 507 additions and 130 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ dist/
lib/
keys/
draft.md
yarn-error.log
455 changes: 391 additions & 64 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@observertc/client-monitor-js",
"version": "3.4.0",
"version": "3.5.0",
"description": "ObserveRTC Client Integration Javascript Library",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down
39 changes: 39 additions & 0 deletions src/ClientMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ export interface ClientMonitorEvents {
}

export class ClientMonitor extends TypedEventEmitter<ClientMonitorEvents> {
public readonly created = Date.now();
public readonly meta: ClientMetaData;
public readonly storage = new StatsStorage();
public readonly collectors = createCollectors({
Expand All @@ -67,6 +68,8 @@ export class ClientMonitor extends TypedEventEmitter<ClientMonitorEvents> {
private readonly _sampler = new Sampler(this.storage);
private _timer?: ReturnType<typeof setInterval>;

private _joined = false;
private _left = false;
private _lastCollectedAt = Date.now();
private _lastSampledAt = 0;
private _closed = false;
Expand Down Expand Up @@ -114,6 +117,12 @@ export class ClientMonitor extends TypedEventEmitter<ClientMonitorEvents> {
}
this._closed = true;
clearInterval(this._timer);

if (!this._left) this.leave();
if (0 < (this._config.samplingTick ?? 0)) {
this.sample();
}

this.storage.clear();
this.collectors.clear();
this._sampler.clear();
Expand All @@ -122,6 +131,8 @@ export class ClientMonitor extends TypedEventEmitter<ClientMonitorEvents> {
}

public async collect(): Promise<CollectedStats> {
if (this._closed) throw new Error('ClientMonitor is closed');

const collectedStats = await this.collectors.collect();
this.storage.update(collectedStats);
const timestamp = Date.now();
Expand All @@ -138,6 +149,9 @@ export class ClientMonitor extends TypedEventEmitter<ClientMonitorEvents> {
}

public sample(): ClientSample | undefined {
if (this._closed) return;
if (!this._joined) this.join();

const clientSample = this._sampler.createClientSample();
const timestamp = Date.now();
if (!clientSample) {
Expand All @@ -151,6 +165,31 @@ export class ClientMonitor extends TypedEventEmitter<ClientMonitorEvents> {
return clientSample;
}

public join(settings?: Pick<CustomCallEvent, 'attachments' | 'timestamp' | 'message'>): void {
if (this._joined) return;
this._joined = true;

this._sampler.addCustomCallEvent({
name: 'CLIENT_JOINED',
message: settings?.message ?? 'Client joined',
timestamp: settings?.timestamp ?? this.created,
attachments: settings?.attachments,
})
}

public leave(settings?: Pick<CustomCallEvent, 'attachments' | 'timestamp' | 'message'>): void {
if (!this._left) return;
this._left = true;

this._sampler.addCustomCallEvent({
name: 'CLIENT_LEFT',
message: settings?.message ?? 'Client left',
timestamp: settings?.timestamp ?? Date.now(),
attachments: settings?.attachments,
});
}


public setMarker(value?: string) {
this._sampler.setMarker(value);
}
Expand Down
13 changes: 10 additions & 3 deletions src/collectors/MediasoupStatsCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,9 @@ export function createMediasoupStatsCollector(config: MediasoupStatsCollectorCon
sfuStreamId: producer.id,
kind: producer.kind,
});
storage.bindTrackToSfu(producer.track.id, producer.id);
storage.pendingSfuBindings.set(producer.track.id, {
sfuStreamId: producer.id,
});
producer.track.onended = () => {
if (!producer.closed) {
pendingProducerBindings.set(producer.id, { producer, peerConnectionId });
Expand Down Expand Up @@ -181,7 +183,10 @@ export function createMediasoupStatsCollector(config: MediasoupStatsCollectorCon
});
consumer.observer.on('pause', pauseListener);
consumer.observer.on('resume', resumeListener);
storage.bindTrackToSfu(consumer.track.id, consumer.producerId, consumer.id);
storage.pendingSfuBindings.set(consumer.track.id, {
sfuStreamId: consumer.producerId,
sfuSinkId: consumer.id,
});
emitCallEvent({
name: 'CONSUMER_ADDED',
...eventBase
Expand Down Expand Up @@ -310,7 +315,9 @@ export function createMediasoupStatsCollector(config: MediasoupStatsCollectorCon
function adaptStorageMiddleware(storage: StatsStorage, next: (storage: StatsStorage) => void) {
for (const [producerId, { producer, peerConnectionId }] of Array.from(pendingProducerBindings.entries())) {
if (producer.track) {
storage.bindTrackToSfu(producer.track.id, producerId);
storage.pendingSfuBindings.set(producer.track.id, {
sfuStreamId: producer.id,
});
addTrack({
peerConnectionId,
direction: 'outbound',
Expand Down
1 change: 0 additions & 1 deletion src/detectors/CongestionDetector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ export function createCongestionDetector(config: CongestionDetectorConfig & {
}

for (const [pcId, state] of Array.from(peerConnectionStates)) {
// console.warn("pc", pcId, state);
if (state.visited) {
state.visited = false;
} else {
Expand Down
6 changes: 1 addition & 5 deletions src/entries/OutboundTrackStats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ export function createOutboundTrackStats(peerConnection: PeerConnectionEntry, tr
},
set sfuStreamId(value: string | undefined) {
sfuStreamId = value;
for (const outboundRtp of iterator()) {
outboundRtp.sfuStreamId = value;
}
},

sendingBitrate: outboundRtps.reduce((acc, outboundRtp) => acc + (outboundRtp.sendingBitrate ?? 0), 0),
Expand All @@ -44,8 +41,7 @@ export function createOutboundTrackStats(peerConnection: PeerConnectionEntry, tr
result.remoteReceivedPackets = 0;

for (const outboundRtp of iterator()) {
outboundRtp.sfuStreamId = result.sfuStreamId;

outboundRtp.sfuStreamId = sfuStreamId;
result.sendingBitrate += outboundRtp.sendingBitrate ?? 0;
result.sentPackets += outboundRtp.sentPackets ?? 0;
result.remoteLostPackets += outboundRtp.getRemoteInboundRtp()?.lostPackets ?? 0;
Expand Down
22 changes: 15 additions & 7 deletions src/entries/PeerConnectionEntryManifest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import * as W3C from '../schema/W3cStatsIdentifiers'
import { TypedEventEmitter, TypedEvents } from "../utils/TypedEmitter";
import { IndexedMap } from "../utils/IndexedMap";
import { calculateAudioMOS, calculateVideoMOS } from "./UpdateFields";
import type { StatsStorage } from "./StatsStorage";

const SSRC_INDEX = 'ssrc';

Expand Down Expand Up @@ -101,10 +102,11 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
private readonly _iceServers = new Map<string, IceServerEntry>();

// helper fields
private _lastAvgRttInS = -1;
// private _lastAvgRttInS = -1;
private _visit: StatsVisitor;

public constructor(
public readonly storage: StatsStorage,
public readonly peerConnectionId: string,
public readonly label: string | undefined,
) {
Expand Down Expand Up @@ -303,7 +305,7 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
entry.receivingBitrate,
entry.fractionLoss,
entry.avgJitterBufferDelayInMs,
this._lastAvgRttInS * 1000.0,
(this.avgRttInS ?? 0) * 1000.0,
false,
false,
);
Expand All @@ -313,7 +315,7 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
entry.stats.frameWidth ?? 640,
entry.stats.frameHeight ?? 480,
entry.avgJitterBufferDelayInMs,
this._lastAvgRttInS * 1000.0,
(this.avgRttInS ?? 0) * 1000.0,
'vp8',
entry.stats.framesPerSecond ?? 30,
entry.expectedFrameRate ?? 30,
Expand Down Expand Up @@ -562,7 +564,7 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
roundTripTimesInS.push(currentRoundTripTime)
}
}
const avgRttInS = (roundTripTimesInS.length < 1 ? this._lastAvgRttInS : roundTripTimesInS.reduce((a, x) => a + x, 0) / roundTripTimesInS.length);
const avgRttInS = (roundTripTimesInS.length < 1 ? this.avgRttInS : Math.max(0, roundTripTimesInS.reduce((acc, rtt) => acc + rtt, 0) / roundTripTimesInS.length));

for (const outboundRtpEntry of this._outboundRtps.values()) {
if (outboundRtpEntry.stats.kind === 'audio') {
Expand All @@ -573,7 +575,7 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
this.deltaSentVideoBytes += outboundRtpEntry.sentBytes ?? 0;
}
this.deltaOutboundPacketsSent += outboundRtpEntry.sentPackets ?? 0;
outboundRtpEntry.updateStabilityScore(avgRttInS);
avgRttInS && outboundRtpEntry.updateStabilityScore(avgRttInS);
}
this.totalOutboundPacketsSent += this.deltaOutboundPacketsSent;
this.totalSentAudioBytes += this.deltaSentAudioBytes;
Expand Down Expand Up @@ -658,6 +660,9 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
}
return this._audioPlayouts.get(result.stats.playoutId);
},
get kind() {
return result.stats.kind;
}
}
return result;
}
Expand Down Expand Up @@ -715,13 +720,16 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
}
return remoteInboundRtps[0];
},
get kind() {
return result.stats.kind;
},

updateStabilityScore: (currentRttInS: number) => {
const remoteInb = result.getRemoteInboundRtp();
if (!remoteInb) return;
// Packet Jitter measured in seconds
// let's say we normalize it to a deviation of 100ms in a linear scale
const latencyFactor = 1.0 - Math.min(0.1, Math.abs(currentRttInS - this._lastAvgRttInS)) / 0.1
const latencyFactor = 1.0 - Math.min(0.1, Math.abs(currentRttInS - (this.avgRttInS ?? 0))) / 0.1
const sentPackets = Math.max(1, (result.sentPackets ?? 0));
const lostPackets = remoteInb.lostPackets ?? 0;
const deliveryFactor = 1.0 - ((lostPackets) / (lostPackets + sentPackets));
Expand All @@ -741,7 +749,7 @@ export class PeerConnectionEntryManifest implements PeerConnectionEntry {
totalScore += weight * score;
}
result.score = totalScore / counter;
}
},
}
return result;
}
Expand Down
4 changes: 3 additions & 1 deletion src/entries/StatsEntryInterfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export interface InboundRtpEntry extends ReceivedRtpStreamEntry, StatsEntryAbs {
getRemoteOutboundRtp(): RemoteOutboundRtpEntry | undefined;
getAudioPlayout(): AudioPlayoutEntry | undefined;


readonly kind: "audio" | "video" | undefined;
}

/**
Expand All @@ -119,6 +119,8 @@ export interface OutboundRtpEntry extends SenderRtpStreamEntry, StatsEntryAbs {
getSender(): SenderEntry | undefined;
getRemoteInboundRtp(): RemoteInboundRtpEntry | undefined;

readonly kind: "audio" | "video" | undefined;

}


Expand Down
Loading

0 comments on commit f1698d6

Please sign in to comment.