Skip to content

Commit

Permalink
Refactoring/Improving source: dmf (#1611)
Browse files Browse the repository at this point in the history
Signed-off-by: Marinov Avgustin <[email protected]>
  • Loading branch information
avgustinmm authored Feb 4, 2024
1 parent e218d2c commit da3a647
Show file tree
Hide file tree
Showing 20 changed files with 73 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/**
* An abstract error handler for errors resulting from AMQP.
*/
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler{
public abstract class AbstractAmqpErrorHandler<T> implements AmqpErrorHandler {

@Override
public void doHandle(Throwable throwable, AmqpErrorHandlerChain chain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Optional;
import java.util.UUID;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.api.HostnameResolver;
import org.eclipse.hawkbit.cache.DownloadArtifactCache;
import org.eclipse.hawkbit.cache.DownloadIdCache;
Expand All @@ -27,8 +28,6 @@
import org.eclipse.hawkbit.security.DmfTenantSecurityToken;
import org.eclipse.hawkbit.security.DmfTenantSecurityToken.FileResource;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
Expand All @@ -48,10 +47,9 @@
* is permitted to download certain artifact. This is handled by the queue that
* is configured for the property
* hawkbit.dmf.rabbitmq.authenticationReceiverQueue.
*
*/
@Slf4j
public class AmqpAuthenticationMessageHandler extends BaseAmqpService {
private static final Logger LOG = LoggerFactory.getLogger(AmqpAuthenticationMessageHandler.class);

private final AmqpControllerAuthentication authenticationManager;

Expand Down Expand Up @@ -133,30 +131,30 @@ private void checkIfArtifactIsAssignedToTarget(final DmfTenantSecurityToken secu
} else if (securityToken.getTargetId() != null) {
checkByTargetId(sha1Hash, securityToken.getTargetId());
} else {
LOG.info("anonymous download no authentication check for artifact {}", sha1Hash);
log.info("anonymous download no authentication check for artifact {}", sha1Hash);
}

}

private void checkByTargetId(final String sha1Hash, final Long targetId) {
LOG.debug("no anonymous download request, doing authentication check for target {} and artifact {}", targetId,
log.debug("no anonymous download request, doing authentication check for target {} and artifact {}", targetId,
sha1Hash);
if (!controllerManagement.hasTargetArtifactAssigned(targetId, sha1Hash)) {
LOG.info("target {} tried to download artifact {} which is not assigned to the target", targetId, sha1Hash);
log.info("target {} tried to download artifact {} which is not assigned to the target", targetId, sha1Hash);
throw new EntityNotFoundException();
}
LOG.info("download security check for target {} and artifact {} granted", targetId, sha1Hash);
log.info("download security check for target {} and artifact {} granted", targetId, sha1Hash);
}

private void checkByControllerId(final String sha1Hash, final String controllerId) {
LOG.debug("no anonymous download request, doing authentication check for target {} and artifact {}",
log.debug("no anonymous download request, doing authentication check for target {} and artifact {}",
controllerId, sha1Hash);
if (!controllerManagement.hasTargetArtifactAssigned(controllerId, sha1Hash)) {
LOG.info("target {} tried to download artifact {} which is not assigned to the target", controllerId,
log.info("target {} tried to download artifact {} which is not assigned to the target", controllerId,
sha1Hash);
throw new EntityNotFoundException();
}
LOG.info("download security check for target {} and artifact {} granted", controllerId, sha1Hash);
log.info("download security check for target {} and artifact {} granted", controllerId, sha1Hash);
}

private Optional<Artifact> findArtifactByFileResource(final FileResource fileResource) {
Expand Down Expand Up @@ -222,16 +220,16 @@ private Message handleAuthenticationMessage(final Message message) {
.path(tenantAware.getCurrentTenant()).path("/").path(downloadId).build().toUriString());
authenticationResponse.setResponseCode(HttpStatus.OK.value());
} catch (final BadCredentialsException | AuthenticationServiceException | CredentialsExpiredException e) {
LOG.error("Login failed", e);
log.error("Login failed", e);
authenticationResponse.setResponseCode(HttpStatus.FORBIDDEN.value());
authenticationResponse.setMessage("Login failed");
} catch (final URISyntaxException e) {
LOG.error("URI build exception", e);
log.error("URI build exception", e);
authenticationResponse.setResponseCode(HttpStatus.INTERNAL_SERVER_ERROR.value());
authenticationResponse.setMessage("Building download URI failed");
} catch (final EntityNotFoundException e) {
final String errorMessage = "Artifact for resource " + fileResource + " not found ";
LOG.info(errorMessage);
log.info(errorMessage);
authenticationResponse.setResponseCode(HttpStatus.NOT_FOUND.value());
authenticationResponse.setMessage(errorMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package org.eclipse.hawkbit.amqp;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.api.ArtifactUrlHandler;
import org.eclipse.hawkbit.api.HostnameResolver;
import org.eclipse.hawkbit.cache.DownloadIdCache;
Expand All @@ -26,8 +27,6 @@
import org.eclipse.hawkbit.security.DdiSecurityProperties;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
Expand Down Expand Up @@ -60,15 +59,13 @@
/**
* Spring configuration for AMQP based DMF communication for indirect device
* integration.
*
*/
@Slf4j
@EnableConfigurationProperties({ AmqpProperties.class, AmqpDeadletterProperties.class })
@ConditionalOnProperty(prefix = "hawkbit.dmf.rabbitmq", name = "enabled", matchIfMissing = true)
@PropertySource("classpath:/hawkbit-dmf-defaults.properties")
public class AmqpConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConfiguration.class);

@Autowired
private AmqpProperties amqpProperties;

Expand Down Expand Up @@ -154,9 +151,9 @@ public RabbitTemplate rabbitTemplate() {

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.debug("Message with {} confirmed by broker.", correlationData);
log.debug("Message with {} confirmed by broker.", correlationData);
} else {
LOGGER.error("Broker is unable to handle message with {} : {}", correlationData, cause);
log.error("Broker is unable to handle message with {} : {}", correlationData, cause);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import jakarta.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.im.authentication.TenantAwareAuthenticationDetails;
import org.eclipse.hawkbit.repository.ControllerManagement;
import org.eclipse.hawkbit.repository.SystemManagement;
Expand All @@ -29,19 +30,15 @@
import org.eclipse.hawkbit.security.PreAuthenticationFilter;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.web.authentication.preauth.PreAuthenticatedAuthenticationToken;

/**
*
* A controller which handles the DMF AMQP authentication.
*/
@Slf4j
public class AmqpControllerAuthentication {

private static final Logger LOGGER = LoggerFactory.getLogger(AmqpControllerAuthentication.class);

private final PreAuthTokenSourceTrustAuthenticationProvider preAuthenticatedAuthenticationProvider = new PreAuthTokenSourceTrustAuthenticationProvider();

private List<PreAuthenticationFilter> filterChain;
Expand Down Expand Up @@ -155,11 +152,11 @@ private static PreAuthenticatedAuthenticationToken createAuthentication(final Pr
final Object credentials = filter.getPreAuthenticatedCredentials(securityToken);

if (principal == null) {
LOGGER.debug("No pre-authenticated principal found in message");
log.debug("No pre-authenticated principal found in message");
return null;
}

LOGGER.debug("preAuthenticatedPrincipal = {} trying to authenticate", principal);
log.debug("preAuthenticatedPrincipal = {} trying to authenticate", principal);

return new PreAuthenticatedAuthenticationToken(principal, credentials,
filter.getSuccessfulAuthenticationAuthorities());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
import java.util.HashMap;
import java.util.Map;

import lombok.Data;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
* Bean which holds the necessary properties for configuring the AMQP deadletter
* queue.
*/
@Data
@ConfigurationProperties("hawkbit.dmf.rabbitmq.dead-letter")
public class AmqpDeadletterProperties {

private static final int THREE_WEEKS = 21;

/**
Expand All @@ -46,8 +49,7 @@ public Map<String, Object> getDeadLetterExchangeArgs(final String exchange) {
/**
* Create a deadletter queue with ttl for messages
*
* @param queueName
* the deadlette queue name
* @param queueName the deadletter queue name
* @return the deadletter queue
*/
public Queue createDeadletterQueue(final String queueName) {
Expand All @@ -59,13 +61,4 @@ private Map<String, Object> getTTLArgs() {
args.put("x-message-ttl", getTtl());
return args;
}

public long getTtl() {
return ttl;
}

public void setTtl(final long ttl) {
this.ttl = ttl;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ public interface AmqpErrorHandler {
* @param chain
* an {@link AmqpErrorHandlerChain}
*/
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);

}
void doHandle(final Throwable throwable, final AmqpErrorHandlerChain chain);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,4 @@ public void handle(final Throwable error) {
defaultHandler.handleError(error);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@
import java.util.Map;
import java.util.stream.Collectors;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;

/**
* Class that composes a meaningful error message and enhances it with properties from failed message
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class AmqpErrorMessageComposer {

private AmqpErrorMessageComposer() {
}

/**
* Constructs an error message based on failed message content
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.IterableUtils;
import org.apache.commons.collections4.ListUtils;
import org.eclipse.hawkbit.api.ApiType;
Expand Down Expand Up @@ -67,8 +68,6 @@
import org.eclipse.hawkbit.repository.model.TenantMetaData;
import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
Expand All @@ -87,12 +86,10 @@
*
* Additionally the dispatcher listener/subscribe for some target events e.g.
* assignment.
*
*/
@Slf4j
public class AmqpMessageDispatcherService extends BaseAmqpService {

private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageDispatcherService.class);

private static final int MAX_PROCESSING_SIZE = 1000;

private final ArtifactUrlHandler artifactUrlHandler;
Expand Down Expand Up @@ -167,7 +164,7 @@ protected void targetAssignDistributionSet(final TargetAssignDistributionSetEven
assignedEvent.getActions().keySet());

if (!filteredTargetList.isEmpty()) {
LOG.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
sendUpdateMessageToTargets(assignedEvent.getDistributionSetId(), assignedEvent.getActions(),
filteredTargetList);
}
Expand All @@ -184,15 +181,15 @@ protected void onMultiAction(final MultiActionEvent multiActionEvent) {
if (!shouldBeProcessed(multiActionEvent)) {
return;
}
LOG.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds());
log.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds());
sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds());
}

private List<Target> getTargetsWithoutPendingCancellations(final Set<String> controllerIds) {
return partitionedParallelExecution(controllerIds, partition -> {
return targetManagement.getByControllerID(partition).stream().filter(target -> {
if (hasPendingCancellations(target.getId())) {
LOG.debug("Target {} has pending cancellations. Will not send update message to it.",
log.debug("Target {} has pending cancellations. Will not send update message to it.",
target.getControllerId());
return false;
}
Expand Down
Loading

0 comments on commit da3a647

Please sign in to comment.