Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Send Operation #971

Merged
merged 9 commits into from
Mar 9, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,21 @@
import org.eclipse.leshan.client.resource.RootEnabler;
import org.eclipse.leshan.client.resource.listener.ObjectListener;
import org.eclipse.leshan.client.resource.listener.ObjectsListenerAdapter;
import org.eclipse.leshan.client.send.NoDataException;
import org.eclipse.leshan.client.servers.ServerIdentity;
import org.eclipse.leshan.core.californium.EndpointFactory;
import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.node.LwM2mNode;
import org.eclipse.leshan.core.node.LwM2mPath;
import org.eclipse.leshan.core.node.codec.LwM2mNodeDecoder;
import org.eclipse.leshan.core.node.codec.LwM2mNodeEncoder;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.ReadCompositeRequest;
import org.eclipse.leshan.core.request.SendRequest;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.ReadCompositeResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.core.response.SendResponse;
import org.eclipse.leshan.core.util.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -113,7 +124,7 @@ public LeshanClient(String endpoint, InetSocketAddress localAddress,
bootstrapHandler = createBoostrapHandler(objectTree);
endpointsManager = createEndpointsManager(localAddress, coapConfig, dtlsConfigBuilder, trustStore,
endpointFactory);
requestSender = createRequestSender(endpointsManager, sharedExecutor);
requestSender = createRequestSender(endpointsManager, sharedExecutor, encoder, objectTree.getModel());
engine = engineFactory.createRegistratioEngine(endpoint, objectTree, endpointsManager, requestSender,
bootstrapHandler, observers, additionalAttributes, bsAdditionalAttributes, sharedExecutor);

Expand Down Expand Up @@ -218,8 +229,8 @@ protected CaliforniumEndpointsManager createEndpointsManager(InetSocketAddress l
}

protected CaliforniumLwM2mRequestSender createRequestSender(CaliforniumEndpointsManager endpointsManager,
ScheduledExecutorService executor) {
return new CaliforniumLwM2mRequestSender(endpointsManager, executor);
ScheduledExecutorService executor, LwM2mNodeEncoder encoder, LwM2mModel model) {
return new CaliforniumLwM2mRequestSender(endpointsManager, executor, encoder, model);
}

protected RegistrationUpdateHandler createRegistrationUpdateHandler(RegistrationEngine engine,
Expand Down Expand Up @@ -278,6 +289,39 @@ public void triggerRegistrationUpdate(ServerIdentity server) {
engine.triggerRegistrationUpdate(server);
}

@Override
public SendResponse sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs)
throws InterruptedException {
Validate.notNull(server);
Validate.notEmpty(paths);

Map<LwM2mPath, LwM2mNode> collectedData = collectData(server, paths);
return requestSender.send(server, new SendRequest(format, collectedData, null), 2000);
}

@Override
public void sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs,
ResponseCallback<SendResponse> onResponse, ErrorCallback onError) {
Validate.notNull(server);
Validate.notEmpty(paths);
Validate.notNull(onResponse);
Validate.notNull(onError);

Map<LwM2mPath, LwM2mNode> collectedData = collectData(server, paths);
requestSender.send(server, new SendRequest(format, collectedData, null), 2000, onResponse, onError);
}

private Map<LwM2mPath, LwM2mNode> collectData(ServerIdentity server, List<String> paths) {
// format is not really used as this is an internal call, kind of HACK :/ ...
ContentFormat format = ContentFormat.SENML_CBOR;
ReadCompositeResponse response = rootEnabler.read(server, new ReadCompositeRequest(format, format, paths));
if (response.isSuccess()) {
return response.getContent();
}
throw new NoDataException("Unable to collect data for %s : %s / %s", paths, response.getCode(),
response.getErrorMessage());
}

