diff --git a/README.md b/README.md
index 9543148b53..1877a96401 100755
--- a/README.md
+++ b/README.md
@@ -1,7 +1,29 @@
-## What is TiSpark?
+# TiSpark
+[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.pingcap.tispark/tispark-core/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.pingcap.tispark/tispark-core)
+[![Javadocs](http://javadoc.io/badge/com.pingcap.tispark/tispark-core.svg)](http://javadoc.io/doc/com.pingcap.tispark/tispark-core)
+[![License](https://img.shields.io/github/license/pingcap/tispark.svg)](https://github.com/pingcap/tispark/blob/master/LICENSE)

TiSpark is a thin layer built for running Apache Spark on top of TiDB/TiKV to answer complex OLAP queries. It takes advantage of both the Spark platform and the distributed TiKV cluster and, at the same time, seamlessly glues to TiDB, the distributed OLTP database, to provide Hybrid Transactional/Analytical Processing (HTAP) and serve as a one-stop solution for online transactions and analysis.

+## Getting TiSpark
+The current stable version is 1.0.1.
+
+If you are using Maven, add the following to your pom.xml:
+```xml
+<dependency>
+  <groupId>com.pingcap.tispark</groupId>
+  <artifactId>tispark-core</artifactId>
+  <version>1.0.1</version>
+</dependency>
+```
+
+If you are using SBT, add the following line to your build file:
+```scala
+libraryDependencies += "com.pingcap.tispark" % "tispark-core" % "1.0.1"
+```
+
+For other build tools, visit search.maven.org and search with GroupId [![Maven Search](https://img.shields.io/badge/com.pingcap-tikv/tispark-green.svg)](http://search.maven.org/#search%7Cga%7C1%7Cpingcap) (this search also lists all available TiSpark modules, including tikv-client).
+
## TiSpark Architecture

![architecture](./docs/architecture.png)

@@ -17,17 +39,17 @@ TiSpark is a thin layer built for running Apache Spark on top of TiDB/TiKV to an

TiSpark depends on the existence of TiKV clusters and PDs. It also requires a Spark cluster to be set up and available.

-A thin layer of TiSpark. Most of the logic is inside tikv-java-client library.
-https://github.com/pingcap/tikv-client-lib-java
+A thin layer of TiSpark. Most of the logic is inside the tikv-client library.
+https://github.com/pingcap/tispark/tree/master/tikv-client

-Uses as below
+## Quick Start
+From spark-shell:
```
./bin/spark-shell --jars /wherever-it-is/tispark-${version}-jar-with-dependencies.jar
```
```
-
import org.apache.spark.sql.TiContext
val ti = new TiContext(spark)

@@ -93,6 +115,7 @@ Below configurations can be put together with spark-defaults.conf or passed in t
| spark.tispark.plan.downgrade.index_threshold | 10000 | If the index scan ranges on one region exceed this limit in the original request, downgrade this region's request to a table scan rather than the planned index scan |
| spark.tispark.type.unsupported_mysql_types | "time,enum,set,year,json" | A comma-separated list of MySQL types TiSpark does not currently support; refer to `Unsupported MySQL Type List` below |
| spark.tispark.request.timezone.offset | Local Timezone offset | An integer representing the timezone offset from UTC (for example 28800 for GMT+8); this value is added to requests issued to TiKV |
+| spark.tispark.show_rowid | false | Whether to show the implicit row ID if one exists |

## Unsupported MySQL Type List

diff --git a/core/pom.xml b/core/pom.xml
index 9b5f457678..1d4f41d96a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -17,7 +17,7 @@
3.0.4
2.3.0
- 5.1.18
+ 5.1.44
2.6.8
@@ -104,6 +104,12 @@
testCompile
+
+ attach-javadocs
+
+ doc-jar
+
+
${scala.version}
@@ -173,11 +179,10 @@
org.apache.maven.plugins
maven-source-plugin
- 2.1.2
+ 3.0.1
attach-sources
- deploy
jar-no-fork
@@ -194,13 +199,38 @@
org.apache.maven.plugins
maven-javadoc-plugin
- 2.8
+ 2.9.1
- UTF-8
- UTF-8
- UTF-8
- zh_CN
+ ${javadoc.skip}
+
+ attach-javadocs
+
+ jar
+
+
+ -Xdoclint:none
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.5
+
+ ${gpg.skip}
+
+
+
+ sign-artifacts
+
+ sign
+
+
+
@@ -239,7 +269,7 @@
org.apache.maven.plugins
maven-surefire-plugin
- 2.7
+ 2.22.0
true
@@ -252,6 +282,7 @@
${project.build.directory}/surefire-reports
.
WDF TestSuite.txt
+ -Dfile.encoding=UTF-8
diff --git a/core/scripts/tispark-sql b/core/scripts/tispark-sql
index 89dddcf13f..dd80f64584 100755
--- a/core/scripts/tispark-sql
+++ b/core/scripts/tispark-sql
@@ -17,7 +17,7 @@
# limitations under the License.
# -TISPARK_JAR=tispark-1.0.1-jar-with-dependencies.jar +TISPARK_JAR=tispark-core-1.0.1-jar-with-dependencies.jar if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home diff --git a/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala b/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala index 649adf5f4b..8f498defa6 100644 --- a/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala +++ b/core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala @@ -38,4 +38,5 @@ object TiConfigConst { val UNSUPPORTED_TYPES: String = "spark.tispark.type.unsupported_mysql_types" val ENABLE_AUTO_LOAD_STATISTICS: String = "spark.tispark.statistics.auto_load" val CACHE_EXPIRE_AFTER_ACCESS: String = "spark.tispark.statistics.expire_after_access" + val SHOW_ROWID: String = "spark.tispark.show_rowid" } diff --git a/core/src/main/scala/com/pingcap/tispark/TiUtils.scala b/core/src/main/scala/com/pingcap/tispark/TiUtils.scala index 777ec24f0a..acceec2fba 100644 --- a/core/src/main/scala/com/pingcap/tispark/TiUtils.scala +++ b/core/src/main/scala/com/pingcap/tispark/TiUtils.scala @@ -143,6 +143,7 @@ object TiUtils { case _: EnumType => sql.types.LongType case _: SetType => sql.types.LongType case _: YearType => sql.types.LongType + case _: JsonType => sql.types.StringType } } @@ -214,6 +215,10 @@ object TiUtils { val priority = CommandPri.valueOf(conf.get(TiConfigConst.REQUEST_COMMAND_PRIORITY)) tiConf.setCommandPriority(priority) } + + if (conf.contains(TiConfigConst.SHOW_ROWID)) { + tiConf.setShowRowId(conf.get(TiConfigConst.SHOW_ROWID).toBoolean) + } tiConf } diff --git a/core/src/main/scala/org/apache/spark/sql/TiContext.scala b/core/src/main/scala/org/apache/spark/sql/TiContext.scala index 3a3dd07a48..c1e3fc811b 100644 --- a/core/src/main/scala/org/apache/spark/sql/TiContext.scala +++ b/core/src/main/scala/org/apache/spark/sql/TiContext.scala @@ -129,15 +129,25 @@ class TiContext(val session: SparkSession) extends Serializable with Logging { } } - // tidbMapTable does not do any check any meta information - // it just register table for later use - def tidbMapTable(dbName: String, tableName: String): Unit = { + def getDataFrame(dbName: String, tableName: String): DataFrame = { val tiRelation = new TiDBRelation( tiSession, new TiTableReference(dbName, tableName), meta )(sqlContext) - sqlContext.baseRelationToDataFrame(tiRelation).createTempView(tableName) + sqlContext.baseRelationToDataFrame(tiRelation) + } + + // tidbMapTable does not do any check any meta information + // it just register table for later use + def tidbMapTable(dbName: String, tableName: String): DataFrame = { + val df = getDataFrame(dbName, tableName) + df.createOrReplaceTempView(tableName) + df + } + + def tidbMapDatabase(dbName: String, dbNameAsPrefix: Boolean): Unit = { + tidbMapDatabase(dbName, dbNameAsPrefix, autoLoad) } def tidbMapDatabase(dbName: String, diff --git a/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala b/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala index 0a32b9501f..923e5b67c4 100644 --- a/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala +++ b/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala @@ -61,7 +61,7 @@ class TiStrategy(context: SQLContext) extends Strategy with Logging { def typeBlackList: TypeBlacklist = { val blacklistString = - sqlConf.getConfString(TiConfigConst.UNSUPPORTED_TYPES, "time,enum,set,year,json") + sqlConf.getConfString(TiConfigConst.UNSUPPORTED_TYPES, "time,enum,set,year") new TypeBlacklist(blacklistString) } diff --git 
a/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala b/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala index 598bd42256..5b36de526f 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala @@ -182,6 +182,10 @@ case class RegionTaskExec(child: SparkPlan, mutableRow } + def isTaskRangeSizeInvalid(task: RegionTask): Boolean = + task == null || + task.getRanges.size() > tiConf.getMaxRequestKeyRangeSize + override protected def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val numHandles = longMetric("numHandles") @@ -218,41 +222,17 @@ case class RegionTaskExec(child: SparkPlan, new ExecutorCompletionService[util.Iterator[TiRow]](session.getThreadPoolForIndexScan) var rowIterator: util.Iterator[TiRow] = null - /** - * Checks whether the tasks are valid. - * - * Currently we only check whether the task list contains only one [[RegionTask]], - * since in each partition, handles received are from the same region. - * - * @param tasks tasks to examine - */ - def proceedTasksOrThrow(tasks: Seq[RegionTask]): Seq[RegionTask] = { - if (tasks.lengthCompare(1) != 0) { - throw new RuntimeException(s"Unexpected region task size:${tasks.size}, expecting 1") - } - tasks - } - // After `splitAndSortHandlesByRegion`, ranges in the task are arranged in order // TODO: Maybe we can optimize splitAndSortHandlesByRegion if we are sure the handles are in same region? - val indexTasks = proceedTasksOrThrow( + val indexTasks = RangeSplitter .newSplitter(session.getRegionManager) .splitAndSortHandlesByRegion( dagRequest.getTableInfo.getId, new TLongArrayList(handles) ) - ) - val indexTaskRanges = indexTasks.head.getRanges - def feedBatch(): TLongArrayList = { - val handles = new array.TLongArrayList(512) - while (handleIterator.hasNext && - handles.size() < batchSize) { - handles.add(handleIterator.next()) - } - handles - } + val indexTaskRanges = indexTasks.flatMap { _.getRanges } /** * Checks whether the number of handle ranges retrieved from TiKV exceeds the `downgradeThreshold` after handle merge. @@ -263,51 +243,52 @@ case class RegionTaskExec(child: SparkPlan, indexTaskRanges.size() > downgradeThreshold } + def submitTasks(tasks: List[RegionTask], dagRequest: TiDAGRequest): Unit = { + taskCount += 1 + val task = new Callable[util.Iterator[TiRow]] { + override def call(): util.Iterator[TiRow] = + CoprocessIterator.getRowIterator(dagRequest, tasks, session) + } + completionService.submit(task) + } + /** * If one task's ranges list exceeds some threshold, we split it into two sub tasks and * each has half of the original ranges. 
* - * @param tasks task list to examine + * @param task task to examine * @return split task list */ - def splitTasks(tasks: Seq[RegionTask]): mutable.Seq[RegionTask] = { + def splitTasks(task: RegionTask): mutable.Seq[RegionTask] = { val finalTasks = mutable.ListBuffer[RegionTask]() val queue = mutable.Queue[RegionTask]() - for (task <- tasks) { - queue += task - while (queue.nonEmpty) { - val front = queue.dequeue - if (isTaskRangeSizeInvalid(front)) { - // use (size + 1) / 2 here rather than size / 2 - // to avoid extra single task generated by odd list - front.getRanges - .grouped((task.getRanges.size() + 1) / 2) - .foreach(range => { - queue += RegionTask.newInstance(task.getRegion, task.getStore, range) - }) - } else { - // add all ranges satisfying task range size to final task list - finalTasks += front - } + queue += task + while (queue.nonEmpty) { + val front = queue.dequeue + if (isTaskRangeSizeInvalid(front)) { + // use (size + 1) / 2 here rather than size / 2 + // to avoid extra single task generated by odd list + front.getRanges + .grouped((front.getRanges.size() + 1) / 2) + .foreach(range => { + queue += RegionTask.newInstance(front.getRegion, front.getStore, range) + }) + } else { + // add all ranges satisfying task range size to final task list + finalTasks += front } } - logger.info(s"Split ${tasks.size} tasks into ${finalTasks.size} tasks.") + logger.info(s"Split $task into ${finalTasks.size} tasks.") finalTasks } - def isTaskRangeSizeInvalid(task: RegionTask): Boolean = { - task == null || - task.getRanges.size() > tiConf.getMaxRequestKeyRangeSize - } - - def submitTasks(tasks: List[RegionTask], dagRequest: TiDAGRequest): Unit = { - taskCount += 1 - val task = new Callable[util.Iterator[TiRow]] { - override def call(): util.Iterator[TiRow] = { - CoprocessIterator.getRowIterator(dagRequest, tasks, session) - } + def feedBatch(): TLongArrayList = { + val handles = new array.TLongArrayList(512) + while (handleIterator.hasNext && + handles.size() < batchSize) { + handles.add(handleIterator.next()) } - completionService.submit(task) + handles } def doIndexScan(): Unit = { @@ -317,30 +298,34 @@ case class RegionTaskExec(child: SparkPlan, logger.info("Single batch handles size:" + handleList.size()) // After `splitAndSortHandlesByRegion`, ranges in the task are arranged in order // TODO: Maybe we can optimize splitAndSortHandlesByRegion if we are sure the handles are in same region? 
- val task = proceedTasksOrThrow( + val indexTasks = RangeSplitter .newSplitter(session.getRegionManager) .splitAndSortHandlesByRegion( dagRequest.getTableInfo.getId, new TLongArrayList(handleList) ) - ) - val taskRange = task.head.getRanges - val tasks = splitTasks(task) - numIndexScanTasks += tasks.size - - logger.info(s"Single batch RegionTask size:${tasks.size}") - tasks.foreach(task => { - logger.info( - s"Single batch RegionTask={Host:${task.getHost}," + - s"Region:${task.getRegion}," + - s"Store:{id=${task.getStore.getId},address=${task.getStore.getAddress}}, " + - s"RangesListSize:${task.getRanges.size}}" - ) - }) - - submitTasks(tasks.toList, dagRequest) - numIndexRangesScanned += taskRange.size + + indexTasks.foreach { task => + val taskRange = task.getRanges + val tasks = splitTasks(task) + numIndexScanTasks += tasks.size + + if (logger.isDebugEnabled) { + logger.debug(s"Single batch RegionTask size:${tasks.size}") + tasks.foreach(task => { + logger.debug( + s"Single batch RegionTask={Host:${task.getHost}," + + s"Region:${task.getRegion}," + + s"Store:{id=${task.getStore.getId},address=${task.getStore.getAddress}}, " + + s"RangesListSize:${task.getRanges.size}}" + ) + }) + } + + submitTasks(tasks.toList, dagRequest) + numIndexRangesScanned += taskRange.size + } } } @@ -354,7 +339,7 @@ case class RegionTaskExec(child: SparkPlan, def doDowngradeScan(taskRanges: List[KeyRange]): Unit = { // Restore original filters to perform downgraded table scan logic // TODO: Maybe we can optimize splitRangeByRegion if we are sure the key ranges are in the same region? - val downgradeTasks = proceedTasksOrThrow( + val downgradeTasks = try { RangeSplitter .newSplitter(session.getRegionManager) @@ -368,23 +353,23 @@ case class RegionTaskExec(child: SparkPlan, .newSplitter(session.getRegionManager) .splitRangeByRegion(KeyRangeUtils.mergeSortedRanges(taskRanges)) } - ) - val task = downgradeTasks.head - val downgradeTaskRanges = task.getRanges - logger.info( - s"Merged ${taskRanges.size} index ranges to ${downgradeTaskRanges.size} ranges." - ) - logger.info( - s"Unary task downgraded, task info:Host={${task.getHost}}, " + - s"RegionId={${task.getRegion.getId}}, " + - s"Store={id=${task.getStore.getId},addr=${task.getStore.getAddress}}, " + - s"RangesListSize=${downgradeTaskRanges.size}}" - ) - numDowngradedTasks += 1 - numDowngradeRangesScanned += downgradeTaskRanges.size + downgradeTasks.foreach { task => + val downgradeTaskRanges = task.getRanges + logger.info( + s"Merged ${taskRanges.size} index ranges to ${downgradeTaskRanges.size} ranges." 
+ ) + logger.info( + s"Unary task downgraded, task info:Host={${task.getHost}}, " + + s"RegionId={${task.getRegion.getId}}, " + + s"Store={id=${task.getStore.getId},addr=${task.getStore.getAddress}}, " + + s"RangesListSize=${downgradeTaskRanges.size}}" + ) + numDowngradedTasks += 1 + numDowngradeRangesScanned += downgradeTaskRanges.size - submitTasks(downgradeTasks.toList, downgradeDagRequest) + submitTasks(downgradeTasks.toList, downgradeDagRequest) + } } val schemaInferrer: SchemaInfer = if (satisfyDowngradeThreshold) { diff --git a/core/src/main/scala/org/apache/spark/sql/hive/TiSessionCatalog.scala b/core/src/main/scala/org/apache/spark/sql/hive/TiSessionCatalog.scala index 260f8167e8..f11493cf0a 100644 --- a/core/src/main/scala/org/apache/spark/sql/hive/TiSessionCatalog.scala +++ b/core/src/main/scala/org/apache/spark/sql/hive/TiSessionCatalog.scala @@ -145,8 +145,8 @@ class TiSessionCatalog(externalCatalog: HiveExternalCatalog, TableIdentifier(name) } } - meta.getTables(database.get).map { db => - TableIdentifier(db.getName, Option(db.getName)) + meta.getTables(database.get).map { table => + TableIdentifier(table.getName, Option(dbName)) } ++ localTempViews } else { super.listTables(db, pattern) diff --git a/core/src/main/scala/org/apache/spark/sql/hive/thriftserver/TiSparkSQLEnv.scala b/core/src/main/scala/org/apache/spark/sql/hive/thriftserver/TiSparkSQLEnv.scala index cd49d59054..2f1787d816 100644 --- a/core/src/main/scala/org/apache/spark/sql/hive/thriftserver/TiSparkSQLEnv.scala +++ b/core/src/main/scala/org/apache/spark/sql/hive/thriftserver/TiSparkSQLEnv.scala @@ -47,7 +47,8 @@ private[hive] object TiSparkSQLEnv extends Logging { ) // Injection point for TiSparkSession - val sparkSession = TiSparkSession.builder + val sparkSession = TiSparkSession + .builder() .config(sparkConf) .enableHiveSupport() .getOrCreate() diff --git a/core/src/test/resources/prefix-index/PrefixTest.sql b/core/src/test/resources/prefix-index/PrefixTest.sql new file mode 100644 index 0000000000..571238685d --- /dev/null +++ b/core/src/test/resources/prefix-index/PrefixTest.sql @@ -0,0 +1,11 @@ +DROP TABLE IF EXISTS `prefix`; +CREATE TABLE `prefix` ( + `a` int(11) NOT NULL, + `b` varchar(55) DEFAULT NULL, + `c` int(11) DEFAULT NULL, + PRIMARY KEY (`a`), + KEY `prefix_index` (`b`(2)), + KEY `prefix_complex` (`a`, `b`(2)) +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; +INSERT INTO `prefix` VALUES(0, "b", 2), (1, "bbb", 3), (2, "bbc", 4), (3, "bbb", 5), (4, "abc", 6), (5, "abc", 7), (6, "abc", 7), (7, "ÿÿ", 8), (8, "ÿÿ0", 9), (9, "ÿÿÿ", 10); +ANALYZE TABLE `prefix`; \ No newline at end of file diff --git a/core/src/test/resources/prefix-index/UTF8Test.sql b/core/src/test/resources/prefix-index/UTF8Test.sql new file mode 100644 index 0000000000..310f24487c --- /dev/null +++ b/core/src/test/resources/prefix-index/UTF8Test.sql @@ -0,0 +1,6 @@ +DROP TABLE IF EXISTS `t1`; +CREATE TABLE `t1` ( + `name` varchar(12) DEFAULT NULL, + KEY `pname` (`name`(12)) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; +insert into t1 values('借款策略集_网页'); \ No newline at end of file diff --git a/core/src/test/resources/tidb_config.properties.template b/core/src/test/resources/tidb_config.properties.template index 8783450546..69b48d449f 100644 --- a/core/src/test/resources/tidb_config.properties.template +++ b/core/src/test/resources/tidb_config.properties.template @@ -4,6 +4,8 @@ tidb.addr=127.0.0.1 tidb.port=4000 # TiDB login user tidb.user=root +# TiDB login password +tidb.password= # TPCH database 
name, if you already have a tpch database in TiDB, specify the db name so that TPCH tests will run on this database tpch.db=tpch_test # Placement Driver address:port diff --git a/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala b/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala index 0254a64403..371de4a81d 100644 --- a/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala @@ -287,16 +287,18 @@ class BaseTiSparkSuite extends QueryTest with SharedSQLContext { if (skipTiDB || !compResult(r1, r3, isOrdered)) { fail( s"""Failed with - |TiSpark:\t\t${mapStringNestedList(r1)} - |Spark With JDBC:${mapStringNestedList(r2)} - |TiDB:\t\t\t${mapStringNestedList(r3)}""".stripMargin + |TiSpark:\t\t${listToString(r1)} + |Spark With JDBC:${listToString(r2)} + |TiDB:\t\t\t${listToString(r3)}""".stripMargin ) } } } - private def mapStringNestedList(result: List[List[Any]]): String = - if (result == null) "null" else result.map(mapStringList).mkString(",") + private def listToString(result: List[List[Any]]): String = + if (result == null) s"[len: null] = null" + else if (result.isEmpty) s"[len: 0] = Empty" + else s"[len: ${result.length}] = ${result.map(mapStringList).mkString(",")}" private def mapStringList(result: List[Any]): String = if (result == null) "null" else "List(" + result.map(mapString).mkString(",") + ")" diff --git a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala index b3a4fb601b..08b24411e7 100644 --- a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala @@ -167,7 +167,68 @@ class IssueTestSuite extends BaseTiSparkSuite { judge("select count(c1 + c2) from t") } - override def afterAll(): Unit = { + test("json support") { + tidbStmt.execute("drop table if exists t") + tidbStmt.execute("create table t(json_doc json)") + tidbStmt.execute( + """insert into t values ('null'), + ('true'), + ('false'), + ('0'), + ('1'), + ('-1'), + ('2147483647'), + ('-2147483648'), + ('9223372036854775807'), + ('-9223372036854775808'), + ('0.5'), + ('-0.5'), + ('""'), + ('"a"'), + ('"\\t"'), + ('"\\n"'), + ('"\\""'), + ('"\\u0001"'), + ('[]'), + ('"中文"'), + (JSON_ARRAY(null, false, true, 0, 0.5, "hello", JSON_ARRAY("nested_array"), JSON_OBJECT("nested", "object"))), + (JSON_OBJECT("a", null, "b", true, "c", false, "d", 0, "e", 0.5, "f", "hello", "nested_array", JSON_ARRAY(1, 2, 3), "nested_object", JSON_OBJECT("hello", 1)))""" + ) + refreshConnections() + + runTest( + "select json_doc from t", + skipJDBC = true, + rTiDB = List( + List("null"), + List(true), + List(false), + List(0), + List(1), + List(-1), + List(2147483647), + List(-2147483648), + List(9223372036854775807L), + List(-9223372036854775808L), + List(0.5), + List(-0.5), + List("\"\""), + List("\"a\""), + List("\"\\t\""), + List("\"\\n\""), + List("\"\\\"\""), + List("\"\\u0001\""), + List("[]"), + List("\"中文\""), + List("[null,false,true,0,0.5,\"hello\",[\"nested_array\"],{\"nested\":\"object\"}]"), + List( + "{\"a\":null,\"b\":true,\"c\":false,\"d\":0,\"e\":0.5,\"f\":\"hello\",\"nested_array\":[1,2,3],\"nested_object\":{\"hello\":1}}" + ) + ) + ) + } + + override def afterAll(): Unit = try { tidbStmt.execute("drop table if exists t") tidbStmt.execute("drop table if exists tmp_debug") @@ -177,5 +238,4 @@ class IssueTestSuite extends BaseTiSparkSuite { } finally { super.afterAll() } - } } 
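Taken together, the reworked `TiContext` API and the new JSON handling exercised by the test above can be used end to end. A minimal sketch (assuming a TiDB database `test` holding a JSON table `t` like the one created in `IssueTestSuite`; note that `tidbMapTable` now returns the DataFrame it registers instead of `Unit`):

```scala
import org.apache.spark.sql.{SparkSession, TiContext}

val spark = SparkSession.builder().appName("tispark-json-sketch").getOrCreate()
val ti = new TiContext(spark)

// tidbMapTable now registers the temp view AND hands back the DataFrame
val df = ti.tidbMapTable("test", "t")
df.printSchema() // the JSON column surfaces as Spark StringType

// Since JSON values arrive as strings, Spark's own JSON functions apply
spark.sql("select get_json_object(json_doc, '$.nested_object.hello') from t").show()
```

Returning the DataFrame saves callers the extra `spark.table(...)` lookup that the old `Unit`-returning signature forced on them.
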
diff --git a/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 22d1090b5f..5bd02929c7 100644 --- a/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -50,6 +50,7 @@ abstract class QueryTest extends PlanTest { case d: java.math.BigDecimal => d.doubleValue() case d: BigDecimal => d.bigDecimal.doubleValue() case d: Number => d.doubleValue() + case d: String => BigDecimal(d).doubleValue() case _ => 0.0 } @@ -62,8 +63,21 @@ abstract class QueryTest extends PlanTest { new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(value) } - def compValue(lhs: Any, rhs: Any): Boolean = { - if (lhs == rhs || lhs.toString == rhs.toString) { + def compNull(l: Any, r: Any): Boolean = { + if (l == null) { + if (r == null || r.toString.equalsIgnoreCase("null")) { + return true + } + } else if (l.toString.equalsIgnoreCase("null")) { + if (r == null || r.toString.equalsIgnoreCase("null")) { + return true + } + } + false + } + + def compValue(lhs: Any, rhs: Any): Boolean = + if (lhs == rhs || compNull(lhs, rhs) || lhs.toString == rhs.toString) { true } else lhs match { @@ -91,13 +105,14 @@ abstract class QueryTest extends PlanTest { case _ => false } - } def compRow(lhs: List[Any], rhs: List[Any]): Boolean = { if (lhs == null && rhs == null) { true } else if (lhs == null || rhs == null) { false + } else if (lhs.length != rhs.length) { + false } else { !lhs.zipWithIndex.exists { case (value, i) => !compValue(value, rhs(i)) @@ -113,13 +128,22 @@ abstract class QueryTest extends PlanTest { if (lhs != null && rhs != null) { try { - if (!isOrdered) { + if (lhs.length != rhs.length) { + false + } else if (!isOrdered) { comp( lhs.sortWith((_1, _2) => _1.mkString("").compare(_2.mkString("")) < 0), rhs.sortWith((_1, _2) => _1.mkString("").compare(_2.mkString("")) < 0) ) } else { - comp(lhs, rhs) + implicit object NullableListOrdering extends Ordering[List[Any]] { + override def compare(p1: List[Any], p2: List[Any]): Int = + p1.contains(null).compareTo(p2.contains(null)) + } + comp( + lhs.sortBy[List[Any]](x => x), + rhs.sortBy[List[Any]](x => x) + ) } } catch { // TODO:Remove this temporary exception handling diff --git a/core/src/test/scala/org/apache/spark/sql/expression/index/PrefixIndexTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/expression/index/PrefixIndexTestSuite.scala new file mode 100644 index 0000000000..893461bc01 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/expression/index/PrefixIndexTestSuite.scala @@ -0,0 +1,83 @@ +/* + * Copyright 2017 PingCAP, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.expression.index + +import java.nio.charset.Charset + +import org.apache.spark.sql.BaseTiSparkSuite +import org.apache.spark.sql.catalyst.util.resourceToString + +class PrefixIndexTestSuite extends BaseTiSparkSuite { + // https://github.com/pingcap/tispark/issues/272 + test("Prefix index read does not work correctly") { + tidbStmt.execute( + resourceToString( + s"prefix-index/PrefixTest.sql", + classLoader = Thread.currentThread().getContextClassLoader + ) + ) + refreshConnections() + // add explain to show if we have actually used prefix index in plan + explainAndRunTest("select a, b from prefix where b < \"bbc\"") + explainAndRunTest("select a, b from prefix where a = 1 and b = \"bbb\"") + explainAndRunTest("select b from prefix where b = \"bbc\"") + explainAndRunTest("select b from prefix where b != \"bbc\"") + explainAndRunTest("select b from prefix where b >= \"bbc\" and b < \"bbd\"") + // FIXME: following test results in INDEX range [bb, bb] and TABLE range (-INF, bbc), while the table range should have been [bb, bb] + // FYI, the predicate is [[b] LESS_THAN "bbc"], Not(IsNull([b])), [[b] EQUAL "bb"] + explainAndRunTest("select c, b from prefix where b = \"bb\" and b < \"bbc\"") + println(Charset.defaultCharset()) + explainAndRunTest( + "select c, b from prefix where b > \"ÿ\" and b < \"ÿÿc\"", + skipJDBC = true, + rTiDB = List(List(8, "ÿÿ"), List(9, "ÿÿ0")) + ) + // add LIKE tests for prefix index + explainAndRunTest("select a, b from prefix where b LIKE 'b%'") + explainAndRunTest("select a, b from prefix where b LIKE 'ab%'") + explainAndRunTest( + "select a, b from prefix where b LIKE 'ÿÿ%'", + skipJDBC = true, + rTiDB = List(List(7, "ÿÿ"), List(8, "ÿÿ0"), List(9, "ÿÿÿ")) + ) + explainAndRunTest("select a, b from prefix where b LIKE 'b%b'") + explainAndRunTest("select a, b from prefix where b LIKE 'ÿ%'", skipJDBC = true) + explainAndRunTest("select a, b from prefix where b LIKE '%b'") + explainAndRunTest("select a, b from prefix where b LIKE '%'") + } + + // https://github.com/pingcap/tispark/issues/397 + test("Prefix index implementation for utf8 string is incorrect") { + tidbStmt.execute( + resourceToString( + s"prefix-index/UTF8Test.sql", + classLoader = Thread.currentThread().getContextClassLoader + ) + ) + refreshConnections() + + spark.sql("select * from t1").show + runTest("select * from t1 where name = '借款策略集_网页'", skipJDBC = true) + } + + override def afterAll(): Unit = + try { + tidbStmt.execute("DROP TABLE IF EXISTS `prefix`") + tidbStmt.execute("DROP TABLE IF EXISTS `t1`") + } finally { + super.afterAll() + } +} diff --git a/core/src/test/scala/org/apache/spark/sql/expression/index/UnsignedTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/expression/index/UnsignedTestSuite.scala index b1439ca278..30ac5b0a0a 100644 --- a/core/src/test/scala/org/apache/spark/sql/expression/index/UnsignedTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/expression/index/UnsignedTestSuite.scala @@ -71,7 +71,7 @@ class UnsignedTestSuite extends BaseTiSparkSuite { List(3, 5, 7), List(unsignedLongMaxValue, unsignedLongMaxValue, LongMaxValue) ), - List(List.empty), + List.empty, List( List(0, unsignedLongMaxValue, LongMinValue), List(1, 1, 1), @@ -84,7 +84,7 @@ class UnsignedTestSuite extends BaseTiSparkSuite { List(2, 3, 4), List(3, 5, 7) ), - List(List.empty), + List.empty, List( List(0, unsignedLongMaxValue, LongMinValue), List(1, 1, 1), @@ -99,14 +99,14 @@ class UnsignedTestSuite extends BaseTiSparkSuite { List(3, 5, 7), 
List(unsignedLongMaxValue, unsignedLongMaxValue, LongMaxValue) ), - List(List.empty), - List(List.empty), + List.empty, + List.empty, List(List(unsignedLongMaxValue), List(1), List(3), List(5), List(unsignedLongMaxValue)), List(List(1), List(3), List(5)), - List(List.empty), + List.empty, List(List(unsignedLongMaxValue), List(1), List(3), List(5), List(unsignedLongMaxValue)), List(List(unsignedLongMaxValue), List(1), List(3), List(5), List(unsignedLongMaxValue)), - List(List.empty), + List.empty, List(List(1), List(2), List(3)), List(List(0), List(1), List(2), List(3), List(unsignedLongMaxValue)) ) diff --git a/core/src/test/scala/org/apache/spark/sql/statistics/StatisticsManagerSuite.scala b/core/src/test/scala/org/apache/spark/sql/statistics/StatisticsManagerSuite.scala index c0b3187ef6..3b60d9209f 100644 --- a/core/src/test/scala/org/apache/spark/sql/statistics/StatisticsManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/statistics/StatisticsManagerSuite.scala @@ -113,7 +113,9 @@ class StatisticsManagerSuite extends BaseTiSparkSuite { testSelectRowCount(expressions, idx, 46) } - ignore("select tp_int from full_data_type_table_idx where tp_int < 5390653 and tp_int > -46759812") { + ignore( + "select tp_int from full_data_type_table_idx where tp_int < 5390653 and tp_int > -46759812" + ) { val indexes = fDataIdxTbl.getIndices val idx = indexes.filter(_.getIndexColumns.asScala.exists(_.matchName("tp_int"))).head diff --git a/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index 91f955daab..fa9ce04b76 100644 --- a/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -161,6 +161,8 @@ object SharedSQLContext extends Logging { if (_tidbConnection == null) { val jdbcUsername = getOrElse(_tidbConf, TiDB_USER, "root") + val jdbcPassword = getOrElse(_tidbConf, TiDB_PASSWORD, "") + val jdbcHostname = getOrElse(_tidbConf, TiDB_ADDRESS, "127.0.0.1") val jdbcPort = Integer.parseInt(getOrElse(_tidbConf, TiDB_PORT, "4000")) @@ -168,9 +170,9 @@ object SharedSQLContext extends Logging { val loadData = getOrElse(_tidbConf, SHOULD_LOAD_DATA, "true").toBoolean jdbcUrl = - s"jdbc:mysql://$jdbcHostname:$jdbcPort/?user=$jdbcUsername&zeroDateTimeBehavior=convertToNull" + s"jdbc:mysql://$jdbcHostname:$jdbcPort/?user=$jdbcUsername&password=$jdbcPassword&useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false" - _tidbConnection = DriverManager.getConnection(jdbcUrl, jdbcUsername, "") + _tidbConnection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword) _statement = _tidbConnection.createStatement() if (loadData && !forceNotLoad) { diff --git a/core/src/test/scala/org/apache/spark/sql/test/TestConstants.scala b/core/src/test/scala/org/apache/spark/sql/test/TestConstants.scala index ff4232a597..ee435f04b0 100644 --- a/core/src/test/scala/org/apache/spark/sql/test/TestConstants.scala +++ b/core/src/test/scala/org/apache/spark/sql/test/TestConstants.scala @@ -21,6 +21,7 @@ object TestConstants { val TiDB_ADDRESS = "tidb.addr" val TiDB_PORT = "tidb.port" val TiDB_USER = "tidb.user" + val TiDB_PASSWORD = "tidb.password" val TPCH_DB_NAME = "tpch.db" val SHOULD_LOAD_DATA = "test.data.load" val SHOULD_SKIP_TEST = "test.skip" diff --git a/docs/userguide.md b/docs/userguide.md index 079016b6c9..0733014655 100644 --- a/docs/userguide.md +++ b/docs/userguide.md @@ -101,7 +101,7 @@ 
Download the TiSpark's jar package [here](http://download.pingcap.org/tispark-${

Running TiSpark on an existing Spark cluster does not require a reboot of the cluster. You can use Spark's `--jars` parameter to introduce TiSpark as a dependency:

```
-Spark-shell --jars $ PATH / tispark-0.1.0.jar
+spark-shell --jars $PATH/tispark-core-1.0.1-jar-with-dependencies.jar
```

If you want to deploy TiSpark as a default component, simply place the TiSpark jar package into the jars path for each node of the Spark cluster and restart the Spark cluster:

diff --git a/pom.xml b/pom.xml
index 546421c002..ed30e32a1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,9 +3,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- org.apache
- apache
- 18
+ org.apache
+ apache
+ 18
com.pingcap.tispark
@@ -16,15 +16,50 @@
http://github.com/pingcap/tispark
-
- Apache 2.0 License
- http://www.apache.org/licenses/LICENSE-2.0.html
- repo
-
+
+ Apache 2.0 License
+ http://www.apache.org/licenses/LICENSE-2.0.html
+ repo
+
+ PingCAP
+
+
+
+ Xiaoyu Ma
+ maxiaoyu@pingcap.com
+ PingCAP
+ https://www.pingcap.com
+
+
+ Yifei Wu
+ birdstorm@pingcap.com
+ PingCAP
+ https://www.pingcap.com
+
+
+ Gansen Hu
+ hugansen@pingcap.com
+ PingCAP
+ https://www.pingcap.com
+
+
+ Zhexuan Yang
+ yangzhexuan@pingcap.com
+ PingCAP
+ https://www.pingcap.com
+
+
+
+
+ scm:git:git://github.com/pingcap/tispark.git
+ scm:git:ssh://github.com:pingcap/tispark.git
+ https://github.com/pingcap/tispark/tree/master
+
UTF-8
UTF-8
@@ -33,8 +68,21 @@
2.1.1
2.11
2.11.8
+ true
+ true
+
+
+ ossrh
+ https://oss.sonatype.org/content/repositories/snapshots
+
+
+ ossrh
+ https://oss.sonatype.org/service/local/staging/deploy/maven2/
+
+
+
core
tikv-client
@@ -90,4 +138,34 @@
provided
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.22.0
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.5
+
+ ${gpg.skip}
+
+
+
+ sign-artifacts
+
+ sign
+
+
+
+
+
diff --git a/python/README.md b/python/README.md
index 7cd80e9757..3c40a50ff8 100644
--- a/python/README.md
+++ b/python/README.md
@@ -86,7 +86,7 @@ spark.sql("select count(*) from customer").show()
2. Prepare your TiSpark environment as above and execute

```bash
-./bin/spark-submit --jars /where-ever-it-is/tispark-1.0-jar-with-dependencies.jar test.py
+./bin/spark-submit --jars /where-ever-it-is/tispark-core-1.0.1-jar-with-dependencies.jar test.py
```

3.
Result: diff --git a/tikv-client/pom.xml b/tikv-client/pom.xml index 1fbb0dfb0e..e3c63261fd 100644 --- a/tikv-client/pom.xml +++ b/tikv-client/pom.xml @@ -257,12 +257,39 @@ org.apache.maven.plugins maven-javadoc-plugin - 2.8 + 2.9.1 - UTF-8 - UTF-8 - UTF-8 + ${javadoc.skip} + + + attach-javadocs + + jar + + + -Xdoclint:none + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.5 + + ${gpg.skip} + + + + sign-artifacts + verify + + sign + + + diff --git a/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java b/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java index 5f5ff083fa..c5dd14dd80 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java @@ -43,6 +43,7 @@ public class TiConfiguration implements Serializable { private static final int DEF_TABLE_SCAN_CONCURRENCY = 512; private static final CommandPri DEF_COMMAND_PRIORITY = CommandPri.Low; private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.RC; + private static final boolean DEF_SHOW_ROWID = false; private int timeout = DEF_TIMEOUT; private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT; @@ -58,6 +59,7 @@ public class TiConfiguration implements Serializable { private CommandPri commandPriority = DEF_COMMAND_PRIORITY; private IsolationLevel isolationLevel = DEF_ISOLATION_LEVEL; private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE; + private boolean showRowId = DEF_SHOW_ROWID; public static TiConfiguration createDefault(String pdAddrsStr) { Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null"); @@ -197,4 +199,12 @@ public void setMaxRequestKeyRangeSize(int maxRequestKeyRangeSize) { } this.maxRequestKeyRangeSize = maxRequestKeyRangeSize; } + + public void setShowRowId(boolean flag) { + this.showRowId = flag; + } + + public boolean ifShowRowId() { + return showRowId; + } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/TiSession.java b/tikv-client/src/main/java/com/pingcap/tikv/TiSession.java index 50bca57fb2..48c3696cab 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/TiSession.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/TiSession.java @@ -84,7 +84,9 @@ public Catalog getCatalog() { if (catalog == null) { catalog = new Catalog(() -> createSnapshot(), conf.getMetaReloadPeriod(), - conf.getMetaReloadPeriodUnit()); + conf.getMetaReloadPeriodUnit(), + conf.ifShowRowId() + ); } res = catalog; } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java b/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java index c0832141a3..e2dfd19420 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java @@ -15,26 +15,31 @@ package com.pingcap.tikv.catalog; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.pingcap.tikv.Snapshot; import com.pingcap.tikv.meta.TiDBInfo; import com.pingcap.tikv.meta.TiTableInfo; -import org.apache.log4j.Logger; - import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import 
java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.log4j.Logger; public class Catalog implements AutoCloseable { private Supplier snapshotProvider; private ScheduledExecutorService service; private CatalogCache metaCache; + private final boolean showRowId; private final Logger logger = Logger.getLogger(this.getClass()); @Override @@ -110,9 +115,14 @@ private Map loadDatabases() { } } - public Catalog(Supplier snapshotProvider, int refreshPeriod, TimeUnit periodUnit) { + public Catalog( + Supplier snapshotProvider, + int refreshPeriod, + TimeUnit periodUnit, + boolean showRowId) { this.snapshotProvider = Objects.requireNonNull(snapshotProvider, "Snapshot Provider is null"); + this.showRowId = showRowId; metaCache = new CatalogCache(new CatalogTransaction(snapshotProvider.get())); service = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).build()); service.scheduleAtFixedRate(() -> { @@ -125,12 +135,6 @@ public Catalog(Supplier snapshotProvider, int refreshPeriod, TimeUnit }, refreshPeriod, refreshPeriod, periodUnit); } - public Catalog(Supplier snapshotProvider) { - this.snapshotProvider = Objects.requireNonNull(snapshotProvider, - "Snapshot Provider is null"); - metaCache = new CatalogCache(new CatalogTransaction(snapshotProvider.get())); - } - public void reloadCache() { Snapshot snapshot = snapshotProvider.get(); CatalogTransaction newTrx = new CatalogTransaction(snapshot); @@ -146,7 +150,15 @@ public List listDatabases() { public List listTables(TiDBInfo database) { Objects.requireNonNull(database, "database is null"); - return metaCache.listTables(database); + if (showRowId) { + return metaCache + .listTables(database) + .stream() + .map(table -> table.copyTableWithRowId()) + .collect(Collectors.toList()); + } else { + return metaCache.listTables(database); + } } public TiDBInfo getDatabase(String dbName) { @@ -165,15 +177,25 @@ public TiTableInfo getTable(String dbName, String tableName) { public TiTableInfo getTable(TiDBInfo database, String tableName) { Objects.requireNonNull(database, "database is null"); Objects.requireNonNull(tableName, "tableName is null"); - return metaCache.getTable(database, tableName); + TiTableInfo table = metaCache.getTable(database, tableName); + if (showRowId) { + return table.copyTableWithRowId(); + } else { + return table; + } } + @VisibleForTesting public TiTableInfo getTable(TiDBInfo database, long tableId) { Objects.requireNonNull(database, "database is null"); Collection tables = listTables(database); for (TiTableInfo table : tables) { if (table.getId() == tableId) { - return table; + if (showRowId) { + return table.copyTableWithRowId(); + } else { + return table; + } } } return null; diff --git a/tikv-client/src/main/java/com/pingcap/tikv/codec/MyDecimal.java b/tikv-client/src/main/java/com/pingcap/tikv/codec/MyDecimal.java index ea12c9233d..c40b9f039e 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/codec/MyDecimal.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/codec/MyDecimal.java @@ -15,6 +15,7 @@ package com.pingcap.tikv.codec; +import com.google.common.annotations.VisibleForTesting; import java.math.BigDecimal; import java.util.Arrays; @@ -58,12 +59,12 @@ public class MyDecimal { /* * Returns total precision of this decimal. Basically, it is sum of digitsInt and digitsFrac. But there * are some special cases need to be token care of such as 000.001. 
+ * Precision reflects the actual effective precision without leading zero */ public int precision() { int frac = this.digitsFrac; int digitsInt = - this.removeLeadingZeros()[ - 1]; /*this function return an array and the second element is digitsInt*/ + this.removeLeadingZeros()[1]; /*this function return an array and the second element is digitsInt*/ int precision = digitsInt + frac; // if no precision, it is just 0. if (precision == 0) { @@ -72,7 +73,10 @@ public int precision() { return precision; } - /** Returns fraction digits that counts how many digits after ".". */ + /** + * Returns fraction digits that counts how many digits after ".". + * frac() reflects the actual effective fraction without trailing zero + */ public int frac() { return digitsFrac; } @@ -92,8 +96,7 @@ public void fromDecimal(double value) { * * @param precision precision specifies total digits that this decimal will be.. * @param frac frac specifies how many fraction digits - * @param bin bin is binary string which represents a decimal value. TODO: (zhexuany) overflow and - * truncated exception need to be done later. + * @param bin bin is binary string which represents a decimal value. */ public int fromBin(int precision, int frac, int[] bin) { if (bin.length == 0) { @@ -134,13 +137,13 @@ public int fromBin(int precision, int frac, int[] bin) { wordsIntTo = wordBufLen; wordsFracTo = 0; overflow = true; + } else { + wordsIntTo = wordsInt; + wordsFracTo = wordBufLen - wordsInt; + truncated = true; } - wordsIntTo = wordsInt; - wordsFracTo = wordBufLen - wordsInt; - truncated = true; } - wordsIntTo = wordsInt; - wordsFracTo = wordsFrac; + if (overflow || truncated) { if (wordsIntTo < oldWordsIntTo) { binIdx += dig2bytes[leadingDigits] + (wordsInt - wordsIntTo) * wordSize; @@ -151,8 +154,8 @@ public int fromBin(int precision, int frac, int[] bin) { } this.negative = mask != 0; - this.digitsInt = wordsInt * digitsPerWord + leadingDigits; - this.digitsFrac = wordsFrac * digitsPerWord + trailingDigits; + this.digitsInt = (byte)(wordsInt * digitsPerWord + leadingDigits); + this.digitsFrac = (byte)(wordsFrac * digitsPerWord + trailingDigits); int wordIdx = 0; if (leadingDigits > 0) { @@ -264,25 +267,26 @@ private int digitsToWords(int digits) { /** * Reads a word from a array at given size. * - * @param b b is source data. + * @param b b is source data of unsigned byte as int[] * @param size is word size which can be used in switch statement. * @param start start indicates the where start to read. */ - private int readWord(int[] b, int size, int start) { + @VisibleForTesting + public static int readWord(int[] b, int size, int start) { int x = 0; switch (size) { case 1: - x = b[start]; + x = (byte)b[start]; break; case 2: - x = (b[start] << 8) + b[start + 1]; + x = (((byte)b[start]) << 8) + (b[start + 1] & 0xFF); break; case 3: int sign = b[start] & 128; if (sign > 0) { - x = 255 << 24 | (b[start] & 0xFF) << 16 | (b[start + 1] & 0xFF) << 8 | (b[start + 2]); + x = 0xFF << 24 | (b[start] << 16) | (b[start + 1] << 8) | (b[start + 2]); } else { - x = b[start] << 16 | b[start + 1] << 8 | b[start + 2]; + x = b[start] << 16 | (b[start + 1] << 8) | b[start + 2]; } break; case 4: @@ -320,7 +324,7 @@ private void fromCharArray(char[] str) { // [-, 1, 2, 3] // [+, 1, 2, 3] // for +/-, we need skip them and record sign information into negative field. 
- switch (str[0]) { + switch (str[startIdx]) { case '-': this.negative = true; startIdx++; @@ -335,8 +339,8 @@ private void fromCharArray(char[] str) { } // we initialize strIdx in case of sign notation, here we need substract startIdx from strIdx casue strIdx is used for counting the number of digits. int digitsInt = strIdx - startIdx; - int digitsFrac = 0; - int endIdx = 0; + int digitsFrac; + int endIdx; if (strIdx < str.length && str[strIdx] == '.') { endIdx = strIdx + 1; // detect where is the end index of this char array. @@ -363,13 +367,12 @@ private void fromCharArray(char[] str) { wordsInt = wordBufLen; wordsFrac = 0; overflow = true; + } else { + wordsFrac = wordBufLen - wordsInt; + truncated = true; } - // wordsIntTo = wordsInt; - wordsFrac = wordBufLen - wordsInt; - truncated = true; + } - // wordsIntTo = wordsInt; - // wordsFracTo = wordsFrac; if (overflow || truncated) { digitsFrac = wordsFrac * digitsPerWord; @@ -438,7 +441,7 @@ private void fromCharArray(char[] str) { } // parser a string to a int. - private int strToInt(String str) { + private int strToLong(String str) { str = str.trim(); if (str.isEmpty()) { return 0; @@ -466,11 +469,6 @@ private int strToInt(String str) { return r; } - // TODO throw overflow or truncated - private int[] fixWordCntError(int wordsIntTo, int WordsFracTo) { - return null; - } - // Returns a decimal string. public String toString() { char[] str; @@ -556,8 +554,8 @@ private int stringSize() { return digitsInt + digitsFrac + 3; } - public int toInt() { - int x = 0; + public long toLong() { + long x = 0; int wordIdx = 0; for (int i = this.digitsInt; i > 0; i -= digitsPerWord) { /* @@ -566,32 +564,34 @@ public int toInt() { because |LONGLONG_MIN| > LONGLONG_MAX so we can convert -9223372036854775808 correctly */ - x = x * wordBase - this.wordBuf[wordIdx]; + long y = x; + x = x * wordBase - (long)this.wordBuf[wordIdx]; wordIdx++; - // if (y < Integer.MIN_VALUE/wordBase || x > y) { - /* - the decimal is bigger than any possible integer - return border integer depending on the sign - */ - // if (this.negative){ - // TODO throw overflow exception - // return math.Integer.MIN_VALUE, ErrOverflow - // } - // return math.MaxInt64, ErrOverflow - // } + if (y < Long.MIN_VALUE/wordBase || x > y) { + /* + the decimal is bigger than any possible integer + return border integer depending on the sign + */ + if (this.negative) { + return Long.MIN_VALUE; + } + return Long.MAX_VALUE; + } } + /* boundary case: 9223372036854775808 */ - if (!this.negative && x == Integer.MIN_VALUE) { - // return math.MaxInt64, ErrOverflow + if (!this.negative && x == Long.MIN_VALUE) { + return Long.MAX_VALUE; } + if (!this.negative) { x = -x; } for (int i = this.digitsFrac; i > 0; i -= digitsPerWord) { - // if (this.wordBuf[wordIdx] != 0){ - // return x, ErrTruncated - // } - // wordIdx++; + if (this.wordBuf[wordIdx] != 0){ + return x; + } + wordIdx++; } return x; } @@ -700,7 +700,6 @@ public int[] toBin(int precision, int frac) { int originFracSize = fracSize; int[] bin = new int[intSize + fracSize]; int binIdx = 0; - //TODO, overflow and truncated later int[] res = this.removeLeadingZeros(); int wordIdxFrom = res[0]; int digitsIntFrom = res[1]; diff --git a/tikv-client/src/main/java/com/pingcap/tikv/expression/visitor/IndexRangeBuilder.java b/tikv-client/src/main/java/com/pingcap/tikv/expression/visitor/IndexRangeBuilder.java index 6fe95ac97d..dcf5313bf1 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/expression/visitor/IndexRangeBuilder.java +++ 
b/tikv-client/src/main/java/com/pingcap/tikv/expression/visitor/IndexRangeBuilder.java @@ -140,6 +140,9 @@ protected RangeSet visit(ComparisonBinaryExpression node, Void context ranges.add(Range.singleton(literal)); break; case NOT_EQUAL: + // Should return full range because prefix index predicate for NOT_EQUAL + // will be split into an NOT_EQUAL filter and a full range scan + ranges.add(Range.all()); break; default: throwOnError(node); diff --git a/tikv-client/src/main/java/com/pingcap/tikv/key/TypedKey.java b/tikv-client/src/main/java/com/pingcap/tikv/key/TypedKey.java index 924ce26a1c..40e334427a 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/key/TypedKey.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/key/TypedKey.java @@ -59,7 +59,7 @@ public static TypedKey toTypedKey(Object val, DataType type, int prefixLength) { private static byte[] encodeKey(Object val, DataType type, int prefixLength) { CodecDataOutput cdo = new CodecDataOutput(); - type.encodeKey(cdo, val, prefixLength); + type.encodeKey(cdo, val, type, prefixLength); return cdo.toBytes(); } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/CIStr.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/CIStr.java index 95d03a4b44..b93af6e491 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/CIStr.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/CIStr.java @@ -16,9 +16,11 @@ package com.pingcap.tikv.meta; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; /** This class is mapping TiDB's CIStr/ For internal use only. */ +@JsonIgnoreProperties(ignoreUnknown = true) public class CIStr { private final String o; // original private final String l; diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/Collation.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/Collation.java index d7affff9b8..a363b10796 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/Collation.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/Collation.java @@ -27,7 +27,7 @@ public static int translate(String collation) { return code; } - static String translate(int code) { + public static String translate(int code) { String collation = collationCodeMap.get(code); if (collation == null) { return ""; diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiColumnInfo.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiColumnInfo.java index ebdf235808..fb28777da4 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiColumnInfo.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiColumnInfo.java @@ -15,6 +15,8 @@ package com.pingcap.tikv.meta; +import static java.util.Objects.requireNonNull; + import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; @@ -25,13 +27,11 @@ import com.pingcap.tikv.types.DataType; import com.pingcap.tikv.types.DataType.EncodeType; import com.pingcap.tikv.types.DataTypeFactory; - +import com.pingcap.tikv.types.IntegerType; import java.io.Serializable; import java.util.List; import java.util.Objects; -import static java.util.Objects.requireNonNull; - @JsonIgnoreProperties(ignoreUnknown = true) public class TiColumnInfo implements Serializable { private final long id; @@ -44,6 +44,10 @@ public class TiColumnInfo implements Serializable { private final String defaultValue; private final String originDefaultValue; 
+ public static TiColumnInfo getRowIdColumn(int offset) { + return new TiColumnInfo(-1, "_tidb_rowid", offset, IntegerType.ROW_ID_TYPE, true); + } + @VisibleForTesting private static final int PK_MASK = 0x2; @@ -70,6 +74,42 @@ public TiColumnInfo( this.isPrimaryKey = (type.getFlag() & PK_MASK) > 0; } + public TiColumnInfo( + long id, + String name, + int offset, + DataType type, + SchemaState schemaState, + String originalDefaultValue, + String defaultValue, + String comment) { + this.id = id; + this.name = requireNonNull(name, "column name is null").toLowerCase(); + this.offset = offset; + this.type = requireNonNull(type, "data type is null"); + this.schemaState = schemaState; + this.comment = comment; + this.defaultValue = defaultValue; + this.originDefaultValue = originalDefaultValue; + this.isPrimaryKey = (type.getFlag() & PK_MASK) > 0; + } + + public TiColumnInfo copyWithoutPrimaryKey() { + InternalTypeHolder typeHolder = type.toTypeHolder(); + typeHolder.setFlag(type.getFlag() & (~TiColumnInfo.PK_MASK)); + DataType newType = DataTypeFactory.of(typeHolder); + return new TiColumnInfo( + this.id, + this.name, + this.offset, + newType, + this.schemaState, + this.originDefaultValue, + this.defaultValue, + this.comment + ); + } + @VisibleForTesting public TiColumnInfo(long id, String name, int offset, DataType type, boolean isPrimaryKey) { this.id = id; @@ -119,7 +159,11 @@ public String getDefaultValue() { return defaultValue; } - public ByteString getOriginDefaultValue() { + public String getOriginDefaultValue() { + return originDefaultValue; + } + + public ByteString getOriginDefaultValueAsByteString() { CodecDataOutput cdo = new CodecDataOutput(); type.encode(cdo, EncodeType.VALUE, type.getOriginDefaultValue(originDefaultValue)); return cdo.toByteString(); @@ -127,22 +171,50 @@ public ByteString getOriginDefaultValue() { @JsonIgnoreProperties(ignoreUnknown = true) public static class InternalTypeHolder { - private final int tp; - private final int flag; - private final long flen; - private final int decimal; - private final String charset; - private final String collate; - private final String defaultValue; - private final String originDefaultValue; - private final List elems; + private int tp; + private int flag; + private long flen; + private int decimal; + private String charset; + private String collate; + private String defaultValue; + private String originDefaultValue; + private List elems; + + public void setTp(int tp) { + this.tp = tp; + } - public String getDefaultValue() { - return defaultValue; + public void setFlag(int flag) { + this.flag = flag; } - public String getOriginDefaultValue() { - return originDefaultValue; + public void setFlen(long flen) { + this.flen = flen; + } + + public void setDecimal(int decimal) { + this.decimal = decimal; + } + + public void setCharset(String charset) { + this.charset = charset; + } + + public void setCollate(String collate) { + this.collate = collate; + } + + public void setDefaultValue(String defaultValue) { + this.defaultValue = defaultValue; + } + + public void setOriginDefaultValue(String originDefaultValue) { + this.originDefaultValue = originDefaultValue; + } + + public void setElems(List elems) { + this.elems = elems; } interface Builder { @@ -211,6 +283,15 @@ public String getCollate() { public List getElems() { return elems; } + + public String getDefaultValue() { + return defaultValue; + } + + public String getOriginDefaultValue() { + return originDefaultValue; + } + } TiIndexColumn toFakeIndexColumn() { @@ -235,7 +316,7 @@ 
ColumnInfo.Builder toProtoBuilder(TiTableInfo table) { .setColumnLen((int) type.getLength()) .setDecimal(type.getDecimal()) .setFlag(type.getFlag()) - .setDefaultVal(getOriginDefaultValue()) + .setDefaultVal(getOriginDefaultValueAsByteString()) .setPkHandle(table.isPkHandle() && isPrimaryKey()) .addAllElems(type.getElems()); } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDBInfo.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDBInfo.java index 70da89a78c..485e8078cb 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDBInfo.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiDBInfo.java @@ -16,9 +16,11 @@ package com.pingcap.tikv.meta; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +@JsonIgnoreProperties(ignoreUnknown = true) public class TiDBInfo { private long id; private String name; diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiIndexInfo.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiIndexInfo.java index 65bf963d81..a477a394be 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiIndexInfo.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiIndexInfo.java @@ -16,6 +16,7 @@ package com.pingcap.tikv.meta; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -29,6 +30,7 @@ import static java.util.Objects.requireNonNull; +@JsonIgnoreProperties(ignoreUnknown = true) public class TiIndexInfo implements Serializable { private final long id; private final String name; diff --git a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiTableInfo.java b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiTableInfo.java index 2ebe8408c1..75e884f089 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/meta/TiTableInfo.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/meta/TiTableInfo.java @@ -20,10 +20,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import com.google.common.collect.ImmutableList; import com.pingcap.tidb.tipb.TableInfo; import com.pingcap.tikv.exception.TiClientInternalException; +import com.pingcap.tikv.meta.TiColumnInfo.InternalTypeHolder; +import com.pingcap.tikv.types.DataType; +import com.pingcap.tikv.types.DataTypeFactory; import java.io.Serializable; import java.util.List; import java.util.stream.Collectors; @@ -147,6 +149,38 @@ public TiColumnInfo getPrimaryKeyColumn() { return null; } + public TiTableInfo copyTableWithRowId() { + if (!isPkHandle()) { + ImmutableList.Builder newColumns = ImmutableList.builder(); + for (TiColumnInfo col : getColumns()) { + DataType type = col.getType(); + InternalTypeHolder typeHolder = type.toTypeHolder(); + typeHolder.setFlag(type.getFlag() & (~DataType.PriKeyFlag)); + DataType newType = DataTypeFactory.of(typeHolder); + TiColumnInfo newCol = + new TiColumnInfo( + col.getId(), + col.getName(), + col.getOffset(), + newType, + col.getSchemaState(), + col.getOriginDefaultValue(), + col.getDefaultValue(), + col.getComment() + ); + newColumns.add(newCol.copyWithoutPrimaryKey()); + } + 
+ newColumns.add(TiColumnInfo.getRowIdColumn(getColumns().size()));
+ return new TiTableInfo(
+ getId(), CIStr.newCIStr(getName()), getCharset(), getCollate(),
+ true, newColumns.build(), getIndices(),
+ getComment(), getAutoIncId(), getMaxColumnId(),
+ getMaxIndexId(), getOldSchemaId());
+ } else {
+ return this;
+ }
+ }
+
@Override
public String toString() {
return toProto().toString();
diff --git a/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java b/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java
index ae0e4c9ef6..5af4645816 100644
--- a/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java
+++ b/tikv-client/src/main/java/com/pingcap/tikv/operation/KVErrorHandler.java
@@ -95,6 +95,19 @@ private void notifyRegionCacheInvalidate(long regionId) {
regionId, 0, true, false, CacheInvalidateEvent.CacheType.REGION_STORE));
+ logger.info("Accumulating cache invalidation info to driver: regionId=" + regionId + ", type=" +
+ CacheInvalidateEvent.CacheType.REGION_STORE.name());
+ } else {
+ logger.warn("Failed to send notification back to driver since CacheInvalidateCallBack is null in executor node.");
+ }
+ }
+
+ private void notifyStoreCacheInvalidate(long storeId) {
+ if (cacheInvalidateCallBack != null) {
+ cacheInvalidateCallBack.apply(new CacheInvalidateEvent(
+ 0, storeId,
+ false, true,
+ CacheInvalidateEvent.CacheType.REGION_STORE));
} else {
logger.warn("Failed to send notification back to driver since CacheInvalidateCallBack is null in executor node.");
}
@@ -114,7 +127,12 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
Errorpb.Error error = getRegionError(resp);
if (error != null) {
if (error.hasNotLeader()) {
+ // this error is reported from raftstore:
+ // the peer handling this request is not the leader; possible causes:
+ // 1. our region cache is outdated and the region has elected a new leader; re-fetch from PD
+ // 2. the region currently has no leader; wait, then fetch fresh region info from PD
long newStoreId = error.getNotLeader().getLeader().getStoreId();
+ boolean retry = true;
// update Leader here
logger.warn(String.format("NotLeader Error with region id %d and store id %d, new store id %d",
@@ -126,14 +144,21 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
// if there's current no leader, we do not trigger update pd cache logic
// since issuing store = NO_LEADER_STORE_ID requests to pd will definitely fail.
if (newStoreId != NO_LEADER_STORE_ID) {
- regionManager.updateLeader(ctxRegion.getId(), newStoreId);
+ if (!this.regionManager.updateLeader(ctxRegion.getId(), newStoreId) ||
+ !recv.onNotLeader(this.regionManager.getStoreById(newStoreId))) {
+ // If updating the leader fails, we need to fetch new region info from PD
+ // and re-split the key ranges for the new region. Setting retry to false
+ // stops the retry here and enters the handleCopResponse logic, which uses
+ // the RegionMiss backoff strategy to wait, fetch the new region, and
+ // re-split the key ranges. onNotLeader is only needed when updateLeader
+ // succeeds, since only then do we switch to the new store's address.
+ retry = false;
+ }
notifyRegionStoreCacheInvalidate(
ctxRegion.getId(),
newStoreId,
CacheInvalidateEvent.CacheType.LEADER
);
- recv.onNotLeader(this.regionManager.getRegionById(ctxRegion.getId()),
- this.regionManager.getStoreById(newStoreId));
backOffFuncType = BackOffFunction.BackOffFuncType.BoUpdateLeader;
} else {
@@ -143,32 +168,44 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
backOffer.doBackOff(backOffFuncType, new GrpcException(error.toString()));
- return true;
+ return retry;
} else if (error.hasStoreNotMatch()) {
- logger.warn(String.format("Store Not Match happened with region id %d, store id %d",
- ctxRegion.getId(),
- ctxRegion.getLeader().getStoreId()));
-
- invalidateRegionStoreCache(ctxRegion);
- recv.onStoreNotMatch();
+ // this error is reported from raftstore:
+ // the store id in the request is inconsistent with the store that received it
+ // Solution: re-fetch the store from PD
+ long storeId = ctxRegion.getLeader().getStoreId();
+ logger.warn(String.format("Store Not Match happened with region id %d, store id %d", ctxRegion.getId(), storeId));
+
+ this.regionManager.invalidateStore(storeId);
+ recv.onStoreNotMatch(this.regionManager.getStoreById(storeId));
+ notifyStoreCacheInvalidate(storeId);
return true;
} else if (error.hasStaleEpoch()) {
+ // this error is reported from raftstore:
+ // the region epoch in the request is outdated; retry after refreshing the cache
logger.warn(String.format("Stale Epoch encountered for region [%s]", ctxRegion));
this.regionManager.onRegionStale(ctxRegion.getId());
notifyRegionCacheInvalidate(ctxRegion.getId());
return false;
} else if (error.hasServerIsBusy()) {
+ // this error is reported from kv:
+ // it occurs when write pressure is high; back off and retry later
logger.warn(String.format("Server is busy for region [%s], reason: %s", ctxRegion,
error.getServerIsBusy().getReason()));
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoServerBusy,
new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString())));
return true;
} else if (error.hasStaleCommand()) {
+ // this error is reported from raftstore:
+ // the command is outdated; it is safe to retry
logger.warn(String.format("Stale command for region [%s]", ctxRegion));
return true;
} else if (error.hasRaftEntryTooLarge()) {
logger.warn(String.format("Raft too large for region [%s]", ctxRegion));
throw new StatusRuntimeException(
Status.fromCode(Status.Code.UNAVAILABLE).withDescription(error.toString()));
} else if (error.hasKeyNotInRegion()) {
+ // this error is reported from raftstore:
+ // the requested key is not in the current region; this should not happen here.
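+ // (Note added for clarity: TiSpark builds one request per region after
+ // RangeSplitter has split the key ranges along region boundaries, so a key
+ // falling outside the region indicates a stale split or a bug, not a
+ // condition worth retrying.)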
ByteString invalidKey = error.getKeyNotInRegion().getKey(); logger.error(String.format("Key not in region [%s] for key [%s], this error should not happen here.", ctxRegion, KeyUtils.formatBytes(invalidKey))); throw new StatusRuntimeException(Status.UNKNOWN.withDescription(error.toString())); diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionErrorReceiver.java b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionErrorReceiver.java index 0d64c36389..4b2089c65f 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionErrorReceiver.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionErrorReceiver.java @@ -21,6 +21,6 @@ import com.pingcap.tikv.kvproto.Metapb.Store; public interface RegionErrorReceiver { - void onNotLeader(TiRegion region, Store store); - void onStoreNotMatch(); + boolean onNotLeader(Store store); + void onStoreNotMatch(Store store); } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java index c1232fbf12..dfc3b7a629 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java @@ -211,14 +211,18 @@ public void onRegionStale(long regionId) { cache.invalidateRegion(regionId); } - public void updateLeader(long regionId, long storeId) { - TiRegion r = cache.getRegionById(regionId); + public boolean updateLeader(long regionId, long storeId) { + TiRegion r = cache.regionCache.get(regionId); if (r != null) { if (!r.switchPeer(storeId)) { + // failed to switch leader, possibly region is outdated, we need to drop region cache from regionCache + logger.warn("Cannot find peer when updating leader (" + regionId + "," + storeId + ")"); // drop region cache using verId cache.invalidateRegion(regionId); + return false; } } + return true; } /** diff --git a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java index 1430d67ebb..a8d7048e26 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java @@ -59,7 +59,6 @@ import com.pingcap.tikv.util.BackOffFunction; import com.pingcap.tikv.util.BackOffer; import com.pingcap.tikv.util.ConcreteBackOffer; -import com.pingcap.tikv.util.Pair; import com.pingcap.tikv.util.RangeSplitter; import io.grpc.ManagedChannel; import java.util.Iterator; @@ -368,28 +367,40 @@ protected TikvStub getAsyncStub() { return asyncStub.withDeadlineAfter(getConf().getTimeout(), getConf().getTimeoutUnit()); } + /** + * onNotLeader deals with NotLeaderError and returns whether re-splitting key range is needed + * + * @param newStore the new store presented by NotLeader Error + * @return false when re-split is needed. + */ @Override - public void onNotLeader(TiRegion newRegion, Store newStore) { + public boolean onNotLeader(Store newStore) { + if (logger.isDebugEnabled()) { + logger.debug(region + ", new leader = " + newStore.getId()); + } + TiRegion cachedRegion = regionManager.getRegionById(region.getId()); + // When switch leader fails or the region changed its key range, + // it would be necessary to re-split task's key range for new region. 
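+ // Illustrative scenario (comment added for clarity): the cached region
+ // covered [a, c) but has since split into [a, b) and [b, c); retrying the
+ // old request unchanged would silently miss keys in [b, c), so we return
+ // false and force the caller to refresh region info and re-split first.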
+ if (!region.switchPeer(newStore.getId()) || + !region.getStartKey().equals(cachedRegion.getStartKey()) || + !region.getEndKey().equals(cachedRegion.getEndKey())) { + return false; + } String addressStr = newStore.getAddress(); ManagedChannel channel = getSession().getChannel(addressStr); - region = newRegion; - if (!region.switchPeer(newStore.getId())) { - throw new TiClientInternalException("Failed to switch leader"); - } blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); + return true; } @Override - public void onStoreNotMatch() { - Pair regionStorePair = - regionManager.getRegionStorePairByRegionId(region.getId()); - Store store = regionStorePair.second; + public void onStoreNotMatch(Store store) { String addressStr = store.getAddress(); ManagedChannel channel = getSession().getChannel(addressStr); blockingStub = TikvGrpc.newBlockingStub(channel); asyncStub = TikvGrpc.newStub(channel); - region = regionStorePair.first; - region.switchPeer(store.getId()); + if (logger.isDebugEnabled() && region.getLeader().getStoreId() != store.getId()) { + logger.debug("store_not_match may occur? " + region + ", original store = " + store.getId() + " address = " + addressStr); + } } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/Converter.java b/tikv-client/src/main/java/com/pingcap/tikv/types/Converter.java index 5371fb9b0c..bdaddf025b 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/types/Converter.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/types/Converter.java @@ -18,6 +18,7 @@ package com.pingcap.tikv.types; import com.pingcap.tikv.exception.TypeException; +import org.apache.spark.unsafe.types.UTF8String; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -27,6 +28,7 @@ import java.math.BigInteger; import java.sql.Date; import java.sql.Timestamp; +import java.util.Arrays; import static java.util.Objects.requireNonNull; @@ -66,6 +68,26 @@ public static byte[] convertToBytes(Object val) { throw new TypeException(String.format("Cannot cast %s to bytes", val.getClass().getSimpleName())); } + static byte[] convertToBytes(Object val, int prefixLength) { + requireNonNull(val, "val is null"); + if (val instanceof byte[]) { + return Arrays.copyOf((byte[])val, prefixLength); + } else if (val instanceof String) { + return Arrays.copyOf(((String) val).getBytes(), prefixLength); + } + throw new TypeException(String.format("Cannot cast %s to bytes", val.getClass().getSimpleName())); + } + + static byte[] convertUtf8ToBytes(Object val, int prefixLength) { + requireNonNull(val, "val is null"); + if (val instanceof byte[]) { + return UTF8String.fromBytes(((byte[]) val)).substring(0, prefixLength).getBytes(); + } else if (val instanceof String) { + return UTF8String.fromString(((String) val)).substring(0, prefixLength).getBytes(); + } + throw new TypeException(String.format("Cannot cast %s to bytes", val.getClass().getSimpleName())); + } + private static final DateTimeZone localTimeZone = DateTimeZone.getDefault(); private static final DateTimeFormatter localDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZone(localTimeZone); diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/DataType.java b/tikv-client/src/main/java/com/pingcap/tikv/types/DataType.java index bf3648473b..1c7da42887 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/types/DataType.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/types/DataType.java @@ -15,6 +15,9 @@ 
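The two Converter prefix helpers above differ in what "prefix length" counts: `convertToBytes` truncates the raw encoded bytes, while `convertUtf8ToBytes` counts characters, matching the utf8/utf8mb4 prefix-index semantics handled in `DataType.encodeKey` below. A small plain-JDK illustration of the difference (Spark's `UTF8String` counts code points; `String.substring` is an adequate stand-in for the BMP characters used here):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PrefixSemanticsDemo {
  public static void main(String[] args) {
    String val = "数据库";  // three characters, nine UTF-8 bytes
    int prefixLength = 2;

    // utf8/utf8mb4 semantics: take the first 2 characters, then encode (6 bytes)
    byte[] charPrefix =
        val.substring(0, prefixLength).getBytes(StandardCharsets.UTF_8);

    // binary semantics: take the first 2 raw bytes, which splits a code point
    byte[] bytePrefix =
        Arrays.copyOf(val.getBytes(StandardCharsets.UTF_8), prefixLength);

    System.out.println(charPrefix.length); // 6
    System.out.println(bytePrefix.length); // 2
  }
}
```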
package com.pingcap.tikv.types; +import static com.pingcap.tikv.codec.Codec.isNullFlag; +import static java.util.Objects.requireNonNull; + import com.google.common.collect.ImmutableList; import com.pingcap.tidb.tipb.ExprType; import com.pingcap.tikv.codec.Codec; @@ -23,33 +26,29 @@ import com.pingcap.tikv.exception.TypeException; import com.pingcap.tikv.meta.Collation; import com.pingcap.tikv.meta.TiColumnInfo; - +import com.pingcap.tikv.meta.TiColumnInfo.InternalTypeHolder; import java.io.Serializable; -import java.util.Arrays; import java.util.List; -import static com.pingcap.tikv.codec.Codec.isNullFlag; -import static java.util.Objects.requireNonNull; - /** Base Type for encoding and decoding TiDB row information. */ public abstract class DataType implements Serializable { // Flag Information for strict mysql type - protected static final int NotNullFlag = 1; /* Field can't be NULL */ - protected static final int PriKeyFlag = 2; /* Field is part of a primary key */ - protected static final int UniqueKeyFlag = 4; /* Field is part of a unique key */ - protected static final int MultipleKeyFlag = 8; /* Field is part of a key */ - protected static final int BlobFlag = 16; /* Field is a blob */ - protected static final int UnsignedFlag = 32; /* Field is unsigned */ - protected static final int ZerofillFlag = 64; /* Field is zerofill */ - protected static final int BinaryFlag = 128; /* Field is binary */ - protected static final int EnumFlag = 256; /* Field is an enum */ - protected static final int AutoIncrementFlag = 512; /* Field is an auto increment field */ - protected static final int TimestampFlag = 1024; /* Field is a timestamp */ - protected static final int SetFlag = 2048; /* Field is a set */ - protected static final int NoDefaultValueFlag = 4096; /* Field doesn't have a default value */ - protected static final int OnUpdateNowFlag = 8192; /* Field is set to NOW on UPDATE */ - protected static final int NumFlag = 32768; /* Field is a num (for clients) */ + public static final int NotNullFlag = 1; /* Field can't be NULL */ + public static final int PriKeyFlag = 2; /* Field is part of a primary key */ + public static final int UniqueKeyFlag = 4; /* Field is part of a unique key */ + public static final int MultipleKeyFlag = 8; /* Field is part of a key */ + public static final int BlobFlag = 16; /* Field is a blob */ + public static final int UnsignedFlag = 32; /* Field is unsigned */ + public static final int ZerofillFlag = 64; /* Field is zerofill */ + public static final int BinaryFlag = 128; /* Field is binary */ + public static final int EnumFlag = 256; /* Field is an enum */ + public static final int AutoIncrementFlag = 512; /* Field is an auto increment field */ + public static final int TimestampFlag = 1024; /* Field is a timestamp */ + public static final int SetFlag = 2048; /* Field is a set */ + public static final int NoDefaultValueFlag = 4096; /* Field doesn't have a default value */ + public static final int OnUpdateNowFlag = 8192; /* Field is set to NOW on UPDATE */ + public static final int NumFlag = 32768; /* Field is a num (for clients) */ public enum EncodeType { KEY, @@ -90,6 +89,16 @@ protected DataType(MySQLType type) { this.collation = Collation.DEF_COLLATION_CODE; } + protected DataType(MySQLType type, int flag, int len, int decimal, String charset, int collation) { + this.tp = type; + this.flag = flag; + this.elems = ImmutableList.of(); + this.length = len; + this.decimal = decimal; + this.charset = charset; + this.collation = collation; + } + protected 
abstract Object decodeNotNull(int flag, CodecDataInput cdi); /** @@ -160,19 +169,27 @@ public void encode(CodecDataOutput cdo, EncodeType encodeType, Object value) { * * @param cdo destination of data. * @param value value to be encoded. + * @param type data value type. * @param prefixLength specifies prefix length of value to be encoded. * When prefixLength is DataType.UNSPECIFIED_LEN, * encode full length of value. */ - public void encodeKey(CodecDataOutput cdo, Object value, int prefixLength) { + public void encodeKey(CodecDataOutput cdo, Object value, DataType type, int prefixLength) { requireNonNull(cdo, "cdo is null"); if (value == null) { encodeNull(cdo); } else if (prefixLength == DataType.UNSPECIFIED_LEN) { encodeKey(cdo, value); } else if (isPrefixIndexSupported()) { - byte[] bytes = Converter.convertToBytes(value); - Codec.BytesCodec.writeBytesFully(cdo, Arrays.copyOf(bytes, prefixLength)); + byte[] bytes; + // When charset is utf8/utf8mb4, prefix length should be the number of utf8 characters + // rather than length of its encoded byte value. + if (type.getCharset().equalsIgnoreCase("utf8") || type.getCharset().equalsIgnoreCase("utf8mb4")) { + bytes = Converter.convertUtf8ToBytes(value, prefixLength); + } else { + bytes = Converter.convertToBytes(value, prefixLength); + } + Codec.BytesCodec.writeBytesFully(cdo, bytes); } else { throw new TypeException("Data type can not encode with prefix"); } @@ -323,4 +340,11 @@ public int hashCode() { * (length == 0 ? 1 : length) * (elems.hashCode())); } + + public InternalTypeHolder toTypeHolder() { + return new InternalTypeHolder( + getTypeCode(), flag, length, decimal, + charset, "", "", Collation.translate(collation), elems + ); + } } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/DataTypeFactory.java b/tikv-client/src/main/java/com/pingcap/tikv/types/DataTypeFactory.java index d04051f0b0..a069c04225 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/types/DataTypeFactory.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/types/DataTypeFactory.java @@ -44,7 +44,7 @@ public class DataTypeFactory { extractTypeMap(EnumType.subTypes, EnumType.class, builder, instBuilder); extractTypeMap(SetType.subTypes, SetType.class, builder, instBuilder); extractTypeMap(YearType.subTypes, YearType.class, builder, instBuilder); - extractTypeMap(new MySQLType[]{MySQLType.TypeJSON}, StringType.class, builder, instBuilder); + extractTypeMap(JsonType.subTypes, JsonType.class, builder, instBuilder); dataTypeCreatorMap = builder.build(); dataTypeInstanceMap = instBuilder.build(); } diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/IntegerType.java b/tikv-client/src/main/java/com/pingcap/tikv/types/IntegerType.java index 4951be41fe..d44d4f8b47 100644 --- a/tikv-client/src/main/java/com/pingcap/tikv/types/IntegerType.java +++ b/tikv-client/src/main/java/com/pingcap/tikv/types/IntegerType.java @@ -24,8 +24,8 @@ import com.pingcap.tikv.codec.CodecDataInput; import com.pingcap.tikv.codec.CodecDataOutput; import com.pingcap.tikv.exception.TypeException; +import com.pingcap.tikv.meta.Collation; import com.pingcap.tikv.meta.TiColumnInfo; - import java.math.BigDecimal; public class IntegerType extends DataType { @@ -36,11 +36,24 @@ public class IntegerType extends DataType { public static final IntegerType BIGINT = new IntegerType(MySQLType.TypeLonglong); public static final IntegerType BOOLEAN = TINYINT; + public static final IntegerType ROW_ID_TYPE = new IntegerType( + MySQLType.TypeLonglong, + PriKeyFlag, + 20, + 0 + ); + + + public 
static final MySQLType[] subTypes = new MySQLType[] {
MySQLType.TypeTiny, MySQLType.TypeShort, MySQLType.TypeInt24,
MySQLType.TypeLong, MySQLType.TypeLonglong
};
+ protected IntegerType(MySQLType type, int flag, int len, int decimal) {
+ super(type, flag, len, decimal, "", Collation.DEF_COLLATION_CODE);
+ }
+
protected IntegerType(MySQLType tp) {
super(tp);
}
diff --git a/tikv-client/src/main/java/com/pingcap/tikv/types/JsonType.java b/tikv-client/src/main/java/com/pingcap/tikv/types/JsonType.java
new file mode 100644
index 0000000000..d011ede5b0
--- /dev/null
+++ b/tikv-client/src/main/java/com/pingcap/tikv/types/JsonType.java
@@ -0,0 +1,394 @@
+package com.pingcap.tikv.types;
+
+import com.google.common.io.ByteArrayDataInput;
+import com.google.common.io.ByteStreams;
+import com.google.gson.*;
+import com.pingcap.tidb.tipb.ExprType;
+import com.pingcap.tikv.codec.CodecDataInput;
+import com.pingcap.tikv.codec.CodecDataOutput;
+import com.pingcap.tikv.meta.TiColumnInfo;
+import org.apache.commons.io.Charsets;
+
+import javax.annotation.Nullable;
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public class JsonType extends DataType {
+
+ private static final int KEY_ENTRY_LENGTH = 6;
+ private static final int VALUE_ENTRY_SIZE = 5;
+
+ // TypeCodeObject indicates the JSON is an object.
+ private static final byte TYPE_CODE_OBJECT = 0x01;
+ // TypeCodeArray indicates the JSON is an array.
+ private static final byte TYPE_CODE_ARRAY = 0x03;
+ // TypeCodeLiteral indicates the JSON is a literal (true/false/null).
+ private static final byte TYPE_CODE_LITERAL = 0x04;
+ // TypeCodeInt64 indicates the JSON is a signed integer.
+ private static final byte TYPE_CODE_INT64 = 0x09;
+ // TypeCodeUint64 indicates the JSON is an unsigned integer.
+ private static final byte TYPE_CODE_UINT64 = 0x0a;
+ // TypeCodeFloat64 indicates the JSON is a double-precision floating point number.
+ private static final byte TYPE_CODE_FLOAT64 = 0x0b;
+ // TypeCodeString indicates the JSON is a string.
+ private static final byte TYPE_CODE_STRING = 0x0c;
+
+ // LiteralNil represents JSON null.
+ private static final byte LITERAL_NIL = 0x00;
+ // LiteralTrue represents JSON true.
+ private static final byte LITERAL_TRUE = 0x01;
+ // LiteralFalse represents JSON false.
+ private static final byte LITERAL_FALSE = 0x02; + private static final JsonPrimitive JSON_FALSE = new JsonPrimitive(false); + private static final JsonPrimitive JSON_TRUE = new JsonPrimitive(true); + public static MySQLType[] subTypes = new MySQLType[]{ + MySQLType.TypeJSON + }; + + + protected JsonType(TiColumnInfo.InternalTypeHolder holder) { + super(holder); + } + + public JsonType(MySQLType type) { + super(type); + } + + + public JsonType(MySQLType type, int flag, int len, int decimal, String charset, int collation) { + super(type, flag, len, decimal, charset, collation); + } + + + @Override + protected Object decodeNotNull(int flag, CodecDataInput cdi) { + byte type = readByte(cdi); + return parseValue(type, cdi).toString(); + } + + /* + The binary JSON format from MySQL 5.7 is as follows: + JSON doc ::= type value + type ::= + 0x01 | // large JSON object + 0x03 | // large JSON array + 0x04 | // literal (true/false/null) + 0x05 | // int16 + 0x06 | // uint16 + 0x07 | // int32 + 0x08 | // uint32 + 0x09 | // int64 + 0x0a | // uint64 + 0x0b | // double + 0x0c | // utf8mb4 string + value ::= + object | + array | + literal | + number | + string | + object ::= element-count size key-entry* value-entry* key* value* + array ::= element-count size value-entry* value* + // number of members in object or number of elements in array + element-count ::= uint32 + // number of bytes in the binary representation of the object or array + size ::= uint32 + key-entry ::= key-offset key-length + key-offset ::= uint32 + key-length ::= uint16 // key length must be less than 64KB + value-entry ::= type offset-or-inlined-value + // This field holds either the offset to where the value is stored, + // or the value itself if it is small enough to be inlined (that is, + // if it is a JSON literal or a small enough [u]int). + offset-or-inlined-value ::= uint32 + key ::= utf8mb4-data + literal ::= + 0x00 | // JSON null literal + 0x01 | // JSON true literal + 0x02 | // JSON false literal + number ::= .... // little-endian format for [u]int(16|32|64), whereas + // double is stored in a platform-independent, eight-byte + // format using float8store() + string ::= data-length utf8mb4-data + data-length ::= uint8* // If the high bit of a byte is 1, the length + // field is continued in the next byte, + // otherwise it is the last byte of the length + // field. So we need 1 byte to represent + // lengths up to 127, 2 bytes to represent + // lengths up to 16383, and so on... 
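+
+ Worked example (added for illustration; offsets assume, as the "size - 8"
+ arithmetic in parseObject below implies, that size counts its own 8 header
+ bytes): the document {"a": true} is encoded as
+ 01                -- type: JSON object
+ 01 00 00 00       -- element-count = 1
+ 14 00 00 00       -- size = 20 bytes in total
+ 13 00 00 00 01 00 -- key-entry: key-offset = 19, key-length = 1
+ 04 01 00 00 00    -- value-entry: literal type, inlined LITERAL_TRUE
+ 61                -- key bytes: "a"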
+*/
+ private JsonElement parseValue(byte type, DataInput di) {
+ switch (type) {
+ case TYPE_CODE_OBJECT:
+ return parseObject(di);
+ case TYPE_CODE_ARRAY:
+ return parseArray(di);
+ case TYPE_CODE_LITERAL:
+ return parseLiteralJson(di);
+ case TYPE_CODE_INT64:
+ return new JsonPrimitive(parseInt64(di));
+ case TYPE_CODE_UINT64:
+ return new JsonPrimitive(parseUint64(di));
+ case TYPE_CODE_FLOAT64:
+ return new JsonPrimitive(parseDouble(di));
+ case TYPE_CODE_STRING:
+ long length = parseDataLength(di);
+ return new JsonPrimitive(parseString(di, length));
+ default:
+ throw new AssertionError("error type|type=" + (int) type);
+ }
+ }
+
+ // NOTE: the result must be interpreted as an unsigned 64-bit value
+ private long parseUint64(DataInput cdi) {
+ byte[] readBuffer = new byte[8];
+ readFully(cdi, readBuffer, 0, 8);
+ return ((long)(readBuffer[7]) << 56) +
+ ((long)(readBuffer[6] & 255) << 48) +
+ ((long)(readBuffer[5] & 255) << 40) +
+ ((long)(readBuffer[4] & 255) << 32) +
+ ((long)(readBuffer[3] & 255) << 24) +
+ ((readBuffer[2] & 255) << 16) +
+ ((readBuffer[1] & 255) << 8) +
+ ((readBuffer[0] & 255) << 0);
+ }
+
+ private void readFully(DataInput cdi, byte[] readBuffer, final int off, final int len) {
+ try {
+ cdi.readFully(readBuffer, off, len);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ private long parseInt64(DataInput cdi) {
+ byte[] readBuffer = new byte[8];
+ readFully(cdi, readBuffer);
+ return ((long)readBuffer[7] << 56) +
+ ((long)(readBuffer[6] & 255) << 48) +
+ ((long)(readBuffer[5] & 255) << 40) +
+ ((long)(readBuffer[4] & 255) << 32) +
+ ((long)(readBuffer[3] & 255) << 24) +
+ ((readBuffer[2] & 255) << 16) +
+ ((readBuffer[1] & 255) << 8) +
+ ((readBuffer[0] & 255) << 0);
+ }
+
+ private long parseUint32(DataInput cdi) {
+ byte[] readBuffer = new byte[4];
+ readFully(cdi, readBuffer);
+ return ((long)(readBuffer[3] & 255) << 24) +
+ ((readBuffer[2] & 255) << 16) +
+ ((readBuffer[1] & 255) << 8) +
+ ((readBuffer[0] & 255) << 0);
+ }
+
+ private double parseDouble(DataInput cdi) {
+ byte[] readBuffer = new byte[8];
+ readFully(cdi, readBuffer);
+ return Double.longBitsToDouble(((long) readBuffer[7] << 56) +
+ ((long) (readBuffer[6] & 255) << 48) +
+ ((long) (readBuffer[5] & 255) << 40) +
+ ((long) (readBuffer[4] & 255) << 32) +
+ ((long) (readBuffer[3] & 255) << 24) +
+ ((readBuffer[2] & 255) << 16) +
+ ((readBuffer[1] & 255) << 8) +
+ ((readBuffer[0] & 255) << 0));
+ }
+
+ private int parseUint16(DataInput cdi) {
+ byte[] readBuffer = new byte[2];
+ readFully(cdi, readBuffer);
+ return ((readBuffer[1] & 255) << 8) +
+ ((readBuffer[0] & 255) << 0);
+ }
+
+ private String parseString(DataInput di, long length) {
+ byte[] buffer = new byte[Math.toIntExact(length)];
+ readFully(di, buffer);
+ return new String(buffer, Charsets.UTF_8);
+ }
+
+ /**
+ * Reads a data-length field, which is encoded like Go's binary.Uvarint:
+ *
+ * func Uvarint(buf []byte) (uint64, int) {
+ *   var x uint64
+ *   var s uint
+ *   for i, b := range buf {
+ *     if b < 0x80 {
+ *       if i > 9 || i == 9 && b > 1 {
+ *         return 0, -(i + 1) // overflow
+ *       }
+ *       return x | uint64(b)<<s, i + 1
+ *     }
+ *     x |= uint64(b&0x7f) << s
+ *     s += 7
+ *   }
+ *   return 0, 0
+ * }
+ */
+ private long parseDataLength(DataInput di) {
+ long x = 0;
+ int s = 0;
+ int i = 0;
+ byte b = readByte(di);
+ // while the continuation bit is set, accumulate 7 bits per byte
+ while ((b & 0x80) != 0) {
+ if (i > 9) {
+ throw new IllegalArgumentException("overflow: found >=9 leading bytes");
+ }
+ x |= ((long)(b & 0x7f)) << s;
+ s += 7;
+ i++;
+ b = readByte(di);
+ }
+
+ if (i == 9 && b > 1) {
+ throw new IllegalArgumentException("overflow: 8 leading byte and last one > 1");
+ }
+ x |= ((long)b) << s;
+ return x;
+ }
+
+ private @Nullable Boolean parseLiteral(DataInput cdi) {
+ byte type;
+ type = readByte(cdi);
+ switch (type) {
+ case LITERAL_FALSE:
+ return Boolean.FALSE;
+ case LITERAL_NIL:
+ return null;
+ case LITERAL_TRUE:
+ return Boolean.TRUE;
+ default:
+ throw
new AssertionError("unknown literal type|" + (int) type); + } + } + + private byte readByte(DataInput cdi) { + byte type; + try { + type = cdi.readByte(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return type; + } + + private JsonArray parseArray(DataInput di) { + long elementCount = parseUint32(di); + long size = parseUint32(di); + byte[] buffer = new byte[Math.toIntExact(size-8)]; + readFully(di, buffer); + JsonArray jsonArray = new JsonArray(); + for (int i = 0; i < elementCount; i++) { + JsonElement value = parseValueEntry(buffer, VALUE_ENTRY_SIZE * i); + jsonArray.add(value); + } + return jsonArray; + } + + private JsonObject parseObject(DataInput di) { + long elementCount = parseUint32(di); + long size = parseUint32(di); + + byte[] buffer = new byte[Math.toIntExact(size - 8)]; + readFully(di, buffer); + JsonObject jsonObject = new JsonObject(); + for (int i = 0; i < elementCount; i++) { + KeyEntry keyEntry = parseKeyEntry(ByteStreams.newDataInput(buffer, i * KEY_ENTRY_LENGTH)); + String key = parseString(ByteStreams.newDataInput(buffer, Math.toIntExact(keyEntry.keyOffset - 8)), keyEntry.keyLength); + long valueEntryOffset = elementCount * KEY_ENTRY_LENGTH + i * VALUE_ENTRY_SIZE; + JsonElement value = parseValueEntry(buffer, valueEntryOffset); + jsonObject.add(key, value); + } + return jsonObject; + } + + private JsonElement parseValueEntry(byte[] buffer, long valueEntryOffset) { + byte valueType = buffer[Math.toIntExact(valueEntryOffset)]; + JsonElement value; + ByteArrayDataInput bs = ByteStreams.newDataInput(buffer, Math.toIntExact(valueEntryOffset + 1)); + switch (valueType) { + case TYPE_CODE_LITERAL: + value = parseLiteralJson(bs); + break; + default: + long valueOffset = parseUint32(bs); + value = parseValue(valueType, ByteStreams.newDataInput(buffer, Math.toIntExact(valueOffset - 8))); + } return value; + } + + private JsonElement parseLiteralJson(DataInput di) { + JsonElement value; + Boolean bool = parseLiteral(di); + if (bool == null) { + value = JsonNull.INSTANCE; + } else if (bool) { + value = JSON_TRUE; + } else { + value = JSON_FALSE; + } + return value; + } + + private KeyEntry parseKeyEntry(DataInput di) { + return new KeyEntry( + parseUint32(di), + parseUint16(di) + ); + } + + static class KeyEntry { + long keyOffset; + int keyLength; + + public KeyEntry(long keyOffset, int keyLength) { + this.keyOffset = keyOffset; + this.keyLength = keyLength; + } + } + + + private void readFully(DataInput di, byte[] buffer) { + try { + di.readFully(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + protected void encodeKey(CodecDataOutput cdo, Object value) { + throw new UnsupportedOperationException("JsonType.encodeKey|value=" + value); + } + + @Override + protected void encodeValue(CodecDataOutput cdo, Object value) { + throw new UnsupportedOperationException("JsonType.encodeValue|value=" + value); + } + + @Override + protected void encodeProto(CodecDataOutput cdo, Object value) { + throw new UnsupportedOperationException("JsonType.encodeProto|value=" + value); + } + + @Override + public ExprType getProtoExprType() { + return ExprType.MysqlJson; + } + + @Override + public Object getOriginDefaultValueNonNull(String value) { + throw new AssertionError("json can't have a default value"); + } +} diff --git a/tikv-client/src/test/java/com/pingcap/tikv/codec/MyDecimalTest.java b/tikv-client/src/test/java/com/pingcap/tikv/codec/MyDecimalTest.java index dc6a208c42..e5dff67cfb 100644 --- 
a/tikv-client/src/test/java/com/pingcap/tikv/codec/MyDecimalTest.java +++ b/tikv-client/src/test/java/com/pingcap/tikv/codec/MyDecimalTest.java @@ -25,50 +25,85 @@ import org.junit.Test; public class MyDecimalTest { + @Test + public void toLongTest() { + List tests = new ArrayList<>(); + tests.add(new MyDecimalTestStruct("-9223372036854775807", "-9223372036854775807")); + tests.add(new MyDecimalTestStruct("-9223372036854775808", "-9223372036854775808")); + tests.add(new MyDecimalTestStruct("9223372036854775808", "9223372036854775807")); + tests.add(new MyDecimalTestStruct("-9223372036854775809", "-9223372036854775808")); + tests.add(new MyDecimalTestStruct("18446744073709551615", "9223372036854775807")); + + for(MyDecimalTestStruct t : tests) { + MyDecimal dec = new MyDecimal(); + dec.fromString(t.in); + String result = dec.toLong() + ""; + assertEquals(t.out, result); + } + } + @Test public void fromStringTest() throws Exception { - List test = new ArrayList<>(); - test.add(new MyDecimalTestStruct("12345", "12345", 5, 0)); - test.add(new MyDecimalTestStruct("12345.", "12345", 5, 0)); - test.add(new MyDecimalTestStruct("123.45", "123.45", 5, 2)); - test.add(new MyDecimalTestStruct("-123.45", "-123.45", 5, 2)); - test.add(new MyDecimalTestStruct(".00012345000098765", "0.00012345000098765", 17, 17)); - test.add(new MyDecimalTestStruct(".12345000098765", "0.12345000098765", 14, 14)); - test.add( + List tests = new ArrayList<>(); + tests.add(new MyDecimalTestStruct("1111111111111111111111111.111111111111111111111111111111", + "1111111111111111111111111.111111111111111111111111111111", 65, 30)); + tests.add(new MyDecimalTestStruct("12345", "12345", 5, 0)); + tests.add(new MyDecimalTestStruct("12345.", "12345", 5, 0)); + tests.add(new MyDecimalTestStruct("123.45", "123.45", 5, 2)); + tests.add(new MyDecimalTestStruct("-123.45", "-123.45", 5, 2)); + tests.add(new MyDecimalTestStruct(".00012345000098765", "0.00012345000098765", 17, 17)); + tests.add(new MyDecimalTestStruct(".12345000098765", "0.12345000098765", 14, 14)); + tests.add( new MyDecimalTestStruct("-.000000012345000098765", "-0.000000012345000098765", 21, 21)); - test.add(new MyDecimalTestStruct("0000000.001", "0.001", 3, 3)); - test.add(new MyDecimalTestStruct("1234500009876.5", "1234500009876.5", 14, 1)); - test.forEach( - (a) -> { - MyDecimal dec = new MyDecimal(); - dec.fromString(a.in); - assertEquals(a.precision, dec.precision()); - assertEquals(a.frac, dec.frac()); - assertEquals(a.out, dec.toString()); - }); + tests.add( + new MyDecimalTestStruct("-.000000012345000098765", "-0.000000012345000098765", 2, 21)); + tests.add( + new MyDecimalTestStruct("-.000000012345000098765", "-0.000000012345000098765", 21, 2)); + tests.add(new MyDecimalTestStruct("0000000.001", "0.001", 3, 3)); + tests.add(new MyDecimalTestStruct("1234500009876.5", "1234500009876.5", 14, 1)); + for(MyDecimalTestStruct t : tests) { + MyDecimal dec = new MyDecimal(); + dec.fromString(t.in); + assertEquals(t.out, dec.toString()); + } } @Test - public void toBinToBinFromBinTest() throws Exception { - List test = new ArrayList<>(); - test.add(new MyDecimalTestStruct("-10.55", "-10.55", 4, 2)); - test.add(new MyDecimalTestStruct("12345", "12345", 5, 0)); - test.add(new MyDecimalTestStruct("-12345", "-12345", 5, 0)); - test.add(new MyDecimalTestStruct("0000000.001", "0.001", 3, 3)); - test.add(new MyDecimalTestStruct("0.00012345000098765", "0.00012345000098765", 17, 17)); - test.add(new MyDecimalTestStruct("-0.00012345000098765", "-0.00012345000098765", 17, 17)); - 
test.forEach( - (a) -> { - MyDecimal dec = new MyDecimal(); - dec.fromString(a.in); - assertEquals(a.out, dec.toString()); - int[] bin = dec.toBin(dec.precision(), dec.frac()); - dec.clear(); - dec.fromBin(a.precision, a.frac, bin); - assertEquals(a.precision, dec.precision()); - assertEquals(a.frac, dec.frac()); - assertEquals(a.out, dec.toString()); - }); + public void readWordTest() throws Exception { + assertEquals(MyDecimal.readWord(new int[]{250}, 1, 0), -6); + assertEquals(MyDecimal.readWord(new int[]{50}, 1, 0), 50); + + assertEquals(MyDecimal.readWord(new int[]{250, 250}, 2, 0), -1286); + assertEquals(MyDecimal.readWord(new int[]{50, 50}, 2, 0), 12850); + + assertEquals(MyDecimal.readWord(new int[]{250, 250, 250}, 3, 0), -328966); + assertEquals(MyDecimal.readWord(new int[]{50, 50, 50}, 3, 0), 3289650); + + assertEquals(MyDecimal.readWord(new int[]{250, 250, 250, 250}, 4, 0), -84215046); + assertEquals(MyDecimal.readWord(new int[]{50, 50, 50, 50}, 4, 0), 842150450); + } + + @Test + public void toBinFromBinTest() throws Exception { + List tests = new ArrayList<>(); + String decValStr = "11111111111111111111111111111111111.111111111111111111111111111111"; + tests.add(new MyDecimalTestStruct(decValStr, decValStr, 65, 30)); + tests.add(new MyDecimalTestStruct("12345000098765", "12345000098765", 14, 0)); + tests.add(new MyDecimalTestStruct("-10.55", "-10.55", 4, 2)); + tests.add(new MyDecimalTestStruct("12345", "12345", 5, 0)); + tests.add(new MyDecimalTestStruct("-12345", "-12345", 5, 0)); + tests.add(new MyDecimalTestStruct("0000000.001", "0.001", 3, 3)); + tests.add(new MyDecimalTestStruct("0.00012345000098765", "0.00012345000098765", 17, 17)); + tests.add(new MyDecimalTestStruct("-0.00012345000098765", "-0.00012345000098765", 17, 17)); + for (MyDecimalTestStruct a : tests) { + MyDecimal dec = new MyDecimal(); + dec.fromString(a.in); + assertEquals(a.out, dec.toString()); + int[] bin = dec.toBin(a.precision, a.frac); + dec.clear(); + dec.fromBin(a.precision, a.frac, bin); + assertEquals(a.out, dec.toString()); + } } @Test @@ -80,7 +115,6 @@ public void toBinTest() throws Exception { new int[] { 0x7E, 0xF2, 0x04, 0xC7, 0x2D, 0xFB, 0x2D, }; - // something wrong with toBin and fromBin assertArrayEquals(expected, data); } @@ -91,6 +125,11 @@ private class MyDecimalTestStruct { int precision; int frac; + + MyDecimalTestStruct(String in, String out) { + this.in = in; + this.out = out; + } MyDecimalTestStruct(String in, String out, int precision, int frac) { this.in = in; this.out = out;
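The new `toLongTest` above pins down saturating conversion at the int64 boundaries. A standalone sketch of the expected behaviour (`BigDecimal` stands in for `MyDecimal` here; this illustrates the contract the test asserts, not the library's implementation):

```java
import java.math.BigDecimal;

public class SaturationDemo {
  // Clamp out-of-range decimals to Long.MIN_VALUE / Long.MAX_VALUE,
  // mirroring the expectations in toLongTest.
  static long toLongSaturated(String s) {
    BigDecimal d = new BigDecimal(s);
    if (d.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) return Long.MAX_VALUE;
    if (d.compareTo(BigDecimal.valueOf(Long.MIN_VALUE)) < 0) return Long.MIN_VALUE;
    return d.longValue();
  }

  public static void main(String[] args) {
    System.out.println(toLongSaturated("18446744073709551615"));  // 9223372036854775807
    System.out.println(toLongSaturated("-9223372036854775809")); // -9223372036854775808
  }
}
```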