脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|shell|

服务器之家 - 脚本之家 - Golang - Golang分布式注册中心实现流程讲解

Golang分布式注册中心实现流程讲解

2023-05-11 14:20未来谁可知 Golang

这篇文章主要介绍了Golang分布式注册中心实现流程,注册中心可以用于服务发现,服务注册,配置管理等方面,在分布式系统中,服务的发现和注册是非常重要的组成部分,需要的朋友可以参考下

动手实现一个分布式注册中心

以一个日志微服务为例,将日志服务注册到注册中心展开!

Golang分布式注册中心实现流程讲解

日志服务

log/Server.go

其实这一个日志类的功能就是有基本的写文件功能,然后就是注册一个http的接口去写日志进去

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package log
import (
    "io/ioutil"
    stlog "log"
    "net/http"
    "os"
)
var log *stlog.Logger
type fileLog string
// 编写日志的方法
func (fl fileLog) Write(data []byte) (int, error) {
    f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
    if err != nil {
        return 0, err
    }
    defer f.Close()
    return f.Write(data)
}
// 启动一个日志对象 参数为日志文件名
func Run(destination string) {
    log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags)
}
// 自身注册的一个服务方法
func RegisterHandlers() {
    http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
        switch r.Method {
        case http.MethodPost:
            msg, err := ioutil.ReadAll(r.Body)
            if err != nil || len(msg) == 0 {
                w.WriteHeader(http.StatusBadRequest)
                return
            }
            write(string(msg))
        default:
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }
    })
}
func write(message string) {
    log.Printf("%v\n", message)
}

log/Client.go

提供给外部服务的接口,定义好日志的命名格式,来显示调用接口去使用已经注册好的日志接口并且返回状态

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package log
import (
    "bytes"
    "distributed/registry"
    "fmt"
    "net/http"
    stlog "log"
)
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
    stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
    stlog.SetFlags(0)
    stlog.SetOutput(&clientLogger{url: serviceURL})
}
type clientLogger struct {
    url string
}
func (cl clientLogger) Write(data []byte) (int, error) {
    b := bytes.NewBuffer([]byte(data))
    res, err := http.Post(cl.url+"/log", "text/plain", b)
    if err != nil {
        return 0, err
    }
    if res.StatusCode != http.StatusOK {
        return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status)
    }
    return len(data), nil
}

主启动程序LogService

启动服务Logservice,主要执行start方法,里面有细节实现服务注册与服务发现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main
import (
    "context"
    "distributed/log"
    "distributed/registry"
    "distributed/service"
    "fmt"
    stlog "log"
)
func main() {
  // 初始化启动一个日志文件对象
    log.Run("./distributed.log")
  // 日志服务注册的端口和地址
    host, port := "localhost", "4000"
    serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
  // 初始化注册对象
    r := registry.Registration{
        ServiceName:      registry.LogService, // 自身服务名
        ServiceURL:       serviceAddress,  // 自身服务地址
        RequiredServices: make([]registry.ServiceName, 0),// 依赖服务
        ServiceUpdateURL: serviceAddress + "/services", // 服务列表
        HeartbeatURL: serviceAddress + "/heartbeat"// 心跳
    }
  // 启动日志服务包含服务注册,发现等细节
    ctx, err := service.Start(
        context.Background(),
        host,
        port,
        r,
        log.RegisterHandlers,
    )
  // 异常写入到日志中
    if err != nil {
        stlog.Fatalln(err)
    }
  // 超时停止退出服务
    <-ctx.Done()
    fmt.Println("Shutting down log service.")
}

服务启动与注册

service/service.go

Start 启动服务的主方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
 host: 地址
 port: 端口号
 reg:  注册的服务对象
 registerHandlersFunc: 注册方法
*/
func Start(ctx context.Context, host, port string,
    reg registry.Registration,
    registerHandlersFunc func()) (context.Context, error) {
    registerHandlersFunc()  // 启动注册方法
    // 启动服务
    ctx = startService(ctx, reg.ServiceName, host, port)
    // 注册服务
    err := registry.RegisterService(reg)
    if err != nil {
        return ctx, err
    }
    return ctx, nil
}

