# 使用Connect API创建自定义Kafka Connect

如果正在使用Apache Kafka,那么就有机会使用Kafka Connect连接器以进行数据的流入和流出。虽然可用的连接器越来越多,但是这些已有的仍有可能无法满足需求,这时就可以使用Kafka的Connect API创建自定义的连接器。这套API提供了一种简便的方法来创建容错的Kafka生产者或消费者,以将数据流入和流出Kafka。

本文将介绍Kafka Connect框架的基本概念和架构。然后分为四个步骤来逐步介绍Kafka Connect的开发。本文将主要集中在源端连接器上,但是涵盖的许多概念也将适用于接收端连接器,另外也会介绍相关的最佳实践。

# 什么是Kafka Connect?

Kafka Connect专门用于将数据流入/流出Kafka。从高层次上讲,连接器是一项管理任务及其配置的作业。在幕后,Kafka Connect可以创建容错的Kafka生产者和消费者,并跟踪他们已写入或读取的Kafka记录的偏移量。

除此之外,Kafka Connect还提供了许多强大的功能。它们可以轻松配置为将无法处理或无效的消息路由到死信队列,在源端将消息写入Kafka或接收端从Kafka消费消息之前应用单消息转换,并与Confluent模式注册表集成进行自动化的模式注册和管理,并将数据转换为Avro或JSON等类型。通过利用现有的连接器,开发者可以快速创建容错的数据管道,以可靠的方式将数据从外部源流式传输到Kafka主题,或者从Kafka主题流到外部接收端,而这些都只需配置即可无需写代码。

每个连接器实例都可以将其工作分解为多个任务,从而使复制数据的工作并行化并提供可伸缩性。连接器实例启动任务时,它将传递每个任务必须的配置属性,这些配置和该任务的状态以及生产和消费的记录的最新偏移量一起,都会被任务保存在主题外部。由于任务不存储任何状态,因此可以随时停止、启动或重启。新启动的任务仅需从Kafka取得最新的偏移量,然后继续运行即可。

Kafka Connect可以以独立或分布式模式运行。在独立模式下Kafka Connect在单个工作节点上运行,即在一个JVM进程中执行连接器及其任务。在分布式模式下,连接器及其任务在多个工作节点之间达到平衡。一般建议在分布式模式下运行Kafka Connect,因为独立模式没有容错能力。

要以分布式模式启动连接器,需要向Kafka Connect的REST API发送一个POST请求。该请求将触发Kafka Connect在多个节点自动调度连接器和任务的执行。在工作节点出现故障或被添加到组时,Kafka将自动协调以重新平衡这些连接器和任务。

# Kafka Connect入门

Kafka本身不包含连接器,连接器需要单独下载,启动这些连接器也很容易,只需要和配置参数一起,向Kafka Connect的REST API发送一个POST请求即可。

如果要集成的技术还没有Kafka Connect,本文将指导开发者逐步完成Kafka Connect的开发。实际上创建连接器只需实现几个Kafka Connect接口,而Kafka Connect框架负责其余的工作,因此开发者只需专注于实现特定集成的逻辑即可。

# Kafka Connect API

通过实现Kafka Connect API提供的若干接口和抽象类,就可以接入Kafka Connect框架。比如一个基本的源端连接器,将需要提供以下三类扩展:SourceConnectorSourceTaskAbstractConfig。这些共同定义了自定义Kafka Connect的配置和运行时行为,下面还会介绍如何启动并运行这个自定义的Kafka Connect。

# 步骤1:定义配置属性

启动连接器后,它们将处理一系列的属性,包括允许连接器及其任务与外部接收端或源端进行通信的属性、设置并行任务的最大数量、指定数据流对应的Kafka主题、自定义连接器运行所必须的参数等。

首先会将配置值作为String实例传给连接器,例如可以看Connector#start的方法签名:

public abstract class Connector implements Versioned {
[...]
	public abstract void start(Map<String, String> props);
[...]
}

启动时传给连接器之后,可以通过将其传给Kafka Connect API提供的AbstractConfig类实例,解析为更合适的类型。开发连接器的第一步是创建一个继承于AbstractConfig的类,该类可用于定义类型以及每个属性的默认值、验证方式、建议和文档。

假设正在开发用于处理来自云存储的数据流的源端连接器,在启动该连接器所需的配置属性中,可能希望包括用于生成记录的Kafka主题、要导入的对象的键前缀的白名单,下面是这个配置类的示例:

public class CloudStorageSourceConnectorConfig extends AbstractConfig {

