脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Golang - 使用client go实现自定义控制器的方法

使用client go实现自定义控制器的方法

2022-10-07 15:20油腻中年李大鹅 Golang

本文我们来使用client-go实现一个自定义控制器,通过判断service的Annotations属性是否包含ingress/http,如果包含则创建ingress,如果不包含则不创建,对client go自定义控制器相关知识感兴趣的朋友一起看看吧

介绍

我们已经知道,Service对集群之外暴露服务的主要方式有两种:NodePort和LoadBalancer,但是这两种方式,都有一定的缺点:

  • NodePort方式的缺点是会占用很多集群机器的端口,那么当集群服务变多的时候,这个缺点就愈发明显。
  • LoadBalancer的缺点是每个Service都需要一个LB,浪费,麻烦,并且需要Kubernetes之外的设备的支持。

基于这种现状,Kubernetes提供了Ingress资源对象,Ingress只需要一个NodePort或者一个LB就可以满足暴露多个Service的需求。

客户端首先对 域名 执行 DNS 解析,得到 Ingress Controller 所在节点的 IP,然后客户端向 Ingress Controller 发送 HTTP 请求,然后根据 Ingress 对象里面的描述匹配域名,找到对应的 Service 对象,并获取关联的 Endpoints 列表,将客户端的请求转发给其中一个 Pod。

使用client go实现自定义控制器的方法

本文我们来使用client-go实现一个自定义控制器,通过判断serviceAnnotations属性是否包含ingress/http,如果包含则创建ingress,如果不包含则不创建。而且如果存在ingress则进行删除。

具体实现

首先我们创建项目。

?
1
2
3
4
5
6
7
8
9
10
11
$ mkdir ingress-manager && cd ingress-manager
$ go mod init ingress-manager
# 由于控制器部分的内容比较多,将它们单独放到pkg目录下
$ mkdir pkg
# 最终项目目录结构如下
.
├── go.mod
├── go.sum
├── main.go
└── pkg
    └── controller.go

接着我们来实现controller部分:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package pkg
import (
    "context"
    apiCoreV1 "k8s.io/api/core/v1"
    netV1 "k8s.io/api/networking/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    informersCoreV1 "k8s.io/client-go/informers/core/v1"
    informersNetV1 "k8s.io/client-go/informers/networking/v1"
    "k8s.io/client-go/kubernetes"
    coreV1 "k8s.io/client-go/listers/core/v1"
    v1 "k8s.io/client-go/listers/networking/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "reflect"
    "time"
)
const (
    workNum  = 5  // 工作的节点数
    maxRetry = 10 // 最大重试次数
)
// 定义控制器
type Controller struct {
    client        kubernetes.Interface
    ingressLister v1.IngressLister
    serviceLister coreV1.ServiceLister
    queue         workqueue.RateLimitingInterface
}
// 初始化控制器
func NewController(client kubernetes.Interface, serviceInformer informersCoreV1.ServiceInformer, ingressInformer informersNetV1.IngressInformer) Controller {
    c := Controller{
        client:        client,
        ingressLister: ingressInformer.Lister(),
        serviceLister: serviceInformer.Lister(),
        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
    }
    // 添加事件处理函数
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.addService,
        UpdateFunc: c.updateService,
    })
    ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: c.deleteIngress,
    })
    return c
}
// 入队
func (c *Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        runtime.HandleError(err)
    }
    c.queue.Add(key)
}
func (c *Controller) addService(obj interface{}) {
    c.enqueue(obj)
}
func (c *Controller) updateService(oldObj, newObj interface{}) {
    // todo 比较annotation
    // 这里只是比较了对象是否相同,如果相同,直接返回
    if reflect.DeepEqual(oldObj, newObj) {
        return
    }
    c.enqueue(newObj)
}
func (c *Controller) deleteIngress(obj interface{}) {
    ingress := obj.(*netV1.Ingress)
    ownerReference := metaV1.GetControllerOf(ingress)
    if ownerReference == nil {
        return
    }
    // 判断是否为真的service
    if ownerReference.Kind != "Service" {
        return
    }
    c.queue.Add(ingress.Namespace + "/" + ingress.Name)
}
// 启动控制器,可以看到开了五个协程,真正干活的是worker
func (c *Controller) Run(stopCh chan struct{}) {
    for i := 0; i < workNum; i++ {
        go wait.Until(c.worker, time.Minute, stopCh)
    }
    <-stopCh
}
func (c *Controller) worker() {
    for c.processNextItem() {
    }
}
// 业务真正处理的地方
func (c *Controller) processNextItem() bool {
    // 获取key
    item, shutdown := c.queue.Get()
    if shutdown {
        return false
    }
    defer c.queue.Done(item)
  // 调用业务逻辑
    err := c.syncService(item.(string))
    if err != nil {
    // 对错误进行处理
        c.handlerError(item.(string), err)
        return false
    }
    return true
}
 
