Working code examples for different Chisel CLI use cases, including PyTorch training, data processing, and multi-GPU usage.
| Example | Use Case | GPU Recommendation | Command |
|---|---|---|---|
| Basic Usage | Matrix operations, getting started | A100_80GB_1 | `chisel python basic_example.py` |
| Command Line Arguments | Scripts with parameters | A100_80GB_1 | `chisel python args_example.py --iterations 5` |
| Deep Learning | PyTorch model training | A100_80GB_2 | `chisel python train_model.py` |
| Multi-GPU Processing | Parallel GPU computing | A100_80GB_4 | `chisel python multi_gpu_example.py` |
| Data Processing | Large dataset processing | A100_80GB_2 | `chisel python process_data.py` |
### Basic Usage (`basic_example.py`)

```python
from chisel import ChiselApp, GPUType

app = ChiselApp("basic-example", gpu=GPUType.A100_80GB_1)

@app.capture_trace(trace_name="matrix_multiply", record_shapes=True)
def matrix_multiply(size: int = 1000):
    import torch

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🎯 Using device: {device}")

    # Create random matrices
    a = torch.randn(size, size, device=device)
    b = torch.randn(size, size, device=device)

    # Perform matrix multiplication
    result = torch.mm(a, b)
    print(f"✅ Matrix multiplication completed! Shape: {result.shape}")
    return result.cpu().numpy()

@app.capture_trace(trace_name="simple_computation")
def simple_computation(n: int = 1000000):
    import torch

    device = "cuda" if torch.cuda.is_available() else "cpu"
    x = torch.randn(n, device=device)
    result = x.pow(2).sum()
    print(f"✅ Computation completed! Result: {result.item()}")
    return result.item()

if __name__ == "__main__":
    print("🚀 Starting Chisel example")

    # Run matrix multiplication
    matrix_result = matrix_multiply(500)

    # Run simple computation
    computation_result = simple_computation(100000)

    print("✅ Example completed!")
```
### Command Line Arguments (`args_example.py`)

```python
import argparse

from chisel import ChiselApp, GPUType

app = ChiselApp("args-example", gpu=GPUType.A100_80GB_1)

@app.capture_trace(trace_name="parameterized_ops", record_shapes=True)
def parameterized_operations(iterations: int, batch_size: int, learning_rate: float):
    import torch

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🎯 Using device: {device}")
    print(f"📊 Parameters: iterations={iterations}, batch_size={batch_size}, lr={learning_rate}")

    # Initialize the weights once so the gradient steps accumulate across iterations
    weights = torch.randn(100, 10, device=device, requires_grad=True)

    for i in range(iterations):
        # Simulate a training step on a fresh batch
        data = torch.randn(batch_size, 100, device=device)

        # Forward pass
        output = torch.mm(data, weights)
        loss = torch.mean(output.pow(2))

        # Manual gradient step; detach so earlier graphs are not retained
        grad = torch.autograd.grad(loss, weights)[0]
        weights = (weights - learning_rate * grad).detach().requires_grad_(True)

        if (i + 1) % max(1, iterations // 5) == 0:
            print(f"  Iteration {i + 1}/{iterations}: Loss = {loss.item():.4f}")

    print("✅ Parameterized operations completed!")
    return loss.item()

def main():
    parser = argparse.ArgumentParser(description="Chisel CLI Args Example")
    parser.add_argument("--iterations", type=int, default=10, help="Number of iterations")
    parser.add_argument("--batch-size", type=int, default=32, help="Batch size")
    parser.add_argument("--learning-rate", type=float, default=0.01, help="Learning rate")
    parser.add_argument("--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    if args.verbose:
        print(f"🔧 Configuration: {args}")

    print("🚀 Starting parameterized Chisel example")
    result = parameterized_operations(
        iterations=args.iterations,
        batch_size=args.batch_size,
        learning_rate=args.learning_rate,
    )
    print(f"🎯 Final result: {result:.6f}")
    print("✅ Example completed!")

if __name__ == "__main__":
    main()
```
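The flags map directly to the `argparse` options defined in `main()`, for example:

```bash
chisel python args_example.py --iterations 20 --batch-size 64 --learning-rate 0.001 --verbose
```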
### Deep Learning (`train_model.py`)

```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from chisel import ChiselApp, GPUType

app = ChiselApp("deep-learning", gpu=GPUType.A100_80GB_2)

class SimpleNN(nn.Module):
    """Simple neural network for demonstration."""

    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size // 2, output_size),
        )

    def forward(self, x):
        return self.layers(x)

@app.capture_trace(trace_name="data_generation")
def generate_synthetic_data(n_samples: int = 10000, n_features: int = 100):
    """Generate a synthetic regression dataset for training."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"📊 Generating {n_samples} samples with {n_features} features")

    # Generate random features
    X = torch.randn(n_samples, n_features, device=device)

    # Generate true weights for the synthetic target
    true_weights = torch.randn(n_features, device=device)
    noise = torch.randn(n_samples, device=device) * 0.1

    # Create the synthetic target as a noisy linear function of X
    y = torch.mm(X, true_weights.unsqueeze(1)).squeeze() + noise

    print(f"✅ Data generated: X.shape={X.shape}, y.shape={y.shape}")
    return X.cpu(), y.cpu()

@app.capture_trace(trace_name="model_training", profile_memory=True)
def train_model(X, y, epochs: int = 50, batch_size: int = 256, learning_rate: float = 0.001):
    """Train the neural network."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🎯 Training on: {device}")

    # Move data to device
    X, y = X.to(device), y.to(device)

    # Create dataset and dataloader
    dataset = TensorDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Initialize model, loss, and optimizer
    model = SimpleNN(X.shape[1], 256, 1).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    print(f"🧠 Model: {sum(p.numel() for p in model.parameters())} parameters")

    # Training loop
    model.train()
    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0
        for batch_X, batch_y in dataloader:
            optimizer.zero_grad()

            # Forward pass
            outputs = model(batch_X).squeeze()
            loss = criterion(outputs, batch_y)

            # Backward pass
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

        avg_loss = total_loss / num_batches
        if (epoch + 1) % 10 == 0:
            print(f"  Epoch [{epoch+1}/{epochs}], Average Loss: {avg_loss:.6f}")

    print("✅ Training completed!")
    return model.cpu()

@app.capture_trace(trace_name="model_evaluation")
def evaluate_model(model, X_test, y_test):
    """Evaluate the trained model."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    X_test, y_test = X_test.to(device), y_test.to(device)

    model.eval()
    with torch.no_grad():
        predictions = model(X_test).squeeze()
        mse = nn.MSELoss()(predictions, y_test)
        mae = nn.L1Loss()(predictions, y_test)

    print("📈 Evaluation Results:")
    print(f"  MSE: {mse.item():.6f}")
    print(f"  MAE: {mae.item():.6f}")
    return {"mse": mse.item(), "mae": mae.item()}

def main():
    print("🚀 Starting Deep Learning Example")

    # Generate data
    X, y = generate_synthetic_data(50000, 100)

    # Split into train and test sets
    split_idx = int(0.8 * len(X))
    X_train, X_test = X[:split_idx], X[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    # Train model
    model = train_model(X_train, y_train, epochs=100)

    # Evaluate model
    metrics = evaluate_model(model, X_test, y_test)

    print("✅ Deep learning example completed!")
    return model, metrics

if __name__ == "__main__":
    trained_model, results = main()
```
### Multi-GPU Processing (`multi_gpu_example.py`)

```python
import torch
import torch.nn as nn
from torch.nn.parallel import DataParallel, DistributedDataParallel

from chisel import ChiselApp, GPUType

app = ChiselApp("multi-gpu", gpu=GPUType.A100_80GB_4)

@app.capture_trace(trace_name="gpu_detection")
def detect_gpu_setup():
    """Detect and report the GPU configuration."""
    if not torch.cuda.is_available():
        print("❌ CUDA not available")
        return False, 0

    n_gpus = torch.cuda.device_count()
    print(f"🎯 Found {n_gpus} GPU(s)")

    for i in range(n_gpus):
        name = torch.cuda.get_device_name(i)
        memory = torch.cuda.get_device_properties(i).total_memory / 1024**3
        print(f"  GPU {i}: {name} ({memory:.1f}GB)")

    return True, n_gpus

class LargeModel(nn.Module):
    """Large model to demonstrate multi-GPU usage."""

    def __init__(self, input_size=1000, hidden_size=4096, output_size=1000):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        )

    def forward(self, x):
        return self.layers(x)

@app.capture_trace(trace_name="data_parallel_training", profile_memory=True)
def train_data_parallel(batch_size=512, n_batches=100):
    """Training with DataParallel for single-node multi-GPU."""
    has_cuda, n_gpus = detect_gpu_setup()
    if not has_cuda:
        print("❌ No CUDA GPUs available")
        return None

    # Create model
    model = LargeModel()

    # Wrap in DataParallel if multiple GPUs are available
    if n_gpus > 1:
        print(f"🚀 Using DataParallel with {n_gpus} GPUs")
        model = DataParallel(model)
        # DataParallel splits each batch across GPUs, so scale it up
        effective_batch_size = batch_size * n_gpus
        print(f"📊 Effective batch size: {effective_batch_size}")
    else:
        effective_batch_size = batch_size

    model = model.cuda()
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    print(f"🏋️ Training with {sum(p.numel() for p in model.parameters())} parameters")

    # Training loop
    model.train()
    for batch_idx in range(n_batches):
        # Generate batch data
        inputs = torch.randn(effective_batch_size, 1000, device='cuda')
        targets = torch.randn(effective_batch_size, 1000, device='cuda')

        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, targets)

        # Backward pass
        loss.backward()
        optimizer.step()

        if (batch_idx + 1) % 20 == 0:
            memory_used = torch.cuda.max_memory_allocated() / 1024**3
            print(f"  Batch [{batch_idx+1}/{n_batches}], Loss: {loss.item():.6f}, GPU Memory: {memory_used:.1f}GB")
            torch.cuda.reset_peak_memory_stats()

    print("✅ Data parallel training completed!")
    return model

@app.capture_trace(trace_name="distributed_setup")
def setup_distributed_training():
    """Set up distributed training across multiple nodes."""
    import os

    import torch.distributed as dist

    # Check if running in a distributed environment (e.g. launched via torchrun)
    if 'WORLD_SIZE' in os.environ:
        world_size = int(os.environ['WORLD_SIZE'])
        local_rank = int(os.environ.get('LOCAL_RANK', 0))
        print(f"🌐 Distributed training: world_size={world_size}, local_rank={local_rank}")

        # Initialize the process group and pin this process to its GPU
        dist.init_process_group(backend='nccl')
        torch.cuda.set_device(local_rank)
        return True, local_rank, world_size
    else:
        print("💻 Single node training")
        return False, 0, 1

@app.capture_trace(trace_name="model_parallel_example")
def model_parallel_example():
    """Demonstrate model parallelism for very large models."""
    if torch.cuda.device_count() < 2:
        print("⚠️ Need at least 2 GPUs for model parallelism")
        return None

    class ModelParallelNN(nn.Module):
        def __init__(self):
            super().__init__()
            # First half of the network on GPU 0
            self.part1 = nn.Sequential(
                nn.Linear(1000, 4096),
                nn.ReLU(),
                nn.Linear(4096, 4096),
                nn.ReLU(),
            ).to('cuda:0')
            # Second half on GPU 1
            self.part2 = nn.Sequential(
                nn.Linear(4096, 4096),
                nn.ReLU(),
                nn.Linear(4096, 1000),
            ).to('cuda:1')

        def forward(self, x):
            x = x.to('cuda:0')
            x = self.part1(x)
            x = x.to('cuda:1')  # move activations between GPUs
            x = self.part2(x)
            return x

    model = ModelParallelNN()
    print("🔄 Model parallelism setup complete")

    # Test forward pass
    input_data = torch.randn(128, 1000)
    output = model(input_data)
    print(f"✅ Model parallel forward pass: {input_data.shape} -> {output.shape}")
    return model

def main():
    print("🚀 Starting Multi-GPU Example")

    # Detect GPU setup
    has_cuda, n_gpus = detect_gpu_setup()

    if has_cuda:
        # Data parallel training
        model = train_data_parallel()

        # Model parallel example (if enough GPUs)
        if n_gpus >= 2:
            mp_model = model_parallel_example()

        # Distributed training setup
        is_distributed, local_rank, world_size = setup_distributed_training()

    print("✅ Multi-GPU example completed!")

if __name__ == "__main__":
    main()
```
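`setup_distributed_training()` only initializes the process group; the model still has to be wrapped. A minimal sketch of that remaining step, assuming the script was started by a standard launcher such as `torchrun` (which sets `WORLD_SIZE` and `LOCAL_RANK`) and that CUDA is available:

```python
# Hypothetical continuation: wrap the model once the process group exists.
is_distributed, local_rank, world_size = setup_distributed_training()
model = LargeModel().cuda()
if is_distributed:
    # Each process owns one GPU; DDP synchronizes gradients across processes
    model = DistributedDataParallel(model, device_ids=[local_rank])
```

Outside Chisel, such a script is typically launched with `torchrun --nproc_per_node=4 multi_gpu_example.py`.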
### Data Processing (`process_data.py`)

```python
import numpy as np
import torch

from chisel import ChiselApp, GPUType

app = ChiselApp("data-processing", gpu=GPUType.A100_80GB_2)

@app.capture_trace(trace_name="generate_large_dataset")
def generate_large_dataset(n_samples: int = 1000000, n_features: int = 512):
    """Generate a large synthetic dataset."""
    print(f"📊 Generating dataset: {n_samples} samples × {n_features} features")

    # Generate on CPU first to simulate real data loading
    data = np.random.randn(n_samples, n_features).astype(np.float32)
    labels = np.random.randint(0, 10, size=n_samples)

    print(f"💾 Dataset size: {data.nbytes / 1024**3:.2f}GB")
    return data, labels

@app.capture_trace(trace_name="batch_processing", profile_memory=True)
def process_data_in_batches(data, labels, batch_size: int = 10000):
    """Process a large dataset in batches to manage GPU memory."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"🎯 Processing on: {device}")

    n_samples = len(data)
    n_batches = (n_samples + batch_size - 1) // batch_size
    print(f"🔄 Processing {n_samples} samples in {n_batches} batches of {batch_size}")

    results = []
    for i in range(n_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, n_samples)

        # Load batch onto the device
        batch_data = torch.tensor(data[start_idx:end_idx], device=device)
        batch_labels = torch.tensor(labels[start_idx:end_idx], device=device)

        # Process batch under autocast for mixed precision
        with torch.autocast(device_type=device):
            # Normalize data
            normalized = torch.nn.functional.normalize(batch_data, dim=1)

            # Compute pairwise-similarity (Gram) features; keep the first 10 columns
            mean = torch.mean(normalized, dim=0, keepdim=True)
            centered = normalized - mean
            features = torch.mm(centered, centered.t())[:, :10]

            # Apply a non-linear transformation
            processed = torch.tanh(features) * batch_labels.unsqueeze(1).float()

        # Cast back to fp32 and move to CPU to save GPU memory
        results.append(processed.float().cpu())

        # Clear GPU memory
        del batch_data, batch_labels, normalized, features, processed

        if (i + 1) % 10 == 0:
            if device == "cuda":
                memory_used = torch.cuda.max_memory_allocated() / 1024**3
                print(f"  Processed batch {i+1}/{n_batches}, GPU Memory: {memory_used:.2f}GB")
                torch.cuda.empty_cache()
            else:
                print(f"  Processed batch {i+1}/{n_batches}")

    # Combine all results
    final_result = torch.cat(results, dim=0)
    print(f"✅ Processing completed! Result shape: {final_result.shape}")
    return final_result.numpy()

@app.capture_trace(trace_name="statistical_analysis")
def compute_statistics(processed_data):
    """Compute statistics on the processed data."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    data_tensor = torch.tensor(processed_data, device=device)

    print("📈 Computing statistics...")

    # Basic statistics
    mean = torch.mean(data_tensor, dim=0)
    std = torch.std(data_tensor, dim=0)
    min_vals = torch.min(data_tensor, dim=0)[0]
    max_vals = torch.max(data_tensor, dim=0)[0]

    # Covariance matrix
    centered = data_tensor - mean
    cov_matrix = torch.mm(centered.t(), centered) / (data_tensor.shape[0] - 1)

    # Eigenvalues for PCA-like analysis (eigvalsh since covariance is symmetric)
    eigenvals = torch.linalg.eigvalsh(cov_matrix)

    stats = {
        'mean': mean.cpu().numpy(),
        'std': std.cpu().numpy(),
        'min': min_vals.cpu().numpy(),
        'max': max_vals.cpu().numpy(),
        'eigenvals': eigenvals.cpu().numpy(),
    }
    print(f"📊 Statistics computed for {data_tensor.shape[0]} samples")
    return stats

@app.capture_trace(trace_name="parallel_feature_extraction")
def parallel_feature_extraction(data, window_size: int = 100):
    """Extract sliding-window features in parallel on the GPU."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    data_tensor = torch.tensor(data, device=device)

    n_samples, n_features = data_tensor.shape
    n_windows = n_samples - window_size + 1
    print(f"🔍 Extracting features from {n_windows} windows of size {window_size}")

    # Create sliding windows using unfold: (n_windows, n_features, window_size)
    windows = data_tensor.unfold(0, window_size, 1)

    # Mean and std across each window
    window_means = torch.mean(windows, dim=2)
    window_stds = torch.std(windows, dim=2)

    # Min and max across each window
    window_mins = torch.min(windows, dim=2)[0]
    window_maxs = torch.max(windows, dim=2)[0]

    # Combine features
    combined_features = torch.cat([
        window_means, window_stds, window_mins, window_maxs
    ], dim=1)

    print(f"✅ Extracted features shape: {combined_features.shape}")
    return combined_features.cpu().numpy()

def main():
    print("🚀 Starting Data Processing Example")

    # Generate large dataset
    data, labels = generate_large_dataset(500000, 256)

    # Process in batches
    processed_data = process_data_in_batches(data, labels, batch_size=5000)

    # Compute statistics
    stats = compute_statistics(processed_data)

    # Extract features
    features = parallel_feature_extraction(data[:10000], window_size=50)

    print("📋 Processing Summary:")
    print(f"  Original data: {data.shape}")
    print(f"  Processed data: {processed_data.shape}")
    print(f"  Features: {features.shape}")
    print(f"  Mean of processed data: {np.mean(stats['mean']):.4f}")
    print("✅ Data processing example completed!")

if __name__ == "__main__":
    main()
```
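The sliding-window trick in `parallel_feature_extraction` relies on `Tensor.unfold`, which appends the window dimension last; a quick standalone shape check (sizes are illustrative):

```python
import torch

x = torch.randn(1000, 256)   # (n_samples, n_features)
w = x.unfold(0, 100, 1)      # windows of size 100, stride 1, along dim 0
print(w.shape)               # torch.Size([901, 256, 100]) -> (n_windows, n_features, window_size)
```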
### Memory Management

```python
import gc

import torch

# Clear GPU cache regularly
torch.cuda.empty_cache()
gc.collect()

# Run temporary tensors under no_grad so no autograd graph is retained
with torch.no_grad():
    temp_tensor = torch.randn(1000, 1000, device='cuda')
    result = process(temp_tensor)  # process() is a placeholder
del temp_tensor  # drop the reference so the allocator can reuse the memory

# Process large data in chunks
def process_large_tensor(large_tensor, chunk_size=1000):
    results = []
    for i in range(0, len(large_tensor), chunk_size):
        chunk = large_tensor[i:i + chunk_size]
        result = process_chunk(chunk)  # process_chunk() is a placeholder
        results.append(result.cpu())  # Move to CPU immediately
    return torch.cat(results)
```
### Batch Optimization

```python
def find_max_batch_size(model, input_shape, max_memory_gb=80):
    """Binary search for the largest batch size that fits in GPU memory."""
    model = model.cuda()
    low, high = 1, 2048
    optimal_batch = 1
    while low <= high:
        mid = (low + high) // 2
        try:
            # Reset the peak counter so each trial is measured independently
            torch.cuda.reset_peak_memory_stats()

            # Test this batch size with a dummy forward pass
            test_input = torch.randn(mid, *input_shape, device='cuda')
            with torch.no_grad():
                _ = model(test_input)

            memory_used = torch.cuda.max_memory_allocated() / 1024**3
            if memory_used < max_memory_gb * 0.8:  # keep a 20% safety margin
                optimal_batch = mid
                low = mid + 1
            else:
                high = mid - 1
        except RuntimeError:
            # Out of memory at this size; search lower
            high = mid - 1
        finally:
            torch.cuda.empty_cache()
    return optimal_batch
```
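A usage sketch (the model and input shape are illustrative; `input_shape` excludes the batch dimension):

```python
model = LargeModel()  # any nn.Module, e.g. the one from the multi-GPU example
best = find_max_batch_size(model, input_shape=(1000,), max_memory_gb=80)
print(f"Largest batch size within the memory budget: {best}")
```

Note that this measures only a no-grad forward pass; training needs extra memory for activations, gradients, and optimizer state, so expect the usable training batch size to be smaller.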
### Mixed Precision

```python
from torch.cuda.amp import autocast, GradScaler

scaler = GradScaler()
optimizer = torch.optim.Adam(model.parameters())

for inputs, targets in dataloader:
    optimizer.zero_grad()

    # Run the forward pass in fp16 where it is safe to do so
    with autocast():
        outputs = model(inputs)
        loss = criterion(outputs, targets)

    # Scale the loss to avoid fp16 gradient underflow
    scaler.scale(loss).backward()
    scaler.step(optimizer)
    scaler.update()
```
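On A100s, bfloat16 is an alternative to fp16: it has the same exponent range as fp32, so gradient scaling is typically unnecessary. A minimal variant, continuing with the names from the loop above:

```python
with autocast(dtype=torch.bfloat16):
    outputs = model(inputs)
    loss = criterion(outputs, targets)
loss.backward()  # no GradScaler needed with bf16
optimizer.step()
```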
### Data Loading

```python
from torch.utils.data import DataLoader

# Optimized data loader
dataloader = DataLoader(
    dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4,            # Parallel data loading
    pin_memory=True,          # Faster CPU-to-GPU transfer
    persistent_workers=True,  # Keep workers alive between epochs
    prefetch_factor=2,        # Batches prefetched per worker
)

# Async data transfer (non_blocking only helps when pin_memory=True)
for batch in dataloader:
    batch = batch.cuda(non_blocking=True)
    # Process batch
```
### Error Handling

```python
# Template for robust GPU processing
@app.capture_trace(trace_name="robust_processing", profile_memory=True)
def robust_gpu_function(data):
    # Bind device outside the try block so the finally clause can always see it
    device = "cuda" if torch.cuda.is_available() else "cpu"
    try:
        # Clear memory at start
        if device == "cuda":
            torch.cuda.empty_cache()

        # Process with error handling (process_data_safely is a placeholder)
        result = process_data_safely(data, device)
        return result
    except RuntimeError as e:
        if "out of memory" in str(e):
            print("⚠️ GPU memory error - trying smaller batch size")
            return process_with_smaller_batches(data)
        raise
    finally:
        # Cleanup
        if device == "cuda":
            torch.cuda.empty_cache()
```
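`process_with_smaller_batches` is referenced but not defined above; one possible fallback, sketched purely as an illustration (the halving policy, batch sizes, and tensor-typed `data` are assumptions):

```python
def process_with_smaller_batches(data, batch_size=1024, min_batch_size=1):
    """Hypothetical OOM fallback: retry with progressively smaller batches."""
    device = "cuda" if torch.cuda.is_available() else "cpu"
    while batch_size >= min_batch_size:
        try:
            results = []
            for i in range(0, len(data), batch_size):
                chunk = data[i:i + batch_size].to(device)  # assumes data is a tensor
                results.append(process_data_safely(chunk, device).cpu())
            return torch.cat(results)
        except RuntimeError as e:
            if "out of memory" not in str(e):
                raise
            torch.cuda.empty_cache()
            batch_size //= 2  # halve and retry (assumed policy)
    raise RuntimeError("Out of memory even at the minimum batch size")
```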