startService

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func startService(ctx context.Context, serviceName registry.ServiceName,
    host, port string) context.Context {
    ctx, cancel := context.WithCancel(ctx)
    var srv http.Server
    srv.Addr = ":" + port
    // 该协程为监听http服务,并且停止服务的时候cancel
    go func() {
        log.Println(srv.ListenAndServe())
        // 删除对应的服务
        err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
        if err != nil {
            log.Println(err)
        }
        cancel()
    }()
    // 该协程为监听手动停止服务的信号
    go func() {
        fmt.Printf("%v started. Press any key to stop. \n", serviceName)
        var s string
        fmt.Scanln(&s)
        err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
        if err != nil {
            log.Println(err)
        }
        srv.Shutdown(ctx)
        cancel()
    }()
    return ctx
}

服务注册与发现

registry/client.go

注册服务的时候会连着心跳以及服务更新的方法一起注册!

而服务更新里面的细节就是自己自定义了一个Handler然后ServeHttp方法里面去update全局的服务提供对象,

update主要是更新服务和删除服务的最新消息

然后就是提供一个注销服务的方法

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package registry
import (
    "bytes"
    "encoding/json"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "net/url"
    "sync"
)
// 注册服务
func RegisterService(r Registration) error {
    // 获得心跳地址并注册
    heartbeatURL, err := url.Parse(r.HeartbeatURL)
    if err != nil {
        return err
    }
    http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request)  {
        w.WriteHeader(http.StatusOK)
    })
   // 获得服务更新地址,并且自定义http服务的handler,因为每次更新服务的时候,可以在ServeHttp方法里面去维护
    serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
    if err != nil {
        return err
    }
    http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{})
    // 写入buf值将服务对象发送给注册中心的services地址
    buf := new(bytes.Buffer)
    enc := json.NewEncoder(buf)
    err = enc.Encode(r)
    if err != nil {
        return err
    }
    res, err := http.Post(ServicesURL, "application/json", buf)
    if err != nil {
        return err
    }
    if res.StatusCode != http.StatusOK {
        return fmt.Errorf("Failed to register service. Registry service "+
            "responded with code %v", res.StatusCode)
    }
    return nil
}
type serviceUpdateHanlder struct{}
func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    dec := json.NewDecoder(r.Body)
    var p patch
    err := dec.Decode(&p)
    if err != nil {
        log.Println(err)
        w.WriteHeader(http.StatusBadRequest)
        return
    }
    fmt.Printf("Updated received %v\n", p)
    prov.Update(p) // 更新服务提供对象
}
// 删除对应注册中心的服务地址
func ShutdownService(url string) error {
    req, err := http.NewRequest(http.MethodDelete, ServicesURL,
        bytes.NewBuffer([]byte(url)))
    if err != nil {
        return err
    }
    req.Header.Add("Content-Type", "text/plain")
    res, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    if res.StatusCode != http.StatusOK {
        return fmt.Errorf("Failed to deregister service. Registry "+
            "service responded with code %v", res.StatusCode)
    }
    return nil
}
// 更新服务列表
func (p *providers) Update(pat patch) {
    p.mutex.Lock()
    defer p.mutex.Unlock()
    // 将patch中有新增的进行添加
    for _, patchEntry := range pat.Added {
        if _, ok := p.services[patchEntry.Name]; !ok {
            p.services[patchEntry.Name] = make([]string, 0)
        }
        p.services[patchEntry.Name] = append(p.services[patchEntry.Name],
            patchEntry.URL)
    }
    // 将patch中被标记删除的
    for _, patchEntry := range pat.Removed {
        if providerURLs, ok := p.services[patchEntry.Name]; ok {
            for i := range providerURLs {
                if providerURLs[i] == patchEntry.URL {
                    p.services[patchEntry.Name] = append(providerURLs[:i],
                        providerURLs[i+1:]...)
                }
            }
        }
    }
}
// 根据服务名负载均衡随机获取服务地址
func (p providers) get(name ServiceName) (string, error) {
    providers, ok := p.services[name]
    if !ok {
        return "", fmt.Errorf("No providers available for service %v", name)
    }
    idx := int(rand.Float32() * float32(len(providers)))
    return providers[idx], nil
}
// 对外暴露生产者的方法
func GetProvider(name ServiceName) (string, error) {
    return prov.get(name)
}
type providers struct {
    services map[ServiceName][]string
    mutex    *sync.RWMutex
}
// 服务提供对象
var prov = providers{
    services: make(map[ServiceName][]string),  // 服务列表  服务名->集群地址集合
    mutex:    new(sync.RWMutex),  // 锁  防止服务注册更新时的并发情况
}

registry/registration.go

