builder()
+ .addAll(splits)
+ .add(nullSplit)
+ .build();
+ }
+
+ return splits;
+ }
+
@Override
public Connection getConnection() {
if (this.connection == null) {
@@ -128,6 +187,15 @@ public Connection createConnection() {
return getConnection();
}
+ @Override
+ protected DBSplitter getSplitter(int sqlDataType) {
+ // NUMERIC and DECIMAL split columns use SafeBigDecimalSplitter; every other type defers to the parent class.
+ if (sqlDataType == Types.NUMERIC || sqlDataType == Types.DECIMAL) {
+ return new SafeBigDecimalSplitter();
+ }
+ return super.getSplitter(sqlDataType);
+ }
+
@Override
public RecordReader createDBRecordReader(DBInputSplit split, Configuration conf) throws IOException {
final RecordReader dbRecordReader = super.createDBRecordReader(split, conf);
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java b/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java
new file mode 100644
index 000000000..8649515e8
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/SafeBigDecimalSplitter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+
+/**
+ * Safe implementation of {@link BigDecimalSplitter} to ensure precise division of BigDecimal values while calculating
+ * split points for NUMERIC and DECIMAL types.
+ *
+ * Problem: The default {@link BigDecimalSplitter} implementation may return 0 when the numerator is smaller than the
+ * denominator (e.g., 1 / 4 = 0), due to the lack of a defined scale for division. Since the result (0) is smaller than
+ * {@link BigDecimalSplitter#MIN_INCREMENT} (i.e. {@code 10000 * Double.MIN_VALUE}), the split size defaults to
+ * {@code MIN_INCREMENT}, leading to an excessive number of splits (~10M) and potential OOM errors.
+ *
+ * Fix: This implementation takes the larger scale of the two operands, adds a buffer of {@link #SCALE_BUFFER} decimal
+ * places, and uses {@link RoundingMode#HALF_UP} as the rounding mode.
+ *
+ * Note: This class is used by {@link DataDrivenETLDBInputFormat}.
+ */
+public class SafeBigDecimalSplitter extends BigDecimalSplitter {
+
+ /* Number of extra decimal places added to the operands' scale to preserve accuracy during division. */
+ public static final int SCALE_BUFFER = 5;
+ /**
+ * Divides {@code numerator} by {@code denominator} using an explicit scale instead of the operands' default scale.
+ *
+ * @param numerator the dividend (BigDecimal)
+ * @param denominator the divisor (BigDecimal)
+ * @return the quotient, rounded HALF_UP to {@code max(numerator scale, denominator scale) + SCALE_BUFFER} places
+ * @throws ArithmeticException if denominator is zero
+ */
+ @Override
+ protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
+ // Scale = larger operand scale + SCALE_BUFFER, so a small quotient (e.g. 1 / 4) is not truncated to 0
+ int effectiveScale = Math.max(numerator.scale(), denominator.scale()) + SCALE_BUFFER;
+ return numerator.divide(denominator, effectiveScale, RoundingMode.HALF_UP);
+ }
+}
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java
index 3dc7a2d1c..a8be38b46 100644
--- a/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java
+++ b/database-commons/src/test/java/io/cdap/plugin/db/source/AbstractDBSourceTest.java
@@ -43,11 +43,17 @@ public class AbstractDBSourceTest {
Schema.Field.of("double_column", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
Schema.Field.of("boolean_column", Schema.nullableOf(Schema.of(Schema.Type.BOOLEAN)))
);
+ private static final AbstractDBSource.DBSourceConfig TEST_CONFIG = new AbstractDBSource.DBSourceConfig() {
+ @Override
+ public String getConnectionString() {
+ return "";
+ }
+ };
@Test
public void testValidateSourceSchemaCorrectSchema() {
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
- AbstractDBSource.DBSourceConfig.validateSchema(SCHEMA, SCHEMA, collector);
+ TEST_CONFIG.validateSchema(SCHEMA, SCHEMA, collector);
Assert.assertEquals(0, collector.getValidationFailures().size());
}
@@ -65,7 +71,7 @@ public void testValidateSourceSchemaMismatchFields() {
);
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
- AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
+ TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
assertPropertyValidationFailed(collector, "boolean_column");
}
@@ -84,7 +90,7 @@ public void testValidateSourceSchemaInvalidFieldType() {
);
MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE);
- AbstractDBSource.DBSourceConfig.validateSchema(actualSchema, SCHEMA, collector);
+ TEST_CONFIG.validateSchema(actualSchema, SCHEMA, collector);
assertPropertyValidationFailed(collector, "boolean_column");
}
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormatTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormatTest.java
new file mode 100644
index 000000000..b369d008b
--- /dev/null
+++ b/database-commons/src/test/java/io/cdap/plugin/db/source/DataDrivenETLDBInputFormatTest.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright © 2019 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
+import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DataDrivenETLDBInputFormatTest {
+
+  @Mock
+  private JobContext mockJobContext;
+  @Mock
+  private DBConfiguration mockDbConfiguration;
+
+  private DataDrivenETLDBInputFormat inputFormat;
+
+  @Before
+  public void setUp() {
+    inputFormat = Mockito.spy(new DataDrivenETLDBInputFormat());
+    Mockito.doReturn(mockDbConfiguration).when(inputFormat).getDBConf();
+    Mockito.doReturn("id").when(mockDbConfiguration).getInputOrderBy();
+  }
+
+  @Test
+  public void testGetSplitsAddsNullSplit() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("A new split for NULLs should be added", 2, finalSplits.size());
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
+      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(1);
+    Assert.assertEquals("id IS NULL", nullSplit.getLowerClause());
+    Assert.assertEquals("id IS NULL", nullSplit.getUpperClause());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfPresent() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id >= 0", "id < 100");
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit nullSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("id IS NULL", "id IS NULL");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit, nullSplit);
+
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("Should not add a duplicate NULL split", 2, finalSplits.size());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfSelectAllPresent() throws IOException {
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit existingSplit =
+      new DataDrivenDBInputFormat.DataDrivenDBInputSplit("1=1", "1=1");
+    List<InputSplit> initialSplits = ImmutableList.of(existingSplit);
+
+    Mockito.doReturn(initialSplits).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+
+    Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfBaseReturnsNull() throws IOException {
+    Mockito.doReturn(null).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+    Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
+      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
+    Assert.assertEquals("1=1", split.getLowerClause());
+  }
+
+  @Test
+  public void testGetSplitsDoesNotAddNullSplitIfBaseReturnsEmptyList() throws IOException {
+    Mockito.doReturn(Collections.emptyList()).when(inputFormat).getBaseSplits(mockJobContext);
+
+    List<InputSplit> finalSplits = inputFormat.getSplits(mockJobContext);
+    Assert.assertEquals("Should not add a NULL split", 1, finalSplits.size());
+
+    DataDrivenDBInputFormat.DataDrivenDBInputSplit split =
+      (DataDrivenDBInputFormat.DataDrivenDBInputSplit) finalSplits.get(0);
+    Assert.assertEquals("1=1", split.getLowerClause());
+  }
+}
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java b/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java
new file mode 100644
index 000000000..4aff4eac2
--- /dev/null
+++ b/database-commons/src/test/java/io/cdap/plugin/db/source/SafeBigDecimalSplitterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.source;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.db.BigDecimalSplitter;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link SafeBigDecimalSplitter}: scale-aware division and the number of splits it produces.
+ */
+public class SafeBigDecimalSplitterTest {
+  private final SafeBigDecimalSplitter splitter = new SafeBigDecimalSplitter();
+
+  @Test
+  public void testSmallRangeDivision() {
+    BigDecimal result = splitter.tryDivide(BigDecimal.ONE, new BigDecimal("4"));
+    assertEquals(new BigDecimal("0.25000"), result);
+  }
+
+  @Test
+  public void testLargePrecision() {
+    BigDecimal numerator = new BigDecimal("1.0000000000000000001");
+    BigDecimal denominator = new BigDecimal("3");
+    BigDecimal result = splitter.tryDivide(numerator, denominator);
+    assertTrue(result.compareTo(BigDecimal.ZERO) > 0);
+  }
+
+  @Test
+  public void testDivisionByZero() {
+    assertThrows(ArithmeticException.class, () ->
+      splitter.tryDivide(BigDecimal.ONE, BigDecimal.ZERO));
+  }
+
+  @Test
+  public void testDivisionWithZeroNumerator() {
+    // Zero numerator, i.e. the (max - min) range when minVal == maxVal
+    BigDecimal result = splitter.tryDivide(BigDecimal.ZERO, BigDecimal.ONE);
+    assertEquals(0, result.compareTo(BigDecimal.ZERO));
+  }
+
+  @Test
+  public void testSplits() throws SQLException {
+    BigDecimal minVal = BigDecimal.valueOf(1);
+    BigDecimal maxVal = BigDecimal.valueOf(2);
+    int numSplits = 4;
+    ResultSet resultSet = mock(ResultSet.class);
+    Configuration conf = mock(Configuration.class);
+    when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
+    when(resultSet.getBigDecimal(1)).thenReturn(minVal);
+    when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
+    BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
+    List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
+    assertEquals(numSplits, actualSplits.size());
+  }
+
+  @Test
+  public void testSplitsWithMinValueEqualToMaxValue() throws SQLException {
+    // Degenerate range: minVal == maxVal with a single requested split
+    BigDecimal minVal = BigDecimal.valueOf(1);
+    BigDecimal maxVal = BigDecimal.valueOf(1);
+    int numSplits = 1;
+    ResultSet resultSet = mock(ResultSet.class);
+    Configuration conf = mock(Configuration.class);
+    when(conf.getInt("mapreduce.job.maps", 1)).thenReturn(numSplits);
+    when(resultSet.getBigDecimal(1)).thenReturn(minVal);
+    when(resultSet.getBigDecimal(2)).thenReturn(maxVal);
+    BigDecimalSplitter bigDecimalSplitter = new SafeBigDecimalSplitter();
+    List<InputSplit> actualSplits = bigDecimalSplitter.split(conf, resultSet, "id");
+    assertEquals(numSplits, actualSplits.size());
+  }
+}
diff --git a/db2-plugin/pom.xml b/db2-plugin/pom.xml
index a43bcb92e..f11d6aa75 100644
--- a/db2-plugin/pom.xml
+++ b/db2-plugin/pom.xml
@@ -20,7 +20,7 @@
database-plugins-parent
io.cdap.plugin
- 1.11.0-SNAPSHOT
+ 1.11.13
IBM DB2 plugin
@@ -77,12 +77,6 @@
db2jcc4
test