/**
* A CoAP API, generally needed if you want to access to underlying CoAP layer.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.eclipse.leshan.client.servers.ServerIdentity;
import org.eclipse.leshan.core.californium.AsyncRequestObserver;
import org.eclipse.leshan.core.californium.SyncRequestObserver;
import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.node.codec.LwM2mNodeEncoder;
import org.eclipse.leshan.core.request.UplinkRequest;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.LwM2mResponse;
Expand All @@ -45,9 +47,11 @@ public class CaliforniumLwM2mRequestSender implements LwM2mRequestSender {
private final ScheduledExecutorService executor;
private final boolean attached;
private final CaliforniumEndpointsManager endpointsManager;
private final LwM2mNodeEncoder encoder;
private final LwM2mModel model;

public CaliforniumLwM2mRequestSender(CaliforniumEndpointsManager endpointsManager,
ScheduledExecutorService sharedExecutor) {
ScheduledExecutorService sharedExecutor, LwM2mNodeEncoder encoder, LwM2mModel model) {
this.endpointsManager = endpointsManager;
if (sharedExecutor == null) {
this.executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Leshan Async Request timeout"));
Expand All @@ -56,13 +60,15 @@ public CaliforniumLwM2mRequestSender(CaliforniumEndpointsManager endpointsManage
this.executor = sharedExecutor;
this.attached = false;
}
this.model = model;
this.encoder = encoder;
}

@Override
public <T extends LwM2mResponse> T send(ServerIdentity server, final UplinkRequest<T> request, long timeout)
throws InterruptedException {
// Create the CoAP request from LwM2m request
CoapRequestBuilder coapClientRequestBuilder = new CoapRequestBuilder(server.getIdentity());
CoapRequestBuilder coapClientRequestBuilder = new CoapRequestBuilder(server.getIdentity(), encoder, model);
request.accept(coapClientRequestBuilder);
Request coapRequest = coapClientRequestBuilder.getRequest();

Expand All @@ -89,7 +95,7 @@ public T buildResponse(Response coapResponse) {
public <T extends LwM2mResponse> void send(ServerIdentity server, final UplinkRequest<T> request, long timeout,
ResponseCallback<T> responseCallback, ErrorCallback errorCallback) {
// Create the CoAP request from LwM2m request
CoapRequestBuilder coapClientRequestBuilder = new CoapRequestBuilder(server.getIdentity());
CoapRequestBuilder coapClientRequestBuilder = new CoapRequestBuilder(server.getIdentity(), encoder, model);
request.accept(coapClientRequestBuilder);
Request coapRequest = coapClientRequestBuilder.getRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.leshan.core.Link;
import org.eclipse.leshan.core.californium.EndpointContextUtil;
import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.node.codec.LwM2mNodeEncoder;
import org.eclipse.leshan.core.request.BindingMode;
import org.eclipse.leshan.core.request.BootstrapRequest;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.DeregisterRequest;
import org.eclipse.leshan.core.request.Identity;
import org.eclipse.leshan.core.request.RegisterRequest;
import org.eclipse.leshan.core.request.SendRequest;
import org.eclipse.leshan.core.request.UpdateRequest;
import org.eclipse.leshan.core.request.UplinkRequest;
import org.eclipse.leshan.core.request.UplinkRequestVisitor;
Expand All @@ -42,9 +45,13 @@ public class CoapRequestBuilder implements UplinkRequestVisitor {

protected Request coapRequest;
protected final Identity server;
protected final LwM2mNodeEncoder encoder;
protected final LwM2mModel model;

public CoapRequestBuilder(Identity server) {
public CoapRequestBuilder(Identity server, LwM2mNodeEncoder encoder, LwM2mModel model) {
this.server = server;
this.encoder = encoder;
this.model = model;
}

@Override
Expand Down Expand Up @@ -143,6 +150,17 @@ public void visit(DeregisterRequest request) {
coapRequest.getOptions().setUriPath(request.getRegistrationId());
}

@Override
public void visit(SendRequest request) {
coapRequest = Request.newPost();
buildRequestSettings();
coapRequest.getOptions().setUriPath("/dp");

ContentFormat format = request.getFormat();
coapRequest.getOptions().setContentFormat(format.getCode());
coapRequest.setPayload(encoder.encodeNodes(request.getNodes(), format, model));
}

public Request getRequest() {
return coapRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.eclipse.leshan.core.request.DeregisterRequest;
import org.eclipse.leshan.core.request.LwM2mRequest;
import org.eclipse.leshan.core.request.RegisterRequest;
import org.eclipse.leshan.core.request.SendRequest;
import org.eclipse.leshan.core.request.UpdateRequest;
import org.eclipse.leshan.core.request.UplinkRequestVisitor;
import org.eclipse.leshan.core.request.exception.InvalidResponseException;
import org.eclipse.leshan.core.response.BootstrapResponse;
import org.eclipse.leshan.core.response.DeregisterResponse;
import org.eclipse.leshan.core.response.LwM2mResponse;
import org.eclipse.leshan.core.response.RegisterResponse;
import org.eclipse.leshan.core.response.SendResponse;
import org.eclipse.leshan.core.response.UpdateResponse;

/**
Expand Down Expand Up @@ -93,6 +95,21 @@ public void visit(UpdateRequest request) {
}
}

@Override
public void visit(SendRequest request) {
if (coapResponse.isError()) {
// handle error response:
lwM2mresponse = new SendResponse(toLwM2mResponseCode(coapResponse.getCode()),
coapResponse.getPayloadString());
} else if (coapResponse.getCode() == org.eclipse.californium.core.coap.CoAP.ResponseCode.CHANGED) {
// handle success response:
lwM2mresponse = SendResponse.success();
} else {
// handle unexpected response:
handleUnexpectedResponseCode(request, coapResponse);
}
}

@Override
public void visit(BootstrapRequest request) {
if (coapResponse.isError()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,23 @@
*******************************************************************************/
package org.eclipse.leshan.client;

