Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Streaming POC #6291

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open

[WIP] Streaming POC #6291

wants to merge 37 commits into from

Conversation

buger
Copy link
Member

@buger buger commented May 18, 2024

User description

Based on #6288

The main changes that Streaming configuration is now part of OAS API definition "x-tyk-streaming", and can look like this:

{
  "info": {
    "title": "Petstore",
    "version": "1.0.0"
  },
  "openapi": "3.0.3",
  "components": {},
  "paths": {},
  "x-tyk-streaming": {
      "streams": {
          "test": {
              "input": {
                   "generate": {
                        "count": 3,
                        "interval": "",
                        "mapping": "root = \"hello world\""
                    }
               },
               "output": {
                    "discard": {}
                }
           }
       }
   },
  "x-tyk-api-gateway": {
    "info": {
      "name": "Petstore",
      "state": {
        "active": true
      }
    },
    "upstream": {
      "url": "https://petstore.swagger.io/v2"
    },
    "server": {
      "listenPath": {
        "value": "/petstore/",
        "strip": true
      }
    }
  }
}

PR Type

Enhancement, Tests, Documentation


Description

  • Introduced StreamingMiddleware for handling streaming functionality with WebSocket and SSE support.
  • Implemented StreamManager for managing stream configurations and Redis integration.
  • Added PortalClient and portalOutput for interacting with the developer portal and posting messages to webhooks.
  • Integrated StreamingMiddleware into the API loading process and added unload hooks to APISpec.
  • Added comprehensive tests for new streaming and portal functionalities.
  • Updated dependencies and configuration to support new features.

Changes walkthrough 📝

Relevant files
Enhancement
8 files
mw_streaming.go
Introduce Streaming Middleware with WebSocket and SSE Support

