解决库存扣减及订单创建时防止并发死锁的问题

在我们日常开发的过程可有会遇到以下错误

事务(进程 ID 82)与另一个进程被死锁在 锁 资源上,并且已被选作死锁牺牲品。请重新运行该事务

很多开发人员对于这个问题的排查起来是比较困难的,而生产生的原因多种多样,很多人认是因为表中的数据太多了同时操作的人多人才会产生这种错误,下面我们来还原一下死锁的过程。

我们看一下以下sql代码(该样例代码测试环境为SqlServer)

1. 第一先创建一个测试表H_Test

复制以下代码

  SET ANSI_NULLS ON
  GO

  SET QUOTED_IDENTIFIER ON
  GO

  CREATE TABLE [dbo].[H_TEST](
    [Id] [int] IDENTITY(1,1) NOT NULL,
    [DID] [int] NULL,
    [UNAME] [nvarchar](50) NULL,
    [UNAME2] [nvarchar](50) NULL,
  CONSTRAINT [PK_H_TEST_3994ceeb-a4b8-41e1-b06b-1e59a2e51d8c] PRIMARY KEY CLUSTERED 
  (
    [Id] ASC
  )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
  ) ON [PRIMARY]
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'自增主键' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'Id'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'DID' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'DID'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'UNAME' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'UNAME'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'UNAME2' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_TEST', @level2type=N'COLUMN',@level2name=N'UNAME2'
  GO

  insert [dbo].[H_TEST](DID,UNAME,UNAME2) VALUES(1,'HI','HI2');
  insert [dbo].[H_TEST](DID,UNAME,UNAME2) VALUES(2,'HISQL','HISQL2');

2. 打开两个查询窗口

在窗口1中复制以下代码

  begin tran 
  update dbo.H_TEST 
  set UNAME='d1' 
  where dID=1 
  waitfor delay '00:00:10' 
  update H_TEST 
  set UNAME='d2' 
  where dID=2
  commit tran 

在窗口2中复制以下代码

  begin tran 
  update H_TEST 
  set UNAME='d2' 
  where dID=2
  waitfor delay '00:00:10' 


  update dbo.H_TEST 
  set UNAME='d1' 
  where dID=1
  commit tran 

3. 执行代码

同时执行窗口1和窗口2的代码,在等待一段时间后你就可以看到以下错误如下所示

死锁图

通过以上的测试就还原了产生死锁的过程,刚才的测试表H_Test中只有两条数据,其实产生死锁与数据大小没有很大的关系,其实与整个事务的执行长短有关系,两个业务都在操作同一条数据,且一个事务中包含非常复杂的处理逻辑且执行时间比较长那么在并发或相对较多的业务操作时就会产生死锁。

那么怎样解决死锁?

1. 减少事务的执行时间。

优化代码将不需要包在事务的逻辑分离出来以减少锁的占用时间.可以减少一部分的死锁,但在高并发操作时依然会产生死锁

2. 业务锁

日常我们用到的锁都是高度依赖于数据来锁定来保证数据的原子性问题,但这样有一个很大的BUG就是对数据库的性能压力非常大,在出现高并发时可能应用扛得住数据库扛不住的情况

下面介绍的就是基于HiSql 的业务锁机制解决死锁问题,我们模拟一种场景 扣减库存并生成订单那么我们模拟创建两张表 库存表H_Stock 及订单表H_Order 表创建的sql如下

HiSql怎样使用 请参照hisql快速上手

