Slide 1: Introduction to Data Processing Acceleration
Data processing acceleration refers to techniques for handling large datasets efficiently. Python offers various libraries and methods to speed up data manipulation and analysis.
import pandas as pd
import numpy as np
# Read the input CSV into a DataFrame.
df = pd.read_csv('large_dataset.csv')

# Vectorized branch: label every row at C speed instead of looping in Python.
df['new_column'] = np.where(df['existing_column'] > 100, 'High', 'Low')

# Per-category statistics computed in a single groupby pass.
grouped_values = df.groupby('category')['value']
result = grouped_values.agg(['mean', 'sum', 'count'])
print(result.head())
Slide 2: Parallel Processing with multiprocessing
Parallel processing allows simultaneous execution of tasks, significantly reducing computation time for CPU-bound operations.
from multiprocessing import Pool
import time
def process_chunk(chunk):
    """Return the sum of squares of the numbers in *chunk* (CPU-bound work)."""
    return sum(x ** 2 for x in chunk)


if __name__ == '__main__':
    # Build the dataset inside the __main__ guard: with the spawn start
    # method (default on Windows/macOS) every worker re-imports this module,
    # so module-level construction of a 10M-element list would be repeated
    # once per worker process.
    data = list(range(10000000))
    chunk_size = len(data) // 4

    start = time.time()
    with Pool(4) as p:
        # Split the data into 4 chunks, square-sum each in its own process,
        # then combine the partial results in the parent.
        chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
        result = sum(p.map(process_chunk, chunks))
    end = time.time()
    print(f"Result: {result}")
    print(f"Time taken: {end - start:.2f} seconds")
Slide 3: Numba for Just-in-Time Compilation
Numba accelerates Python functions by compiling them to native machine code at runtime.
import numpy as np
from numba import jit
import time
@jit(nopython=True)
def monte_carlo_pi(nsamples):
    """Estimate pi by sampling uniform points in the unit square.

    The fraction of points that fall inside the quarter circle of radius 1
    approximates pi/4; Numba compiles the loop to native code.
    """
    hits = 0
    for _ in range(nsamples):
        x = np.random.random()
        y = np.random.random()
        if x ** 2 + y ** 2 < 1.0:
            hits += 1
    return 4.0 * hits / nsamples


nsamples = 10000000
start = time.time()
pi_estimate = monte_carlo_pi(nsamples)
end = time.time()
print(f"Pi estimate: {pi_estimate}")
print(f"Time taken: {end - start:.2f} seconds")
Slide 4: GPU Acceleration with CUDA
GPU acceleration can dramatically speed up certain types of computations, especially in machine learning and scientific computing.
import numpy as np
from numba import cuda
import math
@cuda.jit
def gpu_add(a, b, result):
    """Element-wise vector add on the GPU: one thread per output index."""
    idx = cuda.grid(1)
    # The grid may contain more threads than elements; guard the tail.
    if idx < result.shape[0]:
        result[idx] = a[idx] + b[idx]


n = 1000000
a = np.arange(n).astype(np.float32)
b = np.arange(n).astype(np.float32)
result = np.zeros_like(a)

# Launch configuration: round the block count up so every index is covered.
threads_per_block = 256
blocks_per_grid = math.ceil(n / threads_per_block)
gpu_add[blocks_per_grid, threads_per_block](a, b, result)

print("First 10 elements of the result:")
print(result[:10])
Slide 5: Dask for Scalable Computing
Dask provides advanced parallelism for analytics, enabling you to scale your computations to larger-than-memory datasets.
import dask.dataframe as dd
import dask.array as da
import numpy as np
import pandas as pd  # required by dd.from_pandas below; missing from the original snippet

# Create a large Dask array backed by 1000x1000 chunks so work can run in parallel.
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Dask is lazy: .compute() triggers the actual chunked execution.
result = (x + 1).mean().compute()
print(f"Mean of the array: {result}")

