Syncing MySQL Incremental Data to ClickHouse
2018/02/26
posted in Tech
Installing Kafka only takes downloading, extracting, and starting it:
wget http://mirror.its.dal.ca/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -zxf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/
Kafka depends on ZooKeeper, and the ZooKeeper bundled with the Kafka package works fine:
bin/zookeeper-server-start.sh config/zookeeper.properties &
Before starting the broker, though, we may first need to edit the config file config/server.properties. The parameters that usually need changing are:
# the id of this kafka broker
broker.id=0
# the address kafka listens on
listeners=PLAINTEXT://:9092
# kafka's log directory
log.dirs=/tmp/kafka-logs
# address of the zookeeper instance kafka uses
zookeeper.connect=localhost:2181
Start the broker:
bin/kafka-server-start.sh config/server.properties &
Check whether Kafka is running:
$ jps
1240 QuorumPeerMain
1817 Jps
1518 Kafka
In the output above, QuorumPeerMain is ZooKeeper, and Kafka is the broker we just started.
We can also start two more brokers, but we first need to copy the config file and change the broker.id, listeners, and log.dirs values in each copy so that they don't conflict, as sketched below.
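For example, the copies could be prepared like this (the port numbers and log paths are illustrative choices):
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
# then edit, e.g. in server-1.properties:
#   broker.id=1
#   listeners=PLAINTEXT://:9093
#   log.dirs=/tmp/kafka-logs-1
# and in server-2.properties:
#   broker.id=2
#   listeners=PLAINTEXT://:9094
#   log.dirs=/tmp/kafka-logs-2
Then start the two new brokers: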
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
Then we can test a few simple operations. Create a topic named test with a single partition and a single replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List the topics:
bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
Check the partition and replica status of test:
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test
Push some messages with the console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Then open another window and consume the messages from the beginning:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
For more information about Maxwell, see the official website.
Download and extract Maxwell:
wget https://github.com/zendesk/maxwell/releases/download/v1.13.2/maxwell-1.13.2.tar.gz
tar -zxf maxwell-1.13.2.tar.gz
cd maxwell-1.13.2
Create a maxwell account in MySQL:
GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
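Note that Maxwell reads the binlog, so MySQL must have row-based binary logging enabled. If it is not already, the server config needs something along these lines (the server_id and log name here are illustrative; binlog_rows_query_log_events is included because we enable output_row_query below):
[mysqld]
server_id=1
log-bin=master
binlog_format=row
binlog_rows_query_log_events=on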
Copy the config file template config.properties.example:
cp config.properties.example config.properties
Then edit the config file; here we also send DDL operations to a separate topic:
# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=localhost:9092
# mysql login info
host=localhost
user=maxwell
password=maxwell
######### output format stuff ###############
# include the binlog position (off by default)
output_binlog_position=true
# include the gtid (off by default)
output_gtid_position=true
# include fields with null values (on by default)
output_nulls=true
# include the server_id (off by default)
output_server_id=true
# include the thread_id (off by default)
output_thread_id=true
# include the original SQL statement; requires the MySQL parameter
# "binlog_rows_query_log_events" to be enabled (off by default)
output_row_query=true
# include commit and xid info (on by default)
output_commit_info=true
# include ddl operations
output_ddl=true
######### kafka stuff ###############
# topic that parsed binlog events are written to
kafka_topic=maxwell
# topic that parsed ddl binlog events are written to; requires
# the output_ddl option above
ddl_kafka_topic=maxwell_ddl
Create the topics for Maxwell:
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell --partitions 20 --replication-factor 1
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell_ddl --partitions 1 --replication-factor 1
Start Maxwell with the config file:
bin/maxwell --config=config.properties &
Now we can modify some data in the database and watch the messages appear on the queues from the Kafka directory:
# watch the binlog changes in kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell --from-beginning
# watch the ddl binlog changes in kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell_ddl --from-beginning
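With the output_* options above enabled, an insert event on the maxwell topic looks roughly like this (all values are illustrative, and the exact field set depends on the options chosen):
{"database":"test","table":"users","type":"insert","ts":1519600000,"xid":8249,"commit":true,"position":"master.000004:1234","server_id":1,"thread_id":42,"query":"INSERT INTO users VALUES (1, 'foo')","data":{"id":1,"name":"foo"}}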
The official ClickHouse docs only give an apt-get installation method; there is no official yum method, so here I use a third-party repository:
# install the yum-config-manager tool
yum install yum-utils
# repository for centos 6
yum-config-manager --add-repo http://repo.red-soft.biz/repos/clickhouse/repo/clickhouse-el6.repo
# repository for centos 7
yum-config-manager --add-repo http://repo.red-soft.biz/repos/clickhouse/repo/clickhouse-el7.repo
# install the packages
yum install clickhouse-server clickhouse-client clickhouse-server-common clickhouse-compressor
Edit the config file with vim /etc/clickhouse-server/config.xml:
...
<!-- change the listen address -->
<listen_host>::</listen_host>
<listen_host>0.0.0.0</listen_host>
...
<!-- change the data paths -->
<path>/data/clickhouse/</path>
<tmp_path>/data/clickhouse/tmp/</tmp_path>
...
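If the data path is changed as above, the directories must exist and be writable by the clickhouse user before the server starts (the user and group names assume the default package setup):
mkdir -p /data/clickhouse/tmp
chown -R clickhouse:clickhouse /data/clickhouse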
Start the service:
service clickhouse-server start
Enter the ClickHouse client (-m allows multiline queries, -n allows running multiple semicolon-separated queries):
clickhouse-client -mn
Create the table for the row records (the data_record database must exist first; CREATE DATABASE IF NOT EXISTS data_record creates it):
CREATE TABLE data_record.row_record (
r_date Date,
database String,
table String,
type String,
ts Int64,
xid Int64,
position String,
gtid Nullable(String),
server_id Int64,
thread_id Int64,
data String,
old String) ENGINE = MergeTree(r_date, (r_date, database, table), 8192)
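Once Maxwell data starts flowing in, the table can be queried like any other; for example, a quick sanity check that counts captured events per table and type:
SELECT database, table, type, count() AS events
FROM data_record.row_record
GROUP BY database, table, type
ORDER BY events DESC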
The last step is a consumer that reads the Maxwell messages from Kafka and inserts them into ClickHouse. Below is a simple example:
# -*- coding: utf-8 -*-
from confluent_kafka import Consumer, KafkaError
import clickhouse_driver
import logging
import json
import datetime


class ChWriter(object):
    def __init__(self, setting):
        self.conn = clickhouse_driver.Client(**setting)

    def ch_insert(self, sql, dicts):
        """Insert rows into ClickHouse."""
        self.conn.execute(sql, dicts)


def prefunc(data):
    """Massage a Maxwell message into the row_record schema."""
    data['r_date'] = datetime.datetime.fromtimestamp(data['ts']).date()
    print('timestamp:', data['ts'], '\n', 'date:', data['r_date'])
    # fill in optional fields that may be missing from the message
    if 'gtid' not in data:
        data['gtid'] = None
    data['data'] = str(data['data'])
    if 'position' not in data:
        data['position'] = ''
    if 'server_id' not in data:
        data['server_id'] = 0
    if 'thread_id' not in data:
        data['thread_id'] = 0
    # only update events carry the old values
    if data['type'] == 'update':
        data['old'] = str(data['old'])
    else:
        data['old'] = ''
    return data


if __name__ == "__main__":
    # logging.basicConfig(filename='example.log', level=logging.DEBUG)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch_setting = {
        'host': '10.10.4.103',
        'port': 9000,
        'user': 'default',
        'password': '',
    }
    chw = ChWriter(ch_setting)
    sql = ("INSERT INTO `data_record`.`row_record` (r_date, database, table, type, ts, xid, "
           "position, gtid, server_id, thread_id, data, old) VALUES")
    c = Consumer({'bootstrap.servers': '10.10.4.103:9093', 'group.id': '0',
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    c.subscribe(['maxwell'])
    running = True
    while running:
        msg = c.poll()
        if not msg.error():
            r_data = msg.value().decode('utf-8')
            print('Received message: %s' % r_data)
            data = json.loads(r_data)
            prefunc(data)
            chw.ch_insert(sql, [data])
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            running = False
    c.close()
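Inserting one row per message is fine for a demo, but ClickHouse much prefers batched inserts. Below is a minimal sketch of a buffered version of the consume loop above (the batch size of 1000 and the 1-second poll timeout are arbitrary illustrative choices):
# drop-in replacement for the consume loop: buffer rows and flush in batches
batch = []
while running:
    msg = c.poll(1.0)  # returns None when no message arrives within the timeout
    if msg is None:
        continue
    if not msg.error():
        batch.append(prefunc(json.loads(msg.value().decode('utf-8'))))
        if len(batch) >= 1000:
            chw.ch_insert(sql, batch)
            batch = []
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
# flush any remaining rows before exiting
if batch:
    chw.ch_insert(sql, batch)
c.close()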