Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable the ability for buffering and aggregation to work at the same time #851

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions datadog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def initialize(
statsd_port=None, # type: Optional[int]
statsd_disable_aggregator=True, # type: bool
gh123man marked this conversation as resolved.
Show resolved Hide resolved
statsd_disable_buffering=True, # type: bool
statsd_aggregation_flush_interval=2, # type: float
statsd_aggregation_flush_interval=0.3, # type: float
statsd_use_default_route=False, # type: bool
statsd_socket_path=None, # type: Optional[str]
statsd_namespace=None, # type: Optional[str]
Expand Down Expand Up @@ -82,8 +82,9 @@ def initialize(
(default: True).
:type statsd_disable_aggregator: boolean

:param statsd_aggregation_flush_interval: Sets the flush interval for aggregation
(default: 2 seconds)
:param statsd_aggregation_flush_interval: If aggregation is enabled, set the flush interval for
aggregation/buffering
(default: 0.3 seconds)
:type statsd_aggregation_flush_interval: float

:param statsd_use_default_route: Dynamically set the statsd host to the default route
Expand Down
126 changes: 39 additions & 87 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@
DEFAULT_PORT = 8125

# Buffering-related values (in seconds)
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
DEFAULT_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001

# Aggregation-related values (in seconds)
DEFAULT_AGGREGATION_FLUSH_INTERVAL = 2
# Env var to enable/disable sending the container ID field
ORIGIN_DETECTION_ENABLED = "DD_ORIGIN_DETECTION_ENABLED"

Expand Down Expand Up @@ -147,7 +145,7 @@ def __init__(
host=DEFAULT_HOST, # type: Text
port=DEFAULT_PORT, # type: int
max_buffer_size=None, # type: None
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
disable_aggregating=True, # type: bool
gh123man marked this conversation as resolved.
Show resolved Hide resolved
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
Expand Down Expand Up @@ -455,26 +453,16 @@ def __init__(
self.aggregator = Aggregator()
# Indicates if the process is about to fork, so we shouldn't start any new threads yet.
self._forking = False
# Currently, we do not allow both aggregation and buffering, we may revisit this in the future
if self._disable_buffering and self._disable_aggregating:
self._send = self._send_to_server
log.debug("Statsd buffering and aggregation is disabled")
elif self._disable_aggregating:
# Start the flush thread if buffering is enabled and the interval is above
# a reasonable range. This both prevents thrashing and allows us to use "0.0"
# as a value for disabling the automatic flush timer as well.

if not self._disable_buffering:
self._send = self._send_to_buffer
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
else:
self._send = self._send_to_server
self._disable_buffering = True
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)

if not self._disable_aggregating or not self._disable_buffering:
self._start_flush_thread()
else:
log.debug("Statsd buffering and aggregation is disabled")

self._queue = None
self._sender_thread = None
Expand Down Expand Up @@ -551,30 +539,14 @@ def enable_telemetry(self):
self._telemetry = True

# Note: Invocations of this method should be thread-safe
def _start_flush_thread(
self,
flush_interval,
flush_function,
):
if (self._disable_buffering or not self._disable_aggregating) and flush_function == self.flush_buffered_metrics:
log.debug("Statsd periodic buffer flush is disabled")
return
if (
self._disable_aggregating
and flush_function == self.flush_aggregated_metrics
):
log.debug("Statsd periodic aggregating flush is disabled")
def _start_flush_thread(self):
if self._disable_aggregating and self.disable_buffering:
log.debug("Statsd periodic buffer and aggregation flush is disabled")
return

flush_type = ""
if self._disable_buffering:
flush_type = "aggregation"
else:
flush_type = "buffering"

if flush_interval <= MIN_FLUSH_INTERVAL:
if self._flush_interval <= MIN_FLUSH_INTERVAL:
log.debug(
"the set flush interval for %s is less then the minimum", flush_type
"the set flush interval is less then the minimum"
)
return

Expand All @@ -587,30 +559,31 @@ def _start_flush_thread(
def _flush_thread_loop(self, flush_interval):
while not self._flush_thread_stop.is_set():
time.sleep(flush_interval)
flush_function()

if not self._disable_aggregating:
self.flush_aggregated_metrics()
if not self._disable_buffering:
self.flush_buffered_metrics()
self._flush_thread = threading.Thread(
name="{}_flush_thread".format(self.__class__.__name__),
target=_flush_thread_loop,
args=(self, flush_interval,),
args=(self, self._flush_interval,),
)
self._flush_thread.daemon = True
self._flush_thread.start()
log.debug(
"Statsd %s flush thread registered with period of %s",
flush_type,
flush_interval,
"Statsd flush thread registered with period of %s",
self._flush_interval,
)

# Note: Invocations of this method should be thread-safe
def _stop_flush_thread(self):
if not self._flush_thread:
return
try:
if self._disable_aggregating:
self.flush_buffered_metrics()
else:
if not self._disable_aggregating:
self.flush_aggregated_metrics()
if not self.disable_buffering:
self.flush_buffered_metrics()
finally:
pass

Expand Down Expand Up @@ -645,18 +618,16 @@ def disable_buffering(self, is_disabled):

self._disable_buffering = is_disabled

# If buffering has been disabled, flush and kill the background thread
# If buffering (and aggregation) has been disabled, flush and kill the background thread
# otherwise start up the flushing thread and enable the buffering.
if is_disabled:
self._send = self._send_to_server
self._stop_flush_thread()
if self._disable_aggregating and self.disable_buffering:
self._stop_flush_thread()
log.debug("Statsd buffering is disabled")
else:
self._send = self._send_to_buffer
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
self._start_flush_thread()

def disable_aggregation(self):
with self._config_lock:
Expand All @@ -666,22 +637,21 @@ def disable_aggregation(self):

self._disable_aggregating = True

# If aggregation has been disabled, flush and kill the background thread
# If aggregation and buffering have been disabled, flush and kill the background thread
# otherwise start up the flushing thread and enable aggregation.
self._stop_flush_thread()
if self._disable_aggregating and self.disable_buffering:
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_aggregation(self, aggregation_flush_interval=DEFAULT_AGGREGATION_FLUSH_INTERVAL):
def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
with self._config_lock:
if not self._disable_aggregating:
return
self._disable_aggregating = False
self._flush_interval = aggregation_flush_interval
self._send = self._send_to_server
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)
self._flush_interval = flush_interval
if self._disable_buffering:
self._send = self._send_to_server
self._start_flush_thread()

@staticmethod
def resolve_host(host, use_default_route):
Expand Down Expand Up @@ -1533,16 +1503,7 @@ def pre_fork(self):

def post_fork_parent(self):
"""Restore the client state after a fork in the parent process."""
if self._disable_aggregating:
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
else:
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)
self._start_flush_thread()
self._start_sender_thread()
self._config_lock.release()

Expand All @@ -1565,16 +1526,7 @@ def post_fork_child(self):
self.close_socket()

with self._config_lock:
if self._disable_aggregating:
self._start_flush_thread(
self._flush_interval,
self.flush_buffered_metrics,
)
else:
self._start_flush_thread(
self._flush_interval,
self.flush_aggregated_metrics,
)
self._start_flush_thread()
self._start_sender_thread()

def stop(self):
Expand All @@ -1588,8 +1540,8 @@ def stop(self):
self.disable_background_sender()
self._disable_buffering = True
self._disable_aggregating = True
self.flush_buffered_metrics()
self.flush_aggregated_metrics()
self.flush_buffered_metrics()
self.close_socket()


Expand Down
34 changes: 31 additions & 3 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
# Datadog libraries
from datadog import initialize, statsd
from datadog import __version__ as version
from datadog.dogstatsd.base import DEFAULT_BUFFERING_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.base import DEFAULT_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.context import TimedContextManagerDecorator
from datadog.util.compat import is_higher_py35, is_p3k
from tests.util.contextmanagers import preserve_environment_variable, EnvVars
Expand All @@ -41,7 +41,7 @@ class FakeSocket(object):

FLUSH_GRACE_PERIOD = 0.2

def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
def __init__(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
self.payloads = deque()

self._flush_interval = flush_interval
Expand Down Expand Up @@ -1087,6 +1087,34 @@ def test_flush_interval(self):
'page.views:1|c\n',
fake_socket.recv(2, no_wait=True)
)

def test_aggregation_buffering_simultaneously(self):
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregating=False, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket
for _ in range(10):
dogstatsd.increment('test.aggregation_and_buffering')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_aggregated_metrics()
dogstatsd.flush_buffered_metrics()
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))

def test_aggregation_buffering_simultaneously_with_interval(self):
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregating=False, flush_interval=1, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
dogstatsd.socket = fake_socket
for _ in range(10):
dogstatsd.increment('test.aggregation_and_buffering_with_interval')
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(0.3)
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(1)
self.assert_equal_telemetry(
'test.aggregation_and_buffering_with_interval:10|c\n',
fake_socket.recv(2, no_wait=True)
)

def test_disable_buffering(self):
dogstatsd = DogStatsd(disable_buffering=True, telemetry_min_flush_interval=0)
Expand All @@ -1111,7 +1139,7 @@ def test_flush_disable(self):
dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(DEFAULT_BUFFERING_FLUSH_INTERVAL)
time.sleep(DEFAULT_FLUSH_INTERVAL)
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(0.3)
Expand Down
Loading