Problema encontrado
Em nosso projeto, o SeatUnnel é usado para extrair dados do banco de dados de negócios para o data warehouse (Starrocks), e já usamos com sucesso o MySQL-CDC para sincronização em tempo real em larga escala. No entanto, encontramos uma questão anormal ao sincronizar uma tabela MySQL específica: depois que o trabalho começou, os logs mostraram zero contagem de leitura e gravação, e o trabalho não parou por um longo tempo. Após 6 horas em execução, ele terminou com um erro de tempo limite do ponto de verificação.
A estrutura do trabalho é a seguinte (informações confidenciais removidas):
Torros -chave durante a execução:
Fundo
- Cenário: Extração de dados em tempo real do MySQL para Starrocks usando o MySQL-CDC
- Seatunnel Versão: 2.3.9
- Versão MySQL: 8.x.
- Starrocks Versão: 3.2
- Volume de dados da tabela de origem: 60-70 milhões de linhas
Questões -chave
- Por que as contagens de leitura e gravação permanecem em 0?
- Por que leva tanto tempo para lançar um erro de tempo limite?
Processo de análise
Usamos o MySQL-CDC para muitos trabalhos de sincronização antes, e as configurações eram as mesmas, então o problema provavelmente não está no próprio Seatunnel.
Comparamos esta tabela de origem com os anteriormente bem -sucedidos para ver se havia diferenças.
Com certeza, encontramos algo suspeito:
Todas as tabelas anteriores tinham teclas primárias de incremento automático; este não. Ele tinha apenas vários índices exclusivos.
Então, surge a pergunta: como exatamente o Seatunnel Sync Data?
Até onde sabemos, o SeatUnnel usa uma abordagem em duas etapas ao sincronizar dados do CDC: Primeira sincronização de instantâneos e depois sincronização incremental baseada em binlog.
Como a contagem de leitura permanece zero, o trabalho deve estar preso na fase de instantâneos. Então, como funciona a sincronização do instantâneo?
Verificamos os documentos oficiais do Seatunnel:
Mysql cdc | Apache Seatunnel:
https://seatunnel.apache.org/zh-cn/docs/2.3.9/connector-v2/source/mysql-cdc
Não há nenhuma explicação arquitetônica, mas encontramos alguns parâmetros configuráveis.
Explicação do parâmetro
chunk-key.even-distribution.factor.upper-bound
Valor padrão: 100
Descrição:
O limite superior do fator de distribuição de chaves do pedaço. Esse fator é usado para determinar se os dados da tabela são distribuídos uniformemente. Se o fator de distribuição (por exemplo, (max (id) – min (id) + 1) / contagem de linhas) for ≤ esse valor, a tabela é considerada distribuída uniformemente e usará Chunking uniforme. Se exceder esse valor, e o número estimado de fragmentos ultrapassar sample-sharding.threshold
a estratégia de sharding baseada em amostragem será usada. Padrão: 100.0
chunk-key.even-distribution.factor.lower-bound
Valor padrão: 0.5
Descrição:
O limite inferior do fator de distribuição. Se o fator de distribuição for ≥ esse valor, a tabela será considerada distribuída uniformemente. Caso contrário, é considerado irregular e pode desencadear o sharding baseado em amostragem.
sample-sharding.threshold
Valor padrão: 1000
Descrição:
Se o fator de distribuição estiver fora do [lower, upper] Range e o número estimado de fragmentos (aprox. Contagem de linhas / tamanho de pedaço) excede esse limite, a estratégia de sharding baseada em amostragem será usada. Isso melhora a eficiência para grandes conjuntos de dados.
inverse-sampling.rate
Valor padrão: 1000
Descrição:
Usado em sharding baseado em amostragem. Um valor de 1000 significa uma taxa de amostragem de 1/1000. Ele controla a granularidade da amostragem e afeta o número de fragmentos finais.
snapshot.split.size
Valor padrão: 8096
Descrição:
O número de linhas por pedaço na sincronização instantânea. As mesas serão divididas em pedaços com base nisso.
snapshot.fetch.size
Valor padrão: 1024
Descrição:
Número máximo de linhas buscadas por pesquisa durante a leitura do instantâneo.
Com esses parâmetros, aprendemos:
Durante a sincronização do instantâneo, o Seatunnel Chunks em várias divisões. A estratégia de sharding depende se os dados são distribuídos uniformemente.
Nossa mesa possui ~ 60 milhões de linhas (estimadas pela equipe de negócios, pois não conseguimos contá -las diretamente).
Como a tabela não tem chave primária, não tínhamos certeza do que o campo de Seatunnel usa para Chunking.
Assumimos que ele usava a coluna ID, que possui um índice exclusivo e testado:
SELECT MAX(ID), MIN(ID) FROM table;
- Valor da chave máxima: 804306477418
- Valor da chave min: 607312608210
- Fator de distribuição = (804306477418 – 607312608210 + 1) / 60.000.000 ≈ 3283.23
Isso está claramente fora do [0.5, 100] A faixa “mesmo” → So Seatunnel considera essa distribuição desigual.
- Tamanho do pedaço padrão: 8096
- Contagem de shard = 60.000.000 / 8096 ≈ 7411 → maior que
sample-sharding.threshold
(1000)
Portanto, o Seatunnel provavelmente mudou para o sharding baseado em amostragem.
- Taxa de amostragem (inversa): 1000 → Precisa provar 60.000 linhas
Nesse ponto, estávamos convencidos de que o Seatunnel estava preso a amostragem – e ficamos curiosos: como exatamente isso é amostrado? Por que isso dura 6 horas?
Mesmo com 60m de linhas, a amostragem de 60k não deve ser que lento. Certamente está digitalizando a coluna ID (que tem um índice exclusivo)?
Decidimos mergulhar no código -fonte.
Github: https://github.com/apache/seatunnel/
A arquitetura de Seatunnel é bastante complexa, e a configuração do ambiente levou um dia inteiro (principalmente a configuração de dependência).
Encontrar a lógica crítica levou outro dia – rastreamos de mensagens de log e pesquisas de palavras -chave.
Análise parcial do código -fonte
private List splitTableIntoChunks(
JdbcConnection jdbc, TableId tableId, Column splitColumn) throws Exception {
final String splitColumnName = splitColumn.name();
// Get min/max values
final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn);
final Object min = minMax[0];
final Object max = minMax[1];
if (min == null || max == null || min.equals(max)) {
// Empty table or only one row — full table scan as a chunk
return Collections.singletonList(ChunkRange.all());
}
// Get chunk size, distribution factor bounds, and sampling threshold from config
final int chunkSize = sourceConfig.getSplitSize();
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper();
final double distributionFactorLower = sourceConfig.getDistributionFactorLower();
final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold();
log.info("Splitting table {} into chunks, split column: {}, min: {}, max: {}, chunk size: {}, "
+ "distribution factor upper: {}, distribution factor lower: {}, sample sharding threshold: {}",
tableId, splitColumnName, min, max, chunkSize,
distributionFactorUpper, distributionFactorLower, sampleShardingThreshold);
if (isEvenlySplitColumn(splitColumn)) {
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0 &&
doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
if (dataIsEvenlyDistributed) {
final int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
return splitEvenlySizedChunks(tableId, min, max, approximateRowCnt, chunkSize, dynamicChunkSize);
} else {
int shardCount = (int) (approximateRowCnt / chunkSize);
int inverseSamplingRate = sourceConfig.getInverseSamplingRate();
if (sampleShardingThreshold < shardCount) {
if (inverseSamplingRate > chunkSize) {
log.warn("inverseSamplingRate {} > chunkSize {}, adjusting...", inverseSamplingRate, chunkSize);
inverseSamplingRate = chunkSize;
}
log.info("Using sampling sharding for table {}, rate = {}", tableId, inverseSamplingRate);
Object[] sample = sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate);
log.info("Sampled {} records from table {}", sample.length, tableId);
return efficientShardingThroughSampling(tableId, sample, approximateRowCnt, shardCount);
}
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
} else {
return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize);
}
}
Vamos nos concentrar na lógica de amostragem:
public static Object[] skipReadAndSortSampleData(
JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate
) throws Exception {
final String sampleQuery = String.format("SELECT %s FROM %s", quote(columnName), quote(tableId));
Statement stmt = null;
ResultSet rs = null;
List
Esta é a lógica de amostragem principal:
Ele digitaliza toda a linha da tabela por linha, amostragem 1 de cada 1000 registros.
Isso explica por que estava funcionando tão devagar – vimos Processing row index
mensagens nos toras e se perguntou o que eles estavam fazendo.
Aproximadamente 60.000 IDs foram amostrados.
Agora, para a estratégia de sharding baseada em amostragem:
protected List efficientShardingThroughSampling(
TableId tableId, Object[] sampleData, long approximateRowCnt, int shardCount
) {
log.info("Using sampling-based sharding on table {}, approx rows: {}, shards: {}",
tableId, approximateRowCnt, shardCount);
List splits = new ArrayList<>();
if (shardCount == 0) {
splits.add(ChunkRange.of(null, null));
return splits;
}
double approxSamplePerShard = (double) sampleData.length / shardCount;
Object lastEnd = null;
if (approxSamplePerShard <= 1) {
splits.add(ChunkRange.of(null, sampleData[0]));
lastEnd = sampleData[0];
for (int i = 1; i < sampleData.length; i++) {
if (!sampleData[i].equals(lastEnd)) {
splits.add(ChunkRange.of(lastEnd, sampleData[i]));
lastEnd = sampleData[i];
}
}
splits.add(ChunkRange.of(lastEnd, null));
} else {
for (int i = 0; i < shardCount; i++) {
Object chunkStart = lastEnd;
Object chunkEnd = (i < shardCount - 1)
? sampleData[(int) ((i + 1) * approxSamplePerShard)]
: null;
if (i == 0 || i == shardCount - 1 || !Objects.equals(chunkEnd, chunkStart)) {
splits.add(ChunkRange.of(chunkStart, chunkEnd));
lastEnd = chunkEnd;
}
}
}
return splits;
}
Cada pedaço recebe um início e um final distintos com base nos IDs amostrados classificados – sem sobreposição.
Vamos olhar para o ChunkRange
classe que representa o resultado:
O shatching de instantâneo permite leituras de dados paralelos, acelerando a sincronização histórica.
Solução final
Através da análise acima, confirmamos que o trabalho estava preso na fase instantânea que executa a amostragem, acionada porque o Seatunnel determinou que a tabela de origem foi distribuída de forma desigual.
Como o trabalho de sincronização foi bloqueado por dias, criamos uma correção simples: ajuste os limiares do fator de distribuição para que o Seatunnel tratasse a tabela como distribuída uniformemente e pula a amostragem.
O intervalo de fatores padrão é 0.5 ~ 100
mas o fator da nossa tabela foi ~ 3283 – por isso aumentamos o limite superior para 4000. A configuração final foi:
snapshot.split.size
: Nossa tabela era altamente escassa, por isso aumentamos esse valor drasticamente (multiplicado aleatoriamente por 1000 – reconhecidamente não muito científico).
table-names-config
: Especificado manualmente a chave primária e a chave dividida, pois a tabela não tinha chave primária e não tínhamos certeza de qual coluna o Seatunnel usou. Melhor ser explícito.
Resultado final
Finalmente começou a sincronizar! 🎉