-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
AbstractJdbcCompatibleSourceOperations.java
308 lines (261 loc) · 13.7 KB
/
AbstractJdbcCompatibleSourceOperations.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.db.jdbc;
import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import javax.xml.bind.DatatypeConverter;
/**
* Source operation skeleton for JDBC compatible databases.
*/
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {
/**
 * A Date representing the earliest date in CE. Any date before this is in BCE.
 * <p>
 * Used as the comparison boundary by the {@code resolveEra} overloads that accept
 * {@link Date} and {@link Timestamp} values.
 * <p>
 * NOTE(review): {@code Date.valueOf} presumably resolves the literal in the JVM default time zone,
 * so the exact instant of this boundary may vary between deployments - confirm.
 */
private static final Date ONE_CE = Date.valueOf("0001-01-01");
/**
 * Converts the row the given result set is currently positioned on into a JSON object.
 * Columns whose value is SQL NULL are left out of the resulting object.
 *
 * @param queryContext result set positioned on the row to convert
 * @return a JSON object holding one field per non-null column
 * @throws SQLException if reading metadata or a column value fails
 */
@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
  // the metadata lookup talks to the database the first time only; afterwards it is cached.
  final int totalColumns = queryContext.getMetaData().getColumnCount();
  final ObjectNode row = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

  // JDBC columns are 1-based.
  for (int column = 1; column <= totalColumns; column++) {
    // touching the column first is the accepted JDBC way to learn whether it is NULL before any
    // type-specific parsing is attempted.
    queryContext.getObject(column);
    if (!queryContext.wasNull()) {
      // map the value onto a java type that serializes to sensible json.
      copyToJsonField(queryContext, column, row);
    }
  }
  return row;
}
/**
 * Reads the SQL array at the given column and stores it on {@code node} as a JSON array of
 * strings.
 *
 * @param node JSON object that receives the array
 * @param columnName name of the field to set on {@code node}
 * @param resultSet result set positioned on the row being converted
 * @param index 1-based index of the array column
 * @throws SQLException if the array or any of its elements cannot be read
 */
protected void putArray(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  // reuse the node's own factory instead of building a throw-away ObjectMapper on every call.
  final ArrayNode arrayNode = node.arrayNode();
  // the element result set is a resource of its own; close it once it has been drained.
  try (final ResultSet arrayResultSet = resultSet.getArray(index).getResultSet()) {
    while (arrayResultSet.next()) {
      // per the JDBC Array contract column 1 is the element index and column 2 the element value.
      arrayNode.add(arrayResultSet.getString(2));
    }
  }
  node.set(columnName, arrayNode);
}
/**
 * Copies the boolean value of the given column into {@code node} under {@code columnName}.
 *
 * @throws SQLException if the column cannot be read as a boolean
 */
protected void putBoolean(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final boolean flag = resultSet.getBoolean(index);
  node.put(columnName, flag);
}
/**
 * Copies a small integer column into {@code node}.
 * <p>
 * In some sources a "short" column can hold a value larger than {@link Short#MAX_VALUE}. E.g. MySQL
 * has an unsigned smallint type, which can contain the value 65535. When reading the column as a
 * short fails, the column is read as an int instead.
 *
 * @throws SQLException declared for the JDBC access performed here
 */
protected void putShortInt(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  short shortValue;
  try {
    shortValue = resultSet.getShort(index);
  } catch (final SQLException e) {
    // the value did not fit a short: retry with the wider type.
    final Integer widened = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getInt(index));
    node.put(columnName, widened);
    return;
  }
  node.put(columnName, shortValue);
}
/**
 * Copies an integer column into {@code node}.
 * <p>
 * In some sources an "integer" column can hold a value larger than {@link Integer#MAX_VALUE}. E.g.
 * MySQL has an unsigned integer type, which can contain the value 3428724653. When reading the
 * column as an int fails, the column is read as a long instead.
 *
 * @throws SQLException declared for the JDBC access performed here
 */
