Installation

Kafka

Installing Kafka just requires downloading, extracting, and starting it:

wget http://mirror.its.dal.ca/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -zxf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/

Kafka depends on ZooKeeper; you can simply use the ZooKeeper bundled with the Kafka package:

bin/zookeeper-server-start.sh config/zookeeper.properties &

Before starting it, though, we may need to modify the configuration file config/server.properties.

The parameters that usually need changing are:

# broker id of this Kafka instance
broker.id=0
# address Kafka listens on
listeners=PLAINTEXT://:9092
# Kafka log (data) directory
log.dirs=/tmp/kafka-logs
# address of the ZooKeeper used by Kafka
zookeeper.connect=localhost:2181

Start the broker:

bin/kafka-server-start.sh config/server.properties &

Check whether Kafka is running:

$ jps
1240 QuorumPeerMain
1817 Jps
1518 Kafka

In the output above, QuorumPeerMain is ZooKeeper and Kafka is the broker we just started.

We can also start two more brokers, but we first need to copy the configuration file and change the broker.id, listeners, and log.dirs values inside it so that they do not conflict.
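For example, following the Kafka quickstart, the copies could look like this (the broker IDs, ports, and log directories below are only illustrative; any non-conflicting values will do):

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

# config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1

# config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2

Then start them: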

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

Now we can try a few simple operations.

Create a topic named test with a single partition and a single replica:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List the topics:

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

Check the partition and replica status of test:

bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test
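The output should look roughly like the following (the Leader, Replicas, and Isr columns show which broker IDs hold the partition):

Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0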

Push messages with the console producer:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Then open another window and consume the messages from the beginning with the console consumer:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Maxwell

For details about Maxwell, see the official website.

Download and extract Maxwell:

wget https://github.com/zendesk/maxwell/releases/download/v1.13.2/maxwell-1.13.2.tar.gz
tar -zxf maxwell-1.13.2.tar.gz
cd maxwell-1.13.2

Create a maxwell account in MySQL:

GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'XXXXXX';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
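Maxwell reads the MySQL binlog, so the MySQL server must have row-based binary logging enabled before any of this works. A minimal my.cnf sketch (server_id and the log file name are illustrative; binlog_rows_query_log_events is only needed because the output_row_query option is enabled in the config below):

[mysqld]
# any server_id that is unique in your replication topology
server_id=1
log-bin=master
binlog_format=row
# required for Maxwell's output_row_query option
binlog_rows_query_log_events=on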

Copy the configuration file template config.properties.example:

cp config.properties.example config.properties

Then edit the configuration file; DDL operations will be sent to a separate topic.

# tl;dr config
log_level=info
producer=kafka
kafka.bootstrap.servers=localhost:9092

# mysql login info
host=localhost
user=maxwell
password=maxwell

######### output format stuff ###############
# log the binlog position (default: off)
output_binlog_position=true
# log the GTID (default: off)
output_gtid_position=true
# log fields whose value is null (default: on)
output_nulls=true
# log the server_id (default: off)
output_server_id=true
# log the thread_id (default: off)
output_thread_id=true
# log the original SQL statement; requires binlog_rows_query_log_events to be enabled in MySQL (default: off)
output_row_query=true
# log commit and xid info (default: on)
output_commit_info=true
# log DDL operations
output_ddl=true

######### kafka stuff ###############
# topic that parsed binlog events are written to
kafka_topic=maxwell
# topic that parsed DDL events are written to; requires the output_ddl option above
ddl_kafka_topic=maxwell_ddl

Create the topics for Maxwell:

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell --partitions 20 --replication-factor 1
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic maxwell_ddl --partitions 1 --replication-factor 1

Start Maxwell with the configuration file:

bin/maxwell --config=config.properties &

Now we can change some data in the database and then, from the Kafka directory, watch the data arriving in the queues:

# watch the binlog changes in Kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell --from-beginning
# watch the DDL binlog changes in Kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell_ddl --from-beginning
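With the output_* options above enabled, each message on the maxwell topic is a JSON document roughly like the following (values are illustrative; only update events carry the previous column values in old):

{"database":"test","table":"users","type":"update","ts":1531234567,"xid":8924,"commit":true,"position":"mysql-bin.000004:2342","server_id":1,"thread_id":31,"data":{"id":1,"name":"bar"},"old":{"name":"foo"}}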

ClickHouse

The official documentation provides an installation method that uses apt-get.

However, there is no official yum installation method, so here I use a third-party repository:

# install the yum-config-manager utility
yum install yum-utils
# repository for CentOS 6
yum-config-manager --add-repo http://repo.red-soft.biz/repos/clickhouse/repo/clickhouse-el6.repo
# repository for CentOS 7
yum-config-manager --add-repo http://repo.red-soft.biz/repos/clickhouse/repo/clickhouse-el7.repo
# install the packages
yum install clickhouse-server clickhouse-client clickhouse-server-common clickhouse-compressor

Edit the configuration file with vim /etc/clickhouse-server/config.xml:

...
<!-- change the listen addresses -->
<listen_host>::</listen_host>
<listen_host>0.0.0.0</listen_host>

...

<!-- change the data paths -->
<path>/data/clickhouse/</path>
<tmp_path>/data/clickhouse/tmp/</tmp_path>
...

Start the service:

service clickhouse-server start

Enter the ClickHouse client (-m allows multi-line queries, -n allows running multiple semicolon-separated queries):

clickhouse-client -mn

Create the database and table:

-- make sure the target database exists first
CREATE DATABASE IF NOT EXISTS data_record;

CREATE TABLE data_record.row_record (
    r_date Date,
    database String,
    table String,
    type String,
    ts Int64,
    xid Int64,
    position String,
    gtid Nullable(String),
    server_id Int64,
    thread_id Int64,
    data String,
    old String
) ENGINE = MergeTree(r_date, (r_date, database, table), 8192);
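Once the importer from the next section is running, a quick sanity check that rows are arriving could be a query along these lines (purely illustrative):

SELECT `database`, `table`, type, count() AS cnt
FROM data_record.row_record
GROUP BY `database`, `table`, type;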

Importing data from Kafka into ClickHouse with Python

Below is a simple example.

# -*- coding: utf-8 -*-
from confluent_kafka import Consumer, KafkaError
import clickhouse_driver
import logging
import json
import datetime


class ChWriter(object):
    """Thin wrapper around the ClickHouse native-protocol client."""

    def __init__(self, setting):
        self.conn = clickhouse_driver.Client(**setting)

    def ch_insert(self, sql, dicts):
        """Insert a list of row dicts."""
        self.conn.execute(sql, dicts)


def prefunc(data):
    """Normalize a Maxwell JSON message into a row for data_record.row_record."""
    data['r_date'] = datetime.datetime.fromtimestamp(data['ts']).date()
    print('timestamp:', data['ts'], '\n', 'date:', data['r_date'])
    # these fields are absent when the corresponding output_* option has nothing to report
    if 'gtid' not in data:
        data['gtid'] = None
    data['data'] = str(data['data'])
    if 'position' not in data:
        data['position'] = ''
    if 'server_id' not in data:
        data['server_id'] = 0
    if 'thread_id' not in data:
        data['thread_id'] = 0
    # only update events carry the previous values of the changed columns
    if data['type'] == 'update':
        data['old'] = str(data['old'])
    else:
        data['old'] = ''
    return data


if __name__ == "__main__":
    # logging.basicConfig(filename='example.log', level=logging.DEBUG)
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    ch_setting = {
        'host': '10.10.4.103',
        'port': 9000,
        'user': 'default',
        'password': '',
    }

    chw = ChWriter(ch_setting)
    sql = "INSERT INTO `data_record`.`row_record`(r_date,database,table,type,ts,xid,position,gtid,server_id,thread_id,data,old) VALUES"

    # consume the topic Maxwell writes to, starting from the earliest offset
    c = Consumer({'bootstrap.servers': '10.10.4.103:9093', 'group.id': '0',
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    c.subscribe(['maxwell'])
    running = True
    while running:
        msg = c.poll()
        if not msg.error():
            r_data = msg.value().decode('utf-8')
            print('Received message: %s' % r_data)
            data = json.loads(r_data)
            prefunc(data)
            chw.ch_insert(sql, [data])
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            running = False
    c.close()
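The loop above issues one INSERT per Kafka message, which is easy to follow but not how ClickHouse likes to be fed: it performs far better with batched inserts. Below is a minimal sketch of how the same loop could buffer rows and flush them in batches, reusing c, chw, sql, and prefunc from the example above; the batch size is an arbitrary illustrative value. Note that offsets are still auto-committed independently of the ClickHouse insert, so a production importer would also want to manage offsets manually.

# Illustrative batching sketch: buffer normalized rows and insert them in chunks.
BATCH_SIZE = 1000  # assumed value, tune for your workload
buffer = []

while running:
    msg = c.poll(1.0)  # wait up to one second for a message
    if msg is None:
        continue
    if not msg.error():
        buffer.append(prefunc(json.loads(msg.value().decode('utf-8'))))
        if len(buffer) >= BATCH_SIZE:
            chw.ch_insert(sql, buffer)  # one INSERT for the whole batch
            buffer = []
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False

# flush whatever is left before shutting down
if buffer:
    chw.ch_insert(sql, buffer)
c.close()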