Início Tecnologia Por que seu trabalho de cdc de setennel está pendurado na fase...

Por que seu trabalho de cdc de setennel está pendurado na fase instantânea (e como consertá -lo)

1
0

 

Problema encontrado

Em nosso projeto, o SeatUnnel é usado para extrair dados do banco de dados de negócios para o data warehouse (Starrocks), e já usamos com sucesso o MySQL-CDC para sincronização em tempo real em larga escala. No entanto, encontramos uma questão anormal ao sincronizar uma tabela MySQL específica: depois que o trabalho começou, os logs mostraram zero contagem de leitura e gravação, e o trabalho não parou por um longo tempo. Após 6 horas em execução, ele terminou com um erro de tempo limite do ponto de verificação.

A estrutura do trabalho é a seguinte (informações confidenciais removidas):

” alt=”” aria-hidden=”true” />

Torros -chave durante a execução:

Fundo

  • Cenário: Extração de dados em tempo real do MySQL para Starrocks usando o MySQL-CDC
  • Seatunnel Versão: 2.3.9
  • Versão MySQL: 8.x.
  • Starrocks Versão: 3.2
  • Volume de dados da tabela de origem: 60-70 milhões de linhas

Questões -chave

  1. Por que as contagens de leitura e gravação permanecem em 0?
  2. Por que leva tanto tempo para lançar um erro de tempo limite?

Processo de análise

Usamos o MySQL-CDC para muitos trabalhos de sincronização antes, e as configurações eram as mesmas, então o problema provavelmente não está no próprio Seatunnel.

Comparamos esta tabela de origem com os anteriormente bem -sucedidos para ver se havia diferenças.

Com certeza, encontramos algo suspeito:

Todas as tabelas anteriores tinham teclas primárias de incremento automático; este não. Ele tinha apenas vários índices exclusivos.

Então, surge a pergunta: como exatamente o Seatunnel Sync Data?

Até onde sabemos, o SeatUnnel usa uma abordagem em duas etapas ao sincronizar dados do CDC: Primeira sincronização de instantâneos e depois sincronização incremental baseada em binlog.

Como a contagem de leitura permanece zero, o trabalho deve estar preso na fase de instantâneos. Então, como funciona a sincronização do instantâneo?

Verificamos os documentos oficiais do Seatunnel:

Mysql cdc | Apache Seatunnel:

https://seatunnel.apache.org/zh-cn/docs/2.3.9/connector-v2/source/mysql-cdc

Não há nenhuma explicação arquitetônica, mas encontramos alguns parâmetros configuráveis.

Explicação do parâmetro

chunk-key.even-distribution.factor.upper-bound

Valor padrão: 100

Descrição:

O limite superior do fator de distribuição de chaves do pedaço. Esse fator é usado para determinar se os dados da tabela são distribuídos uniformemente. Se o fator de distribuição (por exemplo, (max (id) – min (id) + 1) / contagem de linhas) for ≤ esse valor, a tabela é considerada distribuída uniformemente e usará Chunking uniforme. Se exceder esse valor, e o número estimado de fragmentos ultrapassar sample-sharding.thresholda estratégia de sharding baseada em amostragem será usada. Padrão: 100.0

chunk-key.even-distribution.factor.lower-bound

Valor padrão: 0.5

Descrição:

O limite inferior do fator de distribuição. Se o fator de distribuição for ≥ esse valor, a tabela será considerada distribuída uniformemente. Caso contrário, é considerado irregular e pode desencadear o sharding baseado em amostragem.

sample-sharding.threshold

Valor padrão: 1000

Descrição:

Se o fator de distribuição estiver fora do [lower, upper] Range e o número estimado de fragmentos (aprox. Contagem de linhas / tamanho de pedaço) excede esse limite, a estratégia de sharding baseada em amostragem será usada. Isso melhora a eficiência para grandes conjuntos de dados.

inverse-sampling.rate

Valor padrão: 1000

Descrição:

Usado em sharding baseado em amostragem. Um valor de 1000 significa uma taxa de amostragem de 1/1000. Ele controla a granularidade da amostragem e afeta o número de fragmentos finais.

snapshot.split.size

Valor padrão: 8096

Descrição:

O número de linhas por pedaço na sincronização instantânea. As mesas serão divididas em pedaços com base nisso.

snapshot.fetch.size

Valor padrão: 1024

Descrição:

Número máximo de linhas buscadas por pesquisa durante a leitura do instantâneo.

Com esses parâmetros, aprendemos:

Durante a sincronização do instantâneo, o Seatunnel Chunks em várias divisões. A estratégia de sharding depende se os dados são distribuídos uniformemente.

Nossa mesa possui ~ 60 milhões de linhas (estimadas pela equipe de negócios, pois não conseguimos contá -las diretamente).

Como a tabela não tem chave primária, não tínhamos certeza do que o campo de Seatunnel usa para Chunking.

Assumimos que ele usava a coluna ID, que possui um índice exclusivo e testado:

SELECT MAX(ID), MIN(ID) FROM table;
  • Valor da chave máxima: 804306477418
  • Valor da chave min: 607312608210
  • Fator de distribuição = (804306477418 – 607312608210 + 1) / 60.000.000 ≈ 3283.23

Isso está claramente fora do [0.5, 100] A faixa “mesmo” → So Seatunnel considera essa distribuição desigual.

  • Tamanho do pedaço padrão: 8096
  • Contagem de shard = 60.000.000 / 8096 ≈ 7411 → maior que sample-sharding.threshold (1000)

Portanto, o Seatunnel provavelmente mudou para o sharding baseado em amostragem.

  • Taxa de amostragem (inversa): 1000 → Precisa provar 60.000 linhas

