Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kinesis Client Library v2 #75

Open
hoangtrucit opened this issue Aug 27, 2018 · 17 comments
Open

Kinesis Client Library v2 #75

hoangtrucit opened this issue Aug 27, 2018 · 17 comments

Comments

@hoangtrucit
Copy link

Now, I using Kinesis Client Library v2 and I have problem

[aws-java-sdk-NettyEventLoop-2-7] DEBUG software.amazon.awssdk.http.nio.netty.internal.http2.SdkHttp2FrameLogger - OUTBOUND SETTINGS: ack=false settings={INITIAL_WINDOW_SIZE=1048576, MAX_HEADER_LIST_SIZE=8192}

[aws-java-sdk-NettyEventLoop-2-7] DEBUG software.amazon.awssdk.http.nio.netty.internal.http2.SdkHttp2FrameLogger - OUTBOUND WINDOW_UPDATE: streamId=0 windowSizeIncrement=1966082

[aws-java-sdk-NettyEventLoop-2-0] ERROR software.amazon.awssdk.http.nio.netty.internal.RunnableRequest - Failed to create connection to http://0.0.0.0:4568/

Do this currently support Kinesis Client Library v2 ? If yes, how to solve it ?

@arjantop
Copy link

arjantop commented Sep 3, 2018

@hoangtrucit Are you trying to use the new SubscribeToShard api?

@prascuna
Copy link

prascuna commented Sep 4, 2018

Similar issue here, using KCL 2.0.1

[aws-java-sdk-NettyEventLoop-2-3] ERROR software.amazon.awssdk.http.nio.netty.internal.RunnableRequest - Failed to create connection to http://localhost:4567/
java.io.IOException: The channel was closed before the protocol could be determined.
	at software.amazon.awssdk.http.nio.netty.internal.ChannelPipelineInitializer$1.channelUnregistered(ChannelPipelineInitializer.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:181)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:160)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:181)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:167)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:160)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1412)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:181)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:167)
	at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:865)
	at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:830)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at java.lang.Thread.run(Thread.java:745)

@yonekawa
Copy link

KCL v2 use fan-out strategy as default. the strategy use RegisterStreamConsumer and SubscribeToShard that is not supported by kinesalite.
https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java#L91

I guess we can change strategy to polling by set RetrievalConfig to Scheduler to be use GetRecords API.

@prascuna
Copy link

I managed to make some progress on this, but I eventually gave up.
Kinesalite doesn't seem to support HTTP2, so I injected an HTTP1_1 client into KinesisAsyncClient.
Also I had to disable CBOR via system property and I had to import kinesalite's certificate in my JKS like that:

sudo keytool -import -alias kinesalite -keystore "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home/jre/lib/security/cacerts" -file server-crt.pem
  System.setProperty("aws.cborEnabled", "false")
  val kinesisClient = KinesisAsyncClient
    .builder()
    .region(Region.US_EAST_1)
    .httpClient(NettyNioAsyncHttpClient.builder().protocol(Protocol.HTTP1_1).build())
    .endpointOverride(new URI("https://kinesalite:4567"))
    .build()

At this point the connection could be established, but kinesalite doesn't implement a bunch of other features ( I just stopped at DescribeStreamSummary and DescribeStreamConsumer)

@MariaLapovska
Copy link

@prascuna does that mean that kinesalite doesn't fully support the KCL v2 at the moment? I followed your advice and it helped to set the connection with kinesis, but I'm stiil getting the following exception, during DescribeStreamSummary request:

2018-12-17 17:49:42:600 36344 [main] ERROR s.a.kinesis.coordinator.Scheduler - Worker.run caught exception, sleeping for 1000 milli seconds!
software.amazon.awssdk.services.kinesis.model.KinesisException: null (Service: Kinesis, Status Code: 400, Request ID: 5e94b930-0213-11e9-82e8-6981636a0842)
	at software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:95)
	at software.amazon.awssdk.services.kinesis.model.KinesisException$BuilderImpl.build(KinesisException.java:56)
	at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:46)
	at software.amazon.awssdk.core.internal.http.response.SdkErrorResponseHandler.handle(SdkErrorResponseHandler.java:30)
	at software.amazon.awssdk.core.internal.http.async.SyncResponseHandlerAdapter.complete(SyncResponseHandlerAdapter.java:92)
	at software.amazon.awssdk.core.client.handler.BaseAsyncClientHandler$InterceptorCallingHttpResponseHandler.complete(BaseAsyncClientHandler.java:225)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.handleResponse(MakeAsyncHttpRequestStage.java:185)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:171)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.complete(MakeAsyncHttpRequestStage.java:122)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onComplete(ResponseHandler.java:262)
	at com.typesafe.netty.HandlerPublisher.complete(HandlerPublisher.java:408)
	at com.typesafe.netty.HandlerPublisher.handlerRemoved(HandlerPublisher.java:395)
	at io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:670)
	at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:505)
	at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:451)
	at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328)
	at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189)
	at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
	at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at java.lang.Thread.run(Thread.java:748)