# Create a Dask DataFrame from an in-memory pandas DataFrame, split into 4 partitions.
df = dd.from_pandas(pd.DataFrame(np.random.randn(100000, 3), columns=['A', 'B', 'C']), npartitions=4)
# Grouped aggregation runs per-partition and is combined at .compute().
result = df.groupby('A')['B'].mean().compute()
print("Mean of column B grouped by A:")
print(result.head())
Slide 6: Model Training Acceleration: Mini-batch Gradient Descent
Mini-batch gradient descent is a technique to speed up model training by updating parameters based on small batches of data.
import numpy as np
class NeuralNetwork:
    """Minimal two-layer network: sigmoid hidden layer, linear output.

    Trained with mini-batch gradient descent on the raw prediction error.
    No bias terms are used.
    """

    def __init__(self, input_size, hidden_size, output_size):
        # Weights drawn from a standard normal distribution.
        self.W1 = np.random.randn(input_size, hidden_size)
        self.W2 = np.random.randn(hidden_size, output_size)

    def sigmoid(self, x):
        """Element-wise logistic activation."""
        return 1 / (1 + np.exp(-x))

    def forward(self, X):
        """Forward pass; caches pre-activations and activations for backprop."""
        self.z = np.dot(X, self.W1)
        self.a = self.sigmoid(self.z)
        self.output = np.dot(self.a, self.W2)
        return self.output

    def train(self, X, y, learning_rate, batch_size, epochs):
        """Run mini-batch gradient descent over the data for *epochs* passes."""
        n_samples = len(X)
        for _ in range(epochs):
            for batch_start in range(0, n_samples, batch_size):
                X_batch = X[batch_start:batch_start + batch_size]
                y_batch = y[batch_start:batch_start + batch_size]
                error = y_batch - self.forward(X_batch)
                # Both gradients are computed with the pre-update weights,
                # then the two layers are updated together.
                grad_W2 = np.dot(self.a.T, error)
                hidden_delta = np.dot(error, self.W2.T) * self.a * (1 - self.a)
                grad_W1 = np.dot(X_batch.T, hidden_delta)
                self.W2 += learning_rate * grad_W2
                self.W1 += learning_rate * grad_W1
# Usage: the classic XOR problem — not linearly separable, so the hidden
# layer has to do real work.
X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]])
y = np.array([[0], [1], [1], [0]])
nn = NeuralNetwork(input_size=2, hidden_size=4, output_size=1)
nn.train(X, y, learning_rate=0.1, batch_size=2, epochs=1000)
print("Predictions after training:")
print(nn.forward(X))
Slide 7: Transfer Learning for Faster Model Training
Transfer learning accelerates model training by leveraging pre-trained models and fine-tuning them for specific tasks.
import torch
import torchvision.models as models
from torchvision import transforms
from PIL import Image
# Start from ImageNet-pretrained ResNet-50 weights.
model = models.resnet50(pretrained=True)

# Freeze the backbone so only the new head will receive gradients.
for param in model.parameters():
    param.requires_grad = False

# Swap the classifier head for our task.
num_ftrs = model.fc.in_features
model.fc = torch.nn.Linear(num_ftrs, 10)  # 10 is the number of classes in our new task

# Standard ImageNet preprocessing pipeline.
preprocess = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the image and add a batch dimension.
image = Image.open('sample_image.jpg')
input_tensor = preprocess(image)
input_batch = input_tensor.unsqueeze(0)

# Inference only: skip autograd bookkeeping.
with torch.no_grad():
    output = model(input_batch)
print("Prediction:", output[0])
Slide 8: Distributed Training with PyTorch
Distributed training allows model training across multiple GPUs or machines, significantly reducing training time for large models.
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def setup(rank, world_size):
    """Initialize the default process group for this worker.

    init_process_group with the default env:// rendezvous requires
    MASTER_ADDR and MASTER_PORT to be set; provide localhost defaults so
    the example runs without external configuration (the original raised
    when these environment variables were absent).
    """
    import os
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
    """Destroy the default process group created by setup()."""
    dist.destroy_process_group()
class SimpleModel(torch.nn.Module):
    """Toy regression model: a single 10 -> 1 linear layer."""

    def __init__(self):
        super().__init__()
        self.linear = torch.nn.Linear(10, 1)

    def forward(self, x):
        """Map a (batch, 10) input to a (batch, 1) prediction."""
        return self.linear(x)
def train(rank, world_size):
    """Run a short DDP training loop on synthetic data for one worker."""
    setup(rank, world_size)

    model = SimpleModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    loss_fn = torch.nn.MSELoss()
    optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.001)

    # 100 SGD steps on freshly sampled random batches; DDP all-reduces
    # the gradients across workers during backward().
    for _ in range(100):
        optimizer.zero_grad()
        outputs = ddp_model(torch.randn(20, 10).to(rank))
        labels = torch.randn(20, 1).to(rank)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()

    cleanup()
if __name__ == "__main__":
    # Launch one training process per rank; join=True blocks until all exit.
    world_size = 2
    mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)
