我正在试图找出一个很好的解决方案来管理Golang中的数据库事务,并在不同的服务之间使用相同的事务.

假设我正在构建一个论坛,这个论坛有posts个和comments个.

我在数据库中的posts表上有comments_count列,它跟踪帖子的 comments 数量.

当我为给定的帖子创建 comments 时,我还需要更新posts表并增加该帖子的comments_count列.

我的项目 struct 由几个层组成:数据库/业务/Web

目前我的代码是这样的吗?

main.go

package main

import (
    "context"
    "github.com/jackc/pgx/v5/pgxpool"
    "net/http"
    "vkosev/stack/db/repository"
    "vkosev/stack/services"
    "vkosev/stack/web"
)

func main() {
    dbConString := "postgres://user:password@host:port/database"

    dbPool, _ := pgxpool.New(context.Background(), dbConString)

    postRepo := repository.NewPostRepository(dbPool)
    commentRepo := repository.NewCommentRepository(dbPool)

    postService := services.NewPostService(postRepo)
    commentService := services.NewCommentService(commentRepo)

    handler := web.NewHandler(postService, commentService)

    mux := http.NewServeMux()

    mux.HandleFunc("POST /comments/{postId}", handler.CreateComment)

    _ = http.ListenAndServe(":8080", mux)
}

web.go

package web

type Handler struct {
    postService    *services.PostService
    commentService *services.CommentService
}

func NewHandler(postService *services.PostService, commentService *services.CommentService) *Handler {
    return &Handler{
        postService:    postService,
        commentService: commentService,
    }
}

func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
    postId := getPostIdeFromRequest(r)
    comment := getCommentFromRequest(r)

    newComment := h.commentService.Create(comment, postId)
    err := h.postService.IncreaseCount(postId)
    
    if err != nil {
        // write some error message
    }

    writeJSON(w, http.StatusOK, newComment)
}

services.go

package services

type PostService struct {
    postRepo *repository.PostRepository
}

func NewPostService(postRepo *repository.PostRepository) *PostService {
    return &PostService{postRepo: postRepo}
}

func (ps *PostService) IncreaseCount(postId int) error {
    return ps.postRepo.IncreaseCount(postId)
}

type CommentService struct {
    commentRepo *repository.CommentRepository
}

func NewCommentService(commentRepo *repository.CommentRepository) *CommentService {
    return &CommentService{commentRepo: commentRepo}
}

func (cs *CommentService) Create(comment models.Comment, postId int) *models.Comment {
    return cs.commentRepo.Save(comment, postId)
}

repository.go

package repository

type PostRepository struct {
    pool *pgxpool.Pool
}

func NewPostRepository(pool *pgxpool.Pool) *PostRepository {
    return &PostRepository{pool: pool}
}

func (pr *PostRepository) IncreaseCount(postId int) error {
    // call pr.pool and increase comments count for post with the given ID
}
type CommentRepository struct {
    pool *pgxpool.Pool
}

func NewCommentRepository(pool *pgxpool.Pool) *CommentRepository {
    return &CommentRepository{pool: pool}
}

func (cr *CommentRepository) Save(comment models.Comment, postId int) *models.Comment {
    // call cr.pool and insert comment into the DB
}

我在main.go中初始化所有必要的依赖项,并将它们注入到需要它们的位置,然后使用处理程序来处理每条路由.

现在,我需要一个事务,以便在由于某种原因无法更新帖子的 comments 计数时,回滚 comments 的创建.

我想最简单的方法是只将Tx传入方法,但这看起来很难看.

我希望有某种方法来抽象数据库逻辑,这样存储库就不会关心它们是否使用了事务.

并且还管理这handler种方法中的交易. 这样我就可以拥有这样的东西:

func (h *Handler) CreateComment(w http.ResponseWriter, r *http.Request) {
    postId := getPostIdeFromRequest(r)
    comment := getCommentFromRequest(r)

    // Begin transaction
    newComment := h.commentService.Create(comment, postId)

    err := h.postService.IncreaseCount(postId)

    if err != nil {
        // rollback the transaction

        // write some error message
    }

    // commit the transaction
    writeJSON(w, http.StatusOK, newComment)
}

推荐答案

您拆分服务和存储库的方法是一个非常好的开始.下面这些对我来说很管用:

  • 利用contextAPI.使服务和存储库中的所有方法都接受context作为第一个参数.
  • 创建如下所示的新界面Session:
