服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C# - 如何实现定时推送的具体方案

如何实现定时推送的具体方案

2022-11-15 13:50dotNet源计划 C#

在工作当中遇到了一个需要定时向客户端推送新闻、文章等内容。小项目又用不了大框架,这个时候在网上搜了很久没有找到合适的解决方案,直到看到了一位大佬写的文章提供了一个非常不错的思路本篇文章也是受到他的启发实现了

详细内容

详细内容大概分为4个部分,1.应用场景 2.遇到问题 3.设计 4.实现 5.运行效果

1.应用场景

需要定时推送数据,且轻量化的实现。

2.遇到问题

  • 如果启动一个定时器去定时轮询
  • (1)轮询效率比较低
  • (2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),会做重复工作
  • (3)时效性不够好,如果每小时轮询一次,最差的情况下会有时间误差
  • 如何利用“延时消息”,对于每个任务只触发一次,保证效率的同时保证实时性,是今天要讨论的问题。

3.设计

高效延时消息,包含两个重要的数据结构:

  1. 环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
  2. 任务集合,环上每一个slot是一个Set

同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。

Task结构中有两个很重要的属性:

  1. Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务
  2. Task-Function:需要执行的任务指针

假设当前Current Index指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需:

  1. 计算这个Task应该放在哪一个slot,现在指向1,3610秒之后,应该是第11格,所以这个Task应该放在第11个slot的Set中
  2. 计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1

Current Index不停的移动,每秒移动到一个新slot,这个slot中对应的Set,每个Task看Cycle-Num是不是0:

  1. 如果不是0,说明还需要多移动几圈,将Cycle-Num减1
  2. 如果是0,说明马上要执行这个Task了,取出Task-Funciton执行(可以用单独的线程来执行Task),并把这个Task从Set中删除

使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可:

  1. 无需再轮询全部订单,效率高
  2. 一个订单,任务只执行一次
  3. 时效性好,精确到秒(控制timer移动频率可以控制精度)

4.实现

首先写一个方案要理清楚自己的项目结构,我做了如下分层。

如何实现定时推送的具体方案

Interfaces , 这层里主要约束延迟消息队列的队列和消息任务行。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public interface IRingQueue<T>
{
    /// <summary>
    /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
    /// </summary>
    /// <param name="delayTime">The specified task is executed after N seconds.</param>
    /// <param name="action">Definitions of callback</param>
    void Add(long delayTime,Action<T> action);
    /// <summary>
    /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
    /// </summary>
    /// <param name="delayTime">The specified task is executed after N seconds.</param>
    /// <param name="action">Definitions of callback.</param>
    /// <param name="data">Parameters used in the callback function.</param>
    void Add(long delayTime, Action<T> action, T data);
    /// <summary>
    /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
    /// </summary>
    /// <param name="delayTime"></param>
    /// <param name="action">Definitions of callback</param>
    /// <param name="data">Parameters used in the callback function.</param>
    /// <param name="id">Task ID, used when deleting tasks.</param>
    void Add(long delayTime, Action<T> action, T data, long id);
    /// <summary>
    /// Remove tasks [need to know: where the task is, which specific task].
    /// </summary>
    /// <param name="index">Task slot location</param>
    /// <param name="id">Task ID, used when deleting tasks.</param>
    void Remove(long id);
    /// <summary>
    /// Launch queue.
    /// </summary>
    void Start();
}
 public interface ITask
 {
 }

Achieves,这层里实现之前定义的接口,这里写成抽象类是为了后面方便扩展。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DelayMessageApp.Interfaces;
namespace DelayMessageApp.Achieves.Base
{
public abstract class BaseQueue<T> : IRingQueue<T>
{
    private long _pointer = 0L;
    private ConcurrentBag<BaseTask<T>>[] _arraySlot;
    private int ArrayMax;
    /// <summary>
    /// Ring queue.
    /// </summary>
    public ConcurrentBag<BaseTask<T>>[] ArraySlot
    {
        get { return _arraySlot ?? (_arraySlot = new ConcurrentBag<BaseTask<T>>[ArrayMax]); }
    }
    public BaseQueue(int arrayMax)
    {
        if (arrayMax < 60 && arrayMax % 60 == 0)
            throw new Exception("Ring queue length cannot be less than 60 and is a multiple of 60 .");
        ArrayMax = arrayMax;
    }
    public void Add(long delayTime, Action<T> action)
    {
        Add(delayTime, action, default(T));
    }
    public void Add(long delayTime,Action<T> action,T data)
    {
        Add(delayTime, action, data,0);
    }
    public void Add(long delayTime, Action<T> action, T data,long id)
    {
        NextSlot(delayTime, out long cycle, out long pointer);
        ArraySlot[pointer] =  ArraySlot[pointer] ?? (ArraySlot[pointer] = new ConcurrentBag<BaseTask<T>>());
        var baseTask = new BaseTask<T>(cycle, action, data,id);
        ArraySlot[pointer].Add(baseTask);
    }
    /// <summary>
    /// Remove tasks based on ID.
    /// </summary>
    /// <param name="id"></param>
    public void Remove(long id)
    {
        try
        {
            Parallel.ForEach(ArraySlot, (ConcurrentBag<BaseTask<T>> collection, ParallelLoopState state) =>
            {
                var resulTask = collection.FirstOrDefault(p => p.Id == id);
                if (resulTask != null)
                {
                    collection.TryTake(out resulTask);
                    state.Break();
                }
            });
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
    }
    public void Start()
    {
        while (true)
        {
            RightMovePointer();
            Thread.Sleep(1000);
            Console.WriteLine(DateTime.Now.ToString());
        }
    }
    /// <summary>
    /// Calculate the information of the next slot.
    /// </summary>
    /// <param name="delayTime">Delayed execution time.</param>
    /// <param name="cycle">Number of turns.</param>
    /// <param name="index">Task location.</param>
    private void NextSlot(long delayTime, out long cycle,out long index)
    {
        try
        {
            var circle = delayTime / ArrayMax;
            var second = delayTime % ArrayMax;
            var current_pointer = GetPointer();
            var queue_index = 0L;
            if (delayTime - ArrayMax > ArrayMax)
            {
                circle = 1;
            }
            else if (second > ArrayMax)
            {
                circle += 1;
            }
            if (delayTime - circle * ArrayMax < ArrayMax)
            {
                second = delayTime - circle * ArrayMax;
            }
            if (current_pointer + delayTime >= ArrayMax)
            {
                cycle = (int)((current_pointer + delayTime) / ArrayMax);
                if (current_pointer + second - ArrayMax < 0)
                {
                    queue_index = current_pointer + second;
                }
                else if (current_pointer + second - ArrayMax > 0)
                {
                    queue_index = current_pointer + second - ArrayMax;
                }
            }
            else
            {
                cycle = 0;
                queue_index = current_pointer + second;
            }
            index = queue_index;
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
    }
    /// <summary>
    /// Get the current location of the pointer.
    /// </summary>
    /// <returns></returns>
    private long GetPointer()
    {
        return Interlocked.Read(ref _pointer);
    }
    /// <summary>
    /// Reset pointer position.
    /// </summary>
    private void ReSetPointer()
    {
        Interlocked.Exchange(ref _pointer, 0);
    }
    /// <summary>
    /// Pointer moves clockwise.
    /// </summary>
    private void RightMovePointer()
    {
        try
        {
            if (GetPointer() >= ArrayMax - 1)
            {
                ReSetPointer();
            }
            else
            {
                Interlocked.Increment(ref _pointer);
            }
            var pointer = GetPointer();
            var taskCollection = ArraySlot[pointer];
            if (taskCollection == null || taskCollection.Count == 0) return;
            Parallel.ForEach(taskCollection, (BaseTask<T> task) =>
            {
                if (task.Cycle > 0)
                {
                    task.SubCycleNumber();
                }
                if (task.Cycle <= 0)
                {
                    taskCollection.TryTake(out task);
                    task.TaskAction(task.Data);
                }
            });
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
    }
}
}
using System;
using System.Threading;
using DelayMessageApp.Interfaces;
namespace DelayMessageApp.Achieves.Base
{
public class BaseTask<T> : ITask
{
    private long _cycle;
    private long _id;
    private T _data;
    public Action<T> TaskAction { get; set; }
    public long Cycle
    {
        get { return Interlocked.Read(ref _cycle); }
        set { Interlocked.Exchange(ref _cycle, value); }
    }
    public long Id
    {
        get { return _id; }
        set { _id = value; }
    }
    public T Data
    {
        get { return _data; }
        set { _data = value; }
    }
    public BaseTask(long cycle, Action<T> action, T data,long id)
    {
        Cycle = cycle;
        TaskAction = action;
        Data = data;
        Id = id;
    }
    public BaseTask(long cycle, Action<T> action,T data)
    {
        Cycle = cycle;
        TaskAction = action;
        Data = data;
    }
    public BaseTask(long cycle, Action<T> action)
    {
        Cycle = cycle;
        TaskAction = action;
    }
    public void SubCycleNumber()
    {
        Interlocked.Decrement(ref _cycle);
    }
}
}

Logic,这层主要实现调用逻辑,调用者最终只需要关心把任务放进队列并指定什么时候执行就行了,根本不需要关心其它的任何信息。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void Start()
{
    //1.Initialize queues of different granularity.
    IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();
    //2.Open thread.
    var lstTasks = new List<Task>
    {
        Task.Factory.StartNew(minuteRingQueue.Start)
    };
    //3.Add tasks performed in different periods.
    minuteRingQueue.Add(5, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = "Trump's visit to China!" });
    minuteRingQueue.Add(10, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = "Putin Pu's visit to China!" });
    minuteRingQueue.Add(60, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = "Eisenhower's visit to China!" });
    minuteRingQueue.Add(120, new Action<NewsModel>((NewsModel newsObj) =>
    {
        Console.WriteLine(newsObj.News);
    }), new NewsModel() { News = "Xi Jinping's visit to the US!" });
    //3.Waiting for all tasks to complete is usually not completed. Because there is an infinite loop.
    //F5 Run the program and see the effect.
    Task.WaitAll(lstTasks.ToArray());
    Console.Read();
}

