diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index 44131a01b..5001f1185 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -35,6 +35,7 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Struct; import java.sql.Timestamp; import java.sql.Types; import java.time.LocalDateTime; @@ -341,6 +342,16 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil case OracleSourceSchemaReader.LONG_RAW: recordBuilder.set(field.getName(), resultSet.getBytes(columnIndex)); break; + case Types.STRUCT: + Object structObj = resultSet.getObject(columnIndex); + if (structObj == null) { + recordBuilder.set(field.getName(), null); + } else { + Struct struct = (Struct) structObj; + StructuredRecord nestedRecord = convertStructToRecord(struct, field.getSchema()); + recordBuilder.set(field.getName(), nestedRecord); + } + break; case Types.DECIMAL: case Types.NUMERIC: // This is the only way to differentiate FLOAT/REAL columns from other numeric columns, that based on NUMBER. @@ -378,6 +389,54 @@ private int getScale(Schema schema) { return schema.isNullable() ? schema.getNonNullable().getScale() : schema.getScale(); } + /** + * Converts a {@link Struct} into a nested {@link StructuredRecord} using the provided RECORD schema. + * Attribute values are matched to schema fields by ordinal position. + * + * @param struct the SQL Struct to convert + * @param recordSchema the CDAP RECORD schema (may be nullable) + * @return a StructuredRecord with field values populated from the STRUCT attributes + */ + private StructuredRecord convertStructToRecord(Struct struct, Schema recordSchema) throws SQLException { + Schema nonNullSchema = recordSchema.isNullable() ? recordSchema.getNonNullable() : recordSchema; + StructuredRecord.Builder builder = StructuredRecord.builder(nonNullSchema); + Object[] attributes = struct.getAttributes(); + List fields = nonNullSchema.getFields(); + + for (int i = 0; i < fields.size() && i < attributes.length; i++) { + Schema.Field field = fields.get(i); + Object attrValue = attributes[i]; + + if (attrValue == null) { + builder.set(field.getName(), null); + continue; + } + + Schema fieldSchema = field.getSchema().isNullable() + ? field.getSchema().getNonNullable() : field.getSchema(); + + if (attrValue instanceof Struct) { + builder.set(field.getName(), convertStructToRecord((Struct) attrValue, fieldSchema)); + } else if (attrValue instanceof java.sql.Date) { + builder.setDate(field.getName(), ((java.sql.Date) attrValue).toLocalDate()); + } else if (attrValue instanceof java.sql.Time) { + builder.setTime(field.getName(), ((java.sql.Time) attrValue).toLocalTime()); + } else if (attrValue instanceof Timestamp) { + if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + builder.setDateTime(field.getName(), ((Timestamp) attrValue).toLocalDateTime()); + } else { + builder.setTimestamp(field.getName(), + ((Timestamp) attrValue).toInstant().atZone(java.time.ZoneId.of("UTC"))); + } + } else if (attrValue instanceof BigDecimal) { + builder.setDecimal(field.getName(), (BigDecimal) attrValue); + } else { + builder.set(field.getName(), attrValue); + } + } + return builder.build(); + } + private boolean isLongOrLongRaw(int columnType) { return columnType == OracleSourceSchemaReader.LONG || columnType == OracleSourceSchemaReader.LONG_RAW; } diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 208b70410..425d201b1 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -18,14 +18,20 @@ import com.google.common.collect.ImmutableSet; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.plugin.common.db.DBUtils; import io.cdap.plugin.db.CommonSchemaReader; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Types; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -63,7 +69,8 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { LONG, LONG_RAW, Types.NUMERIC, - Types.DECIMAL + Types.DECIMAL, + Types.STRUCT ); private final String sessionID; @@ -71,6 +78,9 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final Boolean isPrecisionlessNumAsDecimal; private final Boolean isTimestampLtzFieldTimestamp; + // Connection reference set during getSchemaFields() to enable STRUCT schema resolution in getSchema() + private Connection connection; + public OracleSourceSchemaReader() { this(null, false, false, false); } @@ -136,11 +146,73 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti } return Schema.decimalOf(precision, scale); } + case Types.STRUCT: + if (connection == null) { + throw new SQLException("Cannot resolve STRUCT schema without a database connection. " + + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); + } + String typeName = metadata.getColumnTypeName(index); + String oracleSchemaName = metadata.getSchemaName(index); + return getStructSchema(connection, oracleSchemaName, typeName); default: return super.getSchema(metadata, index); } } + @Override + public List getSchemaFields(ResultSet resultSet) throws SQLException { + this.connection = resultSet.getStatement().getConnection(); + return super.getSchemaFields(resultSet); + } + + /** + * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the database metadata + * for the type's attributes. + * + * @param connection the database connection + * @param schemaName the Oracle schema owning the type + * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") + * @return a CDAP RECORD schema with fields corresponding to the STRUCT's attributes + */ + private Schema getStructSchema(Connection connection, String schemaName, + String typeName) throws SQLException { + DatabaseMetaData dbMetaData = connection.getMetaData(); + List fields = new ArrayList<>(); + + try (ResultSet attrRs = dbMetaData.getAttributes(null, schemaName, typeName, "%")) { + while (attrRs.next()) { + String attrName = attrRs.getString("ATTR_NAME"); + int attrType = attrRs.getInt("DATA_TYPE"); + String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); + int attrSize = attrRs.getInt("ATTR_SIZE"); + int attrScale = attrRs.getInt("DECIMAL_DIGITS"); + int nullable = attrRs.getInt("NULLABLE"); + + Schema attrSchema; + if (attrType == Types.STRUCT) { + // Nested STRUCT — recurse + attrSchema = getStructSchema(connection, schemaName, attrTypeName); + } else { + attrSchema = DBUtils.getSchema(attrTypeName, attrType, attrSize, attrScale, + attrName, true, true); + } + + if (nullable == DatabaseMetaData.attributeNullable) { + attrSchema = Schema.nullableOf(attrSchema); + } + fields.add(Schema.Field.of(attrName, attrSchema)); + } + } + + if (fields.isEmpty()) { + throw new SQLException(String.format( + "No attributes found for Oracle STRUCT type '%s.%s'. " + + "Ensure the type exists and is accessible.", schemaName, typeName)); + } + + return Schema.recordOf(typeName, fields); + } + private @NotNull Schema getTimestampLtzSchema() { return isTimestampOldBehavior || isTimestampLtzFieldTimestamp ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java index 1ff77c533..a9c2e6a05 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java @@ -24,9 +24,13 @@ import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import java.sql.Connection; +import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; import java.util.List; public class OracleSchemaReaderTest { @@ -91,4 +95,60 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName()); Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema()); } + + @Test + public void getSchemaFields_structType_returnRecord() throws SQLException { + OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader(); + + ResultSet resultSet = Mockito.mock(ResultSet.class); + ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class); + Statement statement = Mockito.mock(Statement.class); + Connection connection = Mockito.mock(Connection.class); + DatabaseMetaData dbMetaData = Mockito.mock(DatabaseMetaData.class); + ResultSet attrResultSet = Mockito.mock(ResultSet.class); + + Mockito.when(resultSet.getMetaData()).thenReturn(metadata); + Mockito.when(resultSet.getStatement()).thenReturn(statement); + Mockito.when(statement.getConnection()).thenReturn(connection); + Mockito.when(connection.getMetaData()).thenReturn(dbMetaData); + + // One STRUCT column + Mockito.when(metadata.getColumnCount()).thenReturn(1); + Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT); + Mockito.when(metadata.getColumnName(1)).thenReturn("address"); + Mockito.when(metadata.getColumnTypeName(1)).thenReturn("ADDRESS_TYPE"); + Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA"); + Mockito.when(metadata.isNullable(1)).thenReturn(ResultSetMetaData.columnNullable); + + // Mock getAttributes for ADDRESS_TYPE with two VARCHAR2 attributes + Mockito.when(dbMetaData.getAttributes(null, "TEST_SCHEMA", "ADDRESS_TYPE", "%")) + .thenReturn(attrResultSet); + Mockito.when(attrResultSet.next()).thenReturn(true, true, false); + + // First attribute: STREET VARCHAR2(100) + Mockito.when(attrResultSet.getString("ATTR_NAME")).thenReturn("STREET", "CITY"); + Mockito.when(attrResultSet.getInt("DATA_TYPE")).thenReturn(Types.VARCHAR, Types.VARCHAR); + Mockito.when(attrResultSet.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2"); + Mockito.when(attrResultSet.getInt("ATTR_SIZE")).thenReturn(100, 50); + Mockito.when(attrResultSet.getInt("DECIMAL_DIGITS")).thenReturn(0, 0); + Mockito.when(attrResultSet.getInt("NULLABLE")).thenReturn((int) DatabaseMetaData.attributeNullable, + (int) DatabaseMetaData.attributeNullable); + + List actualFields = schemaReader.getSchemaFields(resultSet); + + Assert.assertEquals(1, actualFields.size()); + Schema.Field addressField = actualFields.get(0); + Assert.assertEquals("address", addressField.getName()); + + // Should be nullable record + Schema addressSchema = addressField.getSchema().isNullable() + ? addressField.getSchema().getNonNullable() : addressField.getSchema(); + Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType()); + Assert.assertEquals("ADDRESS_TYPE", addressSchema.getRecordName()); + + List structFields = addressSchema.getFields(); + Assert.assertEquals(2, structFields.size()); + Assert.assertEquals("STREET", structFields.get(0).getName()); + Assert.assertEquals("CITY", structFields.get(1).getName()); + } } diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java index 77136e841..99b92e075 100644 --- a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSourceDBRecordUnitTest.java @@ -22,11 +22,13 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; import java.math.BigDecimal; import java.sql.ResultSet; import java.sql.ResultSetMetaData; +import java.sql.Struct; import java.sql.Timestamp; import java.sql.Types; import java.time.ZonedDateTime; @@ -234,4 +236,87 @@ public void validateTimestampTZTypeNullHandling() throws Exception { StructuredRecord record = builder.build(); Assert.assertNull(record.get("field1")); } + + @Test + public void validateStructTypeHandling() throws Exception { + Schema addressSchema = Schema.recordOf("ADDRESS_TYPE", + Schema.Field.of("STREET", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("CITY", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("ZIP", Schema.nullableOf(Schema.of(Schema.Type.INT))) + ); + Schema.Field field1 = Schema.Field.of("address", addressSchema); + Schema schema = Schema.recordOf("dbRecord", field1); + + Struct mockStruct = Mockito.mock(Struct.class); + when(mockStruct.getSQLTypeName()).thenReturn("ADDRESS_TYPE"); + when(mockStruct.getAttributes()).thenReturn(new Object[]{"123 Main St", "Springfield", 62704}); + when(resultSet.getObject(eq(1))).thenReturn(mockStruct); + + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, field1, 1, Types.STRUCT, 0, 0); + + StructuredRecord record = builder.build(); + StructuredRecord addressRecord = record.get("address"); + Assert.assertNotNull(addressRecord); + Assert.assertEquals("123 Main St", addressRecord.get("STREET")); + Assert.assertEquals("Springfield", addressRecord.get("CITY")); + Assert.assertEquals(62704, (int) addressRecord.get("ZIP")); + } + + @Test + public void validateStructTypeNullHandling() throws Exception { + Schema addressSchema = Schema.recordOf("ADDRESS_TYPE", + Schema.Field.of("STREET", Schema.nullableOf(Schema.of(Schema.Type.STRING))) + ); + Schema.Field field1 = Schema.Field.of("address", + Schema.nullableOf(addressSchema)); + Schema schema = Schema.recordOf("dbRecord", field1); + + when(resultSet.getObject(eq(1))).thenReturn(null); + + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, field1, 1, Types.STRUCT, 0, 0); + + StructuredRecord record = builder.build(); + Assert.assertNull(record.get("address")); + } + + @Test + public void validateNestedStructTypeHandling() throws Exception { + Schema innerSchema = Schema.recordOf("ADDRESS_TYPE", + Schema.Field.of("STREET", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("CITY", Schema.nullableOf(Schema.of(Schema.Type.STRING))) + ); + Schema outerSchema = Schema.recordOf("PERSON_TYPE", + Schema.Field.of("NAME", Schema.nullableOf(Schema.of(Schema.Type.STRING))), + Schema.Field.of("HOME_ADDRESS", Schema.nullableOf(innerSchema)) + ); + Schema.Field field1 = Schema.Field.of("person", outerSchema); + Schema schema = Schema.recordOf("dbRecord", field1); + + Struct innerStruct = Mockito.mock(Struct.class); + when(innerStruct.getSQLTypeName()).thenReturn("ADDRESS_TYPE"); + when(innerStruct.getAttributes()).thenReturn(new Object[]{"123 Main St", "Springfield"}); + + Struct outerStruct = Mockito.mock(Struct.class); + when(outerStruct.getSQLTypeName()).thenReturn("PERSON_TYPE"); + when(outerStruct.getAttributes()).thenReturn(new Object[]{"John", innerStruct}); + when(resultSet.getObject(eq(1))).thenReturn(outerStruct); + + StructuredRecord.Builder builder = StructuredRecord.builder(schema); + OracleSourceDBRecord dbRecord = new OracleSourceDBRecord(null, null); + dbRecord.handleField(resultSet, builder, field1, 1, Types.STRUCT, 0, 0); + + StructuredRecord record = builder.build(); + StructuredRecord personRecord = record.get("person"); + Assert.assertNotNull(personRecord); + Assert.assertEquals("John", personRecord.get("NAME")); + + StructuredRecord addressRecord = personRecord.get("HOME_ADDRESS"); + Assert.assertNotNull(addressRecord); + Assert.assertEquals("123 Main St", addressRecord.get("STREET")); + Assert.assertEquals("Springfield", addressRecord.get("CITY")); + } }