This commit is contained in:
Jack 2025-12-27 20:31:00 +08:00
parent 850a4bcb73
commit 9b5fdf31b9
3 changed files with 154 additions and 15 deletions

View File

@ -17,8 +17,13 @@ name = "mall_prod"
max_idle_conns = 25
max_open_conns = 100
conn_max_lifetime = 1 # 小时
conn_max_idle_time = 45 # 分钟
# 旧配置(保留注释以便回退):
# conn_max_lifetime = 1 # 小时
# conn_max_idle_time = 45 # 分钟
# 新配置,单位:分钟
conn_max_lifetime_minutes = 60 # 分钟
conn_max_idle_minutes = 45 # 分钟
# COS配置
[cossdk]

View File

@ -24,8 +24,9 @@ func InitMySQLDB() {
Name string `mapstructure:"name"`
MaxIdleConns int `mapstructure:"max_idle_conns"`
MaxOpenConns int `mapstructure:"max_open_conns"`
ConnMaxLifetime time.Duration `mapstructure:"conn_max_lifetime"`
ConnMaxIdleTime time.Duration `mapstructure:"conn_max_idle_time"`
// 单位:分钟(在配置文件中以整数表示),如果未设置将使用默认值
ConnMaxLifetimeMinutes int `mapstructure:"conn_max_lifetime_minutes"`
ConnMaxIdleMinutes int `mapstructure:"conn_max_idle_minutes"`
} `mapstructure:"mysql"`
}{}
@ -74,15 +75,39 @@ func InitMySQLDB() {
}
// SetMaxIdleConns 设置空闲连接池中连接的最大数量
if config.MySQL.MaxIdleConns == 0 {
config.MySQL.MaxIdleConns = 10
}
sqlDB.SetMaxIdleConns(config.MySQL.MaxIdleConns)
// SetMaxOpenConns 设置打开数据库连接的最大数量。
if config.MySQL.MaxOpenConns == 0 {
config.MySQL.MaxOpenConns = 100
}
sqlDB.SetMaxOpenConns(config.MySQL.MaxOpenConns)
// SetConnMaxLifetime 设置了连接可复用的最大时间。
sqlDB.SetConnMaxLifetime(config.MySQL.ConnMaxLifetime * time.Hour)
// 如果配置中未设置超时时间,使用合理默认值(单位:分钟)
if config.MySQL.ConnMaxLifetimeMinutes == 0 {
config.MySQL.ConnMaxLifetimeMinutes = 30
}
if config.MySQL.ConnMaxIdleMinutes == 0 {
config.MySQL.ConnMaxIdleMinutes = 10
}
// SetConnMaxIdleTime 设置连接最大空闲时间
sqlDB.SetConnMaxIdleTime(config.MySQL.ConnMaxIdleTime * time.Minute)
// SetConnMaxLifetime 设置了连接可复用的最大时间(分钟)
sqlDB.SetConnMaxLifetime(time.Duration(config.MySQL.ConnMaxLifetimeMinutes) * time.Minute)
// SetConnMaxIdleTime 设置连接最大空闲时间(分钟)
sqlDB.SetConnMaxIdleTime(time.Duration(config.MySQL.ConnMaxIdleMinutes) * time.Minute)
// 启动一个轻量的监控协程,周期性记录连接池状态,便于调优
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
stats := sqlDB.Stats()
log.Printf("[DB Stats] Open=%d InUse=%d Idle=%d WaitCount=%d", stats.OpenConnections, stats.InUse, stats.Idle, stats.WaitCount)
}
}()
}

View File

@ -0,0 +1,109 @@
package services
import (
"errors"
"log"
"sync"
"gorm.io/gorm"
"fafa-crawler/src/models"
)
// BulkInsertLibraryProducts 使用 CreateInBatches 并发插入 products。
// 参数:
// - db: gorm DB
// - products: 待插入记录切片
// - batchSize: 每个批次的记录数(<=0 则使用 1000
// - workers: 并发 worker 数(<=0 则使用 4
func BulkInsertLibraryProducts(db *gorm.DB, products []models.LibraryProduct, batchSize, workers int) error {
if db == nil {
return errors.New("db is nil")
}
n := len(products)
if n == 0 {
return nil
}
if batchSize <= 0 {
batchSize = 1000
}
if workers <= 0 {
workers = 4
}
// 切分为多个批次(每个批次不超过 batchSize
chunks := make([][]models.LibraryProduct, 0, (n+batchSize-1)/batchSize)
for i := 0; i < n; i += batchSize {
end := i + batchSize
if end > n {
end = n
}
chunks = append(chunks, products[i:end])
}
jobs := make(chan []models.LibraryProduct, len(chunks))
errCh := make(chan error, workers)
var wg sync.WaitGroup
worker := func() {
defer wg.Done()
for chunk := range jobs {
tx := db.Begin()
if tx.Error != nil {
errCh <- tx.Error
return
}
if err := tx.Session(&gorm.Session{CreateBatchSize: batchSize}).CreateInBatches(chunk, batchSize).Error; err != nil {
_ = tx.Rollback()
errCh <- err
return
}
if err := tx.Commit().Error; err != nil {
errCh <- err
return
}
}
}
// 启动 workers
for i := 0; i < workers; i++ {
wg.Add(1)
go worker()
}
// 发送任务
for _, c := range chunks {
jobs <- c
}
close(jobs)
wg.Wait()
close(errCh)
// 如果有错误,返回第一个错误
for e := range errCh {
if e != nil {
log.Printf("Bulk insert error: %v", e)
return e
}
}
return nil
}
// GenerateDummyProducts 用于测试/演示,生产 count 个简单的 LibraryProduct
func GenerateDummyProducts(count int) []models.LibraryProduct {
out := make([]models.LibraryProduct, 0, count)
for i := 0; i < count; i++ {
p := models.LibraryProduct{
Name: "demo_product",
Sname: "demo",
Keywords: "demo",
Price: 0.0,
Status: 1,
}
out = append(out, p)
}
return out
}