Início Tecnologia Corrigindo o texto ilegal ao sincronizar o Oracle a Doris com o...

Corrigindo o texto ilegal ao sincronizar o Oracle a Doris com o Seatunnel 2.3.9

7
0

Ao usar Seatunnel 2.3.9 Para sincronizar dados de Oráculo para Dorisvocê pode encontrar caracteres iluminados – especialmente se o banco de dados Oracle usar o Conjunto de caracteres ASCII. Mas não entre em pânico – este artigo o leva por que Isso acontece e Como consertar.

🧠 Causa raiz

A questão decorre de como o Seatunnel lê dados do Oracle. Se o Oracle estiver usando um conjunto de caracteres como o ASCII e você está sincronizando para Doris (que espera UTF-8 adequado ou outras codificações compatíveis), os caracteres chineses podem se tornar ilegíveis.

A chave é interceptar e re-codificar os dados quando são lidos no Oracle ResultSet.

🔍 Compreendendo o fluxo de leitura do Seatunnel

Vejamos os internos do SeatUnnel que lidam com a ingestão de dados do JDBC:

1. JdbcSourceFactory

Esta aula:

  • Carrega suas configurações de origem.
  • Construtos JdbcSourceConfig e JdbcDialect.
  • Cria um JdbcSource exemplo.

2. JdbcSource

Esse:

  • Inicializa a SourceSplitEnumerator para dividir as tarefas.
  • Cria um JdbcSourceReader para executá -los.

3. JdbcSourceReader

Responsável por:

  • Construindo o JdbcInputFormat.
  • Chamando repetidamente o pollNext() Método para buscar dados.

4. pollNext() Método

Este método:

  • Chamadas open() em JdbcInputFormat Para preparar o PreparedStatement e ResultSet.
  • Então chamadas nextRecord() Para processar o ResultSet e convertê -lo em um SeaTunnelRow.

5. nextRecord() e o problema de codificação

Em JdbcInputFormat:

  • O nextRecord() chamadas de método toInternal() em JdbcRowConverter.
  • A implementação padrão usa JdbcFieldTypeUtils.getString(rs, resultSetIndex).

💥 Problema: Se o resultado do resultado contiver caracteres chineses armazenados como ASCII, esse método retornará texto ilegal.

✅ Estratégia de solução

Precisamos detectar a codificação da fonte e re-codificar os dados No momento, foi recuperado do resultado do resultado.

Veja como fazer:

🛠 Etapas de implementação

Etapa 1: Adicionar parâmetros de charset

Em JdbcInputFormatadicionar:

personal closing Map params;

No construtor:

public JdbcInputFormat(JdbcSourceConfig config, Map tables) {
    this.jdbcDialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
    this.chunkSplitter = ChunkSplitter.create(config);
    this.jdbcRowConverter = jdbcDialect.getRowConverter();
    this.tables = tables;
    this.params = config.getJdbcConnectionConfig().getProperties(); // <-- get charset data right here
}

Etapa 2: Passe params para o conversor da linha

No nextRecord() método de JdbcInputFormatatualize a chamada do método para:

SeaTunnelRow seaTunnelRow = jdbcRowConverter.toInternal(resultSet, splitTableSchema, params);

Etapa 3: Adicionar método de codificação

Em AbstractJdbcRowConverterdefina:

public static String convertCharset(byte[] worth, String charSet) {
    if (worth == null || worth.size == 0) {
        return null;
    }
    log.data("Worth bytes: {}", Arrays.toString(worth));
    attempt {
        return new String(worth, charSet);
    } catch (UnsupportedEncodingException e) {
        throw new RuntimeException(e);
    }
}

Etapa 4: modifique toInternal() para tipos de string

Em AbstractJdbcRowConverteratualize o STRING Tipo de manuseio assim:

case STRING:
    if (params == null || params.isEmpty()) {
        fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
    } else {
        String sourceCharset = params.get("sourceCharset");
        if ("GBK".equalsIgnoreCase(sourceCharset)) {
            fields[fieldIndex] = convertCharset(JdbcFieldTypeUtils.getBytes(rs, resultSetIndex), sourceCharset);
        } else {
            fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, resultSetIndex);
        }
    }
    break;

Etapa 5: reconstruir e implantar

Depois de fazer as mudanças acima:

  1. Reconstruir o connector-jdbc módulo.
  2. Substitua o existente connector-jdbc-2.3.9.jar Sob o Seatunnel’s connectors diretório.
  3. Reinicie o cluster do Seatunnel.

🧾 Dicas de configuração

  • Se o seu banco de dados Oracle não tem problemas de codificaçãovocê não precisa passar pelo sourceCharset propriedade.
  • Se necessário, passe assim em sua configuração:
sourceCharset=GBK
  • Para depurar o log de connector-jdbcverifique o toras do trabalhador no Seatunnel logs diretório.

✅ Resumo

Ao adicionar um mecanismo simples de troca de charset e ajustar a implementação da fonte JDBC, você pode eliminar caracteres ilegais ao sincronizar os dados do Oracle com Doris usando o SeatUnnel.

Não há mais caracteres quebrados – seu pipeline de dados ficou mais inteligente. 🚀

fonte

DEIXE UMA RESPOSTA

Por favor digite seu comentário!
Por favor, digite seu nome aqui