博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka connect介绍、部署及开发
阅读量:6758 次
发布时间:2019-06-26

本文共 8889 字,大约阅读时间需要 29 分钟。

  hot3.png

Kafak connect 简介

Kafaka connect 是一种用于在Kafka和其他系统之间可扩展的、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出Kafka的连接器变得简单。Kafka Connect可以从数据库或应用程序服务器收集数据到Kafka topic,使数据可用于低延迟的流处理。导出作业可以将数据从Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。

Kafaka connect 概念

Kafaka connect 有几个重要的概念:

  • Source:负责导入数据到kafka
  • Sink:负责从kafka导出数据
  • Connectors :通过管理任务来协调数据流的高级抽象
  • Tasks:数据写入kafk和从kafka中读出的具体实现
  • Workers:运行connectors和tasks的进程
  • Converters:kafka connect和其他存储系统直接发送或者接受数据之间转换数据
  • Transforms:一种轻量级数据调整的工具

Kafka connect 工作模式

Kafka connect 有两种工作模式

  • standalone:在standalone模式中,所有的worker都在一个独立的进程中完成
  • distributed:distributed模式具有高扩展性,以及提供自动容错机制。你可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,如果你新加了一个worker或者挂了一个worker,其他的worker会检测到然后在重新分配connector和task。
    在分布式模式下通过rest api管理connector
  • GET /connectors – 返回所有正在运行的connector名。
  • POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
  • GET /connectors/{name} – 获取指定connetor的信息。
  • GET /connectors/{name}/config – 获取指定connector的配置信息。
  • PUT /connectors/{name}/config – 更新指定connector的配置信息。
  • GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
  • GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
  • GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
  • PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
  • PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
  • POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
  • POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
  • DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

Kafka connect 快速启动

Kafka connect 只是Apache Kafka提出的一种kafka数据流处理的框架,目前有很多开源的、优秀的实现,比较著名的是Confluent平台,支持很多Kafka connect的实现,例如Elasticsearch(Sink)、HDFS(Sink)、JDBC等等。下面主要介绍Confluent平台中kafka-connect-elasticsearch的使用。首先在上下载confluent-3.3.0。想运行kafka-connect-elasticsearch的前提是提供kafka服务以及ES服务。

standalone模式

1、首先更改配置connect-standalone.properties

#broker列表bootstrap.servers=10.120.241.1:9200key.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverter#是否需要schemas进行转码,我们使用的是json数据,所以设置成falsekey.converter.schemas.enable=falsevalue.converter.schemas.enable=falseinternal.key.converter=org.apache.kafka.connect.json.JsonConverterinternal.value.converter=org.apache.kafka.connect.json.JsonConverterinternal.key.converter.schemas.enable=falseinternal.value.converter.schemas.enable=falseoffset.storage.file.filename=/tmp/connect.offsetsoffset.flush.interval.ms=10000# Note: symlinks will be followed to discover dependencies or plugins.# Examples: # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,#plugin.path=

2、更改配置quickstart-elasticsearch.properties

name=elasticsearch-sinkconnector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnectortasks.max=1topics=estest1012key.ignore=trueschema.ignore=trueconnection.url=http://10.120.241.194:9200type.name=kafka-connect

注:需要配置schema.ignore=true,如果不配置会抛异常(由于使用的json数据)

3、启动kafka-connect-elasticsearch

./bin/connect-standalone ./etc/kafka/connect-standalone.properties  ./etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

4、测试效果

通过kafka-console-producer.sh发一条json格式的消息,然后查询es索引 输入图片说明

distribute模式

1、修改配置connect-distributed.properties

# broker列表bootstrap.servers=10.120.241.1:9200# 同一集群中group.id需要配置一致,且不能和别的消费者同名group.id=connect-cluster# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will# need to configure these based on the format they want their data in when loaded from or stored into Kafkakey.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverter# 使用json数据同样配置成falsekey.converter.schemas.enable=falsevalue.converter.schemas.enable=false····

2、 手动创建集群模式所必须的kafka的几个topic

# config.storage.topic=connect-configs$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact# offset.storage.topic=connect-offsets$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact# status.storage.topic=connect-status$ $ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
  • config.storage.topic:topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic
  • offset.storage.topic:用于存储offsets;这个topic应该配置多个partition和副本。
  • status.storage.topic:用于存储状态;这个topic 可以有多个partitions和副本

3、 启动worker

./bin/connect-distributed ./etc/kafka/connect-distributed.properties

4、使用restful启动connect

curl 'http://localhost:8083/connectors' -X POST -i -H "Content-Type:application/json" -d       '{ "name":"elasticsearch-sink",         "config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",                  "tasks.max":10,                  "topics":"estest1012",                  "key.ignore":true,                  "schema.ignore":true,                  "connection.url":"http://10.120.241.194:9200",                  "type.name":"kafka-connect"}      }'

5、 查看所有connnect,以及状态

输入图片说明输入图片说明

