LogWatch throws IOException #3636

Closed
cmdjulian opened this issue Dec 7, 2021 · 7 comments
cmdjulian commented Dec 7, 2021

Describe the bug

In versions prior to 5.0.0 (specifically 4.13.3), the following code works fine for collecting and printing logs:

private val logger = KotlinLogging.logger { }

private fun streamLogs(jobId: UUID, linesToSkip: Long) {
    val reader = client.batch().jobs().withName("$jobId").watchLog().output.bufferedReader()
    streamLogs(reader, linesToSkip)
}

private fun streamLogs(reader: BufferedReader, linesToSkip: Long) {
    if (linesToSkip > 0) repeat(linesToSkip.toInt()) { reader.readLine() } // skip whole lines; BufferedReader.skip() counts characters
    while (!Thread.currentThread().isInterrupted) {
        // On exit, null is read and the method therefore terminates
        val line = reader.readLine()
        if (line.isNullOrBlank()) break else logger.info { line }
    }
}

After the latest Spring update, which now uses KubernetesClient 5.9.0 under the hood, I'm getting the error attached at the bottom.

The problem is not the exception itself; the problem lies in when the exception occurs. It is thrown very unpredictably and always results in the loss of log lines, so it is not thrown at the point where the pipe gets closed at the end.
Sometimes a few lines from the beginning are missing, sometimes a few lines at the end are missing, and sometimes it even crashes directly after the first readLine call, having returned zero lines.
The problem doesn't occur with long-running Jobs; it only occurs with Jobs lasting one to two seconds.

Before calling the LogWatch I make sure that the Pod of the Job is running, because when I don't, I get an error that the Pod is not ready for log extraction. If I use the withLogWaitTimeout(5000) method, the Job may have already failed, so the method never reaches the Pod success state and always runs into the timeout.

If I screwed up here and there is a better way to achieve this, please let me know.
What I need is: wait until the Job's Pod is ready → stream the logs (all lines, line by line), returning from the method when the Pod terminates → extract the exit code.
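
Roughly that flow as a sketch (the job-name label lookup, the 30-second timeout, and the runAndStream name are just illustration, not something I claim the client mandates):

private fun runAndStream(jobId: UUID) {
    // Pods created by a Job carry the "job-name" label (assumes the Pod already exists)
    val podName = client.pods().withLabel("job-name", "$jobId")
        .list().items.first().metadata.name
    // wait until the Pod has left Pending (Running, Succeeded, and Failed all qualify)
    client.pods().withName(podName)
        .waitUntilCondition({ it?.status?.phase != "Pending" }, 30, TimeUnit.SECONDS)
    streamLogs(jobId, 0)
    // once the Pod has terminated, read the container exit code
    val exitCode = client.pods().withName(podName).get()
        .status.containerStatuses.first().state.terminated?.exitCode
    logger.info { "job $jobId finished with exit code $exitCode" }
}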

I tried it with the latest version as well with the same outcome.

Fabric8 Kubernetes Client version

5.10.1@latest

Steps to reproduce

Run a Job in Kubernetes with a runtime of one to two seconds and watch the log watch fail.

Expected behavior

Same behavior as in version 4.13.0: just return null when the stream is closed instead of throwing an IOException.

Runtime

other (please specify in additional context)

Kubernetes API Server version

1.21.6

Environment

Linux

Fabric8 Kubernetes Client Logs

java.io.IOException: Pipe closed
	at java.io.PipedInputStream.read(Unknown Source) ~[?:?]
	at java.io.PipedInputStream.read(Unknown Source) ~[?:?]
	at sun.nio.cs.StreamDecoder.readBytes(Unknown Source) ~[?:?]
	at sun.nio.cs.StreamDecoder.implRead(Unknown Source) ~[?:?]
	at sun.nio.cs.StreamDecoder.read(Unknown Source) ~[?:?]
	at java.io.InputStreamReader.read(Unknown Source) ~[?:?]
	at java.io.BufferedReader.fill(Unknown Source) ~[?:?]
	at java.io.BufferedReader.readLine(Unknown Source) ~[?:?]
	at java.io.BufferedReader.readLine(Unknown Source) ~[?:?]
	at de.etalytics.scripts.kubernetes.job.JobServiceOnKubernetesImpl.streamLogs(JobServiceOnKubernetesImpl.kt:62) ~[kubernetes-1.4.1-plain.jar:?]
	at de.etalytics.scripts.kubernetes.job.JobServiceOnKubernetesImpl.streamLogs(JobServiceOnKubernetesImpl.kt:53) ~[kubernetes-1.4.1-plain.jar:?]
	at de.etalytics.scripts.kubernetes.job.JobServiceOnKubernetesImpl.retrieveLogs$lambda-1(JobServiceOnKubernetesImpl.kt:42) ~[kubernetes-1.4.1-plain.jar:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.complete(Unknown Source) ~[?:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.lambda$informOnCondition$6(BaseOperation.java:972) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation$2.onUpdate(BaseOperation.java:991) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation$2.onUpdate(BaseOperation.java:980) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.ProcessorListener$UpdateNotification.handle(ProcessorListener.java:85) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.ProcessorListener.add(ProcessorListener.java:47) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.SharedProcessor.lambda$distribute$0(SharedProcessor.java:79) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.SharedProcessor.lambda$distribute$1(SharedProcessor.java:101) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.utils.SerialExecutor.lambda$execute$0(SerialExecutor.java:40) ~[kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.utils.SerialExecutor.scheduleNext(SerialExecutor.java:52) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.utils.SerialExecutor.execute(SerialExecutor.java:46) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.SharedProcessor.distribute(SharedProcessor.java:98) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.SharedProcessor.distribute(SharedProcessor.java:79) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.ProcessorStore.update(ProcessorStore.java:48) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.ProcessorStore.update(ProcessorStore.java:29) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.Reflector$ReflectorWatcher.eventReceived(Reflector.java:134) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.informers.cache.Reflector$ReflectorWatcher.eventReceived(Reflector.java:114) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.utils.WatcherToggle.eventReceived(WatcherToggle.java:49) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.eventReceived(AbstractWatchManager.java:186) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.onMessage(AbstractWatchManager.java:242) [kubernetes-client-5.9.0.jar:?]
	at io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener.onMessage(WatcherWebSocketListener.java:93) [kubernetes-client-5.9.0.jar:?]
	at okhttp3.internal.ws.RealWebSocket.onReadMessage(RealWebSocket.java:322) [okhttp-3.14.9.jar:?]
	at okhttp3.internal.ws.WebSocketReader.readMessageFrame(WebSocketReader.java:219) [okhttp-3.14.9.jar:?]
	at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:105) [okhttp-3.14.9.jar:?]
	at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:273) [okhttp-3.14.9.jar:?]
	at okhttp3.internal.ws.RealWebSocket$1.onResponse(RealWebSocket.java:209) [okhttp-3.14.9.jar:?]
	at okhttp3.RealCall$AsyncCall.execute(RealCall.java:174) [okhttp-3.14.9.jar:?]
	at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) [okhttp-3.14.9.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]

Additional context

Kubernetes: k3s

shawkins (Contributor) commented Dec 7, 2021

If no output is provided to watchLog, the InputStream created will get auto-closed.

In the closure logic there is a flush, but that flush does not ensure that the read side fully consumes the buffer, just that it wakes up if needed. So the subsequent close call then results in the exception you are seeing, even with data still left to be read.

I think the code should only close the PipedOutputStream; that will then give you an exception only once you've read all of the data.
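
To see the difference with plain java.io pipes, here is a standalone sketch (nothing fabric8-specific; the closing order is the whole point):

import java.io.PipedInputStream
import java.io.PipedOutputStream

fun main() {
    val out = PipedOutputStream()
    val input = PipedInputStream(out)
    out.write("hello".toByteArray())

    // closing only the write side: the reader can still drain the buffered
    // bytes and then gets a clean EOF (-1)
    out.close()
    generateSequence { input.read() }.takeWhile { it != -1 }.forEach { print(it.toChar()) }

    // closing the read side instead would make the next read() throw
    // java.io.IOException: Pipe closed, even with data still buffered
    input.close()
}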

This is somewhat related to #3243

cmdjulian (Author) commented

Thank you for the fast reply. Is there something I can do right now to work around this? I also read the issue you linked and tried to adapt it, but unfortunately I couldn't make it work.

shawkins (Contributor) commented Dec 7, 2021

@cmdjulian I was thinking that you could take responsibility for the pipes:

var out = new PipedOutputStream();
var output = new PipedInputStream();
output.connect(out);
client.batch().jobs().withName("$jobId").watchLog(out);
// now use output instead

Then it won't close the output. However, I don't see that the LogWatch logic will call close on out at any point then, which means you'd have no good way of knowing if things failed prematurely. I think this implies we should close the out stream regardless of whether it is passed in or not.

cmdjulian (Author) commented

Yep, it's like you said: when using the solution you propose I get a java.io.IOException: Write end dead, but by then all log lines have already arrived.
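
For reference, my Kotlin adaptation of that looks roughly like this (a sketch, error handling omitted):

val out = PipedOutputStream()
val output = PipedInputStream(out) // connect the two ends
client.batch().jobs().withName("$jobId").watchLog(out)
// read from output instead of watchLog().output
output.bufferedReader().useLines { lines -> lines.forEach { line -> logger.info { line } } }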
From my understanding this arises because output is not properly closed. I'm therefore with you on closing the output; this should fix it. Do you want me to submit a PR, or how do we proceed?

shawkins (Contributor) commented Dec 7, 2021

> Do you want me to submit a PR or how do we proceed?

Yes, if you have the time, please open a PR.

This is what I was thinking locally for simplifying the LogWatchCallback constructor:

    ...
    if (out == null) {
      // no stream was passed in: create our own pipe and expose the read end as output
      this.out = new PipedOutputStream();
      this.output = new PipedInputStream();
      try {
        this.output.connect((PipedOutputStream) this.out);
      } catch (IOException e) {
        throw KubernetesClientException.launderThrowable(e);
      }
    } else {
      this.out = out;
      this.output = null;
    }
    // close the write end in all cases, so a piped reader sees EOF instead of "Pipe closed"
    toClose.add(this.out);

shawkins (Contributor) commented Dec 9, 2021

@cmdjulian let me know if you want me to submit a PR instead.

@rohanKanojia are you okay with closing the passed-in OutputStream? At the very least that would need a Javadoc note and should probably be called out in the changelog. Or do you want that to apply only when out is an instance of PipedOutputStream?

cmdjulian (Author) commented

Would be nice if you could open the PR, @shawkins. Thanks.

shawkins added a commit to shawkins/kubernetes-client that referenced this issue Dec 10, 2021:

The InputStream is not directly closed, as that leads to data loss; the OutputStream is closed when it's piped so that the connected stream sees EOF.
@shawkins shawkins self-assigned this Dec 10, 2021
@manusa manusa closed this as completed in c1505ba Dec 16, 2021