import torch import torch.nn as nn import torch.nn.functional as F from .initialization import initialize_resnet from .SharedUtils import additive_func class Downsample(nn.Module): def __init__(self, nIn, nOut, stride): super(Downsample, self).__init__() assert stride == 2 and nOut == 2 * nIn, "stride:{} IO:{},{}".format( stride, nIn, nOut ) self.in_dim = nIn self.out_dim = nOut self.avg = nn.AvgPool2d(kernel_size=2, stride=2, padding=0) self.conv = nn.Conv2d(nIn, nOut, kernel_size=1, stride=1, padding=0, bias=False) def forward(self, x): x = self.avg(x) out = self.conv(x) return out class ConvBNReLU(nn.Module): def __init__(self, nIn, nOut, kernel, stride, padding, bias, relu): super(ConvBNReLU, self).__init__() self.conv = nn.Conv2d( nIn, nOut, kernel_size=kernel, stride=stride, padding=padding, bias=bias ) self.bn = nn.BatchNorm2d(nOut) if relu: self.relu = nn.ReLU(inplace=True) else: self.relu = None self.out_dim = nOut self.num_conv = 1 def forward(self, x): conv = self.conv(x) bn = self.bn(conv) if self.relu: return self.relu(bn) else: return bn class ResNetBasicblock(nn.Module): expansion = 1 def __init__(self, inplanes, planes, stride): super(ResNetBasicblock, self).__init__() assert stride == 1 or stride == 2, "invalid stride {:}".format(stride) self.conv_a = ConvBNReLU(inplanes, planes, 3, stride, 1, False, True) self.conv_b = ConvBNReLU(planes, planes, 3, 1, 1, False, False) if stride == 2: self.downsample = Downsample(inplanes, planes, stride) elif inplanes != planes: self.downsample = ConvBNReLU(inplanes, planes, 1, 1, 0, False, False) else: self.downsample = None self.out_dim = planes self.num_conv = 2 def forward(self, inputs): basicblock = self.conv_a(inputs) basicblock = self.conv_b(basicblock) if self.downsample is not None: residual = self.downsample(inputs) else: residual = inputs out = additive_func(residual, basicblock) return F.relu(out, inplace=True) class ResNetBottleneck(nn.Module): expansion = 4 def __init__(self, inplanes, planes, stride): super(ResNetBottleneck, self).__init__() assert stride == 1 or stride == 2, "invalid stride {:}".format(stride) self.conv_1x1 = ConvBNReLU(inplanes, planes, 1, 1, 0, False, True) self.conv_3x3 = ConvBNReLU(planes, planes, 3, stride, 1, False, True) self.conv_1x4 = ConvBNReLU( planes, planes * self.expansion, 1, 1, 0, False, False ) if stride == 2: self.downsample = Downsample(inplanes, planes * self.expansion, stride) elif inplanes != planes * self.expansion: self.downsample = ConvBNReLU( inplanes, planes * self.expansion, 1, 1, 0, False, False ) else: self.downsample = None self.out_dim = planes * self.expansion self.num_conv = 3 def forward(self, inputs): bottleneck = self.conv_1x1(inputs) bottleneck = self.conv_3x3(bottleneck) bottleneck = self.conv_1x4(bottleneck) if self.downsample is not None: residual = self.downsample(inputs) else: residual = inputs out = additive_func(residual, bottleneck) return F.relu(out, inplace=True) class CifarResNet(nn.Module): def __init__(self, block_name, depth, num_classes, zero_init_residual): super(CifarResNet, self).__init__() # Model type specifies number of layers for CIFAR-10 and CIFAR-100 model if block_name == "ResNetBasicblock": block = ResNetBasicblock assert (depth - 2) % 6 == 0, "depth should be one of 20, 32, 44, 56, 110" layer_blocks = (depth - 2) // 6 elif block_name == "ResNetBottleneck": block = ResNetBottleneck assert (depth - 2) % 9 == 0, "depth should be one of 164" layer_blocks = (depth - 2) // 9 else: raise ValueError("invalid block : {:}".format(block_name)) self.message = "CifarResNet : Block : {:}, Depth : {:}, Layers for each block : {:}".format( block_name, depth, layer_blocks ) self.num_classes = num_classes self.channels = [16] self.layers = nn.ModuleList([ConvBNReLU(3, 16, 3, 1, 1, False, True)]) for stage in range(3): for iL in range(layer_blocks): iC = self.channels[-1] planes = 16 * (2 ** stage) stride = 2 if stage > 0 and iL == 0 else 1 module = block(iC, planes, stride) self.channels.append(module.out_dim) self.layers.append(module) self.message += "\nstage={:}, ilayer={:02d}/{:02d}, block={:03d}, iC={:3d}, oC={:3d}, stride={:}".format( stage, iL, layer_blocks, len(self.layers) - 1, iC, module.out_dim, stride, ) self.avgpool = nn.AvgPool2d(8) self.classifier = nn.Linear(module.out_dim, num_classes) assert ( sum(x.num_conv for x in self.layers) + 1 == depth ), "invalid depth check {:} vs {:}".format( sum(x.num_conv for x in self.layers) + 1, depth ) self.apply(initialize_resnet) if zero_init_residual: for m in self.modules(): if isinstance(m, ResNetBasicblock): nn.init.constant_(m.conv_b.bn.weight, 0) elif isinstance(m, ResNetBottleneck): nn.init.constant_(m.conv_1x4.bn.weight, 0) def get_message(self): return self.message def forward(self, inputs): x = inputs for i, layer in enumerate(self.layers): x = layer(x) features = self.avgpool(x) features = features.view(features.size(0), -1) logits = self.classifier(features) return features, logits