6、 配置日志
默认情况下日志只在控制台输出,如需要保存文件需要修改配置connect-log4j.properties,例如下:

log4j.rootLogger=INFO, stdout, stdfilelog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%nlog4j.appender.stdfile=org.apache.log4j.DailyRollingFileAppenderlog4j.appender.stdfile.DatePattern='.'yyyy-MM-dd-HHlog4j.appender.stdfile.File=${kafka.logs.dir}/stdout.loglog4j.appender.stdfile.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdfile.layout.ConversionPattern=[%d] %p %m (%c)%nlog4j.logger.org.apache.zookeeper=ERRORlog4j.logger.org.I0Itec.zkclient=ERRORlog4j.logger.org.reflections=ERROR

Kafak connect 开发

本文将开发一个kafka-connect示例,主要的功能是将kafka中的消息持久化到文件中。

  • 新建maven工程,并添加connect-api依赖
org.apache.kafka
connect-api
${kafka.version}
  • 主要是实现两个类
public class NeteaseFileSinkConnector extends SinkConnector{	/**	 * 配置项	 */	public static final String FILE_CONFIG = "file";	/**	 * 配置校验	 */	private static final ConfigDef CONFIG_DEF = new ConfigDef()	        .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Destination filename. If not specified, the standard output will be used");	private String filename;		@Override	public ConfigDef config() {		// TODO Auto-generated method stub		return CONFIG_DEF;	}	/**	 * 从配置文件中初始化配置	 */	@Override	public void start(Map
props) { // TODO Auto-generated method stub filename = props.get(FILE_CONFIG); } @Override public void stop() { // TODO Auto-generated method stub } /** * 返回执行持久化的任务类 */ @Override public Class
taskClass() { // TODO Auto-generated method stub return FileStreamSinkTask.class; } /** * 返回配置 */ @Override public List
> taskConfigs(int maxTasks) { // TODO Auto-generated method stub ArrayList
> configs = new ArrayList
>(); for (int i = 0; i < maxTasks; i++) { Map
config = new HashMap
(); if (filename != null) config.put(FILE_CONFIG, filename); configs.add(config); } return configs; } @Override public String version() { // TODO Auto-generated method stub return AppInfoParser.getVersion(); }}/** * 持久化任务类 * @author weifu **/public class FileStreamSinkTask extends SinkTask { private String filename; private PrintStream outputStream; public String version() { // TODO Auto-generated method stub return new NeteaseFileSinkConnector().version(); } @Override public void start(Map
props) { // TODO Auto-generated method stub filename = props.get(NeteaseFileSinkConnector.FILE_CONFIG); if (filename == null) { outputStream = System.out; } else { try { outputStream = new PrintStream(new FileOutputStream(filename, true), false, StandardCharsets.UTF_8.name()); } catch (FileNotFoundException | UnsupportedEncodingException e) { throw new ConnectException("Couldn't find or create file for FileStreamSinkTask", e); } } } /** * 持久化具体过程 */ @Override public void put(Collection
records) { // TODO Auto-generated method stub for(SinkRecord record : records){ outputStream.println("netease file connect: "); outputStream.println(record.value()); } } @Override public void flush(Map
offsets) { outputStream.flush(); } /** * 关闭文件 */ @Override public void stop() { // TODO Auto-generated method stub if(outputStream != null && outputStream != System.out){ outputStream.close(); } }}
  • 打包 打包需要注意的是:
    A、 不能把Kafka Connect API打包在内。
    B、 把所依赖的jar包统一放到服务器的某个路径下(需要在配置文件中添加到classpath),推荐在..\confluent-3.3.0\share\java目录下建立一个文件夹(以kafka-connect-….开头命名),这样confluent会自动把该路径加载的classpath中。
  • 在分布式模式下添加该connect
  • 如下图消费两条消息后可以看到文件test.sink.txt已经有内容了:
    输入图片说明

转载于:https://my.oschina.net/hnrpf/blog/1555915

你可能感兴趣的文章
position:fixed失效情况
查看>>
丢了好几天没写,只因在做个小项目吗
查看>>
SSM-Spring-13:Spring中RegexpMethodPointcutAdvisor正则方法切入点顾问
查看>>
C#目录:藏锋
查看>>
如何在代码里打印程序所占用的内存
查看>>
大道至简
查看>>
实验5 Spark SQL编程初级实践
查看>>
SVG图案
查看>>
java 基础 --- volatile
查看>>
poj3158
查看>>
ubuntu 安装kafka
查看>>
学习基础知识真的枯燥
查看>>
航院1009: FatMouse’s Trade
查看>>
IntelliJ Idea 2017 免费激活方法
查看>>
redis基本配置和相关设置
查看>>
「小程序JAVA实战」小程序开发注册用户的接口(33)
查看>>
C#键盘事件处理父窗体子窗体
查看>>
实验六
查看>>
《现代操作系统》学习笔记之存储管理之地址空间
查看>>
ASP.NET MVC2 in Action 读书笔记 [3]
查看>>