Skip to content

Commit

Permalink
feat: controller to create buckets (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
whynowy authored Jun 5, 2022
1 parent 8328739 commit f423002
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 40 deletions.
12 changes: 12 additions & 0 deletions config/base/controller-manager/numaflow-controller-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ data:
consumer:
ackWait: 60s
maxAckPending: 20000
otBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
replicas: 3
procBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
replicas: 3
versions:
- version: latest
natsImage: nats:2.8.3
Expand Down
22 changes: 13 additions & 9 deletions config/base/crds/numaflow.numaproj.io_interstepbufferservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -910,15 +910,19 @@ spec:
type: object
type: object
bufferConfig:
description: Optional configuration for the streams and consumers
to be created in this JetStream service, if specified, it will
be merged with the default configuration in numaflow-controller-config.
It accepts a YAML format configuration, it may include 2 sections,
"stream" and "consumer". Available fields under "stream" include
"retention" (e.g. interest, limits, workerQueue), "maxMsgs",
"maxAge" (e.g. 72h), "replicas" (1, 3, 5), "duplicates" (e.g.
5m). Available fields under "consumer" include "ackWait" (e.g.
60s)
description: Optional configuration for the streams, consumers
and buckets to be created in this JetStream service, if specified,
it will be merged with the default configuration in numaflow-controller-config.
It accepts a YAML format configuration, it may include 4 sections,
"stream", "consumer", "otBucket" and "procBucket". Available
fields under "stream" include "retention" (e.g. interest, limits,
workerQueue), "maxMsgs", "maxAge" (e.g. 72h), "replicas" (1,
3, 5), "duplicates" (e.g. 5m). Available fields under "consumer"
include "ackWait" (e.g. 60s) Available fields under "otBucket"
include "maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes",
"replicas" (1, 3, 5). Available fields under "procBucket" include
"maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes", "replicas"
(1, 3, 5).
type: string
containerTemplate:
description: ContainerTemplate contains customized spec for NATS
Expand Down
34 changes: 25 additions & 9 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -909,15 +909,19 @@ spec:
type: object
type: object
bufferConfig:
description: Optional configuration for the streams and consumers
to be created in this JetStream service, if specified, it will
be merged with the default configuration in numaflow-controller-config.
It accepts a YAML format configuration, it may include 2 sections,
"stream" and "consumer". Available fields under "stream" include
"retention" (e.g. interest, limits, workerQueue), "maxMsgs",
"maxAge" (e.g. 72h), "replicas" (1, 3, 5), "duplicates" (e.g.
5m). Available fields under "consumer" include "ackWait" (e.g.
60s)
description: Optional configuration for the streams, consumers
and buckets to be created in this JetStream service, if specified,
it will be merged with the default configuration in numaflow-controller-config.
It accepts a YAML format configuration, it may include 4 sections,
"stream", "consumer", "otBucket" and "procBucket". Available
fields under "stream" include "retention" (e.g. interest, limits,
workerQueue), "maxMsgs", "maxAge" (e.g. 72h), "replicas" (1,
3, 5), "duplicates" (e.g. 5m). Available fields under "consumer"
include "ackWait" (e.g. 60s) Available fields under "otBucket"
include "maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes",
"replicas" (1, 3, 5). Available fields under "procBucket" include
"maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes", "replicas"
(1, 3, 5).
type: string
containerTemplate:
description: ContainerTemplate contains customized spec for NATS
Expand Down Expand Up @@ -12666,6 +12670,18 @@ data:
consumer:
ackWait: 60s
maxAckPending: 20000
otBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
replicas: 3
procBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
replicas: 3
versions:
- version: latest
natsImage: nats:2.8.3
Expand Down
1 change: 1 addition & 0 deletions controllers/isbsvc/installer/installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func TestInstall(t *testing.T) {
assert.NoError(t, err)
assert.True(t, testObj.Status.IsReady())
assert.NotNil(t, testObj.Status.Config.JetStream)
assert.NotEmpty(t, testObj.Status.Config.JetStream.BufferConfig)
assert.NotEmpty(t, testObj.Status.Config.JetStream.URL)
assert.NotNil(t, testObj.Status.Config.JetStream.Auth)
assert.NotNil(t, testObj.Status.Config.JetStream.Auth.User)
Expand Down
20 changes: 12 additions & 8 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -1589,14 +1589,18 @@ available arguments.
<td>
<em>(Optional)</em>
<p>
Optional configuration for the streams and consumers to be created in
this JetStream service, if specified, it will be merged with the default
configuration in numaflow-controller-config. It accepts a YAML format
configuration, it may include 2 sections, “stream” and “consumer”.
Available fields under “stream” include “retention” (e.g. interest,
limits, workerQueue), “maxMsgs”, “maxAge” (e.g. 72h), “replicas” (1, 3,
5), “duplicates” (e.g. 5m). Available fields under “consumer” include
“ackWait” (e.g. 60s)
Optional configuration for the streams, consumers and buckets to be
created in this JetStream service, if specified, it will be merged with
the default configuration in numaflow-controller-config. It accepts a
YAML format configuration, it may include 4 sections, “stream”,
“consumer”, “otBucket” and “procBucket”. Available fields under “stream”
include “retention” (e.g. interest, limits, workerQueue), “maxMsgs”,
“maxAge” (e.g. 72h), “replicas” (1, 3, 5), “duplicates” (e.g. 5m).
Available fields under “consumer” include “ackWait” (e.g. 60s) Available
fields under “otBucket” include “maxValueSize”, “history”, “ttl”
(e.g. 72h), “maxBytes”, “replicas” (1, 3, 5). Available fields under
“procBucket” include “maxValueSize”, “history”, “ttl” (e.g. 72h),
“maxBytes”, “replicas” (1, 3, 5).
</p>
</td>
</tr>
Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/apis/numaflow/v1alpha1/jetstream_buffer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ type JetStreamBufferService struct {
// Check https://docs.nats.io/ for all the available arguments.
// +optional
StartArgs []string `json:"startArgs,omitempty" protobuf:"bytes,17,rep,name=startArgs"`
// Optional configuration for the streams and consumers to be created in this JetStream service, if specified, it will be merged with the default configuration in numaflow-controller-config.
// It accepts a YAML format configuration, it may include 2 sections, "stream" and "consumer".
// Optional configuration for the streams, consumers and buckets to be created in this JetStream service, if specified, it will be merged with the default configuration in numaflow-controller-config.
// It accepts a YAML format configuration, it may include 4 sections, "stream", "consumer", "otBucket" and "procBucket".
// Available fields under "stream" include "retention" (e.g. interest, limits, workerQueue), "maxMsgs", "maxAge" (e.g. 72h), "replicas" (1, 3, 5), "duplicates" (e.g. 5m).
// Available fields under "consumer" include "ackWait" (e.g. 60s)
// Available fields under "otBucket" include "maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes", "replicas" (1, 3, 5).
// Available fields under "procBucket" include "maxValueSize", "history", "ttl" (e.g. 72h), "maxBytes", "replicas" (1, 3, 5).
// +optional
BufferConfig *string `json:"bufferConfig,omitempty" protobuf:"bytes,18,opt,name=bufferConfig"`
// Whether encrypt the data at rest, defaults to false
Expand Down
2 changes: 1 addition & 1 deletion pkg/isbsvc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
)

// ISBService is an interface used to do the operations on ISBS
// ISBService is an interface used to do the operations on ISBSvc
type ISBService interface {
CreateBuffers(ctx context.Context, buffers []string, opts ...BufferCreateOption) error
DeleteBuffers(ctx context.Context, buffers []string) error
Expand Down
88 changes: 79 additions & 9 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func (jss *jetStreamSvc) CreateBuffers(ctx context.Context, buffers []string, op
}
for _, b := range buffers {
// Create a stream for each buffer
streamName := streamName(jss.pipelineName, b)
streamName := JetStreamName(jss.pipelineName, b)
_, err := js.StreamInfo(streamName)
if err != nil {
if !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to query information of stream %q during buffer creating, %w", streamName, err)
}
if _, err = js.AddStream(&nats.StreamConfig{
if _, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamName}, // Use the stream name as the only subject
Retention: nats.RetentionPolicy(v.GetInt("stream.retention")),
Expand All @@ -99,6 +99,46 @@ func (jss *jetStreamSvc) CreateBuffers(ctx context.Context, buffers []string, op
}
log.Infow("Succeeded to create a consumer for a stream", zap.String("stream", streamName), zap.String("consumer", streamName))
}
// Create offset-timeline bucket
otBucket := JetStreamOTBucket(jss.pipelineName, b)
if _, err := js.KeyValue(otBucket); err != nil {
if !errors.Is(err, nats.ErrBucketNotFound) {
return fmt.Errorf("failed to query information of bucket %q during buffer creating, %w", otBucket, err)
}
if _, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: otBucket,
Description: fmt.Sprintf("Offset timeline bucket of buffer [%s]", b),
MaxValueSize: v.GetInt32("otBucket.maxValueSize"),
History: uint8(v.GetUint("otBucket.history")),
TTL: v.GetDuration("otBucket.ttl"),
MaxBytes: v.GetInt64("otBucket.maxBytes"),
Storage: nats.FileStorage,
Replicas: v.GetInt("otBucket.replicas"),
Placement: nil,
}); err != nil {
return fmt.Errorf("failed to create offset timeline bucket %q, %w", otBucket, err)
}
}
// Create processor bucket
procBucket := JetStreamProcessorBucket(jss.pipelineName, b)
if _, err := js.KeyValue(procBucket); err != nil {
if !errors.Is(err, nats.ErrBucketNotFound) {
return fmt.Errorf("failed to query information of bucket %q during buffer creating, %w", procBucket, err)
}
if _, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: procBucket,
Description: fmt.Sprintf("Processor bucket of buffer [%s]", b),
MaxValueSize: v.GetInt32("procBucket.maxValueSize"),
History: uint8(v.GetUint("procBucket.history")),
TTL: v.GetDuration("procBucket.ttl"),
MaxBytes: v.GetInt64("procBucket.maxBytes"),
Storage: nats.FileStorage,
Replicas: v.GetInt("procBucket.replicas"),
Placement: nil,
}); err != nil {
return fmt.Errorf("failed to create processor bucket %q, %w", otBucket, err)
}
}
}
return nil
}
Expand All @@ -115,16 +155,27 @@ func (jss *jetStreamSvc) DeleteBuffers(ctx context.Context, buffers []string) er
return fmt.Errorf("failed to get a js context from nats connection, %w", err)
}
for _, b := range buffers {
streamName := fmt.Sprintf("%s-%s", jss.pipelineName, b)
streamName := JetStreamName(jss.pipelineName, b)
if err := js.DeleteStream(streamName); err != nil && !errors.Is(err, nats.ErrStreamNotFound) {
return fmt.Errorf("failed to delete stream %q, %w", streamName, err)
}
log.Infow("succeeded to delete a stream", zap.String("stream", streamName))
log.Infow("Succeeded to delete a stream", zap.String("stream", streamName))
otBucket := JetStreamOTBucket(jss.pipelineName, b)
if err := js.DeleteKeyValue(otBucket); err != nil && !errors.Is(err, nats.ErrBucketNotFound) {
return fmt.Errorf("failed to delete offset timeline bucket %q, %w", otBucket, err)
}
log.Infow("Succeeded to delete an offset timeline bucket", zap.String("bucket", otBucket))
procBucket := JetStreamProcessorBucket(jss.pipelineName, b)
if err := js.DeleteKeyValue(procBucket); err != nil && !errors.Is(err, nats.ErrBucketNotFound) {
return fmt.Errorf("failed to delete processor bucket %q, %w", procBucket, err)
}
log.Infow("Succeeded to delete a processor bucket", zap.String("bucket", procBucket))
}
return nil
}

