Skip to content

Commit

Permalink
Refactor Selenium Grid scaler
Browse files Browse the repository at this point in the history
Signed-off-by: Viet Nguyen Duc <[email protected]>
  • Loading branch information
VietND96 committed Oct 7, 2024
1 parent c4f02d5 commit 4e46487
Show file tree
Hide file tree
Showing 4 changed files with 1,704 additions and 297 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ Here is an overview of all new **experimental** features:
- **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- **Selenium Scaler**: Add Support for Username and Password Authentication ([#6144](https://github.com/kedacore/keda/issues/6144))
- **Selenium Grid Scaler**: Add optional auth parameters `username`, `password`, `authType`, `accessToken` to configure a secure GraphQL endpoint ([#6144](https://github.com/kedacore/keda/issues/6144))
- **Selenium Grid Scaler**: Add parameter `nodeMaxSessions` to configure scaler sync with `--max-sessions` capacity in the Node ([#6080](https://github.com/kedacore/keda/issues/6080))
- **Selenium Grid Scaler**: Improve logic based on node stereotypes, node sessions and queue requests capabilities ([#6080](https://github.com/kedacore/keda/issues/6080))
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Fixes
Expand Down
254 changes: 195 additions & 59 deletions pkg/scalers/selenium_grid_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"strings"

Expand All @@ -29,55 +28,86 @@ type seleniumGridScaler struct {
type seleniumGridScalerMetadata struct {
triggerIndex int

URL string `keda:"name=url, order=triggerMetadata;authParams"`
URL string `keda:"name=url, order=authParams;triggerMetadata"`
AuthType string `keda:"name=authType, order=authParams;resolvedEnv, optional"`
Username string `keda:"name=username, order=authParams;resolvedEnv, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv, optional"`
AccessToken string `keda:"name=accessToken, order=authParams;resolvedEnv, optional"`
BrowserName string `keda:"name=browserName, order=triggerMetadata"`
SessionBrowserName string `keda:"name=sessionBrowserName, order=triggerMetadata, optional"`
ActivationThreshold int64 `keda:"name=activationThreshold, order=triggerMetadata, optional"`
BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, optional, default=latest"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional, default=false"`
PlatformName string `keda:"name=platformName, order=triggerMetadata, optional, default=linux"`

// auth
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata, optional"`
NodeMaxSessions int `keda:"name=nodeMaxSessions, order=triggerMetadata, optional, default=1"`

TargetValue int64
}

type seleniumResponse struct {
Data data `json:"data"`
type SeleniumResponse struct {
Data Data `json:"data"`
}

type Data struct {
Grid Grid `json:"grid"`
NodesInfo NodesInfo `json:"nodesInfo"`
SessionsInfo SessionsInfo `json:"sessionsInfo"`
}

type Grid struct {
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
TotalSlots int `json:"totalSlots"`
}

type NodesInfo struct {
Nodes Nodes `json:"nodes"`
}

type data struct {
Grid grid `json:"grid"`
SessionsInfo sessionsInfo `json:"sessionsInfo"`
type SessionsInfo struct {
SessionQueueRequests []string `json:"sessionQueueRequests"`
}

type grid struct {
MaxSession int `json:"maxSession"`
NodeCount int `json:"nodeCount"`
type Nodes []struct {
ID string `json:"id"`
Status string `json:"status"`
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
Stereotypes string `json:"stereotypes"`
Sessions Sessions `json:"sessions"`
}

type sessionsInfo struct {
SessionQueueRequests []string `json:"sessionQueueRequests"`
Sessions []seleniumSession `json:"sessions"`
type ReservedNodes struct {
ID string `json:"id"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
}

type seleniumSession struct {
type Sessions []struct {
ID string `json:"id"`
Capabilities string `json:"capabilities"`
NodeID string `json:"nodeId"`
Slot Slot `json:"slot"`
}

type Slot struct {
ID string `json:"id"`
Stereotype string `json:"stereotype"`
}

type capability struct {
type Capability struct {
BrowserName string `json:"browserName"`
BrowserVersion string `json:"browserVersion"`
PlatformName string `json:"platformName"`
}

type Stereotypes []struct {
Slots int `json:"slots"`
Stereotype Capability `json:"stereotype"`
}

const (
DefaultBrowserVersion string = "latest"
DefaultPlatformName string = "linux"
)

func NewSeleniumGridScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
Expand Down Expand Up @@ -121,7 +151,7 @@ func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*selen
return meta, nil
}

// No cleanup required for selenium grid scaler
// No cleanup required for Selenium Grid scaler
func (s *seleniumGridScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
Expand Down Expand Up @@ -156,7 +186,7 @@ func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.Metri

func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.Logger) (int64, error) {
body, err := json.Marshal(map[string]string{
"query": "{ grid { maxSession, nodeCount }, sessionsInfo { sessionQueueRequests, sessions { id, capabilities, nodeId } } }",
"query": "{ grid { sessionCount, maxSession, totalSlots }, nodesInfo { nodes { id, status, sessionCount, maxSession, slotCount, stereotypes, sessions { id, capabilities, slot { id, stereotype } } } }, sessionsInfo { sessionQueueRequests } }",
})

if err != nil {
Expand All @@ -168,8 +198,11 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L
return -1, err
}

// Add HTTP Auth
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
if (s.metadata.AuthType == "" || strings.EqualFold(s.metadata.AuthType, "Basic")) && s.metadata.Username != "" && s.metadata.Password != "" {
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
} else if !strings.EqualFold(s.metadata.AuthType, "Basic") && s.metadata.AccessToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("%s %s", s.metadata.AuthType, s.metadata.AccessToken))
}

res, err := s.httpClient.Do(req)
if err != nil {
Expand All @@ -186,62 +219,165 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L
if err != nil {
return -1, err
}
v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, logger)
v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
if err != nil {
return -1, err
}
return v, nil
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) (int64, error) {
func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int {
var matchingSlots int
for _, stereotype := range stereotypes {
if checkCapabilitiesMatch(stereotype.Stereotype, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSlots += stereotype.Slots
}
}
return matchingSlots
}

func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int {
var matchingSessions int
for _, session := range sessions {
var capability = Capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
if checkCapabilitiesMatch(capability, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSessions++
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling session capabilities: %s", err))
}
}
return matchingSessions
}

func checkCapabilitiesMatch(capability Capability, requestCapability Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) bool {
// Ensure the logic should be aligned with DefaultSlotMatcher in Selenium Grid - SeleniumHQ/selenium/java/src/org/openqa/selenium/grid/data/DefaultSlotMatcher.java
// A browserName matches when one of the following conditions is met:
// 1. `browserName` in capability matches with `browserName` or `sessionBrowserName` in scaler metadata
// 2. `browserName` in request capability is empty or not provided
var browserNameMatches = strings.EqualFold(capability.BrowserName, browserName) || strings.EqualFold(capability.BrowserName, sessionBrowserName) ||
requestCapability.BrowserName == ""
// A browserVersion matches when one of the following conditions is met:
// 1. `browserVersion` in request capability is empty or not provided or `stable`
// 2. `browserVersion` in capability matches with prefix of the scaler metadata `browserVersion`
// 3. `browserVersion` in scaler metadata is `latest`
var browserVersionMatches = requestCapability.BrowserVersion == "" || requestCapability.BrowserVersion == "stable" ||
strings.HasPrefix(capability.BrowserVersion, browserVersion) || browserVersion == DefaultBrowserVersion
// A platformName matches when one of the following conditions is met:
// 1. `platformName` in request capability is empty or not provided
// 2. `platformName` in capability is empty or not provided
// 3. `platformName` in capability matches with the scaler metadata `platformName`
// 4. `platformName` in scaler metadata is empty or not provided
var platformNameMatches = requestCapability.PlatformName == "" || capability.PlatformName == "" ||
strings.EqualFold(capability.PlatformName, platformName) || platformName == ""
return browserNameMatches && browserVersionMatches && platformNameMatches
}

func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int) int {
for _, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
return reservedNode.SlotCount
}
}
return availableSlots
}

func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int, maxSession int) []ReservedNodes {
for i, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
// Update remaining available slots for the reserved node
reservedNodes[i].SlotCount = slotCount
return reservedNodes
}
}
// Add new reserved node if not found
return append(reservedNodes, ReservedNodes{ID: nodeID, SlotCount: slotCount, MaxSession: maxSession})
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int, logger logr.Logger) (int64, error) {
// The returned count of the number of new Nodes will be scaled up
var count int64
var seleniumResponse = seleniumResponse{}
// Track number of available slots of existing Nodes in the Grid can be reserved for the matched requests
var availableSlots int
// Track number of matched requests in the sessions queue will be served by this scaler
var queueSlots int

var seleniumResponse = SeleniumResponse{}
if err := json.Unmarshal(b, &seleniumResponse); err != nil {
return 0, err
}

var sessionQueueRequests = seleniumResponse.Data.SessionsInfo.SessionQueueRequests
for _, sessionQueueRequest := range sessionQueueRequests {
var capability = capability{}
if err := json.Unmarshal([]byte(sessionQueueRequest), &capability); err == nil {
if capability.BrowserName == browserName {
var platformNameMatches = capability.PlatformName == "" || strings.EqualFold(capability.PlatformName, platformName)
if strings.HasPrefix(capability.BrowserVersion, browserVersion) && platformNameMatches {
count++
} else if len(strings.TrimSpace(capability.BrowserVersion)) == 0 && browserVersion == DefaultBrowserVersion && platformNameMatches {
count++
}
var nodes = seleniumResponse.Data.NodesInfo.Nodes
// Track list of existing Nodes that have available slots for the matched requests
var reservedNodes []ReservedNodes
// Track list of new Nodes will be scaled up with number of available slots following scaler parameter `nodeMaxSessions`
var newRequestNodes []ReservedNodes
for requestIndex, sessionQueueRequest := range sessionQueueRequests {
var isRequestMatched bool
var requestCapability = Capability{}
if err := json.Unmarshal([]byte(sessionQueueRequest), &requestCapability); err == nil {
if checkCapabilitiesMatch(requestCapability, requestCapability, browserName, browserVersion, sessionBrowserName, platformName) {
queueSlots++
isRequestMatched = true
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling session queue requests: %s", err))
logger.Error(err, fmt.Sprintf("Error when unmarshaling sessionQueueRequest capability: %s", err))
}
}

var sessions = seleniumResponse.Data.SessionsInfo.Sessions
for _, session := range sessions {
var capability = capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
var platformNameMatches = capability.PlatformName == "" || strings.EqualFold(capability.PlatformName, platformName)
if capability.BrowserName == sessionBrowserName {
if strings.HasPrefix(capability.BrowserVersion, browserVersion) && platformNameMatches {
count++
} else if browserVersion == DefaultBrowserVersion && platformNameMatches {
count++
// Skip the request if the capability does not match the scaler parameters
if !isRequestMatched {
continue
}

var isRequestReserved bool
// Check if the matched request can be assigned to available slots of existing Nodes in the Grid
for _, node := range nodes {
// Check if node is UP and has available slots (maxSession > sessionCount)
if strings.EqualFold(node.Status, "UP") && checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) > 0 {
var stereotypes = Stereotypes{}
var availableSlotsMatch int
if err := json.Unmarshal([]byte(node.Stereotypes), &stereotypes); err == nil {
// Count available slots that match the request capability and scaler metadata
availableSlotsMatch += countMatchingSlotsStereotypes(stereotypes, requestCapability, browserName, browserVersion, sessionBrowserName, platformName)
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling node stereotypes: %s", err))
}
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
// Count remaining available slots can be reserved for this request
var availableSlotsCanBeReserved = checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount)
// Reserve one available slot for the request if available slots match is greater than current sessions match
if availableSlotsMatch > currentSessionsMatch {
availableSlots++
reservedNodes = updateOrAddReservedNode(reservedNodes, node.ID, availableSlotsCanBeReserved-1, node.MaxSession)
isRequestReserved = true
break
}
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling sessions info: %s", err))
}
// Check if the matched request can be assigned to available slots of new Nodes will be scaled up, since the scaler parameter `nodeMaxSessions` can be greater than 1
if !isRequestReserved {
for _, newRequestNode := range newRequestNodes {
if newRequestNode.SlotCount > 0 {
newRequestNodes = updateOrAddReservedNode(newRequestNodes, newRequestNode.ID, newRequestNode.SlotCount-1, nodeMaxSessions)
isRequestReserved = true
break
}
}
}
// Check if a new Node should be scaled up to reserve for the matched request
if !isRequestReserved {
newRequestNodes = updateOrAddReservedNode(newRequestNodes, string(rune(requestIndex)), nodeMaxSessions-1, nodeMaxSessions)
}
}

var gridMaxSession = int64(seleniumResponse.Data.Grid.MaxSession)
var gridNodeCount = int64(seleniumResponse.Data.Grid.NodeCount)

if gridMaxSession > 0 && gridNodeCount > 0 {
// Get count, convert count to next highest int64
var floatCount = float64(count) / (float64(gridMaxSession) / float64(gridNodeCount))
count = int64(math.Ceil(floatCount))
if queueSlots > availableSlots {
count = int64(len(newRequestNodes))
} else {
count = 0
}

return count, nil
}
Loading

0 comments on commit 4e46487

Please sign in to comment.