Skip to content

Commit

Permalink
Updated Schemas to match libp v5.0.0 (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
slominskir authored Aug 27, 2024
1 parent 42bea5c commit ebfca09
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 71 deletions.
2 changes: 1 addition & 1 deletion deps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ services:
- SCHEMA_REGISTRY_KAFKA_BROKERS=PLAINTEXT://kafka:9092

jaws-libp:
image: jeffersonlab/jaws-libp:4.9.2
image: jeffersonlab/jaws-libp:5.0.0
tty: true
stdin_open: true
hostname: jaws-libp
Expand Down
43 changes: 22 additions & 21 deletions src/integration/java/org/jlab/jaws/clients/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ public void setup(){


@Test
public void categoryTest() throws InterruptedException, ExecutionException, TimeoutException {
LinkedHashMap<String, EventSourceRecord<String, AlarmCategory>> results = new LinkedHashMap<>();
public void systemTest() throws InterruptedException, ExecutionException, TimeoutException {
LinkedHashMap<String, EventSourceRecord<String, AlarmSystem>> results = new LinkedHashMap<>();

try(CategoryConsumer consumer = new CategoryConsumer(clientOverrides)) {
try(SystemConsumer consumer = new SystemConsumer(clientOverrides)) {
consumer.addListener(new EventSourceListener<>() {
@Override
public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, AlarmCategory>> records) {
public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, AlarmSystem>> records) {
results.putAll(records);
}
});

AlarmCategory expected = new AlarmCategory("team1");
AlarmSystem expected = new AlarmSystem("team1");

try(CategoryProducer producer = new CategoryProducer(clientOverrides)) {
try(SystemProducer producer = new SystemProducer(clientOverrides)) {
Future<RecordMetadata> future = producer.send("TESTING", expected);

// Block until sent or an exception is thrown
Expand All @@ -75,7 +75,7 @@ public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, Alar
Assert.assertEquals(expected, results.values().iterator().next().getValue());
} finally {
// Cleanup
try(CategoryProducer producer = new CategoryProducer(clientOverrides)) {
try(SystemProducer producer = new SystemProducer(clientOverrides)) {
Future<RecordMetadata> future = producer.send("TESTING", null);

// Block until sent or an exception is thrown
Expand All @@ -85,18 +85,18 @@ public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, Alar
}

@Test
public void classTest() throws InterruptedException, ExecutionException, TimeoutException {
LinkedHashMap<String, EventSourceRecord<String, AlarmClass>> results = new LinkedHashMap<>();
public void actionTest() throws InterruptedException, ExecutionException, TimeoutException {
LinkedHashMap<String, EventSourceRecord<String, AlarmAction>> results = new LinkedHashMap<>();

try(ClassConsumer consumer = new ClassConsumer(clientOverrides)) {
try(ActionConsumer consumer = new ActionConsumer(clientOverrides)) {
consumer.addListener(new EventSourceListener<>() {
@Override
public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, AlarmClass>> records) {
public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, AlarmAction>> records) {
results.putAll(records);
}
});

AlarmClass expected = new AlarmClass("category",
AlarmAction expected = new AlarmAction("system",
AlarmPriority.P1_CRITICAL,
"rationale",
"correctiveaction",
Expand All @@ -105,7 +105,7 @@ public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, Alar
null,
null);

try(ClassProducer producer = new ClassProducer(clientOverrides)) {
try(ActionProducer producer = new ActionProducer(clientOverrides)) {
Future<RecordMetadata> future = producer.send("TESTING", expected);

// Block until sent or an exception is thrown
Expand All @@ -125,7 +125,7 @@ public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, Alar
Assert.assertEquals(expected, results.values().iterator().next().getValue());
} finally {
// Cleanup
try(ClassProducer producer = new ClassProducer(clientOverrides)) {
try(ActionProducer producer = new ActionProducer(clientOverrides)) {
Future<RecordMetadata> future = producer.send("TESTING", null);

// Block until sent or an exception is thrown
Expand All @@ -135,24 +135,25 @@ public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, Alar
}

@Test
public void instanceTest() throws InterruptedException, ExecutionException, TimeoutException {
LinkedHashMap<String, EventSourceRecord<String, AlarmInstance>> results = new LinkedHashMap<>();
public void alarmTest() throws InterruptedException, ExecutionException, TimeoutException {
LinkedHashMap<String, EventSourceRecord<String, Alarm>> results = new LinkedHashMap<>();

try(InstanceConsumer consumer = new InstanceConsumer(clientOverrides)) {
try(AlarmConsumer consumer = new AlarmConsumer(clientOverrides)) {
consumer.addListener(new EventSourceListener<>() {
@Override
public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, AlarmInstance>> records) {
public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, Alarm>> records) {
results.putAll(records);
}
});

AlarmInstance expected = new AlarmInstance("class", "device",
Alarm expected = new Alarm("action", "device",
new Source(),
Arrays.asList(new String[]{"location1"}),
"managedby",
"maskedby",
"screencommand");

try(InstanceProducer producer = new InstanceProducer(clientOverrides)) {
try(AlarmProducer producer = new AlarmProducer(clientOverrides)) {
Future<RecordMetadata> future = producer.send("TESTING", expected);

// Block until sent or an exception is thrown
Expand All @@ -172,7 +173,7 @@ public void highWaterOffset(LinkedHashMap<String, EventSourceRecord<String, Alar
Assert.assertEquals(expected, results.values().iterator().next().getValue());
} finally {
// Cleanup
try(InstanceProducer producer = new InstanceProducer(clientOverrides)) {
try(AlarmProducer producer = new AlarmProducer(clientOverrides)) {
Future<RecordMetadata> future = producer.send("TESTING", null);

// Block until sent or an exception is thrown
Expand Down
19 changes: 15 additions & 4 deletions src/main/avro/AlarmInstance.avsc → src/main/avro/Alarm.avsc
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{
"type": "record",
"name": "AlarmInstance",
"name": "Alarm",
"namespace": "org.jlab.jaws.entity",
"doc": "Instance of an alarm class",
"doc": "An alarm instance",
"fields": [
{
"name": "alarmclass",
"name": "action",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": "The alarm class; provides inheritable shared values",
"doc": "The alarm action; provides inheritable shared class values",
"default": "base"
},
{
Expand Down Expand Up @@ -75,6 +75,17 @@
},
"doc": "The locations associated with the alarm"
},
{
"name": "managedby",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"doc": "Whom manages this alarm"
},
{
"name": "maskedby",
"type": [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{
"type": "record",
"name": "AlarmClass",
"name": "AlarmAction",
"namespace": "org.jlab.jaws.entity",
"doc": "An alarm class",
"doc": "An alarm action (class of alarm)",
"fields": [
{
"name": "category",
"name": "system",
"type": {
"type": "string",
"avro.java.string": "String"
},
"doc": "The alarm category name"
"doc": "The alarm system name"
},
{
"name": "priority",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"type": "record",
"name": "AlarmCategory",
"name": "AlarmSystem",
"namespace": "org.jlab.jaws.entity",
"doc": "Value of a named category",
"doc": "Value of a named system",
"fields": [
{
"name": "team",
Expand Down
12 changes: 6 additions & 6 deletions src/main/avro/EffectiveRegistration.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@
"doc": "Processed registration (instance + class)",
"fields": [
{
"name": "instance",
"name": "alarm",
"type": [
"null",
"AlarmInstance"
"Alarm"
],
"doc": "The alarm instance",
"doc": "The alarm registration instance",
"default": null
},
{
"name": "class",
"name": "action",
"type": [
"null",
"AlarmClass"
"AlarmAction"
],
"doc": "The alarm class",
"doc": "The alarm action (class of alarm)",
"default": null
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.jlab.jaws.clients;

import org.jlab.jaws.entity.AlarmInstance;
import org.jlab.jaws.entity.AlarmAction;
import org.jlab.kafka.eventsource.EventSourceConfig;

import java.time.Instant;
Expand All @@ -9,13 +9,13 @@
/**
* A Consumer provides default properties values for GROUP, TOPIC, KEY_DESERIALIZER, and VALUE_DESERIALIZER.
*/
public class InstanceConsumer extends JAWSConsumer<String, AlarmInstance> {
public class ActionConsumer extends JAWSConsumer<String, AlarmAction> {
/**
* Create a new Consumer with the provided property overrides.
*
* @param props The properties, which will override any defaults set by this class
*/
public InstanceConsumer(Properties props) {
public ActionConsumer(Properties props) {
super(setDefaults(props));
}

Expand All @@ -26,8 +26,8 @@ private static Properties setDefaults(Properties overrides) {
overrides = new Properties();
}

defaults.put(EventSourceConfig.GROUP_ID_CONFIG, "instance-consumer" + Instant.now().toString() + "-" + Math.random());
defaults.put(EventSourceConfig.TOPIC_CONFIG, InstanceProducer.TOPIC);
defaults.put(EventSourceConfig.GROUP_ID_CONFIG, "action-consumer" + Instant.now().toString() + "-" + Math.random());
defaults.put(EventSourceConfig.TOPIC_CONFIG, ActionProducer.TOPIC);
defaults.put(EventSourceConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
defaults.put(EventSourceConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.jlab.jaws.entity.AlarmInstance;
import org.jlab.jaws.entity.AlarmAction;

import java.time.Instant;
import java.util.Properties;
Expand All @@ -14,18 +14,18 @@
* A Producer provides default properties values for CLIENT_ID, TOPIC, KEY_DESERIALIZER, and VALUE_DESERIALIZER.
* A default send method is also provided.
*/
public class InstanceProducer extends JAWSProducer<String, AlarmInstance> {
public class ActionProducer extends JAWSProducer<String, AlarmAction> {
/**
* The topic name
*/
public static final String TOPIC = "alarm-instances";
public static final String TOPIC = "alarm-actions";

/**
* Create a new Producer with the provided property overrides.
*
* @param props The properties, which will override any defaults set by this class
*/
public InstanceProducer(Properties props) {
public ActionProducer(Properties props) {
super(setDefaults(props));
}

Expand All @@ -36,7 +36,7 @@ private static Properties setDefaults(Properties overrides) {
overrides = new Properties();
}

defaults.put(ProducerConfig.CLIENT_ID_CONFIG, "instance-producer" + Instant.now().toString() + "-" + Math.random());
defaults.put(ProducerConfig.CLIENT_ID_CONFIG, "action-producer" + Instant.now().toString() + "-" + Math.random());
defaults.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
defaults.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

Expand All @@ -52,7 +52,7 @@ private static Properties setDefaults(Properties overrides) {
* @param value The message value
* @return An asynchronous call Future reference
*/
public Future<RecordMetadata> send(String key, AlarmInstance value) {
public Future<RecordMetadata> send(String key, AlarmAction value) {

Iterable<Header> headers = getDefaultHeaders();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.jlab.jaws.clients;

import org.jlab.jaws.entity.AlarmClass;
import org.jlab.jaws.entity.Alarm;
import org.jlab.kafka.eventsource.EventSourceConfig;

import java.time.Instant;
Expand All @@ -9,13 +9,13 @@
/**
* A Consumer provides default properties values for GROUP, TOPIC, KEY_DESERIALIZER, and VALUE_DESERIALIZER.
*/
public class ClassConsumer extends JAWSConsumer<String, AlarmClass> {
public class AlarmConsumer extends JAWSConsumer<String, Alarm> {
/**
* Create a new Consumer with the provided property overrides.
*
* @param props The properties, which will override any defaults set by this class
*/
public ClassConsumer(Properties props) {
public AlarmConsumer(Properties props) {
super(setDefaults(props));
}

Expand All @@ -26,8 +26,8 @@ private static Properties setDefaults(Properties overrides) {
overrides = new Properties();
}

defaults.put(EventSourceConfig.GROUP_ID_CONFIG, "class-consumer" + Instant.now().toString() + "-" + Math.random());
defaults.put(EventSourceConfig.TOPIC_CONFIG, ClassProducer.TOPIC);
defaults.put(EventSourceConfig.GROUP_ID_CONFIG, "alarm-consumer" + Instant.now().toString() + "-" + Math.random());
defaults.put(EventSourceConfig.TOPIC_CONFIG, AlarmProducer.TOPIC);
defaults.put(EventSourceConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
defaults.put(EventSourceConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.jlab.jaws.entity.AlarmClass;
import org.jlab.jaws.entity.Alarm;

import java.time.Instant;
import java.util.Properties;
Expand All @@ -14,18 +14,18 @@
* A Producer provides default properties values for CLIENT_ID, TOPIC, KEY_DESERIALIZER, and VALUE_DESERIALIZER.
* A default send method is also provided.
*/
public class ClassProducer extends JAWSProducer<String, AlarmClass> {
public class AlarmProducer extends JAWSProducer<String, Alarm> {
/**
* The topic name
*/
public static final String TOPIC = "alarm-classes";
public static final String TOPIC = "alarms";

/**
* Create a new Producer with the provided property overrides.
*
* @param props The properties, which will override any defaults set by this class
*/
public ClassProducer(Properties props) {
public AlarmProducer(Properties props) {
super(setDefaults(props));
}

Expand All @@ -36,7 +36,7 @@ private static Properties setDefaults(Properties overrides) {
overrides = new Properties();
}

defaults.put(ProducerConfig.CLIENT_ID_CONFIG, "class-producer" + Instant.now().toString() + "-" + Math.random());
defaults.put(ProducerConfig.CLIENT_ID_CONFIG, "alarm-producer" + Instant.now().toString() + "-" + Math.random());
defaults.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
defaults.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");

Expand All @@ -52,7 +52,7 @@ private static Properties setDefaults(Properties overrides) {
* @param value The message value
* @return An asynchronous call Future reference
*/
public Future<RecordMetadata> send(String key, AlarmClass value) {
public Future<RecordMetadata> send(String key, Alarm value) {

Iterable<Header> headers = getDefaultHeaders();

Expand Down
Loading

0 comments on commit ebfca09

Please sign in to comment.