This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

[FEATURE REQUEST]: Hyperspace indexing on s3 based data. #359

Open · 2 tasks
nikhilsimha opened this issue Feb 16, 2021 · 10 comments
Labels: enhancement (New feature or request), untriaged (This is the default tag for a newly created issue)

Comments

@nikhilsimha

Feature requested

As a developer at Airbnb, I want to be able to index over S3-based scan nodes in order to create Hyperspace indexes.

I am not entirely sure that S3 is the reason I see the following error.

Exception in thread "main" com.microsoft.hyperspace.HyperspaceException: Only creating index over HDFS file based scan nodes is supported.
        at com.microsoft.hyperspace.actions.CreateAction.validate(CreateAction.scala:47)
        at com.microsoft.hyperspace.actions.Action$class.run(Action.scala:88)
        at com.microsoft.hyperspace.actions.CreateAction.run(CreateAction.scala:30)
        at com.microsoft.hyperspace.index.IndexCollectionManager.create(IndexCollectionManager.scala:43)
        at com.microsoft.hyperspace.index.CachingIndexCollectionManager.create(CachingIndexCollectionManager.scala:77)
        at com.microsoft.hyperspace.Hyperspace.createIndex(Hyperspace.scala:42) 

I am not sure because, looking at the code that throws the exception, I don't see anything S3-specific in the isLogicalRelation function.

    // CreateAction.scala:47
    // We currently only support createIndex() over HDFS file based scan nodes.
    if (!LogicalPlanUtils.isLogicalRelation(df.queryExecution.optimizedPlan)) {
      throw HyperspaceException(
        "Only creating index over HDFS file based scan nodes is supported.")
    }

    // LogicalPlanUtils.scala: the check passes only when the root of the
    // optimized plan is a bare LogicalRelation; any other node fails it.
    def isLogicalRelation(logicalPlan: LogicalPlan): Boolean = {
      logicalPlan match {
        case _: LogicalRelation => true
        case _ => false
      }
    }
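
For reference, one can print the exact plan that this validation pattern-matches on (a one-line sketch; df here is the DataFrame passed to createIndex):

    // Show the optimized logical plan that validate() receives.
    println(df.queryExecution.optimizedPlan)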

Acceptance criteria

  • createIndex is tested on S3 (I can help)
  • Spark version support needs to be 2.4
@nikhilsimha added the enhancement and untriaged labels on Feb 16, 2021
@imback82
Contributor

@nikhilsimha Could you share how you constructed the DataFrame that you passed to createIndex?

@nikhilsimha
Author

Wow, thanks for the quick reply!
Here are the relevant code snippets:

    import com.microsoft.hyperspace._
    import com.microsoft.hyperspace.index._
    val hyperspace = new Hyperspace(leftDf.sparkSession)

    val (leftTagged, additionalCols) = if (leftDf.schema.names.contains(Constants.TimeColumn)) {
      leftDf.withTimestampBasedPartition(Constants.TimePartitionColumn) ->
        Seq(Constants.TimeColumn, Constants.TimePartitionColumn)
    } else {
      leftDf -> Seq.empty[String]
    }

    if (enableHyperspace) {
      tableUtils.sparkSession.enableHyperspace()
      val leftKeys = joinConf.leftKeyCols ++ Seq(Constants.PartitionColumn) ++ additionalCols
      val leftIndexConf = IndexConfig(
        s"${joinConf.metaData.cleanName}_left_index",
        leftKeys,
        leftTagged.schema.names.filterNot(leftKeys.contains)
      )
      hyperspace.createIndex(leftTagged, leftIndexConf)
    }

The leftDf comes from a simple scan query of a Hive table (T) backed by S3: select * from T where ds > 'x' and ds < 'y'.

@imback82
Contributor

Oh, you cannot have a filter in the DataFrame from which you are trying to create an index. You should be able to create an index if leftDf is simply spark.table("table").

We are tracking materialized view support here: #186
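
For illustration, a minimal sketch of what passes and what fails this validation (the table and column names are hypothetical; Hyperspace Scala API):

    import com.microsoft.hyperspace._
    import com.microsoft.hyperspace.index._

    val hyperspace = new Hyperspace(spark)

    // Passes: the optimized plan of a bare table scan is a single LogicalRelation.
    val bare = spark.table("T")
    hyperspace.createIndex(bare, IndexConfig("t_index", Seq("id"), Seq("name")))

    // Fails: the Filter survives logical optimization, so the root of the plan
    // is a Filter node rather than a LogicalRelation, and validate() throws.
    val filtered = spark.table("T").filter("ds > 'x'")
    // hyperspace.createIndex(filtered, IndexConfig("t_index2", Seq("id"), Seq("name")))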

@nikhilsimha
Author

I see, that makes sense. So this is not an S3 issue then? If I restructure my code to save the relation to a table and then execute the join, it should work?

@imback82
Contributor

I don't think this is an S3-related issue. As long as you create an index from a bare relation (no filter, etc.), it should work. Please let me know if you encounter any other issues.
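
One possible restructuring, sketched with the names from the snippet above (the table name left_tagged is hypothetical):

    // Materialize the derived/filtered relation first, then index the bare table scan.
    leftTagged.write.mode("overwrite").saveAsTable("left_tagged")
    val bareLeft = spark.table("left_tagged")
    hyperspace.createIndex(bareLeft, leftIndexConf)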

@ashutosh-hs

ashutosh-hs commented Feb 19, 2021

I was facing the same issue. I changed the DataFrame preparation to read the relation without any filter.
DataFrame preparation and index creation steps:

# Assuming the Hyperspace Python API (package name `hyperspace`).
from hyperspace import Hyperspace, IndexConfig

hs = Hyperspace(spark)
df = spark.table("table")
hs.createIndex(df, IndexConfig("index", ["id"], ["name"]))

Now I am getting the following error when I run hs.createIndex:

py4j.protocol.Py4JJavaError: An error occurred while calling o71.createIndex.
: java.lang.NullPointerException
	at com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager.run(FileBasedSourceProviderManager.scala:156)
	at com.microsoft.hyperspace.index.sources.FileBasedSourceProviderManager.signature(FileBasedSourceProviderManager.scala:91)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.$anonfun$fingerprintVisitor$1(FileBasedSignatureProvider.scala:53)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.$anonfun$fingerprintVisitor$1$adapted(FileBasedSignatureProvider.scala:51)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:177)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.fingerprintVisitor(FileBasedSignatureProvider.scala:51)
	at com.microsoft.hyperspace.index.FileBasedSignatureProvider.signature(FileBasedSignatureProvider.scala:40)
	at com.microsoft.hyperspace.index.IndexSignatureProvider.signature(IndexSignatureProvider.scala:45)
	at com.microsoft.hyperspace.actions.CreateActionBase.getIndexLogEntry(CreateActionBase.scala:64)
	at com.microsoft.hyperspace.actions.CreateAction.logEntry(CreateAction.scala:38)
	at com.microsoft.hyperspace.actions.Action.begin(Action.scala:50)
	at com.microsoft.hyperspace.actions.Action.run(Action.scala:90)
	at com.microsoft.hyperspace.actions.Action.run$(Action.scala:83)
	at com.microsoft.hyperspace.actions.CreateAction.run(CreateAction.scala:30)
	at com.microsoft.hyperspace.index.IndexCollectionManager.create(IndexCollectionManager.scala:47)
	at com.microsoft.hyperspace.index.CachingIndexCollectionManager.create(CachingIndexCollectionManager.scala:78)
	at com.microsoft.hyperspace.Hyperspace.createIndex(Hyperspace.scala:43)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)

I am using:
Python: 3.8
Spark: 3.0.1

@imback82
Contributor

@ashutosh-hs Hyperspace doesn't officially support Spark 3 yet (#85). Do you see the same error in Spark 2.4?

In any case, could you run df.explain and copy/paste the output here? (I suspect that, depending on the catalog, the relation may be an unsupported one.)

@ashutosh-hs

ashutosh-hs commented Feb 23, 2021

@imback82 I haven't been able to test it with Spark 2.4 yet; I will post here once I have.
The following is the output of df.explain(extended=True):

+-Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet

== Optimized Logical Plan ==
Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet

== Physical Plan ==
FileScan parquet default.patient[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] Batched: false, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex[s3a://demo1/warehouse/patient], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<resourceType:string,id:string,meta:struct<id:string,extension:array<string>,versionId:stri...
>>> df.explain('cost')
== Optimized Logical Plan ==
Relation[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] parquet, Statistics(sizeInBytes=76.2 MiB)

== Physical Plan ==
FileScan parquet default.patient[resourceType#395,id#396,meta#397,implicitRules#398,language#399,text#400,contained#401,extension#402,modifierExtension#403,identifier#404,active#405,name#406,telecom#407,gender#408,birthDate#409,deceasedBoolean#410,deceasedDatetime#411,address#412,maritalStatus#413,multipleBirthBoolean#414,multipleBirthInteger#415,photo#416,contact#417,communication#418,... 7 more fields] Batched: false, DataFilters: [], Format: Parquet, Location: TahoeLogFileIndex[s3a://demo1/warehouse/patient], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<resourceType:string,id:string,meta:struct<id:string,extension:array<string>,versionId:stri...

I am using:
Python: 3.8
Spark: 3.0.1
Hyperspace: 0.4

@dai-chen

@imback82 Did you figure out the solution? I'm having the same error even when the DataFrame is created from the table name directly. I'm using the latest code with Spark 3.1.2.

@edjones84

> @imback82 Did you figure out the solution? I'm having the same error even when the DataFrame is created from the table name directly. I'm using the latest code with Spark 3.1.2.

Yes, I am having the same issue with Spark 3.1.1. Just wondering: is this a Spark version problem, and is there any solution to it?