    public CloudStorageSourceConnectorConfig(Map originals) {
        super(configDef(), originals);
    }

    protected static ConfigDef configDef() {
        return new ConfigDef()
                .define("bucket",
                        ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "Name of the bucket to import objects from")
                .define("prefix.whitelist",
                        ConfigDef.Type.LIST,
                        ConfigDef.Importance.HIGH,
                        "Whitelist of object key prefixes")
                .define("topic",
                        ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "Name of Kafka topic to produce to");
    }
}

注意在该示例中,将prefix.whitelist属性定义为List类型。当将原始值的映射传递给AbstractConfig类时,将根据配置定义将配置属性解析为适当的类型。结果就是以后从连接器的配置实例中获取prefix.whitelist值时,就是List类型了,即使该值最初是以逗号分隔的String形式传给连接器的,例如path/to/file/1,path/to/file/2,path/to/file/3这样的。

至少,每个配置定义都将需要一个配置名、配置值类型、重要性级别、记录配置属性的简短描述以及大多数情况下的默认值。但是还可以利用更高级的功能,例如定义配置组、在启动时调用的验证器、向用户建议配置值的推荐器以及指定配置顺序、对其他配置的依赖等。实际上,最佳实践是尽可能地包括验证器、推荐器、组和默认值,以确保用户在配置错误时能立即获得反馈,并可以轻松理解可用的配置项及其逻辑分组。

完成配置类后,就可以将注意力转向启动连接器,下面是CloudStorageSourceConnector类的start方法的示例实现:

public class CloudStorageSourceConnector extends SourceConnector {

    private CloudStorageSourceConnectorConfig connectorConfig;

    @Override
    public void start(Map<String, String> props) {
        this.connectorConfig = new CloudStorageConnectorConfig(props);
        this.configProps = Collections.unmodifiableMap(props);
    }

   [...]
}

连接器启动时,将创建自定义配置类的新实例,该实例为Kafka Connect框架提供配置定义。如果缺少任何必需的配置或提供的配置类型不正确,验证器将自动触发启动失败并显示相应的错误消息。

# 步骤2:将配置属性传递给任务

下一步是实现Connector#taskConfigs方法,该方法将返回一个映射列表,其中包含每个任务将用于将数据流传输到Kafka或从Kafka流出的配置属性:

public abstract class Connector implements Versioned {
[...]
	public abstract List<Map<String, String>> taskConfigs(int maxTasks);
[...]
}

该方法接受一个要并行运行的最大任务数的int值,其从启动时提供的tasks.max配置属性中提取。

taskConfigs返回的List中的每个映射对应于任务使用的配置属性。根据连接器正在执行的工作类型,虽然所有任务接收相同的配置属性可能有意义,但是也可能希望不同的任务实例获得不同的属性。例如假设要划分对象键前缀的数量,以在多个正在运行的任务实例之间平均传输数据。如果给定具有3个键前缀的白名单,可以为3个任务实例中的每个实例提供一个键前缀以导入对象。然后,每个任务可以专注于具有特定键前缀的对象的流数据,从而将工作拆分为并行任务。

实现taskConfig时要牢记一些注意事项。首先需要提供tasks.max配置属性,以使用户能够限制并行运行的任务数,它提供taskConfig所返回列表的大小上限。其次,返回列表的大小将决定启动多少任务。例如,对于数据库连接器,可能希望每个任务都从单个表中提取数据。如果数据库相对简单并且只有2个表,那么即使传递给taskConfigsmaxTasks值大于2,也可以返回一个大小为2的列表,而如果有6个表但maxTasks值为2,那么将需要每个任务从3个表中提取。

为了帮助执行此分组,Kafka Connect API提供了ConnectorUtils#groupPartitions工具方法,该方法将目标元素列表划分为所需的组数。同样,在这个云存储示例中,可以实现taskConfig获取对象键前缀的白名单,根据maxTasks的值或者前缀白名单的大小对列表进行划分,并返回一个配置列表,每个配置包含不同的对象键前缀,下面是一个示例实现:

@Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List prefixes = connectorConfig.getList(PREFIX_WHITELIST_CONFIG);
        int numGroups = Math.min(prefixes.size(), maxTasks);
        List<List> groupedPrefixes = ConnectorUtils.groupPartitions(prefixes, numGroups);
        List<Map<String, String>> taskConfigs = new ArrayList<>(groupedPrefixes.size());
        
        for (List taskPrefixes : groupedPrefixes) {
            Map<String, String> taskProps = new HashMap<>(configProps);
            taskProps.put(TASK_PREFIXES, String.join(",", taskPrefixes));
            taskConfigs.add(taskProps);
        }

        return taskConfigs;
    }

