脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Golang - Go语言k8s kubernetes使用leader election实现选举

Go语言k8s kubernetes使用leader election实现选举

2022-11-30 11:26李蓝 Golang

这篇文章主要为大家介绍了Go语言 k8s kubernetes 使用leader election选举,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

一、背景

在kubernetes的世界中,很多组件仅仅需要一个实例在运行,比如controller-manager或第三方的controller,但是为了高可用性,需要组件有多个副本,在发生故障的时候需要自动切换。因此,需要利用leader election的机制多副本部署,单实例运行的模式。应用程序可以使用外部的组件比如ZooKeeper或Etcd等中间件进行leader eleaction, ZooKeeper的实现是采用临时节点的方案,临时节点存活与客户端与ZooKeeper的会话期间,在会话结束后,临时节点会被立刻删除,临时节点被删除后,其他处于被动状态的服务实例会竞争生成临时节点,生成临时节点的客户端(服务实例)就变成Leader,从而保证整个集群中只有一个活跃的实例,在发生故障的时候,也能快速的实现主从之间的迁移。Etcd是一个分布式的kv存储组件,利用Raft协议维护副本的状态服务,Etcd的Revision机制可以实现分布式锁的功能,Etcd的concurrency利用的分布式锁的能力实现了选Leader的功能(本文更多关注的是k8s本身的能力,Etcd的concurrency机制不做详细介绍)。

kubernetes使用的Etcd作为底层的存储组件,因此我们是不是有可能利用kubernetes的API实现选leader的功能呢?其实kubernetes的SIG已经提供了这方面的能力,主要是通过configmap/lease/endpoint的资源实现选Leader的功能。

二、官网代码示例

kubernetes官方提供了一个使用的例子,源码在:github.com/kubernetes/…

选举的过程中,每个实例的状态有可能是:

  • 选择成功->运行业务代码
  • 等待状态,有其他实例成为了leader。当leader放弃锁后,此状态的实例有可能会成为新的leader
  • 释放leader的锁,在运行的业务代码退出

在稳定的环境中,实例一旦成为了leader,通常情况是不会释放锁的,会保持一直运行的状态,这样有利于业务的稳定和Controller快速的对资源的状态变化做成相应的操作。只有在网络不稳定或误操作删除实例的情况下,才会触发leader的重新选举。

