Skip to content

Commit

Permalink
Add first minimal support of Observe Request (SendSync Only)
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Jan 20, 2023
1 parent 21134c6 commit 4c1913b
Show file tree
Hide file tree
Showing 7 changed files with 209 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,35 @@
package org.eclipse.leshan.transport.javacoap.endpoint;

import java.net.URI;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.eclipse.leshan.core.endpoint.Protocol;
import org.eclipse.leshan.core.observation.Observation;
import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.request.DownlinkRequest;
import org.eclipse.leshan.core.request.ObserveRequest;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.ObserveResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.server.endpoint.LwM2mServerEndpoint;
import org.eclipse.leshan.server.endpoint.ServerEndpointToolbox;
import org.eclipse.leshan.server.observation.LwM2mNotificationReceiver;
import org.eclipse.leshan.server.profile.ClientProfile;
import org.eclipse.leshan.server.registration.RegistrationStore;
import org.eclipse.leshan.server.request.LowerLayerConfig;
import org.eclipse.leshan.transport.javacoap.observation.ObservationUtil;

import com.mbed.coap.client.CoapClient;
import com.mbed.coap.client.CoapClientBuilder;
import com.mbed.coap.client.ObservationConsumer;
import com.mbed.coap.exception.CoapException;
import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;
import com.mbed.coap.server.CoapServer;

public class JavaCoapServerEndpoint implements LwM2mServerEndpoint {
Expand All @@ -42,13 +53,19 @@ public class JavaCoapServerEndpoint implements LwM2mServerEndpoint {
private final CoapServer coapServer;
private final ServerCoapMessageTranslator translator;
private final ServerEndpointToolbox toolbox;
private final LwM2mNotificationReceiver notificationReceiver;
private final RegistrationStore registrationStore;

public JavaCoapServerEndpoint(URI endpointUri, CoapServer coapServer, ServerCoapMessageTranslator translator,
ServerEndpointToolbox toolbox) {
ServerEndpointToolbox toolbox, LwM2mNotificationReceiver notificationReceiver,
RegistrationStore registrationStore) {
this.endpointUri = endpointUri;
this.coapServer = coapServer;
this.translator = translator;
this.toolbox = toolbox;
this.notificationReceiver = notificationReceiver;
this.registrationStore = registrationStore;

}

@Override
Expand All @@ -65,46 +82,96 @@ public URI getURI() {
public <T extends LwM2mResponse> T send(ClientProfile destination, DownlinkRequest<T> request,
LowerLayerConfig lowerLayerConfig, long timeoutInMs) throws InterruptedException {

final CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox);
// Create Coap Request to send
CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox);

// create a Coap Client to send request
// Create a Coap Client to send request
CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer);

// Send CoAP request synchronously
CoapResponse coapResponse = null;
try {
coapResponse = coapClient.sendSync(coapRequest);
// Handle special case of ObserveRequest
if (request instanceof ObserveRequest) {
// TODO HACK as we can not get token from coapresponse.
Opaque token = translator.getTokenGenerator().createToken();
final CoapRequest newCoapRequest = coapRequest.token(token);

// Add Callback to Handle notification
// TODO HACK we don't use sendSync because of observe Handling
CompletableFuture<CoapResponse> differedCoapResponse = coapClient.send(newCoapRequest);
differedCoapResponse.thenAccept(r -> ObservationConsumer.consumeFrom(r.next, notification -> {
// Handle notification
ObserveResponse lwm2mResponse = null;
try {
// create LWM2M response
lwm2mResponse = (ObserveResponse) translator.createLwM2mResponse(destination, request,
coapRequest, notification, toolbox, token);
SingleObservation observation = lwm2mResponse.getObservation();

// check if we have observe relation in store for this notification
Observation observeRelation = registrationStore.getObservation(destination.getRegistrationId(),
observation.getId());
if (observeRelation != null) {
// we have an observe relation notify upper layer
notificationReceiver.onNotification(lwm2mResponse.getObservation(), destination,
lwm2mResponse);
return true;
} else {
// we haven't observe relation so stop this observation.
return false;
}
} catch (Exception e) {
if (lwm2mResponse != null) {
notificationReceiver.onError(lwm2mResponse.getObservation(), destination, e);
} else {
notificationReceiver
.onError(ObservationUtil.createSingleObservation(destination.getRegistrationId(),
(ObserveRequest) request, token, null), destination, e);
}
return false;
}

}));

// wait synchronously for CoAP response;
CoapResponse coapResponse = await(differedCoapResponse);

// translate CoAP response into LWM2M response
T lwm2mResponse = translator.createLwM2mResponse(destination, request, coapRequest, coapResponse,
toolbox, token);

// Add Observation to the store if relation is established
if (lwm2mResponse.isSuccess()) {
ObserveResponse observeResponse = (ObserveResponse) lwm2mResponse;
// TODO should we handle conflict ?
Collection<Observation> previousRelation = registrationStore
.addObservation(destination.getRegistrationId(), observeResponse.getObservation(), false);
if (!previousRelation.isEmpty()) {
// TODO log that a relation is override.
}

// notify upper layer that new relation is established
notificationReceiver.newObservation(observeResponse.getObservation(),
destination.getRegistration());
}

return lwm2mResponse;
} else {
// Common use case : Send CoAP Request
CoapResponse coapResponse = coapClient.sendSync(coapRequest);
// translate CoAP response into LWM2M response
return translator.createLwM2mResponse(destination, request, coapRequest, coapResponse, toolbox, null);
}
} catch (CoapException e) {
throw new IllegalStateException("Unable to send request");
}

return translator.createLwM2mResponse(destination, request, coapRequest, coapResponse, toolbox);

// TODO send request using code like this ?
// from
// https://github.com/open-coap/java-coap/blob/42032086dca3bf0482d3a4461d0431c9502fcf98/example-client/src/main/java/com/mbed/coap/cli/CoapCli.java#L112-L139

// InetSocketAddress destination = new InetSocketAddress(uri.getHost(), uri.getPort());
// CoapClient cli = CoapClientBuilder.clientFor(destination, cliServer);
//
// Thread.sleep(200);
//
// String uriPath = uri.getPath().isEmpty() ? CoapConstants.WELL_KNOWN_CORE : uri.getPath();
// try {
// CoapResponse resp = cli.sendSync(CoapRequest.of(destination, Method.valueOf(method), uriPath)
// .query(uri.getQuery() == null ? "" : uri.getQuery())
// .token(System.currentTimeMillis() % 0xFFFF)
// .proxy(proxyUri)
// .blockSize(blockSize)
// .payload(payload)
// );
}

