From 9a7d45b81d2089a3d836f4c55966218f463ab74b Mon Sep 17 00:00:00 2001 From: "Jorge S. Cuesta" Date: Tue, 20 Aug 2024 13:29:46 -0400 Subject: [PATCH] Add support for setting headers in async relay requests Extended `SendRelaysAsync` function to accept headers as a parameter. Added default headers and merged them with chain-specific headers across various handlers. Updated Dockerfile and included a utility function for merging maps. @NOTE: I was unable to get the test working following the README.md --- Dockerfile | 2 +- .../internal/controllers/relay.go | 13 ++++++-- internal/db_query/queries.sql.go | 1 + .../checks/async_relay_handler.go | 11 +++++-- .../checks/chain_config_handler.go | 30 ++++++++++++++++++- .../checks/data_integrity_handler.go | 12 ++++++-- .../checks/height_check_handler.go | 11 +++++-- internal/relayer/relayer.go | 5 ++++ pkg/common/maps.go | 18 +++++++++++ pkg/pokt/pokt_v0/basic_client.go | 1 + 10 files changed, 90 insertions(+), 14 deletions(-) create mode 100644 pkg/common/maps.go diff --git a/Dockerfile b/Dockerfile index 4b2bbb5..9eb7f7d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,7 +25,7 @@ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /app/main . # Set default value for port exposed -ENV HTTP_SERVER_PORT 8080 +ENV HTTP_SERVER_PORT=8080 EXPOSE $HTTP_SERVER_PORT diff --git a/cmd/gateway_server/internal/controllers/relay.go b/cmd/gateway_server/internal/controllers/relay.go index 27faf71..3385c92 100644 --- a/cmd/gateway_server/internal/controllers/relay.go +++ b/cmd/gateway_server/internal/controllers/relay.go @@ -35,11 +35,18 @@ func (c *RelayController) HandleRelay(ctx *fasthttp.RequestCtx) { return } + contentType := string(ctx.Request.Header.Peek("content-type")) + if contentType == "" { + contentType = "application/json" + } + relay, err := c.relayer.SendRelay(&models.SendRelayRequest{ Payload: &models.Payload{ - Data: string(ctx.PostBody()), - Method: string(ctx.Method()), - Path: path, + // TODO: the best here will been able to get the chain configuration to use the configure headers. + Headers: map[string]string{"content-type": contentType}, + Data: string(ctx.PostBody()), + Method: string(ctx.Method()), + Path: path, }, Chain: chainID, }) diff --git a/internal/db_query/queries.sql.go b/internal/db_query/queries.sql.go index 19f9c9a..8f7582b 100644 --- a/internal/db_query/queries.sql.go +++ b/internal/db_query/queries.sql.go @@ -139,6 +139,7 @@ type GetChainConfigurationsRow struct { TopBucketP90latencyDuration pgtype.Varchar `json:"top_bucket_p90latency_duration"` HeightCheckBlockTolerance *int32 `json:"height_check_block_tolerance"` DataIntegrityCheckLookbackHeight *int32 `json:"data_integrity_check_lookback_height"` + FixedHeaders *pgtype.JSON `json:"fixed_headers"` } // GetChainConfigurations implements Querier.GetChainConfigurations. diff --git a/internal/node_selector_service/checks/async_relay_handler.go b/internal/node_selector_service/checks/async_relay_handler.go index 824da6c..04f5407 100644 --- a/internal/node_selector_service/checks/async_relay_handler.go +++ b/internal/node_selector_service/checks/async_relay_handler.go @@ -13,7 +13,7 @@ type nodeRelayResponse struct { Error error } -func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string, path string) chan *nodeRelayResponse { +func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, payload string, method string, path string, headers map[string]string) chan *nodeRelayResponse { // Define a channel to receive relay responses relayResponses := make(chan *nodeRelayResponse, len(nodes)) var wg sync.WaitGroup @@ -22,8 +22,13 @@ func SendRelaysAsync(relayer pokt_v0.PocketRelayer, nodes []*models.QosNode, pay sendRelayAsync := func(node *models.QosNode) { defer wg.Done() relay, err := relayer.SendRelay(&relayer_models.SendRelayRequest{ - Signer: node.GetAppStakeSigner(), - Payload: &relayer_models.Payload{Data: payload, Method: method, Path: path}, + Signer: node.GetAppStakeSigner(), + Payload: &relayer_models.Payload{ + Data: payload, + Method: method, + Path: path, + Headers: headers, + }, Chain: node.GetChain(), SelectedNodePubKey: node.GetPublicKey(), Session: node.MorseSession, diff --git a/internal/node_selector_service/checks/chain_config_handler.go b/internal/node_selector_service/checks/chain_config_handler.go index a5b278b..3afa006 100644 --- a/internal/node_selector_service/checks/chain_config_handler.go +++ b/internal/node_selector_service/checks/chain_config_handler.go @@ -1,6 +1,9 @@ package checks -import "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" +import ( + "github.com/pokt-network/gateway-server/internal/chain_configurations_registry" + "github.com/pokt-network/gateway-server/pkg/common" +) // GetBlockHeightTolerance - helper function to retrieve block height tolerance across checks func GetBlockHeightTolerance(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue int) int { @@ -19,3 +22,28 @@ func GetDataIntegrityHeightLookback(chainConfiguration chain_configurations_regi } return int(*chainConfig.DataIntegrityCheckLookbackHeight) } + +// GetFixedHeaders returns the fixed headers for a specific chain configuration. +// It takes a ChainConfigurationsService to retrieve the chain configuration, +// the chainId string to identify the specific chain, +// and a defaultValue map[string]string to return in case the chain configuration is not found. +// The function first retrieves the chain configuration using the chainConfiguration.GetChainConfiguration method. +// If the chain configuration is not found, it returns the defaultValue. +// If the chain configuration is found, it retrieves the fixed headers as a map[string]string from the chain configuration. +// If the fixed headers cannot be cast into a map[string]string, it returns the defaultValue. +// Otherwise, it returns the retrieved fixed headers. +func GetFixedHeaders(chainConfiguration chain_configurations_registry.ChainConfigurationsService, chainId string, defaultValue map[string]string) map[string]string { + chainConfig, ok := chainConfiguration.GetChainConfiguration(chainId) + value := defaultValue + + if ok && chainConfig.FixedHeaders != nil { + if headers, castOk := chainConfig.FixedHeaders.Get().(map[string]string); castOk { + // apply the specific headers override coming from chain configuration over the defaults one. + // in this way, the chain configuration on db only needs to hold the overrides or additions that are may + // not add to base code. + value = common.MergeStringMaps(value, headers) + } + } + + return value +} diff --git a/internal/node_selector_service/checks/data_integrity_handler.go b/internal/node_selector_service/checks/data_integrity_handler.go index e5ce599..3db0b46 100644 --- a/internal/node_selector_service/checks/data_integrity_handler.go +++ b/internal/node_selector_service/checks/data_integrity_handler.go @@ -20,6 +20,10 @@ const ( dataIntegrityHeightLookbackDefault = 25 ) +var ( + dataIntegrityHeadersDefault = map[string]string{"content-type": "application/json"} +) + type nodeHashRspPair struct { node *models.QosNode blockIdentifier string @@ -47,10 +51,12 @@ func PerformDataIntegrityCheck(check *Check, calculatePayload GetBlockByNumberPa var nodeResponsePairs []*nodeHashRspPair - // find a random block to search that nodes should have access too - blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(GetDataIntegrityHeightLookback(check.ChainConfiguration, sourceOfTruth.GetChain(), dataIntegrityHeightLookbackDefault)) + chainId := sourceOfTruth.GetChain() + checkHeaders := GetFixedHeaders(check.ChainConfiguration, chainId, dataIntegrityHeadersDefault) - attestationResponses := SendRelaysAsync(check.PocketRelayer, getEligibleDataIntegrityCheckNodes(check.NodeList), calculatePayload(blockNumberToSearch), "POST", path) + // find a random block to search that nodes should have access too + blockNumberToSearch := sourceOfTruth.GetLastKnownHeight() - uint64(GetDataIntegrityHeightLookback(check.ChainConfiguration, chainId, dataIntegrityHeightLookbackDefault)) + attestationResponses := SendRelaysAsync(check.PocketRelayer, getEligibleDataIntegrityCheckNodes(check.NodeList), calculatePayload(blockNumberToSearch), "POST", path, checkHeaders) for rsp := range attestationResponses { if rsp.Error != nil { diff --git a/internal/node_selector_service/checks/height_check_handler.go b/internal/node_selector_service/checks/height_check_handler.go index b61fc69..fde0840 100644 --- a/internal/node_selector_service/checks/height_check_handler.go +++ b/internal/node_selector_service/checks/height_check_handler.go @@ -17,6 +17,10 @@ const ( defaultCheckPenalty = time.Minute * 5 ) +var ( + defaultHeaders = map[string]string{"content-type": "application/json"} +) + type HeightJsonParser func(response string) (uint64, error) // PerformDefaultHeightCheck is the default implementation of a height check by: @@ -26,12 +30,13 @@ type HeightJsonParser func(response string) (uint64, error) // 3. Filtering out nodes that are returning a height out of the zScore threshold // 4. Punishing the nodes with defaultCheckPenalty that exceed the height tolerance. func PerformDefaultHeightCheck(check *Check, payload string, path string, parseHeight HeightJsonParser, logger *zap.Logger) { - - logger.Sugar().Infow("running default height check", "chain", check.NodeList[0].GetChain()) + chainId := check.NodeList[0].GetChain() + logger.Sugar().Infow("running default height check", "chain", chainId) + checkHeaders := GetFixedHeaders(check.ChainConfiguration, chainId, defaultHeaders) var nodesResponded []*models.QosNode // Send request to all nodes - relayResponses := SendRelaysAsync(check.PocketRelayer, getEligibleHeightCheckNodes(check.NodeList), payload, "POST", path) + relayResponses := SendRelaysAsync(check.PocketRelayer, getEligibleHeightCheckNodes(check.NodeList), payload, "POST", path, checkHeaders) // Process relay responses for resp := range relayResponses { diff --git a/internal/relayer/relayer.go b/internal/relayer/relayer.go index 1cc0aa2..0157522 100644 --- a/internal/relayer/relayer.go +++ b/internal/relayer/relayer.go @@ -216,6 +216,11 @@ func (r *Relayer) altruistRelay(req *models.SendRelayRequest) (*models.SendRelay fasthttp.ReleaseResponse(response) }() + checkHeaders := checks.GetFixedHeaders(r.chainConfigurationRegistry, req.Chain, map[string]string{"content-type": "application/json"}) + for key, value := range checkHeaders { + request.Header.Set(key, value) + } + requestTimeout := r.getAltruistRequestTimeout(req.Chain) request.Header.SetUserAgent(r.userAgent) request.SetRequestURI(chainConfig.AltruistUrl.String) diff --git a/pkg/common/maps.go b/pkg/common/maps.go new file mode 100644 index 0000000..e8fbdce --- /dev/null +++ b/pkg/common/maps.go @@ -0,0 +1,18 @@ +package common + +func MergeStringMaps(map1, map2 map[string]string) map[string]string { + // Create a new map to store the merged result + mergedMap := make(map[string]string) + + // Add all entries from map1 to mergedMap + for k, v := range map1 { + mergedMap[k] = v + } + + // Add all entries from map2 to mergedMap + for k, v := range map2 { + mergedMap[k] = v + } + + return mergedMap +} diff --git a/pkg/pokt/pokt_v0/basic_client.go b/pkg/pokt/pokt_v0/basic_client.go index ad2cc1c..da6dbd3 100644 --- a/pkg/pokt/pokt_v0/basic_client.go +++ b/pkg/pokt/pokt_v0/basic_client.go @@ -167,6 +167,7 @@ func (r BasicClient) makeRequest(endpoint string, method string, requestData any }() request.Header.SetUserAgent(r.userAgent) + request.Header.SetContentType("application/json") if hostOverride != nil { request.SetRequestURI(*hostOverride + endpoint)