Skip to content

Commit

Permalink
Allow TiSpark retrieve row id (#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
ilovesoup authored and birdstorm committed Aug 24, 2018
1 parent 4e4b4ad commit 85c3ed6
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 58 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Below configurations can be put together with spark-defaults.conf or passed in t
| spark.tispark.plan.downgrade.index_threshold | 10000 | If index scan ranges on one region exceeds this limit in original request, downgrade this region's request to table scan rather than original planned index scan |
| spark.tispark.type.unsupported_mysql_types | "time,enum,set,year,json" | A comma separated list of mysql types TiSpark does not support currently, refer to `Unsupported MySQL Type List` below |
| spark.tispark.request.timezone.offset | Local Timezone offset | An integer, represents timezone offset to UTC time(like 28800, GMT+8), this value will be added to requests issued to TiKV |
| spark.tispark.show_rowid | Show implicit row Id | If to show implicit row Id if exists |

## Unsupported MySQL Type List

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ object TiConfigConst {
val UNSUPPORTED_TYPES: String = "spark.tispark.type.unsupported_mysql_types"
val ENABLE_AUTO_LOAD_STATISTICS: String = "spark.tispark.statistics.auto_load"
val CACHE_EXPIRE_AFTER_ACCESS: String = "spark.tispark.statistics.expire_after_access"
val SHOW_ROWID: String = "spark.tispark.show_rowid"
}
4 changes: 4 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/TiUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ object TiUtils {
val priority = CommandPri.valueOf(conf.get(TiConfigConst.REQUEST_COMMAND_PRIORITY))
tiConf.setCommandPriority(priority)
}

if (conf.contains(TiConfigConst.SHOW_ROWID)) {
tiConf.setShowRowId(conf.get(TiConfigConst.SHOW_ROWID).toBoolean)
}
tiConf
}

Expand Down
18 changes: 14 additions & 4 deletions core/src/main/scala/org/apache/spark/sql/TiContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,25 @@ class TiContext(val session: SparkSession) extends Serializable with Logging {
}
}