@Override
public <T extends LwM2mResponse> void send(ClientProfile destination, DownlinkRequest<T> request,
ResponseCallback<T> responseCallback, ErrorCallback errorCallback, LowerLayerConfig lowerLayerConfig,
long timeoutInMs) {
final CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox);
CoapRequest coapRequest = translator.createCoapRequest(destination, request, toolbox);

// create a Coap Client to send request
CoapClient coapClient = CoapClientBuilder.clientFor(destination.getIdentity().getPeerAddress(), coapServer);
Expand All @@ -119,11 +186,25 @@ public <T extends LwM2mResponse> void send(ClientProfile destination, DownlinkRe
// Handle CoAP Response
.thenAccept((coapResponse) -> {
T lwM2mResponse = translator.createLwM2mResponse(destination, request, coapRequest, coapResponse,
toolbox);
toolbox, null);
responseCallback.onResponse(lwM2mResponse);
});
}

// TODO this is a copy/past from com.mbed.coap.client.CoapClient.await(CompletableFuture<CoapResponse>) we should
// find a better way.
private static CoapResponse await(CompletableFuture<CoapResponse> future) throws CoapException {
try {
return future.join();
} catch (CompletionException ex) {
if (ex.getCause() instanceof CoapException) {
throw (CoapException) ex.getCause();
} else {
throw new CoapException(ex.getCause());
}
}
}

@Override
public void cancelRequests(String sessionID) {
// TODO not implemented yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public JavaCoapServerEndpointsProvider(int coapPort) {
}

