Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/topic alias mapping #176

Merged
merged 2 commits into from
Jul 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.mqttbee.api.mqtt.mqtt5.message.publish.TopicAliasUsage.*;
import static org.mqttbee.mqtt.message.publish.MqttPublish.DEFAULT_TOPIC_ALIAS_USAGE;
import static org.mqttbee.mqtt.message.publish.MqttPublish.MESSAGE_EXPIRY_INTERVAL_INFINITY;

Expand Down Expand Up @@ -182,7 +183,8 @@ public Mqtt5PublishBuilder<P> correlationData(@Nullable final ByteBuffer correla

@NotNull
public Mqtt5PublishBuilder<P> topicAliasUsage(@NotNull final TopicAliasUsage topicAliasUsage) {
Preconditions.checkNotNull(topicAliasUsage);
Preconditions.checkArgument(
topicAliasUsage == MUST_NOT || topicAliasUsage == MAY || topicAliasUsage == MAY_OVERWRITE);
this.topicAliasUsage = topicAliasUsage;
return this;
}
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/org/mqttbee/mqtt/MqttClientConnectionData.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.mqttbee.api.mqtt.mqtt5.auth.Mqtt5EnhancedAuthProvider;
import org.mqttbee.mqtt.datatypes.MqttTopicImpl;
import org.mqttbee.mqtt.datatypes.MqttVariableByteInteger;
import org.mqttbee.util.collections.IntMap;

import java.util.Optional;

Expand All @@ -37,7 +38,8 @@ public class MqttClientConnectionData implements Mqtt5ClientConnectionData, Mqtt
private int keepAlive;
private long sessionExpiryInterval;
private final int receiveMaximum;
private final MqttTopicImpl[] topicAliasMapping;
private final int topicAliasMaximum;
private final IntMap<MqttTopicImpl> topicAliasMapping;
private final int maximumPacketSize;
private final int subscriptionIdentifierMaximum;
private final Mqtt5EnhancedAuthProvider enhancedAuthProvider;
Expand All @@ -56,7 +58,8 @@ public MqttClientConnectionData(
this.keepAlive = keepAlive;
this.sessionExpiryInterval = sessionExpiryInterval;
this.receiveMaximum = receiveMaximum;
this.topicAliasMapping = (topicAliasMaximum == 0) ? null : new MqttTopicImpl[topicAliasMaximum];
this.topicAliasMaximum = topicAliasMaximum;
this.topicAliasMapping = (topicAliasMaximum == 0) ? null : IntMap.range(1, topicAliasMaximum);
this.maximumPacketSize = maximumPacketSize;
this.subscriptionIdentifierMaximum =
MqttVariableByteInteger.FOUR_BYTES_MAX_VALUE; // TODO CONNECT + CONNACK user properties
Expand Down Expand Up @@ -92,11 +95,11 @@ public int getReceiveMaximum() {

@Override
public int getTopicAliasMaximum() {
return (topicAliasMapping == null) ? 0 : topicAliasMapping.length;
return topicAliasMaximum;
}

@Nullable
public MqttTopicImpl[] getTopicAliasMapping() {
public IntMap<MqttTopicImpl> getTopicAliasMapping() {
return topicAliasMapping;
}

Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/mqttbee/mqtt/MqttServerConnectionData.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ public MqttServerConnectionData(
final int receiveMaximum, final int topicAliasMaximum, final int maximumPacketSize,
final MqttQos maximumQos, final boolean isRetainAvailable, final boolean isWildcardSubscriptionAvailable,
final boolean isSubscriptionIdentifierAvailable, final boolean isSharedSubscriptionAvailable) {

this.receiveMaximum = receiveMaximum;
this.maximumPacketSize = maximumPacketSize;
this.topicAliasMapping = topicAliasMaximum == 0 ? null : new MqttTopicAliasMapping(topicAliasMaximum);
this.topicAliasMapping = (topicAliasMaximum == 0) ? null : new MqttTopicAliasMapping(topicAliasMaximum);
this.maximumQos = maximumQos;
this.isRetainAvailable = isRetainAvailable;
this.isWildcardSubscriptionAvailable = isWildcardSubscriptionAvailable;
Expand All @@ -73,7 +74,7 @@ public int getReceiveMaximum() {

@Override
public int getTopicAliasMaximum() {
return (topicAliasMapping == null) ? 0 : topicAliasMapping.size();
return (topicAliasMapping == null) ? 0 : topicAliasMapping.getTopicAliasMaximum();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.mqttbee.mqtt.message.publish.MqttStatefulPublish;
import org.mqttbee.mqtt.netty.ChannelAttributes;
import org.mqttbee.util.ByteBufferUtil;
import org.mqttbee.util.collections.IntMap;

import javax.inject.Inject;
import javax.inject.Singleton;
Expand Down Expand Up @@ -183,20 +184,20 @@ public MqttStatefulPublish decode(

boolean isNewTopicAlias = false;
if (topicAlias != DEFAULT_NO_TOPIC_ALIAS) {
final MqttTopicImpl[] topicAliasMapping = clientConnectionData.getTopicAliasMapping();
if ((topicAliasMapping == null) || (topicAlias > topicAliasMapping.length)) {
final IntMap<MqttTopicImpl> topicAliasMapping = clientConnectionData.getTopicAliasMapping();
if ((topicAliasMapping == null) || (topicAlias > clientConnectionData.getTopicAliasMaximum())) {
throw new MqttDecoderException(
Mqtt5DisconnectReasonCode.TOPIC_ALIAS_INVALID,
"topic alias must not exceed topic alias maximum");
}
if (topic == null) {
topic = topicAliasMapping[topicAlias - 1];
topic = topicAliasMapping.get(topicAlias);
if (topic == null) {
throw new MqttDecoderException(
Mqtt5DisconnectReasonCode.TOPIC_ALIAS_INVALID, "topic alias has no mapping");
}
} else {
topicAliasMapping[topicAlias - 1] = topic;
topicAliasMapping.put(topicAlias, topic);
isNewTopicAlias = true;
}
} else if (topic == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,35 @@
import org.mqttbee.api.mqtt.mqtt5.message.publish.TopicAliasUsage;
import org.mqttbee.mqtt.datatypes.MqttTopicImpl;

import javax.annotation.concurrent.NotThreadSafe;
import java.util.HashMap;
import java.util.Random;

/**
* @author Silvio Giebl
*/
@NotThreadSafe
public class MqttTopicAliasMapping {

private static final Random random = new Random();

private final int size;
private final int topicAliasMaximum;
private final HashMap<String, Integer> hashMap;
private int nextTopicAlias;

public MqttTopicAliasMapping(final int size) {
this.size = size;
hashMap = new HashMap<>(size);
public MqttTopicAliasMapping(final int topicAliasMaximum) {
this.topicAliasMaximum = topicAliasMaximum;
hashMap = new HashMap<>(topicAliasMaximum);
nextTopicAlias = 1;
}

public int set(@NotNull final MqttTopicImpl topic, @NotNull final TopicAliasUsage topicAliasUsage) {
int topicAlias = MqttStatefulPublish.DEFAULT_NO_TOPIC_ALIAS;
if (topicAliasUsage != TopicAliasUsage.MUST_NOT) {
if (nextTopicAlias == size) {
if (nextTopicAlias > topicAliasMaximum) {
if (topicAliasUsage == TopicAliasUsage.MAY_OVERWRITE) {
topicAlias = random.nextInt(size) + 1;
topicAlias = random.nextInt(topicAliasMaximum) + 1;
hashMap.values().remove(topicAlias);
hashMap.put(topic.toString(), topicAlias);
}
} else {
Expand All @@ -63,8 +66,8 @@ public int get(@NotNull final MqttTopicImpl topic) {
return (topicAlias == null) ? MqttStatefulPublish.DEFAULT_NO_TOPIC_ALIAS : topicAlias;
}

public int size() {
return size;
public int getTopicAliasMaximum() {
return topicAliasMaximum;
}

}