服务器之家:专注于VPS、云服务器配置技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - C# - C# 基于消息发布订阅模型的示例(上)

C# 基于消息发布订阅模型的示例(上)

2022-11-03 12:17Hello——寻梦者! C#

这篇文章主要介绍了C# 基于消息发布订阅模型的示例,帮助大家更好的理解和学习使用c#,感兴趣的朋友可以了解下

  在我们的开发过程中,我们经常会遇到这样的场景就是一个对象的其中的一些状态依赖于另外的一个对象的状态,而且这两个对象之间彼此是没有关联的,及两者之间的耦合性非常低,特别是在这种基于容器模型的开发中遇到的会非常多,比如Prism框架或者MEF这种框架中,而我们会发现在这样的系统中我们经常使用一种Publish和Subscribe的模式来进行交互,这种交互有什么好处呢?基于带着这些问题的思考,我们来一步步来剖析!

  首先第一步就是定义一个叫做IEventAggregator的接口,里面定义了一些重载的Subscribe和Publish方法,我们具体来看一看这个接口:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/// <summary>
   ///   Enables loosely-coupled publication of and subscription to events.
   /// </summary>
   public interface IEventAggregator
   {
       /// <summary>
       ///   Gets or sets the default publication thread marshaller.
       /// </summary>
       /// <value>
       ///   The default publication thread marshaller.
       /// </value>
       Action<System.Action> PublicationThreadMarshaller { get; set; }
 
       /// <summary>
       ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
       /// </summary>
       /// <param name = "instance">The instance to subscribe for event publication.</param>
       void Subscribe(object instance);
 
       /// <summary>
       ///   Unsubscribes the instance from all events.
       /// </summary>
       /// <param name = "instance">The instance to unsubscribe.</param>
       void Unsubscribe(object instance);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <remarks>
       ///   Uses the default thread marshaller during publication.
       /// </remarks>
       void Publish(object message);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
       void Publish(object message, Action<System.Action> marshal);
   }

  有了这个接口,接下来就是怎样去实现这个接口中的各种方法,我们来看看具体的实现过程。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/// <summary>
    ///   Enables loosely-coupled publication of and subscription to events.
    /// </summary>
    public class EventAggregator : IEventAggregator
    {
        /// <summary>
        ///   The default thread marshaller used for publication;
        /// </summary>
        public static Action<System.Action> DefaultPublicationThreadMarshaller = action => action();
 
        readonly List<Handler> handlers = new List<Handler>();
 
        /// <summary>
        ///   Initializes a new instance of the <see cref = "EventAggregator" /> class.
        /// </summary>
        public EventAggregator()
        {
            PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
        }
 
        /// <summary>
        ///   Gets or sets the default publication thread marshaller.
        /// </summary>
        /// <value>
        ///   The default publication thread marshaller.
        /// </value>
        public Action<System.Action> PublicationThreadMarshaller { get; set; }
 
        /// <summary>
        ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
        /// </summary>
        /// <param name = "instance">The instance to subscribe for event publication.</param>
        public virtual void Subscribe(object instance)
        {
            lock(handlers)
            {
                if (handlers.Any(x => x.Matches(instance)))
                {
                    return;
                }                   
                handlers.Add(new Handler(instance));
            }
        }
 
        /// <summary>
        ///   Unsubscribes the instance from all events.
        /// </summary>
        /// <param name = "instance">The instance to unsubscribe.</param>
        public virtual void Unsubscribe(object instance)
        {
            lock(handlers)
            {
                var found = handlers.FirstOrDefault(x => x.Matches(instance));
                if (found != null)
                {
                   handlers.Remove(found);
                }                  
            }
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <remarks>
        ///   Does not marshall the the publication to any special thread by default.
        /// </remarks>
        public virtual void Publish(object message)
        {
            Publish(message, PublicationThreadMarshaller);
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            }
            marshal(() =>
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();
 
                if(dead.Any())
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }
 
        protected class Handler
        {
            readonly WeakReference reference;
            readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();
 
            public Handler(object handler)
            {
                reference = new WeakReference(handler);
 
                var interfaces = handler.GetType().GetInterfaces()
                    .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);
 
                foreach(var @interface in interfaces)
                {
                    var type = @interface.GetGenericArguments()[0];
                    var method = @interface.GetMethod("Handle");
                    supportedHandlers[type] = method;
                }
            }
 
            public bool Matches(object instance)
            {
                return reference.Target == instance;
            }
 
            public bool Handle(Type messageType, object message)
            {
                var target = reference.Target;
                if(target == null)
                    return false;
 
                foreach(var pair in supportedHandlers)
                {
                    if(pair.Key.IsAssignableFrom(messageType))
                    {
                        pair.Value.Invoke(target, new[] { message });
                        return true;
                    }
                }
                return true;
            }
        }
    }

  首先在EventAggregator的内部维护了一个LIst<Handler>的List对象,用来存放一系列的Handle,那么这个嵌套类Handler到底起什么作用呢?

  我们会发现在每一次当执行这个Subscribe的方法的时候,会将当前object类型的参数instance传入到Handler这个对象中,在Handler这个类的构造函数中,首先将这个instance放入到一个弱引用中去,然后再获取这个对象所有继承的接口,并查看是否继承了IHandle<TMessage>这个泛型的接口,如果能够获取到,那么就通过反射获取到当前instance中定义的Handle方法,并获取到其中定义的表示泛型类型的类型实参或泛型类型定义的类型形参,并把这两个对象放到内部定义的一个Dictionary<Type, MethodInfo>字典之中,这样就把这样一个活得具体的处理方法的Handler对象放到了一个List<Handler>集合中,这个就是订阅消息的核心部分,所以当前的对象要想订阅一个消息,那么必须实现泛型接口IHandle<TMessage>,并且实现接口中的方法,同时最重要的就是在当前对象的构造函数函数中去订阅消息(即执行Subscribe(this),我们来看一看这个泛型接口IHandle<TMessage> 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface IHandle {}
 
/// <summary>
///   Denotes a class which can handle a particular type of message.
/// </summary>
/// <typeparam name = "TMessage">The type of message to handle.</typeparam>
public interface IHandle<TMessage> : IHandle
{
    /// <summary>
    ///   Handles the message.
    /// </summary>
    /// <param name = "message">The message.</param>
    void Handle(TMessage message);
}

  在看完了Subscribe这个方法后,后面我们就来看看Unsubscribe方法吧,这个思路其实很简单就是找到List<Handler>中的这个对象,并且移除当前的对象就可以了,那么下面我们关注的重点就是Publish这个方法中到底实现了什么?首先来看看代码,然后再来做一步步分析。 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            }
            marshal(() =>
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();
 
                if(dead.Any())
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }

  我们看到,在发布一个object类型的message的时候,必然对应着另外的一个对象来处理这个消息,那么怎样找到这个消息的处理这呢?

  对,我们在Subscribe一个对象的时候不是已经通过反射将订阅这个消息的对象及方法都存在了一个List<Handler>中去了吗?那么我们只需要在这个List中找到对应的和message类型一致的那个对象并执行里面的Handle方法不就可以了吗?确实是一个很好的思路,这里我们看代码也是这样实行的。

  这里面还有一个要点就是,如果执行的方法返回了false,就是执行不成功,那么就从当前的List<Handler>中移除掉这个对象,因为这样的操作是没有任何意义的,通过这样的过程我们就能够完没地去实现两个对象之间的消息传递了,另外我们通过总结以后就能够发现,这个思路实现的重点包括以下方面:

  1 所有消息订阅的对象必须实现统一的接口IHandle<TMessage>,并实现里面的Handel方法。

  2 整个EventAggregator必须是单实例或者是静态的,这样才能够在统一的集合中去实现上述的各种操作。

  最后还是按照之前的惯例,最后给出一个具体的实例来做相关的说明,请点击此处进行下载,在下篇中我们将介绍一种简单版的基于事件的发布和订阅模式的例子。