@Override
public void createEndpoints(UplinkRequestReceiver requestReceiver, LwM2mNotificationReceiver observationService,
public void createEndpoints(UplinkRequestReceiver requestReceiver, LwM2mNotificationReceiver notificationReceiver,
ServerEndpointToolbox toolbox, ServerSecurityInfo serverSecurityInfo, LeshanServer server) {

// TODO we should get endpoint URI dynamically in Resources
Expand All @@ -65,7 +65,8 @@ public void createEndpoints(UplinkRequestReceiver requestReceiver, LwM2mNotifica
.build();
coapServer = CoapServer.builder().transport(coapPort).route(resources).build();

lwm2mEndpoint = new JavaCoapServerEndpoint(endpointURI, coapServer, new ServerCoapMessageTranslator(), toolbox);
lwm2mEndpoint = new JavaCoapServerEndpoint(endpointURI, coapServer, new ServerCoapMessageTranslator(), toolbox,
notificationReceiver, server.getRegistrationStore());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import com.mbed.coap.packet.CoapRequest;
import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;

public class ServerCoapMessageTranslator {

Expand All @@ -49,10 +50,12 @@ public CoapRequest createCoapRequest(ClientProfile clientProfile,
}

public <T extends LwM2mResponse> T createLwM2mResponse(ClientProfile clientProfile, DownlinkRequest<T> lwm2mRequest,
CoapRequest coapRequest, CoapResponse coapResponse, ServerEndpointToolbox toolbox) {
CoapRequest coapRequest, CoapResponse coapResponse, ServerEndpointToolbox toolbox,
/* TODO HACK */ Opaque token) {

LwM2mResponseBuilder<T> builder = new LwM2mResponseBuilder<T>(coapRequest, coapResponse,
clientProfile.getEndpoint(), clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser());
clientProfile.getEndpoint(), clientProfile.getModel(), toolbox.getDecoder(), toolbox.getLinkParser(),
clientProfile.getRegistrationId(), token);
lwm2mRequest.accept(builder);
return builder.getResponse();
}
Expand All @@ -72,4 +75,9 @@ public AbstractLwM2mResponse createObservation(Observation observation, CoapResp
// TODO implement it ?
return null;
}

// TODO HACK remove this public access
public RandomTokenGenerator getTokenGenerator() {
return tokenGenerator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*******************************************************************************
* Copyright (c) 2023 Sierra Wireless and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v20.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Sierra Wireless - initial API and implementation
*******************************************************************************/
package org.eclipse.leshan.transport.javacoap.observation;

import org.eclipse.leshan.core.observation.ObservationIdentifier;
import org.eclipse.leshan.core.observation.SingleObservation;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.ObserveRequest;

import com.mbed.coap.packet.CoapResponse;
import com.mbed.coap.packet.Opaque;

public class ObservationUtil {

public static SingleObservation createSingleObservation(String registationID, ObserveRequest lwm2mRequest,
Opaque token, CoapResponse coapResponse) {
ContentFormat contentFormat = null;
if (coapResponse != null) {
contentFormat = ContentFormat.fromCode(coapResponse.options().getContentFormat());
}
return new SingleObservation(new ObservationIdentifier(token.getBytes()), registationID, lwm2mRequest.getPath(),
contentFormat, null, null);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,18 +172,10 @@ public void visit(DeleteRequest request) {

@Override
public void visit(ObserveRequest request) {
// TODO not implemented : need to investigate how observe work with java-coap

// coapRequest = Request.newGet();
// if (request.getContentFormat() != null)
// coapRequest.getOptions().setAccept(request.getContentFormat().getCode());
// coapRequest.setObserve();
// setURI(coapRequest, request.getPath());
// setSecurityContext(coapRequest);
//
// // add context info to the observe request
// coapRequest.setUserContext(ObserveUtil.createCoapObserveRequestContext(endpoint, registrationId, request));
// applyLowerLayerConfig(coapRequest);
coapRequest = CoapRequest.observe(getAddress(), getURI(request.getPath()));
if (request.getContentFormat() != null)
coapRequest.options().setAccept(request.getContentFormat().getCode());
applyLowerLayerConfig(coapRequest);
}

@Override
Expand Down
Loading

0 comments on commit 4c1913b

Please sign in to comment.