kubernetes官方提供的选举例子详解如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
  "context"
  "flag"
  "os"
  "os/signal"
  "syscall"
  "time"
  "github.com/google/uuid"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  clientset "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
  "k8s.io/client-go/tools/clientcmd"
  "k8s.io/client-go/tools/leaderelection"
  "k8s.io/client-go/tools/leaderelection/resourcelock"
  "k8s.io/klog/v2"
)
func buildConfig(kubeconfig string) (*rest.Config, error) {
  if kubeconfig != "" {
    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
      return nil, err
    }
    return cfg, nil
  }
  cfg, err := rest.InClusterConfig()
  if err != nil {
    return nil, err
  }
  return cfg, nil
}
func main() {
  klog.InitFlags(nil)
  var kubeconfig string
  var leaseLockName string
  var leaseLockNamespace string
  var id string
  // kubeconfig 指定了kubernetes集群的配置文文件路径
  flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
  // 锁的拥有者的ID,如果没有传参数进来,就随机生成一个
  flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
  // 锁的ID,对应kubernetes中资源的name
  flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
  // 锁的命名空间
  flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
  // 解析命令行参数
  flag.Parse()
  if leaseLockName == "" {
    klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
  }
  if leaseLockNamespace == "" {
    klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
  }
  // leader election uses the Kubernetes API by writing to a
  // lock object, which can be a LeaseLock object (preferred),
  // a ConfigMap, or an Endpoints (deprecated) object.
  // Conflicting writes are detected and each client handles those actions
  // independently.
  config, err := buildConfig(kubeconfig)
  if err != nil {
    klog.Fatal(err)
  }
  // 获取kubernetes集群的客户端,如果获取不到,就抛异常退出
  client := clientset.NewForConfigOrDie(config)
  // 模拟Controller的逻辑代码
  run := func(ctx context.Context) {
    // complete your controller loop here
    klog.Info("Controller loop...")
    // 不退出
    select {}
  }
  // use a Go context so we can tell the leaderelection code when we
  // want to step down
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  // listen for interrupts or the Linux SIGTERM signal and cancel
  // our context, which the leader election code will observe and
  // step down
  // 处理系统的系统,收到SIGTERM信号后,会退出进程
  ch := make(chan os.Signal, 1)
  signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
  go func() {
    <-ch
    klog.Info("Received termination, signaling shutdown")
    cancel()
  }()
  // we use the Lease lock type since edits to Leases are less common
  // and fewer objects in the cluster watch "all Leases".
  
  // 根据参数,生成锁。这里使用的Lease这种类型资源作为锁
  lock := &resourcelock.LeaseLock{
    LeaseMeta: metav1.ObjectMeta{
      Name:      leaseLockName,
      Namespace: leaseLockNamespace,
    },
    // 跟kubernetes集群关联起来
    Client: client.CoordinationV1(),
    LockConfig: resourcelock.ResourceLockConfig{
      Identity: id,
    },
  }
  // start the leader election code loop
  
  // 注意,选举逻辑启动时候,会传入ctx参数,如果ctx对应的cancel函数被调用,那么选举也会结束
  leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    // 选举使用的锁
    Lock: lock,
    // IMPORTANT: you MUST ensure that any code you have that
    // is protected by the lease must terminate **before**
    // you call cancel. Otherwise, you could have a background
    // loop still running and another process could
    // get elected before your background loop finished, violating
    // the stated goal of the lease.
    //主动放弃leader,当ctx canceled的时候
    ReleaseOnCancel: true,
    LeaseDuration:   60 * time.Second,  // 选举的任期,60s一个任期,如果在60s后没有renew,那么leader就会释放锁,重新选举
    RenewDeadline:   15 * time.Second,  // renew的请求的超时时间
    RetryPeriod:     5 * time.Second, // leader获取到锁后,renew leadership的间隔。非leader,抢锁成为leader的间隔(有1.2的jitter因子,详细看代码)
    
    // 回调函数的注册
    Callbacks: leaderelection.LeaderCallbacks{
      
      // 成为leader的回调
      OnStartedLeading: func(ctx context.Context) {
        // we're notified when we start - this is where you would
        // usually put your code
        // 运行controller的逻辑
        run(ctx)
      },
      OnStoppedLeading: func() {
        // we can do cleanup here
        // 退出leader的
        klog.Infof("leader lost: %s", id)
        os.Exit(0)
      },
      OnNewLeader: func(identity string) {
        // 有新的leader当选
        // we're notified when new leader elected
        if identity == id {
          // I just got the lock
          return
        }
        klog.Infof("new leader elected: %s", identity)
      },
    },
  })
}

启动一个实例,观察日志输出和kubernetes集群上的lease资源,启动命令

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1

可以看到,日志有输出,id=1的实例获取到资源了。

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1 I1023 17:00:21.670298 94227 leaderelection.go:248] attempting to acquire leader lease default/example... I1023 17:00:21.784234 94227 leaderelection.go:258] successfully acquired lease default/example I1023 17:00:21.784316 94227 main.go:78] Controller loop...

在kubernetes的集群上,看到

Go语言k8s kubernetes使用leader election实现选举

我们接着启动一个实例,id=2,日志中输出

go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2 I1023 17:05:00.555145 95658 leaderelection.go:248] attempting to acquire leader lease default/example... I1023 17:05:00.658202 95658 main.go:151] new leader elected: 1

可以看出,id=2的实例,没有获取到锁,并且观察到id=1的锁获取到了实例。接着我们尝试退出id=1的实例,观察id=2的实例是否会成为新的leader

Go语言k8s kubernetes使用leader election实现选举

三、锁的实现

