脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|shell|

服务器之家 - 脚本之家 - Python - python结合shell自动创建kafka的连接器实战教程

python结合shell自动创建kafka的连接器实战教程

2022-12-12 10:58时空无限 Python

这篇文章主要介绍了python结合shell自动创建kafka的连接器,需要安装连接oracle的python包,获取oracle表信息,本文给大家介绍的非常详细,需要的朋友可以参考下

环境

?
1
2
3
4
5
6
cat /etc/redhat-release
CentOS Linux release 7.5.1804 (Core)
[root@localhost ~]# uname -a
Linux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
python -V
Python 2.7.5

安装连接oracle的python包

?
1
pip install cx_Oracle==7.3

获取oracle表信息

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
cat query_oracle.py
#!/usr/bin/env python
import cx_Oracle
import sys
import os
import csv
import traceback
file = open("oracle.txt", 'w').close()
user = "test"
passwd = "test"
listener = '10.0.2.15:1521/orcl'
conn = cx_Oracle.connect(user, passwd, listener)
cursor = conn.cursor()
sql = "select table_name from user_tables"
 
cursor.execute(sql)
LIST1=[]
while True:
    row = cursor.fetchone()
    if row == None:
        break
    for table in row:
        #print table
        LIST1.append(table)
LIST2=[]
for i in LIST1:
    sql3 = "select COLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALE from cols WHERE TABLE_name=upper('%s')" %i
    cursor.execute(sql3)
    cursor.execute(sql3)
    row3 = cursor.fetchall()
    for data in row3:
        #LIST2.append(i)
        LIST2.extend(list(data))
    LIST2.append(i)
    f=open('oracle.txt','a+')
    print >> f,LIST2
    LIST2=[]
#f=open('test.txt','a+')
#select table_name,column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');
#select column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');

去掉多余部分

?
1
2
3
4
5
6
cat auto.sh
#!/bin/bash
#python query_oracle.py |tr "," ' '|tr "'" ' '|tr "[" " "|tr "]" " "
#>oracle.txt
>oracle_tables.txt
cat oracle.txt |tr "[],'" " "|sed "s#[ ][ ]*# #g"|sed 's/^[ \t]*//g' >> oracle_tables.txt
?
1
2
3
4
cat oracle_tables.txt
SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE
SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIME
SNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE

shell 脚本处理表信息文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
cat connect.sh
#!/bin/bash
#获取临时文件的行数
FILE_NUM=$(cat oracle_tables.txt |egrep -v '#|^$'|wc -l)
#清空自动创建连接器的脚本
>create-connect.sh
#循环临时文件每一行
for i in `seq $FILE_NUM`
do
    FILE_LINE=$(sed -n ${i}p oracle_tables.txt)
    TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF-1)}')
    COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')
    REAL_COL_NUM=`expr $COL_NUM - 2`
    #清空临时文件
    >${TABLE_NAME}.txt
    >${TABLE_NAME}.sql
    #循环临时文件每行列名所在的列
    for j in `seq 1 4 $REAL_COL_NUM`
    do
        k=`expr $j + 1`
        m=`expr $j + 2`
        n=`expr $j + 3`
        COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})
        COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})
        COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})
        COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})
        #判断列的数据类型是否是NUMBER
        if [ "$COL_DATA_TYPE" = "NUMBER" ]
        then
        #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中
            echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ${TABLE_NAME}.txt
        else
        #循环拼接SQL查询中的列名部分,追加到临时文件中
            echo "$COL_NAME" >> ${TABLE_NAME}.txt
        fi
    done
    #拼接完整的SQL语句,追加到临时文件中
    echo "select $(cat ${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from $TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ${TABLE_NAME}.sql
#循环追加每个表对应的连接器到自动创建连接器的脚本中
cat >> create-connect.sh << EOF
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_$TABLE_NAME",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_$TABLE_NAME",
"mode": "{{ CONNECT_MODE }}",
"query": "$(cat ${TABLE_NAME}.sql)"
}
}' >/dev/null 2>&1
EOF
done

说明:脚本中{{ 变量名 }}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。