库存表sql代码


  CREATE TABLE [dbo].[H_Stock](
    [Batch] [varchar](20) NOT NULL,
    [Material] [varchar](20) NOT NULL,
    [Location] [varchar](5) NULL,
    [st_kc] [decimal](18, 2) NULL,
    [CreateTime] [datetime] NULL,
    [CreateName] [nvarchar](50) NULL,
    [ModiTime] [datetime] NULL,
    [ModiName] [nvarchar](50) NULL,
  CONSTRAINT [PK_H_Stock] PRIMARY KEY CLUSTERED 
  (
    [Batch] ASC,
    [Material] ASC
  )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
  ) ON [PRIMARY]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_st_kc]  DEFAULT ((0)) FOR [st_kc]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_CreateTime]  DEFAULT (getdate()) FOR [CreateTime]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_CreateName]  DEFAULT ('') FOR [CreateName]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_ModiTime]  DEFAULT (getdate()) FOR [ModiTime]
  GO

  ALTER TABLE [dbo].[H_Stock] ADD  CONSTRAINT [DF_H_Stock_ModiName]  DEFAULT ('') FOR [ModiName]
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'批次号' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'Batch'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'款号' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'Material'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'库位' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'Location'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'库存数' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'st_kc'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'创建时间' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'CreateTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'创建人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'CreateName'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改时间' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'ModiTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Stock', @level2type=N'COLUMN',@level2name=N'ModiName'
  GO


订单表sql


  CREATE TABLE [dbo].[H_Order](
    [OrderId] [bigint] NOT NULL,
    [Batch] [varchar](20) NOT NULL,
    [Material] [varchar](20) NOT NULL,
    [Shop] [varchar](5) NULL,
    [Location] [varchar](5) NULL,
    [SalesNum] [decimal](18, 2) NULL,
    [CreateTime] [datetime] NULL,
    [CreateName] [nvarchar](50) NULL,
    [ModiTime] [datetime] NULL,
    [ModiName] [nvarchar](50) NULL,
  CONSTRAINT [PK_H_Order] PRIMARY KEY CLUSTERED 
  (
    [OrderId] ASC,
    [Batch] ASC,
    [Material] ASC
  )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
  ) ON [PRIMARY]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_SalesNum]  DEFAULT ((0)) FOR [SalesNum]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_CreateTime]  DEFAULT (getdate()) FOR [CreateTime]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_CreateName]  DEFAULT ('') FOR [CreateName]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_ModiTime]  DEFAULT (getdate()) FOR [ModiTime]
  GO

  ALTER TABLE [dbo].[H_Order] ADD  CONSTRAINT [DF_H_Order_ModiName]  DEFAULT ('') FOR [ModiName]
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'批次号' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Batch'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'款号' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Material'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'门店' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Shop'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'出库库位' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'Location'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'销售数量' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'SalesNum'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'创建时间' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'CreateTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'创建人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'CreateName'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改时间' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'ModiTime'
  GO

  EXEC sys.sp_addextendedproperty @name=N'MS_Description', @value=N'修改人' , @level0type=N'SCHEMA',@level0name=N'dbo', @level1type=N'TABLE',@level1name=N'H_Order', @level2type=N'COLUMN',@level2name=N'ModiName'
  GO



测试场景

开启多个线程随机产生不同的订单(一个订单中有不同批次和数量)直至库存扣减完成并检测是否有锁产生,且库存有没有少扣和超扣,如果达到这两个目标说明测试是成功的

c# 代码

