Skip to content

Commit

Permalink
Addressed timestamp rollover/range issue
Browse files Browse the repository at this point in the history
  • Loading branch information
mondain committed Oct 13, 2022
1 parent c568879 commit b591501
Show file tree
Hide file tree
Showing 16 changed files with 64 additions and 44 deletions.
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.5</version>
<version>1.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-client</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion client/src/main/java/org/red5/client/Red5Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public final class Red5Client {
/**
* Current server version with revision
*/
public static final String VERSION = "Red5 Client 1.3.5";
public static final String VERSION = "Red5 Client 1.3.6";

/**
* Create a new Red5Client object using the connection local to the current thread A bit of magic that lets you access the red5 scope
Expand Down
4 changes: 2 additions & 2 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.5</version>
<version>1.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server-common</artifactId>
Expand Down Expand Up @@ -124,7 +124,7 @@
<dependency>
<groupId>net.engio</groupId>
<artifactId>mbassador</artifactId>
<version>1.3.5</version>
<version>1.3.6</version>
</dependency> -->
<dependency>
<groupId>junit</groupId>
Expand Down
4 changes: 2 additions & 2 deletions common/src/main/java/org/red5/server/api/Red5.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ public final class Red5 {
/**
* Server version with revision
*/
public static final String VERSION = "Red5 Server 1.3.5";
public static final String VERSION = "Red5 Server 1.3.6";

/**
* Server version for fmsVer requests
*/
public static final String FMS_VERSION = "RED5/1,3,5,0";
public static final String FMS_VERSION = "RED5/1,3,6,0";

/**
* Server capabilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public void setConnection(IStreamCapableConnection conn) {
* @return Stream capable connection object
*/
public IStreamCapableConnection getConnection() {
return conn.get();
// prevent NPE on first call
return conn != null ? conn.get() : null;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public void execute(ISchedulingService service) {
sendStreamNotFoundStatus(item);
throw new StreamNotFoundException(itemName);
}
//continue with common play processes (live and vod)
// continue with common play processes (live and vod)
if (sendNotifications) {
if (withReset) {
sendReset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -790,10 +790,17 @@ public long getCreationTime() {
/** {@inheritDoc} */
public int getCurrentTimestamp() {
int lastMessageTs = engine.getLastMessageTimestamp();
/* XXX(paul) I think this is incorrect
if (lastMessageTs >= 0) {
return 0;
}
return lastMessageTs;
*/
// XXX(paul) this seems to be what the correct logic would be
if (lastMessageTs >= 0) {
return lastMessageTs;
}
return 0;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,9 @@ public void pushMessage(IPipe pipe, IMessage message) {
int eventTime = msg.getTimestamp();
log.debug("Message timestamp: {}", eventTime);
if (eventTime < 0) {
//eventTime += Integer.MIN_VALUE;
//log.debug("Message has negative timestamp, applying {} offset: {}", Integer.MIN_VALUE, eventTime);
// everyone seems to prefer positive timestamps
eventTime += (eventTime * -1);
log.debug("Message has negative timestamp, flipping it to positive: {}", Integer.MIN_VALUE, eventTime);
// handle roll-over -1 is top of the range when keeping things positive
eventTime ^= Integer.MIN_VALUE;
log.debug("Message has negative timestamp, applying {} ts: {}", Integer.MIN_VALUE, eventTime);
msg.setTimestamp(eventTime);
}
// get the data type (AMF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,27 @@ public class TestConnectionConsumer {

@Before
public void setUp() {
connection = new RTMPMinaConnection();
channel = new Channel(connection, 1);
connection = new RTMPMinaConnection() {

@Override
public Channel getChannel(int channelId) {
Channel channel = new Channel(this, channelId);
return channel;
}

};
channel = connection.getChannel(1);
underTest = new ConnectionConsumer(connection, channel, channel, channel);
}

@Test
public void testNegativeTimestampsAreRolledOver() {
log.debug("\n testNegativeTimestampsAreRolledOver");
// https://www.rfc-editor.org/rfc/rfc1982
log.debug("\n max: {} min: {} 0:{} -1:{}", Integer.toHexString(Integer.MAX_VALUE), Integer.toHexString(Integer.MIN_VALUE), Integer.toHexString(0), Integer.toHexString(-1));

VideoData videoData1 = new VideoData();
videoData1.setTimestamp(-1);
videoData1.setTimestamp(-1); // maximum timestamp value 0xffffffff expect 2147483647
underTest.pushMessage(null, RTMPMessage.build(videoData1));

assertEquals(Integer.MAX_VALUE, videoData1.getTimestamp());
Expand Down
2 changes: 1 addition & 1 deletion io/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.5</version>
<version>1.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-io</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<name>Red5</name>
<description>The Red5 server</description>
<groupId>org.red5</groupId>
<version>1.3.5</version>
<version>1.3.6</version>
<url>https://github.com/Red5/red5-server</url>
<inceptionYear>2005</inceptionYear>
<organization>
Expand Down
2 changes: 1 addition & 1 deletion server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.5</version>
<version>1.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public boolean addWebSocketScope(WebSocketScope webSocketScope) {
try {
// ping connected websocket
if (wsConn.isConnected()) {
log.debug("pinging ws: {} on scope: {}", wsConn.getWsSessionId(), sName);
log.trace("pinging ws: {} on scope: {}", wsConn.getWsSessionId(), sName);
try {
wsConn.sendPing(PING_BYTES);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class WsHttpUpgradeHandler implements InternalHttpUpgradeHandler {

private Logger log = LoggerFactory.getLogger(WsHttpUpgradeHandler.class); // must not be static

private final boolean isTrace = log.isTraceEnabled();

@SuppressWarnings("unused")
private final boolean isDebug = log.isDebugEnabled();

private static final StringManager sm = StringManager.getManager(WsHttpUpgradeHandler.class);
Expand Down Expand Up @@ -104,6 +107,7 @@ public void preInit(Endpoint ep, EndpointConfig endpointConfig, DefaultWsServerC
}
}

@SuppressWarnings("deprecation")
@Override
public void init(WebConnection connection) {
if (ep == null) {
Expand All @@ -125,52 +129,52 @@ public void init(WebConnection connection) {
try {
// instance a remote endpoint server
wsRemoteEndpointServer = new WsRemoteEndpointImplServer(socketWrapper, upgradeInfo);
if (isDebug) {
log.debug("New connection 1 {}", wsRemoteEndpointServer);
if (isTrace) {
log.trace("New connection 1 {}", wsRemoteEndpointServer);
}
if (isDebug) {
log.debug("WS session pre-ctor - wsRemoteEndpointServer: {}, webSocketContainer: {}, handshakeRequest.getRequestURI: {}, handshakeRequest.getParameterMap: {}, handshakeRequest.getQueryString: {}, handshakeRequest.getUserPrincipal: {}, httpSessionId: {}, negotiatedExtensions: {}, subProtocol: {}, pathParameters: {}, secure: {}, endpointConfig: {}", wsRemoteEndpointServer, webSocketContainer,
if (isTrace) {
log.trace("WS session pre-ctor - wsRemoteEndpointServer: {}, webSocketContainer: {}, handshakeRequest.getRequestURI: {}, handshakeRequest.getParameterMap: {}, handshakeRequest.getQueryString: {}, handshakeRequest.getUserPrincipal: {}, httpSessionId: {}, negotiatedExtensions: {}, subProtocol: {}, pathParameters: {}, secure: {}, endpointConfig: {}", wsRemoteEndpointServer, webSocketContainer,
handshakeRequest.getRequestURI(), handshakeRequest.getParameterMap(), handshakeRequest.getQueryString(), handshakeRequest.getUserPrincipal(), httpSessionId, negotiatedExtensions, subProtocol, pathParameters, secure, endpointConfig);
}
// deprecated version
wsSession = new WsSession(ep, wsRemoteEndpointServer, webSocketContainer, handshakeRequest.getRequestURI(), handshakeRequest.getParameterMap(), handshakeRequest.getQueryString(), handshakeRequest.getUserPrincipal(), httpSessionId, negotiatedExtensions, subProtocol, pathParameters, secure, endpointConfig);
// newest ctor
//wsSession = new WsSession(wsRemoteEndpointServer, webSocketContainer, handshakeRequest.getRequestURI(), handshakeRequest.getParameterMap(), handshakeRequest.getQueryString(), handshakeRequest.getUserPrincipal(), httpSessionId, negotiatedExtensions, subProtocol, pathParameters, secure, endpointConfig);
if (isDebug) {
log.debug("New connection 2 {}", wsSession);
if (isTrace) {
log.trace("New connection 2 {}", wsSession);
}
wsFrame = new WsFrameServer(socketWrapper, upgradeInfo, wsSession, transformation, applicationClassLoader);
if (isDebug) {
log.debug("New connection 3 {}", wsFrame);
if (isTrace) {
log.trace("New connection 3 {}", wsFrame);
}
// WsFrame adds the necessary final transformations. Copy the completed transformation chain to the remote end point.
wsRemoteEndpointServer.setTransformation(wsFrame.getTransformation());
if (isDebug) {
log.debug("New connection 4");
if (isTrace) {
log.trace("New connection 4");
}
// get the ws scope manager from user props
WebSocketScopeManager manager = (WebSocketScopeManager) endpointConfig.getUserProperties().get(WSConstants.WS_MANAGER);
if (isDebug) {
log.debug("New connection 5");
if (isTrace) {
log.trace("New connection 5");
}
// get ws scope from user props
WebSocketScope scope = (WebSocketScope) endpointConfig.getUserProperties().get(WSConstants.WS_SCOPE);
if (isDebug) {
log.debug("New connection 6 - Scope: {} WS session: {}", scope, wsSession);
if (isTrace) {
log.trace("New connection 6 - Scope: {} WS session: {}", scope, wsSession);
}
// create a ws connection instance
WebSocketConnection conn = new WebSocketConnection(scope, wsSession);
// in debug check since WebSocketConnection.toString is a tiny bit expensive
if (isDebug) {
log.debug("New connection 7: {}", conn);
if (isTrace) {
log.trace("New connection 7: {}", conn);
}
// set ip and port
conn.setAttribute(WSConstants.WS_HEADER_REMOTE_IP, socketWrapper.getRemoteAddr());
conn.setAttribute(WSConstants.WS_HEADER_REMOTE_PORT, socketWrapper.getRemotePort());
// add the request headers
conn.setHeaders(handshakeRequest.getHeaders());
if (isDebug) {
log.debug("New connection 8: {}", conn);
if (isTrace) {
log.trace("New connection 8: {}", conn);
}
// add the connection to the user props
endpointConfig.getUserProperties().put(WSConstants.WS_CONNECTION, conn);
Expand All @@ -180,14 +184,14 @@ public void init(WebConnection connection) {
conn.setConnected();
// fire endpoint handler
ep.onOpen(wsSession, endpointConfig);
if (isDebug) {
log.debug("New connection 9: endpoint opened");
if (isTrace) {
log.trace("New connection 9: endpoint opened");
}
// get the endpoint path to use in registration since we're a server
String path = ((ServerEndpointConfig) endpointConfig).getPath();
webSocketContainer.registerSession(path, wsSession);
if (isDebug) {
log.debug("New connection 10: session registered");
if (isTrace) {
log.trace("New connection 10: session registered");
}
// add the connection to the manager
manager.addConnection(conn);
Expand Down
1 change: 0 additions & 1 deletion server/src/test/java/org/red5/server/scope/ScopeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
import org.junit.Test;
Expand Down
2 changes: 1 addition & 1 deletion service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>org.red5</groupId>
<artifactId>red5-parent</artifactId>
<version>1.3.5</version>
<version>1.3.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>red5-service</artifactId>
Expand Down

0 comments on commit b591501

Please sign in to comment.