Slide 9: Model Quantization for Faster Inference
Model quantization reduces model size and improves inference speed by converting floating-point weights to lower-precision formats.
import torch
# Define a simple model
class SimpleModel(torch.nn.Module):
    """Tiny conv net for the quantization demo: Conv2d(3 -> 10, 3x3) + ReLU."""

    def __init__(self):
        super().__init__()
        self.conv = torch.nn.Conv2d(3, 10, 3)
        self.relu = torch.nn.ReLU()

    def forward(self, x):
        """Convolve, then clamp negative activations to zero."""
        return self.relu(self.conv(x))
# Create an instance of the model
model = SimpleModel()
# Generate sample input (batch of 1, 3 channels, 224x224 — ImageNet-sized)
example_input = torch.rand(1, 3, 224, 224)
# Quantize the model
# NOTE(review): torch's dynamic quantization targets Linear/RNN-style
# modules; passing {torch.nn.Conv2d} likely leaves this model unchanged —
# confirm against the torch.quantization documentation.
quantized_model = torch.quantization.quantize_dynamic(
    model, {torch.nn.Conv2d}, dtype=torch.qint8
)
# Compare model sizes
# NOTE(review): numel() counts parameters, not bytes — quantization reduces
# bytes per parameter, so these two counts are expected to print the same
# number even when quantization succeeds.
print(f"Original model size: {sum(p.numel() for p in model.parameters())}")
print(f"Quantized model size: {sum(p.numel() for p in quantized_model.parameters())}")
# Compare inference times
def benchmark(model, input_data):
    """Return the latency of one forward pass of *model* in milliseconds.

    Uses CUDA events for accurate device-side timing when a GPU is
    available; falls back to a wall-clock timer otherwise (the original
    used CUDA events unconditionally, which raises on CPU-only machines).
    """
    import time
    if torch.cuda.is_available():
        start_time = torch.cuda.Event(enable_timing=True)
        end_time = torch.cuda.Event(enable_timing=True)
        start_time.record()
        model(input_data)
        end_time.record()
        # Events are recorded asynchronously; wait before reading the timing.
        torch.cuda.synchronize()
        return start_time.elapsed_time(end_time)
    start = time.perf_counter()
    model(input_data)
    return (time.perf_counter() - start) * 1000.0
# Time one forward pass of each model and report the latency in milliseconds.
print(f"Original model inference time: {benchmark(model, example_input):.2f} ms")
print(f"Quantized model inference time: {benchmark(quantized_model, example_input):.2f} ms")
Slide 10: Hyperparameter Optimization with Optuna
Efficient hyperparameter optimization can significantly reduce the time needed to find the best model configuration.
import optuna
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
def objective(trial):
    """Optuna objective: accuracy of a RandomForest on a held-out iris split."""
    iris = load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # Search space: forest size, tree depth (log scale), and split threshold.
    params = {
        'n_estimators': trial.suggest_int('n_estimators', 2, 20),
        'max_depth': trial.suggest_int('max_depth', 1, 32, log=True),
        'min_samples_split': trial.suggest_int('min_samples_split', 2, 16),
    }
    clf = RandomForestClassifier(**params)
    clf.fit(X_train, y_train)
    return accuracy_score(y_test, clf.predict(X_test))
# Maximize validation accuracy over 100 sampled hyperparameter configurations.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100)
# Report the outcome of the search.
print('Number of finished trials:', len(study.trials))
print('Best trial:')
trial = study.best_trial
print(' Value:', trial.value)
print(' Params:')
for key, value in trial.params.items():
    print(' {}: {}'.format(key, value))
Slide 11: Real-Life Example: Image Classification Acceleration
This example demonstrates how to accelerate image classification using transfer learning and GPU acceleration.
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import time
# Load an ImageNet-pretrained ResNet-50 and switch it to inference mode.
model = models.resnet50(pretrained=True)
model.eval()

# Run on the GPU when one is available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Standard ImageNet preprocessing.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load the image and add a batch dimension before moving it to the device.
image = Image.open("elephant.jpg")
input_tensor = transform(image)
input_batch = input_tensor.unsqueeze(0).to(device)

# Time a single forward pass with autograd disabled.
start_time = time.time()
with torch.no_grad():
    output = model(input_batch)
end_time = time.time()

# Map the highest-scoring logit to its human-readable label.
_, predicted_idx = torch.max(output, 1)
with open("imagenet_classes.txt") as f:
    classes = [line.strip() for line in f]
predicted_class = classes[predicted_idx.item()]

