
The intent of this connector is to sink data from Apache Flink applications to arbitrary HTTP endpoints.

Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Flink supports many sinks out of the box, such as Apache Kafka, the filesystem, and Kinesis Data Streams. However, many users need to deliver data to endpoints for which no connector exists in Apache Flink's connector ecosystem. Sinking to an arbitrary endpoint is particularly useful for sending real-time data to a REST-enabled service such as AWS Lambda. Following the success of Stateful Functions, and to give users this flexibility, we propose a sink that supports HTTP endpoints with operations such as POST, GET, PUT, PATCH, and DELETE.

Public Interfaces



A user implementing an HTTP sink will create an instance of the sink by constructing a new HttpSink<>, providing an endpoint URL (String) and an HTTP method (POST, PUT, GET, etc.).

HTTP Sink Constructor:

HttpSink.java
    public HttpSink(String endpointURL, String method)
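For instance, a minimal instantiation (the URL below is a placeholder) could look like:

    HttpSink<byte[]> httpSink = new HttpSink<>("https://example.com/ingest", "POST");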

The underlying implementation uses the Async Sink introduced in FLIP-171 to deliver batches of messages to HTTP endpoints.
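As a rough illustration of the delivery step, the sketch below shows how a writer might submit buffered records with the JDK's built-in HttpClient. This is a minimal sketch, not the proposed implementation: the class and method names (HttpBatchPoster, submitEntry) are hypothetical, and a real implementation would extend the Async Sink's writer and plug into its retry and backpressure machinery.

HttpBatchPoster.java
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical sketch of the delivery step an Async Sink-based HttpSink
    // might perform; the names here are illustrative, not the proposed API.
    public class HttpBatchPoster {

        private final HttpClient client = HttpClient.newHttpClient();
        private final String endpointUrl;

        public HttpBatchPoster(String endpointUrl) {
            this.endpointUrl = endpointUrl;
        }

        /** Sends one buffered record as a POST; the result signals whether to retry. */
        public CompletableFuture<Boolean> submitEntry(byte[] entry) {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(endpointUrl))
                    .header("Content-Type", "application/octet-stream")
                    .POST(HttpRequest.BodyPublishers.ofByteArray(entry))
                    .build();

            // Async Sink writers submit entries asynchronously and re-queue failures;
            // here a non-2xx response simply surfaces as "needs retry".
            return client.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                    .thenApply(response -> response.statusCode() / 100 != 2);
        }
    }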


Example: either a self-generating source or a Kinesis Data Stream source submitting data to an AWS Lambda function URL

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.http.HttpSink;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import events.MyEvent;

import java.io.IOException;
import java.util.Properties;

public class HttpSinkTest {

    // Creates a Kinesis source that passes the raw record bytes through unchanged.
    private static DataStream<byte[]> createSourceFromStaticConfig(
            StreamExecutionEnvironment env, String region, String inputStreamName) {

        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        inputProperties.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
        return env.addSource(
                                new FlinkKinesisConsumer<byte[]>(
                                inputStreamName, new DeserializationSchema<byte[]>() {
                            @Override
                            public byte[] deserialize(byte[] message) throws IOException {
                                return message;
                            }

                            @Override
                            public boolean isEndOfStream(byte[] nextElement) {
                                return false;
                            }

                            @Override
                            public TypeInformation<byte[]> getProducedType() {
                                return TypeInformation.of(byte[].class);
                            }
                        }, inputProperties))
                .name("flink_kinesis_consumer_01");
    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        boolean isSelfGenerating =
                Boolean.parseBoolean(System.getProperty("selfGenerating", "false"));
        int numRecordsPerSecond =
                Integer.parseInt(System.getProperty("numRecordsPerSecond", "100"));

        DataStream<byte[]> sampleEvents = null;

        if (isSelfGenerating) {
            sampleEvents =
                    env.addSource(
                            new SourceFunction<byte[]>() {

                                // flag flipped by cancel() so the loop can exit cleanly
                                private volatile boolean isRunning = true;

                                @Override
                                public void run(SourceContext<byte[]> sourceContext)
                                        throws Exception {
                                    while (isRunning) {

                                        // emit numRecordsPerSecond records, then sleep one second
                                        MyEvent me = new MyEvent();
                                        for (int i = 0; i < numRecordsPerSecond; i++) {
                                            sourceContext.collect(me.getSampleEvents().getBytes());
                                        }
                                        Thread.sleep(1000);
                                    }
                                }

                                @Override
                                public void cancel() {
                                    isRunning = false;
                                }
                            });
        } else {
            sampleEvents = createSourceFromStaticConfig(env, "us-east-2", "input-stream");
        }

        // deliver each record to the AWS Lambda function URL via an HTTP POST
        sampleEvents.sinkTo(
                new HttpSink<>(
                        "https://my-url.lambda-url.us-east-1.on.aws/",
                        "POST"));

        env.execute();
    }
}



Limitations

Currently, the connector only supports sinks and POST requests. In future iterations we'd like to support HTTP sources as well as GET, PUT, DELETE, and other methods.

Proposed Changes

This FLIP proposes adding the above-mentioned HTTP connector, which allows for sinking data to a POST-accepting endpoint. The connector will also handle retries through the Async Sink API according to standard HTTP status code semantics, retrying transient failures and giving up on errors that will not succeed on a second attempt.
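To make the retry behavior concrete, a predicate along the following lines could decide whether a failed request is re-queued. The exact classification below (retry on 429 and 5xx, drop other 4xx) is an assumption for illustration, not a settled design.

HttpRetryPolicy.java
    // Sketch of a retry predicate the sink writer could apply to each response.
    // The classification below is an assumption, not a settled design.
    public final class HttpRetryPolicy {

        private HttpRetryPolicy() {}

        /** Returns true if the request should be re-queued for another attempt. */
        public static boolean isRetryable(int statusCode) {
            if (statusCode == 429) {
                return true; // Too Many Requests: back off and retry
            }
            if (statusCode >= 500 && statusCode < 600) {
                return true; // server-side errors are generally transient
            }
            return false; // other 4xx client errors will not succeed on retry
        }
    }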


In the future, we'd like to add support for:

  • additional HTTP methods
  • better authentication mechanisms
  • Table API
  • SQL API
  • Source Connector

Compatibility, Deprecation, and Migration Plan

  • No impact on existing connectors or their users

Test Plan

The connector can be tested either against the HttpSinkTest example referenced above or against a purpose-built REST endpoint that can return arbitrary status codes. Note that the integration tests depend as much on the behavior of the destination endpoint as on the Flink application itself.
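For the second approach, a throwaway endpoint is enough. Below is a minimal sketch using the JDK's built-in HTTP server; the port and the hardcoded response code are arbitrary choices for illustration.

TestEndpoint.java
    import com.sun.net.httpserver.HttpServer;

    import java.io.OutputStream;
    import java.net.InetSocketAddress;

    // Minimal throwaway endpoint for exercising the sink's status-code handling.
    public class TestEndpoint {
        public static void main(String[] args) throws Exception {
            HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
            server.createContext("/", exchange -> {
                byte[] body = "ok".getBytes();
                // change 200 to 429 or 503 to exercise the sink's retry path
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(body);
                }
            });
            server.start();
        }
    }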

Rejected Alternatives

  • Stateful Functions would require the windowing logic to live in the remote function (e.g., the Lambda application) rather than in the Flink job.
  • AsyncIO functions calling REST endpoints could be used for the same purpose; however, this sink can be much more readily ported to the Table and SQL APIs, and it allows for arbitrary endpoints.