同步MySQL增量数据到ClickHouse

2018/02/26 posted in  Tech
Tags: 

安装

kafka

kafka的安装只需要下载、解压、启动即可

wget http://mirror.its.dal.ca/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -zxf kafka_2.11-1.0.0.tgz 
cd kafka_2.11-1.0.0/

kafka需要依赖于zkzk可以直接使用kafka安装包里自带的zk

bin/zookeeper-server-start.sh config/zookeeper.properties &

但是在启动之前,我们可能需要先修改一下配置文件config/server.properties
一般需要修改的几个参数有

# kafka broker的id
broker.id=0
# kafka监听的地址
listeners=PLAINTEXT://:9092
# kafka的日志地址
log.dirs=/tmp/kafka-logs
# kafka使用的zk的地址
zookeeper.connect=localhost:2181

启动broker

bin/kafka-server-start.sh config/server.properties &

查看kafka是否启动了

$ jps
1240 QuorumPeerMain
1817 Jps
1518 Kafka

上面的QuorumPeerMain就是zkKafka就是我们刚才启动的broker
我们还可以在启动两个新broker,但是需要先复制配置文件并修改里面的broker.idlistenerslog.dirs值,使之不冲突

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

然后我们可以测试一些简单的操作
创建名为testtopic,只有一个分区,一个副本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --partitons 1 --topic test

查看topic

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

查看test的分区和副本状态

bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test

使用生产者推送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

然后打开另一个窗口,使用消费者从开始获取消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Maxwell

关于Maxwell的相关资料可以直接查看官网
下载并解压maxwell

wget https://github.com/zendesk/maxwell/releases/download/v1.13.2/maxwell-1.13.2.tar.gz
tar -zxf maxwell-1.13.2.tar.gz
cd maxwell-1.13.2

mysql中创建一个maxwell账户

GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';

拷贝一份配置文件的模板config.properties.example

cp config.properties.example config.properties

然后编辑配置文件,并将ddl操作发送给单独的topic

# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=localhost:9092

# mysql login info
host=localhost
user=maxwell
password=maxwell

######### output format stuff ###############
# 记录binlog position(默认关闭)
output_binlog_position=true
# 记录gtid(默认关闭)
output_gtid_position=true
# 记录空值字段(默认开启)
output_nulls=true
# 记录server_id(默认关闭)
output_server_id=true
# 记录thread_id(默认关闭)
output_thread_id=true
# 记录原始的SQL语句,需要在mysql中打开参数"binlog_rows_query_log_events" must be enabled"(默认关闭)
output_row_query=true
# 记录commit和xid信息(默认开启)
output_commit_info=true
# 记录ddl操作
output_ddl=true

######### kafka stuff ###############
# binlog日志解析到的topic
kafka_topic=maxwell
# ddl操作的binlog日志解析到的topic,需要开启前面的output_ddl选项
ddl_kafka_topic=maxwell_ddl

maxwell创建topic

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell --partitions 20 --replication-factor 1
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell_ddl --partitions 1 --replication-factor 1

使用配置文件启动maxwell

bin/maxwell --config=config.properties &

现在我们可以修改数据库的数据,然后在kafka目录下观察队列中的数据

# 在kafka中查看binlog的变化
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell --from-beginning
# 在kafka中查看ddl的binlog的变化
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell_ddl --from-beginning

clickhouse

官方文档直接提供了使用apt-get安装的方法
但是官方并没有提供yum的安装方法,我这里使用了第三方提供的仓库,

# 安装yum-config-manager程序
yum install yum-utils
# centos 6 的仓库
yum-config-manager --add-repo http://repo.red-soft.biz/repos/clickhouse/repo/clickhouse-el6.repo
# centos 7 的仓库
yum-config-manager --add-repo http://repo.red-soft.biz/repos/clickhouse/repo/clickhouse-el7.repo
# 安装软件包
yum install clickhouse-server clickhouse-client clickhouse-server-common clickhouse-compressor

修改配置文件vim /etc/clickhouse-server/config.xml

...
<!-- 修改监听IP -->
<listen_host>::</listen_host>
   <listen_host>0.0.0.0</listen_host>

...

   <!-- 修改数据路径 -->
   <path>/data/clickhouse/</path>
   <tmp_path>/data/clickhouse/tmp/</tmp_path>
...

启动服务

service clickhouse-server start

进入clickhouse

clickhouse-client -mn

创建

CREATE TABLE data_record.row_record ( 
    r_date Date,  
    database String,  
    table String,  
    type String,  
    ts Int64,  
    xid Int64,  
    position String,  
    gtid Nullable(String),  
    server_id Int64,  
    thread_id Int64,  
    data String,  
    old String) ENGINE = MergeTree(r_date, (r_date, database, table), 8192)

同步数据

使用Python从kafka导入数据到clickhouse

下面是一个简单的例子

# -*- coding: utf-8 -*-
from confluent_kafka import Consumer, KafkaError
import clickhouse_driver
import logging
import json
import datetime


class ChWriter(object):
    def __init__(self, setting):
        self.conn = clickhouse_driver.Client(**setting)

    def ch_insert(self, sql, dicts):
        """插入数据"""
        self.conn.execute(sql, dicts)


def prefunc(data):
    data['r_date'] = datetime.datetime.fromtimestamp(data['ts']).date()
    print('timestamp:', data['ts'], '\n', 'date:', data['r_date'])
    data['gtid'] = None
    data['data'] = str(data['data'])
    if 'position' not in data: data['position'] = ''
    if 'server_id' not in data: data['server_id'] = 0
    if 'thread_id' not in data: data['thread_id'] = 0
    if data['type'] == 'update':
        data['old'] = str(data['old'])
    else:
        data['old'] = ''
    return data


if __name__ == "__main__":
    # logging.basicConfig(filename='example.log', level=logging.DEBUG)
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    ch_setting = {
        'host': '10.10.4.103',
        'port': 9000,
        'user': 'default',
        'password': '',
    }

    chw = ChWriter(ch_setting)
    sql = "INSERT INTO `data_record`.`row_record`(r_date,database,table,type,ts,xid,position,gtid,server_id,thread_id,data,old) VALUES"

    c = Consumer({'bootstrap.servers': '10.10.4.103:9093', 'group.id': '0',
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    c.subscribe(['maxwell'])
    running = True
    while running:
        msg = c.poll()
        if not msg.error():
            r_data = msg.value().decode('utf-8')
            print('Received message: %s' % r_data)
            data = json.loads(r_data)
            prefunc(data)
            chw.ch_insert(sql, [data])
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            running = False
    c.close()