主要是一些关于服务使用到的参数以及对象!

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package registry
type Registration struct {
    ServiceName      ServiceName   // 服务名
    ServiceURL       string        // 服务地址
    RequiredServices []ServiceName // 依赖的服务
    ServiceUpdateURL string        // 服务更新的地址
    HeartbeatURL     string        // 心跳地址
}
type ServiceName string
// 服务名集合
const (
    LogService     = ServiceName("LogService")
    GradingService = ServiceName("GradingService")
    PortalService  = ServiceName("Portald")
)
// 服务对象参数
type patchEntry struct {
    Name ServiceName
    URL  string
}
// 更新的服务对象参数 
type patch struct {
    Added   []patchEntry
    Removed []patchEntry
}

registry/server.go

服务端的注册中心服务的增删改查管理以及心跳检测,及时将最新的更新的服务消息通知回给客户端

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package registry
import (
    "bytes"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "sync"
    "time"
)
const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services"   // 注册中心地址
// 服务对象集合
type registry struct {
    registrations []Registration
    mutex         *sync.RWMutex
}
// 添加服务
func (r *registry) add(reg Registration) error {
    r.mutex.Lock()
    r.registrations = append(r.registrations, reg)
    r.mutex.Unlock()
    err := r.sendRequiredServices(reg)
    r.notify(patch{
        Added: []patchEntry{
            patchEntry{
                Name: reg.ServiceName,
                URL:  reg.ServiceURL,
            },
        },
    })
    return err
}
// 通知服务接口请求去刷新改变后到服务
func (r registry) notify(fullPatch patch) {
    r.mutex.RLock()
    defer r.mutex.RUnlock()
    for _, reg := range r.registrations {
        go func(reg Registration) {
            for _, reqService := range reg.RequiredServices {
                p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
                sendUpdate := false
                for _, added := range fullPatch.Added {
                    if added.Name == reqService {
                        p.Added = append(p.Added, added)
                        sendUpdate = true
                    }
                }
                for _, removed := range fullPatch.Removed {
                    if removed.Name == reqService {
                        p.Removed = append(p.Removed, removed)
                        sendUpdate = true
                    }
                }
                if sendUpdate {
                    err := r.sendPatch(p, reg.ServiceUpdateURL)
                    if err != nil {
                        log.Println(err)
                        return
                    }
                }
            }
        }(reg)
    }
}
// 更新每个服务的依赖服务
func (r registry) sendRequiredServices(reg Registration) error {
    r.mutex.RLock()
    defer r.mutex.RUnlock()
    var p patch
    for _, serviceReg := range r.registrations {
        for _, reqService := range reg.RequiredServices {
            if serviceReg.ServiceName == reqService {
                p.Added = append(p.Added, patchEntry{
                    Name: serviceReg.ServiceName,
                    URL:  serviceReg.ServiceURL,
                })
            }
        }
    }
    err := r.sendPatch(p, reg.ServiceUpdateURL)
    if err != nil {
        return err
    }
    return nil
}
// 告诉客户端更新,最新的服务列表是这个
func (r registry) sendPatch(p patch, url string) error {
    d, err := json.Marshal(p)
    if err != nil {
        return err
    }
    _, err = http.Post(url, "application/json", bytes.NewBuffer(d))
    if err != nil {
        return err
    }
    return nil
}
// 注册中心删除服务对象
func (r *registry) remove(url string) error {
    for i := range reg.registrations {
        if reg.registrations[i].ServiceURL == url {
            // 通知客户端更新对象信息
            r.notify(patch{
                Removed: []patchEntry{
                    {
                        Name: r.registrations[i].ServiceName,
                        URL:  r.registrations[i].ServiceURL,
                    },
                },
            })
            r.mutex.Lock()
            reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
            r.mutex.Unlock()
            return nil
        }
    }
    return fmt.Errorf("Service at URL %s not found", url)
}
// 心跳检测
func (r *registry) heartbeat(freq time.Duration) {
    for {
        var wg sync.WaitGroup
        for _, reg := range r.registrations {
            wg.Add(1)
            go func(reg Registration) {
                defer wg.Done()
                success := true
                for attemps := 0; attemps < 3; attemps++ {
                    res, err := http.Get(reg.HeartbeatURL)
                    if err != nil {
                        log.Println(err)
                    } else if res.StatusCode == http.StatusOK {
                        log.Printf("Heartbeat check passed for %v", reg.ServiceName)
                        // 如果心跳恢复了,把服务重新注册回来
                        if !success {
                            r.add(reg)
                        }
                        break;
                    }
                    // 如果执行到这就代表着心跳没有响应,那就代表着需要回收注销该服务了
                    log.Printf("Heartbeat check failed for %v", reg.ServiceName)
                    if success {
                        success = false
                        r.remove(reg.ServiceURL)
                    }
                    time.Sleep(1 * time.Second)
                }
            }(reg)
            wg.Wait()
            time.Sleep(freq)
        }
    }
}
var once sync.Once
func SetupRegistryService() {
    // 保证执行一次进行服务到心跳 每三秒循环一遍
    once.Do(func() {
        go reg.heartbeat(3 * time.Second)
    })
}
var reg = registry{
    registrations: make([]Registration, 0),
    mutex:         new(sync.RWMutex),
}
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    log.Println("Request received")
    switch r.Method {
    case http.MethodPost:
        dec := json.NewDecoder(r.Body)
        var r Registration
        err := dec.Decode(&r)
        if err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusBadRequest)
            return
        }
        log.Printf("Adding service: %v with URL: %s\n", r.ServiceName,
            r.ServiceURL)
        err = reg.add(r)
        if err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusBadRequest)
            return
        }
    case http.MethodDelete:
        payload, err := ioutil.ReadAll(r.Body)
        if err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
        url := string(payload)
        log.Printf("Removing service at URL: %s", url)
        err = reg.remove(url)
        if err != nil {
            log.Println(err)
            w.WriteHeader(http.StatusInternalServerError)
            return
        }
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
}

