Início Tecnologia TF.Distribute 101: Treinando Keras em vários dispositivos e máquinas

TF.Distribute 101: Treinando Keras em vários dispositivos e máquinas

1
0

 

Visão geral do conteúdo

  • Introdução
  • Configurar
  • Treinamento síncrono de vários dispositivos de vários dispositivos
  • Usando retornos de chamada para garantir a tolerância a falhas
  • dicas de desempenho tf.data
  • Treinamento síncrono distribuído de vários trabalhadores
  • Exemplo: código em execução em uma configuração de vários trabalhadores
  • Leitura adicional

Introdução

Geralmente, existem duas maneiras de distribuir a computação em vários dispositivos:

Paralelismo de dadosonde um único modelo é replicado em vários dispositivos ou várias máquinas. Cada um deles processa diferentes lotes de dados e, em seguida, fundem seus resultados. Existem muitas variantes dessa configuração, que diferem na maneira como as diferentes réplicas de modelo mesclam resultados, se permanecem sincronizadas a cada lote ou se são mais vagamente acopladas, etc.

Paralelismo do modeloonde diferentes partes de um único modelo são executadas em diferentes dispositivos, processando um único lote de dados juntos. Isso funciona melhor com modelos que possuem uma arquitetura naturalmente paralela, como modelos que apresentam várias ramificações.

Este guia se concentra no paralelismo de dados, em particular Paralelismo de dados síncronosonde as diferentes réplicas do modelo permanecem sincronizadas após cada lote que processam. A sincronicidade mantém o comportamento de convergência do modelo idêntico ao que você veria para o treinamento em um único dispositivo.

Especificamente, este guia ensina como usar o tf.distribute API para treinar modelos de Keras em várias GPUs, com alterações mínimas em seu código, nas duas configurações a seguir:

  • Em várias GPUs (normalmente 2 a 8) instaladas em uma única máquina (host único, treinamento de vários dispositivos). Esta é a configuração mais comum para pesquisadores e fluxos de trabalho da indústria em pequena escala.
  • Em um cluster de muitas máquinas, cada uma hospedando uma ou várias GPUs (treinamento distribuído por vários trabalhadores). Esta é uma boa configuração para fluxos de trabalho da indústria em larga escala, por exemplo, treinamento de modelos de classificação de imagem de alta resolução em dezenas de milhões de imagens usando 20-100 GPUs.

Configurar


import tensorflow as tf
import keras

Treinamento síncrono de vários dispositivos de vários dispositivos

Nesta configuração, você tem uma máquina com várias GPUs (normalmente 2 a 8). Cada dispositivo executará uma cópia do seu modelo (chamada de A réplica). Por simplicidade, a seguir, assumiremos que estamos lidando com 8 GPUs, sem perda de generalidade.

Como funciona

Em cada etapa do treinamento:

  • O lote atual de dados (chamado Lote global) é dividido em 8 sub-lotes diferentes (chamados lotes locais). Por exemplo, se o lote global tiver 512 amostras, cada um dos 8 lotes locais terá 64 amostras.
  • Cada uma das 8 réplicas processa independentemente um lote local: elas executam um passe para a frente e, em seguida, um passe para trás, emitindo o gradiente dos pesos em relação à perda do modelo no lote local.
  • As atualizações de peso originárias dos gradientes locais são mescladas eficientemente nas 8 réplicas. Como isso é feito no final de cada etapa, as réplicas sempre permanecem sincronizadas.

Na prática, o processo de atualização de síncrona dos pesos das réplicas de modelo é tratado no nível de cada variável de peso individual. Isso é feito através de um variável espelhada objeto.

Como usá -lo

Para fazer treinamento síncrono de vários dispositivos com um modelo Keras, você usaria o tf.distribute.MirroredStrategy API. Aqui está como funciona:

  • Instanciar a MirroredStrategyopcionalmente, configurando quais dispositivos específicos você deseja usar (por padrão, a estratégia usará todas as GPUs disponíveis).
  • Use o objeto de estratégia para abrir um escopo e, dentro desse escopo, crie todos os objetos Keras que você precisa que contenham variáveis. Normalmente, isso significa Criando e compilando o modelo dentro do escopo de distribuição.
  • Treine o modelo via fit() como de costume.

Importante, recomendamos que você use tf.data.Dataset Objetos para carregar dados em um fluxo de trabalho com vários dispositivos ou distribuídos.

Esquematicamente, parece assim:


# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

