Skip to content

Commit

Permalink
Add StreamsConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 31, 2019
1 parent f9b355e commit 3f69f41
Show file tree
Hide file tree
Showing 9 changed files with 784 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- Add `StreamsConsumer` [#35](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/35)

### Changed

- Split reusable components of `ParallelConsumer` out into independent `Propulsion` and `Propulsion.Kafka` libraries [#34](https://github.com/jet/Jet.ConfluentKafka.FSharp/pull/34)
Expand Down
7 changes: 7 additions & 0 deletions Jet.ConfluentKafka.FSharp.sln
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion", "src\Propulsio
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Kafka", "src\Propulsion.Kafka\Propulsion.Kafka.fsproj", "{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}"
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Propulsion.Tests", "tests\Propulsion.Tests\Propulsion.Tests.fsproj", "{BBD50DA2-7932-4D24-888D-C168F4788705}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -59,6 +61,10 @@ Global
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4}.Release|Any CPU.Build.0 = Release|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BBD50DA2-7932-4D24-888D-C168F4788705}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -68,6 +74,7 @@ Global
{76802BE3-00C2-4B1D-96A2-95A3E2136DBE} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{0F72360F-1C14-46E3-9A60-B6BF87BD726D} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{5F176C54-609B-4D2E-804B-3C1F60ADDAF4} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898}
{BBD50DA2-7932-4D24-888D-C168F4788705} = {302B09C4-7F38-4CF7-93B9-1B7A6035386E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DF04AF73-7412-46E5-9CC8-15CB48E3139A}
Expand Down
1 change: 1 addition & 0 deletions build.proj
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

<Target Name="VSTest">
<!-- Runs `dotnet test` for each test project in turn (Kafka integration tests, then Propulsion.Tests), passing $(Cfg) and $(TestOptions) through to each invocation -->
<Exec Command="dotnet test tests/Jet.ConfluentKafka.FSharp.Integration $(Cfg) $(TestOptions)" />
<Exec Command="dotnet test tests/Propulsion.Tests $(Cfg) $(TestOptions)" />
</Target>

<Target Name="Build" DependsOnTargets="Pack;VSTest" />
Expand Down
35 changes: 34 additions & 1 deletion src/Propulsion.Kafka/KafkaPropulsion.fs
Original file line number Diff line number Diff line change
Expand Up @@ -207,4 +207,37 @@ type ParallelConsumer private () =
?maxSubmissionsPerPartition, ?pumpInterval, ?statsInterval, ?logExternalStats) =
let mapConsumeResult (x : ConsumeResult<string,string>) = KeyValuePair(x.Key, x.Value)
ParallelConsumer.Start<KeyValuePair<string,string>>(log, config, maxDop, mapConsumeResult, handle >> Async.Catch,
?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval, ?statsInterval=statsInterval, ?logExternalStats=logExternalStats)
?maxSubmissionsPerPartition=maxSubmissionsPerPartition, ?pumpInterval=pumpInterval, ?statsInterval=statsInterval, ?logExternalStats=logExternalStats)

type StreamsConsumer =
    /// Starts a consumer pipeline per the `config`: each consumed Kafka message is mapped to a `KeyValuePair` of its key and value,
    /// expanded into stream items via `enumStreamItems`, and submitted to a streams scheduler created from `handle` and a dispatcher constructed with `maxDop`.
    /// When not supplied, `statsInterval` and `stateInterval` default to 5 minutes, `pumpInterval` to 5ms and `maxSubmissionsPerPartition` to 5.
    /// `categorize` is passed to the stream states dump; `logExternalStats`, where supplied, is invoked with the log ahead of that dump.
    /// Returns whatever `PipelinedConsumer.Start` yields for the assembled pipeline.
    static member Start<'M>
        (   log : ILogger, config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig, maxDop, enumStreamItems, handle, categorize,
            ?maxSubmissionsPerPartition, ?pumpInterval, ?statsInterval, ?stateInterval, ?logExternalStats) =
        // Resolve optional arguments to their defaults
        let statsInterval = defaultArg statsInterval (TimeSpan.FromMinutes 5.)
        let stateInterval = defaultArg stateInterval (TimeSpan.FromMinutes 5.)
        let pumpInterval = defaultArg pumpInterval (TimeSpan.FromMilliseconds 5.)
        let maxSubmissionsPerPartition = defaultArg maxSubmissionsPerPartition 5

        // Scheduling side: dispatcher + stats feed the engine that runs `handle`
        let dispatcher = Streams.Scheduling.Dispatcher<_> maxDop
        let stats = Streams.Scheduling.Stats(log, statsInterval, stateInterval)
        let dumpStreams (streams : Streams.Scheduling.StreamStates<_>) statsLog =
            // Give the caller a chance to emit its own stats before the stream states are dumped
            match logExternalStats with
            | Some dumpExternal -> dumpExternal statsLog
            | None -> ()
            streams.Dump(statsLog, Streams.Buffering.StreamState.eventsSize, categorize)
        let streamsScheduler = Streams.Scheduling.StreamSchedulingEngine.Create(dispatcher, stats, handle, dumpStreams)

        // Submission side: turns a batch of consumed messages into a StreamsBatch for the scheduler
        let mapConsumedMessagesToStreamsBatch onCompletion (consumed : Submission.SubmissionBatch<KeyValuePair<string,string>>) : Streams.Scheduling.StreamsBatch<_> =
            // Completion of the streams batch first completes the consumed batch, then triggers the supplied continuation
            let markCompleted () =
                consumed.onCompletion()
                onCompletion()
            let streamItems = Seq.collect enumStreamItems consumed.messages
            let batch, _ = Streams.Scheduling.StreamsBatch.Create(markCompleted, streamItems)
            batch
        // Attempts to merge every batch after the first into the first one; yields true if any merge succeeded
        let tryCompactQueue (queue : Queue<Streams.Scheduling.StreamsBatch<_>>) =
            match Seq.tryHead queue with
            | None -> false
            | Some first ->
                queue
                |> Seq.skip 1
                |> Seq.fold (fun anyMerged candidate ->
                    // NB the merge must be attempted for every candidate, regardless of earlier outcomes
                    let merged = first.TryMerge candidate
                    anyMerged || merged) false
        let submitStreamsBatch (batch : Streams.Scheduling.StreamsBatch<_>) : int =
            streamsScheduler.Submit batch
            batch.RemainingStreamsCount
        let submitter = Submission.SubmissionEngine(log, maxSubmissionsPerPartition, mapConsumedMessagesToStreamsBatch, submitStreamsBatch, statsInterval, pumpInterval, tryCompactQueue)

        // Ingestion side: project each Kafka ConsumeResult down to its key/value pair and wire up the pumps
        let mapResult (consumeResult : Confluent.Kafka.ConsumeResult<string,string>) = KeyValuePair(consumeResult.Key, consumeResult.Value)
        PipelinedConsumer.Start(log, config, mapResult, submitter.Ingest, submitter.Pump(), streamsScheduler.Pump, dispatcher.Pump(), statsInterval)
1 change: 1 addition & 0 deletions src/Propulsion/Propulsion.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<Compile Include="Infrastructure.fs" />
<Compile Include="Propulsion.fs" />
<Compile Include="ParallelPropulsion.fs" />
<Compile Include="StreamsPropulsion.fs" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 3f69f41

Please sign in to comment.