print(f"Predicted class: {predicted_class}")
print(f"Inference time: {(end_time - start_time) * 1000:.2f} ms")
Slide 12: Real-Life Example: Natural Language Processing Acceleration
This example shows how to accelerate text classification using BERT and GPU acceleration.
import torch
from transformers import BertTokenizer, BertForSequenceClassification
import time
# Load the pre-trained BERT checkpoint together with its matching tokenizer.
model_name = 'bert-base-uncased'
tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForSequenceClassification.from_pretrained(model_name)

# Prefer the GPU when one is present.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Tokenize the input and move every resulting tensor to the chosen device.
text = "This movie is fantastic! I really enjoyed watching it."
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
inputs = {k: v.to(device) for k, v in inputs.items()}

# Single forward pass with autograd disabled, timed end to end.
start_time = time.time()
with torch.no_grad():
    outputs = model(**inputs)
end_time = time.time()

# The class with the largest logit is the prediction.
predicted_class = torch.argmax(outputs.logits).item()
print(f"Predicted class: {predicted_class}")
print(f"Inference time: {(end_time - start_time) * 1000:.2f} ms")
Slide 13: Cython for Performance Boost
Cython allows you to compile Python code to C, providing significant performance improvements for computationally intensive tasks.
# fibonacci.pyx
# Iterative Fibonacci compiled by Cython; the C-typed locals let the loop
# run without Python object overhead.
# NOTE(review): `long long` is 64-bit, which overflows around n = 93 — for
# the n = 1000000 benchmark in main.py the result wraps and will not match
# the pure-Python version. Confirm the intended input range.
def fibonacci_cy(int n):
    cdef int i
    cdef long long a = 0, b = 1
    for i in range(n):
        a, b = b, a + b
    return a
# setup.py
# Build script: `python setup.py build_ext --inplace` compiles fibonacci.pyx
# into an importable extension module.
from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules = cythonize("fibonacci.pyx")
)
# main.py
import time
from fibonacci import fibonacci_cy
def fibonacci_py(n):
    """Return the n-th Fibonacci number (F(0) = 0, F(1) = 1) iteratively."""
    prev, curr = 0, 1
    for _ in range(n):
        prev, curr = curr, prev + curr
    return prev
# Benchmark: time the pure-Python and Cython implementations on the same n.
n = 1000000
start = time.time()
result_py = fibonacci_py(n)
end = time.time()
print(f"Python time: {end - start:.4f} seconds")
start = time.time()
result_cy = fibonacci_cy(n)
end = time.time()
print(f"Cython time: {end - start:.4f} seconds")
# NOTE(review): fibonacci_cy accumulates in a C `long long`, which overflows
# far below n = 1000000, so this comparison likely prints False — confirm.
print(f"Results match: {result_py == result_cy}")
Slide 14: Memory-Mapped File I/O for Large Datasets
Memory-mapped file I/O allows efficient handling of large datasets that don't fit into memory.
import numpy as np
import time
# Write a large random array to disk so both load paths read identical data.
path = 'large_array.npy'
large_array = np.random.rand(1000000, 100)
np.save(path, large_array)

# Standard load: the whole array is read into RAM up front.
start = time.time()
array_standard = np.load(path)
end = time.time()
print(f"Standard I/O time: {end - start:.4f} seconds")

# Memory-mapped load: pages are faulted in lazily as they are touched.
start = time.time()
array_mmap = np.load(path, mmap_mode='r')
end = time.time()
print(f"Memory-mapped I/O time: {end - start:.4f} seconds")

# Column means force a full pass over the data in both cases.
start = time.time()
result_standard = np.mean(array_standard, axis=0)
end = time.time()
print(f"Standard array operation time: {end - start:.4f} seconds")

start = time.time()
result_mmap = np.mean(array_mmap, axis=0)
end = time.time()
print(f"Memory-mapped array operation time: {end - start:.4f} seconds")

print(f"Results match: {np.allclose(result_standard, result_mmap)}")
Slide 15: Additional Resources
For further exploration of accelerating data processing and model training:
- "Optimizing Python Code for Data Science" - ArXiv.org: https://arxiv.org/abs/2001.11880
- "Efficient Processing of Deep Neural Networks: A Tutorial and Survey" - ArXiv.org: https://arxiv.org/abs/1703.09039
- "A Survey on Distributed Machine Learning" - ArXiv.org: https://arxiv.org/abs/1912.09789
These resources provide in-depth discussions on various acceleration techniques and their applications in data science and machine learning.