protected void putInteger(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  int intValue;
  try {
    intValue = resultSet.getInt(index);
  } catch (final SQLException e) {
    // the value did not fit an int: retry with the wider type.
    final Long widened = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getLong(index));
    node.put(columnName, widened);
    return;
  }
  node.put(columnName, intValue);
}
/**
 * Copies the given column, read as a long, into {@code node} under {@code columnName}.
 * The read goes through {@code DataTypeUtils.returnNullIfInvalid}, which presumably yields null
 * when the value cannot be read - verify against that helper.
 */
protected void putBigInt(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final Long longValue = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getLong(index));
  node.put(columnName, longValue);
}
/**
 * Copies the given column, read as a double, into {@code node} under {@code columnName}.
 * {@link Double#isFinite} is handed to {@code DataTypeUtils.returnNullIfInvalid} as the validity
 * check, so NaN and infinities are presumably written as null - verify against that helper.
 */
protected void putDouble(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final Double doubleValue = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getDouble(index), Double::isFinite);
  node.put(columnName, doubleValue);
}
/**
 * Copies the given column, read as a float, into {@code node} under {@code columnName}.
 * {@link Float#isFinite} is handed to {@code DataTypeUtils.returnNullIfInvalid} as the validity
 * check, so NaN and infinities are presumably written as null - verify against that helper.
 */
protected void putFloat(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final Float floatValue = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getFloat(index), Float::isFinite);
  node.put(columnName, floatValue);
}
/**
 * Copies the given column, read as a {@link BigDecimal}, into {@code node} under
 * {@code columnName}. The read goes through {@code DataTypeUtils.returnNullIfInvalid}.
 */
protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final BigDecimal decimalValue = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
  node.put(columnName, decimalValue);
}
/**
 * Copies the string value of the given column into {@code node} under {@code columnName}.
 *
 * @throws SQLException if the column cannot be read as a string
 */
protected void putString(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final String text = resultSet.getString(index);
  node.put(columnName, text);
}
/**
 * Reads the given column as a {@link LocalDate}, converts it with
 * {@code DateTimeConverter.convertToDate} and stores the result in {@code node}.
 *
 * @throws SQLException if the column cannot be read as a {@link LocalDate}
 */
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final LocalDate date = getObject(resultSet, index, LocalDate.class);
  node.put(columnName, DateTimeConverter.convertToDate(date));
}
/**
 * Reads the given column as a {@link LocalTime}, converts it with
 * {@code DateTimeConverter.convertToTime} and stores the result in {@code node}.
 *
 * @throws SQLException if the column cannot be read as a {@link LocalTime}
 */
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final LocalTime time = getObject(resultSet, index, LocalTime.class);
  node.put(columnName, DateTimeConverter.convertToTime(time));
}
/**
 * Reads the given column as a {@link Timestamp} and stores it in {@code node} formatted by
 * {@code DataTypeUtils.toISO8601StringWithMicroseconds}.
 * <p>
 * The column must not be SQL NULL: the timestamp is dereferenced without a null check.
 *
 * @throws SQLException if the column cannot be read as a timestamp
 */
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  // https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html
  final Timestamp timestamp = resultSet.getTimestamp(index);
  node.put(columnName, DataTypeUtils.toISO8601StringWithMicroseconds(timestamp.toInstant()));
}
/**
 * Copies the raw bytes of the given column into {@code node} under {@code columnName}.
 *
 * @throws SQLException if the column cannot be read as bytes
 */
protected void putBinary(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final byte[] bytes = resultSet.getBytes(index);
  node.put(columnName, bytes);
}
/**
 * Fallback writer: stores the string representation of the given column in {@code node} under
 * {@code columnName}.
 *
 * @throws SQLException if the column cannot be read as a string
 */
protected void putDefault(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final String rendered = resultSet.getString(index);
  node.put(columnName, rendered);
}
/**
 * Binds a time parameter. The value is first parsed as a {@link LocalTime}; when it is not in
 * that format it is bound through {@code setTimestamp} instead.
 *
 * @param preparedStatement statement receiving the parameter
 * @param parameterIndex 1-based parameter position
 * @param value textual time value
 * @throws SQLException if the driver rejects the parameter
 */
protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  try {
    final LocalTime localTime = LocalTime.parse(value);
    preparedStatement.setObject(parameterIndex, localTime);
  } catch (final DateTimeParseException notALocalTime) {
    // not a plain local time: fall back to the timestamp representation.
    setTimestamp(preparedStatement, parameterIndex, value);
  }
}
/**
 * Binds a timestamp parameter parsed from its textual form.
 * <p>
 * The value is parsed with {@code DataTypeUtils.getDateFormat()} after its fractional-second part
 * (the digits between the last '.' and the final character) has been removed. The fraction is
 * applied separately through {@link Timestamp#setNanos(int)}, right-padded with zeros to nine
 * digits. Digits beyond nanosecond precision are dropped.
 * <p>
 * NOTE(review): the final character of the value is assumed to be a one-character zone designator
 * such as 'Z' - confirm against the pattern used by {@code DataTypeUtils.getDateFormat()}.
 *
 * @param preparedStatement statement receiving the parameter
 * @param parameterIndex 1-based parameter position
 * @param value textual timestamp value
 * @throws SQLException if the driver rejects the parameter
 * @throws RuntimeException wrapping the {@link ParseException} when the value cannot be parsed
 */
protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  // parse time, and timestamp the same way. this seems to not cause any problems and allows us
  // to treat them all as ISO8601. if this causes any problems down the line, we can adjust.
  // Parsing TIME as a TIMESTAMP might potentially break for ClickHouse cause it doesn't expect TIME
  // value in the following format
  final int nanoDigits = 9;
  try {
    String valueWithoutFraction = value;
    final StringBuilder nanos = new StringBuilder(nanoDigits);
    final int dotIndex = value.lastIndexOf('.');
    final int lastCharIndex = value.length() - 1;
    // a '.' in the very last position has no fraction and no trailing designator to strip.
    if (dotIndex > 0 && dotIndex < lastCharIndex) {
      final String fraction = value.substring(dotIndex + 1, lastCharIndex);
      // keep at most nanosecond precision; padding an over-long fraction used to loop forever.
      nanos.append(fraction, 0, Math.min(fraction.length(), nanoDigits));
      // cut out exactly the located fraction rather than every textual occurrence of it.
      valueWithoutFraction = value.substring(0, dotIndex) + value.substring(lastCharIndex);
    }
    while (nanos.length() < nanoDigits) {
      nanos.append('0');
    }
    final Timestamp timestamp = Timestamp
        .from(DataTypeUtils.getDateFormat().parse(valueWithoutFraction).toInstant());
    timestamp.setNanos(Integer.parseInt(nanos.toString()));
    preparedStatement.setTimestamp(parameterIndex, timestamp);
  } catch (final ParseException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Binds a date parameter. The value is first parsed as a {@link LocalDate}; when it is not in
 * that format it is bound through {@code setDateAsTimestamp} instead.
 *
 * @param preparedStatement statement receiving the parameter
 * @param parameterIndex 1-based parameter position
 * @param value textual date value
 * @throws SQLException if the driver rejects the parameter
 */
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  try {
    final LocalDate localDate = LocalDate.parse(value);
    preparedStatement.setObject(parameterIndex, localDate);
  } catch (final DateTimeParseException notALocalDate) {
    // not a plain local date: fall back to the timestamp-based parsing.
    setDateAsTimestamp(preparedStatement, parameterIndex, value);
  }
}
/**
 * Binds a date parameter from a value written in the format of
 * {@code DataTypeUtils.getDateFormat()}. The parsed instant is converted to a {@link Date}
 * through its epoch milliseconds.
 *
 * @throws SQLException if the driver rejects the parameter
 * @throws RuntimeException wrapping the {@link ParseException} when the value cannot be parsed
 */