func (jss *jetStreamSvc) ValidateBuffers(ctx context.Context, buffers []string) error {
log := logging.FromContext(ctx)
nc, err := clients.NewInClusterJetStreamClient().Connect(ctx)
if err != nil {
return fmt.Errorf("failed to get an in-cluster nats connection, %w", err)
Expand All @@ -135,11 +186,22 @@ func (jss *jetStreamSvc) ValidateBuffers(ctx context.Context, buffers []string)
return fmt.Errorf("failed to get a js context from nats connection, %w", err)
}
for _, b := range buffers {
streamName := fmt.Sprintf("%s-%s", jss.pipelineName, b)
_, err := js.StreamInfo(streamName)
if err != nil {
streamName := JetStreamName(jss.pipelineName, b)
if _, err := js.StreamInfo(streamName); err != nil {
return fmt.Errorf("failed to query information of stream %q, %w", streamName, err)
}

otBucket := JetStreamOTBucket(jss.pipelineName, b)
if _, err := js.KeyValue(otBucket); err != nil {
// TODO: throw an error
log.Warnw("Failed to query bucket", zap.String("bucket", otBucket), zap.Error(err))
}

procBucket := JetStreamProcessorBucket(jss.pipelineName, b)
if _, err := js.KeyValue(procBucket); err != nil {
// TODO: throw an error
log.Warnw("Failed to query bucket", zap.String("bucket", procBucket), zap.Error(err))
}
}
return nil
}
Expand All @@ -159,7 +221,7 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf
return nil, fmt.Errorf("failed to get a JetStream context from nats connection, %w", err)
}
}
streamName := streamName(jss.pipelineName, buffer)
streamName := JetStreamName(jss.pipelineName, buffer)
stream, err := js.StreamInfo(streamName)
if err != nil {
return nil, fmt.Errorf("failed to get information of stream %q", streamName)
Expand All @@ -181,6 +243,14 @@ func (jss *jetStreamSvc) GetBufferInfo(ctx context.Context, buffer string) (*Buf
return bufferInfo, nil
}

func streamName(pipelineName, bufferName string) string {
func JetStreamName(pipelineName, bufferName string) string {
return fmt.Sprintf("%s-%s", pipelineName, bufferName)
}

func JetStreamOTBucket(pipelineName, bufferName string) string {
return fmt.Sprintf("%s-%s_OT", pipelineName, bufferName)
}

func JetStreamProcessorBucket(pipelineName, bufferName string) string {
return fmt.Sprintf("%s-%s_PROCESSORS", pipelineName, bufferName)
}

0 comments on commit f423002

Please sign in to comment.