func (c *Controller) syncService(item string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(item)
    if err != nil {
        return err
    }
    // 获取service
    service, err := c.serviceLister.Services(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            return nil
        }
        return err
    }
    // 新增和删除
    _, ok := service.GetAnnotations()["ingress/http"]
    ingress, err := c.ingressLister.Ingresses(namespace).Get(name)
    if err != nil && !errors.IsNotFound(err) {
        return err
    }
    if ok && errors.IsNotFound(err) {
        // 创建ingress
        ig := c.constructIngress(service)
        _, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metaV1.CreateOptions{})
        if err != nil {
            return err
        }
    } else if !ok && ingress != nil {
        // 删除ingress
        err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metaV1.DeleteOptions{})
        if err != nil {
            return err
        }
    }
    return nil
}
func (c *Controller) handlerError(key string, err error) {
    // 如果出现错误,重新加入队列,最大处理10次
    if c.queue.NumRequeues(key) <= maxRetry {
        c.queue.AddRateLimited(key)
        return
    }
    runtime.HandleError(err)
    c.queue.Forget(key)
}
func (c *Controller) constructIngress(service *apiCoreV1.Service) *netV1.Ingress {
    // 构造ingress
    pathType := netV1.PathTypePrefix
    ingress := netV1.Ingress{}
    ingress.ObjectMeta.OwnerReferences = []metaV1.OwnerReference{
        *metaV1.NewControllerRef(service, apiCoreV1.SchemeGroupVersion.WithKind("Service")),
    }
    ingress.Namespace = service.Namespace
    ingress.Name = service.Name
    ingress.Spec = netV1.IngressSpec{
        Rules: []netV1.IngressRule{
            {
                Host: "example.com",
                IngressRuleValue: netV1.IngressRuleValue{
                    HTTP: &netV1.HTTPIngressRuleValue{
                        Paths: []netV1.HTTPIngressPath{
                            {
                                Path:     "/",
                                PathType: &pathType,
                                Backend: netV1.IngressBackend{
                                    Service: &netV1.IngressServiceBackend{
                                        Name: service.Name,
                                        Port: netV1.ServiceBackendPort{
                                            Number: 80,
                                        },
                                    },
                                },
                            },
                        },
                    },
                },
            },
        },
    }
    return &ingress
}

接下来我们来实现main:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main
import (
    "ingress-manager/pkg"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
)
func main() {
    // 获取config
    // 先尝试从集群外部获取,获取不到则从集群内部获取
    var config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        clusterConfig, err := rest.InClusterConfig()
        if err != nil {
            panic(err)
        }
        config = clusterConfig
    }
    // 通过config创建 clientSet
    clientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }
    // 通过 client 创建 informer,添加事件处理函数
    factory := informers.NewSharedInformerFactory(clientSet, 0)
    serviceInformer := factory.Core().V1().Services()
    ingressInformer := factory.Networking().V1().Ingresses()
    newController := pkg.NewController(clientSet, serviceInformer, ingressInformer)
    // 启动 informer
    stopCh := make(chan struct{})
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)
    newController.Run(stopCh)
}

测试

首先创建deploy和service:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-nginx
spec:
  selector:
    matchLabels:
      app: my-nginx
  template:
    metadata:
      labels:
        app: my-nginx
    spec:
      containers:
        - name: my-nginx
          image: nginx:1.17.1
          ports:
            - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: my-nginx
  labels:
    app: my-nginx