# Open a strategy scope.
with strategy.scope():
  # Everything that creates variables should be under the strategy scope.
  # In general this is only model construction & `compile()`.
  model = Model(...)
  model.compile(...)

# Train the model on all available devices.
model.fit(train_dataset, validation_data=val_dataset, ...)

# Test the model on all available devices.
model.evaluate(test_dataset)

Aqui está um exemplo simples de ponta a ponta:


def get_compiled_model():
    # Make a simple 2-layer densely-connected neural network.
    inputs = keras.Input(shape=(784,))
    x = keras.layers.Dense(256, activation="relu")(inputs)
    x = keras.layers.Dense(256, activation="relu")(x)
    outputs = keras.layers.Dense(10)(x)
    model = keras.Model(inputs, outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(),
        loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model


def get_dataset():
    batch_size = 32
    num_val_samples = 10000

    # Return the MNIST dataset in the form of a `tf.data.Dataset`.
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # Preprocess the data (these are Numpy arrays)
    x_train = x_train.reshape(-1, 784).astype("float32") / 255
    x_test = x_test.reshape(-1, 784).astype("float32") / 255
    y_train = y_train.astype("float32")
    y_test = y_test.astype("float32")

    # Reserve num_val_samples samples for validation
    x_val = x_train[-num_val_samples:]
    y_val = y_train[-num_val_samples:]
    x_train = x_train[:-num_val_samples]
    y_train = y_train[:-num_val_samples]
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size),
    )


# Create a MirroredStrategy.
strategy = tf.distribute.MirroredStrategy()
print("Number of devices: {}".format(strategy.num_replicas_in_sync))

# Open a strategy scope.
with strategy.scope():
    # Everything that creates variables should be under the strategy scope.
    # In general this is only model construction & `compile()`.
    model = get_compiled_model()

# Train the model on all available devices.
train_dataset, val_dataset, test_dataset = get_dataset()
model.fit(train_dataset, epochs=2, validation_data=val_dataset)

# Test the model on all available devices.
model.evaluate(test_dataset)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Number of devices: 4
2023-07-19 11:35:32.379801: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 50000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Epoch 1/2
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
1556/1563 [============================>.] - ETA: 0s - loss: 0.2236 - sparse_categorical_accuracy: 0.9328INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2023-07-19 11:35:46.769935: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:2"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
1563/1563 [==============================] - 16s 7ms/step - loss: 0.2238 - sparse_categorical_accuracy: 0.9328 - val_loss: 0.1347 - val_sparse_categorical_accuracy: 0.9592
Epoch 2/2
1563/1563 [==============================] - 11s 7ms/step - loss: 0.0940 - sparse_categorical_accuracy: 0.9717 - val_loss: 0.0984 - val_sparse_categorical_accuracy: 0.9684
2023-07-19 11:35:59.993148: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
313/313 [==============================] - 2s 4ms/step - loss: 0.1057 - sparse_categorical_accuracy: 0.9676
[0.10571097582578659, 0.9675999879837036]

Usando retornos de chamada para garantir a tolerância a falhas

Ao usar o treinamento distribuído, você deve sempre garantir que tenha uma estratégia para se recuperar da falha (tolerância a falhas). A maneira mais simples de lidar com isso é passar ModelCheckpoint retorno de chamada para fit()para salvar seu modelo em intervalos regulares (por exemplo, a cada 100 lotes ou todas as épocas). Você pode reiniciar o treinamento do seu modelo salvo.

Aqui está um exemplo simples:


import os
from tensorflow import keras

# Prepare a directory to store all the checkpoints.
checkpoint_dir = "./ckpt"
if not os.path.exists(checkpoint_dir):
    os.makedirs(checkpoint_dir)


def make_or_restore_model():
    # Either restore the latest model, or create a fresh one
    # if there is no checkpoint available.
    checkpoints = [checkpoint_dir + "/" + name for name in os.listdir(checkpoint_dir)]
    if checkpoints:
        latest_checkpoint = max(checkpoints, key=os.path.getctime)
        print("Restoring from", latest_checkpoint)
        return keras.models.load_model(latest_checkpoint)
    print("Creating a new model")
    return get_compiled_model()


def run_training(epochs=1):
    # Create a MirroredStrategy.
    strategy = tf.distribute.MirroredStrategy()

    # Open a strategy scope and create/restore the model
    with strategy.scope():
        model = make_or_restore_model()

    callbacks = [
        # This callback saves a SavedModel every epoch
        # We include the current epoch in the folder name.
        keras.callbacks.ModelCheckpoint(
            filepath=checkpoint_dir + "/ckpt-{epoch}", save_freq="epoch"
        )
    ]
    model.fit(
        train_dataset,
        epochs=epochs,
        callbacks=callbacks,
        validation_data=val_dataset,
        verbose=2,
    )


