Skip to content

Commit

Permalink
feat: Redis streams source (#628)
Browse files Browse the repository at this point in the history
Signed-off-by: Julie Vogelmani <[email protected]>
  • Loading branch information
juliev0 authored and whynowy committed Apr 3, 2023
1 parent 562f418 commit 968cc5f
Show file tree
Hide file tree
Showing 35 changed files with 2,517 additions and 653 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ jobs:
max-parallel: 7
matrix:
driver: [jetstream]
case: [e2e-suite-1, e2e-suite-2, kafka-e2e, http-e2e, nats-e2e, sdks-e2e, reduce-e2e]
case: [e2e-suite-1, e2e-suite-2, kafka-e2e, http-e2e, nats-e2e, redis-source-e2e, sdks-e2e, reduce-e2e]
steps:
- name: Checkout code
uses: actions/checkout@v3
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ test-e2e-suite-2:
test-kafka-e2e:
test-http-e2e:
test-nats-e2e:
test-redis-source-e2e:
test-sdks-e2e:
test-reduce-e2e:
test-%:
Expand Down
50 changes: 50 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -18899,6 +18899,53 @@
},
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.RedisStreamsSource": {
"properties": {
"consumerGroup": {
"type": "string"
},
"masterName": {
"description": "Only required when Sentinel is used",
"type": "string"
},
"password": {
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
"description": "Redis password secret selector"
},
"readFromBeginning": {
"description": "if true, stream starts being read from the beginning; otherwise, the latest",
"type": "boolean"
},
"sentinelPassword": {
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector",
"description": "Sentinel password secret selector"
},
"sentinelUrl": {
"description": "Sentinel URL, will be ignored if Redis URL is provided",
"type": "string"
},
"stream": {
"type": "string"
},
"tls": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.TLS"
},
"url": {
"description": "Redis URL",
"type": "string"
},
"user": {
"description": "Redis user",
"type": "string"
}
},
"required": [
"stream",
"consumerGroup",
"readFromBeginning"
],
"type": "object"
},
"io.numaproj.numaflow.v1alpha1.SASL": {
"properties": {
"gssapi": {
Expand Down Expand Up @@ -19036,6 +19083,9 @@
"nats": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.NatsSource"
},
"redisStreams": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RedisStreamsSource"
},
"transformer": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.UDTransformer"
}
Expand Down
50 changes: 50 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -18885,6 +18885,53 @@
}
}
},
"io.numaproj.numaflow.v1alpha1.RedisStreamsSource": {
"type": "object",
"required": [
"stream",
"consumerGroup",
"readFromBeginning"
],
"properties": {
"consumerGroup": {
"type": "string"
},
"masterName": {
"description": "Only required when Sentinel is used",
"type": "string"
},
"password": {
"description": "Redis password secret selector",
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector"
},
"readFromBeginning": {
"description": "if true, stream starts being read from the beginning; otherwise, the latest",
"type": "boolean"
},
"sentinelPassword": {
"description": "Sentinel password secret selector",
"$ref": "#/definitions/io.k8s.api.core.v1.SecretKeySelector"
},
"sentinelUrl": {
"description": "Sentinel URL, will be ignored if Redis URL is provided",
"type": "string"
},
"stream": {
"type": "string"
},
"tls": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.TLS"
},
"url": {
"description": "Redis URL",
"type": "string"
},
"user": {
"description": "Redis user",
"type": "string"
}
}
},
"io.numaproj.numaflow.v1alpha1.SASL": {
"type": "object",
"required": [
Expand Down Expand Up @@ -19023,6 +19070,9 @@
"nats": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.NatsSource"
},
"redisStreams": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.RedisStreamsSource"
},
"transformer": {
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.UDTransformer"
}
Expand Down
81 changes: 81 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4186,6 +4186,87 @@ spec:
- subject
- url
type: object
redisStreams:
properties:
consumerGroup:
type: string
masterName:
type: string
password:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
readFromBeginning:
type: boolean
sentinelPassword:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
sentinelUrl:
type: string
stream:
type: string
tls:
properties:
caCertSecret:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
clientCertSecret:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
clientKeySecret:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
insecureSkipVerify:
type: boolean
type: object
url:
type: string
user:
type: string
required:
- consumerGroup
- readFromBeginning
- stream
type: object
transformer:
properties:
builtin:
Expand Down
81 changes: 81 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2643,6 +2643,87 @@ spec:
- subject
- url
type: object
redisStreams:
properties:
consumerGroup:
type: string
masterName:
type: string
password:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
readFromBeginning:
type: boolean
sentinelPassword:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
sentinelUrl:
type: string
stream:
type: string
tls:
properties:
caCertSecret:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
clientCertSecret:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
clientKeySecret:
properties:
key:
type: string
name:
type: string
optional:
type: boolean
required:
- key
type: object
insecureSkipVerify:
type: boolean
type: object
url:
type: string
user:
type: string
required:
- consumerGroup
- readFromBeginning
- stream
type: object
transformer:
properties:
builtin:
Expand Down
Loading

0 comments on commit 968cc5f

Please sign in to comment.