Models,这层就是用来在延迟任务中带入的数据模型类而已了。自己用的时候换成任意自定义类型都可以。

5.运行效果

如何实现定时推送的具体方案

到此这篇关于如何实现定时推送的具体方案的文章就介绍到这了,希望对大家有所帮助,更多相关C#内容请搜索服务器之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/justzhuzhu/p/14669109.html

延伸 · 阅读

精彩推荐
  • C#常用C#正则表达式汇总介绍

    常用C#正则表达式汇总介绍

    c#正则表达式,用于字符串处理、表单验证等场合,实用高效。现将一些常用的表达式收集于此,以备不时之需。...

    C#教程网6652021-11-08
  • C#Unity实现新手引导镂空效果

    Unity实现新手引导镂空效果

    这篇文章主要为大家详细介绍了Unity实现新手引导的镂空效果,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    LLLLL__11532022-08-27
  • C#Unity平台模拟自动挡驾驶汽车

    Unity平台模拟自动挡驾驶汽车

    这篇文章主要为大家详细介绍了Unity平台模拟自动挡驾驶汽车,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    DwarfTitan3722022-09-27
  • C#C#实现获取本地内网(局域网)和外网(公网)IP地址的方法分析

    C#实现获取本地内网(局域网)和外网(公网)IP地址的方法分析

    这篇文章主要介绍了C#实现获取本地内网(局域网)和外网(公网)IP地址的方法,结合实例形式总结分析了C#获取IP地址相关原理、实现方法与操作注意事项,需要...

    willingtolove7562022-08-29
  • C#C#利用GDI+画图的基础实例教程

    C#利用GDI+画图的基础实例教程

    编写图形程序时需要使用GDI(Graphics Device Interface,图形设备接口),所以通过网上的相关资料整理了这篇文章,下面这篇文章主要给大家介绍了关于C#利用...

    Dandelion_drq4402022-02-22
  • C#winform 调用摄像头扫码识别二维码的实现步骤

    winform 调用摄像头扫码识别二维码的实现步骤

    这篇文章主要介绍了winform 调用摄像头扫码识别二维码的实现步骤,帮助大家更好的理解和学习使用winform,感兴趣的朋友可以了解下...

    随风去远方10032022-11-01
  • C#C#连接到sql server2008数据库的实例代码

    C#连接到sql server2008数据库的实例代码

    这篇文章主要介绍了C#连接到sql server2008数据库的实例代码,需要的朋友可以参考下...

    醉墨重生11652022-01-21
  • C#Unity3D实现导航效果

    Unity3D实现导航效果

    这篇文章主要为大家详细介绍了Unity3D实现简单导航效果,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    wang_lvril10582022-03-10