private void setDateAsTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
  try {
    final Instant parsedInstant = DataTypeUtils.getDateFormat().parse(value).toInstant();
    final long epochMillis = Timestamp.from(parsedInstant).getTime();
    preparedStatement.setDate(parameterIndex, new Date(epochMillis));
  } catch (final ParseException e) {
    throw new RuntimeException(e);
  }
}
/**
 * BIT parameters are not supported: this method always throws.
 *
 * @throws RuntimeException always
 */
protected void setBit(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  // todo (cgardens) - currently we do not support bit because it requires special handling in the
  // prepared statement.
  // see
  // https://www.postgresql-archive.org/Problems-with-BIT-datatype-and-preparedStatment-td5733533.html.
  final String message = "BIT value is not supported as incremental parameter!";
  throw new RuntimeException(message);
}
/**
 * Parses {@code value} with {@link Boolean#parseBoolean(String)} and binds it as a boolean
 * parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 */
protected void setBoolean(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final boolean parsed = Boolean.parseBoolean(value);
  preparedStatement.setBoolean(parameterIndex, parsed);
}
/**
 * Parses {@code value} with {@link Short#parseShort(String)} and binds it as a short parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 * @throws NumberFormatException if {@code value} is not a parsable short
 */
protected void setShortInt(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final short parsed = Short.parseShort(value);
  preparedStatement.setShort(parameterIndex, parsed);
}
/**
 * Parses {@code value} with {@link Integer#parseInt(String)} and binds it as an int parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 * @throws NumberFormatException if {@code value} is not a parsable int
 */
protected void setInteger(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final int parsed = Integer.parseInt(value);
  preparedStatement.setInt(parameterIndex, parsed);
}
/**
 * Parses {@code value} with {@link Long#parseLong(String)} and binds it as a long parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 * @throws NumberFormatException if {@code value} is not a parsable long
 */
protected void setBigInteger(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final long parsed = Long.parseLong(value);
  preparedStatement.setLong(parameterIndex, parsed);
}
/**
 * Parses {@code value} with {@link Double#parseDouble(String)} and binds it as a double
 * parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 * @throws NumberFormatException if {@code value} is not a parsable double
 */
protected void setDouble(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final double parsed = Double.parseDouble(value);
  preparedStatement.setDouble(parameterIndex, parsed);
}
/**
 * Parses {@code value} with {@link Float#parseFloat(String)} and binds it as a float parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 * @throws NumberFormatException if {@code value} is not a parsable float
 */
protected void setReal(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final float parsed = Float.parseFloat(value);
  preparedStatement.setFloat(parameterIndex, parsed);
}
/**
 * Builds a {@link BigDecimal} from {@code value} and binds it as a decimal parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 * @throws NumberFormatException if {@code value} is not a valid decimal representation
 */
protected void setDecimal(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final BigDecimal parsed = new BigDecimal(value);
  preparedStatement.setBigDecimal(parameterIndex, parsed);
}
/**
 * Binds {@code value} unchanged as a string parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 */
protected void setString(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final String text = value;
  preparedStatement.setString(parameterIndex, text);
}
/**
 * Decodes {@code value} from hexadecimal text with {@code DatatypeConverter.parseHexBinary} and
 * binds the resulting bytes as a binary parameter.
 *
 * @throws SQLException if the driver rejects the parameter
 */
protected void setBinary(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  final byte[] decoded = DatatypeConverter.parseHexBinary(value);
  preparedStatement.setBytes(parameterIndex, decoded);
}
/**
 * Reads the given column as an instance of {@code clazz}. Kept as an overridable hook around
 * {@link ResultSet#getObject(int, Class)}.
 *
 * @param resultSet result set positioned on the row to read
 * @param index 1-based column index
 * @param clazz java type the column should be converted to
 * @return the column value as returned by the driver
 * @throws SQLException if the driver cannot perform the conversion
 */