kubernets的资源都可以实现Get/Create/Update的操作,因此,理论上所有的资源都可以作为锁的底层。kubernetes 提供了Lease/Configmap/Endpoint作为锁的底层。

锁的状态转移如下:

Go语言k8s kubernetes使用leader election实现选举

锁需要实现以下的接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Interface interface {
  // Get returns the LeaderElectionRecord
  Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
  // Create attempts to create a LeaderElectionRecord
  Create(ctx context.Context, ler LeaderElectionRecord) error
  // Update will update and existing LeaderElectionRecord
  Update(ctx context.Context, ler LeaderElectionRecord) error
  // RecordEvent is used to record events
  RecordEvent(string)
  // Identity will return the locks Identity
  Identity() string
  // Describe is used to convert details on current resource lock
  // into a string
  Describe() string
}

理论上,有Get/Create/Update三个方法,就可以实现锁的机制了。但是,需要保证update和create操作的原子性,这个就是kuberenetes的机制保证了。第二章的官网代码例子中,leaderelection.RunOrDie使用的RunOrDie接口,其实就是调用Run接口,而Run接口实现非常简单:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (le *LeaderElector) Run(ctx context.Context) {
  defer runtime.HandleCrash()
  defer func() {
    le.config.Callbacks.OnStoppedLeading()
  }()
  // 获取锁,如果没有获取到,就一直等待
  if !le.acquire(ctx) {
    return // ctx signalled done
  }
  ctx, cancel := context.WithCancel(ctx)
  defer cancel()
  // 获取到锁后,需要调用回调函数中的OnStartedLeading,运行controller的代码
  go le.config.Callbacks.OnStartedLeading(ctx)
  
  // 获取到锁后,需要不断地进行renew操作
  le.renew(ctx)
}

LeaderElector关键是需要acquire和renew的操作,acquire和renew操作代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (le *LeaderElector) acquire(ctx context.Context) bool {
  ctx, cancel := context.WithCancel(ctx)
  defer cancel()
  succeeded := false
  desc := le.config.Lock.Describe()
  klog.Infof("attempting to acquire leader lease %v...", desc)
  // 此接口会阻塞,利用定时的机制,获取锁,如果获取不到一直循环,除非ctx被取消。
  wait.JitterUntil(func() {
    // 获取锁
    succeeded = le.tryAcquireOrRenew(ctx)
    le.maybeReportTransition()
    if !succeeded {
      klog.V(4).Infof("failed to acquire lease %v", desc)
      return
    }
    le.config.Lock.RecordEvent("became leader")
    le.metrics.leaderOn(le.config.Name)
    klog.Infof("successfully acquired lease %v", desc)
    cancel()
  }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
  return succeeded
}
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
  ctx, cancel := context.WithCancel(ctx)
  defer cancel()
  // 循环renew机制,renew成功,不会返回true,导致Until会不断循环
  wait.Until(func() {
    //RenewDeadline的实现在这里,如果renew超过了RenewDeadline,会导致renew失败,主退出
    timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
    defer timeoutCancel()
    err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
      // renew锁
      return le.tryAcquireOrRenew(timeoutCtx), nil
    }, timeoutCtx.Done())
    le.maybeReportTransition()
    desc := le.config.Lock.Describe()
    if err == nil {
      klog.V(5).Infof("successfully renewed lease %v", desc)
      // renew成功
      return
    }
    le.config.Lock.RecordEvent("stopped leading")
    le.metrics.leaderOff(le.config.Name)
    klog.Infof("failed to renew lease %v: %v", desc, err)
    cancel()
  }, le.config.RetryPeriod, ctx.Done())
  // if we hold the lease, give it up
  if le.config.ReleaseOnCancel {
    le.release()
  }
}

