Skip to content

Commit

Permalink
initial backport of graphite-project#56
Browse files Browse the repository at this point in the history
ah

lgtm is wrong here

migrating to monkeypatching

lgtm is wrong here

default relay method

style fixes

fix linting

init tests

different aggregation_rules init

Adding gitignore

Fixing tests

Adding new pythons

Adding new pythons
  • Loading branch information
deniszh committed Oct 3, 2020
1 parent 87641a5 commit aa54875
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ coverage.xml
.mr.developer.cfg
.project
.pydevproject
.idea

# Rope
.ropeproject
Expand All @@ -55,3 +56,4 @@ docs/_build/

# Virtualenv
.Python
venv
15 changes: 12 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ jobs:
- name: "Python 2.7"
python: "2.7"
env: TOXENV=py27
- name: "Python Linting"
python: "3.5"
env: TOXENV=lint
- name: "pypy 2.7"
python: "pypy"
env: TOXENV=py27
- name: "Python 3.5"
python: "3.5"
env: TOXENV=py3
Expand All @@ -16,6 +16,15 @@ jobs:
- name: "Python 3.7"
python: "3.7"
env: TOXENV=py3
- name: "Python 3.8"
python: "3.8"
env: TOXENV=py3
- name: "pypy 3.6"
python: "pypy3"
env: TOXENV=py27
- name: "Python Linting"
python: "3.8"
env: TOXENV=lint
install:
- pip install tox
script:
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ DESTINATIONS = 192.168.9.13:2004:carbon01, 192.168.9.15:2004:carbon02, 192.168.6
REPLICATION_FACTOR = 2
SSH_USER = carbon
[agg]
DESTINATIONS = 192.168.9.13:2004:carbon01, 192.168.9.15:2004:carbon02, 192.168.6.20:2004:carbon03
RELAY_METHOD = aggregated-consistent-hashing
REPLICATION_FACTOR = 2
SSH_USER = carbon
[fnv]
DESTINATIONS = 192.168.9.13:2004:ba603c36342304ed77953f84ac4d357b, 192.168.9.15:2004:5dd63865534f84899c6e5594dba6749a, 192.168.6.20:2004:866a18b81f2dc4649517a1df13e26f28
REPLICATION_FACTOR = 2
Expand All @@ -37,6 +43,8 @@ HASHING_TYPE = fnv1a_ch

You should take care to match the list of destination IPs or hostnames to the nodes in your cluster (i.e. it should match with routing configuretion of your carbon relay). Order is important because of how the consistent hash ring is created.

You can configure the relay method to be one of "consistent-hashing" or "aggregated-consistent-hashing". If omitted, "consistent-hashing" is used by default. Use of "aggregated-consistent-hashing" usually requires a rules file to be provided to relevant commands.

The replication factor should match the replication factor for the cluster.

Also, you can choose to provide a SSH user that will be used when carbonate requires connecting to another node in the cluster to perform an operation. If this is not provided, then the current user executing the command will be chosen.
Expand Down Expand Up @@ -78,6 +86,10 @@ optional arguments:
/opt/graphite/conf/carbonate.conf)
-C CLUSTER, --cluster CLUSTER
Cluster name (default: main)
-a AGGREGATION_RULES, --aggregation-rules AGGREGATION_RULES
File containing rules used in conjunction with the
"aggregated-consistent-hashing" relay method (default:
/opt/graphite/conf/aggregation-rules.conf)
-s, --short Only display the address, without port and cluster
name (default: False)
```
Expand Down Expand Up @@ -115,6 +127,10 @@ optional arguments:
/opt/graphite/conf/carbonate.conf)
-C CLUSTER, --cluster CLUSTER
Cluster name (default: main)
-a AGGREGATION_RULES, --aggregation-rules AGGREGATION_RULES
File containing rules used in conjunction with the
"aggregated-consistent-hashing" relay method (default:
/opt/graphite/conf/aggregation-rules.conf)
-f METRICS_FILE, --metrics-file METRICS_FILE
File containing metric names to filter, or '-' to read
from STDIN (default: -)
Expand Down
16 changes: 14 additions & 2 deletions carbonate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ def carbon_lookup():
type=str,
help='Full metric name to search for')

parser.add_argument(
'-a', '--aggregation-rules',
default='/opt/graphite/conf/aggregation-rules.conf',
help='File containing rules used in conjunction with the ' +
'"aggregated-consistent-hashing" relay method')

parser.add_argument(
'-s', '--short',
action='store_true',
Expand All @@ -82,7 +88,7 @@ def carbon_lookup():
args = parser.parse_args()

config = Config(args.config_file)
cluster = Cluster(config, args.cluster)
cluster = Cluster(config, args.cluster, args.aggregation_rules)

results = lookup(str(args.metric[0]), cluster)

Expand All @@ -103,6 +109,12 @@ def carbon_sieve():
help='File containing metric names to filter, or \'-\' ' +
'to read from STDIN')

parser.add_argument(
'-a', '--aggregation-rules',
default='/opt/graphite/conf/aggregation-rules.conf',
help='File containing rules used in conjunction with the ' +
'"aggregated-consistent-hashing" relay method')

parser.add_argument(
'-n', '--node',
help='Filter for metrics belonging to this node. Uses the local ' +
Expand All @@ -116,7 +128,7 @@ def carbon_sieve():
args = parser.parse_args()

config = Config(args.config_file)
cluster = Cluster(config, args.cluster)
cluster = Cluster(config, args.cluster, args.aggregation_rules)
invert = args.invert

metrics = metrics_from_args(args)
Expand Down
41 changes: 21 additions & 20 deletions carbonate/cluster.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import sys
import inspect

# Inject the graphite libs into the system path
venv_root = ""
Expand All @@ -12,31 +11,33 @@
# We're going to use carbon's libs directly to do things
try:
from carbon import util
from carbon.routers import ConsistentHashingRouter
from carbon.conf import Settings
from carbon.aggregator.rules import RuleManager
from carbon.routers import AggregatedConsistentHashingRouter, \
ConsistentHashingRouter
from carbon.hashing import ConsistentHashRing
except ImportError as e:
raise SystemExit("No bueno. Can't import carbon! (" + str(e) + ")")


class Cluster():
def __init__(self, config, cluster='main'):
# Support multiple versions of carbon, the API changed in 0.10.
args = inspect.getargspec(ConsistentHashingRouter.__init__).args
if 'replication_factor' in args:
r = ConsistentHashingRouter(config.replication_factor(cluster))
else:
class Settings(object):
REPLICATION_FACTOR = config.replication_factor(cluster)
DIVERSE_REPLICAS = False
ROUTER_HASH_TYPE = None
r = ConsistentHashingRouter(Settings())

# 'hash_type' was added only in carbon 1.0.2 or master
args = inspect.getargspec(ConsistentHashRing.__init__).args
if 'hash_type' in args:
r.ring = ConsistentHashRing(nodes=[],
hash_type=config.hashing_type(cluster))

def __init__(self, config, cluster='main', aggregation_rules=None):
settings = Settings()
settings['DIVERSE_REPLICAS'] = config.diverse_replicas(cluster)
settings['REPLICATION_FACTOR'] = config.replication_factor(cluster)
settings['ROUTER_HASH_TYPE'] = config.hashing_type(cluster)
settings['aggregation-rules'] = None
r = ConsistentHashingRouter(settings)

relay_method = config.relay_method(cluster=cluster)
if relay_method == "aggregated-consistent-hashing":
r = AggregatedConsistentHashingRouter(settings)
if aggregation_rules:
RuleManager.read_from(aggregation_rules)
r.agg_rules_manager = RuleManager

r.ring = ConsistentHashRing(nodes=[],
hash_type=config.hashing_type(cluster))
self.ring = r

try:
Expand Down
20 changes: 20 additions & 0 deletions carbonate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ def destinations(self, cluster='main'):
destinations = self.config.get(cluster, 'destinations')
return destinations.replace(' ', '').split(',')

def relay_method(self, cluster='main'):
"""Return the carbon relay method for a cluster."""
if not self.config.has_section(cluster):
raise SystemExit("Cluster '%s' not defined in %s"
% (cluster, self.config_file))
if self.config.has_option(cluster, 'relay_method'):
return self.config.get(cluster, 'relay_method')
return 'consistent-hashing'

def replication_factor(self, cluster='main'):
"""Return the replication factor for a cluster as an integer."""
if not self.config.has_section(cluster):
Expand Down Expand Up @@ -66,3 +75,14 @@ def hashing_type(self, cluster='main'):
return self.config.get(cluster, 'hashing_type')
except NoOptionError:
return hashing_type

def diverse_replicas(self, cluster='main'):
"""DIVERSE REPLICAS parameter of cluster."""
if not self.config.has_section(cluster):
raise SystemExit("Cluster '%s' not defined in %s"
% (cluster, self.config_file))
diverse_replicas = True
try:
return self.config.get(cluster, 'diverse_replicas')
except NoOptionError:
return diverse_replicas
4 changes: 2 additions & 2 deletions carbonate/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ def sync_from_remote(sync_file, remote, staging, rsync_options):
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)

for l in iter(proc.stdout.readline, b''):
sys.stdout.write(l.decode("utf-8"))
for line in iter(proc.stdout.readline, b''):
sys.stdout.write(line.decode("utf-8"))
sys.stdout.flush()
except subprocess.CalledProcessError as e:
logging.warn("Failed to sync from %s! %s" % (remote, e))
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
carbon
whisper
carbon>=1.0.2
whisper>=0.9.10
scandir
3 changes: 3 additions & 0 deletions tests/conf/aggregation-rules.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
<env>.applications.<app>.all.requests (60) = sum <env>.applications.<app>.*.requests
<env>.applications.<app>.all.latency (60) = avg <env>.applications.<app>.*.latency
<env>.a.<app> (60) = avg <env>.a.*
5 changes: 5 additions & 0 deletions tests/conf/realistic.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ DESTINATIONS = 192.168.1.8:2004:0
REPLICATION_FACTOR = 1
SSH_USER = carbonate

[agg]
DESTINATIONS = 1.1.1.1:2003:0, 2.2.2.2:2003:0
REPLICATION_FACTOR = 1
RELAY_METHOD = aggregated-consistent-hashing

[fnv]
DESTINATIONS = 192.168.9.13:2124:0, 192.168.9.15:2124:0
REPLICATION_FACTOR = 2
Expand Down
4 changes: 0 additions & 4 deletions tests/old_whisper_requirements.txt

This file was deleted.

7 changes: 5 additions & 2 deletions tests/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ nose
mock
coverage
nosexcover
flake8<3.7
flake8
mccabe==0.6.1
tox
pep8
pyflakes>=2.0.0,<2.1.0
pyflakes
Twisted<=20.3.0
six
pycodestyle
65 changes: 65 additions & 0 deletions tests/test_agg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import unittest

import carbonate.sieve
from carbonate.lookup import lookup

from carbonate.cluster import Cluster
from carbonate.config import Config


class AggLookupSieveTest(unittest.TestCase):

def setUp(self):
config_file = "tests/conf/realistic.conf"
agg_rules_file = "test/conf/aggregation-rules.conf"
config = Config(config_file)
self.cluster = Cluster(config, cluster='agg', aggregation_rules=agg_rules_file)

# testing all aggregation hash tests in one go,
# otherwise it clashes on reading rules file in twisted
def test_sieve_lookup_agg(self):
inputs = ['metric.a.100',
'metric.a.101',
'metric.a.102',
'metric.a.103',
'metric.a.104',
'metric.a.105',
'metric.a.106',
'metric.a.107',
'metric.a.108',
'metric.a.109']

node = '1.1.1.1'
node_long = '1.1.1.1:2003:0'
output = ['metric.a.100',
'metric.a.104',
'metric.a.105',
'metric.a.106',
'metric.a.107']

node2 = '2.2.2.2'
node2_long = '2.2.2.2:2003:0'
output2 = ['metric.a.101',
'metric.a.102',
'metric.a.103',
'metric.a.108',
'metric.a.109']

f = list(carbonate.sieve.filterMetrics(inputs, node, self.cluster))
self.assertEqual(f, output)
f = list(carbonate.sieve.filterMetrics(inputs, node_long, self.cluster))
self.assertEqual(f, output)

f = list(carbonate.sieve.filterMetrics(inputs, node2, self.cluster))
self.assertEqual(f, output2)
f = list(carbonate.sieve.filterMetrics(inputs, node2_long, self.cluster))
self.assertEqual(f, output2)

f = list(carbonate.sieve.filterMetrics(inputs, node, self.cluster, True))
self.assertEqual(f, output2)

f = list(carbonate.sieve.filterMetrics(inputs, node2, self.cluster, True))
self.assertEqual(f, output)

f = lookup('metric.a.one', self.cluster)
self.assertEqual(f, ['2.2.2.2:2003:0'])
2 changes: 1 addition & 1 deletion tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_config_destinations(self):
def test_config_multiple_clusters(self):
c = config.Config(self.real_config)

expected = ['main', 'standalone', 'fnv']
expected = ['main', 'standalone', 'agg', 'fnv']
self.assertEqual(set(c.clusters()), set(expected))

def test_config_ssh_user(self):
Expand Down
8 changes: 2 additions & 6 deletions tox.ini
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
[tox]
envlist = py27,py27-pre0_9_10,lint,py3
envlist = py27,py3,pypy,lint

[testenv]
install_command = pip install --install-option='--install-scripts={envbindir}' --install-option='--install-lib={envsitepackagesdir}' --install-option='--install-data={envdir}/lib/graphite' -r{toxinidir}/requirements.txt -r{toxinidir}/tests/requirements.txt --pre {opts} {packages}
install_command = pip install --prefix={envdir} -r{toxinidir}/requirements.txt -r{toxinidir}/tests/requirements.txt --pre {opts} {packages}
commands = coverage erase
coverage run {envbindir}/nosetests
coverage report --include=carbonate* --omit=*test*

[testenv:py27-pre0_9_10]
install_command = pip install --install-option='--install-scripts={envbindir}' --install-option='--install-lib={envsitepackagesdir}' --install-option='--install-data={envdir}/lib/graphite' -r{toxinidir}/tests/old_whisper_requirements.txt -r{toxinidir}/tests/requirements.txt --pre {opts} {packages}
basepython = python2.7

[testenv:lint]
basepython = python
commands = flake8 carbonate

0 comments on commit aa54875

Please sign in to comment.