到此这篇关于Golang分布式注册中心实现流程讲解的文章就介绍到这了,更多相关Golang分布式注册中心内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/jiohfgj/article/details/125656522

延伸 · 阅读

精彩推荐
  • GolangMacOS中 VSCode 安装 GO 插件失败问题的快速解决方法

    MacOS中 VSCode 安装 GO 插件失败问题的快速解决方法

    这篇文章主要介绍了MacOS中 VSCode 安装 GO 插件失败问题的快速解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的...

    小谷xg4392020-06-09
  • Golanggo语言基础语法示例

    go语言基础语法示例

    这篇文章主要介绍了go语言基础语法示例,介绍了go语言较为全面的基础知识,具有一定参考价值,需要的可以了解下。 ...

    冬叶''s blog3822020-05-11
  • GolangGo语言区别于其他语言的特性

    Go语言区别于其他语言的特性

    在本文中,今天这篇文章将给大家介绍一下 Go 与其他语言不同的 9 个特性,需要的朋友可以参考下面文章的具体内容...

    海拥9722021-11-19
  • GolangGo GORM 事务详细介绍

    Go GORM 事务详细介绍

    这篇文章主要介绍了Go GORM事务详细介绍,GORM 会在事务里执行写入操作创建、更新、删除,具体详细介绍需要的朋友可以参考下面文章的简单介绍...

    v2v17732022-07-31
  • GolangGo语言实现Snowflake雪花算法

    Go语言实现Snowflake雪花算法

    雪花算法产生的背景当然是twitter高并发环境下对唯一ID生成的需求,得益于twitter内部牛逼的技术,雪花算法能够流传于至今并且被广泛使用,本文就详细的...

    luozhiyun7192021-08-07
  • GolangGolang小数操作指南之判断小数点位数与四舍五入

    Golang小数操作指南之判断小数点位数与四舍五入

    这篇文章主要给大家介绍了关于Golang小数操作指南之判断小数点位数与四舍五入的相关资料,文中通过实例代码介绍的非常详细,对大家的学习或者工作具有...

    头秃猫轻王5852022-09-05
  • GolangGo语言对字符串进行SHA1哈希运算的方法

    Go语言对字符串进行SHA1哈希运算的方法

    这篇文章主要介绍了Go语言对字符串进行SHA1哈希运算的方法,实例分析了Go语言针对字符串操作的技巧,具有一定参考借鉴价值,需要的朋友可以参考下 ...

    work2410902020-04-21
  • Golang从 bug 中学习:六大开源项目告诉你 go 并发编程的那些坑

    从 bug 中学习:六大开源项目告诉你 go 并发编程的那些坑

    并发编程中,go不仅仅支持传统的通过共享内存的方式来通信,更推崇通过channel来传递消息,这种新的并发编程模型会出现不同于以往的bug。...

    腾讯技术8812021-02-24