@oripwk
Copy link

oripwk commented Jan 7, 2019

I confirm AWS v2 is not working. I had and solved the following issues:

  1. io.netty.handler.codec.http2.Http2Exception: First received frame was not SETTINGS. Hex dump for first 5 bytes: 3c68656164
    Was solved by configuring Netty to use HTTP 1.1
  2. software.amazon.awssdk.services.kinesis.model.KinesisException: null (Service: Kinesis, Status Code: 502, Request ID: null)
    Was solved by setting env var CBOR_ENABLED to false

Now it's stuck on getting HTTP 400 error.

So AWS Java SDK v2 is not supported

@etspaceman
Copy link

@mhart What's the status of this? We'd really like to use kinesalite for the 2.x consumer as the 1.x consumer hasn't had a release for nearly a year.

@mhart
Copy link
Owner

mhart commented Mar 5, 2019

@etspaceman the status is that it's still open. I know nothing about KCL v2 – it seems like there's a lot of work involved in supporting this, but perhaps the localstack folks know more?

@israelcolomer
Copy link

We really need this feature too! And I presume more will follow as using the KCL v2 becomes more standardised. Looks like on localstack they're waiting for kinesalite to address it (referencing this current issue) on localstack/localstack#893.

@Harshank
Copy link

Any updates on this issue? Thanks!

@etspaceman
Copy link

You can get the V2 library to work w/ localstack (kinesalite), see comment here:

localstack/localstack#893 (comment)

@rwinograd
Copy link

rwinograd commented May 1, 2020

@mhart regarding your question about complexity, LocalStack has patched their layer that sits before Kinesalite to add API implementations for the missing APIs (RegisterStreamConsumer, DeregisterStreamConsumer, ListStreamConsumers, and DescribeStreamConsumer).

They back the register/deregister/list with an in-memory data structure and avoid any complexity from the NextToken API. That seems to have been sufficient to fix the KCL v2 integration issues.

@slnowak
Copy link

slnowak commented May 23, 2020

@rwinograd isn't SubscribeToShard missing though? I'm using latest version of localstack and I'm unable to make it working with KCLv2 (well - it is possible, but you have to set RetrievalSpecificConfig to PollingConfig in KCL settings)

@Anja05
Copy link

Anja05 commented Sep 25, 2020

I confirm AWS v2 is not working. I had and solved the following issues:

  1. io.netty.handler.codec.http2.Http2Exception: First received frame was not SETTINGS. Hex dump for first 5 bytes: 3c68656164
    Was solved by configuring Netty to use HTTP 1.1
  2. software.amazon.awssdk.services.kinesis.model.KinesisException: null (Service: Kinesis, Status Code: 502, Request ID: null)
    Was solved by setting env var CBOR_ENABLED to false

Now it's stuck on getting HTTP 400 error.

So AWS Java SDK v2 is not supported

Hi @oripwk did you solve this problem. I m also facing the same issue.Stuck on getting 400 error. Please suggest if you resolve this problem

@prateeksinghal10
Copy link

@oripwk @Anja05 any work arounds for http 400 error?

@Anja05
Copy link

Anja05 commented Nov 6, 2020

@oripwk @Anja05 any work arounds for http 400 error?
@prateeksinghal10
Hope this is resolved!! If not please check your polling configuration. Please set up your polling configuration if it is not set. In localstack you have to explicitly specify to poll the stream
//Sample
final PollingConfig pollingConfig =
new PollingConfig(awsKinesisConfiguration.getStreamName(), kinesisAsyncClient);
retrievalConfig.retrievalSpecificConfig(pollingConfig);

@upanshu21
Copy link

@oripwk @Anja05 any work arounds for http 400 error?
@prateeksinghal10
Hope this is resolved!! If not please check your polling configuration. Please set up your polling configuration if it is not set. In localstack you have to explicitly specify to poll the stream
//Sample
final PollingConfig pollingConfig =
new PollingConfig(awsKinesisConfiguration.getStreamName(), kinesisAsyncClient);
retrievalConfig.retrievalSpecificConfig(pollingConfig);

@Anja05 where do we specify the polling config?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests