服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - Redis实现库存扣减的解决方案防止商品超卖

Redis实现库存扣减的解决方案防止商品超卖

2022-10-25 16:02Resourceful! Redis

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等,基于redis实现扣减库存的具体实现,初始化库存回调函数(IStockCallback)扣减库存服务(StockService),感兴趣的朋友跟随小

Redis 如何实现库存扣减操作?如何防止商品被超卖?

基于数据库单库存 基于数据库多库存 基于redis 基于redis实现扣减库存的具体实现 初始化库存回调函数(IStockCallback) 扣减库存服务(StockService)。

Redis实现库存扣减的解决方案防止商品超卖

在日常开发中有很多地方都有类似扣减库存的操作,比如电商系统中的商品库存,抽奖系统中的奖品库存等。

解决方案

1. 使用mysql数据库

使用一个字段来存储库存,每次扣减库存去更新这个字段。

2. 还是使用数据库

但是将库存分层多份存到多条记录里面,扣减库存的时候路由一下,这样子增大了并发量,但是还是避免不了大量的去访问数据库来更新库存。

3. 将库存放到redis使用redis的incrby特性来扣减库存。

分析

在上面的第一种和第二种方式都是基于数据来扣减库存。

[基于数据库单库存]

第一种方式在所有请求都会在这里等待锁,获取锁有去扣减库存。在并发量不高的情况下可以使用,但是一旦并发量大了就会有大量请求阻塞在这里,导致请求超时,进而整个系统雪崩;而且会频繁的去访问数据库,大量占用数据库资源,所以在并发高的情况下这种方式不适用。

[基于数据库多库存]

第二种方式其实是第一种方式的优化版本,在一定程度上提高了并发量,但是在还是会大量的对数据库做更新操作大量占用数据库资源。

基于数据库来实现扣减库存还存在的一些问题:

用数据库扣减库存的方式,扣减库存的操作必须在一条语句中执行,不能先selec在update,这样在并发下会出现超扣的情况。如:

?
1
update number set x=x-1 where x > 0
  • MySQL自身对于高并发的处理性能就会出现问题,一般来说,MySQL的处理性能会随着并发thread上升而上升,但是到了一定的并发度之后会出现明显的拐点,之后一路下降,最终甚至会比单thread的性能还要差。
  • 当减库存和高并发碰到一起的时候,由于操作的库存数目在同一行,就会出现争抢InnoDB行锁的问题,导致出现互相等待甚至死锁,从而大大降低MySQL的处理性能,最终导致前端页面出现超时异常。

[基于redis]

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特性来扣减库存,解决了超扣和性能问题。但是一旦缓存丢失需要考虑恢复方案。比如抽奖系统扣奖品库存的时候,初始库存=总的库存数-已经发放的奖励数,但是如果是异步发奖,需要等到MQ消息消费完了才能重启redis初始化库存,否则也存在库存不一致的问题。

基于redis实现扣减库存的具体实现

  • 我们使用redis的lua脚本来实现扣减库存
  • 由于是分布式环境下所以还需要一个分布式锁来控制只能有一个服务去初始化库存
  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

[初始化库存回调函数(IStockCallback )]

?
1
2
3
4
5
6
7
8
9
10
11
12
/**
 * 获取库存回调
 * @author yuhao.wang
 */
public interface IStockCallback {
 
 /**
  * 获取库存
  * @return
  */
 int getStock();
}

[扣减库存服务(StockService)]

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
/**
 * 扣库存
 *
 * @author yuhao.wang
 */
@Service
public class StockService {
    Logger logger = LoggerFactory.getLogger(StockService.class);
 
    /**
     * 不限库存
     */
    public static final long UNINITIALIZED_STOCK = -3L;
 
    /**
     * Redis 客户端
     */
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    /**
     * 执行扣库存的脚本
     */
    public static final String STOCK_LUA;
 
    static {
        /**
         *
         * @desc 扣减库存Lua脚本
         * 库存(stock)-1:表示不限库存
         * 库存(stock)0:表示没有库存
         * 库存(stock)大于0:表示剩余库存
         *
         * @params 库存key
         * @return
         *   -3:库存未初始化
         *   -2:库存不足
         *   -1:不限库存
         *   大于等于0:剩余库存(扣减之后剩余的库存)
         *      redis缓存的库存(value)是-1表示不限库存,直接返回1
         */
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }
 
    /**
     * @param key           库存key
     * @param expire        库存有效时间,单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,避免并发时重复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final int initStock = stockCallback.getStock();
                        // 将库存设置到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }
 
        }
        return stock;
    }
 
    /**
     * 加库存(还原库存)
     *
     * @param key    库存key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {
 
        return addStock(key, null, num);
    }
 
    /**
     * 加库存
     *
     * @param key    库存key
     * @param expire 过期时间(秒)
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判断key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }
 
        Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }
 
        return num;
    }
 
    /**
     * 获取库存
     *
     * @param key 库存key
     * @return -1:不限库存; 大于等于0:剩余库存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }
 
    /**
     * 扣库存
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的ARGV参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));
 
        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }
 
                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }
}

[调用]

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * @author yuhao.wang
 */
@RestController
public class StockController {
 
    @Autowired
    private StockService stockService;
 
    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }
 
    /**
     * 获取初始的库存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 1000;
    }
 
    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.getStock(redisKey);
    }
 
    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品ID
        long commodityId = 2;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
 
        return stockService.addStock(redisKey, 2);
    }
}

结语

到此这篇关于Redis 如何实现库存扣减操作?如何防止商品被超卖?的文章就介绍到这了,更多相关Redis库存扣减内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/qq_44866828/article/details/125500046

延伸 · 阅读

精彩推荐
  • RedisRedis操作命令总结

    Redis操作命令总结

    这篇文章主要介绍了Redis操作命令总结,本文讲解了key pattern 查询相应的key、字符串类型的操作、链表操作、hashes类型及操作、集合结构操作、有序集合、服...

    junjie2662019-10-22
  • RedisRedisRepository 分享和纠错

    RedisRepository 分享和纠错

    本文主要介绍了RedisRepository分享和纠错。具有很好的参考价值,下面跟着小编一起来看下吧 ...

    坦荡3482020-04-18
  • RedisRedis 实战篇:巧用 Bitmap 实现亿级海量数据统计

    Redis 实战篇:巧用 Bitmap 实现亿级海量数据统计

    本文将由二值状态统计类型作为实战篇系列的开篇,文中将用到 String、Set、Zset、List、hash 以外的拓展数据类型 Bitmap 来实现。...

    码哥字节7302021-05-24
  • Redis为何Redis使用跳表而非红黑树实现SortedSet

    为何Redis使用跳表而非红黑树实现SortedSet

    本篇文章主要介绍了为何Redis使用跳表而非红黑树实现SortedSet,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    JavaEdge.5112021-09-28
  • RedisRedis字符串对象实用笔记

    Redis字符串对象实用笔记

    这篇文章主要给大家介绍了关于Redis字符串对象的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学习价值,需要的...

    Yuicon3842019-11-23
  • RedisRedis 串行生成顺序编码的方法实现

    Redis 串行生成顺序编码的方法实现

    本文主要介绍了 Redis 串行生成顺序编码的方法实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下...

    这个杀手冷死了11012022-10-13
  • RedisRedis教程(十二):服务器管理命令总结

    Redis教程(十二):服务器管理命令总结

    这篇文章主要介绍了Redis教程(十二):服务器管理命令总结,本文讲解了CONFIGGETparameter、CONFIG SETparameter value、FLUSHALL等命令,需要的朋友可以参考下 ...

    Redis教程网4072019-10-24
  • RedisRedis主从集群切换数据丢失的解决方案

    Redis主从集群切换数据丢失的解决方案

    这篇文章主要介绍了Redis主从集群切换数据丢失的解决方案,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    不清不慎5012021-07-31