type Session interface {
    Begin(ctx context.Context) (Session, error)
    Transaction(ctx context.Context, f func(context.Context) error) error
    Rollback() error
    Commit() error
    Context() context.Context
}
  • 使用这样的接口允许您在服务中使用任何类型的交易系统(无论它是不是数据库,或者任何类型的数据库).
  • 创建包装pgxpoolSession的实现.
  • BeginTransaction应该向context.Context注入一个DB实例.
  • 创建一个从上下文返回pgxpool的函数,如下所示:
// Use dbKey as the context value key
type dbKey struct{}

func DB(ctx context.Context, fallback *pgxpool.Pool) (pgx.Tx, error) {
    db := ctx.Value(dbKey{})
    if db == nil {
        return fallback.Begin()
    }
    return db.(pgx.Tx), nil
}
  • 在您的所有存储库中,始终使用此函数来检索数据库(并使用repo.pool.Begin()作为备用).这样,存储库就不知道(也不必知道)操作是否在事务内部.服务可以调用多个不同的存储库和多个方法,而根本不用担心底层机制.它还有助于为您的服务编写测试,而无需依赖您的存储库或数据库.这应该也能很好地处理嵌套事务.
  • Session参数添加到需要它们的服务的构造函数中.
  • 在您的服务中,只有在需要执行多个存储库操作(即:拥有业务事务)时才使用Session.对于单个操作,您只需立即调用存储库,由于后备,它将使用没有TX的数据库.

我在GORM上使用了这种方法,而不是直接在PGx上使用,所以这可能不会很好地工作,因为您需要使用pgx.Tx,而不是单个类型*gorm.DB.我认为这应该会很好地工作,我希望这会帮助你继续前进.祝好运!


完整的示例

下面是一个更完整的示例,它基于我在项目中使用的实现(使用GORM).在本例中,我们有一个用户和一个用户操作历史.当用户注册时,我们希望在用户记录旁边创建一条"注册"操作记录.

我知道您没有使用GORM,但逻辑保持不变,您只需以不同的方式实现您的会话和存储库.

会话实施

package session

import (
    "context"
    "database/sql"

    "gorm.io/gorm"
)

// Session aims at facilitating business transactions while abstracting the underlying mechanism,
// be it a database transaction or another transaction mechanism. This allows services to execute
// multiple business use-cases and easily rollback changes in case of error, without creating a
// dependency to the database layer.
//
// Sessions should be constituted of a root session created with a "New"-type constructor and allow
// the creation of child sessions with `Begin()` and `Transaction()`. Nested transactions should be supported
// as well.
type Session interface {
    // Begin returns a new session with the given context and a started transaction.
    // Using the returned session should have no side-effect on the parent session.
    // The underlying transaction mechanism is injected as a value into the new session's context.
    Begin(ctx context.Context) (Session, error)

    // Transaction executes a transaction. If the given function returns an error, the transaction
    // is rolled back. Otherwise it is automatically committed before `Transaction()` returns.
    // The underlying transaction mechanism is injected into the context as a value.
    Transaction(ctx context.Context, f func(context.Context) error) error

    // Rollback the changes in the transaction. This action is final.
    Rollback() error

    // Commit the changes in the transaction. This action is final.
    Commit() error

    // Context returns the session's context. If it's the root session, `context.Background()` is returned.
    // If it's a child session started with `Begin()`, then the context will contain the associated
    // transaction mechanism as a value.
    Context() context.Context
}

// Gorm session implementation.
type Gorm struct {
    db        *gorm.DB
    TxOptions *sql.TxOptions
    ctx       context.Context
}

// GORM create a new root session for Gorm.
// The transaction options are optional.
func GORM(db *gorm.DB, opt *sql.TxOptions) Gorm {
    return Gorm{
        db:        db,
        TxOptions: opt,
        ctx:       context.Background(),
    }
}

// Begin returns a new session with the given context and a started DB transaction.
// The returned session has manual controls. Make sure a call to `Rollback()` or `Commit()`
// is executed before the session is expired (eligible for garbage collection).
// The Gorm DB associated with this session is injected as a value into the new session's context.
// If a Gorm DB is found in the given context, it will be used instead of this Session's DB, allowing for
// nested transactions.
func (s Gorm) Begin(ctx context.Context) (Session, error) {
    tx := DB(ctx, s.db).WithContext(ctx).Begin(s.TxOptions)
    if tx.Error != nil {
        return nil, tx.Error
    }
    return Gorm{
        ctx:       context.WithValue(ctx, dbKey{}, tx),
        TxOptions: s.TxOptions,
        db:        tx,
    }, nil
}

// Rollback the changes in the transaction. This action is final.
func (s Gorm) Rollback() error {
    return s.db.Rollback().Error
}

// Commit the changes in the transaction. This action is final.
func (s Gorm) Commit() error {
    return s.db.Commit().Error
}

