import os
# os.environ['CUDA_VISIBLE_DEVICES']='2'
# Tiny Imagenet
import shutil,timm,os,torch,random,datasets,math
import fastcore.all as fc, numpy as np, matplotlib as mpl, matplotlib.pyplot as plt
import k_diffusion as K, torchvision.transforms as T
import torchvision.transforms.functional as TF,torch.nn.functional as F
from torch.utils.data import DataLoader,default_collate
from pathlib import Path
from torch.nn import init
from fastcore.foundation import L
from torch import nn,tensor
from operator import itemgetter
from torcheval.metrics import MulticlassAccuracy
from functools import partial
from torch.optim import lr_scheduler
from torch import optim
from torchvision.io import read_image,ImageReadMode
from glob import glob
from fastAIcourse.datasets import *
from fastAIcourse.conv import *
from fastAIcourse.learner import *
from fastAIcourse.activations import *
from fastAIcourse.init import *
from fastAIcourse.sgd import *
from fastAIcourse.resnet import *
from fastAIcourse.augment import *
from fastAIcourse.accel import *
from fastAIcourse.training import *
from fastprogress import progress_bar
# Global setup: tensor print formatting, RNG seeding, plot DPI, worker cap.
torch.set_printoptions(precision=5, linewidth=140, sci_mode=False)
torch.manual_seed(1)
mpl.rcParams['figure.dpi'] = 70
set_seed(42)
# Cap DataLoader workers at 8 to avoid oversubscribing the machine.
if fc.defaults.cpus > 8: fc.defaults.cpus = 8
# Download and unpack Tiny ImageNet (200 classes, 64x64 images) if absent.
path_data = Path('Data')
path_data.mkdir(exist_ok=True)
path = path_data/'tiny-imagenet-200'

url = 'http://cs231n.stanford.edu/tiny-imagenet-200.zip'
if not path.exists():
    path_zip = fc.urlsave(url, path_data)
    # Unpack into `path_data` so the extracted tree actually matches `path`.
    # (The original extracted to 'data', which never equals 'Data' on a
    # case-sensitive filesystem, so `path.exists()` stayed False forever.)
    shutil.unpack_archive(path_zip, path_data)

bs = 512
class TinyDS:
    "Dataset of (filepath, wordnet-id) pairs found by recursively globbing `path` for JPEGs."
    def __init__(self, path):
        self.path = Path(path)
        # Use `self.path` so a plain-string `path` argument also works
        # (the original applied `/` to the raw argument).
        self.files = glob(str(self.path/'**/*.JPEG'), recursive=True)
    def __len__(self): return len(self.files)
    def __getitem__(self, i):
        # Label is the grandparent directory name, e.g. .../n02074367/images/x.JPEG -> 'n02074367'.
        return self.files[i], Path(self.files[i]).parent.parent.name
tds = TinyDS(path/'train')
# tds[0] -> ('Data/tiny-imagenet-200/train/n02074367/images/n02074367_322.JPEG', 'n02074367')

# Validation labels live in a TSV: first two columns are filename and wordnet id.
path_anno = path/'val'/'val_annotations.txt'
anno = dict(o.split('\t')[:2] for o in path_anno.read_text().splitlines())
class TinyValDS(TinyDS):
    "Validation split: labels come from the module-level `anno` filename->wnid mapping."
    def __getitem__(self, i):
        f = self.files[i]
        return f, anno[os.path.basename(f)]
vds = TinyValDS(path/'val')
# vds[0] -> ('Data/tiny-imagenet-200/val/images/val_240.JPEG', 'n02883205')
class TfmDS:
    "Wrap a dataset `ds`, applying `tfmx` to inputs and `tfmy` to targets on access."
    def __init__(self, ds, tfmx=fc.noop, tfmy=fc.noop):
        self.ds,self.tfmx,self.tfmy = ds,tfmx,tfmy
    def __len__(self): return len(self.ds)
    def __getitem__(self, i):
        x,y = self.ds[i]
        return self.tfmx(x), self.tfmy(y)
# wnids.txt lists the 200 class wordnet ids, one per line.
id2str = (path/'wnids.txt').read_text().splitlines()
str2id = {v:k for k,v in enumerate(id2str)}  # wnid string -> integer class index

