Skip to content

Commit

Permalink
Rework event notification queuing (#925)
Browse files Browse the repository at this point in the history
  • Loading branch information
Supereg authored Jan 16, 2022
1 parent 2a3201c commit 481c7cb
Showing 1 changed file with 57 additions and 34 deletions.
91 changes: 57 additions & 34 deletions src/lib/util/eventedhttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ export class EventedHTTPServer extends EventEmitter {
}

/**
* Send a even notification for given characteristic and changed value to all connected clients.
* Send an event notification for given characteristic and changed value to all connected clients.
* If {@param originator} is specified, the given {@link HAPConnection} will be excluded from the broadcast.
*
* @param aid - The accessory id of the updated characteristic.
* @param iid - The instance id of the updated characteristic.
* @param value - The newly set value of the characteristic.
* @param originator - If specified, the connection will not get a event message.
* @param originator - If specified, the connection will not get an event message.
* @param immediateDelivery - The HAP spec requires some characteristics to be delivery immediately.
* Namely for the {@link ButtonEvent} and the {@link ProgrammableSwitchEvent} characteristics.
* Namely, for the {@link ButtonEvent} and the {@link ProgrammableSwitchEvent} characteristics.
*/
public broadcastEvent(aid: number, iid: number, value: Nullable<CharacteristicValue>, originator?: HAPConnection, immediateDelivery?: boolean): void {
for (const connection of this.connections) {
Expand Down Expand Up @@ -265,7 +265,7 @@ export const enum HAPConnectionState {
CONNECTING, // initial state, setup is going on
FULLY_SET_UP, // internal http server is running and connection is established
AUTHENTICATED, // encryption is set up
// above signals are represent a alive connection
// above signals represent an alive connection

// below states are considered "closed or soon closed"
TO_BE_TEARED_DOWN, // when in this state, connection should be closed down after response was sent out
Expand Down Expand Up @@ -323,7 +323,7 @@ export class HAPConnection extends EventEmitter {

private registeredEvents: Set<EventName> = new Set();
private eventsTimer?: NodeJS.Timeout;
private readonly queuedEvents: Map<EventName, CharacteristicEventNotification> = new Map();
private readonly queuedEvents: CharacteristicEventNotification[] = [];
// queue of unencrypted event data waiting to be sent until after an in-progress HTTP response is being written
private readonly pendingEventData: Buffer[] = [];

Expand All @@ -348,7 +348,7 @@ export class HAPConnection extends EventEmitter {
this.tcpSocket.on("error", this.onTCPSocketError.bind(this));
this.tcpSocket.setNoDelay(true); // disable Nagle algorithm
// "HAP accessory servers must not use keepalive messages, which periodically wake up iOS devices".
// Thus we don't configure any tcp keepalive
// Thus, we don't configure any tcp keepalive

// create our internal HTTP server for this connection that we will proxy data to and from
this.internalHttpServer = http.createServer();
Expand Down Expand Up @@ -409,28 +409,46 @@ export class HAPConnection extends EventEmitter {
return;
}

const event: CharacteristicEventNotification = {
aid: aid,
iid: iid,
value: value,
};

if (immediateDelivery) {
// some characteristics are required to deliver notifications immediately
this.writeEventNotification({
characteristics: [{
aid: aid,
iid: iid,
value: value,
}],
});
} else {
// TODO should a new event not remove a previous event (to support censor open -> censor closed :thinking:)
// any only remove previous events if the same value was set?
this.queuedEvents.set(eventName, {
aid: aid,
iid: iid,
value: value,
});
if (!this.handlingRequest && !this.eventsTimer) { // if we are handling a request or there is already a timer running we just add it in the queue
this.eventsTimer = setTimeout(this.handleEventsTimeout.bind(this), 250);
this.eventsTimer.unref();
// we will flush all other events too, on that occasion.
this.queuedEvents.push(event);

if (this.eventsTimer) {
clearTimeout(this.eventsTimer);
}
this.handleEventsTimeout();
return;
}

// we search the list of queued events in reverse order.
// if the last element with the same aid and iid has the same value we don't want to send the event notification twice.
// BUT, we do not want to override previous event notifications which have a different value. Automations must be executed!
for (let i = this.queuedEvents.length - 1; i >= 0; i--) {
const queuedEvent = this.queuedEvents[i];
if (queuedEvent.aid === aid && queuedEvent.iid === iid) {
if (queuedEvent.value === value) {
return; // the same event was already queued. do not add it again!
}

break; // we break in any case
}
}

this.queuedEvents.push(event);

// if we are handling a request or there is already a timer running we just add it in the queue.
// remember: we flush the event queue after we send out the response.
if (!this.handlingRequest && !this.eventsTimer) {
this.eventsTimer = setTimeout(this.handleEventsTimeout.bind(this), 250);
this.eventsTimer.unref();
}
}

private handleEventsTimeout(): void {
Expand All @@ -452,25 +470,30 @@ export class HAPConnection extends EventEmitter {
}

private writeQueuedEventNotifications(): void {
if (this.queuedEvents.size === 0 || this.eventsTimer) {
if (this.queuedEvents.length === 0 || this.eventsTimer) {
return; // don't send empty event notifications or if there is a timeout running
}

const eventData: EventNotification = { characteristics: [] };
for (const [eventName, characteristic] of this.queuedEvents) {
if (!this.registeredEvents.has(eventName)) { // client unregistered events in the mean time
continue;
const eventData: EventNotification = {
characteristics: [],
};

for (const queuedEvent of this.queuedEvents) {
if (!this.registeredEvents.has(queuedEvent.aid + "." + queuedEvent.iid)) {
continue; // client unregistered that event in the meantime
}
eventData.characteristics.push(characteristic);

eventData.characteristics.push(queuedEvent);
}
this.queuedEvents.clear();

this.queuedEvents.splice(0, this.queuedEvents.length);

this.writeEventNotification(eventData);
}

/**
* This will create an EVENT/1.0 notification header with the provided event notification.
* If currently a HTTP request is in progress the assembled packet will be
* If currently an HTTP request is in progress the assembled packet will be
* added to the pending events list.
*
* @param notification - The event which should be sent out
Expand Down Expand Up @@ -524,15 +547,15 @@ export class HAPConnection extends EventEmitter {
if (this.encryption && this.encryption.accessoryToControllerKey.length > 0 && this.encryption.controllerToAccessoryCount > 0) {
return hapCrypto.layerEncrypt(data, this.encryption);
}
return data; // otherwise we don't encrypt and return plaintext
return data; // otherwise, we don't encrypt and return plaintext
}

private decrypt(data: Buffer): Buffer {
if (this.encryption && this.encryption.controllerToAccessoryKey.length > 0) {
// below call may throw an error if decryption failed
return hapCrypto.layerDecrypt(data, this.encryption);
}
return data; // otherwise we don't decrypt and return plaintext
return data; // otherwise, we don't decrypt and return plaintext
}

private onHttpServerListening() {
Expand Down

0 comments on commit 481c7cb

Please sign in to comment.