Nesse ponto, estávamos convencidos de que o Seatunnel estava preso a amostragem – e ficamos curiosos: como exatamente isso é amostrado? Por que isso dura 6 horas?

Mesmo com 60m de linhas, a amostragem de 60k não deve ser que lento. Certamente está digitalizando a coluna ID (que tem um índice exclusivo)?

Decidimos mergulhar no código -fonte.

Github: https://github.com/apache/seatunnel/

A arquitetura de Seatunnel é bastante complexa, e a configuração do ambiente levou um dia inteiro (principalmente a configuração de dependência).

Encontrar a lógica crítica levou outro dia – rastreamos de mensagens de log e pesquisas de palavras -chave.

Análise parcial do código -fonte

private List splitTableIntoChunks(
        JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
    final String splitColumnName = splitColumn.name();
    // Get min/max values
    final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
    final Object min = minMax[0];
    final Object max = minMax[1];
    if (min == null || max == null || min.equals(max)) {
        // Empty table or only one row — full table scan as a chunk
        return Collections.singletonList(ChunkRange.all());
    }

    // Get chunk size, distribution factor bounds, and sampling threshold from config
    final int chunkSize = sourceConfig.getSplitSize();
    final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
    final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
    final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();

    log.info("Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
            + "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
            tableId, splitColumnName, min, max, chunkSize,
            distributionFactorUpper, distributionFactorLower, sampleShardingThreshold);

    if (isEvenlySplitColumn(splitColumn)) {
        long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
        double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);
        boolean dataIsEvenlyDistributed =
            doubleCompare(distributionFactor, distributionFactorLower) >= 0 &&
            doubleCompare(distributionFactor, distributionFactorUpper) <= 0;

        if (dataIsEvenlyDistributed) {
            final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
            return splitEvenlySizedChunks(tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
        } else {
            int shardCount = (int) (approximateRowCnt / chunkSize);
            int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
            if (sampleShardingThreshold < shardCount) {
                if (inverseSamplingRate > chunkSize) {
                    log.warn("inverseSamplingRate {} > chunkSize {}, adjusting...", inverseSamplingRate, chunkSize);
                    inverseSamplingRate = chunkSize;
                }
                log.info("Using sampling sharding for table {}, rate = {}", tableId, inverseSamplingRate);
                Object[] sample = sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate);
                log.info("Sampled {} records from table {}", sample.length, tableId);
                return efficientShardingThroughSampling(tableId, sample, approximateRowCnt, shardCount);
            }
            return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
        }
    } else {
        return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
    }
}

Vamos nos concentrar na lógica de amostragem:

public static Object[] skipReadAndSortSampleData(
    JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate
) throws Exception {
    final String sampleQuery = String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
    Statement stmt = null;
    ResultSet rs = null;
    List

Esta é a lógica de amostragem principal:

Ele digitaliza toda a linha da tabela por linha, amostragem 1 de cada 1000 registros.

Isso explica por que estava funcionando tão devagar – vimos Processing row indexmensagens nos toras e se perguntou o que eles estavam fazendo.

Aproximadamente 60.000 IDs foram amostrados.

Agora, para a estratégia de sharding baseada em amostragem:

protected List efficientShardingThroughSampling(
    TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount
) {
    log.info("Using sampling-based sharding on table {}, approx rows: {}, shards: {}",
            tableId, approximateRowCnt, shardCount);

    List splits = new ArrayList<>();
    if (shardCount == 0) {
        splits.add(ChunkRange.of(null, null));
        return splits;
    }

    double approxSamplePerShard = (double) sampleData.length / shardCount;
    Object lastEnd = null;

    if (approxSamplePerShard <= 1) {
        splits.add(ChunkRange.of(null, sampleData[0]));
        lastEnd = sampleData[0];
        for (int i = 1; i < sampleData.length; i++) {
            if (!sampleData[i].equals(lastEnd)) {
                splits.add(ChunkRange.of(lastEnd, sampleData[i]));
                lastEnd = sampleData[i];
            }
        }
        splits.add(ChunkRange.of(lastEnd, null));
    } else {
        for (int i = 0; i < shardCount; i++) {
            Object chunkStart = lastEnd;
            Object chunkEnd = (i < shardCount - 1)
                ? sampleData[(int) ((i + 1) * approxSamplePerShard)]
                : null;
            if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, chunkStart)) {
                splits.add(ChunkRange.of(chunkStart, chunkEnd));
                lastEnd = chunkEnd;
            }
        }
    }

    return splits;
}

Cada pedaço recebe um início e um final distintos com base nos IDs amostrados classificados – sem sobreposição.

Vamos olhar para o ChunkRange classe que representa o resultado:

O shatching de instantâneo permite leituras de dados paralelos, acelerando a sincronização histórica.

Solução final

Através da análise acima, confirmamos que o trabalho estava preso na fase instantânea que executa a amostragem, acionada porque o Seatunnel determinou que a tabela de origem foi distribuída de forma desigual.

Como o trabalho de sincronização foi bloqueado por dias, criamos uma correção simples: ajuste os limiares do fator de distribuição para que o Seatunnel tratasse a tabela como distribuída uniformemente e pula a amostragem.

O intervalo de fatores padrão é 0.5 ~ 100mas o fator da nossa tabela foi ~ 3283 – por isso aumentamos o limite superior para 4000. A configuração final foi:

snapshot.split.size: Nossa tabela era altamente escassa, por isso aumentamos esse valor drasticamente (multiplicado aleatoriamente por 1000 – reconhecidamente não muito científico).

table-names-config: Especificado manualmente a chave primária e a chave dividida, pois a tabela não tinha chave primária e não tínhamos certeza de qual coluna o Seatunnel usou. Melhor ser explícito.

Resultado final

Finalmente começou a sincronizar! 🎉

fonte