# Per-channel mean/std for normalization (precomputed over the training set —
# TODO confirm provenance of these constants).
xmean,xstd = (tensor([0.47565, 0.40303, 0.31555]), tensor([0.28858, 0.24402, 0.26615]))
def tfmx(x):
    "Read image file `x` as float RGB in [0,1], then normalize channelwise."
    img = read_image(x, mode=ImageReadMode.RGB)/255
    return (img-xmean[:,None,None])/xstd[:,None,None]

def tfmy(y): return tensor(str2id[y])  # wnid string -> class-index tensor
tfm_tds = TfmDS(tds, tfmx, tfmy)
tfm_vds = TfmDS(vds, tfmx, tfmy)
def denorm(x):
    "Undo the channelwise normalization and clamp back to the valid [0,1] image range."
    restored = x * xstd[:, None, None] + xmean[:, None, None]
    return restored.clip(0, 1)
# words.txt maps every wordnet id to a comma-separated list of names;
# keep the first name for each of our 200 classes.
all_synsets = [o.split('\t') for o in (path/'words.txt').read_text().splitlines()]
# Membership test against the dict (O(1)) rather than the list (O(n) per lookup).
synsets = {k: v.split(',', maxsplit=1)[0] for k,v in all_synsets if k in str2id}

dls = DataLoaders(*get_dls(tfm_tds, tfm_vds, bs=bs, num_workers=8))
def tfm_batch(b, tfm_x=fc.noop, tfm_y=fc.noop):
    "Apply `tfm_x`/`tfm_y` to the inputs/targets of batch `b`."
    xb, yb = b[0], b[1]
    return tfm_x(xb), tfm_y(yb)
# GPU batch augmentation: pad+random-crop back to 64, flip, random erase.
tfms = nn.Sequential(T.Pad(4), T.RandomCrop(64),
                     T.RandomHorizontalFlip(),
                     RandErase())
augcb = BatchTransformCB(partial(tfm_batch, tfm_x=tfms), on_val=False)

act_gr = partial(GeneralRelu, leak=0.1, sub=0.4)
iw = partial(init_weights, leaky=0.1)

nfs = (32,64,128,256,512,1024)  # channel widths per stage
def get_dropmodel(act=act_gr, nfs=nfs, norm=nn.BatchNorm2d, drop=0.1):
    """ResNet-style 200-class classifier: 5x5 stem conv, one stride-2 ResBlock
    per stage, pooled head with dropout, and a final BatchNorm1d on the logits."""
    layers = [nn.Conv2d(3, nfs[0], 5, padding=2)]
    # layers += [ResBlock(nfs[0], nfs[0], ks=3, stride=1, act=act, norm=norm)]
    layers += [ResBlock(nfs[i], nfs[i+1], act=act, norm=norm, stride=2)
               for i in range(len(nfs)-1)]
    layers += [nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Dropout(drop)]
    # bias=False because the following BatchNorm1d supplies the shift.
    layers += [nn.Linear(nfs[-1], 200, bias=False), nn.BatchNorm1d(200)]
    return nn.Sequential(*layers).apply(iw)
def res_blocks(n_bk, ni, nf, stride=1, ks=3, act=act_gr, norm=None):
    "A stage of `n_bk` ResBlocks: the first maps ni->nf, only the last applies `stride`."
    return nn.Sequential(*[
        ResBlock(ni if i==0 else nf, nf, stride=stride if i==n_bk-1 else 1,
                 ks=ks, act=act, norm=norm)
        for i in range(n_bk)])
nbks = (3,2,2,1,1)  # number of ResBlocks per stage
def get_dropmodel(act=act_gr, nfs=nfs, nbks=nbks, norm=nn.BatchNorm2d, drop=0.2):
    """Deeper variant: ResBlock stem, then `nbks[i]` ResBlocks per stage
    (stride 2 on the last block of each stage), pooled dropout head."""
    layers = [ResBlock(3, nfs[0], ks=5, stride=1, act=act, norm=norm)]
    layers += [res_blocks(nbks[i], nfs[i], nfs[i+1], act=act, norm=norm, stride=2)
               for i in range(len(nfs)-1)]
    layers += [nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Dropout(drop)]
    layers += [nn.Linear(nfs[-1], 200, bias=False), nn.BatchNorm1d(200)]
    return nn.Sequential(*layers).apply(iw)
opt_func = partial(optim.AdamW, eps=1e-5)
metrics = MetricsCB(accuracy=MulticlassAccuracy())
cbs = [DeviceCB(), metrics, ProgressCB(plot=True), MixedPrecision()]