关键的实现在于tryAcquireOrRenew,而tryAcquireOrRenew就是依赖锁的状态转移机制完成核心逻辑。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
  now := metav1.Now()
  leaderElectionRecord := rl.LeaderElectionRecord{
    HolderIdentity:       le.config.Lock.Identity(),
    LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
    RenewTime:            now,
    AcquireTime:          now,
  }
  // 1. obtain or create the ElectionRecord
  // 检查锁有没有
  oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
  if err != nil {
    // 没有锁的资源,就创建一个
    if !errors.IsNotFound(err) {
      klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
      return false
    }
    if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
      klog.Errorf("error initially creating leader election record: %v", err)
      return false
    }
    //对外宣称自己成为了leader
    le.setObservedRecord(&leaderElectionRecord)
    return true
  }
  // 2. Record obtained, check the Identity & Time
  if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
    // 这个机制很重要,会如果leader会不断正常renew这个锁,oldLeaderElectionRawRecord会一直发生变化,发生变化会更新le.observedTime
    le.setObservedRecord(oldLeaderElectionRecord)
    le.observedRawRecord = oldLeaderElectionRawRecord
  }
  // 如果还没超时并且此实例不是leader(leader是其他实例),那么就直接退出
  if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
    le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
    !le.IsLeader() {
    klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
    return false
  }
  // 3. We're going to try to update. The leaderElectionRecord is set to it's default
  // here. Let's correct it before updating.
  // 如果是leader,就更新时间RenewTime,保证其他实例(非主)可以观察到:主还活着
  if le.IsLeader() {
    leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
  } else {
  // 不是leader,那么锁就发生了转移
    leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
  }
  // 更新锁
  // update the lock itself
  if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
    klog.Errorf("Failed to update lock: %v", err)
    return false
  }
  le.setObservedRecord(&leaderElectionRecord)
  return true
}

以上就是Go语言 k8s kubernetes 使用leader election的详细内容,更多关于Go k8s leader election选举的资料请关注服务器之家其它相关文章!

原文链接:https://juejin.cn/post/7157648925078323207

延伸 · 阅读

精彩推荐
  • GolangGolang定时器的2种实现方法与区别

    Golang定时器的2种实现方法与区别

    这篇文章主要给大家介绍了关于Golang定时器的2种实现方法与区别的相关资料,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价...

    朋也8112021-03-30
  • Golanggo语言操作es的实现示例

    go语言操作es的实现示例

    本文主要介绍了go语言操作es的实现示例,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    ~庞贝12242022-09-26
  • Golanggolang监听文件变化的实例

    golang监听文件变化的实例

    这篇文章主要介绍了golang监听文件变化的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    雪东~19342021-04-20
  • Golanggolang类型转换组件Cast的使用详解

    golang类型转换组件Cast的使用详解

    这篇文章主要介绍了golang类型转换组件Cast的使用详解,帮助大家更好的理解和学习使用golang,感兴趣的朋友可以了解下...

    三十三重天8752021-03-30
  • Golang【Go】内存中的接口类型

    【Go】内存中的接口类型

    本文完整地、详细地、深入地剖析了Go语言接口的类型结构、对象结构、实现类型、方法调用、继承扩展等等的各个方面的底层原理。...

    Golang In Memory10132021-11-01
  • GolangGo语言错误处理异常捕获+异常抛出

    Go语言错误处理异常捕获+异常抛出

    这篇文章主要介绍了Go语言错误处理异常捕获和异常抛出,Go语言的作者认为java等语言的错误处理底层实现较为复杂,就实现了函数可以返回错误类型以及简...

    酷尔。11322022-09-05
  • Golanggo+redis实现消息队列发布与订阅的详细过程

    go+redis实现消息队列发布与订阅的详细过程

    这篇文章主要介绍了go+redis实现消息队列发布与订阅,redis做消息队列的缺点:没有持久化,一旦消息没有人消费,积累到一定程度后就会丢失,本文给大家...

    升级打怪8572022-11-20
  • Golanggolang中值类型/指针类型的变量区别总结

    golang中值类型/指针类型的变量区别总结

    golang的值类型和指针类型receiver一直是大家比较混淆的地方,下面这篇文章主要给大家总结介绍了关于golang中值类型/指针类型的变量区别的相关资料,文中...

    卢春风4482020-05-12