/**
* Copyright 2013 Netherlands Forensic Institute
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nl.minvenj.nfi.storm.kafka;
import static java.nio.ByteBuffer.wrap;
import static nl.minvenj.nfi.storm.kafka.util.ConfigUtils.CONFIG_FAIL_HANDLER;
import static nl.minvenj.nfi.storm.kafka.util.ConfigUtils.DEFAULT_FAIL_HANDLER;
import static nl.minvenj.nfi.storm.kafka.util.ConfigUtils.createFailHandlerFromString;
import static nl.minvenj.nfi.storm.kafka.util.ConfigUtils.createKafkaConfig;
import static nl.minvenj.nfi.storm.kafka.util.ConfigUtils.getMaxBufSize;
import static nl.minvenj.nfi.storm.kafka.util.ConfigUtils.getTopic;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.storm.spout.RawScheme;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.InvalidMessageException;
import kafka.message.MessageAndMetadata;
import nl.minvenj.nfi.storm.kafka.fail.FailHandler;
import nl.minvenj.nfi.storm.kafka.util.ConfigUtils;
import nl.minvenj.nfi.storm.kafka.util.KafkaMessageId;
/**
* Storm spout reading messages from kafka and emitting them as tuples (a single {@code byte[]} field by default).
*
* The implementation tracks a queue of message ids (partition and offset) and a set of ids that are pending
* acknowledgement by the topology. The buffer is only repopulated with new messages once *all* messages from the
* previous buffer have been acknowledged, because the {@link ConsumerConnector} can only commit offsets through
* {@link kafka.javaapi.consumer.ConsumerConnector#commitOffsets()}, which commits *all* offsets that have been
* read, not necessarily the offsets that were successfully processed by the storm topology.
* Optimizing this behaviour is work for the (near) future.
*
* Aside from the properties used to configure the kafka consumer, the kafka spout reads the following configuration
* parameters from the storm configuration (a usage sketch follows the list):
* <ul>
* <li>{@code kafka.spout.topic}: the kafka topic to read messages from (default {@code storm});</li>
* <li>{@code kafka.spout.fail.handler}: the policy to be used when messages fail, whether to replay them, default
* {@code "reliable"} (either {@code "reliable"}, {@code "unreliable"} or a fully qualified class name of an
* implementation of {@link FailHandler});</li>
* <li>{@code kafka.spout.consumer.group}: The kafka consumer group id.</li>
* <li>{@code kafka.spout.buffer.size.max}: The maximum number of kafka messages to buffer.</li>
* </ul>
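*
* <p>
* A minimal usage sketch (illustrative only): the component names, parallelism and property values below are
* assumptions for this example rather than defaults enforced by the spout, and the kafka consumer itself still
* needs its usual properties (e.g. {@code zookeeper.connect}), resolved from the storm configuration by
* {@link ConfigUtils#createKafkaConfig}.
* <pre>{@code
* Config stormConfig = new Config();
* stormConfig.put("kafka.spout.topic", "storm");
* stormConfig.put("kafka.spout.fail.handler", "reliable");
* stormConfig.put("kafka.spout.consumer.group", "my-consumer-group");
* stormConfig.put("kafka.spout.buffer.size.max", 1024);
*
* TopologyBuilder builder = new TopologyBuilder();
* builder.setSpout("kafka-spout", new KafkaSpout(), 1);
* // ... attach bolts to the spout's output and submit the topology with stormConfig
* }</pre>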
*
* @author Netherlands Forensic Institute
*/
public class KafkaSpout implements IRichSpout {
private static final long serialVersionUID = -1L;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
protected final Scheme _serializationScheme;
/**
* Collection of messages being processed by the topology (either waiting to be emitted or waiting to be
* acknowledged). The processed message offsets are committed when this map becomes empty.
*
* @see #fillBuffer()
*/
protected final SortedMap<KafkaMessageId, byte[]> _inProgress = new TreeMap<KafkaMessageId, byte[]>();
/**
* Queue of messages waiting to be emitted by this spout.
*
* @see #fillBuffer()
*/
protected final Queue<KafkaMessageId> _queue = new LinkedList<KafkaMessageId>();
protected String _topic;
protected int _bufSize;
protected FailHandler _failHandler;
protected ConsumerIterator<byte[], byte[]> _iterator;
protected transient SpoutOutputCollector _collector;
protected transient ConsumerConnector _consumer;
/**
* Creates a new kafka spout to be submitted in a storm topology. Configuration is read from storm config when the
* spout is opened. Uses a {@link RawScheme}, emitting each message read from kafka as a single {@code byte[]} field.
*/
public KafkaSpout() {
_serializationScheme = new RawScheme();
}
/**
* Creates a new kafka spout to be submitted in a storm topology with the provided {@link Scheme}. This impacts the
* output fields (see {@link #declareOutputFields(OutputFieldsDeclarer)}). Configuration is read from storm config
* when the spout is opened.
*
* @param serializationScheme The serialization to apply to messages read from kafka.
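*
* <p>
* A sketch of a custom scheme (the {@code Utf8StringScheme} below is hypothetical, shown only to illustrate how
* a {@link Scheme} controls both the deserialization and the declared output fields):
* <pre>{@code
* public class Utf8StringScheme implements Scheme {
*     public List<Object> deserialize(final ByteBuffer buffer) {
*         // decode the raw kafka payload into a single UTF-8 string value
*         return new Values(StandardCharsets.UTF_8.decode(buffer).toString());
*     }
*
*     public Fields getOutputFields() {
*         // a single output field named "message"
*         return new Fields("message");
*     }
* }
*
* IRichSpout spout = new KafkaSpout(new Utf8StringScheme());
* }</pre>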
*/
public KafkaSpout(final Scheme serializationScheme) {
_serializationScheme = serializationScheme;
}
/**
* Creates a new kafka spout to be submitted in a storm topology. Configuration is read from storm config when the
* spout is opened.
*
* @param topic The kafka topic to read messages from.
*/
public KafkaSpout(final String topic) {
this();
_topic = topic;
}
/**
* Creates a new kafka spout to be submitted in a storm topology with the provided {@link Scheme}. This impacts the
* output fields (see {@link #declareOutputFields(OutputFieldsDeclarer)}). Configuration is read from storm config
* when the spout is opened.
*
* @param topic The kafka topic to read messages from.
* @param serializationScheme The serialization to apply to messages read from kafka.
*/
public KafkaSpout(final String topic, final Scheme serializationScheme) {
this(serializationScheme);
_topic = topic;
}
/**
* Convenience method assigning a {@link FailHandler} instance to this kafka spout. If the configured value is
* {@code null}, {@link ConfigUtils#DEFAULT_FAIL_HANDLER} will be used, otherwise the creation is delegated to
* {@link ConfigUtils#createFailHandlerFromString(String)}.
*
* @param failHandler The configuration value for the failure policy.
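*
* <p>
* A sketch of the accepted values (the policy behaviour follows the names documented on the class; the custom
* class name is a hypothetical example):
* <pre>{@code
* createFailHandler(null);                         // falls back to ConfigUtils.DEFAULT_FAIL_HANDLER
* createFailHandler("reliable");                   // built-in policy, the documented default
* createFailHandler("unreliable");                 // built-in policy that does not replay failed messages
* createFailHandler("com.example.MyFailHandler");  // fully qualified name of a FailHandler implementation
* }</pre>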
*/
protected void createFailHandler(final String failHandler) {
if (failHandler == null) {
_failHandler = DEFAULT_FAIL_HANDLER;
}
else {
_failHandler = createFailHandlerFromString(failHandler);
}
}
/**
* Ensures an initialized kafka {@link ConsumerConnector} is present.
*
* @param config The storm configuration passed to {@link #open(Map, TopologyContext, SpoutOutputCollector)}.
* @throws IllegalArgumentException When a required configuration parameter is missing or a sanity check fails.
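*
* <p>
* A sketch of the consumer settings this spout relies on (illustrative values; the actual keys are resolved from
* the storm configuration by {@link ConfigUtils#createKafkaConfig}):
* <pre>{@code
* final Properties consumerConfig = new Properties();
* consumerConfig.setProperty("zookeeper.connect", "zookeeper.example.com:2181"); // required by the high-level consumer
* consumerConfig.setProperty("group.id", "my-consumer-group");                   // the consumer group this spout joins
* // a consumer timeout makes the stream iterator throw ConsumerTimeoutException instead of blocking forever,
* // which fillBuffer() relies on to hand control back to storm
* consumerConfig.setProperty("consumer.timeout.ms", "50");
* }</pre>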
*/
protected void createConsumer(final Map<String, Object> config) {
final Properties consumerConfig = createKafkaConfig(config);
LOG.info("connecting kafka client to zookeeper at {} as client group {}",
consumerConfig.getProperty("zookeeper.connect"),
consumerConfig.getProperty("group.id"));
_consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
}
/**
* Refills the buffer with messages from the configured kafka topic if available.
*
* @return Whether the buffer contains messages to be emitted after this call.
* @throws IllegalStateException When the current buffer is not empty or messages are still awaiting acknowledgement by the topology.
*/
protected boolean fillBuffer() {
if (!_inProgress.isEmpty() || !_queue.isEmpty()) {
throw new IllegalStateException("cannot fill buffer when buffer or pending messages are non-empty");
}
if (_iterator == null) {
// create a single message stream for the configured topic through _consumer
final Map<String, List<KafkaStream<byte[], byte[]>>> streams = _consumer.createMessageStreams(Collections.singletonMap(_topic, 1));
_iterator = streams.get(_topic).get(0).iterator();
}
// iterate the stream inside a try block; the kafka stream polls its client channel for the next message,
// throwing a ConsumerTimeoutException when the configured consumer timeout is exceeded.
try {
int size = 0;
while (size < _bufSize && _iterator.hasNext()) {
final MessageAndMetadata<byte[], byte[]> message = _iterator.next();
final KafkaMessageId id = new KafkaMessageId(message.partition(), message.offset());
_inProgress.put(id, message.message());
size++;
}
}
catch (final InvalidMessageException e) {
LOG.warn(e.getMessage(), e);
}
catch (final ConsumerTimeoutException e) {
// ignore, storm will call nextTuple again at some point in the near future
// timeout does *not* mean that no messages were read (state is checked below)
}
if (!_inProgress.isEmpty()) {
// set _queue to all currently pending kafka message ids
_queue.addAll(_inProgress.keySet());
LOG.debug("buffer now has {} messages to be emitted", _queue.size());
// message(s) appended to buffer
return true;
}
else {
// no messages appended to buffer
return false;
}
}
@Override
public void declareOutputFields(final OutputFieldsDeclarer declarer) {
// delegate fields mapping to specified scheme (single field "bytes" by default)
declarer.declare(_serializationScheme.getOutputFields());
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public void open(final Map config, final TopologyContext topology, final SpoutOutputCollector collector) {
_collector = collector;
if (_topic == null) {
_topic = getTopic((Map<String, Object>) config);
}
_bufSize = getMaxBufSize((Map<String, Object>) config);
createFailHandler((String) config.get(CONFIG_FAIL_HANDLER));
// ensure availability of kafka consumer
createConsumer((Map<String, Object>) config);
// inform the failure policy of spout being opened
_failHandler.open(config, topology, collector);
LOG.info("kafka spout opened, reading from topic {}, using failure policy {}", _topic, _failHandler.getIdentifier());
}
@Override
public void close() {
// reset state by setting members to null
_collector = null;
_iterator = null;
if (_consumer != null) {
try {
_consumer.shutdown();
}
finally {
_consumer = null;
}
}
_failHandler.close();
}
@Override
public void activate() {
_failHandler.activate();
}
@Override
public void deactivate() {
_failHandler.deactivate();
}
@Override
public void nextTuple() {
// next tuple available when _queue contains ids or fillBuffer() is allowed and indicates more messages were available
// see class documentation for implementation note on the rationale behind this condition
if (!_queue.isEmpty() || (_inProgress.isEmpty() && fillBuffer())) {
final KafkaMessageId nextId = _queue.poll();
if (nextId != null) {
final byte[] message = _inProgress.get(nextId);
// the next id from buffer should correspond to a message in the pending map
if (message == null) {
throw new IllegalStateException("no pending message for next id " + nextId);
}
// use specified scheme to deserialize messages (single-field Values by default)
_collector.emit(_serializationScheme.deserialize(wrap(message)), nextId);
LOG.debug("emitted kafka message id {} ({} bytes payload)", nextId, message.length);
}
}
}
@Override
public void ack(final Object o) {
if (o instanceof KafkaMessageId) {
final KafkaMessageId id = (KafkaMessageId) o;
// message corresponding to o is no longer pending
_inProgress.remove(id);
LOG.debug("kafka message {} acknowledged", id);
if (_inProgress.isEmpty()) {
// commit offsets to zookeeper when pending is now empty
// (buffer will be filled on next call to nextTuple())
LOG.debug("all pending messages acknowledged, committing client offsets");
_consumer.commitOffsets();
}
// notify fail handler of tuple success
_failHandler.ack(id);
}
}
@Override
public void fail(final Object o) {
if (o instanceof KafkaMessageId) {
final KafkaMessageId id = (KafkaMessageId) o;
// delegate decision of replaying the message to failure policy
if (_failHandler.shouldReplay(id)) {
LOG.debug("kafka message id {} failed in topology, adding to buffer again", id);
_queue.add(id);
}
else {
LOG.debug("kafka message id {} failed in topology, delegating failure to policy", id);
// remove message from pending; _failHandler will take action if needed
_failHandler.fail(id, _inProgress.remove(id));
}
}
}
}