epochs = 25
lr = 3e-2
# One scheduler step per batch, so total_steps = epochs * batches-per-epoch.
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched), augcb]
learn = Learner(get_dropmodel(), dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=opt_func)
# CPU-side (per-item) augmentation pipeline operating on PIL images.
aug_tfms = nn.Sequential(T.Pad(4), T.RandomCrop(64),
                         T.RandomHorizontalFlip(),
                         T.TrivialAugmentWide())
norm_tfm = T.Normalize(xmean, xstd)
erase_tfm = RandErase()
from PIL import Image

def tfmx(x, aug=False):
    "Load image file `x` via PIL, optionally augment, then convert to a normalized tensor."
    x = Image.open(x).convert('RGB')
    if aug: x = aug_tfms(x)
    x = TF.to_tensor(x)
    x = norm_tfm(x)
    # RandErase operates on batches, so add/remove a leading batch dim.
    if aug: x = erase_tfm(x[None])[0]
    return x
# Augment training items only; validation uses the plain transform.
tfm_tds = TfmDS(tds, partial(tfmx, aug=True), tfmy)
tfm_vds = TfmDS(vds, tfmx, tfmy)

dls = DataLoaders(*get_dls(tfm_tds, tfm_vds, bs=bs, num_workers=8))
def conv(ni, nf, ks=3, stride=1, act=nn.ReLU, norm=None, bias=True):
    """Pre-activation conv unit: optional `norm(ni)` and `act()` are applied
    BEFORE the Conv2d. Padding ks//2 keeps spatial size for odd `ks`."""
    layers = []
    if norm: layers.append(norm(ni))
    if act : layers.append(act())
    layers.append(nn.Conv2d(ni, nf, stride=stride, kernel_size=ks, padding=ks//2, bias=bias))
    return nn.Sequential(*layers)
def _conv_block(ni, nf, stride, act=act_gr, norm=None, ks=3):
    "Two stacked pre-act convs; any downsampling `stride` is applied by the second."
    return nn.Sequential(conv(ni, nf, stride=1, act=act, norm=norm, ks=ks),
                         conv(nf, nf, stride=stride, act=act, norm=norm, ks=ks))
class ResBlock(nn.Module):
    """Pre-activation residual block: two convs plus an identity path.
    When shapes differ, the identity path uses AvgPool (for stride) and a
    1x1 conv (for channel change) so the two branches can be summed."""
    def __init__(self, ni, nf, stride=1, ks=3, act=act_gr, norm=None):
        super().__init__()
        self.convs = _conv_block(ni, nf, stride, act=act, ks=ks, norm=norm)
        self.idconv = fc.noop if ni==nf else conv(ni, nf, ks=1, stride=1, act=None, norm=norm)
        self.pool = fc.noop if stride==1 else nn.AvgPool2d(2, ceil_mode=True)
    def forward(self, x): return self.convs(x) + self.idconv(self.pool(x))
def get_dropmodel(act=act_gr, nfs=nfs, nbks=nbks, norm=nn.BatchNorm2d, drop=0.2):
    """Final variant for the pre-act ResBlock above: plain conv stem, staged
    ResBlocks, then act+norm before the pooled dropout head (pre-act nets
    need a final activation/norm since blocks end with a bare conv)."""
    layers = [nn.Conv2d(3, nfs[0], 5, padding=2)]
    layers += [res_blocks(nbks[i], nfs[i], nfs[i+1], act=act, norm=norm, stride=2)
               for i in range(len(nfs)-1)]
    # Use the `act` parameter here, not the global `act_gr`, so a caller-supplied
    # activation is honored consistently throughout the model.
    layers += [act(), norm(nfs[-1]), nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Dropout(drop)]
    layers += [nn.Linear(nfs[-1], 200, bias=False), nn.BatchNorm1d(200)]
    return nn.Sequential(*layers).apply(iw)
epochs = 50
lr = 0.1
tmax = epochs * len(dls.train)
sched = partial(lr_scheduler.OneCycleLR, max_lr=lr, total_steps=tmax)
xtra = [BatchSchedCB(sched)]
# Wider/deeper config than the defaults.
model = get_dropmodel(nbks=(1,2,8,2,2), nfs=(32, 64, 128, 512, 1024, 1536), drop=0.1)
learn = Learner(model, dls, F.cross_entropy, lr=lr, cbs=cbs+xtra, opt_func=opt_func)
learn.fit(epochs)

# Ensure the target directory exists — torch.save does not create it.
Path('models').mkdir(exist_ok=True)
torch.save(learn.model, 'models/inettiny-wide-50')