副标题[/!--empirenews.page--]
一、聊什么
为了满足本系列读者的需求,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在Apache Flink中如何使用Kafka。
二、Kafka 简介
Apache Kafka是一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,LinkedIn于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka用于构建实时数据管道和流式应用程序。它具有水平扩展性、容错性、极快的速度,目前也得到了广泛的应用。
Kafka不但是分布式消息系统而且也支持流式计算,所以在介绍Kafka在Apache Flink中的应用之前,先以一个Kafka的简单示例直观了解什么是Kafka。
1. 安装
本篇不是系统的,详尽的介绍Kafka,而是想让大家直观认识Kafka,以便在Apahe Flink中进行很好的应用,所以我们以最简单的方式安装Kafka。
(1) 下载二进制包:
- curl -L -O http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
(2) 解压安装
Kafka安装只需要将下载的tgz解压即可,如下:
- jincheng:kafka jincheng.sunjc$ tar -zxf kafka_2.11-2.1.0.tgz
- jincheng:kafka jincheng.sunjc$ cd kafka_2.11-2.1.0
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ ls
- LICENSE NOTICE bin config libs site-docs
其中bin包含了所有Kafka的管理命令,如接下来我们要启动的Kafka的Server。
(3) 启动Kafka Server
Kafka是一个发布订阅系统,消息订阅首先要有个服务存在。我们启动一个Kafka Server 实例。 Kafka需要使用ZooKeeper,要进行投产部署我们需要安装ZooKeeper集群,这不在本篇的介绍范围内,所以我们利用Kafka提供的脚本,安装一个只有一个节点的ZooKeeper实例。如下:
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/zookeeper-server-start.sh config/zookeeper.properties &
-
- [2019-01-13 09:06:19,985] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
- ....
- ....
- [2019-01-13 09:06:20,061] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
启动之后,ZooKeeper会绑定2181端口(默认)。接下来我们启动Kafka Server,如下:
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-server-start.sh config/server.properties
- [2019-01-13 09:09:16,937] INFO Registered kafkakafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
- [2019-01-13 09:09:17,267] INFO starting (kafka.server.KafkaServer)
- [2019-01-13 09:09:17,267] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
- [2019-01-13 09:09:17,284] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
- ...
- ...
- [2019-01-13 09:09:18,253] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
如果上面一切顺利,Kafka的安装就完成了。
2. 创建Topic
Kafka是消息订阅系统,首先创建可以被订阅的Topic,我们创建一个名为flink-tipic的Topic,在一个新的terminal中,执行如下命令:
- jincheng:kafka_2.11-2.1.0 jincheng.sunjc$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic flink-tipic
-
- Created topic "flink-tipic".
在Kafka Server的terminal中也会输出如下成功创建信息:
- ...
- [2019-01-13 09:13:31,156] INFO Created log for partition flink-tipic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)...
上面显示了flink-topic的基本属性配置,如消息压缩方式,消息格式,备份数量等等。
(编辑:青岛站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|