import java.util.List;

import org.eclipse.leshan.client.resource.LwM2mObjectTree;
import org.eclipse.leshan.client.send.NoDataException;
import org.eclipse.leshan.client.servers.ServerIdentity;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.exception.InvalidRequestException;
import org.eclipse.leshan.core.request.exception.InvalidResponseException;
import org.eclipse.leshan.core.request.exception.RequestCanceledException;
import org.eclipse.leshan.core.request.exception.RequestRejectedException;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.request.exception.TimeoutException;
import org.eclipse.leshan.core.request.exception.UnconnectedPeerException;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.core.response.SendResponse;

/**
* A Lightweight M2M client.
Expand Down Expand Up @@ -52,6 +67,75 @@ public interface LwM2mClient {
*/
void triggerRegistrationUpdate(ServerIdentity server);

/**
* Send Data synchronously to a LWM2M Server.
* <p>
* The "Send" operation is used by the LwM2M Client to send data to the LwM2M Server without explicit request by
* that Server.
* <p>
* If some data can not be collected before to send, this will be silently ignored.<br>
* If there is not data to send at all, {@link NoDataException} is raised.
*
* @param server to which data must be send
* @param format {@link ContentFormat} to use. It MUST be {@link ContentFormat#SENML_CBOR} or
* {@link ContentFormat#SENML_JSON}
* @param paths the list of LWM2M node path to send.
* @param timeoutInMs The global timeout to wait in milliseconds (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout)
* @return the LWM2M response. The response can be <code>null</code> if the timeout expires (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout).
*
* @throws InterruptedException if the thread was interrupted.
* @throws InvalidRequestException if send request can not be created.
* @throws CodecException if request payload can not be encoded.
* @throws NoDataException if we can not collect data for given list of path.
* @throws RequestRejectedException if the request is rejected by foreign peer.
* @throws RequestCanceledException if the request is cancelled.
* @throws SendFailedException if the request can not be sent. E.g. error at CoAP or DTLS/UDP layer.
* @throws InvalidResponseException if the response received is malformed.
* @throws UnconnectedPeerException if client is not connected (no dtls connection available).
*/
SendResponse sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs)
throws InterruptedException;

