
Not able to run streaminglens in intellij idea #1

Open
mjose007 opened this issue Apr 30, 2020 · 3 comments
@mjose007 commented Apr 30, 2020
I am running my code locally in IntelliJ IDEA with the streaminglens Maven dependency.
I am getting the output below and no StreamingLens results. Let me know what I am doing wrong here.


package com.manu.sstreaming;

import com.qubole.spark.streaminglens.StreamingLens;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.streaming.Trigger;
import scala.Predef;
import scala.collection.JavaConverters;


/**
 * @author Manu Jose
 * create on : 16/04/20
 */
public class SStreamingNC {
    public static void main(String[] args) throws Exception {


        String host = "localhost";
        int port = 9999;
        //int port = Integer.parseInt(args[0]);


        SparkSession spark = SparkSession
                .builder()
                .appName("JavaStructuredNetworkWordCount")
                .master("local")
                .getOrCreate();


        Map<String, String> options = new HashMap<>();
        options.put("streamingLens.reporter.intervalMinutes", "1");

        scala.collection.immutable.Map<String, String> scalaMap = JavaConverters.mapAsScalaMapConverter(options).asScala().toMap(
                Predef.conforms());
        StreamingLens streamingLens = new StreamingLens(spark, scalaMap);
        streamingLens.registerListeners();




        // Create DataFrame representing the stream of input lines from connection to host:port
        spark.sql("SET spark.sql.streaming.metricsEnabled=true");
        Dataset<Row> lines = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        // Split the lines into words
        Dataset<String> words = lines.as(Encoders.STRING()).flatMap(
                (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(),
                Encoders.STRING());

        // Generate running word count
        Dataset<Row> wordCounts = words.groupBy("value").count();


        // Start running the query that prints the running counts to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("update")
                .format("console")
                .queryName("Query_name")
                .trigger(Trigger.ProcessingTime(2 * 1000))
                .start();

        spark.streams().awaitAnyTermination();

    }

}
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@11ddc5d8
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@c1c5bf
20/05/01 01:06:10 INFO StateStore: Env is not null
20/05/01 01:06:10 INFO StateStore: Getting StateStoreCoordinatorRef
20/05/01 01:06:10 INFO StateStore: Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@62726145

mjose007 (Author) commented May 3, 2020

Any updates?

abhishekd0907 (Collaborator) commented May 14, 2020

@mjose007

Sorry for the delayed response.

Are you able to see any streaming lens related logs (even errors or failures) in your spark driver logs?

Also, you don't need to call registerListeners explicitly; this line is not needed:
streamingLens.registerListeners();

This alone should be sufficient:
StreamingLens streamingLens = new StreamingLens(spark, scalaMap);

My guess is it is throwing an exception because you are trying to register the listeners twice, based on this code:


    try {
      sparkSession.sparkContext.addSparkListener(streamingAppListener)
      logDebug("Successfully registered Spark Listener")
    } catch {
      case e: Exception =>
        throw new SparkException("Error in registering Spark Listener " +
          "Won't report StreamingLens Insights" + e.getMessage)
    }
    try {
      sparkSession.streams.addListener(queryProgressListener)
      logDebug("Successfully registered StreamingQuery Listener")
    } catch {
      case e: Exception =>
        sparkSession.sparkContext.removeSparkListener(streamingAppListener)
        throw new SparkException("Error in registering StreamingQuery Listener " +
          "Won't report StreamingLens Insights" + e.getMessage)
    }

  }
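To see that failure mode in isolation: if a listener bus rejects duplicates, the second registration of the same listener throws, which matches the guess above. A minimal, hypothetical Java sketch (ListenerRegistry is invented for illustration and is not part of StreamingLens or Spark):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical registry that rejects duplicate registrations.
class ListenerRegistry {
    private final List<Object> listeners = new ArrayList<>();

    void register(Object listener) {
        if (listeners.contains(listener)) {
            // A second registration of the same listener fails fast,
            // analogous to registering StreamingLens listeners twice.
            throw new IllegalStateException("listener already registered");
        }
        listeners.add(listener);
    }

    int count() {
        return listeners.size();
    }
}
```

Under this assumption, constructing StreamingLens once (which registers its listeners internally) and then calling registerListeners() again would hit exactly this duplicate path.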

rpatid10 commented Oct 1, 2021

@mjose007 Kindly suggest #5.

For me the state is always showing the same: NONEWBATCH