以上就是C# 基于消息发布订阅模型的示例(上)的详细内容,更多关于c# 发布订阅模型的资料请关注服务器之家其它相关文章!

原文链接:https://www.cnblogs.com/seekdream/p/8884584.html

延伸 · 阅读

精彩推荐
  • C#C#贪吃蛇游戏实现分析

    C#贪吃蛇游戏实现分析

    这篇文章主要为大家分析了C#贪吃蛇游戏的实现代码,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    冰封一夏10342022-01-06
  • C#C# 使用Free Spire.Presentation 实现对PPT插入、编辑、删除表格

    C# 使用Free Spire.Presentation 实现对PPT插入、编辑、删除表格

    小编发现使用.NET组件——Free Spire.Presentation,在C#中添加该产品DLL文件,可以简单快速地实现对演示文稿的表格插入、编辑和删除等操作,具体实现代码大家...

    E-iceblue6572022-01-24
  • C#C#使用ODBC与OLEDB连接数据库的方法示例

    C#使用ODBC与OLEDB连接数据库的方法示例

    这篇文章主要介绍了C#使用ODBC与OLEDB连接数据库的方法,结合实例形式分析了C#基于ODBC与OLEDB实现数据库连接操作简单操作技巧,需要的朋友可以参考下...

    aparche8172022-01-05
  • C#C#编程实现动态改变配置文件信息的方法

    C#编程实现动态改变配置文件信息的方法

    这篇文章主要介绍了C#编程实现动态改变配置文件信息的方法,涉及C#针对xml格式文件的相关操作技巧,具有一定参考借鉴价值,需要的朋友可以参考下...

    aparche3802021-11-24
  • C#解析C#设计模式编程中适配器模式的实现

    解析C#设计模式编程中适配器模式的实现

    这篇文章主要介绍了C#设计模式编程中适配器模式的实现,分别举了类的对象适配器与对象的适配器模式的例子,需要的朋友可以参考下...

    Libing12212021-11-12
  • C#C#实现连接SQL Server2012数据库并执行SQL语句的方法

    C#实现连接SQL Server2012数据库并执行SQL语句的方法

    这篇文章主要介绍了C#实现连接SQL Server2012数据库并执行SQL语句的方法,结合实例形式较为详细的分析了C#连接SQL Server2012数据库并执行查询、插入等操作的相...

    wz_微臣9552022-01-25
  • C#C# 字符串处理小工具

    C# 字符串处理小工具

    本文主要介绍C#字符串处理小工具,实现功能包括:转换为大写;转换为小写;反转字符串;匹配某字符串出现次数;正则匹配;base64加密;base64解密;R...

    Aaxuan5802021-12-28
  • C#C#实现图片切割的方法

    C#实现图片切割的方法

    这篇文章主要介绍了C#实现图片切割的方法,涉及C#使用Graphics实现图片属性的相关设置、保存等操作技巧,需要的朋友可以参考下...

    JoeBlackzqq8892022-01-07