This connector is intended to sink data from Apache Flink applications to arbitrary HTTP endpoints.


Current state: Under Discussion

Discussion thread: here (<- link to

JIRA: here (<- link to

Apache Flink supports many sinks, such as Apache Kafka, file systems, and Kinesis Data Streams. However, the destinations many users need are not implemented or available by default in Flink's connector ecosystem. Sinking to an arbitrary HTTP endpoint is particularly useful for sending real-time data to a REST-enabled service such as AWS Lambda. Building on the success of Stateful Functions, and with the goal of giving users more flexibility, we propose a sink that supports HTTP endpoints with methods such as POST, GET, PUT, PATCH, and DELETE.

Public Interfaces

A user implementing an HTTP sink creates an instance of the sink by constructing a new HttpSink<>, providing an endpoint URL (String) and an HTTP method (POST, PUT, GET, etc.).

HTTP Sink Constructor:
    public HttpSink(String endpointURL, String method)

The underlying implementation builds on the Async Sink (FLIP-171) to deliver batches of messages to HTTP endpoints.
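
To illustrate what "delivering batches" could mean for HTTP, the following is a minimal sketch that does not depend on Flink. It assumes each buffered record is sent as its own POST request via the JDK's java.net.http.HttpClient, and that records whose request fails are handed back to the caller for re-queuing, which mirrors the Async Sink pattern of returning failed request entries to the writer's buffer. The class HttpBatchSubmitter and the parameters maxBatchSize and maxBatchBytes are hypothetical names, not part of the proposed API.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch (not the FLIP's implementation) of the per-batch work an
 * HTTP writer built on the Async Sink could perform.
 */
public class HttpBatchSubmitter {
    private final HttpClient client = HttpClient.newHttpClient();
    private final URI endpoint;

    public HttpBatchSubmitter(String endpointUrl) {
        this.endpoint = URI.create(endpointUrl);
    }

    /**
     * Splits buffered records into batches holding at most maxBatchSize records and
     * at most maxBatchBytes bytes. A single record larger than maxBatchBytes still
     * forms its own batch.
     */
    public static List<List<byte[]>> toBatches(
            List<byte[]> buffer, int maxBatchSize, long maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] entry : buffer) {
            boolean full = current.size() >= maxBatchSize
                    || currentBytes + entry.length > maxBatchBytes;
            if (full && !current.isEmpty()) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(entry);
            currentBytes += entry.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    /**
     * POSTs every record of the batch asynchronously and completes with the records
     * that should be re-queued. This sketch re-queues on any non-2xx status or I/O
     * error; it does not distinguish retryable from fatal failures.
     */
    public CompletableFuture<List<byte[]>> submit(List<byte[]> batch) {
        List<CompletableFuture<byte[]>> results = new ArrayList<>();
        for (byte[] entry : batch) {
            HttpRequest request = HttpRequest.newBuilder(endpoint)
                    .POST(HttpRequest.BodyPublishers.ofByteArray(entry))
                    .build();
            CompletableFuture<byte[]> outcome = client
                    .sendAsync(request, HttpResponse.BodyHandlers.discarding())
                    // null marks success; returning the entry marks it for re-queuing
                    .thenApply(response -> response.statusCode() / 100 == 2 ? null : entry)
                    .exceptionally(error -> entry);
            results.add(outcome);
        }
        return CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<byte[]> failed = new ArrayList<>();
                    for (CompletableFuture<byte[]> result : results) {
                        byte[] entry = result.join();
                        if (entry != null) {
                            failed.add(entry);
                        }
                    }
                    return failed;
                });
    }
}
```

Sending one request per record keeps failure handling per entry simple. An endpoint that accepts a JSON array could instead receive one request per batch, at the cost of retrying the whole batch on failure.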

The following example reads from either a self-generating source or a Kinesis Data Streams source and submits the records to an AWS Lambda endpoint:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.http.HttpSink; // the sink proposed by this FLIP
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import events.MyEvent;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class HttpSinkTest {

    // Reads records from a Kinesis data stream and passes each payload through as raw bytes.
    private static DataStream<byte[]> createSourceFromStaticConfig(
            StreamExecutionEnvironment env, String region, String inputStreamName) {

        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        inputProperties.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

        return env.addSource(
                new FlinkKinesisConsumer<byte[]>(
                        inputStreamName,
                        new DeserializationSchema<byte[]>() {
                            @Override
                            public byte[] deserialize(byte[] message) throws IOException {
                                return message; // identity: keep the raw bytes
                            }

                            @Override
                            public boolean isEndOfStream(byte[] nextElement) {
                                return false; // the Kinesis stream is unbounded
                            }

                            @Override
                            public TypeInformation<byte[]> getProducedType() {
                                return TypeInformation.of(byte[].class);
                            }
                        },
                        inputProperties));
    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        boolean isSelfGenerating =
                Boolean.parseBoolean(System.getProperty("selfGenerating", "false"));
        int numRecordsPerSecond =
                Integer.parseInt(System.getProperty("numRecordsPerSecond", "100"));

        DataStream<byte[]> sampleEvents;

        if (isSelfGenerating) {
            sampleEvents =
                    env.addSource(
                            new SourceFunction<byte[]>() {
                                private volatile boolean running = true;

                                @Override
                                public void run(SourceContext<byte[]> sourceContext)
                                        throws Exception {
                                    while (running) {
                                        // Emit numRecordsPerSecond records, then pause for about one second.
                                        for (int i = 0; i < numRecordsPerSecond; i++) {
                                            MyEvent me = new MyEvent();
                                            // How MyEvent is serialized is not specified here;
                                            // toString() is only a placeholder.
                                            byte[] payload =
                                                    me.toString().getBytes(StandardCharsets.UTF_8);
                                            synchronized (sourceContext.getCheckpointLock()) {
                                                sourceContext.collect(payload);
                                            }
                                        }
                                        Thread.sleep(1000);
                                    }
                                }

                                @Override
                                public void cancel() {
                                    running = false;
                                }
                            });
        } else {
            sampleEvents = createSourceFromStaticConfig(env, "us-east-2", "input-stream");
        }

        // The endpoint URL is not given in this example; the "endpointUrl" system
        // property is a hypothetical placeholder for the AWS Lambda endpoint.
        String endpointUrl = System.getProperty("endpointUrl");

        // Assuming HttpSink implements Flink's unified Sink interface, as Async Sink
        // (FLIP-171) based sinks do, it is attached with sinkTo().
        sampleEvents.sinkTo(new HttpSink<>(endpointUrl, "POST"));

        env.execute("HTTP Sink example");
    }
}



Currently, only the sink is supported, and only for POST requests. In future iterations we'd like to support HTTP sources and GET, PUT, DELETE, etc.
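
Because the constructor accepts any method string while only POST is supported, the arguments would presumably need validating when the sink is created. The following is a minimal sketch of such validation under that assumption; HttpSinkConfig is a hypothetical helper, not part of the proposed API, and rejecting other methods with UnsupportedOperationException is an illustrative choice.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper validating the (endpointURL, method) constructor arguments. */
public final class HttpSinkConfig {
    // Methods accepted in this first iteration; others are planned for later iterations.
    private static final Set<String> SUPPORTED_METHODS = Set.of("POST");

    private final URI endpoint;
    private final String method;

    public HttpSinkConfig(String endpointURL, String method) {
        if (endpointURL == null || method == null) {
            throw new IllegalArgumentException("endpointURL and method must be non-null");
        }
        // URI.create throws IllegalArgumentException for malformed URLs.
        URI uri = URI.create(endpointURL);
        String scheme = uri.getScheme();
        if (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme)) {
            throw new IllegalArgumentException("endpointURL must use http or https: " + endpointURL);
        }
        // Accept "post", " POST ", etc. and store the canonical upper-case form.
        String normalized = method.trim().toUpperCase(Locale.ROOT);
        if (!SUPPORTED_METHODS.contains(normalized)) {
            throw new UnsupportedOperationException(
                    "HTTP method " + method + " is not supported yet; supported: " + SUPPORTED_METHODS);
        }
        this.endpoint = uri;
        this.method = normalized;
    }

    public URI endpoint() {
        return endpoint;
    }

    public String method() {
        return method;
    }
}
```

Failing at construction time surfaces an unsupported method when the job graph is built, rather than on the first request at runtime.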

Proposed Changes

This FLIP proposes adding the HTTP connector described above, which sinks data to an endpoint that accepts POST requests. The connector will also handle retries through the Async Sink API, deciding whether to retry a request based on the HTTP status code of its response.
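
Status-code-based retry handling could look roughly like the sketch below. The FLIP does not specify which codes are retryable, so the set used here (408, 429, 500, 502, 503, 504) and the capped exponential backoff are assumptions based on common HTTP practice. HttpRetryPolicy, classify, and backoff are hypothetical names.

```java
import java.time.Duration;
import java.util.Set;

/** Hypothetical sketch: classify an HTTP status and compute a retry delay. */
public final class HttpRetryPolicy {
    public enum Outcome { SUCCESS, RETRY, FAIL }

    // Assumed set of codes that usually signal a transient condition.
    private static final Set<Integer> RETRYABLE = Set.of(408, 429, 500, 502, 503, 504);

    private final Duration baseDelay;
    private final Duration maxDelay;

    public HttpRetryPolicy(Duration baseDelay, Duration maxDelay) {
        if (baseDelay.isNegative() || maxDelay.compareTo(baseDelay) < 0) {
            throw new IllegalArgumentException("require 0 <= baseDelay <= maxDelay");
        }
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    public static Outcome classify(int statusCode) {
        if (statusCode >= 200 && statusCode < 300) {
            return Outcome.SUCCESS;
        }
        if (RETRYABLE.contains(statusCode)) {
            return Outcome.RETRY;
        }
        // Redirects, other 4xx codes (bad request, auth errors) and 501 are
        // treated as permanent failures in this sketch.
        return Outcome.FAIL;
    }

    /** Exponential backoff baseDelay * 2^attempt, capped at maxDelay; attempt starts at 0. */
    public Duration backoff(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Limit the shift so the multiplier cannot overflow for large attempt counts.
        long factor = 1L << Math.min(attempt, 30);
        Duration delay = baseDelay.multipliedBy(factor);
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }
}
```

A writer would call classify on each response, re-queue RETRY entries after backoff(attempt), and fail the job (or route the record elsewhere) on FAIL.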

In the future, we'd like to add support for:

  • additional methods
  • better authentication mechanisms
  • Table API
  • Source Connector

Compatibility, Deprecation, and Migration Plan

  • No impact on users of existing connectors

Test Plan

The connector can be tested either with the HttpSink example above or against a REST endpoint written to return configurable status codes. Integration tests therefore depend on the behavior of the destination endpoint as much as on the Flink application.
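
A REST endpoint that returns configurable status codes can be sketched with the JDK's built-in com.sun.net.httpserver.HttpServer. This is an illustrative test fixture under that assumption, not part of the FLIP. StatusCodeTestEndpoint is a hypothetical name; it answers each request with the next scripted status code (200 once the script runs out) and records every request body so a test can check what the sink delivered.

```java
import com.sun.net.httpserver.HttpServer;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical local endpoint for integration tests with scripted status codes. */
public final class StatusCodeTestEndpoint implements AutoCloseable {
    private final Queue<Integer> scriptedStatuses = new ConcurrentLinkedQueue<>();
    private final List<byte[]> receivedBodies = new CopyOnWriteArrayList<>();
    private final HttpServer server;

    public StatusCodeTestEndpoint(List<Integer> statuses) throws IOException {
        scriptedStatuses.addAll(statuses);
        // Port 0 lets the OS pick a free port; url() reports the actual one.
        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            try (InputStream in = exchange.getRequestBody()) {
                receivedBodies.add(in.readAllBytes());
            }
            Integer next = scriptedStatuses.poll();
            int status = next == null ? 200 : next;
            exchange.sendResponseHeaders(status, -1); // -1: no response body
            exchange.close();
        });
        server.start();
    }

    public String url() {
        return "http://127.0.0.1:" + server.getAddress().getPort() + "/";
    }

    public List<byte[]> receivedBodies() {
        return receivedBodies;
    }

    @Override
    public void close() {
        server.stop(0);
    }
}
```

For example, scripting 503 followed by 200 lets a test check that a record is retried once and then delivered.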

Rejected Alternatives

  • Stateful Functions would require the windowing logic to live in the Lambda application rather than in the Flink application.
  • Async I/O functions calling REST endpoints could serve the same purpose; however, this sink can be ported much more readily to the Table and SQL APIs, and it allows for arbitrary endpoints.