// tidbMapTable does not do any check any meta information
// it just register table for later use
def tidbMapTable(dbName: String, tableName: String): Unit = {
def getDataFrame(dbName: String, tableName: String): DataFrame = {
val tiRelation = new TiDBRelation(
tiSession,
new TiTableReference(dbName, tableName),
meta
)(sqlContext)
sqlContext.baseRelationToDataFrame(tiRelation).createTempView(tableName)
sqlContext.baseRelationToDataFrame(tiRelation)
}

// tidbMapTable does not do any check any meta information
// it just register table for later use
def tidbMapTable(dbName: String, tableName: String): DataFrame = {
val df = getDataFrame(dbName, tableName)
df.createOrReplaceTempView(tableName)
df
}

def tidbMapDatabase(dbName: String, dbNameAsPrefix: Boolean): Unit = {
tidbMapDatabase(dbName, dbNameAsPrefix, autoLoad)
}

def tidbMapDatabase(dbName: String,
Expand Down
10 changes: 10 additions & 0 deletions tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TiConfiguration implements Serializable {
private static final int DEF_TABLE_SCAN_CONCURRENCY = 512;
private static final CommandPri DEF_COMMAND_PRIORITY = CommandPri.Low;
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.RC;
private static final boolean DEF_SHOW_ROWID = false;

private int timeout = DEF_TIMEOUT;
private TimeUnit timeoutUnit = DEF_TIMEOUT_UNIT;
Expand All @@ -58,6 +59,7 @@ public class TiConfiguration implements Serializable {
private CommandPri commandPriority = DEF_COMMAND_PRIORITY;
private IsolationLevel isolationLevel = DEF_ISOLATION_LEVEL;
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
private boolean showRowId = DEF_SHOW_ROWID;

public static TiConfiguration createDefault(String pdAddrsStr) {
Objects.requireNonNull(pdAddrsStr, "pdAddrsStr is null");
Expand Down Expand Up @@ -197,4 +199,12 @@ public void setMaxRequestKeyRangeSize(int maxRequestKeyRangeSize) {
}
this.maxRequestKeyRangeSize = maxRequestKeyRangeSize;
}

public void setShowRowId(boolean flag) {
this.showRowId = flag;
}

public boolean ifShowRowId() {
return showRowId;
}
}
4 changes: 3 additions & 1 deletion tikv-client/src/main/java/com/pingcap/tikv/TiSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ public Catalog getCatalog() {
if (catalog == null) {
catalog = new Catalog(() -> createSnapshot(),
conf.getMetaReloadPeriod(),
conf.getMetaReloadPeriodUnit());
conf.getMetaReloadPeriodUnit(),
conf.ifShowRowId()
);
}
res = catalog;
}
Expand Down
48 changes: 35 additions & 13 deletions tikv-client/src/main/java/com/pingcap/tikv/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,31 @@

package com.pingcap.tikv.catalog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.pingcap.tikv.Snapshot;
import com.pingcap.tikv.meta.TiDBInfo;
import com.pingcap.tikv.meta.TiTableInfo;
import org.apache.log4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.log4j.Logger;

public class Catalog implements AutoCloseable {
private Supplier<Snapshot> snapshotProvider;
private ScheduledExecutorService service;
private CatalogCache metaCache;
private final boolean showRowId;
private final Logger logger = Logger.getLogger(this.getClass());

@Override
Expand Down Expand Up @@ -110,9 +115,14 @@ private Map<String, TiDBInfo> loadDatabases() {
}
}

public Catalog(Supplier<Snapshot> snapshotProvider, int refreshPeriod, TimeUnit periodUnit) {
public Catalog(
Supplier<Snapshot> snapshotProvider,
int refreshPeriod,
TimeUnit periodUnit,
boolean showRowId) {
this.snapshotProvider = Objects.requireNonNull(snapshotProvider,
"Snapshot Provider is null");
this.showRowId = showRowId;
metaCache = new CatalogCache(new CatalogTransaction(snapshotProvider.get()));
service = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
service.scheduleAtFixedRate(() -> {
Expand All @@ -125,12 +135,6 @@ public Catalog(Supplier<Snapshot> snapshotProvider, int refreshPeriod, TimeUnit
}, refreshPeriod, refreshPeriod, periodUnit);
}

public Catalog(Supplier<Snapshot> snapshotProvider) {
this.snapshotProvider = Objects.requireNonNull(snapshotProvider,
"Snapshot Provider is null");
metaCache = new CatalogCache(new CatalogTransaction(snapshotProvider.get()));
}

public void reloadCache() {
Snapshot snapshot = snapshotProvider.get();
CatalogTransaction newTrx = new CatalogTransaction(snapshot);
Expand All @@ -146,7 +150,15 @@ public List<TiDBInfo> listDatabases() {

public List<TiTableInfo> listTables(TiDBInfo database) {
Objects.requireNonNull(database, "database is null");
return metaCache.listTables(database);
if (showRowId) {
return metaCache
.listTables(database)
.stream()
.map(table -> table.copyTableWithRowId())
.collect(Collectors.toList());
} else {
return metaCache.listTables(database);
}
}

public TiDBInfo getDatabase(String dbName) {
Expand All @@ -165,15 +177,25 @@ public TiTableInfo getTable(String dbName, String tableName) {
public TiTableInfo getTable(TiDBInfo database, String tableName) {
Objects.requireNonNull(database, "database is null");
Objects.requireNonNull(tableName, "tableName is null");
return metaCache.getTable(database, tableName);
TiTableInfo table = metaCache.getTable(database, tableName);
if (showRowId) {
return table.copyTableWithRowId();
} else {
return table;
}
}

@VisibleForTesting
public TiTableInfo getTable(TiDBInfo database, long tableId) {
Objects.requireNonNull(database, "database is null");
Collection<TiTableInfo> tables = listTables(database);
for (TiTableInfo table : tables) {
if (table.getId() == tableId) {
return table;
if (showRowId) {
return table.copyTableWithRowId();
} else {
return table;
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static int translate(String collation) {
return code;
}

static String translate(int code) {
public static String translate(int code) {
String collation = collationCodeMap.get(code);
if (collation == null) {
return "";
Expand Down
117 changes: 99 additions & 18 deletions tikv-client/src/main/java/com/pingcap/tikv/meta/TiColumnInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package com.pingcap.tikv.meta;

import static java.util.Objects.requireNonNull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -25,13 +27,11 @@
import com.pingcap.tikv.types.DataType;
import com.pingcap.tikv.types.DataType.EncodeType;
import com.pingcap.tikv.types.DataTypeFactory;

import com.pingcap.tikv.types.IntegerType;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

@JsonIgnoreProperties(ignoreUnknown = true)
public class TiColumnInfo implements Serializable {
private final long id;
Expand All @@ -44,6 +44,10 @@ public class TiColumnInfo implements Serializable {
private final String defaultValue;
private final String originDefaultValue;

public static TiColumnInfo getRowIdColumn(int offset) {
return new TiColumnInfo(-1, "_tidb_rowid", offset, IntegerType.ROW_ID_TYPE, true);
}

@VisibleForTesting
private static final int PK_MASK = 0x2;

Expand All @@ -70,6 +74,42 @@ public TiColumnInfo(
this.isPrimaryKey = (type.getFlag() & PK_MASK) > 0;
}

public TiColumnInfo(
long id,
String name,
int offset,
DataType type,
SchemaState schemaState,
String originalDefaultValue,
String defaultValue,
String comment) {
this.id = id;
this.name = requireNonNull(name, "column name is null").toLowerCase();
this.offset = offset;
this.type = requireNonNull(type, "data type is null");
this.schemaState = schemaState;
this.comment = comment;
this.defaultValue = defaultValue;
this.originDefaultValue = originalDefaultValue;
this.isPrimaryKey = (type.getFlag() & PK_MASK) > 0;
}

public TiColumnInfo copyWithoutPrimaryKey() {
InternalTypeHolder typeHolder = type.toTypeHolder();
typeHolder.setFlag(type.getFlag() & (~TiColumnInfo.PK_MASK));
DataType newType = DataTypeFactory.of(typeHolder);
return new TiColumnInfo(
this.id,
this.name,
this.offset,
newType,
this.schemaState,
this.originDefaultValue,
this.defaultValue,
this.comment
);
}

@VisibleForTesting
public TiColumnInfo(long id, String name, int offset, DataType type, boolean isPrimaryKey) {
this.id = id;
Expand Down Expand Up @@ -119,30 +159,62 @@ public String getDefaultValue() {
return defaultValue;
}

public ByteString getOriginDefaultValue() {
public String getOriginDefaultValue() {
return originDefaultValue;
}

public ByteString getOriginDefaultValueAsByteString() {
CodecDataOutput cdo = new CodecDataOutput();
type.encode(cdo, EncodeType.VALUE, type.getOriginDefaultValue(originDefaultValue));
return cdo.toByteString();
}

@JsonIgnoreProperties(ignoreUnknown = true)
public static class InternalTypeHolder {
private final int tp;
private final int flag;
private final long flen;
private final int decimal;
private final String charset;
private final String collate;
private final String defaultValue;
private final String originDefaultValue;
private final List<String> elems;
private int tp;
private int flag;
private long flen;
private int decimal;
private String charset;
private String collate;
private String defaultValue;
private String originDefaultValue;
private List<String> elems;

public void setTp(int tp) {
this.tp = tp;
}

public String getDefaultValue() {
return defaultValue;
public void setFlag(int flag) {
this.flag = flag;
}

public String getOriginDefaultValue() {
return originDefaultValue;
public void setFlen(long flen) {
this.flen = flen;
}

public void setDecimal(int decimal) {
this.decimal = decimal;
}

public void setCharset(String charset) {
this.charset = charset;
}

public void setCollate(String collate) {
this.collate = collate;
}

public void setDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
}

public void setOriginDefaultValue(String originDefaultValue) {
this.originDefaultValue = originDefaultValue;
}

public void setElems(List<String> elems) {
this.elems = elems;
}

interface Builder<E extends DataType> {
Expand Down Expand Up @@ -211,6 +283,15 @@ public String getCollate() {
public List<String> getElems() {
return elems;
}

public String getDefaultValue() {
return defaultValue;
}

public String getOriginDefaultValue() {
return originDefaultValue;
}

}

TiIndexColumn toFakeIndexColumn() {
Expand All @@ -235,7 +316,7 @@ ColumnInfo.Builder toProtoBuilder(TiTableInfo table) {
.setColumnLen((int) type.getLength())
.setDecimal(type.getDecimal())
.setFlag(type.getFlag())
.setDefaultVal(getOriginDefaultValue())
.setDefaultVal(getOriginDefaultValueAsByteString())
.setPkHandle(table.isPkHandle() && isPrimaryKey())
.addAllElems(type.getElems());
}
Expand Down
Loading

0 comments on commit 85c3ed6

Please sign in to comment.