Multigrid and MgNet¶
This lecture includes:
Multigrid
MgNet
1. A framework for both Multigrid and MgNet¶
Comment out the lines marked For MgNet and you will obtain the Multigrid code.
Comment out the lines marked For MG and you will obtain the MgNet code.
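Both methods iterate the same basic update. Written out from the two update lines in MgIte below, the Multigrid smoothing step is
\[
u \leftarrow u + S \ast (f - A \ast u),
\]
while MgNet wraps the same operations in batch normalization and ReLU (denoted \(\sigma\)) and learns the kernels \(A\) and \(S\) from data:
\[
u \leftarrow u + \sigma\big(\mathrm{BN}_2\big(S \ast \sigma(\mathrm{BN}_1(f - A \ast u))\big)\big).
\]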
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from timeit import default_timer as timer
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
use_cuda = torch.cuda.is_available()
print('Use GPU?', use_cuda)
##### For MG: initialization of A, S, Pi, R, RT #####
def get_mg_init(A=None, S=None, Pi=None, R=None, RT=None):
    A_kernel  = torch.tensor([[[[0,-1,0],[-1,4,-1],[0,-1,0]]]], dtype=torch.float32)              # 5-point discrete Laplacian
    S_kernel  = torch.tensor([[[[0,1/64,0],[1/64,12/64,1/64],[0,1/64,0]]]], dtype=torch.float32)  # smoother (approximate inverse of A)
    Pi_kernel = torch.tensor([[[[0,0,0],[0,0,0],[0,0,0]]]], dtype=torch.float32)                  # zero kernel: coarse-grid initial guess u = 0
    R_kernel  = torch.tensor([[[[0,0.5,0.5],[0.5,1,0.5],[0.5,0.5,0]]]], dtype=torch.float32)      # restriction weights
    RT_kernel = torch.tensor([[[[0,0.5,0.5],[0.5,1,0.5],[0.5,0.5,0]]]], dtype=torch.float32)      # prolongation (transpose of R)
if A is not None:
A.weight = torch.nn.Parameter(A_kernel)
if S is not None:
S.weight = torch.nn.Parameter(S_kernel)
if Pi is not None:
Pi.weight = torch.nn.Parameter(Pi_kernel)
if R is not None:
R.weight = torch.nn.Parameter(R_kernel)
if RT is not None:
RT.weight = torch.nn.Parameter(RT_kernel)
return
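As a quick sanity check (a minimal sketch, assuming the imports and get_mg_init above), applying an A initialized this way to a unit impulse reproduces the 5-point Laplacian stencil:
A_test = nn.Conv2d(1, 1, kernel_size=3, stride=1, padding=1, bias=False)
get_mg_init(A=A_test)
u_test = torch.zeros(1, 1, 5, 5)
u_test[0, 0, 2, 2] = 1.0                # unit impulse at the center
with torch.no_grad():
    print(A_test(u_test)[0, 0])         # expect 4 at the center, -1 at the four neighbors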
##### For MG: setup for prolongation and error calculation #####
RT = nn.ConvTranspose2d(1, 1, kernel_size=3, stride=2, padding=0, bias=False)
get_mg_init(RT=RT)
A = nn.Conv2d(1, 1, kernel_size=3, stride=1, padding=1, bias=False)
get_mg_init(A=A)
class MgIte(nn.Module):
def __init__(self, A, S):
super().__init__()
        get_mg_init(A=A, S=S)  ##### For MG: initialization of A, S #####
self.A = A
self.S = S
        self.bn1 = nn.BatchNorm2d(A.weight.size(0))  ##### For MgNet: BN #####
        self.bn2 = nn.BatchNorm2d(S.weight.size(0))  ##### For MgNet: BN #####
def forward(self, out):
u, f = out
        u = u + self.S(f - self.A(u))  ##### For MG: u = u + S*(f-A*u) #####
u = u + F.relu(self.bn2(self.S(F.relu(self.bn1((f-self.A(u))))))) ##### For MgNet: add BN and ReLU #####
out = (u, f)
return out
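The plain Multigrid update on its own already contracts the residual; a minimal sketch (assuming get_mg_init above; the grid size and iteration count are arbitrary choices) that prints a shrinking residual norm:
A_s = nn.Conv2d(1, 1, kernel_size=3, stride=1, padding=1, bias=False)
S_s = nn.Conv2d(1, 1, kernel_size=3, stride=1, padding=1, bias=False)
get_mg_init(A=A_s, S=S_s)
u_s = torch.zeros(1, 1, 15, 15)
f_s = torch.ones(1, 1, 15, 15) / 16 ** 2
with torch.no_grad():
    for i in range(5):
        r = f_s - A_s(u_s)
        print('iteration', i, 'residual norm', r.norm().item())
        u_s = u_s + S_s(r)          # u <- u + S*(f - A*u)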
class MgRestriction(nn.Module):
def __init__(self, A_old, A, Pi, R):
super().__init__()
        get_mg_init(A=A, Pi=Pi, R=R)  ##### For MG: initialization of A, Pi, R #####
self.A_old = A_old
self.A = A
self.Pi = Pi
self.R = R
self.bn1 = nn.BatchNorm2d(Pi.weight.size(0)) ##### For MgNet: BN #####
self.bn2 = nn.BatchNorm2d(R.weight.size(0)) ##### For MgNet: BN #####
def forward(self, out):
u_old, f_old = out
u = self.Pi(u_old) ##### For MG: u = Pi*u_old #####
f = self.R(f_old-self.A_old(u_old)) + self.A(u) ##### For MG: f = R*(f_old-A_old*u_old) + A*u #####
u = F.relu(self.bn1(self.Pi(u_old))) ##### For MgNet: add BN and ReLU #####
f = F.relu(self.bn2(self.R(f_old-self.A_old(u_old)))) + self.A(u) ##### For MgNet: add BN and ReLU #####
out = (u,f)
return out
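The formula for f is the restricted residual equation in disguise. On the coarse grid one solves \(A \ast e = R \ast (f_{\text{old}} - A_{\text{old}} \ast u_{\text{old}})\) for the error; writing the coarse unknown as \(v = \Pi \ast u_{\text{old}} + e\) turns this into
\[
A \ast v = R \ast (f_{\text{old}} - A_{\text{old}} \ast u_{\text{old}}) + A \ast (\Pi \ast u_{\text{old}}),
\]
which is exactly the pair \(u = \Pi \ast u_{\text{old}}\), \(f = R \ast (f_{\text{old}} - A_{\text{old}} \ast u_{\text{old}}) + A \ast u\) computed in forward. Since get_mg_init sets the \(\Pi\) kernel to zero, the Multigrid version starts each coarse level from \(u = 0\) with \(f\) equal to the restricted residual.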
class MG(nn.Module):
def __init__(self, num_channel_input, num_iteration, num_channel_u, num_channel_f, num_classes):
super().__init__()
self.num_iteration = num_iteration
self.num_channel_u = num_channel_u
##### For MgNet: Initialization layer #####
self.conv1 = nn.Conv2d(num_channel_input, num_channel_f, kernel_size=3, stride=1, padding=1, bias=False)
self.bn1 = nn.BatchNorm2d(num_channel_f)
A = nn.Conv2d(num_channel_u, num_channel_f, kernel_size=3, stride=1, padding=1, bias=False)
        S = nn.Conv2d(num_channel_f, num_channel_u, kernel_size=3, stride=1, padding=1, bias=False)
layers = []
        for l, num_iteration_l in enumerate(num_iteration):  # l: grid level; num_iteration_l: number of smoothing iterations on level l
for i in range(num_iteration_l):
layers.append(MgIte(A, S))
            setattr(self, 'layer'+str(l), nn.Sequential(*layers))
            # setattr is equivalent to writing out
            # self.layer0 = nn.Sequential(*layers)
            # self.layer1 = nn.Sequential(*layers)
            # ...
            # one attribute per grid level l = 0, ..., len(num_iteration)-1
if l < len(num_iteration)-1:
A_old = A
                A = nn.Conv2d(num_channel_u, num_channel_f, kernel_size=3, stride=1, padding=1, bias=False)
                S = nn.Conv2d(num_channel_f, num_channel_u, kernel_size=3, stride=1, padding=1, bias=False)
                ##### For MG: padding=0 #####
                Pi = nn.Conv2d(num_channel_u, num_channel_u, kernel_size=3, stride=2, padding=0, bias=False)
                R = nn.Conv2d(num_channel_f, num_channel_f, kernel_size=3, stride=2, padding=0, bias=False)
                ##### For MgNet: padding=1 #####
                Pi = nn.Conv2d(num_channel_u, num_channel_u, kernel_size=3, stride=2, padding=1, bias=False)
                R = nn.Conv2d(num_channel_f, num_channel_f, kernel_size=3, stride=2, padding=1, bias=False)
                layers = [MgRestriction(A_old, A, Pi, R)]
##### For MgNet: average pooling and fully connected layer for classification #####
        self.pooling = nn.AdaptiveAvgPool2d(1)  # pool each channel down to a single value
        self.fc = nn.Linear(num_channel_u, num_classes)
def forward(self, u, f):
f = F.relu(self.bn1(self.conv1(f))) ##### For MgNet: initialization of f #####
if use_cuda: ##### For MgNet: initialization of u #####
u = torch.zeros(f.size(0),self.num_channel_u,f.size(2),f.size(3), device=torch.device('cuda'))
else:
u = torch.zeros(f.size(0),self.num_channel_u,f.size(2),f.size(3))
out = (u, f)
u_list.append(u) ##### For MG: save u^j, j=1,2,...,J #####
for l in range(len(self.num_iteration)):
out = getattr(self, 'layer'+str(l))(out)
u, f = out ##### For MG: save u^j, j=1,2,...,J #####
u_list.append(u) ##### For MG: save u^j, j=1,2,...,J #####
##### For MgNet: average pooling and fully connected layer for classification #####
u, f = out
u = self.pooling(u)
u = u.view(u.shape[0], -1)
u = self.fc(u)
return u
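A quick shape walk-through of the framework exactly as written (both the For MG and For MgNet lines active; single-channel sizes are used so that get_mg_init's one-channel kernels fit, and u_list must exist as a global because forward appends to it):
u_list = []
net = MG(num_channel_input=1, num_iteration=[1, 1],
         num_channel_u=1, num_channel_f=1, num_classes=1)
out = net(0, torch.randn(1, 1, 16, 16))
print(out.shape)                           # torch.Size([1, 1]) after pooling + fc
print([tuple(v.shape) for v in u_list])    # u per level: (1,1,16,16), (1,1,16,16), (1,1,8,8)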
2. Apply Multigrid to solve the following system¶
(59)¶\[\begin{equation}\label{matrix}
A\ast u = f,
\end{equation}\]
where \(A\ast\) is a one-channel convolution with stride 1 and zero padding 1,
\[
A=\begin{bmatrix} 0 & -1 & 0 \\ -1 & 4 & -1 \\ 0 & -1 & 0 \end{bmatrix},
\]
and \(u, f \in \mathbb{R}^{n\times n}\) with \(f_{i,j}=\dfrac{1}{(n+1)^2}\).
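Dividing by \(h^2 = 1/(n+1)^2\), this is the standard 5-point finite-difference discretization of the Poisson problem \(-\Delta u = 1\) on the unit square with zero Dirichlet boundary values:
\[
\frac{4u_{i,j} - u_{i-1,j} - u_{i+1,j} - u_{i,j-1} - u_{i,j+1}}{h^2} = 1, \qquad h = \frac{1}{n+1}.
\]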
The Multigrid code consists of: (a) the code in Section 1 with the For MgNet lines commented out; (b) the setup and postprocessing code below¶
def plot_solution(J, u, label_name):
    N = 2 ** J - 1
    h = 1 / 2 ** J
    X = np.arange(h, 1, h)
    Y = np.arange(h, 1, h)
    X, Y = np.meshgrid(X, Y)  # mesh of interior grid points
    a = torch.reshape(u, (N, N))
    fig1 = plt.figure()
    ax = fig1.add_subplot(projection='3d')  # 3D surface (X, Y, u(X, Y)); Axes3D(fig1) no longer attaches to the figure in recent matplotlib
    ax.plot_surface(X, Y, a.detach().numpy(), rstride=1, cstride=1, cmap=plt.cm.coolwarm)
    ax.set_title(label_name)
def plot_error(M, error, label_name):
    plt.figure()
    plt.title('Error vs number of iterations using ' + label_name)
    plt.plot(error)
    plt.xlabel('Number of iterations')
    plt.yscale('log')
    plt.ylabel('Error')
    plt.show()
def MG1(u, f, J, num_iteration):
    u_list.clear()                  # will hold u^0, u^1, ..., u^J, finest to coarsest
    MG0(u, f)                       # downward sweep: smoothing and restriction fill u_list
    for j in range(J-1, 0, -1):     # upward sweep: prolongate and add the coarse corrections
        u_list[j] += RT(u_list[j+1])
    u = u_list[1]                   # corrected iterate on the finest grid
    return u
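In formulas, one call to MG1 is a backslash cycle: the forward pass of MG0 produces smoothed iterates \(u^1, \dots, u^J\) from the finest to the coarsest grid, and the loop then propagates the coarse corrections back up,
\[
u^{j} \leftarrow u^{j} + R^{\top} \ast u^{j+1}, \qquad j = J-1, \dots, 1,
\]
returning the corrected finest-grid iterate \(u^1\).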
# Model setup
num_channel_input = 1
num_channel_u = 1
num_channel_f = 1
num_classes = 1
J = 4
num_iteration = [2,2,2,2]
MG0 = MG(num_channel_input, num_iteration, num_channel_u, num_channel_f, num_classes)
##### For MG: problem setup with right-hand side f_{i,j} = 1/(n+1)^2, n = 2^J - 1 #####
N = 2 ** J - 1
h = 1 / 2 ** J
f = torch.ones(1, 1, N, N) / (N + 1) ** 2
##### For MG: Muligrid iteration #####
M = 100
u = torch.randn(1,1,N,N)
error = [np.linalg.norm((A(u)-f).detach().numpy())] # calculate the Frobenius Norm of (A*u-f)
u_list =[] # Save u^0,u^1,u^2,u^3...,u^J
for m in range(M):
u = MG1(u,f,J,num_iteration)
error.append(np.linalg.norm((A(u)-f).detach().numpy())) # calculate the Frobenius Norm of (A*u-f)
##### For MG: plot the results #####
plot_error(M,error,'Multigrid')
plot_solution(J,u,'Numerical solution')
3. Build and train MgNet on CIFAR-10¶
The MgNet code consists of: (a) the code in Section 1 with the For MG lines commented out; (b) the setup, training, and test code below¶
def adjust_learning_rate(optimizer, epoch, init_lr):
    lr = init_lr * 0.1 ** (epoch // 30)  # divide the learning rate by 10 every 30 epochs
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
    return lr
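A one-liner to see the values this schedule produces (the epochs below are arbitrary illustration points):
for epoch in (0, 29, 30, 60, 90):
    print(epoch, '->', f'{0.1 * 0.1 ** (epoch // 30):g}')   # 0.1, 0.1, 0.01, 0.001, 0.0001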
minibatch_size = 128
num_epochs = 120
lr = 0.1
num_channel_input = 3
num_channel_u = 64
num_channel_f = 64
num_classes = 10
num_iteration = [1,1,1,1]
# Step 1: Define a model. The class MG from Section 1 serves as MgNet once the
# lines marked 'For MG' are commented out.
my_model = MG(num_channel_input, num_iteration, num_channel_u, num_channel_f, num_classes)
if use_cuda:
my_model = my_model.cuda()
# Step 2: Define a loss function and training algorithm
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(my_model.parameters(), lr=lr, momentum=0.9, weight_decay = 0.0005)
# Step 3: load dataset
normalize = torchvision.transforms.Normalize(mean=(0.4914, 0.4822, 0.4465), std=(0.2023, 0.1994, 0.2010))
transform_train = torchvision.transforms.Compose([torchvision.transforms.RandomCrop(32, padding=4),
torchvision.transforms.RandomHorizontalFlip(),
torchvision.transforms.ToTensor(),
normalize])
transform_test = torchvision.transforms.Compose([torchvision.transforms.ToTensor(),normalize])
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=minibatch_size, shuffle=True)
testset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=minibatch_size, shuffle=False)
# classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
start = timer()
#Step 4: Train the NNs
# One epoch is one full pass of the entire training set through the network.
for epoch in range(num_epochs):
start_epoch = timer()
current_lr = adjust_learning_rate(optimizer, epoch, lr)
start_training = timer()
my_model.train()
for i, (images, labels) in enumerate(trainloader):
if use_cuda:
images = images.cuda()
labels = labels.cuda()
# Forward pass to get the loss
outputs = my_model(0,images) # We need additional 0 input for u in MgNet
loss = criterion(outputs, labels)
# Backward and compute the gradient
optimizer.zero_grad()
        loss.backward()   # backpropagation
optimizer.step() #update the weights/parameters
end_training = timer()
print('Computation Time for training:',end_training - start_training)
# Training accuracy
start_training_acc = timer()
my_model.eval()
correct = 0
total = 0
for i, (images, labels) in enumerate(trainloader):
with torch.no_grad():
if use_cuda:
images = images.cuda()
labels = labels.cuda()
outputs = my_model(0,images) # We need additional 0 input for u in MgNet
p_max, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum()
training_accuracy = float(correct)/total
end_training_acc = timer()
print('Computation Time for training accuracy:',end_training_acc - start_training_acc)
# Test accuracy
start_test_acc = timer()
correct = 0
total = 0
for i, (images, labels) in enumerate(testloader):
with torch.no_grad():
if use_cuda:
images = images.cuda()
labels = labels.cuda()
outputs = my_model(0,images) # We need additional 0 input for u in MgNet
p_max, predicted = torch.max(outputs, 1)
total += labels.size(0)
correct += (predicted == labels).sum()
test_accuracy = float(correct)/total
end_test_acc = timer()
print('Computation Time for test accuracy:',end_test_acc - start_test_acc)
print('Epoch: {}, learning rate: {}, the training accuracy: {}, the test accuracy: {}' .format(epoch+1,current_lr,training_accuracy,test_accuracy))
end_epoch = timer()
print('Computation Time for one epoch:',end_epoch - start_epoch)
end = timer()
print('Total Computation Time:',end - start)
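The training-accuracy and test-accuracy loops above are identical up to the data loader, so they could be factored into a single helper; a sketch (evaluate is a hypothetical name, not part of the original notebook):
def evaluate(model, loader):
    # fraction of correctly classified samples in loader
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in loader:
            if use_cuda:
                images, labels = images.cuda(), labels.cuda()
            outputs = model(0, images)            # extra 0 input for u in MgNet
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return correct / total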