class Program
  {
    static void Main(string[] args)
    {
        Console.WriteLine("测试!");
        StockThread();
        var s = Console.ReadLine();
    }

    static void StockThread()
    {

        //如果有安装redis可以启用以下测试一下
        //HiSql.Global.RedisOn = true;//开启redis缓存
        //HiSql.Global.RedisOptions = new RedisOptions { Host = "172.16.80.178", PassWord = "pwd123", Port = 6379, CacheRegion = "TST", Database = 0 };
        HiSqlClient sqlClient = Demo_Init.GetSqlClient();

        //清除库存表和订单表数据
        sqlClient.CodeFirst.Truncate("H_Stock");
        sqlClient.CodeFirst.Truncate("H_Order");

        //初始化库存数据
        sqlClient.Modi("H_Stock", new List<object> {
            new { Batch="9000112112",Material="ST0021",Location="A001",st_kc=5000},
            new { Batch="8000252241",Material="ST0080",Location="A001",st_kc=1000},
            new { Batch="7000252241",Material="ST0026",Location="A001",st_kc=1500}

        }).ExecCommand();

        //第一种场景 一个订单中只有一个批次
        string[] grp_arr1 = new string[] { "9000112112" };

        //第二种场景 一个订单中有两个批次
        string[] grp_arr2 = new string[] {  "8000252241" , "9000112112"   };

        //第三中场景一个订单中有三个批次
        string[] grp_arr3 = new string[] { "8000252241", "9000112112", "7000252241" };


        Random random = new Random();

        HiSqlClient _sqlClient = Demo_Init.GetSqlClient();


        //表结构缓存预热
        var _dt1= _sqlClient.HiSql("select * from H_Order").Take(1).Skip(1).ToTable();
        var _dt2 = _sqlClient.HiSql("select * from H_Stock").Take(1).Skip(1).ToTable();


        //开启10个线程运行
        Parallel.For(0, 10, (index, y) => {

            
          int grpidx = index % 3;

          string[] grparr = null;
          if (grpidx == 0)
              grparr = grp_arr1;
          else if (grpidx == 1)
              grparr = grp_arr2;
          else
              grparr = grp_arr3;

          //Thread.Sleep(random.Next(10) * 200);

          Console.WriteLine($" {index}线程Id:{Thread.CurrentThread.ManagedThreadId}\t{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}");
          //执行订单创建
          var rtn = CreateSale(grparr);

          Console.WriteLine(rtn.Item2);

        });
    }




    static Tuple<bool, string> CreateSale(string[] grparr)
    {
        Random random = new Random();
        HiSqlClient _sqlClient = Demo_Init.GetSqlClient();
        bool _flag = true;

        Tuple<bool, string> rtn = new Tuple<bool, string>(true, "执行");

        //指定雪花ID使用的引擎 (可以不指定)
        HiSql.Snowflake.SnowType = SnowType.IdWorker;
        //产生一个唯一的订单号
        Int64 orderid = HiSql.Snowflake.NextId();

        //加锁并执行 将一个订单的批次都加锁防止同一时间被其它业务修改
        var _rtn = HiSql.Lock.LockOnExecute(grparr, () =>
        {

            //能执行到此说明已经加锁成功(注:非数据库级加锁)

            DataTable dt = _sqlClient.HiSql($"select Batch,Material,Location,st_kc from H_Stock where  Batch in ({grparr.ToSqlIn()}) and st_kc>0").ToTable();

            if (dt.Rows.Count > 0)
            {
                List<object> lstorder = new List<object>();
                
                Console.WriteLine($"雪花ID{orderid}");
                string _shop = "4301";//门店编号
                _sqlClient.BeginTran();
                foreach (string n in grparr)
                {
                    int s = random.Next(1,10);
                    int v = _sqlClient.Update("H_Stock", new { st_kc = $"`st_kc`-{s}" }).Where($"Batch='{n}' and st_kc>={s}").ExecCommand();
                    if (v == 0)
                    {
                        _flag = false;
                        Console.WriteLine($"批次:[{n}]扣减[{s}]失败");
                        rtn = new Tuple<bool, string>(false, $"批次:[{n}]库存已经不足");
                        _sqlClient.RollBackTran();
                        break;
                    }
                    else
                    {
                        DataRow _drow = dt.AsEnumerable().Where(s => s.Field<string>("Batch").Equals(n)).FirstOrDefault();
                        if (_drow != null)
                        {
                            lstorder.Add(
                                new
                                {
                                    OrderId = orderid,
                                    Batch = _drow["Batch"].ToString(),
                                    Material = _drow["Material"].ToString(),
                                    Shop = _shop,
                                    Location = _drow["Location"].ToString(),
                                    SalesNum = s,
                                }

                                );


                        }
                        else
                        {
                            _flag = false;
                            Console.WriteLine($"批次:[{n}]扣减[{s}]失败 未找到库存");
                            _sqlClient.RollBackTran();
                            break;

                        }

                    }
                }
                if (_flag)
                {
                    //生成订单
                    if (lstorder.Count > 0)
                        _sqlClient.Insert("H_Order", lstorder).ExecCommand();
                    _sqlClient.CommitTran();
                }
            }
            else
            {
                Console.WriteLine($"库存不足...");
                rtn = new Tuple<bool, string>(false, "库存已经不足");
            }



        }, new LckInfo
        {
            UName = "tanar",
            Ip = "127.0.0.1"


        }, 20, 10);//加锁超时时间设定
        _sqlClient.Close();

        Console.WriteLine(_rtn.Item2);

        //可以注释线程等待
        //Thread.Sleep(random.Next(1,10)*100);


        if (rtn.Item1)
            return CreateSale(grparr);
        else
            return rtn;


    }
  }

