
Flink Connector


This is the official Apache Flink Sink Connector supported by ClickHouse. It is built using Flink's AsyncSinkBase and the official ClickHouse Java client.

The connector supports Apache Flink's DataStream API. Table API support is planned for a future release.

Requirements

  • Java 11+ for Flink 1.17+, or Java 17+ for Flink 2.0+
  • Apache Flink 1.17+

The connector is split into two artifacts to support both Flink 1.17+ and Flink 2.0+. Choose the artifact that matches your desired Flink version:

| Flink Version | Artifact | ClickHouse Java Client Version | Required Java |
|---|---|---|---|
| latest | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.1 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 2.0.0 | flink-connector-clickhouse-2.0.0 | 0.9.5 | Java 17+ |
| 1.20.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.19.3 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.18.1 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |
| 1.17.2 | flink-connector-clickhouse-1.17 | 0.9.5 | Java 11+ |

Note: The connector has not been tested against Flink versions earlier than 1.17.2.
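The table above reduces to a simple rule. Flink 2.0 and later use the flink-connector-clickhouse-2.0.0 artifact with Java 17+. Flink 1.17 through 1.20 use flink-connector-clickhouse-1.17 with Java 11+. The following is a minimal sketch of that rule, for example for a build script that targets several Flink versions. The class and method names (ArtifactChooser, artifactFor, requiredJava) are hypothetical and not part of the connector.

```java
/**
 * Hypothetical helper encoding the compatibility table:
 * Flink 2.x -> flink-connector-clickhouse-2.0.0 (Java 17+),
 * Flink 1.17 to 1.20 -> flink-connector-clickhouse-1.17 (Java 11+).
 */
public final class ArtifactChooser {

    private ArtifactChooser() {}

    /** Returns the Maven artifactId for a Flink version such as "1.20.2" or "2.0.1". */
    public static String artifactFor(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        if (major >= 2) {
            return "flink-connector-clickhouse-2.0.0";
        }
        if (major == 1 && minor >= 17) {
            return "flink-connector-clickhouse-1.17";
        }
        // Flink versions before 1.17 are outside the supported range.
        throw new IllegalArgumentException("Unsupported Flink version: " + flinkVersion);
    }

    /** Returns the minimum Java version required by the matching artifact. */
    public static int requiredJava(String flinkVersion) {
        return artifactFor(flinkVersion).endsWith("2.0.0") ? 17 : 11;
    }

    public static void main(String[] args) {
        System.out.println(artifactFor("1.20.2")); // flink-connector-clickhouse-1.17
        System.out.println(artifactFor("2.0.1"));  // flink-connector-clickhouse-2.0.0
        System.out.println(requiredJava("2.0.1")); // 17
    }
}
```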

Installation & Setup

Import as a Dependency

Add only the dependency that matches your Flink version:

<!-- Flink 2.0+ -->
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-2.0.0</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

<!-- Flink 1.17+ -->
<dependency>
    <groupId>com.clickhouse.flink</groupId>
    <artifactId>flink-connector-clickhouse-1.17</artifactId>
    <version>{{ stable_version }}</version>
    <classifier>all</classifier>
</dependency>

Download the binary

The name pattern of the binary JAR is:

flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar

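where ${flink_version} is the Flink line in the artifact name (2.0.0 or 1.17) and ${stable_version} is the connector release version.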

You can find all available released JAR files in the Maven Central Repository.
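The following is a sketch that fills in the file name pattern programmatically. It also builds a download URL, assuming the standard Maven repository layout on https://repo1.maven.org/maven2/ and the com.clickhouse.flink group ID from the dependency snippet. The JarName class is hypothetical, and "X.Y.Z" stands in for a real connector release.

```java
/**
 * Hypothetical helper that fills in
 * flink-connector-clickhouse-${flink_version}-${stable_version}-all.jar
 * and the matching Maven Central URL (standard repository layout assumed).
 */
public final class JarName {

    private static final String CENTRAL = "https://repo1.maven.org/maven2/";
    private static final String GROUP_PATH = "com/clickhouse/flink/"; // groupId com.clickhouse.flink

    private JarName() {}

    /** File name of the JAR published with the "all" classifier. */
    public static String of(String flinkVersion, String stableVersion) {
        return String.format("flink-connector-clickhouse-%s-%s-all.jar", flinkVersion, stableVersion);
    }

    // Layout: repository root / group path / artifactId / version / file name
    public static String centralUrl(String flinkVersion, String stableVersion) {
        String artifactId = "flink-connector-clickhouse-" + flinkVersion;
        return CENTRAL + GROUP_PATH + artifactId + "/" + stableVersion + "/"
                + of(flinkVersion, stableVersion);
    }

    public static void main(String[] args) {
        // "X.Y.Z" is a placeholder; substitute an actual connector release.
        System.out.println(of("1.17", "X.Y.Z"));
        System.out.println(centralUrl("1.17", "X.Y.Z"));
    }
}
```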

Using the DataStream API

Snippet

Let's say you want to insert raw CSV data into ClickHouse:

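// Flink imports. The connector classes (ClickHouseAsyncSink, ClickHouseClientConfig,
// ClickHouseConvertor, ClickHousePayload) come from the connector artifact, and
// ClickHouseFormat comes from the ClickHouse Java client.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {
    // url, username, password, database, tableName, fileFullName and the MAX_* limits
    // are placeholders that you define yourself.

    // Configure the ClickHouse client
    ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(url, username, password, database, tableName);

    // Create an ElementConverter that turns each String record into a ClickHousePayload
    ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);

    // Create the sink, then set the input format using setClickHouseFormat
    ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
            convertorString,
            MAX_BATCH_SIZE,
            MAX_IN_FLIGHT_REQUESTS,
            MAX_BUFFERED_REQUESTS,
            MAX_BATCH_SIZE_IN_BYTES,
            MAX_TIME_IN_BUFFER_MS,
            MAX_RECORD_SIZE_IN_BYTES,
            clickHouseClientConfig
    );

    csvSink.setClickHouseFormat(ClickHouseFormat.CSV);

    // Read the CSV file line by line and connect the stream to the sink
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Path csvFilePath = new Path(fileFullName);
    FileSource<String> csvSource = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), csvFilePath)
            .build();

    env.fromSource(
            csvSource,
            WatermarkStrategy.noWatermarks(),
            "GzipCsvSource"
    ).sinkTo(csvSink);

    // Nothing runs until the job is submitted
    env.execute("ClickHouse CSV sink");
}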
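The snippet leaves the MAX_* limits undefined. Below is one possible set of values. The numbers are assumptions for a small test job, not recommendations, and the validate method is a hypothetical helper.

The two checks reflect our understanding of constraints that Flink's AsyncSinkWriter enforces at construction time:

  • The buffer must hold more records than one batch.
  • The batch byte limit must be at least the record byte limit.

Confirm these checks against your Flink version. The int/long split mirrors, as far as we know, the parameter types of AsyncSinkBase's constructor.

```java
/**
 * Illustrative values for the MAX_* placeholders in the DataStream snippet.
 * These numbers are assumptions, not recommended defaults.
 */
public final class SinkLimits {

    public static final int MAX_BATCH_SIZE = 1_000;                        // records per batch
    public static final int MAX_IN_FLIGHT_REQUESTS = 2;                    // concurrent insert requests
    public static final int MAX_BUFFERED_REQUESTS = 10_000;                // records buffered before backpressure
    public static final long MAX_BATCH_SIZE_IN_BYTES = 10L * 1024 * 1024;  // 10 MiB per batch
    public static final long MAX_TIME_IN_BUFFER_MS = 5_000;                // flush at least every 5 s
    public static final long MAX_RECORD_SIZE_IN_BYTES = 1024L * 1024;      // 1 MiB per record

    private SinkLimits() {}

    /** Sanity checks that, to our understanding, mirror Flink's AsyncSinkWriter preconditions. */
    public static void validate(int maxBatchSize, int maxBufferedRequests,
                                long maxBatchSizeInBytes, long maxRecordSizeInBytes) {
        if (maxBufferedRequests <= maxBatchSize) {
            throw new IllegalArgumentException("maxBufferedRequests must be greater than maxBatchSize");
        }
        if (maxBatchSizeInBytes < maxRecordSizeInBytes) {
            throw new IllegalArgumentException("maxBatchSizeInBytes must be >= maxRecordSizeInBytes");
        }
    }

    public static void main(String[] args) {
        validate(MAX_BATCH_SIZE, MAX_BUFFERED_REQUESTS, MAX_BATCH_SIZE_IN_BYTES, MAX_RECORD_SIZE_IN_BYTES);
        System.out.println("limits OK");
    }
}
```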

More examples and snippets can be found in the tests in our GitHub repository.

Quick Start Example

We have created a Maven-based example to help you get started with the ClickHouse sink.

For more detailed instructions, see the Example Guide.

DataStream API Connection Options

ClickHouse Client Options

| Parameter | Description | Default Value | Required |
|---|---|---|---|
| url | Fully qualified ClickHouse URL | N/A | Yes |
| username | ClickHouse database username | N/A | Yes |
| password | ClickHouse database password | N/A | Yes |
| database | ClickHouse database name | N/A | Yes |
| table | ClickHouse table name | N/A | Yes |

Additional options can be passed to the client as a Map<String, String>. All available client options are listed in ClientConfigProperties.java. For example:

Map<String, String> additionalProperties = Map.of(
    ClientConfigProperties.CA_CERTIFICATE.getKey(), "<my_CA_cert>",
    ClientConfigProperties.SSL_CERTIFICATE.getKey(), "<my_SSL_cert>",
    ClientConfigProperties.CLIENT_NETWORK_BUFFER_SIZE.getKey(), "30000",
    ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getKey(), "5"
);

ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(
    url,
    username,
    password,
    database,
    tableName,
    additionalProperties,
    Map.of(), // serverSettings
    false // enableJsonSupportAsString
);

Sink Options

The following options come directly from Flink's AsyncSinkBase:

| Parameter | Description | Default Value | Required |
|---|---|---|---|
| maxBatchSize | Maximum number of records inserted in a single batch | N/A | Yes |
| maxInFlightRequests | Maximum number of in-flight requests allowed before the sink applies backpressure | N/A | Yes |
| maxBufferedRequests | Maximum number of records that may be buffered in the sink before backpressure is applied | N/A | Yes |
| maxBatchSizeInBytes | Maximum size (in bytes) of a batch; every batch sent is smaller than or equal to this size | N/A | Yes |
| maxTimeInBufferMS | Maximum time (in ms) a record may stay in the sink before being flushed | N/A | Yes |
| maxRecordSizeInBytes | Maximum record size the sink accepts; larger records are rejected | N/A | Yes |
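The size and time limits interact. A buffered batch is flushed in any of these cases:

  • It reaches maxBatchSize records.
  • Adding another record would exceed maxBatchSizeInBytes.
  • Its oldest record has waited maxTimeInBufferMS.

Records larger than maxRecordSizeInBytes are rejected. The single-threaded model below illustrates that interplay under these assumptions. It is a simplification, not the AsyncSinkBase implementation, which also handles in-flight limits, retries, and checkpoints. Its per-trigger counts loosely correspond to the connector's triggeredBy* metrics.

```java
import java.util.EnumMap;
import java.util.Map;

/** Simplified model of when a buffered batch is flushed (illustration only). */
public final class FlushModel {

    public enum Trigger { MAX_BATCH_SIZE, MAX_BATCH_SIZE_IN_BYTES, MAX_TIME_IN_BUFFER }

    private final int maxBatchSize;
    private final long maxBatchSizeInBytes;
    private final long maxTimeInBufferMs;
    private final long maxRecordSizeInBytes;

    private int batchRecords = 0;
    private long batchBytes = 0;
    private long firstRecordTimeMs = 0;
    private final Map<Trigger, Integer> flushes = new EnumMap<>(Trigger.class);

    public FlushModel(int maxBatchSize, long maxBatchSizeInBytes,
                      long maxTimeInBufferMs, long maxRecordSizeInBytes) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
        this.maxRecordSizeInBytes = maxRecordSizeInBytes;
    }

    /** Buffers one record of the given size at time nowMs. */
    public void offer(long recordBytes, long nowMs) {
        // Oversized records are rejected outright.
        if (recordBytes > maxRecordSizeInBytes) {
            throw new IllegalArgumentException("record larger than maxRecordSizeInBytes");
        }
        // Flush first if adding this record would exceed the byte limit.
        if (batchRecords > 0 && batchBytes + recordBytes > maxBatchSizeInBytes) {
            flush(Trigger.MAX_BATCH_SIZE_IN_BYTES);
        }
        if (batchRecords == 0) {
            firstRecordTimeMs = nowMs;
        }
        batchRecords++;
        batchBytes += recordBytes;
        // Flush as soon as the record-count limit is reached.
        if (batchRecords >= maxBatchSize) {
            flush(Trigger.MAX_BATCH_SIZE);
        }
    }

    /** Simulates the timer: flushes a non-empty batch whose oldest record is too old. */
    public void tick(long nowMs) {
        if (batchRecords > 0 && nowMs - firstRecordTimeMs >= maxTimeInBufferMs) {
            flush(Trigger.MAX_TIME_IN_BUFFER);
        }
    }

    public int flushCount(Trigger trigger) {
        return flushes.getOrDefault(trigger, 0);
    }

    private void flush(Trigger reason) {
        flushes.merge(reason, 1, Integer::sum);
        batchRecords = 0;
        batchBytes = 0;
    }

    public static void main(String[] args) {
        // 3 records or 100 bytes per batch, 1 s in buffer, records up to 60 bytes.
        FlushModel model = new FlushModel(3, 100, 1_000, 60);
        model.offer(10, 0);
        model.offer(10, 10);
        model.offer(10, 20);   // third record: count limit reached
        model.offer(50, 30);
        model.offer(60, 40);   // 50 + 60 > 100: the 50-byte batch is flushed first
        model.tick(1_100);     // the 60-byte record has waited 1,060 ms: time limit
        for (Trigger t : Trigger.values()) {
            System.out.println(t + " -> " + model.flushCount(t)); // each prints 1
        }
    }
}
```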

Supported data types

The table below provides a quick reference for converting data types when inserting from Flink into ClickHouse.

| Java Type | ClickHouse Type | Supported | Serialize Method |
|---|---|---|---|
| byte/Byte | Int8 | Yes | DataWriter.writeInt8 |
| short/Short | Int16 | Yes | DataWriter.writeInt16 |
| int/Integer | Int32 | Yes | DataWriter.writeInt32 |
| long/Long | Int64 | Yes | DataWriter.writeInt64 |
| BigInteger | Int128 | Yes | DataWriter.writeInt128 |
| BigInteger | Int256 | Yes | DataWriter.writeInt256 |
| short/Short | UInt8 | Yes | DataWriter.writeUInt8 |
| int/Integer | UInt8 | Yes | DataWriter.writeUInt8 |
| int/Integer | UInt16 | Yes | DataWriter.writeUInt16 |
| long/Long | UInt32 | Yes | DataWriter.writeUInt32 |
| long/Long | UInt64 | Yes | DataWriter.writeUInt64 |
| BigInteger | UInt64 | Yes | DataWriter.writeUInt64 |
| BigInteger | UInt128 | Yes | DataWriter.writeUInt128 |
| BigInteger | UInt256 | Yes | DataWriter.writeUInt256 |
| BigDecimal | Decimal | Yes | DataWriter.writeDecimal |
| BigDecimal | Decimal32 | Yes | DataWriter.writeDecimal |
| BigDecimal | Decimal64 | Yes | DataWriter.writeDecimal |
| BigDecimal | Decimal128 | Yes | DataWriter.writeDecimal |
| BigDecimal | Decimal256 | Yes | DataWriter.writeDecimal |
| float/Float | Float | Yes | DataWriter.writeFloat32 |
| double/Double | Double | Yes | DataWriter.writeFloat64 |
| boolean/Boolean | Boolean | Yes | DataWriter.writeBoolean |
| String | String | Yes | DataWriter.writeString |
| String | FixedString | Yes | DataWriter.writeFixedString |
| LocalDate | Date | Yes | DataWriter.writeDate |
| LocalDate | Date32 | Yes | DataWriter.writeDate32 |
| LocalDateTime | DateTime | Yes | DataWriter.writeDateTime |
| ZonedDateTime | DateTime | Yes | DataWriter.writeDateTime |
| LocalDateTime | DateTime64 | Yes | DataWriter.writeDateTime64 |
| ZonedDateTime | DateTime64 | Yes | DataWriter.writeDateTime64 |
| int/Integer | Time | No | N/A |
| long/Long | Time64 | No | N/A |
| byte/Byte | Enum8 | Yes | DataWriter.writeInt8 |
| int/Integer | Enum16 | Yes | DataWriter.writeInt16 |
| java.util.UUID | UUID | Yes | DataWriter.writeIntUUID |
| String | JSON | Yes | DataWriter.writeJSON |
| Array<Type> | Array<Type> | Yes | DataWriter.writeArray |
| Map<K,V> | Map<K,V> | Yes | DataWriter.writeMap |
| Tuple<Type,..> | Tuple<T1,T2,..> | Yes | DataWriter.writeTuple |
| Object | Variant | No | N/A |
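Several rows map an unsigned ClickHouse type to a wider Java type because Java has no unsigned integer primitives. For example, UInt8 values (0 to 255) do not fit in a signed byte, and the full UInt64 range fits only in a BigInteger. The hypothetical helper below sketches range checks you might run before writing such values. It is not part of the connector.

```java
import java.math.BigInteger;

/** Hypothetical range checks for unsigned ClickHouse integer types. */
public final class UnsignedRanges {

    /** 2^64 - 1, the largest UInt64 value. A non-negative long only reaches 2^63 - 1. */
    public static final BigInteger UINT64_MAX = BigInteger.ONE.shiftLeft(64).subtract(BigInteger.ONE);

    private UnsignedRanges() {}

    public static boolean fitsUInt8(int v)   { return v >= 0 && v <= 0xFF; }        // 0..255
    public static boolean fitsUInt16(int v)  { return v >= 0 && v <= 0xFFFF; }      // 0..65535
    public static boolean fitsUInt32(long v) { return v >= 0 && v <= 0xFFFF_FFFFL; } // 0..4294967295

    public static boolean fitsUInt64(BigInteger v) {
        return v.signum() >= 0 && v.compareTo(UINT64_MAX) <= 0;
    }

    public static void main(String[] args) {
        System.out.println(fitsUInt8(255));                               // true
        System.out.println(fitsUInt8(256));                               // false
        System.out.println(fitsUInt32(4_294_967_295L));                   // true
        System.out.println(fitsUInt64(UINT64_MAX));                       // true
        System.out.println(fitsUInt64(UINT64_MAX.add(BigInteger.ONE)));   // false
    }
}
```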

Notes:

  • A ZoneId must be provided when performing date operations.
  • Precision and scale must be provided when performing decimal operations.
  • For ClickHouse to parse a Java String as JSON, enable enableJsonSupportAsString in ClickHouseClientConfig.
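The first two notes follow from how the target types are stored. ClickHouse DateTime holds seconds since the Unix epoch, so a LocalDateTime only becomes a concrete instant once a ZoneId is chosen. Decimal(P, S) has a fixed scale S and at most P digits, so a BigDecimal must be rescaled and checked first. The following sketch is JDK-only and does not call the connector; the TypeNotes class and its methods are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.time.ZoneId;

/** JDK-only illustration of why dates need a ZoneId and decimals need precision/scale. */
public final class TypeNotes {

    private TypeNotes() {}

    /** The same wall-clock time maps to different epoch seconds in different zones. */
    public static long toEpochSeconds(LocalDateTime value, ZoneId zone) {
        return value.atZone(zone).toEpochSecond();
    }

    /** Rescales to S digits after the point and rejects values with more than P digits. */
    public static BigDecimal toDecimal(BigDecimal value, int precision, int scale) {
        BigDecimal scaled = value.setScale(scale, RoundingMode.HALF_UP);
        if (scaled.precision() > precision) {
            throw new ArithmeticException("value does not fit Decimal(" + precision + ", " + scale + ")");
        }
        return scaled;
    }

    public static void main(String[] args) {
        LocalDateTime t = LocalDateTime.of(2024, 1, 1, 0, 0);
        System.out.println(toEpochSeconds(t, ZoneId.of("UTC")));           // 1704067200
        System.out.println(toEpochSeconds(t, ZoneId.of("Europe/Berlin"))); // 1704063600
        System.out.println(toDecimal(new BigDecimal("3.14159"), 9, 2));    // 3.14
    }
}
```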

Metrics

The connector exposes the following additional metrics on top of Flink's existing metrics:

| Metric | Description | Type |
|---|---|---|
| numBytesSend | Total number of bytes sent to ClickHouse | Counter |
| numRecordSend | Total number of records sent to ClickHouse | Counter |
| numRequestSubmitted | Total number of requests sent (actual number of flushes performed) | Counter |
| numOfDroppedBatches | Total number of batches dropped due to non-retryable failures | Counter |
| numOfDroppedRecords | Total number of records dropped due to non-retryable failures | Counter |
| totalBatchRetries | Total number of batch retries due to retryable failures | Counter |
| writeLatencyHistogram | Histogram of successful write latency distribution (ms) | Histogram |
| writeFailureLatencyHistogram | Histogram of failed write latency distribution (ms) | Histogram |
| triggeredByMaxBatchSizeCounter | Total number of flushes triggered by reaching maxBatchSize | Counter |
| triggeredByMaxBatchSizeInBytesCounter | Total number of flushes triggered by reaching maxBatchSizeInBytes | Counter |
| triggeredByMaxTimeInBufferMSCounter | Total number of flushes triggered by reaching maxTimeInBufferMS | Counter |
| actualRecordsPerBatch | Histogram of actual batch size distribution | Histogram |
| actualBytesPerBatch | Histogram of actual bytes per batch distribution | Histogram |
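The counters are most useful in combination:

  • numRecordSend divided by numRequestSubmitted gives a rough average batch size.
  • The three triggeredBy* counters show which limit usually ends a batch. If most flushes are time-triggered, batches are filling slowly.

The hypothetical helper below sketches those two ratios. It assumes you have already read the counter values from your metrics reporter. Retries and other flush causes may also be counted, so treat the results as approximations.

```java
/** Hypothetical helper for combining the connector's counters into simple ratios. */
public final class SinkMetricsSummary {

    private SinkMetricsSummary() {}

    /** Rough average records per request: numRecordSend / numRequestSubmitted. */
    public static double avgRecordsPerRequest(long numRecordSend, long numRequestSubmitted) {
        return numRequestSubmitted == 0 ? 0.0 : (double) numRecordSend / numRequestSubmitted;
    }

    /** Share of the counted flushes that were caused by maxTimeInBufferMS. */
    public static double timeTriggeredShare(long byMaxBatchSize, long byMaxBatchSizeInBytes,
                                            long byMaxTimeInBuffer) {
        long total = byMaxBatchSize + byMaxBatchSizeInBytes + byMaxTimeInBuffer;
        return total == 0 ? 0.0 : (double) byMaxTimeInBuffer / total;
    }

    public static void main(String[] args) {
        // Example counter values, made up for illustration.
        System.out.println(avgRecordsPerRequest(50_000, 100)); // 500.0
        System.out.println(timeTriggeredShare(20, 5, 75));     // 0.75
    }
}
```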

Limitations

  • The sink currently provides an at-least-once delivery guarantee. Work toward exactly-once semantics is tracked in the connector's GitHub repository.
  • The sink does not yet support a dead-letter queue (DLQ) for unprocessable messages. Until it does, the connector stops processing at the first row it cannot handle. This feature is tracked in the connector's GitHub repository.
  • The sink cannot yet be created through Flink's Table API or Flink SQL. This feature is tracked in the connector's GitHub repository.

ClickHouse Version Compatibility and Security

  • The connector is tested against a range of recent ClickHouse versions, including latest and head, via a daily CI workflow. The tested versions are updated periodically as new ClickHouse releases become active. The list of versions tested daily is maintained in the connector's GitHub repository.
  • See the ClickHouse security policy for known security vulnerabilities and how to report a vulnerability.
  • We recommend keeping the connector up to date so that you receive security fixes and new improvements.
  • If you have an issue with migration, please create a GitHub issue and we will respond!

Contributing and Support

If you'd like to contribute to the project or report any issues, we welcome your input! Visit our GitHub repository to open an issue, suggest improvements, or submit a pull request.

Contributions are welcome! Please check the contribution guide in the repository before starting. Thank you for helping improve the ClickHouse Flink connector!