
The intent of this connector is to sink data from Apache Flink systems to arbitrary HTTP endpoints.

Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/cd60ln4pjgml7sv4kh23o1fohcfwvjcz

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Flink supports many sinks out of the box, whether to Apache Kafka, the filesystem, Kinesis Data Streams, etc. However, many users need to write to destinations that are not implemented or available by default in Apache Flink's connector ecosystem. Sinking to an arbitrary endpoint is particularly useful for sending real-time data to a REST-enabled client such as AWS Lambda. With the success of Stateful Functions and the goal of giving users more flexibility, we propose a sink that supports HTTP endpoints with methods such as POST, GET, PUT, PATCH, and DELETE.

Public Interfaces



A user creates an HTTP sink by instantiating a new HttpSink<>, providing an endpoint URL (String) and an HTTP method (POST, PUT, GET, etc.).

HTTP Sink Constructor:

HttpSink.java
    public HttpSink(String endpointURL, String method)

The underlying implementation uses FLIP-171 (Async Sink) to deliver batches of messages to HTTP endpoints.


Example with either a self-generating source or a Kinesis Data Streams source, submitting data to an AWS Lambda function URL:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.http.HttpSink;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import events.MyEvent;

import java.io.IOException;
import java.util.Properties;

public class HttpSinkTest {

    private static DataStream<byte[]> createSourceFromStaticConfig(
            StreamExecutionEnvironment env, String region, String inputStreamName) {

        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        inputProperties.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
        return env.addSource(
                        new FlinkKinesisConsumer<byte[]>(
                                inputStreamName, new DeserializationSchema<byte[]>() {
                            @Override
                            public byte[] deserialize(byte[] message) throws IOException {
                                return message;
                            }

                            @Override
                            public boolean isEndOfStream(byte[] nextElement) {
                                return false;
                            }

                            @Override
                            public TypeInformation<byte[]> getProducedType() {
                                return TypeInformation.of(byte[].class);
                            }
                        }, inputProperties))
                .name("flink_kinesis_consumer_01");
    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        boolean isSelfGenerating =
                Boolean.parseBoolean(System.getProperty("selfGenerating", "false"));
        int numRecordsPerSecond =
                Integer.parseInt(System.getProperty("numRecordsPerSecond", "100"));

        DataStream<byte[]> sampleEvents;

        if (isSelfGenerating) {
            sampleEvents =
                    env.addSource(
                            new SourceFunction<byte[]>() {
                                private volatile boolean isRunning = true;

                                @Override
                                public void run(SourceContext<byte[]> sourceContext)
                                        throws Exception {
                                    while (isRunning) {
                                        // emit numRecordsPerSecond records, then sleep a second
                                        MyEvent me = new MyEvent();
                                        for (int i = 0; i < numRecordsPerSecond; i++) {
                                            sourceContext.collect(me.getSampleEvents().getBytes());
                                        }
                                        Thread.sleep(1000);
                                    }
                                }

                                @Override
                                public void cancel() {
                                    isRunning = false;
                                }
                            });
        } else {
            sampleEvents = createSourceFromStaticConfig(env, "us-east-2", "input-stream");
        }

        sampleEvents.sinkTo(
                new HttpSink<>(
                        "https://my-url.lambda-url.us-east-1.on.aws/",
                        "POST"));

        env.execute();
    }
}



Limitations

Currently, the connector only supports sinking data, and only via POST requests. In future iterations we'd like to support HTTP sources as well as GET, PUT, DELETE, and other methods.

Proposed Changes

This FLIP proposes adding the above-mentioned HTTP connector, which allows for sinking data to a POST-accepting endpoint. The connector will also handle retries through the Async Sink API according to standard HTTP status code retry semantics.
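
As a sketch of that classification (mirroring the status code lists used by HttpSinkWriter below; the exact lists remain open to discussion on the mailing list), a response is either retried, counted as a success, or treated as fatal:

HttpStatusRetryPolicy.java (illustrative sketch)
import java.util.Arrays;
import java.util.List;

/** Sketch of how HTTP responses are classified for retry purposes. */
public final class HttpStatusRetryPolicy {

    // Transient failures: the request entry is handed back to the Async Sink for retry.
    private static final List<Integer> RETRYABLE = Arrays.asList(408, 502, 503, 504);

    // Client errors: retrying cannot help, so the sink surfaces a fatal exception.
    private static final List<Integer> NON_RETRYABLE = Arrays.asList(400, 401, 402, 403, 404);

    public static boolean shouldRetry(int statusCode) {
        return RETRYABLE.contains(statusCode);
    }

    public static boolean isFatal(int statusCode) {
        return NON_RETRYABLE.contains(statusCode);
    }

    private HttpStatusRetryPolicy() {}
}

Any status code in neither list (e.g. 2xx) is counted as a successful delivery.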


In the future, we'd like to add support for:

  • additional methods
  • better authentication mechanisms
  • Table API
  • SQL API
  • Source Connector



The following files comprise the connector:

HttpSink.java
package org.apache.flink.streaming.connectors.http;

import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * An Async Sink that delivers batches of records to an arbitrary HTTP endpoint.
 *
 * @param <InputT> the type of element accepted by the sink
 */
public class HttpSink<InputT> extends AsyncSinkBase<InputT, byte[]> {

    private final String endpointURL;
    private final String method;

    // Elements are assumed to arrive already serialized as byte arrays.
    private static final ElementConverter<Object, byte[]> ELEMENT_CONVERTER =
            ((element, context) -> (byte[]) element);

    private static final int MAX_BATCH_SIZE = 10000;
    private static final int MAX_IN_FLIGHT_REQUESTS = 100000; // must be > MAX_BATCH_SIZE
    private static final int MAX_BUFFERED_REQUESTS = 100000; // must be > MAX_BATCH_SIZE
    private static final int MAX_BATCH_SIZE_IN_BYTES = 10000000;
    private static final int MAX_TIME_IN_BUFFER_MS = 1000;
    private static final int MAX_RECORD_SIZE_IN_BYTES = 10000000;

    public HttpSink(String endpointURL, String method) {
        super(
                (ElementConverter<InputT, byte[]>) ELEMENT_CONVERTER,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES);
        this.endpointURL = endpointURL;
        this.method = method;
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> createWriter(
            InitContext initContext) throws IOException {
        return new HttpSinkWriter<>(
                (ElementConverter<InputT, byte[]>) ELEMENT_CONVERTER,
                initContext,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                endpointURL,
                method);
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> restoreWriter(
            InitContext initContext, Collection<BufferedRequestState<byte[]>> collection)
            throws IOException {
        // NOTE: the recovered buffer in "collection" should be re-submitted on restore;
        // this draft recreates the writer with an empty buffer.
        return new HttpSinkWriter<>(
                (ElementConverter<InputT, byte[]>) ELEMENT_CONVERTER,
                initContext,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                endpointURL,
                method);
    }

    @Override
    public SimpleVersionedSerializer<BufferedRequestState<byte[]>> getWriterStateSerializer() {

        return new SimpleVersionedSerializer<BufferedRequestState<byte[]>>() {
            @Override
            public int getVersion() {
                return 0;
            }

            /**
             * Serializes state in form of
             * [DATA_IDENTIFIER,NUM_OF_ELEMENTS,SIZE1,REQUEST1,SIZE2,REQUEST2....].
             */
            @Override
            public byte[] serialize(BufferedRequestState<byte[]> obj) throws IOException {
                Collection<RequestEntryWrapper<byte[]>> bufferState =
                        obj.getBufferedRequestEntries();

                try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        final DataOutputStream out = new DataOutputStream(baos)) {

                    out.writeLong(-1);
                    out.writeInt(bufferState.size());

                    for (RequestEntryWrapper<byte[]> wrapper : bufferState) {
                        out.writeLong(wrapper.getSize());
                        serializeRequestToStream(wrapper.getRequestEntry(), out);
                    }

                    return baos.toByteArray();
                }
            }

            @Override
            public BufferedRequestState<byte[]> deserialize(int version, byte[] serialized)
                    throws IOException {
                try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                        final DataInputStream in = new DataInputStream(bais)) {

                    validateIdentifier(in);

                    int size = in.readInt();
                    List<RequestEntryWrapper<byte[]>> deserializedState = new ArrayList<>();

                    for (int i = 0; i < size; i++) {
                        long requestSize = in.readLong();
                        byte[] request = deserializeRequestFromStream(requestSize, in);
                        deserializedState.add(new RequestEntryWrapper<>(request, requestSize));
                    }

                    return new BufferedRequestState<>(deserializedState);
                }
            }

            protected void serializeRequestToStream(byte[] request, DataOutputStream out)
                    throws IOException {
                // Write the raw request bytes; the size was already written by the caller.
                out.write(request);
                out.flush();
            }

            protected byte[] deserializeRequestFromStream(long requestSize, DataInputStream in)
                    throws IOException {
                // Read back exactly requestSize bytes.
                byte[] request = new byte[(int) requestSize];
                in.readFully(request);
                return request;
            }

            private void validateIdentifier(DataInputStream in) throws IOException {
                if (in.readLong() != -1) {
                    throw new IllegalStateException("Corrupted data to deserialize");
                }
            }
        };
    }
}
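
A minimal round-trip check for the writer state serializer above — a sketch assuming BufferedRequestState exposes the list-based constructor and getBufferedRequestEntries() accessor used in the draft; the URL is a placeholder:

HttpSinkSerializerSmokeTest.java (illustrative sketch)
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class HttpSinkSerializerSmokeTest {

    public static void main(String[] args) throws Exception {
        byte[] payload = "{\"hello\":\"world\"}".getBytes(StandardCharsets.UTF_8);

        // One buffered entry wrapping the payload and its size in bytes.
        BufferedRequestState<byte[]> state =
                new BufferedRequestState<>(
                        Collections.singletonList(
                                new RequestEntryWrapper<>(payload, payload.length)));

        SimpleVersionedSerializer<BufferedRequestState<byte[]>> serializer =
                new HttpSink<byte[]>("https://example.com/", "POST").getWriterStateSerializer();

        // Serialize and deserialize; the single entry should survive unchanged.
        byte[] bytes = serializer.serialize(state);
        BufferedRequestState<byte[]> restored =
                serializer.deserialize(serializer.getVersion(), bytes);

        if (restored.getBufferedRequestEntries().size() != 1) {
            throw new AssertionError("Expected exactly one restored request entry");
        }
    }
}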


HttpSinkWriter.java
package org.apache.flink.streaming.connectors.http;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Response;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import static org.asynchttpclient.Dsl.asyncHttpClient;

public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, byte[]> {

    private final AsyncHttpClient asyncHttpClient;
    private final String endpointUrl;
    private final String method; // only POST is currently submitted; see Limitations

    private static final List<Integer> RETRYABLE_ERRORS = Arrays.asList(408, 502, 503, 504);
    private static final List<Integer> NON_RETRYABLE_ERRORS =
            Arrays.asList(400, 401, 402, 403, 404);

    private final SinkWriterMetricGroup metrics;

    /* A counter for the total number of records that have been sent */
    private final Counter numRecordsSendCounter;

    /* A counter for the total number of records that have encountered an error during put */
    private final Counter numRecordsSendErrorsCounter;

    /* A counter for the total number of bytes that have been sent */
    private final Counter numBytesSendCounter;

    public HttpSinkWriter(
            ElementConverter<InputT, byte[]> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            String endpointURL,
            String method) {
        super(
                elementConverter,
                context,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes);

        this.endpointUrl = endpointURL;
        this.method = method;
        this.asyncHttpClient = asyncHttpClient();
        this.metrics = context.metricGroup();
        this.numRecordsSendCounter = metrics.getIOMetricGroup().getNumRecordsOutCounter();
        this.numBytesSendCounter = metrics.getIOMetricGroup().getNumBytesOutCounter();
        this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
    }

    @Override
    protected void submitRequestEntries(List<byte[]> list, Consumer<List<byte[]>> consumer) {

        // Entries that fail with a retryable status code are collected here and handed
        // back to the Async Sink for re-queueing. The list is synchronized because the
        // HTTP client completes requests on its own I/O threads.
        List<byte[]> retryList = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch cdl = new CountDownLatch(list.size());

        for (byte[] item : list) {
            BoundRequestBuilder postRequest =
                    asyncHttpClient.preparePost(endpointUrl).setBody(item);
            postRequest.execute(
                    new AsyncCompletionHandler<Object>() {
                        @Override
                        public Object onCompleted(Response response) throws Exception {
                            if (RETRYABLE_ERRORS.contains(response.getStatusCode())) {
                                retryList.add(item);
                                numRecordsSendErrorsCounter.inc();
                            } else if (!NON_RETRYABLE_ERRORS.contains(response.getStatusCode())) {
                                numRecordsSendCounter.inc();
                                numBytesSendCounter.inc(item.length);
                            } else {
                                numRecordsSendErrorsCounter.inc();
                                getFatalExceptionCons()
                                        .accept(new Exception(response.getStatusText()));
                            }
                            cdl.countDown();
                            return response;
                        }
                    });
        }

        try {
            cdl.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        consumer.accept(new ArrayList<>(retryList));
    }

    /**
     * Returns the size of a {@code RequestEntryT} in bytes. The size is measured as the total
     * number of bytes written to the destination when this particular {@code RequestEntryT} is
     * persisted, rather than its serialized length (although the two may coincide).
     *
     * @param requestEntry the request entry whose size is wanted
     * @return the size of the request entry, as defined above
     */
    @Override
    protected long getSizeInBytes(byte[] requestEntry) {
        return requestEntry != null ? requestEntry.length : 0;
    }
}


Compatibility, Deprecation, and Migration Plan

  • No impact on existing connectors or their users

Test Plan

The connector can be tested either with the HttpSink example referenced above or by writing a REST endpoint capable of returning arbitrary status codes; a minimal mock endpoint is sketched below. The integration tests depend on the destination as much as on the Flink application.
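
For the latter, the JDK's built-in com.sun.net.httpserver is enough for a local mock; in this sketch the port and the "status" query parameter are illustrative assumptions:

MockHttpEndpoint.java (illustrative sketch)
import com.sun.net.httpserver.HttpServer;

import java.io.OutputStream;
import java.net.InetSocketAddress;

/** A tiny mock endpoint returning the status code given in the "status" query parameter. */
public class MockHttpEndpoint {

    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/", exchange -> {
            // Default to 200; e.g. POST /?status=503 simulates a retryable failure.
            int status = 200;
            String query = exchange.getRequestURI().getQuery();
            if (query != null && query.startsWith("status=")) {
                status = Integer.parseInt(query.substring("status=".length()));
            }
            byte[] body = "ok".getBytes();
            exchange.sendResponseHeaders(status, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        System.out.println("Mock endpoint listening on http://localhost:8080/");
    }
}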

Rejected Alternatives

  • Stateful Functions would require the windowing logic to live in the Lambda application rather than in the Flink job.
  • Async I/O functions calling REST endpoints could serve the same purpose; however, this sink can be ported much more readily to the Table and SQL APIs and allows for arbitrary endpoints.