Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance patches for graylog speeding it from 4-5k up to 50k messages from single instance #110

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions jar-with-dependencies.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<!-- TODO: a jarjar format would be better -->
<id>jar-with-dependencies</id>
<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
</dependencySet>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>system</scope>
</dependencySet>
</dependencySets>
</assembly>
Binary file removed lib/rabbitmq-client.jar
Binary file not shown.
Binary file added lib/syslog4j-0.9.47-ODKL.jar
Binary file not shown.
17 changes: 15 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,18 @@
<artifactId>syslog4j-graylog2</artifactId>
<version>0.9.47-graylog2</version>
</dependency>
<dependency>
<groupId>shit</groupId>
<artifactId>syslog4j</artifactId>
<version>0.9.47-ODKL</version>
<scope>system</scope>
<systemPath>${basedir}/lib/syslog4j-0.9.47-ODKL.jar</systemPath>
</dependency>
<dependency>
<groupId>com.github.stephenc.high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.1.4</version>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -214,9 +226,10 @@
<mainClass>org.graylog2.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptor>jar-with-dependencies.xml</descriptor>
<!-- descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</descriptorRefs -->
</configuration>
</plugin>
</plugins>
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/org/graylog2/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,11 @@
import java.util.Map;
import org.graylog2.inputs.amqp.AMQPInput;
import org.graylog2.inputs.gelf.GELFTCPInput;
import org.graylog2.inputs.gelf.GELFUDPBlockingInput;
import org.graylog2.inputs.gelf.GELFUDPInput;
import org.graylog2.inputs.http.GELFHttpInput;
import org.graylog2.inputs.syslog.SyslogTCPInput;
import org.graylog2.inputs.syslog.SyslogUDPBlockingInput;
import org.graylog2.inputs.syslog.SyslogUDPInput;

/**
Expand Down Expand Up @@ -648,11 +650,11 @@ public Map<String, String> getJabberTransportConfiguration() {
}

public Map<String, String> getInputConfig(Class input) {
if (input.equals(GELFTCPInput.class) || input.equals(GELFUDPInput.class)) {
if (input.equals(GELFTCPInput.class) || input.equals(GELFUDPInput.class) || input.equals(GELFUDPBlockingInput.class)) {
return getGELFInputConfig();
}

if (input.equals(SyslogTCPInput.class) || input.equals(SyslogUDPInput.class)) {
if (input.equals(SyslogTCPInput.class) || input.equals(SyslogUDPInput.class)|| input.equals(SyslogUDPBlockingInput.class)) {
return getSyslogInputConfig();
}

Expand Down
53 changes: 50 additions & 3 deletions src/main/java/org/graylog2/Core.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.graylog2.plugin.Tools;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.slf4j.Logger;
Expand All @@ -40,6 +41,9 @@

import com.google.common.collect.Lists;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.collect.Maps;
import java.util.Map;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -121,7 +125,11 @@ public class Core implements GraylogServer {

private boolean localMode = false;
private boolean statsMode = false;


private AtomicReference<Set<Stream>> enabledStreamsCache = new AtomicReference<Set<Stream>>();
private AtomicReference<Map<String,Stream>> enabledStreamsMap = new AtomicReference<Map<String,Stream>>();
private AtomicLong enabledStreamsUpdated = new AtomicLong();

public void initialize(Configuration configuration) {
serverId = Tools.generateServerId();

Expand Down Expand Up @@ -495,12 +503,51 @@ public boolean isStatsMode() {
*/
@Override
public Map<String, Stream> getEnabledStreams() {
Map<String, Stream> map = enabledStreamsMap.get();
if (map==null)
{
return buildEnabledStreamsMap(getEnabledStreamsSet());
}

return map;
}

private Map<String, Stream> buildEnabledStreamsMap(Set<Stream> enabled)
{
Map<String, Stream> streams = Maps.newHashMap();
for (Stream stream : StreamImpl.fetchAllEnabled(this)) {
for (Stream stream : enabled) {
streams.put(stream.getId().toString(), stream);
}

return streams;
}


/**
cache the list of streams is not being changed for every log message (as well as its disabled state)
*/
public Set<Stream> getEnabledStreamsSet()
{
Set<Stream> set = enabledStreamsCache.get();


long now = System.currentTimeMillis();
long l = enabledStreamsUpdated.get();
if ( now-l < 5000 ) {
return set !=null ? set : StreamImpl.fetchAllEnabled(this);
}

// need to refresh
if ( enabledStreamsUpdated.compareAndSet(l, now) ) {
set=StreamImpl.fetchAllEnabled(this);
enabledStreamsCache.set(set);
enabledStreamsMap.set( buildEnabledStreamsMap(set) );
}

if (set == null )
{
set = StreamImpl.fetchAllEnabled(this);
}

return set;
}
}
6 changes: 4 additions & 2 deletions src/main/java/org/graylog2/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
import org.graylog2.initializers.*;
import org.graylog2.inputs.amqp.AMQPInput;
import org.graylog2.inputs.gelf.GELFTCPInput;
import org.graylog2.inputs.gelf.GELFUDPBlockingInput;
import org.graylog2.inputs.gelf.GELFUDPInput;
import org.graylog2.inputs.http.GELFHttpInput;
import org.graylog2.inputs.syslog.SyslogTCPInput;
import org.graylog2.inputs.syslog.SyslogUDPBlockingInput;
import org.graylog2.inputs.syslog.SyslogUDPInput;
import org.graylog2.outputs.ElasticSearchOutput;

Expand Down Expand Up @@ -191,11 +193,11 @@ public static void main(String[] args) {

// Register inputs.
if (configuration.isUseGELF()) {
server.registerInput(new GELFUDPInput());
server.registerInput(new GELFUDPBlockingInput());
server.registerInput(new GELFTCPInput());
}

if (configuration.isSyslogUdpEnabled()) { server.registerInput(new SyslogUDPInput()); }
if (configuration.isSyslogUdpEnabled()) { server.registerInput(new SyslogUDPBlockingInput()); }
if (configuration.isSyslogTcpEnabled()) { server.registerInput(new SyslogTCPInput()); }

if (configuration.isAmqpEnabled()) { server.registerInput(new AMQPInput()); }
Expand Down
92 changes: 58 additions & 34 deletions src/main/java/org/graylog2/MessageCounterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@

import org.graylog2.plugin.Tools;
import org.bson.types.ObjectId;
import org.cliffc.high_scale_lib.Counter;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Maps;
import org.graylog2.plugin.MessageCounter;

Expand All @@ -34,31 +41,44 @@
*/
public final class MessageCounterImpl implements MessageCounter {

private int total;
private final Map<String, Integer> streams = Maps.newConcurrentMap();
private final Map<String, Integer> hosts = Maps.newConcurrentMap();
private Counter total = new Counter();
private NonBlockingHashMap<String, Counter> streams =new NonBlockingHashMap<String, Counter>();
private NonBlockingHashMap<String, Counter> hosts = new NonBlockingHashMap<String, Counter>();

private int throughput = 0;
private int highestThroughput = 0;
private AtomicInteger throughput = new AtomicInteger();
private long highestThroughput = 0;

public int getTotalCount() {
return this.total;
return (int) this.total.get();
}

public Map<String, Integer> getStreamCounts() {
return this.streams;

HashMap<String, Integer> r = new HashMap<String, Integer>(this.streams.size());

for (Entry<String, Counter> entry : this.streams.entrySet()) {
r.put(entry.getKey(),(int) entry.getValue().get());
}

return r;
}

public Map<String, Integer> getHostCounts() {
return this.hosts;
HashMap<String, Integer> r = new HashMap<String, Integer>(this.hosts.size());

for (Entry<String, Counter> entry : this.hosts.entrySet()) {
r.put(entry.getKey(),(int) entry.getValue().get());
}

return r;
}

public int getThroughput() {
return this.throughput;
return (int) this.throughput.get();
}

public int getHighestThroughput() {
return this.highestThroughput;
return (int) this.highestThroughput;
}

public void resetAllCounts() {
Expand All @@ -68,33 +88,33 @@ public void resetAllCounts() {
}

public void resetHostCounts() {
this.hosts.clear();
this.hosts = new NonBlockingHashMap<String, Counter>(this.hosts.size());
}

public void resetStreamCounts() {
this.streams.clear();
this.streams = new NonBlockingHashMap<String, Counter>(this.streams.size());
}

public void resetTotal() {
this.total = 0;
this.total.set(0);
}

public void resetThroughput() {
this.throughput = 0;
this.throughput.set( 0 );
}

/**
* Increment total count by 1.
*/
public void incrementTotal() {
this.countUpTotal(1);
this.total.increment();
}

/**
* Increment five second throughput by 1.
*/
public void incrementThroughput() {
this.countUpThroughput(1);
countUpThroughput(1);
}

/**
Expand All @@ -103,7 +123,7 @@ public void incrementThroughput() {
* @param x The value to add on top of current total count.
*/
public void countUpTotal(final int x) {
this.total += x;
this.total.add( x );
}

/**
Expand All @@ -113,10 +133,10 @@ public void countUpTotal(final int x) {
* @param x The value to add on top of five second throuput.
*/
public void countUpThroughput(final int x) {
this.throughput += x;
int t = this.throughput.addAndGet(x);

if (this.throughput > this.highestThroughput) {
this.highestThroughput = this.throughput;
if (t > this.highestThroughput) {
this.highestThroughput = t;
}
}

Expand All @@ -135,14 +155,17 @@ public void incrementStream(final ObjectId streamId) {
* @param streamId The ID of the stream which count to increment.
* @param x The value to add on top of the current stream count.
*/
public synchronized void countUpStream(final ObjectId streamId, final int x) {
if (this.streams.containsKey(streamId.toString())) {
// There already is an entry. Increment.
final int oldCount = this.streams.get(streamId.toString());
this.streams.put(streamId.toString(), oldCount+x); // Overwrites old entry.
public void countUpStream(final ObjectId streamId, final int x) {
Counter c = this.streams.get(streamId);
if (c != null) {
c.add(x);
} else {
// First entry for this stream.
this.streams.put(streamId.toString(), x);
c = new Counter();
Counter c1 = this.streams.putIfAbsent(streamId.toString(), c );

if ( c1 != null ) c= c1;

c.add(x);
}
}

Expand All @@ -161,15 +184,16 @@ public void incrementHost(final String hostname) {
* @param hostname The name of the host which count to increment.
* @param x The value to add on top of the current host count.
*/
public synchronized void countUpHost(String hostname, final int x) {
public void countUpHost(String hostname, final int x) {
hostname = Tools.encodeBase64(hostname);
if (this.hosts.containsKey(hostname)) {
// There already is an entry. Increment.
final int oldCount = this.hosts.get(hostname);
this.hosts.put(hostname, oldCount+x); // Overwrites old entry.
Counter c = this.hosts.get( hostname );
if (c != null) {
c.add(x);
} else {
// First entry for this stream.
this.hosts.put(hostname, x);
c = new Counter();
Counter c1 = this.hosts.putIfAbsent(hostname, c);
if (c1 !=null ) c = c1;
c.add(x);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/graylog2/buffers/LogMessageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class LogMessageEvent {

private LogMessage msg;

public LogMessage getMessage()
public LogMessage getAndResetMessage()
{
return msg;
}
Expand Down
Loading