spec:
  ports:
    - port: 80
      protocol: TCP
      name: http
  selector:
    app: my-nginx

创建完成后进行查看:

?
1
2
3
4
5
6
$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-nginx          1/1     1            1           7m
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    7m

上面的命令我分别获取deploy,service,ingress,但是只获取到了deployservice,这符合我们的预期。接着我们给service/m-nginx中的annotations添加ingress/http: nginx

?
1
2
3
4
5
6
7
8
9
10
$ kubectl edit service/my-nginx
apiVersion: v1
kind: Service
metadata:
  annotations:
    ingress/http: nginx
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"my-nginx"},"name":"my-nginx","namespace":"default"},"spec":{"ports":[{"name":"http","port":80,"protocol":"TCP"}],"selector":{"app":"my-nginx"}}}
      ......
service/my-nginx edited

重新进行查看:

?
1
2
3
4
5
6
7
8
9
$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/demo-deployment   1/1     1            1           41d
deployment.apps/my-nginx          1/1     1            1           11m
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    11m
NAME                                 CLASS    HOSTS         ADDRESS   PORTS   AGE
ingress.networking.k8s.io/my-nginx   <none>   example.com             80      19s

接着我们再来测试下,将ingress/http: nginx注释掉,看看ingress是否会自动删除:

?
1
2
3
4
5
6
7
$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/demo-deployment   1/1     1            1           41d
deployment.apps/my-nginx          1/1     1            1           19m
NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    19m

我们发现和我们预期的效果一样。

如果service被删除了,ingress肯定也是不会存在的。这个这里就不多演示了。有兴趣可以自行测试下。

到此这篇关于使用client-go实现自定义控制器的文章就介绍到这了,更多相关client-go自定义控制器内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://juejin.cn/post/7096297114812514318

延伸 · 阅读

精彩推荐
  • GolangGolang中channel使用的一些小技巧

    Golang中channel使用的一些小技巧

    这篇文章主要介绍了Golang中channel使用的一些小技巧,本文讲解了关闭2次、读取的时候channel提前关闭了、向已经关闭的channel写数据等技巧及这实例代码,需要...

    脚本之家5712020-04-26
  • GolangGolang语言如何高效拼接字符串详解

    Golang语言如何高效拼接字符串详解

    最近在做性能优化,有个函数里面的耗时特别长,看里面的操作大多是一些字符串拼接的操作,而字符串拼接在 golang 里面其实有很多种实现,下面这篇文章主要...

    frank8812021-11-26
  • Golang利用go-kit组件进行服务注册与发现和健康检查的操作

    利用go-kit组件进行服务注册与发现和健康检查的操作

    这篇文章主要介绍了利用go-kit组件进行服务注册与发现和健康检查的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    鹿灏楷silves9152021-06-02
  • GolangGO语言(golang)基础知识

    GO语言(golang)基础知识

    这篇文章主要介绍了GO语言(golang)基础知识,需要的朋友可以参考下 ...

    hebedich4282020-04-12
  • Golang利用Golang如何调用Linux命令详解

    利用Golang如何调用Linux命令详解

    这篇文章主要给大家介绍了Golang中使用os/exec来执行 Linux 命令的相关资料,文中给出了详细的示例代码,对大家具有一定的参考学习价值,需要的朋友们下...

    田飞雨的网站12532020-05-06
  • Golanggolang实现通过smtp发送电子邮件的方法

    golang实现通过smtp发送电子邮件的方法

    这篇文章主要介绍了golang实现通过smtp发送电子邮件的方法,实例分析了Go语言基于SMTP协议发送邮件的相关技巧,需要的朋友可以参考下 ...

    dotcoo7542020-04-30
  • Golanggo代码实现买房贷款月供计算的方法

    go代码实现买房贷款月供计算的方法

    今天小编就为大家分享一篇关于go代码实现买房贷款月供计算的方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟...

    stpeace2442020-05-25
  • Golanggolang连接redis库及基本操作示例过程

    golang连接redis库及基本操作示例过程

    这篇文章主要介绍了golang连接redis库及基本操作示例过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪...

    Jeff的技术栈10232022-09-15