现在正在做一个项目,需要对图片进行分类。对比了 ResNet-34、ResNet-50 和 ResNet-101 之后,最终决定使用 ResNet-34,原因是它的训练成本最低。训练代码如下:
先验证一下数据集
import os
import json
from collections import Counter
# Sanity-check the annotation file before training: report the sample count,
# the per-class distribution, out-of-range labels and missing image files.
# The file is opened as UTF-8 explicitly so the result does not depend on the
# platform's default encoding.
with open('./AgriculturalDisease_trainingset/AgriculturalDisease_train_annotations.json',
          encoding='utf-8') as f:
    data = json.load(f)

all_classes = [item['disease_class'] for item in data]
class_dist = Counter(all_classes)

# Validate the label range. The training code in this file treats labels
# 0-60 (61 classes) as valid, so the same upper bound is used here; the
# previous bound of 59 reported every class-60 sample as illegal.
invalid = [c for c in all_classes if not 0 <= c <= 60]
print(f"总样本数: {len(data)}")
print(f"类别分布: {class_dist}")
print(f"非法标签数量: {len(invalid)} (值: {set(invalid)})")

# Verify that every annotated image exists on disk.
# NOTE(review): Path is not used in this section; the import is kept in case
# code outside this view relies on it.
from pathlib import Path

missing_files = []
for item in data:
    img_path = os.path.join('./AgriculturalDisease_trainingset/images', item['image_id'])
    if not os.path.exists(img_path):
        missing_files.append(img_path)
