Skip to content

Commit

Permalink
Updated to version v4.0.1
Browse files Browse the repository at this point in the history
Updated to version v4.0.1
  • Loading branch information
knihit authored May 19, 2023
2 parents 43bf6bd + a3897f3 commit 130ec1b
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 1 deletion.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ six*
urllib*

# Ignore lib folder within each lambada folder. Only include lib folder at upper level
/source/**/lib
/source/**/lib
!/source/lib
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [4.0.1] - 2023-05-19

### Fixed

- Updated gitignore files to resolve the issue for missing files [Github issue 244](https://github.com/aws-solutions/aws-waf-security-automations/issues/244) [Github issue 243](https://github.com/aws-solutions/aws-waf-security-automations/issues/243) [Github issue 245](https://github.com/aws-solutions/aws-waf-security-automations/issues)

## [4.0.0] - 2023-05-11

### Added
Expand Down
69 changes: 69 additions & 0 deletions source/lib/cfn_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
######################################################################################################################
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance #
# with the License. A copy of the License is located at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES #
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions #
# and limitations under the License. #
######################################################################################################################
# !/bin/python

import requests
import json

def send_response(log, event, context, response_status, response_data, resource_id, reason=None):
"""
Send a response to an AWS CloudFormation custom resource.
Parameters:
event: The fields in a custom resource request
context: An object, specific to Lambda functions, that you can use to specify
when the function and any callbacks have completed execution, or to
access information from within the Lambda execution environment
response_status: Whether the function successfully completed - SUCCESS or FAILED
response_data: The Data field of a custom resource response object
resource_id: The id of the custom resource that invoked the function
reason: The error message if the function fails
Returns: None
"""
log.debug("[send_response] Start")

responseUrl = event['ResponseURL']
cw_logs_url = "https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=%s" % (
context.invoked_function_arn.split(':')[3], context.log_group_name, context.log_stream_name)

log.info("[send_response] Sending cfn response url: %s", responseUrl)
responseBody = {}
responseBody['Status'] = response_status
responseBody['Reason'] = reason or ('See the details in CloudWatch Logs: ' + cw_logs_url)
responseBody['PhysicalResourceId'] = resource_id
responseBody['StackId'] = event['StackId']
responseBody['RequestId'] = event['RequestId']
responseBody['LogicalResourceId'] = event['LogicalResourceId']
responseBody['NoEcho'] = False
responseBody['Data'] = response_data

json_responseBody = json.dumps(responseBody)
log.debug("Response body:\n" + json_responseBody)

headers = {
'content-type': '',
'content-length': str(len(json_responseBody))
}

try:
response = requests.put(responseUrl,
data=json_responseBody,
headers=headers,
timeout=10)
log.info("[send_response] Sending cfn response status code: %s", response.reason)

except Exception as error:
log.error("[send_response] Failed executing requests.put(..)")
log.error(str(error))

log.debug("[send_response] End")
109 changes: 109 additions & 0 deletions source/lib/cw_metrics_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
######################################################################################################################
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance #
# with the License. A copy of the License is located at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES #
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions #
# and limitations under the License. #
######################################################################################################################
#!/bin/python

import datetime
from os import environ
from lib.boto3_util import create_client

class WAFCloudWatchMetrics(object):
"""
This class creates a wrapper function for cloudwatch get_metric_statistics API
and another function to add the waf cw metric statistics to the anonymous usage
data that the solution collects
"""
def __init__(self, log):
self.log = log
self.cw_client = create_client('cloudwatch')

def get_cw_metric_statistics(self, metric_name, period_seconds, waf_rule,
namespace='AWS/WAFV2',
statistics=['Sum'],
start_time=datetime.datetime.utcnow(),
end_time=datetime.datetime.utcnow(),
web_acl='STACK_NAME'):
"""
Get a WAF CloudWatch metric given a WAF rule and metric name.
Parameters:
metric_name: string. The name of the metric. Optional.
period_seconds: integer. The granularity, in seconds, of the returned data points.
waf_rule: string. The name of the WAF rule.
namespace: string. The namespace of the metric. Optional.
statistics: list. The metric statistics, other than percentile. Optional.
start_time: datetime. The time stamp that determines the first data point to return. Optional.
end_time: datetime. The time stamp that determines the last data point to return. Optional.
web_acl: string. The name of the WebACL. Optional
Returns: Metric data points if any, or None
"""
try:
response = self.cw_client.get_metric_statistics(
MetricName=metric_name,
Namespace=namespace,
Statistics=statistics,
Period=period_seconds,
StartTime=start_time - datetime.timedelta(seconds=period_seconds),
EndTime=end_time,
Dimensions=[
{
"Name": "Rule",
"Value": waf_rule
},
{
"Name": "WebACL",
"Value": environ.get(web_acl)
},
{
"Name": "Region",
"Value": environ.get('AWS_REGION')
}
]
)
self.log.debug("[cw_metrics_util: get_cw_metric_statistics] response:\n{}".format(response))
return response if len(response['Datapoints']) > 0 else None
except Exception as e:
self.log.error("[cw_metrics_util: get_cw_metric_statistics] Failed to get metric %s.", metric_name)
self.log.error(e)
return None

def add_waf_cw_metric_to_usage_data(self, metric_name, period_seconds, waf_rule,
usage_data, usage_data_field_name, default_value):
"""
Get the CloudWatch metric statistics given a WAF rule and metric name, and
add it to the anonymous usage data collected by the solution.
Parameters:
metric_name: string. The name of the metric. Optional.
period_seconds: integer. The granularity, in seconds, of the returned data points.
waf_rule: string. The name of the WAF rule.
usage_data: JSON. Anonymous customer usage data of the solution
usage_data_field_name: string. The field name in the usage data whose value will be
replaced by the waf cloudwatch metric (if any)
default_value: number. The default value of the field in the usage data
Returns: JSON. usage data.
"""
self.log.info("[cw_metrics_util: add_waf_cw_metric_to_usage_data] "
+ "Get metric %s for waf rule %s." %(metric_name, waf_rule))

response = self.get_cw_metric_statistics(
metric_name=metric_name,
period_seconds=period_seconds,
waf_rule=waf_rule
)
usage_data[usage_data_field_name] = \
response['Datapoints'][0]['Sum'] if response is not None else default_value

self.log.info("[cw_metrics_util: add_waf_cw_metric_to_usage_data] "
+ "%s - rule %s: %s"%(metric_name, waf_rule, str(usage_data[usage_data_field_name])))

return usage_data
29 changes: 29 additions & 0 deletions source/lib/logging_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
######################################################################################################################
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance #
# with the License. A copy of the License is located at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES #
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions #
# and limitations under the License. #
######################################################################################################################
#!/bin/python

import logging
from os import environ

def set_log_level(default_log_level='ERROR'):
default_log_level = logging.getLevelName(default_log_level.upper())
log_level = str(environ['LOG_LEVEL'].upper()) \
if 'LOG_LEVEL' in environ else default_log_level

log = logging.getLogger()

if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
log_level = 'ERROR'
log.setLevel(log_level)

return log
Loading

0 comments on commit 130ec1b

Please sign in to comment.