Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New destination: Redis #7653

Merged
merged 15 commits into from
Nov 20, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "d4d3fef9-e319-45c2-881a-bd02ce44cc9f",
"name": "Redis",
"dockerRepository": "airbyte/destination-redis",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redis"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
11 changes: 11 additions & 0 deletions airbyte-integrations/connectors/destination-redis/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-redis

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-redis
68 changes: 68 additions & 0 deletions airbyte-integrations/connectors/destination-redis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Destination Redis

This is the repository for the Redis destination connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/redis).

## Local development

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-redis:build
```

#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.

**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.

### Locally running the connector docker image

#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-redis:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-redis:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-redis:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-redis:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-redis:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
We use `JUnit` for Java tests.

### Unit and Integration Tests
Place unit tests under `src/test/io/airbyte/integrations/destinations/redis`.

#### Acceptance Tests
Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/destinations/redisDestinationAcceptanceTest.java`.

### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-redis:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-redis:integrationTest
```

## Dependency Management

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
25 changes: 25 additions & 0 deletions airbyte-integrations/connectors/destination-redis/bootstrap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Redis Destination

Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache, pub/sub and message broker.
Redis provides data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes, and streams.
Redis has built-in replication, Lua scripting, LRU eviction, transactions, and different levels of on-disk persistence.
To achieve top performance, Redis works with an in-memory dataset. Depending on your use case, you can persist your data either by periodically dumping the dataset to disk or by appending each command to a disk-based log. You can also disable persistence if you just need a feature-rich, networked, in-memory cache.
[Read more about Redis](https://redis.io/)


This connector maps an incoming Airbyte namespace and stream to a different key in the Redis data structure. The connector supports the `append` sync mode by
adding keys to an existing keyset and `overwrite` by deleting the existing ones and replacing them with the new ones.

The implementation uses the [Jedis](https://github.com/redis/jedis) java client to access the Redis cache. [RedisCache](./src/main/java/io/airbyte/integrations/destination/redis/RedisCache.java) is the main entrypoint for defining operations that can be performed against Redis.
The interface allows you to implement any Redis supported data type for storing data based on your needs.
At the moment there is only one implementation [RedisHCache](./src/main/java/io/airbyte/integrations/destination/redis/RedisHCache.java) which stores the incoming messages in a Hash structure. Internally it uses a Jedis instance retrieved from the
[RedisPoolManager](./src/main/java/io/airbyte/integrations/destination/redis/RedisPoolManager.java). Retrieve records from the Redis cache are mapped to [RedisRecord](./src/main/java/io/airbyte/integrations/destination/redis/RedisRecord.java)

The [RedisMessageConsumer](./src/main/java/io/airbyte/integrations/destination/redis/RedisMessageConsumer.java)
class contains the logic for handling airbyte messages and storing them in Redis.

## Development

See the [RedisHCache](./src/main/java/io/airbyte/integrations/destination/redis/RedisHCache.java) class for an example on how to use the Jedis client for accessing the Redis cache.

If you want to learn more, read the [Jedis docs](https://github.com/redis/jedis/wiki)
35 changes: 35 additions & 0 deletions airbyte-integrations/connectors/destination-redis/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.redis.RedisDestination'
}

def redisDriver = '3.7.0'
def assertVersion = '3.21.0'
def testContainersVersion = '1.16.2'

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)


// https://mvnrepository.com/artifact/redis.clients/jedis
implementation "redis.clients:jedis:${redisDriver}"
// https://mvnrepository.com/artifact/com.fasterxml.jackson.datatype/jackson-datatype-jsr310
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.13.0'


// https://mvnrepository.com/artifact/org.assertj/assertj-core
testImplementation "org.assertj:assertj-core:${assertVersion}"
// https://mvnrepository.com/artifact/org.testcontainers/testcontainers
testImplementation "org.testcontainers:testcontainers:${testContainersVersion}"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-redis')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
version: "3.2"
services:
redis:
image: "redis:6.2.6"
command: redis-server --requirepass pw
ports:
- "6379:6379"
# uncomment if you want to mount volumes for persistence
# volumes:
# - $PWD/redis-data:/var/lib/redis
# - $PWD/redis.conf:/usr/local/etc/redis/redis.conf
environment:
- REDIS_REPLICATION_MODE=master
networks:
node_net:
ipv4_address: 172.28.1.4

# networking for the Redis container
networks:
node_net:
ipam:
driver: default
config:
- subnet: 172.28.0.0/16
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redis;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Closeable;
import java.time.Instant;
import java.util.List;

/**
* Interface defined to support caching in different Redis data types for different purposes.
*/
public interface RedisCache extends Closeable {

ObjectMapper objectMapper = new ObjectMapper()
.findAndRegisterModules();

/**
* Return implementation cache type.
*
*/
CacheType cacheType();

/**
* Insert data in the implementing Redis cache type.
*
* @param key to insert data in
* @param timestamp of the data
* @param data to be inserted
*/
void insert(String key, Instant timestamp, String data);

/**
* Copy data from one key to another with the option to replace.
*
* @param sourceKey key to copy data from
* @param destinationKey key to copy data to
* @param replace flag indicating whether the previous data should be deleted
*/
void copy(String sourceKey, String destinationKey, boolean replace);

/**
* Delete all data with the provided key.
*
* @param key to delete data for
*/
void delete(String key);

/**
* Retrieve all data with the provided key.
*
* @param key for which to retrieve data
* @return List of RedisRecords for the provided key.
*/
List<RedisRecord> getAll(String key);

/**
* Ping the Redis cache for a healthcheck status.
*
* @param message to send with the healthcheck
*/
void ping(String message);

/**
* Remove all keys with the related data from all existing databases
*/
void flushAll();

/**
* Close all underlying resource for the Redis cache.
*/
@Override
void close();

enum CacheType {
HASH
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redis;

public class RedisCacheFactory {

private RedisCacheFactory() {

}

static RedisCache newInstance(RedisConfig redisConfig) {
return switch (redisConfig.getCacheType()) {
case HASH -> new RedisHCache(redisConfig);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.redis;

import com.fasterxml.jackson.databind.JsonNode;

class RedisConfig {

private final String host;

private final int port;

private final String username;

private final String password;

private final RedisCache.CacheType cacheType;

public RedisConfig(JsonNode jsonNode) {
this.host = jsonNode.get("host").asText();
this.port = jsonNode.get("port").asInt(6379);
this.username = jsonNode.get("username").asText();
this.password = jsonNode.get("password").asText();
var type = jsonNode.get("cache_type").asText();
this.cacheType = RedisCache.CacheType.valueOf(type.toUpperCase());
}

public RedisConfig(String host, int port, String username, String password, RedisCache.CacheType cacheType) {
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.cacheType = cacheType;
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

public RedisCache.CacheType getCacheType() {
return cacheType;
}

@Override
public String toString() {
return "RedisConfig{" +
"host='" + host + '\'' +
", port=" + port +
", username='" + username + '\'' +
", password='" + password + '\'' +
", cacheType=" + cacheType +
'}';
}

}
Loading