print(f"缺失图片数量: {len(missing_files)}")
训练代码
import copy
import json
import os
import time

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from tqdm import tqdm
# Configuration: hyper-parameters and runtime settings read by the rest of
# the training script.
CFG = {
    "batch_size": 32,
    "num_workers": 4,
    "num_epochs": 20,
    "lr": 1e-4,
    "image_size": 256,
    "num_classes": 61,  # labels 0-60, 61 classes in total
    # Use the GPU when CUDA is available, otherwise fall back to the CPU.
    "device": (
        torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    ),
}
# Custom dataset
class CustomDataset(Dataset):
    """Image-classification dataset backed by a JSON annotation file.

    The annotation file holds a list of records. Each record's ``image_id``
    is joined onto ``img_dir`` to locate the image, and its ``disease_class``
    is used directly as the 0-based integer label.

    Args:
        json_path: Path to the JSON annotation file.
        img_dir: Directory containing the image files.
        transform: Optional callable applied to the loaded RGB PIL image.
        num_classes: Number of valid classes; labels must lie in
            ``[0, num_classes - 1]``. Defaults to 61 (labels 0-60).

    Raises:
        ValueError: If any label in the annotation file is out of range.
    """

    def __init__(self, json_path, img_dir, transform=None, num_classes=61):
        # Explicit UTF-8 so loading does not depend on the platform default.
        with open(json_path, encoding='utf-8') as f:
            self.data = json.load(f)
        self.img_dir = img_dir
        self.transform = transform
        self.num_classes = num_classes

        # Label validation. An explicit raise is used instead of `assert`
        # because assertions are stripped when Python runs with -O.
        all_classes = [item['disease_class'] for item in self.data]
        if any(c < 0 or c >= num_classes for c in all_classes):
            raise ValueError(f"标签范围应为0-{num_classes - 1}")

        # np.bincount cannot take an empty Python list, so an empty
        # annotation file yields an empty distribution instead of crashing.
        if all_classes:
            self.class_dist = np.bincount(all_classes)
        else:
            self.class_dist = np.zeros(0, dtype=np.intp)
        print("类别分布:", self.class_dist)

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the sample at ``idx``.

        Raises:
            ValueError: If the stored label is outside the valid range.
        """
        record = self.data[idx]
        label = record['disease_class']  # used directly as a 0-based label

        # Re-check the label before touching the disk, so a corrupted record
        # fails fast without opening the image.
        if label < 0 or label >= self.num_classes:
            raise ValueError(f"非法标签值 {label},应为0-{self.num_classes - 1}")

        img_name = os.path.join(self.img_dir, record['image_id'])
        # The context manager closes the underlying file handle; convert()
        # returns an independent, fully loaded image that stays usable.
        with Image.open(img_name) as img:
            image = img.convert('RGB')

        if self.transform:
            image = self.transform(image)
        return image, label
# Data augmentation
# Per-channel mean / std handed to Normalize by both pipelines
# (presumably the usual ImageNet statistics - TODO confirm).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]
# Both pipelines resize to a square of CFG['image_size'] pixels per side.
_TARGET_SIZE = (CFG['image_size'], CFG['image_size'])

# Training pipeline: resize, random rotation / colour jitter / flips, then
# tensor conversion and normalisation.
train_transform = transforms.Compose(
    [
        transforms.Resize(_TARGET_SIZE),
        transforms.RandomRotation(35),
        transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(_NORM_MEAN, _NORM_STD),
    ]
)

# Validation pipeline: the same resize and normalisation, no random steps.
val_transform = transforms.Compose(
    [
        transforms.Resize(_TARGET_SIZE),
        transforms.ToTensor(),
        transforms.Normalize(_NORM_MEAN, _NORM_STD),
    ]
)
# Build the dataset and split it 80 % / 20 % into training and validation.
dataset = CustomDataset(
    './AgriculturalDisease_trainingset/AgriculturalDisease_train_annotations.json',
    './AgriculturalDisease_trainingset/images',
    transform=train_transform,
)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_split = torch.utils.data.random_split(dataset, [train_size, val_size])

# Both subsets returned by random_split wrap the SAME `dataset` object, so
# assigning `val_dataset.dataset.transform = val_transform` would also replace
# the transform used for training and silently disable all augmentation.
# Validation therefore gets its own shallow copy (the annotation list is
# shared, nothing is re-read) carrying val_transform, indexed by the
# validation half of the split.
val_source = copy.copy(dataset)
val_source.transform = val_transform
val_dataset = torch.utils.data.Subset(val_source, val_split.indices)

# Data loaders: shuffle only the training set.
train_loader = DataLoader(train_dataset, batch_size=CFG['batch_size'],
                          shuffle=True, num_workers=CFG['num_workers'],
                          pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=CFG['batch_size'],
                        shuffle=False, num_workers=CFG['num_workers'],
                        pin_memory=True)
# Model definition: ResNet-34 loaded with torchvision's pretrained weights,
# with the final fully connected layer replaced so that it outputs
# CFG['num_classes'] logits.
# NOTE(review): newer torchvision releases reportedly prefer a `weights=`
# argument over `pretrained=` - confirm against the installed version.
model = models.resnet34(pretrained=True)
fc_in_features = model.fc.in_features
model.fc = nn.Linear(fc_in_features, CFG['num_classes'])
model = model.to(CFG['device'])

# Optimisation setup: cross-entropy loss, AdamW, and a plateau scheduler in
# 'max' mode (it is stepped with the validation accuracy in the training loop).
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    model.parameters(),
    lr=CFG['lr'],
    weight_decay=1e-4,
)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=3)
# Training loop: every epoch runs one training pass and one validation pass,
# steps the LR scheduler on validation accuracy, prints a summary, and saves
# a checkpoint whenever validation accuracy improves.
best_acc = 0.0
for epoch in range(CFG['num_epochs']):
    start_time = time.time()

    # ---- training pass ----
    model.train()
    train_loss = 0.0
    correct = 0
    total = 0
    # Training progress bar
    train_bar = tqdm(train_loader,
                     desc=f'Epoch {epoch+1}/{CFG["num_epochs"]} [Train]',
                     ncols=100,
                     ascii=True)
    for images, labels in train_bar:
        images, labels = images.to(CFG['device']), labels.to(CFG['device'])
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Accumulate metrics. The batch loss is weighted by batch size so the
        # epoch figure is a per-sample mean even if the last batch is smaller.
        batch_loss = loss.item()
        train_loss += batch_loss * images.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()

        # Live progress-bar update
        train_bar.set_postfix({
            'loss': f"{batch_loss:.4f}",
            'acc': f"{correct/total:.3f}",
            'lr': optimizer.param_groups[0]['lr']
        })

    # ---- validation pass ----
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    # Validation progress bar
    val_bar = tqdm(val_loader,
                   desc=f'Epoch {epoch+1}/{CFG["num_epochs"]} [Val] ',
                   ncols=100,
                   ascii=True)
    with torch.no_grad():
        for images, labels in val_bar:
            images, labels = images.to(CFG['device']), labels.to(CFG['device'])
            outputs = model(images)
            loss = criterion(outputs, labels)

            batch_loss = loss.item()
            val_loss += batch_loss * images.size(0)
            _, predicted = outputs.max(1)
            val_total += labels.size(0)
            val_correct += predicted.eq(labels).sum().item()

            val_bar.set_postfix({
                'loss': f"{batch_loss:.4f}",
                'acc': f"{val_correct/val_total:.3f}"
            })

    # Compute epoch-level metrics
    train_loss = train_loss / len(train_loader.dataset)
    train_acc = correct / total
    val_loss = val_loss / len(val_loader.dataset)
    val_acc = val_correct / val_total

    # Learning-rate adjustment, driven by validation accuracy
    scheduler.step(val_acc)

    # Timing
    epoch_time = time.time() - start_time
    time_msg = time.strftime("%H:%M:%S", time.gmtime(epoch_time))

    # Print the summary. The "\n" escapes had been mangled into a backslash
    # followed by a line break, which Python treats as a line continuation
    # and therefore dropped the intended blank lines around the summary.
    print(f"\nEpoch {epoch+1} Summary:")
    print(f"Train Loss: {train_loss:.4f} | Acc: {train_acc:.4f}")
    print(f"Val Loss: {val_loss:.4f} | Acc: {val_acc:.4f}")
    print(f"Time: {time_msg} | LR: {optimizer.param_groups[0]['lr']:.2e}\n")

    # Save the best model so far
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save({
            'model': model.state_dict(),
            'acc': best_acc,
            'epoch': epoch+1
        }, 'best_model.pth')
        print(f"New best model saved at epoch {epoch+1} with acc {best_acc:.4f}!")

评论