Skip to content

Commit

Permalink
Add clientEndpointType to interface (#40784)
Browse files Browse the repository at this point in the history
* add client endpoint type to interface

* add default value and fix comment

* fix lint issues

* add mqtt jwt parser implementation

* fix lint issues

* publish new test records

* remove unnecessary imports

* update changelog.md

* update changelog.md

* remove unused import

* update recorded tests

* fix lint issues

* change connectionID to run tests

* increase code coverage

* add edge test cases

* remove import * and update Changelog

* update changelog

* change method signature for better usability

* fix checkstyle

* add test coverage

* update javadoc

* refactor methods

* remove unnecessary params

* refactor method name

* update changelog

---------

Co-authored-by: chuongnguyen <[email protected]>
  • Loading branch information
cqnguy23 and chuongnguyen authored Jul 15, 2024
1 parent 9ecee32 commit 858c031
Show file tree
Hide file tree
Showing 15 changed files with 845 additions and 149 deletions.
5 changes: 5 additions & 0 deletions sdk/webpubsub/azure-messaging-webpubsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## 1.3.0-beta.1 (Unreleased)

### Features Added

- Added a `clientEndpointType` option to `GenerateClientTokenOptions` to specify the type of client endpoint
when generating token. This option can be used to generate token and client connection URL for a specific client endpoint type, such as `Default` or `MQTT`.
- Added a `addConnectionsToGroups` method to `WebPubSubServiceClient` and `WebPubSubServiceAsyncClient` to add filtered connections to multiple groups.
- Migrated serialization to `azure-json` which offers implementation agnostic serialization, providing support for
more serialization frameworks than just Jackson.

Expand Down
2 changes: 1 addition & 1 deletion sdk/webpubsub/azure-messaging-webpubsub/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/webpubsub/azure-messaging-webpubsub",
"Tag": "java/webpubsub/azure-messaging-webpubsub_2c7e99b063"
"Tag": "java/webpubsub/azure-messaging-webpubsub_b39e249795"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,16 @@
import com.azure.core.util.FluxUtil;
import com.azure.messaging.webpubsub.implementation.WebPubSubUtil;
import com.azure.messaging.webpubsub.implementation.WebPubSubsImpl;
import com.azure.messaging.webpubsub.implementation.models.AddToGroupsRequest;
import com.azure.messaging.webpubsub.models.ClientEndpointType;
import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions;
import com.azure.messaging.webpubsub.models.WebPubSubClientAccessToken;
import com.azure.messaging.webpubsub.models.WebPubSubContentType;
import com.azure.messaging.webpubsub.models.WebPubSubPermission;
import reactor.core.publisher.Mono;

import java.util.List;

/** Initializes a new instance of the asynchronous AzureWebPubSubServiceRestAPI type. */
@ServiceClient(builder = WebPubSubServiceClientBuilder.class, isAsync = true)
public final class WebPubSubServiceAsyncClient {
Expand Down Expand Up @@ -55,18 +59,21 @@ public final class WebPubSubServiceAsyncClient {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<WebPubSubClientAccessToken> getClientAccessToken(GetClientAccessTokenOptions options) {
final ClientEndpointType clientEndpointType = options.getClientEndpointType();
final String path = clientEndpointType.equals(ClientEndpointType.MQTT)
? "clients/mqtt/hubs/" : "client/hubs/";
if (this.keyCredential == null) {
return this.serviceClient.generateClientTokenWithResponseAsync(hub,
configureClientAccessTokenRequestOptions(options))
.map(response -> {
String token = WebPubSubUtil.getToken(response.getValue());
return WebPubSubUtil.createToken(token, endpoint, hub);
return WebPubSubUtil.createToken(token, endpoint, hub, path);
});
}
return Mono.fromCallable(() -> {
final String audience = endpoint + (endpoint.endsWith("/") ? "" : "/") + "client/hubs/" + hub;
final String audience = endpoint + (endpoint.endsWith("/") ? "" : "/") + path + hub;
final String token = WebPubSubAuthenticationPolicy.getAuthenticationToken(audience, options, keyCredential);
return WebPubSubUtil.createToken(token, endpoint, hub);
return WebPubSubUtil.createToken(token, endpoint, hub, path);
});
}

Expand All @@ -84,6 +91,9 @@ static RequestOptions configureClientAccessTokenRequestOptions(GetClientAccessTo
if (!CoreUtils.isNullOrEmpty(options.getGroups())) {
options.getGroups().stream().forEach(groupName -> requestOptions.addQueryParam("group", groupName));
}
if (options.getClientEndpointType() != null) {
requestOptions.addQueryParam("clientType", options.getClientEndpointType().toString());
}
return requestOptions;
}

Expand Down Expand Up @@ -353,6 +363,41 @@ public Mono<Response<Void>> addConnectionToGroupWithResponse(String group, Strin
return this.serviceClient.addConnectionToGroupWithResponseAsync(hub, group, connectionId, requestOptions);
}

private Mono<Response<Void>> addConnectionsToGroupsWithResponse(String hub, BinaryData groupsToAdd,
RequestOptions requestOptions) {
return this.serviceClient.addConnectionsToGroupsWithResponseAsync(hub, groupsToAdd, requestOptions);
}

/**
* Add filtered connections to multiple groups.
* <p><strong>Request Body Schema</strong></p>
*
* <pre>{@code
* {
* groups: Iterable<String> (Optional)
* filter: String (Optional)
* }
* }</pre>
*
* @param hub Target hub name, which should start with alphabetic characters and only contain alpha-numeric
* characters or underscore.
* @param groups Target group names. Rejected by server on status code 400 if this parameter is null.
* @param filter The filter to apply to the connections.
* @return the completion of {@link Mono}.
* @throws HttpResponseException thrown if the request is rejected by server.
* @throws ClientAuthenticationException thrown if the request is rejected by server on status code 401.
* @throws ResourceNotFoundException thrown if the request is rejected by server on status code 404.
* @throws ResourceModifiedException thrown if the request is rejected by server on status code 409.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> addConnectionsToGroups(String hub, List<String> groups, String filter) {
AddToGroupsRequest requestBody = new AddToGroupsRequest();
requestBody.setGroups(groups);
requestBody.setFilter(filter);
BinaryData body = BinaryData.fromObject(requestBody);
return addConnectionsToGroupsWithResponse(hub, body, new RequestOptions()).flatMap(FluxUtil::toMono);
}

/**
* Remove a connection from the target group.
*
Expand Down Expand Up @@ -523,7 +568,7 @@ public Mono<Response<Void>> removeUserFromAllGroupsWithResponse(String userId, R
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> grantPermissionWithResponse(WebPubSubPermission permission, String connectionId,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.grantPermissionWithResponseAsync(hub, permission.toString(), connectionId,
requestOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@
import com.azure.core.util.BinaryData;
import com.azure.messaging.webpubsub.implementation.WebPubSubUtil;
import com.azure.messaging.webpubsub.implementation.WebPubSubsImpl;
import com.azure.messaging.webpubsub.implementation.models.AddToGroupsRequest;
import com.azure.messaging.webpubsub.models.ClientEndpointType;
import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions;
import com.azure.messaging.webpubsub.models.WebPubSubClientAccessToken;
import com.azure.messaging.webpubsub.models.WebPubSubContentType;
import com.azure.messaging.webpubsub.models.WebPubSubPermission;

import java.util.List;

import static com.azure.messaging.webpubsub.WebPubSubServiceAsyncClient.configureClientAccessTokenRequestOptions;

/** Initializes a new instance of the synchronous AzureWebPubSubServiceRestAPI type. */
/**
* Initializes a new instance of the synchronous AzureWebPubSubServiceRestAPI type.
*/
@ServiceClient(builder = WebPubSubServiceClientBuilder.class)
public final class WebPubSubServiceClient {
private final WebPubSubsImpl serviceClient;
Expand All @@ -39,7 +45,7 @@ public final class WebPubSubServiceClient {
* @param serviceClient the service client implementation.
*/
WebPubSubServiceClient(WebPubSubsImpl serviceClient, String hub, String endpoint,
AzureKeyCredential keyCredential) {
AzureKeyCredential keyCredential) {
this.serviceClient = serviceClient;
this.endpoint = endpoint;
this.keyCredential = keyCredential;
Expand All @@ -54,14 +60,17 @@ public final class WebPubSubServiceClient {
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public WebPubSubClientAccessToken getClientAccessToken(GetClientAccessTokenOptions options) {
final ClientEndpointType clientEndpointType = options.getClientEndpointType();
final String path = clientEndpointType.equals(ClientEndpointType.MQTT)
? "clients/mqtt/hubs/" : "client/hubs/";
if (this.keyCredential == null) {
Response<BinaryData> response = serviceClient.generateClientTokenWithResponse(hub,
configureClientAccessTokenRequestOptions(options));
return WebPubSubUtil.createToken(WebPubSubUtil.getToken(response.getValue()), endpoint, hub);
return WebPubSubUtil.createToken(WebPubSubUtil.getToken(response.getValue()), endpoint, hub, path);
}
final String audience = endpoint + (endpoint.endsWith("/") ? "" : "/") + "client/hubs/" + hub;
final String audience = endpoint + (endpoint.endsWith("/") ? "" : "/") + path + hub;
final String token = WebPubSubAuthenticationPolicy.getAuthenticationToken(audience, options, keyCredential);
return WebPubSubUtil.createToken(token, endpoint, hub);
return WebPubSubUtil.createToken(token, endpoint, hub, path);
}

/**
Expand Down Expand Up @@ -111,7 +120,7 @@ Response<BinaryData> generateClientTokenWithResponse(String hub, RequestOptions
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendToAllWithResponse(BinaryData message, WebPubSubContentType contentType,
long contentLength, RequestOptions requestOptions) {
long contentLength, RequestOptions requestOptions) {
if (requestOptions == null) {
requestOptions = new RequestOptions();
}
Expand Down Expand Up @@ -195,7 +204,7 @@ public Response<Void> closeConnectionWithResponse(String connectionId, RequestOp
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendToConnectionWithResponse(String connectionId, BinaryData message,
WebPubSubContentType contentType, long contentLength, RequestOptions requestOptions) {
WebPubSubContentType contentType, long contentLength, RequestOptions requestOptions) {
if (requestOptions == null) {
requestOptions = new RequestOptions();
}
Expand Down Expand Up @@ -233,7 +242,7 @@ public void sendToConnection(String connectionId, String message, WebPubSubConte
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendToConnectionWithResponse(String connectionId, BinaryData message,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.sendToConnectionWithResponse(hub, connectionId, "", message, requestOptions);
}

Expand Down Expand Up @@ -267,7 +276,7 @@ public Response<Boolean> groupExistsWithResponse(String group, RequestOptions re
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendToGroupWithResponse(String group, BinaryData message, WebPubSubContentType contentType,
long contentLength, RequestOptions requestOptions) {
long contentLength, RequestOptions requestOptions) {
if (requestOptions == null) {
requestOptions = new RequestOptions();
}
Expand Down Expand Up @@ -321,10 +330,45 @@ public Response<Void> sendToGroupWithResponse(String group, BinaryData message,
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> addConnectionToGroupWithResponse(String group, String connectionId,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.addConnectionToGroupWithResponse(hub, group, connectionId, requestOptions);
}

private Response<Void> addConnectionsToGroupsWithResponse(String hub, BinaryData groupsToAdd,
RequestOptions requestOptions) {
return this.serviceClient.addConnectionsToGroupsWithResponse(hub, groupsToAdd, requestOptions);
}

/**
* Add filtered connections to multiple groups.
* <p><strong>Request Body Schema</strong></p>
*
* <pre>{@code
* {
* groups: Iterable<String> (Optional)
* filter: String (Optional)
* }
* }</pre>
*
* @param hub Target hub name, which should start with alphabetic characters and only contain alpha-numeric
* characters or underscore.
* @param groups Target group names. Rejected by server on status code 400 if this parameter is null.
* @param filter The filter to apply to the connections.
* @throws HttpResponseException thrown if the request is rejected by server.
* @throws ClientAuthenticationException thrown if the request is rejected by server on status code 401.
* @throws ResourceNotFoundException thrown if the request is rejected by server on status code 404.
* @throws ResourceModifiedException thrown if the request is rejected by server on status code 409.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void addConnectionsToGroups(String hub, List<String> groups, String filter) {
// Convert requestBody to Binary Data String
AddToGroupsRequest requestBody = new AddToGroupsRequest();
requestBody.setGroups(groups);
requestBody.setFilter(filter);
BinaryData body = BinaryData.fromObject(requestBody);
addConnectionsToGroupsWithResponse(hub, body, new RequestOptions());
}

/**
* Remove a connection from the target group.
*
Expand All @@ -338,7 +382,7 @@ public Response<Void> addConnectionToGroupWithResponse(String group, String conn
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> removeConnectionFromGroupWithResponse(String group, String connectionId,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.removeConnectionFromGroupWithResponse(hub, group, connectionId, requestOptions);
}

Expand All @@ -356,7 +400,7 @@ public Response<Void> removeConnectionFromGroupWithResponse(String group, String
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> removeConnectionFromAllGroupsWithResponse(String connectionId,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.removeConnectionFromAllGroupsWithResponse(hub, connectionId, requestOptions);
}

Expand Down Expand Up @@ -390,7 +434,7 @@ public Response<Boolean> userExistsWithResponse(String userId, RequestOptions re
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> sendToUserWithResponse(String userId, BinaryData message, WebPubSubContentType contentType,
long contentLength, RequestOptions requestOptions) {
long contentLength, RequestOptions requestOptions) {
if (requestOptions == null) {
requestOptions = new RequestOptions();
}
Expand Down Expand Up @@ -491,7 +535,7 @@ public Response<Void> removeUserFromAllGroupsWithResponse(String userId, Request
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> grantPermissionWithResponse(WebPubSubPermission permission, String connectionId,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.grantPermissionWithResponse(hub, permission.toString(), connectionId, requestOptions);
}

Expand All @@ -508,7 +552,7 @@ public Response<Void> grantPermissionWithResponse(WebPubSubPermission permission
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Void> revokePermissionWithResponse(WebPubSubPermission permission, String connectionId,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.revokePermissionWithResponse(hub, permission.toString(), connectionId,
requestOptions);
}
Expand All @@ -526,7 +570,7 @@ public Response<Void> revokePermissionWithResponse(WebPubSubPermission permissio
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Response<Boolean> checkPermissionWithResponse(WebPubSubPermission permission, String connectionId,
RequestOptions requestOptions) {
RequestOptions requestOptions) {
return this.serviceClient.checkPermissionWithResponse(hub, permission.toString(), connectionId, requestOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ public enum WebPubSubServiceVersion implements ServiceVersion {
/**
* Enum value 2022-11-01.
*/
V2022_11_01("2022-11-01");
V2022_11_01("2022-11-01"),

/**
* Enum value 2024-01-01.
*/
V2024_01_01("2024-01-01");

private final String version;

Expand All @@ -40,6 +45,6 @@ public String getVersion() {
* @return The latest {@link WebPubSubServiceVersion}.
*/
public static WebPubSubServiceVersion getLatest() {
return V2022_11_01;
return V2024_01_01;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public WebPubSubServiceVersion getServiceVersion() {
* The interface defining all the services for AzureWebPubSubServiceRestApiHealthApis to be used by the proxy
* service to perform REST calls.
*/
@Host("{Endpoint}")
@Host("{endpoint}")
@ServiceInterface(name = "AzureWebPubSubServic")
public interface HealthApisService {
@Head("/api/health")
Expand All @@ -72,7 +72,7 @@ public interface HealthApisService {
@UnexpectedResponseExceptionType(value = ResourceNotFoundException.class, code = { 404 })
@UnexpectedResponseExceptionType(value = ResourceModifiedException.class, code = { 409 })
@UnexpectedResponseExceptionType(HttpResponseException.class)
Mono<Response<Void>> getServiceStatus(@HostParam("Endpoint") String endpoint,
Mono<Response<Void>> getServiceStatus(@HostParam("endpoint") String endpoint,
@QueryParam("api-version") String apiVersion, RequestOptions requestOptions, Context context);

@Head("/api/health")
Expand All @@ -81,7 +81,7 @@ Mono<Response<Void>> getServiceStatus(@HostParam("Endpoint") String endpoint,
@UnexpectedResponseExceptionType(value = ResourceNotFoundException.class, code = { 404 })
@UnexpectedResponseExceptionType(value = ResourceModifiedException.class, code = { 409 })
@UnexpectedResponseExceptionType(HttpResponseException.class)
Response<Void> getServiceStatusSync(@HostParam("Endpoint") String endpoint,
Response<Void> getServiceStatusSync(@HostParam("endpoint") String endpoint,
@QueryParam("api-version") String apiVersion, RequestOptions requestOptions, Context context);
}

Expand Down
Loading

0 comments on commit 858c031

Please sign in to comment.