420 changes: 420 additions & 0 deletions .lgtm.yml

Large diffs are not rendered by default.

75 changes: 75 additions & 0 deletions contrib/storage-jdbc/WritingToJDBC.md
@@ -0,0 +1,75 @@
# Writing to JDBC Data Sources
It is now possible to write to databases via Drill's JDBC Storage Plugin. At present Drill supports the following query formats for writing:

* `CREATE TABLE AS`
* `CREATE TABLE IF NOT EXISTS`
* `DROP TABLE`
* `DROP TABLE IF EXISTS`

For further information about Drill's support for CTAS queries, please refer to the documentation page here: https://drill.apache.org/docs/create-table-as-ctas/. The syntax is
exactly the same as when writing to a file. As with writing to files, it is a best practice to avoid `SELECT *` in the CTAS query.
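
To make this concrete, here is a minimal sketch of a round trip against a writable JDBC plugin. The plugin name `mysql`, the target schema `sales`, the table name, and the column names are placeholders for your own configuration; `` cp.`employee.json` `` is Drill's bundled sample data set.

```sql
-- Hypothetical plugin (mysql), schema (sales), and table names; adjust to your setup.
CREATE TABLE mysql.sales.employee_copy AS
SELECT employee_id, full_name, salary
FROM cp.`employee.json`;

-- Clean up the table we just created.
DROP TABLE IF EXISTS mysql.sales.employee_copy;
```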

Not all JDBC sources will support writing. In order for the connector to successfully write, the source system must support `CREATE TABLE AS` as well as `INSERT` queries.
At present, writing has been tested with MySQL, Postgres, and H2.

#### Note about Apache Phoenix
Apache Phoenix uses slightly non-standard syntax for INSERTs. The JDBC writer should support writes to Apache Phoenix, though this has not been tested and it should be
regarded as an experimental feature.
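
For context, Phoenix's write statement is `UPSERT` rather than the standard `INSERT`; a hand-written Phoenix insert looks like the sketch below (table and column names are hypothetical).

```sql
-- Phoenix accepts UPSERT, not INSERT, for adding rows.
UPSERT INTO employees (id, full_name) VALUES (1, 'Alice Example');
```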

## Configuring the Connection for Writing
Firstly, it should go without saying that the database to which you are writing must be accessed with user permissions that allow writing. Next, you will need to set the
`writable` parameter to `true` as shown below:

### Setting the Batch Size
After creating the table, Drill will execute a series of `INSERT` queries with the data you are adding to the new table. How many records can be inserted into the
database at once is a function of your specific database. Larger batches result in fewer insert queries and usually in better overall performance, but may also overload
your database connection. You can configure the batch size by setting the `writerBatchSize` parameter in the configuration as shown in the samples below. The default is 10,000 records per batch.
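
As a rough illustration only, and not the literal SQL Drill generates (the actual statements depend on the target database's dialect), a flushed batch conceptually amounts to inserting several rows in one round trip:

```sql
-- Conceptual sketch of one batch; with writerBatchSize = 3, roughly three rows go out per flush.
INSERT INTO employee_copy (employee_id, full_name)
VALUES (1, 'Alice Example'), (2, 'Bob Example'), (3, 'Carol Example');
```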

### Sample Writable MySQL Connection
```json
{
"type": "jdbc",
"driver": "com.mysql.cj.jdbc.Driver",
"url": "jdbc:mysql://localhost:3306/?useJDBCCompliantTimezoneShift=true&serverTimezone=EST5EDT",
"username": "<username>",
"password": "<password>",
"writable": true,
"writerBatchSize": 10000,
"enabled": true
}
```
### Sample Writable Postgres Connection
```json
{
"type": "jdbc",
"driver": "org.postgresql.Driver",
"url": "jdbc:postgresql://localhost:5432/sakila?defaultRowFetchSize=2",
"username": "postgres",
"sourceParameters": {
"minimumIdle": 5,
"autoCommit": false,
"connectionTestQuery": "select version() as postgresql_version",
"dataSource.cachePrepStmts": true,
"dataSource.prepStmtCacheSize": 250
},
"writable": true
}
```

## Limitations

### Row Limits
The first issue to be aware of is that most relational databases place some limit on how many rows can be inserted at once and how many columns a table may contain. It
is important to be aware of these limits and to make sure that your database is configured to receive the amount of data you are trying to write. For example, you can
configure MySQL to accept very large inserts by raising the `max_allowed_packet` variable.
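
For example, on MySQL the packet limit can be raised at runtime as shown below. The 256 MB value is only an illustration; size it to your workload, and persist the setting in `my.cnf` if you need it to survive restarts.

```sql
-- Allow larger batched inserts by raising MySQL's maximum packet size to 256 MB.
SET GLOBAL max_allowed_packet = 268435456;
```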

### Data Types
While JDBC is a standard interface, different databases handle data types in different ways. The JDBC writer maps data types in the most generic way possible so
that it works in as many cases as possible.
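
If the default mapping does not produce the column types you want, one workaround, which is just standard Drill SQL rather than anything specific to this plugin, is to `CAST` the projected columns in the CTAS query so that the writer receives the types you intend (names below are hypothetical).

```sql
-- Force an explicit precision and scale on the salary column before it is written out.
CREATE TABLE mysql.sales.employee_copy AS
SELECT employee_id,
       CAST(salary AS DECIMAL(12, 2)) AS salary
FROM cp.`employee.json`;
```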

#### Compound Data Types
Most relational databases do not support compound fields of any sort. As a result, attempting to write a compound type to a JDBC data source will result in an exception.
Future functionality may include the possibility of converting complex types to strings and inserting those strings into the target database.

#### VarBinary Data
It is not currently possible to insert a VarBinary field into a JDBC database.
19 changes: 18 additions & 1 deletion contrib/storage-jdbc/pom.xml
@@ -34,6 +34,7 @@
<mysql.connector.version>8.0.25</mysql.connector.version>
<clickhouse.jdbc.version>0.3.1</clickhouse.jdbc.version>
<h2.version>1.4.200</h2.version>
<postgresql.version>42.2.24</postgresql.version>
</properties>

<dependencies>
@@ -46,7 +47,11 @@
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>

<dependency>
<groupId>${calcite.groupId}</groupId>
<artifactId>calcite-server</artifactId>
<version>${calcite.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.drill.exec</groupId>
@@ -68,6 +73,12 @@
<version>${mysql.connector.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
@@ -92,6 +103,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
@@ -18,20 +18,30 @@
package org.apache.drill.exec.store.jdbc;

import javax.sql.DataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.calcite.adapter.jdbc.JdbcConvention;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlDialect;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Writer;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -42,14 +52,20 @@ public class CapitalizingJdbcSchema extends AbstractSchema {
private final Map<String, CapitalizingJdbcSchema> schemaMap;
private final JdbcSchema inner;
private final boolean caseSensitive;
private final JdbcStoragePlugin plugin;
private final String catalog;
private final String schema;

public CapitalizingJdbcSchema(List<String> parentSchemaPath, String name,
DataSource dataSource,
SqlDialect dialect, JdbcConvention convention, String catalog, String schema, boolean caseSensitive) {
SqlDialect dialect, DrillJdbcConvention convention, String catalog, String schema, boolean caseSensitive) {
super(parentSchemaPath, name);
this.schemaMap = new HashMap<>();
this.inner = new JdbcSchema(dataSource, dialect, convention, catalog, schema);
this.caseSensitive = caseSensitive;
this.plugin = convention.getPlugin();
this.schema = schema;
this.catalog = catalog;
}

@Override
@@ -93,6 +109,68 @@ public Set<String> getTableNames() {
return inner.getTableNames();
}


@Override
public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
if (! plugin.getConfig().isWritable()) {
throw UserException
.dataWriteError()
.message(plugin.getName() + " is not writable.")
.build(logger);
}

return new CreateTableEntry() {

@Override
public Writer getWriter(PhysicalOperator child) throws IOException {
String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
return new JdbcWriter(child, tableWithSchema, inner, plugin);
}

@Override
public List<String> getPartitionColumns() {
return Collections.emptyList();
}
};
}

@Override
public void dropTable(String tableName) {
if (! plugin.getConfig().isWritable()) {
throw UserException
.dataWriteError()
.message(plugin.getName() + " is not writable.")
.build(logger);
}

String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());

try (Connection conn = inner.getDataSource().getConnection();
Statement stmt = conn.createStatement()) {
logger.debug("Executing drop table query: {}", dropTableQuery);
int successfullyDropped = stmt.executeUpdate(dropTableQuery);
logger.debug("Result: {}", successfullyDropped);
if (successfullyDropped > 0) {
throw UserException.dataWriteError()
.message("Error while dropping table " + tableName)
.addContext(stmt.getWarnings().getMessage())
.build(logger);
}
} catch (SQLException e) {
throw UserException.dataWriteError(e)
.message("Failure while trying to drop table '%s'.", tableName)
.addContext("plugin", name)
.build(logger);
}
}

@Override
public boolean isMutable() {
return plugin.getConfig().isWritable();
}

@Override
public Table getTable(String name) {
if (isCatalogSchema()) {
@@ -80,11 +80,11 @@ public void register(RelOptPlanner planner) {
rules.forEach(planner::addRule);
}

Set<RelOptRule> getRules() {
public Set<RelOptRule> getRules() {
return rules;
}

JdbcStoragePlugin getPlugin() {
public JdbcStoragePlugin getPlugin() {
return plugin;
}
}
@@ -215,7 +215,11 @@ private TupleMetadata buildSchema() throws SQLException {
// column index in ResultSetMetaData starts from 1
int jdbcType = meta.getColumnType(i);
int width = Math.min(meta.getPrecision(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericPrecision());
int scale = Math.min(meta.getScale(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale());

// Note, if both the precision and scale are not defined in the query, Drill defaults to 38 for both
// Which causes an overflow exception. We reduce the scale by one here to avoid this. The better solution
// would be for the user to provide the precision and scale.
int scale = Math.min(meta.getScale(i), DRILL_REL_DATATYPE_SYSTEM.getMaxNumericScale() - 1);

MinorType minorType = JDBC_TYPE_MAPPINGS.get(jdbcType);
if (minorType == null) {
@@ -233,7 +237,7 @@ private TupleMetadata buildSchema() throws SQLException {
continue;
}

jdbcColumns.add(new JdbcColumn(name, minorType, i));
jdbcColumns.add(new JdbcColumn(name, minorType, i, scale, width));
// Precision and scale are passed for all readers whether they are needed or not.
builder.addNullable(name, minorType, width, scale);
}
@@ -277,7 +281,7 @@ private void populateWriterArray() {
columnWriters.add(new JdbcBitWriter(col.colName, rowWriter, col.colPosition));
break;
case VARDECIMAL:
columnWriters.add(new JdbcVardecimalWriter(col.colName, rowWriter, col.colPosition));
columnWriters.add(new JdbcVardecimalWriter(col.colName, rowWriter, col.colPosition, col.scale, col.precision));
break;
default:
logger.warn("Unsupported data type {} found at column {}", col.type.getDescriptorForType(), col.colName);
@@ -305,11 +309,15 @@ public static class JdbcColumn {
final String colName;
final MinorType type;
final int colPosition;
final int scale;
final int precision;

public JdbcColumn (String colName, MinorType type, int colPosition) {
public JdbcColumn (String colName, MinorType type, int colPosition, int scale, int precision) {
this.colName = colName;
this.type = type;
this.colPosition = colPosition;
this.scale = scale;
this.precision = precision;
}
}
}
@@ -71,7 +71,8 @@ class JdbcCatalogSchema extends AbstractSchema {

if (!schemasAdded) {
// there were no schemas, just create a default one (the jdbc system doesn't support catalogs/schemas).
schemaMap.put(SchemaFactory.DEFAULT_WS_NAME, new CapitalizingJdbcSchema(Collections.emptyList(), name, source, dialect, convention, null, null, caseSensitive));
schemaMap.put(SchemaFactory.DEFAULT_WS_NAME, new CapitalizingJdbcSchema(Collections.emptyList(), name, source, dialect,
convention, null, null, caseSensitive));
}
} else {
// We already have catalogs. Add schemas in this context of their catalogs.