Ao usar Seatunnel 2.3.9 Para sincronizar dados de Oráculo para Dorisvocê pode encontrar caracteres iluminados – especialmente se o banco de dados Oracle usar o Conjunto de caracteres ASCII. Mas não entre em pânico – este artigo o leva por que Isso acontece e Como consertar.
🧠 Causa raiz
A questão decorre de como o Seatunnel lê dados do Oracle. Se o Oracle estiver usando um conjunto de caracteres como o ASCII e você está sincronizando para Doris (que espera UTF-8 adequado ou outras codificações compatíveis), os caracteres chineses podem se tornar ilegíveis.
A chave é interceptar e re-codificar os dados quando são lidos no Oracle ResultSet
.
🔍 Compreendendo o fluxo de leitura do Seatunnel
Vejamos os internos do SeatUnnel que lidam com a ingestão de dados do JDBC:
1. JdbcSourceFactory
Esta aula:
- Carrega suas configurações de origem.
- Construtos
JdbcSourceConfig
eJdbcDialect
. - Cria um
JdbcSource
exemplo.
2. JdbcSource
Esse:
- Inicializa a
SourceSplitEnumerator
para dividir as tarefas. - Cria um
JdbcSourceReader
para executá -los.
3. JdbcSourceReader
Responsável por:
- Construindo o
JdbcInputFormat
. - Chamando repetidamente o
pollNext()
Método para buscar dados.
4. pollNext()
Método
Este método:
- Chamadas
open()
emJdbcInputFormat
Para preparar oPreparedStatement
eResultSet
. - Então chamadas
nextRecord()
Para processar oResultSet
e convertê -lo em umSeaTunnelRow
.
5. nextRecord()
e o problema de codificação
Em JdbcInputFormat
:
- O
nextRecord()
chamadas de métodotoInternal()
emJdbcRowConverter
. - A implementação padrão usa
JdbcFieldTypeUtils.getString(rs, resultSetIndex)
.
💥 Problema: Se o resultado do resultado contiver caracteres chineses armazenados como ASCII, esse método retornará texto ilegal.
✅ Estratégia de solução
Precisamos detectar a codificação da fonte e re-codificar os dados No momento, foi recuperado do resultado do resultado.
Veja como fazer:
🛠 Etapas de implementação
Etapa 1: Adicionar parâmetros de charset
Em JdbcInputFormat
adicionar:
personal closing Map params;
No construtor:
public JdbcInputFormat(JdbcSourceConfig config, Map tables) {
this.jdbcDialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
this.chunkSplitter = ChunkSplitter.create(config);
this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.tables = tables;
this.params = config.getJdbcConnectionConfig().getProperties(); // <-- get charset data right here
}
Etapa 2: Passe params
para o conversor da linha
No nextRecord()
método de JdbcInputFormat
atualize a chamada do método para:
SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitTableSchema, params);
Etapa 3: Adicionar método de codificação
Em AbstractJdbcRowConverter
defina:
public static String convertCharset(byte[] worth, String charSet) {
if (worth == null || worth.size == 0) {
return null;
}
log.data("Worth bytes: {}", Arrays.toString(worth));
attempt {
return new String(worth, charSet);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
Etapa 4: modifique toInternal()
para tipos de string
Em AbstractJdbcRowConverter
atualize o STRING
Tipo de manuseio assim:
case STRING:
if (params == null || params.isEmpty()) {
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
} else {
String sourceCharset = params.get("sourceCharset");
if ("GBK".equalsIgnoreCase(sourceCharset)) {
fields[fieldIndex] = convertCharset(JdbcFieldTypeUtils.getBytes(rs, resultSetIndex), sourceCharset);
} else {
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
}
}
break;
Etapa 5: reconstruir e implantar
Depois de fazer as mudanças acima:
- Reconstruir o
connector-jdbc
módulo. - Substitua o existente
connector-jdbc-2.3.9.jar
Sob o Seatunnel’sconnectors
diretório. - Reinicie o cluster do Seatunnel.
🧾 Dicas de configuração
- Se o seu banco de dados Oracle não tem problemas de codificaçãovocê não precisa passar pelo
sourceCharset
propriedade. - Se necessário, passe assim em sua configuração:
sourceCharset=GBK
- Para depurar o log de
connector-jdbc
verifique o toras do trabalhador no Seatunnellogs
diretório.
✅ Resumo
Ao adicionar um mecanismo simples de troca de charset e ajustar a implementação da fonte JDBC, você pode eliminar caracteres ilegais ao sincronizar os dados do Oracle com Doris usando o SeatUnnel.
Não há mais caracteres quebrados – seu pipeline de dados ficou mais inteligente. 🚀