增强版处理表信息脚本

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/bin/bash
#获取临时文件的行数
FILE_NUM=$(cat oracle_time_tables.txt |egrep -v '#|^$'|wc -l)
#清空创建连接器的脚本并追加echos函数
> create-jdbc-connect.sh
cat >> create-jdbc-connect.sh << EOF
#!/bin/bash
echos(){
case \$1 in
red)    echo -e "\033[31m \$2 \033[0m";;
green)  echo -e "\033[32m \$2 \033[0m";;
yellow) echo -e "\033[33m \$2 \033[0m";;
blue)   echo -e "\033[34m \$2 \033[0m";;
purple) echo -e "\033[35m \$2 \033[0m";;
*)      echo "\$2";;
esac
}
EOF
> create-jdbc-connect-time.sh
cat >> create-jdbc-connect-time.sh << EOF
#!/bin/bash
echos(){
case \$1 in
red)    echo -e "\033[31m \$2 \033[0m";;
green)  echo -e "\033[32m \$2 \033[0m";;
yellow) echo -e "\033[33m \$2 \033[0m";;
blue)   echo -e "\033[34m \$2 \033[0m";;
purple) echo -e "\033[35m \$2 \033[0m";;
*)      echo "\$2";;
esac
}
EOF
#创建表相关文件目录
mkdir -p ./TABLE_TIME
#循环临时文件每一行
for i in `seq $FILE_NUM`
do
    FILE_LINE=$(sed -n ${i}p oracle_time_tables.txt)
    TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF)}')
    COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')
    REAL_COL_NUM=`expr $COL_NUM - 2`
    #清空临时文件
    >./TABLE_TIME/${TABLE_NAME}_time.txt
    >./TABLE_TIME/${TABLE_NAME}_time.sql
    >./TABLE_TIME/${TABLE_NAME}.sql
    #循环临时文件每行列名所在的列
    for j in `seq 1 4 $REAL_COL_NUM`
    do
        k=`expr $j + 1`
        m=`expr $j + 2`
        n=`expr $j + 3`
        COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})
        COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})
        COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})
        COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})
        #判断列的数据类型是否是NUMBER
        if [ "$COL_DATA_TYPE" = "NUMBER" ]
        then
        #循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中
            echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt
        else
        #循环拼接SQL查询中的列名部分,追加到临时文件中
            echo "$COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt
        fi
        #判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中
        TIME_COL=({{ TABLE_TIME_COL }})
        for TIME in ${TIME_COL[@]}
        do
            if [ "$COL_NAME" = "$TIME" ]
            then
                echo "$COL_NAME" > ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt
            fi
        done
    done
    #拼接完整的SQL语句,追加到临时文件中
    if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]
    then
    #echo "select $(cat ./TABLE_TIME/${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql
        echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)>=trunc(sysdate-2) and $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql
    else
        echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME" >> ./TABLE_TIME/${TABLE_NAME}.sql
    fi
#循环追加每个表对应的连接器到自动创建连接器的脚本中
if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]
then
cat >> create-jdbc-connect-time.sh << EOF
#创建表 $TABLE_NAME 连接器的命令如下
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_time_$TABLE_NAME",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_${TABLE_NAME}_INSERT",
"poll.interval.ms": "86400000",
"mode": "{{ CONNECT_MODE }}",
"numeric.mapping": "best_fit",
"query": "$(cat ./TABLE_TIME/${TABLE_NAME}_time.sql)"
}
}' >/dev/null 2>&1
#判断连接器是否创建成功
if [ \$? -eq 0 ]
then
    echos green "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器成功"
else
    echos red "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器失败"
fi
EOF
else
cat >> create-jdbc-connect.sh << EOF
#创建表 $TABLE_NAME 连接器的命令如下
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_$TABLE_NAME",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_${TABLE_NAME}_INSERT",
"poll.interval.ms": "86400000",
"mode": "{{ CONNECT_MODE }}",
"numeric.mapping": "best_fit",
"query": "$(cat ./TABLE_TIME/${TABLE_NAME}.sql)"
}
}' >/dev/null 2>&1
#判断连接器是否创建成功
if [ \$? -eq 0 ]
then
    echos green "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器成功"
else
    echos red "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器失败"
fi
EOF
fi
done

到此这篇关于python结合shell自动创建kafka的连接器的文章就介绍到这了,更多相关python创建kafka连接器内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/weixin_40548182/article/details/118422058

延伸 · 阅读

精彩推荐
  • Pythonpython判断一个变量是否已经设置的方法

    python判断一个变量是否已经设置的方法

    这篇文章主要介绍了python判断一个变量是否已经设置的方法,有需要的朋友们可以跟着学习参考下。...

    laozhang4052020-08-13
  • Pythontensorflow实现简单逻辑回归

    tensorflow实现简单逻辑回归

    逻辑回归是一种分类器模型,这篇文章主要介绍了tensorflow实现简单逻辑回归,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    WSS56787222021-04-01
  • PythonPython 使用os.remove删除文件夹时报错的解决方法

    Python 使用os.remove删除文件夹时报错的解决方法

    下面小编就为大家带来一篇Python 使用os.remove删除文件夹时报错的解决方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来...

    脚本之家13212020-09-17
  • PythonPython模块pexpect安装及使用流程

    Python模块pexpect安装及使用流程

    Pexpect使Python成为控制其他应用程序的更好工具,这篇文章主要介绍了Python模块之pexpect的安装及使用流程,需要的朋友可以参考下...

    二进制杯莫停4352022-10-14
  • PythonDjango开发中的日志输出的方法

    Django开发中的日志输出的方法

    这篇文章主要介绍了Django开发中的日志输出的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    10,相濡以沫6192021-03-11
  • Pythonpython3实现基于用户的协同过滤

    python3实现基于用户的协同过滤

    这篇文章主要为大家详细介绍了python3实现基于用户的协同过滤,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    qqzhuimengren7412021-02-27
  • PythonPython基础之数据类型详解

    Python基础之数据类型详解

    python的数值类型包括整数,浮点数,复数,集合,小数和分数,布尔值.它们都是python中的数值类型.如果是有过其他语言编写经验的人,一定很好奇,浮点数和小数的...

    zy0101018102021-11-21
  • Pythonpython实现取余操作的简单实例

    python实现取余操作的简单实例

    在本篇文章里小编给各位分享的是一篇关于python实现取余操作的简单实例内容,需要的朋友们可以参考下。...

    FXL12042020-08-17