# Running the first time creates the model
run_training(epochs=1)

# Calling the same function again will resume from where we left off
run_training(epochs=1)

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Creating a new model
2023-07-19 11:36:01.811216: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 50000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
2023-07-19 11:36:13.671835: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:2"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
1563/1563 - 14s - loss: 0.2268 - sparse_categorical_accuracy: 0.9322 - val_loss: 0.1148 - val_sparse_categorical_accuracy: 0.9656 - 14s/epoch - 9ms/step
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
Restoring from ./ckpt/ckpt-1
2023-07-19 11:36:16.521031: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 50000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
INFO:tensorflow:Collective all_reduce tensors: 6 all_reduces, num_devices = 4, group_size = 4, implementation = CommunicationImplementation.NCCL, num_packs = 1
2023-07-19 11:36:28.440092: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:786] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 10000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:2"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 784
        }
      }
      shape {
      }
    }
  }
}
attr {
  key: "replicate_on_split"
  value {
    b: false
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
INFO:tensorflow:Assets written to: ./ckpt/ckpt-1/assets
1563/1563 - 13s - loss: 0.0974 - sparse_categorical_accuracy: 0.9703 - val_loss: 0.0960 - val_sparse_categorical_accuracy: 0.9724 - 13s/epoch - 9ms/step

tf.data Dicas de desempenho

Ao fazer treinamento distribuído, a eficiência com a qual você carrega dados geralmente pode se tornar crítica. Aqui estão algumas dicas para garantir que seu tf.data Os pipelines funcionam o mais rápido possível.

Nota sobre o lote de dados

Ao criar seu conjunto de dados, verifique se ele está em lote com o tamanho do lote global. Por exemplo, se cada um dos seus 8 GPUs for capaz de executar um lote de 64 amostras, você liga para usar um tamanho global em lote de 512.

Chamando dataset.cache()

Se você ligar .cache() Em um conjunto de dados, seus dados serão armazenados em cache após a primeira iteração sobre os dados. Cada iteração subsequente usará os dados em cache. O cache pode estar na memória (padrão) ou em um arquivo local que você especificar.

Isso pode melhorar o desempenho quando:

  • Seus dados não devem mudar de iteração para iteração
  • Você está lendo dados de um sistema de arquivos distribuído remoto
  • Você está lendo dados do disco local, mas seus dados se encaixariam na memória e seu fluxo de trabalho é significativamente ligado a IO (por exemplo, arquivos de imagem de leitura e decodificação).

Chamando dataset.prefetch(buffer_size)

Você quase sempre deveria ligar .prefetch(buffer_size) Depois de criar um conjunto de dados. Isso significa que seu pipeline de dados será executado de forma assíncrona a partir do seu modelo, com novas amostras sendo pré -processadas e armazenadas em um buffer enquanto as amostras de lote atuais são usadas para treinar o modelo. O próximo lote será pré -buscado na memória da GPU quando o lote atual terminar.

Treinamento síncrono distribuído de vários trabalhadores

Como funciona

Nesta configuração, você tem várias máquinas (chamadas trabalhadores), cada um com uma ou várias GPUs nelas. Muito parecido com o que acontece para o treinamento de host único, cada GPU disponível executará uma réplica de modelo, e o valor das variáveis ​​de cada réplica é mantido em sincronia após cada lote.

É importante ressaltar que a implementação atual pressupõe que todos os trabalhadores tenham o mesmo número de GPUs (cluster homogêneo).

Como usá -lo

  1. Configure um cluster (fornecemos ponteiros abaixo).
  2. Configurar um apropriado TF_CONFIG variável de ambiente em cada trabalhador. Isso diz ao trabalhador qual é o seu papel e como se comunicar com seus colegas.
  3. Em cada trabalhador, execute o código de construção e compilação do seu modelo dentro do escopo de um MultiWorkerMirroredStrategy Objeto, da mesma forma que fizemos para o treinamento único.
  4. Execute o código de avaliação em uma máquina de avaliador designada.

Configurando um cluster

Primeiro, configure um cluster (coletivo de máquinas). Cada máquina individualmente deve ser configurada para poder executar seu modelo (normalmente, cada máquina executará a mesma imagem do Docker) e capaz de acessar sua fonte de dados (por exemplo, GCS).

O gerenciamento de cluster está além do escopo deste guia. Aqui está um documento para ajudá -lo a começar. Você também pode dar uma olhada no Kubeflow.

Configurando o TF_CONFIG variável de ambiente

Enquanto o código em execução em cada trabalhador é quase o mesmo que o código usado no fluxo de trabalho único (exceto com um diferente tf.distribute objeto de estratégia), uma diferença significativa entre o fluxo de trabalho de host único e o fluxo de trabalho de vários trabalhadores é que você precisa definir um TF_CONFIG Variável de ambiente em cada máquina em execução em seu cluster.

O TF_CONFIG A variável de ambiente é uma string json que especifica:

  • A configuração do cluster, enquanto a lista de endereços e portas das máquinas que compõem o cluster
  • A “tarefa” do trabalhador, que é o papel que essa máquina específica precisa desempenhar no cluster.

Um exemplo de tf_config é:


os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

Na configuração de treinamento síncrono de vários trabalhadores, as funções válidas (tipos de tarefas) para as máquinas são “trabalhadores” e “avaliador”.

Por exemplo, se você tiver 8 máquinas com 4 GPUs cada, poderá ter 7 trabalhadores e um avaliador.

  • Os trabalhadores treinam o modelo, cada um sub-lotes de processamento de um lote global.
  • Um dos trabalhadores (trabalhadores 0) servirá como “chefe”, um tipo específico de trabalhador responsável por salvar logs e pontos de verificação para reutilização posterior (normalmente para um local de armazenamento em nuvem).
  • O avaliador executa um loop contínuo que carrega o ponto de verificação mais recente salvo pelo trabalhador -chefe, executa a avaliação (assíncrona dos outros trabalhadores) e escreve registros de avaliação (por exemplo, registros de tensorboard).

Executando o código em cada trabalhador

Você executaria o código de treinamento em cada trabalhador (incluindo o chefe) e o código de avaliação no avaliador.

O código de treinamento é basicamente o mesmo que você usaria na configuração de host único, exceto usando MultiWorkerMirroredStrategy em vez de MirroredStrategy.

Cada trabalhador executava o mesmo código (menos a diferença explicada na nota abaixo), incluindo os mesmos retornos de chamada.

Observação: Os retornos de chamada que salvam pontos de verificação do modelo ou logs devem salvar em um diretório diferente para cada trabalhador. É prática padrão que todos os trabalhadores devem economizar no disco local (que normalmente é temporário), Exceto trabalhador 0que salvaria os pontos de verificação de logs do Tensorboard em um local de armazenamento em nuvem para acesso e reutilização posteriores.

O avaliador simplesmente usaria MirroredStrategy (Como é executado em uma única máquina e não precisa se comunicar com outras máquinas) e ligar model.evaluate(). Ele estaria carregando o ponto de verificação mais recente salvo pelo trabalhador -chefe em um local de armazenamento em nuvem e salvaria os logs de avaliação no mesmo local que os principais registros.

Exemplo: código em execução em uma configuração de vários trabalhadores

No chefe (trabalhador 0):


# Set TF_CONFIG
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})