gateway/mw_streaming.go

  • Added new StreamingMiddleware for handling streaming functionality.
  • Implemented WebSocket and SSE handling within the middleware.
  • Integrated Redis for stream management.
  • +488/-0 
    manager.go
    Implement StreamManager for Stream Configuration and Management

    internal/streaming/manager.go

  • Created StreamManager for managing stream configurations.
  • Added methods for adding, removing, and subscribing to streams.
  • Integrated Redis for stream message handling.
  • +464/-0 
    portal_client.go
    Implement PortalClient for Developer Portal Interaction   

    internal/portal/portal_client.go

  • Created PortalClient for interacting with the developer portal.
  • Added methods for listing webhook credentials.
  • +152/-0 
    portal_output.go
    Add PortalOutput for Posting Messages to Webhooks               

    internal/portal/portal_output.go

  • Added portalOutput for posting messages to webhooks.
  • Integrated with PortalClient to fetch webhook credentials.
  • +128/-0 
    api_loader.go
    Integrate StreamingMiddleware into API Loading Process     

    gateway/api_loader.go

  • Integrated StreamingMiddleware into the API loading process.
  • Added logic to unload specs when they change.
  • +9/-5     
    middleware.go
    Enhance TykMiddleware Interface with Spec Retrieval and Unloading

    gateway/middleware.go

  • Added methods to TykMiddleware interface for spec retrieval and
    unloading.
  • Updated middleware initialization to include unload hooks.
  • +15/-1   
    api_definition.go
    Add Unload Hooks to APISpec                                                           

    gateway/api_definition.go

  • Added unload hooks to APISpec.
  • Updated Release method to call unload hooks.
  • +12/-1   
    mw_context_vars.go
    Modify EnabledForSpec Method in MiddlewareContextVars       

    gateway/mw_context_vars.go

    • Modified EnabledForSpec method to always return true.
    +1/-1     
    Tests
    5 files
    mw_streaming_test.go
    Add Tests for StreamingMiddleware WebSocket and SSE Handling

    gateway/mw_streaming_test.go

  • Added tests for StreamingMiddleware including WebSocket and SSE
    handling.
  • Implemented test cases for stream generation and message handling.
  • +327/-0 
    manager_test.go
    Add Unit Tests for StreamManager Functionality                     

    internal/streaming/manager_test.go

  • Added unit tests for StreamManager.
  • Tested stream addition, removal, and HTTP path retrieval.
  • +162/-0 
    portal_test.go
    Add Tests for PortalClient and Webhook Credential Listing

    internal/portal/portal_test.go

  • Added tests for PortalClient and webhook credential listing.
  • Mocked server responses for testing.
  • +76/-0   
    portal_output_test.go
    Add Unit Tests for PortalOutput Plugin                                     

    internal/portal/portal_output_test.go

  • Added unit tests for portalOutput plugin.
  • Mocked server to test webhook calls.
  • +74/-0   
    testutil.go
    Update Test Utility for API Spec Unloading                             

    gateway/testutil.go

    • Updated test utility to unload API specs during cleanup.
    +6/-0     
    Configuration changes
    1 files
    config.go
    Add StreamingConfig to Global Configuration                           

    config/config.go

  • Added StreamingConfig to global configuration.
  • Enabled streaming feature flag.
  • +7/-0     
    Dependencies
    1 files
    go.mod
    Update Dependencies for Streaming and Portal Integration 

    go.mod

  • Updated dependencies including benthos and go-redis.
  • Added new dependencies for streaming and portal integration.
  • +289/-35

    💡 PR-Agent usage:
    Comment /help on the PR to get a list of all available PR-Agent tools and their descriptions

    @buger buger changed the title Streaming POC [WIP] Streaming POC May 18, 2024
    Copy link
    Contributor

    PR Description updated to latest commit (d2ca309)

    Copy link
    Contributor

    github-actions bot commented May 18, 2024

    API Changes

    --- prev.txt	2024-06-30 18:43:35.438528688 +0000
    +++ current.txt	2024-06-30 18:43:32.546505401 +0000
    @@ -5591,6 +5591,8 @@
     
     	// OAS holds the configuration for various OpenAPI-specific functionalities
     	OAS OASConfig `json:"oas_config"`
    +
    +	Labs labsConfig `json:"labs"`
     }
         Config is the configuration object used by Tyk to set up various parameters.
     
    @@ -7425,6 +7427,9 @@
         The name for event handlers as defined in the API Definition JSON/BSON
         format
     
    +const (
    +	ExtensionTykStreaming = "x-tyk-streaming"
    +)
     const ListDetailed = "detailed"
     const LoopScheme = "tyk"
     const MaxBackoffRetries = 4
    @@ -7692,6 +7697,9 @@
     
     func CloneAPI(a *APISpec) *APISpec
     
    +func (s *APISpec) AddUnloadHook(hook func())
    +    AddUnloadHook adds a function to be called when the API spec is unloaded
    +
     func (a *APISpec) CheckSpecMatchesStatus(r *http.Request, rxPaths []URLSpec, mode URLStatus) (bool, interface{})
         CheckSpecMatchesStatus checks if a url spec has a specific status
     
    @@ -7707,9 +7715,6 @@
     
     func (a *APISpec) Init(authStore, sessionStore, healthStore, orgStore storage.Handler)
     
    -func (s *APISpec) Release()
    -    Release releases all resources associated with API spec
    -
     func (a *APISpec) RequestValid(r *http.Request) (bool, RequestStatus)
         RequestValid will check if an incoming request has valid version data and
         return a RequestStatus that describes the status of the request
    @@ -7723,6 +7728,9 @@
     func (a *APISpec) URLAllowedAndIgnored(r *http.Request, rxPaths []URLSpec, whiteListStatus bool) (RequestStatus, interface{})
         URLAllowedAndIgnored checks if a url is allowed and ignored.
     
    +func (s *APISpec) Unload()
    +    Release releases all resources associated with API spec
    +
     func (s *APISpec) Validate(oasConfig config.OASConfig) error
         Validate returns nil if s is a valid spec and an error stating why the spec
         is not valid.
    @@ -7815,6 +7823,8 @@
         FireEvent is added to the BaseMiddleware object so it is available across
         the entire stack
     
    +func (t *BaseMiddleware) GetSpec() *APISpec
    +
     func (t *BaseMiddleware) Init()
     
     func (t *BaseMiddleware) Logger() (logger *logrus.Entry)
    @@ -7829,6 +7839,8 @@
     
     func (t *BaseMiddleware) SetRequestLogger(r *http.Request)
     
    +func (t *BaseMiddleware) Unload()
    +
     func (t *BaseMiddleware) UpdateRequestSession(r *http.Request) bool
     
     type BaseTykResponseHandler struct {
    @@ -10057,6 +10069,25 @@
     
     type StatsDSinkSanitizationFunc func(*bytes.Buffer, string)
     
    +type StreamingMiddleware struct {
    +	*BaseMiddleware
    +
    +	// Has unexported fields.
    +}
    +    StreamingMiddleware is a middleware that handles streaming functionality
    +
    +func (s *StreamingMiddleware) EnabledForSpec() bool
    +
    +func (s *StreamingMiddleware) Init()
    +    Init initializes the middleware
    +
    +func (s *StreamingMiddleware) Name() string
    +
    +func (s *StreamingMiddleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int)
    +    ProcessRequest will handle the streaming functionality
    +
    +func (s *StreamingMiddleware) Unload()
    +
     type StripAuth struct {
     	*BaseMiddleware
     }
    @@ -10287,6 +10318,10 @@
     	ProcessRequest(w http.ResponseWriter, r *http.Request, conf interface{}) (error, int) // Handles request
     	EnabledForSpec() bool
     	Name() string
    +
    +	GetSpec() *APISpec
    +
    +	Unload()
     }
     
     type TykOsinServer struct {

    Copy link
    Contributor

    PR Review 🔍

    ⏱️ Estimated effort to review [1-5]

    4, because the PR introduces a significant amount of new functionality related to streaming, including integration with Benthos, handling of streaming configurations, and extensive modifications across multiple files. The complexity and breadth of the changes require careful review to ensure correctness, performance, and maintainability.

    🧪 Relevant tests

    Yes

    ⚡ Possible issues

    Possible Bug: In gateway/streaming_test.go, the function ConvertYAMLToJSON uses a recursive approach to convert keys to strings, which might lead to a stack overflow with deeply nested YAML structures.

    Error Handling: In internal/streaming/benthos_server.go, the function Start uses panic for error handling in multiple places. This approach can cause the application to terminate unexpectedly. It would be better to handle these errors gracefully and return them to the caller.

    🔒 Security concerns

    No

    Code feedback:
    relevant fileinternal/streaming/benthos_server.go
    suggestion      

    Replace panic with proper error handling in the Start method to prevent the application from terminating unexpectedly. This change enhances the robustness and reliability of the server initialization process. [important]

    relevant linepanic(err) // Handle error appropriately for your use case

    relevant filegateway/streaming_test.go
    suggestion      

    Consider adding a check for deep recursion or refactoring the method to handle deeply nested structures more efficiently to prevent potential stack overflow issues in ConvertYAMLToJSON. [important]

    relevant linefunc ConvertYAMLToJSON(yamlData []byte) ([]byte, error) {

    relevant fileinternal/streaming/benthos_server.go
    suggestion      

    Use a logging mechanism instead of fmt.Printf for stream creation and update messages to provide better control over log management and to integrate with the system's centralized logging strategy. [medium]

    relevant linefmt.Printf("Stream created: %+v\n", result)

    relevant fileinternal/streaming/benthos_server.go
    suggestion      

    Consider implementing a more secure method for handling passwords, such as using environment variables or secure vault solutions, instead of generating them within the application. This change would enhance the security posture by reducing potential exposure of sensitive data. [important]

    relevant linepassword := generateRandomPassword()

    Copy link
    Contributor

    PR Code Suggestions ✨

    CategorySuggestion                                                                                                                                    Score
    Best practice
    Add a descriptive comment for the new constant to improve code readability

    Consider adding a comment above the new constant ExtensionTykStreaming to describe its
    purpose and usage within the system. This will maintain consistency with other constants
    in the file and improve code readability.

    apidef/oas/oas.go [20]

    +// ExtensionTykStreaming is the OAS schema key for the Tyk streaming extension.
     ExtensionTykStreaming = "x-tyk-streaming"
     
    Suggestion importance[1-10]: 10

    Why: The suggestion correctly identifies the lack of a descriptive comment for the new constant ExtensionTykStreaming and proposes an appropriate comment, improving consistency and readability.

    10
    Replace fmt.Printf with log.Printf for structured logging

    Use structured logging instead of fmt.Printf for better log management and to enable
    structured log querying capabilities.

    internal/streaming/benthos_server.go [134]

    -fmt.Printf("Stream created: %+v\n", result)
    +log.Printf("Stream created: %+v", result)
     
    Suggestion importance[1-10]: 6

    Why: Using structured logging like log.Printf instead of fmt.Printf improves logging consistency and is beneficial for log analysis, but it's a moderate improvement in the context of the entire application.

    6
    Maintainability
    Refactor nested conditional logic into separate methods to improve readability and maintainability

    Refactor the nested conditional blocks inside the loop for checking and adding streams to
    reduce complexity and improve readability. Consider extracting this logic into a separate
    method.

    gateway/api_loader.go [1005-1021]

     if gw.GetConfig().Streaming.Enabled && spec.IsOAS {
    +    gw.processStreamingAPI(spec)
    +}
    +
    +// New method in Gateway struct
    +func (gw *Gateway) processStreamingAPI(spec *APISpec) {
         if ext, ok := spec.OAS.T.Extensions[oas.ExtensionTykStreaming]; ok {
    -        if streamsMap, ok := ext.(map[string]interface{}); ok {
    -            if streams, ok := streamsMap["streams"].(map[string]interface{}); ok {
    -                for streamID, stream := range streams {
    -                    if streamMap, ok := stream.(map[string]interface{}); ok {
    -                        if err := gw.StreamingServer.AddStream(spec.APIID+"_"+streamID, streamMap); err != nil {
    -                            mainLog.Errorf("Error adding stream to streaming server: %v", err)
    -                        } else {
    -                            activeStreams[spec.APIID+"_"+streamID] = struct{}{}
    -                            mainLog.Infof("Added stream %s to streaming server", spec.APIID+"_"+streamID)
    -                        }
    -                    }
    -                }
    +        gw.handleStreamExtensions(ext, spec)
    +    }
    +}
    +
    +func (gw *Gateway) handleStreamExtensions(ext interface{}, spec *APISpec) {
    +    if streamsMap, ok := ext.(map[string]interface{}); ok {
    +        if streams, ok := streamsMap["streams"].(map[string]interface{}); ok {
    +            gw.addStreamsToServer(streams, spec)
    +        }
    +    }
    +}
    +
    +func (gw *Gateway) addStreamsToServer(streams map[string]interface{}, spec *APISpec) {
    +    for streamID, stream := range streams {
    +        if streamMap, ok := stream.(map[string]interface{}); ok {
    +            streamFullName := spec.APIID + "_" + streamID
    +            if err := gw.StreamingServer.AddStream(streamFullName, streamMap); err != nil {
    +                mainLog.Errorf("Error adding stream to streaming server: %v", err)
    +            } else {
    +                activeStreams[streamFullName] = struct{}{}
    +                mainLog.Infof("Added stream %s to streaming server", streamFullName)
                 }
             }
         }
     }
     
    Suggestion importance[1-10]: 9

    Why: The suggestion effectively addresses the complexity of nested conditionals by proposing a refactor into separate methods, significantly enhancing code readability and maintainability.

    9
    Use constants for error messages in tests to reduce duplication and ease maintenance

    Replace the hardcoded error message checks in tests with constants or variables to reduce
    duplication and facilitate easier updates to test conditions.

    gateway/streaming_test.go [69-172]

    -t.Fatalf("Failed to create temporary file: %v", err)
    -t.Fatalf("Failed to convert YAML to JSON: %v", err)
    -t.Fatalf("Failed to parse JSON: %v", err)
    -t.Fatalf("Failed to get streams: %v", err)
    -t.Fatalf("Failed to read file: %v", err)
    -t.Fatalf("Expected 'hello world', got '%s'", line)
    +const (
    +    errCreateTempFile = "Failed to create temporary file: %v"
    +    errConvertYAMLToJSON = "Failed to convert YAML to JSON: %v"
    +    errParseJSON = "Failed to parse JSON: %v"
    +    errGetStreams = "Failed to get streams: %v"
    +    errReadFile = "Failed to read file: %v"
    +    errExpectedOutput = "Expected 'hello world', got '%s'"
    +)
     
    +t.Fatalf(errCreateTempFile, err)
    +t.Fatalf(errConvertYAMLToJSON, err)
    +t.Fatalf(errParseJSON, err)
    +t.Fatalf(errGetStreams, err)
    +t.Fatalf(errReadFile, err)
    +t.Fatalf(errExpectedOutput, line)
    +
    Suggestion importance[1-10]: 7

    Why: Using constants for repeated error messages in tests is a good practice for maintainability and ease of updates, correctly identified by the suggestion.

    7
    Possible issue
    Replace panic with error handling in the server start method

    Replace the panic handling in the Start method with proper error handling to avoid abrupt
    termination of the program. This will improve the robustness of the server operation by
    allowing it to handle errors gracefully.

    internal/streaming/benthos_server.go [55]

    -panic(err) // Handle error appropriately for your use case
    +return fmt.Errorf("failed to listen on TCP port: %w", err)
     
    Suggestion importance[1-10]: 9

    Why: Replacing panic with proper error handling in critical server operations is crucial for robustness and prevents the server from crashing unexpectedly, which is a significant improvement.

    9
    Possible bug
    Add error handling for uninitialized streaming server to prevent nil pointer dereference

    Add error handling for the scenario where gw.StreamingServer is nil to prevent potential
    nil pointer dereference when the streaming feature is enabled but the server fails to
    initialize.

    gateway/server.go [1906-1911]

     if gw.GetConfig().Streaming.Enabled {
         gw.StreamingServer = streaming.New()
    +    if gw.StreamingServer == nil {
    +        mainLog.Error("Failed to initialize streaming server")
    +        return
    +    }
         err := gw.StreamingServer.Start()
         if err != nil {
             mainLog.WithError(err).Error("Error starting streaming server")
         }
     }
     
    Suggestion importance[1-10]: 8

    Why: This suggestion correctly identifies a potential bug where gw.StreamingServer could be nil, leading to a nil pointer dereference. The proposed error handling is crucial for robustness.

    8
    Enhancement
    Implement exponential backoff for retries in the WaitForReady method

    Consider implementing a retry mechanism with exponential backoff for the WaitForReady
    method to handle transient errors more effectively.

    internal/streaming/benthos_server.go [104]

    -time.Sleep(500 * time.Millisecond)
    +time.Sleep(time.Duration(math.Pow(2, float64(t))) * time.Second)
     
    Suggestion importance[1-10]: 7

    Why: Implementing exponential backoff is a good practice for handling retries, especially in network operations where transient errors can occur. This would enhance the reliability of the WaitForReady method.

    7
    Provide meaningful defaults for configuration parameters

    Avoid using empty strings for configuration defaults in the server configuration template.
    Provide meaningful defaults or document the need for these fields to be populated.

    internal/streaming/benthos_server.go [74]

    -salt: ""
    +salt: "default_salt_value"  # Replace with a meaningful default or ensure it's set by the user
     
    Suggestion importance[1-10]: 5

    Why: Providing meaningful defaults or documentation for configuration parameters can prevent potential misconfigurations, but this is a relatively minor enhancement compared to other possible improvements.

    5

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    1 similar comment
    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    …isconnected would cause a send on closed channel
    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    1 similar comment
    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    💥 CI tests failed 🙈

    git-state

    all ok

    Please look at the run or in the Checks tab.

    Copy link
    Contributor

    PR Code Suggestions ✨

    CategorySuggestion                                                                                                                                    Score
    Possible bug
    Add error handling for consumer group creation failure

    Implement error handling after attempting to create a consumer group in Redis. Currently,
    the error is logged but not returned, which might lead to silent failures where the
    consumer group is assumed to be created successfully when it's not.

    internal/streaming/manager.go [173-176]

     err := sm.redis.XGroupCreateMkStream(context.Background(), streamID, consumerGroup, "$").Err()
     if err != nil && err != redis.Nil {
       sm.log.Printf("Error while creating consumer group %s for stream %s: %v", consumerGroup, streamID, err)
    +  return nil, nil, err
     }
     
    Suggestion importance[1-10]: 9

    Why: Adding error handling for consumer group creation is crucial as it prevents silent failures and ensures that the system behaves as expected when an error occurs.

    9
    Add nil check before appending to slice to prevent potential nil pointer dereference

    Consider checking if s.unloadHooks is not nil before appending to avoid potential nil
    pointer dereference.

    gateway/api_definition.go [239]

    -s.unloadHooks = append(s.unloadHooks, hook)
    +if s.unloadHooks != nil {
    +    s.unloadHooks = append(s.unloadHooks, hook)
    +}
     
    Suggestion importance[1-10]: 8

    Why: This suggestion addresses a potential nil pointer dereference, which is a significant issue that could cause runtime panics. Adding a nil check is a good practice to ensure robustness.

    8
    Add error handling for missing stream ID in connections map

    Consider handling the case where streamID is not found in s.connections to avoid potential
    panics when accessing a non-existent key.

    gateway/mw_streaming.go [334-338]

     if conns, ok := s.connections[streamID]; ok {
         delete(conns, connID)
         if len(conns) == 0 {
             delete(s.connections, streamID)
         }
    +} else {
    +    s.Logger().Errorf("Stream ID %s not found in connections", streamID)
     }
     
    Suggestion importance[1-10]: 7

    Why: Adding error handling for missing stream IDs improves the robustness of the code by preventing potential panics and providing better logging. This is a useful improvement but not as critical as fixing concurrency issues.

    7
    Concurrency
    Protect slice access with mutex to ensure thread safety

    To ensure that the Unload method is thread-safe, consider protecting the access to
    s.unloadHooks with a mutex lock.

    gateway/api_definition.go [273-275]

    +s.unloadHooksMutex.Lock()
     for _, hook := range s.unloadHooks {
         hook()
     }
    +s.unloadHooksMutex.Unlock()
     
    Suggestion importance[1-10]: 9

    Why: Ensuring thread safety is crucial in concurrent programming. This suggestion addresses a potential concurrency issue by protecting access to the slice with a mutex, which is a significant improvement.

    9
    Possible issue
    Add a default value or validation for the 'interval' field to prevent errors

    Consider handling the case where the interval field in the YAML configuration is empty.
    This might lead to unexpected behavior or errors during the generation of events. You can
    add a default value or validate the configuration to ensure it is set.

    gateway/mw_streaming_test.go [94]

    -interval: ""
    +interval: "1s"  # Default to 1 second if not specified
     
    Suggestion importance[1-10]: 9

    Why: This suggestion addresses a potential issue where an empty interval could lead to unexpected behavior. Adding a default value or validation is crucial for ensuring the robustness of the code.

    9
    Update the github.com/golang/protobuf to a more stable version

    Consider using the latest stable version of github.com/golang/protobuf which is v1.5.2
    instead of v1.5.4 to ensure compatibility and stability, as v1.5.4 might contain
    experimental changes.

    go.mod [38]

    -github.com/golang/protobuf v1.5.4
    +github.com/golang/protobuf v1.5.2
     
    Suggestion importance[1-10]: 3

    Why: While the suggestion to use a more stable version is valid, the PR specifically updates to v1.5.4, which might be intentional to leverage new features or fixes. Without more context, it's hard to justify downgrading.

    3
    Enhancement
    Improve error messaging for missing configuration keys

    Use a more specific error message when the http_server key is not found in the
    configuration. This will help in debugging by providing clearer information about what
    exactly is missing in the configuration.

    internal/streaming/manager.go [445-447]

     if !ok {
    -  return paths, nil
    +  return nil, fmt.Errorf("http_server configuration not found in component: %s", component)
     }
     
    Suggestion importance[1-10]: 8

    Why: Providing a more specific error message when the http_server key is not found improves debugging and helps developers quickly identify configuration issues.

    8
    Make the ticker duration configurable for flexibility

    Replace the hardcoded tick duration with a configurable value to allow flexibility in
    changing the interval without modifying the code.

    gateway/mw_streaming.go [201]

    -ticker := time.NewTicker(30 * time.Second)
    +tickerDuration := s.Spec.Config["ticker_duration"].(time.Duration)
    +ticker := time.NewTicker(tickerDuration)
     
    Suggestion importance[1-10]: 6

    Why: Making the ticker duration configurable enhances flexibility and maintainability. However, it is a minor enhancement compared to critical bug fixes or security improvements.

    6
    Make the 'count' field configurable to enhance flexibility

    Instead of using a hardcoded count of 3 in the YAML configuration, consider making this
    configurable through an environment variable or a parameter. This would make the test more
    flexible and maintainable.

    gateway/mw_streaming_test.go [93]

    -count: 3
    +count: ${STREAM_COUNT:3}  # Default to 3 if not specified
     
    Suggestion importance[1-10]: 6

    Why: Making the 'count' field configurable improves the flexibility and maintainability of the test, but it is not as critical as other suggestions.

    6
    Best practice
    Enhance error handling by checking the response status before closing the body

    To improve error handling and avoid potential resource leaks, consider checking the
    response status code before closing the response body. This ensures that any network or
    HTTP errors are handled properly before proceeding.

    gateway/mw_streaming_test.go [294-298]

     resp, err := httpClient.Do(req)
     if err != nil {
       t.Fatalf("Failed to send message to /post: %v", err)
     }
    -resp.Body.Close()
    +defer resp.Body.Close()
    +if resp.StatusCode != http.StatusOK {
    +  t.Fatalf("HTTP request failed with status code: %d", resp.StatusCode)
    +}
     
    Suggestion importance[1-10]: 8

    Why: This suggestion improves error handling and resource management, which are important for maintaining the reliability and stability of the code.

    8
    Use a context with timeout for Redis operations to handle potential delays or hangs

    Avoid using the default context in context.Background() for operations that could benefit
    from timeouts or cancellations. Consider passing a context with timeout or using the
    context from higher-level functions.

    internal/streaming/manager.go [173]

    -err := sm.redis.XGroupCreateMkStream(context.Background(), streamID, consumerGroup, "$").Err()
    +ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    +defer cancel()
    +err := sm.redis.XGroupCreateMkStream(ctx, streamID, consumerGroup, "$").Err()
     
    Suggestion importance[1-10]: 8

    Why: Using a context with timeout for Redis operations is a best practice that can prevent potential delays or hangs, improving the reliability and responsiveness of the system.

    8
    Handle errors from 'SetReadDeadline' to ensure proper websocket timeout setup

    It's recommended to handle potential errors from SetReadDeadline method to ensure that the
    websocket connection is properly set up for timeouts. Ignoring these errors might lead to
    unhandled scenarios where the read deadline is not set as expected.

    gateway/mw_streaming_test.go [276-284]

    -wsConn1.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
    -wsConn2.SetReadDeadline(time.Now().Add(500 * time.Millisecond))
    +if err := wsConn1.SetReadDeadline(time.Now().Add(500 * time.Millisecond)); err != nil {
    +  t.Fatalf("Failed to set read deadline for websocket connection: %v", err)
    +}
    +if err := wsConn2.SetReadDeadline(time.Now().Add(500 * time.Millisecond)); err != nil {
    +  t.Fatalf("Failed to set read deadline for websocket connection: %v", err)
    +}
     
    Suggestion importance[1-10]: 7

    Why: Handling errors from 'SetReadDeadline' is a good practice to ensure that the websocket connection is properly configured, although it is a minor improvement.

    7
    Change the logger output to a more predictable source

    Replace the direct use of log.Writer() with os.Stdout or a specific log file to avoid
    potential issues with the default logger affecting the output of logrus. This change
    ensures that the logger configurations are more predictable and controlled.

    internal/streaming/manager.go [31]

    -logger.Out = log.Writer()
    +logger.Out = os.Stdout
     
    Suggestion importance[1-10]: 7

    Why: Using os.Stdout instead of log.Writer() can make the logger's output more predictable and controlled, which is a good practice. However, this change is not critical and does not address a major issue.

    7

    Copy link

    sonarcloud bot commented Jun 30, 2024

    Quality Gate Failed Quality Gate failed

    Failed conditions
    38.0% Coverage on New Code (required ≥ 80%)

    See analysis details on SonarCloud

    …ddleware
    
    This commit introduces a major change in the streaming architecture:
    
    - Replace shared Redis streams with direct user-to-stream connections
    - Each user now has their own dedicated stream, improving isolation and performance
    
    Additional improvements include:
    
    - Refactor StreamingMiddleware to use consumer group managers
    - Update StreamManager to use sync.Map for thread-safe operations
    - Remove Redis dependency and related functionality
    - Implement garbage collection for unused consumer groups
    - Update tests to reflect new structure and functionality
    - Add support for Kafka in streaming tests
    - Improve error handling and logging throughout
    - Update go.mod and go.sum with new dependencies
    
    These changes significantly enhance the scalability and efficiency of the streaming system by establishing direct connections between users and their respective streams, eliminating the need for a shared Redis-based message queue.
    This commit refactors the `internal/streaming/manager.go` file to update the `Stream` struct and its methods. The main changes include:
    
    1. Remove unused imports: `crypto/sha256` and `sync`.
    2. Update `addMetadata`, `GetHTTPPaths`, `removeUnsafe`, and `removeConsumerGroup` methods to work with the `Stream` struct instead of `StreamManager`.
    3. Remove references to `StreamManager` and update method receivers to use `Stream`.
    4. Update `GetHTTPPaths` to use the `streamConfig` field of the `Stream` struct instead of loading from a map.
    
    These changes align the `Stream` struct with the new design, allowing it to manage a single stream configuration and its associated operations.
    This commit enhances the Stream.Stop() method to make it more robust and prevent nil pointer dereferences. The changes include:
    
    1. Adding a nil check for the stream before attempting to stop it.
    2. Implementing a timeout mechanism using context to prevent indefinite hanging.
    3. Using a goroutine and channel for asynchronous stream stopping.
    4. Improving logging to provide more detailed information about the stopping process.
    
    These modifications aim to resolve issues related to stopping non-existent streams and make the overall process more reliable.
    This commit improves the `testAsyncAPIHttp` function in the `gateway/mw_streaming_test.go` file to increase test reliability and provide more detailed logging. The changes include:
    
    1. Increase initial WebSocket connection delay from 1 to 2 seconds
    2. Extend overall timeout from 10 to 30 seconds
    3. Increase inactivity timeout from 2 to 5 seconds
    4. Add final log message showing total received messages
    
    These modifications aim to give the test more time to receive messages and offer more comprehensive logging information, facilitating better diagnosis of potential issues in message reception.
    This commit improves the `testAsyncAPIHttp` function in the `mw_streaming_test.go` file to provide better debugging information and increase the chances of successful message reception. The changes include:
    
    1. Increase initial WebSocket connection stabilization delay to 5 seconds
    2. Add logging for WebSocket connection stabilization
    3. Extend overall timeout to 60 seconds
    4. Increase inactivity timeout to 10 seconds
    5. Add a 5-second delay before closing WebSocket connections
    6. Improve logging throughout the test execution
    
    These modifications aim to address potential timing issues and provide more detailed information about the test's progress, facilitating easier debugging of any remaining issues.
    kofoworola added a commit that referenced this pull request Sep 12, 2024
    ### **User description**
    <!-- Provide a general summary of your changes in the Title above -->
    
    ## Description
    Based off this POC
    [here](#6291)
    [TT-12893](https://tyktech.atlassian.net/browse/TT-12893)
    
    <!-- Describe your changes in detail -->
    
    ## Related Issue
    
    <!-- This project only accepts pull requests related to open issues. -->
    <!-- If suggesting a new feature or change, please discuss it in an
    issue first. -->
    <!-- If fixing a bug, there should be an issue describing it with steps
    to reproduce. -->
    <!-- OSS: Please link to the issue here. Tyk: please create/link the
    JIRA ticket. -->
    
    ## Motivation and Context
    
    <!-- Why is this change required? What problem does it solve? -->
    
    ## How This Has Been Tested
    
    <!-- Please describe in detail how you tested your changes -->
    <!-- Include details of your testing environment, and the tests -->
    <!-- you ran to see how your change affects other areas of the code,
    etc. -->
    <!-- This information is helpful for reviewers and QA. -->
    
    ## Screenshots (if appropriate)
    
    ## Types of changes
    
    <!-- What types of changes does your code introduce? Put an `x` in all
    the boxes that apply: -->
    
    - [ ] Bug fix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing
    functionality to change)
    - [ ] Refactoring or add test (improvements in base code or adds test
    coverage to functionality)
    
    ## Checklist
    
    <!-- Go over all the following points, and put an `x` in all the boxes
    that apply -->
    <!-- If there are no documentation updates required, mark the item as
    checked. -->
    <!-- Raise up any additional concerns not covered by the checklist. -->
    
    - [ ] I ensured that the documentation is up to date
    - [ ] I explained why this PR updates go.mod in detail with reasoning
    why it's required
    - [ ] I would like a code coverage CI quality gate exception and have
    explained why
    
    
    [TT-12893]:
    https://tyktech.atlassian.net/browse/TT-12893?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
    
    
    ___
    
    ### **PR Type**
    Enhancement, Tests
    
    
    ___
    
    ### **Description**
    - Implemented `StreamingMiddleware` to handle streaming functionality,
    including stream management and integration with API specifications.
    - Added comprehensive tests for streaming API, covering single and
    multiple client scenarios, asynchronous API, and WebSocket connections.
    - Developed `Stream` struct for managing streaming configurations, with
    methods for starting, stopping, and resetting streams.
    - Introduced `PortalClient` for interacting with the developer portal,
    including methods to list webhook credentials and fetch app details.
    - Implemented `portalOutput` for sending messages to webhooks, with
    configuration and connection handling.
    - Enhanced middleware interface to support unloading functionality, with
    hooks for unloading middleware when API specs change.
    - Updated dependencies to support new streaming and portal
    functionalities, including Kafka, NATS, and Benthos integration.
    
    
    ___
    
    
    
    ### **Changes walkthrough** 📝
    <table><thead><tr><th></th><th align="left">Relevant
    files</th></tr></thead><tbody><tr><td><strong>Tests</strong></td><td><details><summary>3
    files</summary><table>
    <tr>
      <td>
        <details>
    <summary><strong>mw_streaming_test.go</strong><dd><code>Add
    comprehensive tests for streaming API functionality</code>&nbsp; &nbsp;
    </dd></summary>
    <hr>
    
    gateway/mw_streaming_test.go
    
    <li>Added tests for streaming API with single and multiple clients.<br>
    <li> Implemented setup functions for streaming API tests.<br> <li>
    Included test cases for asynchronous API and WebSocket connections.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-a0d1bd0196a741537a3c850e340225c8993e49d709c838af0f1b48b9893af1da">+670/-0</a>&nbsp;
    </td>
    
    </tr>                    
    
    <tr>
      <td>
        <details>
    <summary><strong>manager_test.go</strong><dd><code>Implement tests for
    Stream management and safety checks</code>&nbsp; &nbsp; </dd></summary>
    <hr>
    
    internal/streaming/manager_test.go
    
    <li>Added tests for starting and stopping streams.<br> <li> Tested
    removal and whitelisting of unsafe components.<br> <li> Verified stream
    configuration handling.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-c3ed576ae7d6430b9ac57b46c39d64981c274188c807ff4f0cfc4f15753e67af">+151/-0</a>&nbsp;
    </td>
    
    </tr>                    
    
    <tr>
      <td>
        <details>
    <summary><strong>portal_test.go</strong><dd><code>Add tests for
    PortalClient webhook credential listing</code>&nbsp; &nbsp; &nbsp;
    &nbsp; </dd></summary>
    <hr>
    
    internal/portal/portal_test.go
    
    <li>Added mock server for testing portal client interactions.<br> <li>
    Implemented tests for listing webhook credentials.<br> <li> Verified
    correct handling of multiple apps and webhooks.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-2d5d9f02d0c6c149d531f5471b69936ccbf414a02d977813803fc3eafe15052d">+76/-0</a>&nbsp;
    &nbsp; </td>
    
    </tr>                    
    
    </table></details></td></tr><tr><td><strong>Enhancement</strong></td><td><details><summary>6
    files</summary><table>
    <tr>
      <td>
        <details>
    <summary><strong>mw_streaming.go</strong><dd><code>Implement
    StreamingMiddleware for API streaming
    functionality</code></dd></summary>
    <hr>
    
    gateway/mw_streaming.go
    
    <li>Implemented <code>StreamingMiddleware</code> for handling streaming
    functionality.<br> <li> Added methods for initializing, creating, and
    removing streams.<br> <li> Integrated stream management with API
    specifications.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-6f565750150d990575c808f1ca8f38483160dc6edf05f1534cd0bedb27c2e6c8">+298/-0</a>&nbsp;
    </td>
    
    </tr>                    
    
    <tr>
      <td>
        <details>
    <summary><strong>manager.go</strong><dd><code>Develop Stream struct for
    managing streaming configurations</code></dd></summary>
    <hr>
    
    internal/streaming/manager.go
    
    <li>Created <code>Stream</code> struct for managing streaming
    configurations.<br> <li> Added methods for starting, stopping, and
    resetting streams.<br> <li> Implemented safety checks for removing
    unsafe components.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-8cda02479026085d17a6c5559a15bf59638174d16fb5f3ad65bebcecb25ad8d8">+236/-0</a>&nbsp;
    </td>
    
    </tr>                    
    
    <tr>
      <td>
        <details>
    <summary><strong>portal_client.go</strong><dd><code>Add PortalClient for
    developer portal interactions</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
    &nbsp; &nbsp; </dd></summary>
    <hr>
    
    internal/portal/portal_client.go
    
    <li>Introduced <code>PortalClient</code> for interacting with the
    developer portal.<br> <li> Added methods to list webhook credentials and
    fetch app details.<br> <li> Defined structures for app and webhook
    details.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-0752a118f5d1513887f5ebf9930ae9c382a16798c3a4a301dac34523b5010289">+152/-0</a>&nbsp;
    </td>
    
    </tr>                    
    
    <tr>
      <td>
        <details>
    <summary><strong>portal_output.go</strong><dd><code>Implement
    portalOutput for webhook message delivery</code>&nbsp; &nbsp; &nbsp;
    &nbsp; &nbsp; &nbsp; </dd></summary>
    <hr>
    
    internal/portal/portal_output.go
    
    <li>Implemented <code>portalOutput</code> for sending messages to
    webhooks.<br> <li> Added configuration and connection handling for
    portal output.<br> <li> Registered output plugin with Benthos
    service.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-5225b7c3a3d3677eed2edb98be754e5cc70a124b0af26485511a0528f535048b">+128/-0</a>&nbsp;
    </td>
    
    </tr>                    
    
    <tr>
      <td>
        <details>
    <summary><strong>api_loader.go</strong><dd><code>Integrate
    StreamingMiddleware into API loading process</code>&nbsp; &nbsp; &nbsp;
    </dd></summary>
    <hr>
    
    gateway/api_loader.go
    
    <li>Integrated <code>StreamingMiddleware</code> into the API loading
    process.<br> <li> Adjusted logic for unloading API specifications.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-cdf0b7f176c9d18e1a314b78ddefc2cb3a94b3de66f1f360174692c915734c68">+9/-5</a>&nbsp;
    &nbsp; &nbsp; </td>
    
    </tr>                    
    
    <tr>
      <td>
        <details>
    <summary><strong>middleware.go</strong><dd><code>Enhance middleware
    interface with unload functionality</code>&nbsp; &nbsp; &nbsp;
    </dd></summary>
    <hr>
    
    gateway/middleware.go
    
    <li>Enhanced middleware interface with unload functionality.<br> <li>
    Added hooks for unloading middleware when API specs change.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-703054910891a4db633eca0f42ed779d6b4fa75cd9b3aa4c503e681364201c1b">+15/-1</a>&nbsp;
    &nbsp; </td>
    
    </tr>                    
    </table></details></td></tr><tr><td><strong>Configuration
    changes</strong></td><td><details><summary>1 files</summary><table>
    <tr>
      <td>
        <details>
    <summary><strong>config.go</strong><dd><code>Add StreamingConfig to
    global configuration settings</code>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
    </dd></summary>
    <hr>
    
    config/config.go
    
    <li>Added <code>StreamingConfig</code> struct to configuration.<br> <li>
    Enabled streaming configuration in global settings.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-fe44f09c4d5977b5f5eaea29170b6a0748819c9d02271746a20d81a5f3efca17">+26/-0</a>&nbsp;
    &nbsp; </td>
    
    </tr>                    
    
    </table></details></td></tr><tr><td><strong>Dependencies</strong></td><td><details><summary>1
    files</summary><table>
    <tr>
      <td>
        <details>
    <summary><strong>go.mod</strong><dd><code>Update dependencies for
    streaming and portal integration</code>&nbsp; </dd></summary>
    <hr>
    
    go.mod
    
    <li>Updated dependencies for streaming and portal functionalities.<br>
    <li> Added new modules for Kafka, NATS, and Benthos integration.<br>
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-33ef32bf6c23acb95f5902d7097b7a1d5128ca061167ec0716715b0b9eeaa5f6">+307/-23</a></td>
    
    </tr>                    
    </table></details></td></tr><tr><td><strong>Additional files
    (token-limit)</strong></td><td><details><summary>1
    files</summary><table>
    <tr>
      <td>
        <details>
    <summary><strong>go.sum</strong><dd><code>...</code>&nbsp; &nbsp; &nbsp;
    &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
    &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
    &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
    &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
    &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
    &nbsp; </dd></summary>
    <hr>
    
    go.sum
    
    ...
    
    
    
    </details>
    
    
      </td>
    <td><a
    href="https://github.com/TykTechnologies/tyk/pull/6496/files#diff-3295df7234525439d778f1b282d146a4f1ff6b415248aaac074e8042d9f42d63">+1134/-65</a></td>
    
    </tr>                    
    </table></details></td></tr></tr></tbody></table>
    
    ___
    
    > 💡 **PR-Agent usage**:
    >Comment `/help` on the PR to get a list of all available PR-Agent tools
    and their descriptions
    
    ---------
    
    Co-authored-by: Leonid Bugaev <[email protected]>
    Co-authored-by: Martin Buhr <[email protected]>
    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    2 participants