Skip to content

Commit

Permalink
feat: configurable jetstream storage (#328)
Browse files Browse the repository at this point in the history
Signed-off-by: jyu6 <[email protected]>
  • Loading branch information
jy4096 authored and whynowy committed Nov 18, 2022
1 parent ee5cd64 commit 049e5c6
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 3 deletions.
6 changes: 6 additions & 0 deletions config/advanced-install/namespaced-controller-wo-crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ data:
maxMsgs: 2000000
maxAge: 168h
maxBytes: -1
# 0: File, 1: Memory
storage: 0
replicas: 3
duplicates: 60s
# The default consumer properties for the created streams
Expand All @@ -147,12 +149,16 @@ data:
history: 5
ttl: 3h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
procBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
versions:
- version: latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ data:
maxMsgs: 2000000
maxAge: 168h
maxBytes: -1
# 0: File, 1: Memory
storage: 0
replicas: 3
duplicates: 60s
# The default consumer properties for the created streams
Expand All @@ -63,12 +65,16 @@ data:
history: 5
ttl: 3h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
procBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
versions:
- version: latest
Expand Down
6 changes: 6 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10451,6 +10451,8 @@ data:
maxMsgs: 2000000
maxAge: 168h
maxBytes: -1
# 0: File, 1: Memory
storage: 0
replicas: 3
duplicates: 60s
# The default consumer properties for the created streams
Expand All @@ -10462,12 +10464,16 @@ data:
history: 5
ttl: 3h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
procBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
versions:
- version: latest
Expand Down
6 changes: 6 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10362,6 +10362,8 @@ data:
maxMsgs: 2000000
maxAge: 168h
maxBytes: -1
# 0: File, 1: Memory
storage: 0
replicas: 3
duplicates: 60s
# The default consumer properties for the created streams
Expand All @@ -10373,12 +10375,16 @@ data:
history: 5
ttl: 3h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
procBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
versions:
- version: latest
Expand Down
2 changes: 2 additions & 0 deletions docs/inter-step-buffer-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ bufferConfig: |
maxMsgs: 50000
maxAge: 168h
maxBytes: -1
# 0: File, 1: Memory
storage: 0
replicas: 3
duplicates: 60s
# The consumer properties for the created streams
Expand Down
6 changes: 6 additions & 0 deletions docs/numaflow-controller-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ data:
maxMsgs: 2000000
maxAge: 168h
maxBytes: -1
# 0: File, 1: Memory
storage: 0
replicas: 3
duplicates: 60s
# The default consumer properties for the created streams
Expand All @@ -63,12 +65,16 @@ data:
history: 5
ttl: 72h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
procBucket:
maxValueSize: 0
history: 1
ttl: 72h
maxBytes: 0
# 0: File, 1: Memory
storage: 0
replicas: 3
versions:
- version: latest
Expand Down
6 changes: 3 additions & 3 deletions pkg/isbsvc/jetstream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (jss *jetStreamSvc) CreateBuffers(ctx context.Context, buffers []dfv1.Buffe
MaxMsgs: v.GetInt64("stream.maxMsgs"),
MaxAge: v.GetDuration("stream.maxAge"),
MaxBytes: v.GetInt64("stream.maxBytes"),
Storage: nats.FileStorage,
Storage: nats.StorageType(v.GetInt("stream.storage")),
Replicas: v.GetInt("stream.replicas"),
Duplicates: v.GetDuration("stream.duplicates"), // No duplication in this period
}); err != nil {
Expand Down Expand Up @@ -133,7 +133,7 @@ func (jss *jetStreamSvc) CreateBuffers(ctx context.Context, buffers []dfv1.Buffe
History: uint8(v.GetUint("otBucket.history")),
TTL: v.GetDuration("otBucket.ttl"),
MaxBytes: v.GetInt64("otBucket.maxBytes"),
Storage: nats.FileStorage,
Storage: nats.StorageType(v.GetInt("otBucket.storage")),
Replicas: v.GetInt("otBucket.replicas"),
Placement: nil,
}); err != nil {
Expand All @@ -153,7 +153,7 @@ func (jss *jetStreamSvc) CreateBuffers(ctx context.Context, buffers []dfv1.Buffe
History: uint8(v.GetUint("procBucket.history")),
TTL: v.GetDuration("procBucket.ttl"),
MaxBytes: v.GetInt64("procBucket.maxBytes"),
Storage: nats.FileStorage,
Storage: nats.StorageType(v.GetInt("procBucket.storage")),
Replicas: v.GetInt("procBucket.replicas"),
Placement: nil,
}); err != nil {
Expand Down

0 comments on commit 049e5c6

Please sign in to comment.