# Open a strategy scope and create/restore the model.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
  model = make_or_restore_model()

callbacks = [
    # This callback saves a SavedModel every 100 batches
    keras.callbacks.ModelCheckpoint(filepath='path/to/cloud/location/ckpt',
                                    save_freq=100),
    keras.callbacks.TensorBoard('path/to/cloud/location/tb/')
]
model.fit(train_dataset,
          callbacks=callbacks,
          ...)

Em outros trabalhadores:


# Set TF_CONFIG
worker_index = 1  # For instance
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': worker_index}
})


# Open a strategy scope and create/restore the model.
# You can restore from the checkpoint saved by the chief.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
with strategy.scope():
  model = make_or_restore_model()

callbacks = [
    keras.callbacks.ModelCheckpoint(filepath='local/path/ckpt', save_freq=100),
    keras.callbacks.TensorBoard('local/path/tb/')
]
model.fit(train_dataset,
          callbacks=callbacks,
          ...)

No avaliador:


strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
  model = make_or_restore_model()  # Restore from the checkpoint saved by the chief.

results = model.evaluate(val_dataset)
# Then, log the results on a shared location, write TensorBoard logs, etc

Publicado originalmente no Tensorflow Site, este artigo aparece aqui sob uma nova manchete e é licenciado no CC por 4.0. Amostras de código compartilhadas sob a licença Apache 2.0

fonte

DEIXE UMA RESPOSTA

Por favor digite seu comentário!
Por favor, digite seu nome aqui