启动时,Kafka Connect框架会将taskConfigs返回的列表中的每个配置映射传递给每个任务。

连接器还需要实现其他方法,但是这些方法的实现相对简单。Connector#stop使开发者在连接器停止之前有机会关闭可能打开的所有资源。尽管完成所需的操作很简单,但重要的是Connector#stop不要长时间阻塞关闭过程。Connector#taskClass返回自定义任务的类名,Connector#config应返回自定义配置类中定义的ConfigDef,最后Connector#version必须返回连接器的版本。

# 步骤3:任务轮询

如同Connector类,Task包括startstopversion抽象方法。不过将数据流传输到Kafka的大多数逻辑将在poll方法中实现,由Kafka Connect框架针对每个任务不断调用该方法:

public abstract List poll() throws InterruptedException;

可以看到,poll方法返回SourceRecord实例列表。源记录主要用于存储Connect记录的头、键和值,但它也存储元数据,例如源分区和源偏移量。

源分区和源偏移量只是一个Map,可用于跟踪已经复制到Kafka的源数据的标记。在大多数情况下,源分区反映了任务配置,该配置使任务可以专注于导入特定的数据组。

例如,这个云存储源连接器根据对象键前缀的白名单导入对象。在Task#poll的实现中,导入的对象包装在包含源分区的SourceRecord中,该源分区是一个Map,包含了有关记录来源的各种信息,还可以存储任务进行对象导入的对象键前缀。SourceRecord实例还包含源偏移量,该偏移量用于标识从源导入的对象,源偏移量可以包含有关存储桶中对象的标识信​​息:完整的对象键名称、版本ID、上次修改的时间戳以及其他此类字段,以后任务可以使用源分区和偏移量来跟踪已经导入的对象。

Kafka Connect框架会自动将偏移量提交给offset.storage.topic属性配置的主题。重启Connect工作节点或任务时,它可以使用任务的SourceTaskContext来获取OffsetStorageReader,该对象具有offset方法,可以用于获取给定源分区记录的最新偏移量。然后,任务可以使用偏移量和分区信息来继续从源中导入数据,而不会重复或忽略记录。

# 步骤4:创建监控线程

Kafka Connect REST API包含一个用于修改连接器配置的端点。提交PUT请求,连接器和任务将在可用的工作进程之间再平衡,以确保配置更改不会导致节点之间的工作负载不均衡。

不过开发者可能希望将连接器设计为能够接收源中的更改,获取新配置并再平衡可用工作节点之间的工作负载,而不必手动向Kafka Connect API提交请求。监控可能需要重新配置并自动重新配置以适应这些更改的外部源更改的连接器称为动态连接器。

为了使连接器动态化,需要创建一个单独的线程来监控变更,并在连接器启动时创建一个新的监控线程实例:

public class MySourceConnector extends SourceConnector {

    private MonitoringThread monitoringThread;

    @Override
    public void start(Map<String, String> props) {
        [...]
        monitoringThread = new MonitoringThread(context);
    }
    [...]
}

源连接器还需要将其ConnectorContext传递给监控线程。如果监控检测到外部源中的更改,需要重新配置,它将调用ConnectorContext#requestTaskReconfiguration触发Kafka Connect框架以更新其任务配置。

由于更新的配置通常意味着需要更改输入分区,因此Kafka Connect框架还可以再平衡可用工作节点之间的工作负载。启动时,源连接器可以将轮询间隔属性传递给监控线程,该监控线程可以在CountDownLatch上设置等待时间。下面是一个示例实现,它会等待一定的毫秒数,然后再次向外部源查询更改:

public class MonitoringThread extends Thread {

    [...]
    private final Long pollInterval;

    public MonitoringThread(ConnectorContext context, Long pollInterval) {
        [...]
        this.pollInterval = pollInterval;
    }

    @Override
    public void run() {
        while (shutdownLatch.getCount() > 0) {
            if (sourceHasChanged()) {
                context.requestTaskReconfiguration();
            }

            try {
                shutdownLatch.await(pollInterval, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.warn("MonitoringThread interrupted: ", e);
            }
        }
    }
    [...]
}

实现了一个监控线程后,当外部源发生更改时可以触发任务重新配置,这样就有了一个动态的Kafka Connect。

至此,虽然要让这个连接器完全正常运行还需要一些其他的工作,但是创建一个动态的源端连接器的主要内容就这么多了。

18624049226

最后更新时间:: 7/22/2020, 2:42:40 PM