protected <ObjectType> ObjectType getObject(final ResultSet resultSet, final int index, final Class<ObjectType> clazz) throws SQLException {
  final ObjectType typedValue = resultSet.getObject(index, clazz);
  return typedValue;
}
/**
 * Reads the given column as an {@link OffsetTime} and stores it in {@code node}, formatted with
 * {@code TIMETZ_FORMATTER}.
 *
 * @throws SQLException if the column cannot be read as an {@link OffsetTime}
 */
protected void putTimeWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  final String formatted = getObject(resultSet, index, OffsetTime.class).format(TIMETZ_FORMATTER);
  node.put(columnName, formatted);
}
/**
 * Reads the given column as an {@link OffsetDateTime} and stores it in {@code node}, formatted
 * with {@code TIMESTAMPTZ_FORMATTER} and with its era indicator normalized by {@code resolveEra}.
 *
 * @throws SQLException if the column cannot be read as an {@link OffsetDateTime}
 */
protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
    throws SQLException {
  final OffsetDateTime zonedTimestamp = getObject(resultSet, index, OffsetDateTime.class);
  final String formatted = zonedTimestamp.format(TIMESTAMPTZ_FORMATTER);
  // the era is decided from the date part of the timestamp.
  node.put(columnName, resolveEra(zonedTimestamp.toLocalDate(), formatted));
}
/**
 * Normalizes the era indicator of a textual date/timestamp.
 * <p>
 * CE values are returned untouched. For BCE values:
 * <ul>
 * <li>a leading negative sign is removed if present</li>
 * <li>the " BC" suffix is appended unless the value already ends with it</li>
 * </ul>
 *
 * You most likely would prefer to call one of the overloaded methods, which accept temporal types.
 *
 * @param isBce whether the value lies before the common era
 * @param value textual representation of the date/timestamp
 * @return the value with its era indicator normalized
 */
public static String resolveEra(final boolean isBce, final String value) {
  if (!isBce) {
    return value;
  }
  final String unsigned = value.startsWith("-") ? value.substring(1) : value;
  return unsigned.endsWith(" BC") ? unsigned : unsigned + " BC";
}
/**
 * Tells whether the given date lies in the BCE era of the ISO calendar.
 *
 * @param date date to inspect
 * @return {@code true} when the date's era is {@link IsoEra#BCE}
 */
public static boolean isBce(final LocalDate date) {
  return IsoEra.BCE.equals(date.getEra());
}
/**
 * Normalizes the era indicator of {@code value}, taking the era from {@code date}.
 *
 * @param date date whose era decides whether {@code value} is treated as BCE
 * @param value textual representation of the date/timestamp
 * @return the value with its era indicator normalized
 */
public static String resolveEra(final LocalDate date, final String value) {
  final boolean beforeCommonEra = isBce(date);
  return resolveEra(beforeCommonEra, value);
}
/**
 * Normalizes the era indicator of {@code value}, taking the era from a {@link Date}.
 * <p>
 * java.sql.Date objects don't properly represent their era (for example, using toLocalDate() always
 * returns an object in CE). So to determine the era, we just check whether the date is before 1 AD.
 * <p>
 * This is technically kind of sketchy due to ancient timestamps being weird (leap years, etc.), but
 * my understanding is that {@link #ONE_CE} has the same weirdness, so it cancels out.
 *
 * @param date date compared against {@link #ONE_CE}
 * @param value textual representation of the date
 * @return the value with its era indicator normalized
 */
public static String resolveEra(final Date date, final String value) {
  final boolean beforeCommonEra = date.before(ONE_CE);
  return resolveEra(beforeCommonEra, value);
}
/**
 * Normalizes the era indicator of {@code value}, taking the era from a {@link Timestamp}: the
 * value is treated as BCE when the timestamp is before {@link #ONE_CE}.
 * See {@link #resolveEra(Date, String)} for explanation.
 *
 * @param timestamp timestamp compared against {@link #ONE_CE}
 * @param value textual representation of the timestamp
 * @return the value with its era indicator normalized
 */
public static String resolveEra(final Timestamp timestamp, final String value) {
  final boolean beforeCommonEra = timestamp.before(ONE_CE);
  return resolveEra(beforeCommonEra, value);
}
}