数据库连接配置

  internal class Demo_Init
  {
      public static HiSqlClient GetSqlClient()
      {


          HiSqlClient sqlclient = new HiSqlClient(
                    new ConnectionConfig()
                    {
                        DbType = DBType.SqlServer,
                        DbServer = "local-HoneBI",
                        ConnectionString = "server=(local);uid=sa;pwd=Hone@123;database=HiSql;Encrypt=True; TrustServerCertificate=True;",//; MultipleActiveResultSets = true;
                        User = "tansar",//可以指定登陆用户的帐号
                        
                        Schema = "dbo",
                        IsEncrypt = true,
                        IsAutoClose = true,
                        SqlExecTimeOut = 60000,
                        AppEvents = new AopEvent()
                        {
                            OnDbDecryptEvent = (connstr) =>
                            {
                                //解密连接字段
                                //Console.WriteLine($"数据库连接:{connstr}");
                                return connstr;
                            },
                            OnLogSqlExecuting = (sql, param) =>
                            {
                                //sql执行前 日志记录 (异步)

                                //Console.WriteLine($"sql执行前记录{sql} time:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}");
                            },
                            OnLogSqlExecuted = (sql, param) =>
                            {
                                //sql执行后 日志记录 (异步)
                                //Console.WriteLine($"sql执行后记录{sql} time:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}");
                            },
                            OnSqlError = (sqlEx) =>
                            {
                                //sql执行错误后 日志记录 (异步)
                                Console.WriteLine(sqlEx.Message.ToString());
                            },
                            OnTimeOut = (int timer) =>
                            {
                                //Console.WriteLine($"执行SQL语句超过[{timer.ToString()}]毫秒...");
                            }
                        }
                    }
                    );


          //sqlclient.CodeFirst.InstallHisql();

          return sqlclient;
      }
  }

通过查询库存和订单信息核对库存是否扣减正常

select * from H_Stock 
select batch,sum(salesnum) as salesnum from H_Order group by batch
select orderid,sum(salesnum) as salesnum from H_Order group by orderid
select * from H_Order

核验结果

核验明细

通过测试过程可以发现 不会产生死锁也不会造成库存扣减异常保证了数据的一致性

作者:|tansar|,原文链接: https://www.cnblogs.com/tansar/p/16314227.html

文章推荐

ElasticSearch7.3学习(三十)----ES7.X SQL新特性解析及使用J...

python常用标准库(os系统模块、shutil文件操作模块)

Spring Authorization Server(AS)从 Mysql 中读取客户端、...

SpringBoot+Mybatis-Plus整合Sharding-JDBC5.1.1实现单库分...

Python趣味入门9:函数是你走过的套路,详解函数、调用、参...

Three.js 打造缤纷夏日3D梦中情岛 🌊

dubbo是如何实现可扩展的?

动态规划解0-1背包问题

11┃音视频直播系统之 WebRTC 进行文本聊天并实时传输文件

layui数据表格导入数据

Golang:将日志以Json格式输出到Kafka

如何在Linux系统上刷抖音