Skip to content

Commit

Permalink
Add fixes for CrateDB
Browse files Browse the repository at this point in the history
  • Loading branch information
grzegorz8 committed Nov 12, 2024
1 parent 00256d4 commit 2a03cf5
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ protected DataType getMapping(String pgType, int precision, int scale) {
return DataTypes.STRING();
case PG_STRING_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
case PG_JSON: // TODO: Add support for JSON type.
return null;
case PG_JSONB: // CrateDB supports JSON type, but not JSONB.
return null;
default:
return super.getMapping(pgType, precision, scale);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.postgres.database.dialect.CompatiblePostgresDialectConverter;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;

/**
Expand All @@ -39,4 +40,9 @@ public CrateDBDialectConverter(RowType rowType) {
public String compatibleConverterName() {
return "CrateDB";
}

@Override
protected JdbcDeserializationConverter createVarcharConverter() {
return val -> StringData.fromString((String) val);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public class PostgresTypeMapper implements JdbcCatalogTypeMapper {
private static final String PG_CHARACTER_ARRAY = "_character";
private static final String PG_CHARACTER_VARYING = "varchar";
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
private static final String PG_JSON = "json";
private static final String PG_JSONB = "jsonb";
protected static final String PG_JSON = "json";
protected static final String PG_JSONB = "jsonb";

@Override
public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,23 @@ private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arra
private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case VARCHAR:
return val -> {
if (val instanceof PGobject) {
PGobject obj = (PGobject) val;
return StringData.fromString(obj.getValue());
} else {
return StringData.fromString((String) val);
}
};
return createVarcharConverter();
default:
return super.createInternalConverter(type);
}
}

protected JdbcDeserializationConverter createVarcharConverter() {
return val -> {
if (val instanceof PGobject) {
PGobject obj = (PGobject) val;
return StringData.fromString(obj.getValue());
} else {
return StringData.fromString((String) val);
}
};
}

@Override
public String converterName() {
return "PostgreSQL";
Expand Down

0 comments on commit 2a03cf5

Please sign in to comment.