// Context returns the session's context. If it's the root session, `context.Background()`
// is returned. If it's a child session started with `Begin()`, then the context will contain
// the associated Gorm DB and can be used in combination with `session.DB()`.
func (s Gorm) Context() context.Context {
    return s.ctx
}

// dbKey the key used to store the database in the context.
type dbKey struct{}

// Transaction executes a transaction. If the given function returns an error, the transaction
// is rolled back. Otherwise it is automatically committed before `Transaction()` returns.
//
// The Gorm DB associated with this session is injected into the context as a value so `session.DB()`
// can be used to retrieve it.
func (s Gorm) Transaction(ctx context.Context, f func(context.Context) error) error {
    tx := DB(ctx, s.db).WithContext(ctx).Begin(s.TxOptions)
    if tx.Error != nil {
        return tx.Error
    }
    c := context.WithValue(ctx, dbKey{}, tx)
    err := f(c)
    if err != nil {
        tx.Rollback()
        return err
    }
    return tx.Commit().Error
}

// DB returns the Gorm instance stored in the given context. Returns the given fallback
// if no Gorm DB could be found in the context.
func DB(ctx context.Context, fallback *gorm.DB) *gorm.DB {
    db := ctx.Value(dbKey{})
    if db == nil {
        return fallback
    }
    return db.(*gorm.DB)
}

服务实施

package user

import (
    "context"
    "example-module/database/model"
    "example-module/dto"

    "example-module/session"
    "example-module/typeutil"
)

type Repository interface {
    Create(ctx context.Context, user *model.User) (*model.User, error)
    CreateHistory(ctx context.Context, history *model.History) (*model.History, error)
}

type Service struct {
    session    session.Session
    repository Repository
}

func NewService(session session.Session, repository Repository) *Service {
    return &Service{
        session:    session,
        repository: repository,
    }
}

// Register create a new user with an associated "register" history.
func (s *Service) Register(ctx context.Context, user *dto.RegisterUser) (*dto.User, error) {

    // Model mapping from DTO to model (using the `copier` library)
    u := typeutil.Copy(&model.User{}, user)

    err := s.session.Transaction(ctx, func(ctx context.Context) error {
        // You can also call another service from here, not necessarily a repository.
        var err error
        u, err = s.repository.Create(ctx, u)
        if err != nil {
            return err
        }

        history := &model.History{
            UserID: u.ID,
            Action: "register",
        }
        _, err = s.repository.CreateHistory(ctx, history)
        return err
    })

    // Convert back to a DTO user using json marshal/unmarshal
    return typeutil.MustConvert[*dto.User](u), err
}

存储库实施

package repository

import (
    "context"
    "example-module/database/model"

    "gorm.io/gorm"
    "gorm.io/gorm/clause"
    "example-module/session"
)

type User struct {
    DB *gorm.DB
}

func NewUser(db *gorm.DB) *User {
    return &User{
        DB: db,
    }
}

func (r *User) Create(ctx context.Context, user *model.User) (*model.User, error) {
    db := session.DB(ctx, r.DB).Omit(clause.Associations).Create(&user)
    return user, db.Error
}

func (r *User) CreateHistory(ctx context.Context, history *model.History) (*model.History, error) {
    db := session.DB(ctx, r.DB).Omit(clause.Associations).Create(&history)
    return history, db.Error
}

Main(初始化)

session := session.GORM(myDB, nil)
userRepository := repository.NewUser(myDB)
userService := user.NewService(session, userRepository)

Sql相关问答推荐

R中对Arrow duckdb工作流的SQL查询

在Oracle SQL中将列值转换为行

在SQL查询中使用COALESS

返回找到的最小和最大row_number()spark SQL

具有多个条件的SQL否定

从单个表达式中的分隔字符串中取平均值

如何在 SNOSQL 中执行反连接(或 where 子句过滤)以查找字段不包含另一个表中的值的行?

在 PostgreSQL 中使用 ltree 列进行累积

如何判断小数点后千位是否不为0

GRAFANA 数据库查询错误:pq:列名称不存在

具有分组条件的不同计数 (DAX)

如何计算两个非周期性时间序列+分组的重叠持续时间

清理 XML 数据

带有数组输入参数的Snowflake UDF优化

查询以查找今天和昨天的数据之间的差异以及伪列

获取 SQL Server 中每一行的两个-之间的文本

BigQuery Pivot 遗漏行

从每行中排除最大元素

有没有一种方法可以将始终遵循序列的单个字段的值组合起来,以创建每个 ID 的所有移动?

使用 SQL 表中的连接列删除重复记录