/**
* Send Data asynchronously to a LWM2M Server.
* <p>
* The "Send" operation is used by the LwM2M Client to send data to the LwM2M Server without explicit request by
* that Server.
* <p>
* If some data can not be collected before to send, this will be silently ignored.<br>
* If there is not data to send at all, {@link NoDataException} is raised.
* <p>
* {@link ResponseCallback} and {@link ErrorCallback} are exclusively called.
*
* @param server to which data must be send
* @param format {@link ContentFormat} to use. It MUST be {@link ContentFormat#SENML_CBOR} or
* {@link ContentFormat#SENML_JSON}
* @param paths the list of LWM2M node path to send.
* @param timeoutInMs The global timeout to wait in milliseconds (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout)
* @param responseCallback a callback called when a response is received (successful or error response). This
* callback MUST NOT be null.
* @param errorCallback a callback called when an error or exception occurred when response is received. It can be :
* <ul>
* <li>{@link RequestRejectedException} if the request is rejected by foreign peer.</li>
* <li>{@link RequestCanceledException} if the request is cancelled.</li>
* <li>{@link SendFailedException} if the request can not be sent. E.g. error at CoAP or DTLS/UDP layer.</li>
* <li>{@link InvalidResponseException} if the response received is malformed.</li>
* <li>{@link UnconnectedPeerException} if client is not connected (no dtls connection available).</li>
* <li>{@link TimeoutException} if the timeout expires (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout).</li>
* <li>or any other RuntimeException for unexpected issue.
* </ul>
* This callback MUST NOT be null.
* @throws CodecException if request payload can not be encoded.
* @throws NoDataException if we can not collect data for given list of path.
* @throws InvalidRequestException if send request can not be created.
*/
void sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs,
ResponseCallback<SendResponse> responseCallback, ErrorCallback errorCallback);

/**
* @return the {@link LwM2mObjectTree} containing all the object implemented by this client.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import org.eclipse.leshan.core.Destroyable;
import org.eclipse.leshan.core.Startable;
import org.eclipse.leshan.core.Stoppable;
import org.eclipse.leshan.core.model.LwM2mModel;
import org.eclipse.leshan.core.model.ObjectModel;
import org.eclipse.leshan.core.model.ResourceModel;
import org.eclipse.leshan.core.node.LwM2mPath;

/**
Expand All @@ -38,9 +41,10 @@
*/
public class LwM2mObjectTree implements Startable, Stoppable, Destroyable {

protected ObjectListener dispatcher = new ObjectListenerDispatcher();
protected CopyOnWriteArrayList<ObjectsListener> listeners = new CopyOnWriteArrayList<>();
protected ConcurrentHashMap<Integer, LwM2mObjectEnabler> objectEnablers = new ConcurrentHashMap<>();
protected final ObjectListener dispatcher = new ObjectListenerDispatcher();
protected final CopyOnWriteArrayList<ObjectsListener> listeners = new CopyOnWriteArrayList<>();
protected final ConcurrentHashMap<Integer, LwM2mObjectEnabler> objectEnablers = new ConcurrentHashMap<>();
protected final LwM2mModel model;

public LwM2mObjectTree(LwM2mClient client, LwM2mObjectEnabler... enablers) {
this(client, Arrays.asList(enablers));
Expand All @@ -58,6 +62,31 @@ public LwM2mObjectTree(LwM2mClient client, Collection<? extends LwM2mObjectEnabl
enabler.addListener(dispatcher);
enabler.setLwM2mClient(client);
}

this.model = new LwM2mModel() {

@Override
public ResourceModel getResourceModel(int objectId, int resourceId) {
ObjectModel objectModel = this.getObjectModel(objectId);
if (objectModel != null)
return objectModel.resources.get(resourceId);
return null;
}

@Override
public Collection<ObjectModel> getObjectModels() {
// TODO implements this ?
throw new UnsupportedOperationException("Not implemented");
}

@Override
public ObjectModel getObjectModel(int objectId) {
LwM2mObjectEnabler objectEnabler = getObjectEnabler(objectId);
if (objectEnabler != null)
return objectEnabler.getObjectModel();
return null;
}
};
}

public void addListener(ObjectsListener listener) {
Expand All @@ -68,6 +97,10 @@ public void removedListener(ObjectsListener listener) {
listeners.remove(listener);
}

public LwM2mModel getModel() {
return model;
}

public Map<Integer, LwM2mObjectEnabler> getObjectEnablers() {
return Collections.unmodifiableMap(objectEnablers);
}
Expand Down Expand Up @@ -161,5 +194,4 @@ public void endTransaction(Map<Integer, LwM2mObjectEnabler> enablers) {
enabler.endTransaction(LwM2mPath.ROOT_DEPTH);
}
}

}
Loading