Kafak connect 简介
Kafaka connect 是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。Kafka Connect可以从数据库或应用程序服务器收集数据到Kafka topic,使数据可用于低延迟的流处理。导出作业可以将数据从Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。
Kafaka connect 概念
Kafaka connect 有几个重要的概念:
- Source:负责导入数据到kafka
- Sink:负责从kafka导出数据
- Connectors :通过管理任务来协调数据流的高级抽象
- Tasks:数据写入kafk和从kafka中读出的具体实现
- Workers:运行connectors和tasks的进程
- Converters:kafka connect和其他存储系统直接发送或者接受数据之间转换数据
- Transforms:一种轻量级数据调整的工具
Kafka connect 工作模式
Kafka connect 有两种工作模式
- standalone:在standalone模式中,所有的worker都在一个独立的进程中完成
- distributed:distributed模式具有高扩展性,以及提供自动容错机制。你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。 在分布式模式下通过rest api管理connector
- GET /connectors – 返回所有正在运行的connector名。
- POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
- GET /connectors/{name} – 获取指定connetor的信息。
- GET /connectors/{name}/config – 获取指定connector的配置信息。
- PUT /connectors/{name}/config – 更新指定connector的配置信息。
- GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
- GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
- GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
- PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
- PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
- POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
- POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
- DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
Kafka connect 快速启动
Kafka connect 只是Apache Kafka提出的一种kafka数据流处理的框架,目前有很多开源的、优秀的实现,比较著名的是Confluent平台,支持很多Kafka connect的实现,例如Elasticsearch(Sink)、HDFS(Sink)、JDBC等等。下面主要介绍Confluent平台中kafka-connect-elasticsearch的使用。首先在上下载confluent-3.3.0。想运行kafka-connect-elasticsearch的前提是提供kafka服务以及ES服务。
standalone模式
1、首先更改配置connect-standalone.properties
#broker列表bootstrap.servers=10.120.241.1:9200key.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverter#是否需要schemas进行转码,我们使用的是json数据,所以设置成falsekey.converter.schemas.enable=falsevalue.converter.schemas.enable=falseinternal.key.converter=org.apache.kafka.connect.json.JsonConverterinternal.value.converter=org.apache.kafka.connect.json.JsonConverterinternal.key.converter.schemas.enable=falseinternal.value.converter.schemas.enable=falseoffset.storage.file.filename=/tmp/connect.offsetsoffset.flush.interval.ms=10000# Note: symlinks will be followed to discover dependencies or plugins.# Examples: # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,#plugin.path=
2、更改配置quickstart-elasticsearch.properties
name=elasticsearch-sinkconnector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max=1topics=estest1012key.ignore=trueschema.ignore=trueconnection.url=http://10.120.241.194:9200type.name=kafka-connect
注:需要配置schema.ignore=true,如果不配置会抛异常(由于使用的json数据)
3、启动kafka-connect-elasticsearch
./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
4、测试效果
通过kafka-console-producer.sh发一条json格式的消息,然后查询es索引distribute模式
1、修改配置connect-distributed.properties
# broker列表bootstrap.servers=10.120.241.1:9200# 同一集群中group.id需要配置一致,且不能和别的消费者同名group.id=connect-cluster# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will# need to configure these based on the format they want their data in when loaded from or stored into Kafkakey.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverter# 使用json数据同样配置成falsekey.converter.schemas.enable=falsevalue.converter.schemas.enable=false····
2、 手动创建集群模式所必须的kafka的几个topic
# config.storage.topic=connect-configs$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact# offset.storage.topic=connect-offsets$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact# status.storage.topic=connect-status$ $ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
- config.storage.topic:topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic
- offset.storage.topic:用于存储offsets;这个topic应该配置多个partition和副本。
- status.storage.topic:用于存储状态;这个topic 可以有多个partitions和副本
3、 启动worker
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
4、使用restful启动connect
curl 'http://localhost:8083/connectors' -X POST -i -H "Content-Type:application/json" -d '{ "name":"elasticsearch-sink", "config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max":10, "topics":"estest1012", "key.ignore":true, "schema.ignore":true, "connection.url":"http://10.120.241.194:9200", "type.name":"kafka-connect"} }'
5、 查看所有connnect,以及状态
6、 配置日志 默认情况下日志只在控制台输出,如需要保存文件需要修改配置connect-log4j.properties,例如下:log4j.rootLogger=INFO, stdout, stdfilelog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%nlog4j.appender.stdfile=org.apache.log4j.DailyRollingFileAppenderlog4j.appender.stdfile.DatePattern='.'yyyy-MM-dd-HHlog4j.appender.stdfile.File=${kafka.logs.dir}/stdout.loglog4j.appender.stdfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdfile.layout.ConversionPattern=[%d] %p %m (%c)%nlog4j.logger.org.apache.zookeeper=ERRORlog4j.logger.org.I0Itec.zkclient=ERRORlog4j.logger.org.reflections=ERROR
Kafak connect 开发
本文将开发一个kafka-connect示例,主要的功能是将kafka中的消息持久化到文件中。
- 新建maven工程,并添加connect-api依赖
org.apache.kafka connect-api ${kafka.version}
- 主要是实现两个类
public class NeteaseFileSinkConnector extends SinkConnector{ /** * 配置项 */ public static final String FILE_CONFIG = "file"; /** * 配置校验 */ private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used"); private String filename; @Override public ConfigDef config() { // TODO Auto-generated method stub return CONFIG_DEF; } /** * 从配置文件中初始化配置 */ @Override public void start(Mapprops) { // TODO Auto-generated method stub filename = props.get(FILE_CONFIG); } @Override public void stop() { // TODO Auto-generated method stub } /** * 返回执行持久化的任务类 */ @Override public Class taskClass() { // TODO Auto-generated method stub return FileStreamSinkTask.class; } /** * 返回配置 */ @Override public List
- 打包 打包需要注意的是: A、 不能把Kafka Connect API打包在内。 B、 把所依赖的jar包统一放到服务器的某个路径下(需要在配置文件中添加到classpath),推荐在..\confluent-3.3.0\share\java目录下建立一个文件夹(以kafka-connect-….开头命名),这样confluent会自动把该路径加载的classpath中。
- 在分布式模式下添加该connect
- 如下图消费两条消息后可以看到文件test.sink.txt已经有内容了: