Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Example using either a self-generating source or a Kinesis Data Streams source, which submits data to an AWS Lambda function URL endpoint:

Code Block
import com.fasterxml.jackson.databind.ser.std.ByteArraySerializer;

import javassist.bytecode.ByteArray;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.http.HttpSink;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import events.MyEvent;

import java.io.IOException;
import java.util.Properties;

public class HttpSinkTest {

    /**
     * Creates a Kinesis Data Streams source that emits every record payload unchanged.
     *
     * @param env execution environment the source is registered with
     * @param region AWS region of the input stream
     * @param inputStreamName name of the Kinesis stream to read from
     * @return stream of raw record payloads, read starting from the {@code LATEST} position
     */
    private static DataStream<byte[]> createSourceFromStaticConfig(
            StreamExecutionEnvironment env, String region, String inputStreamName) {

        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        inputProperties.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");
        return env.addSource(
                        new FlinkKinesisConsumer<byte[]>(
                                inputStreamName, new PassThroughBytesSchema(), inputProperties))
                .name("flink_kinesis_consumer_01");
    }

    /**
     * Runs the example job: reads events from either a synthetic generator or Kinesis and POSTs
     * each one to an HTTP endpoint through {@link HttpSink}.
     *
     * <p>Configured through system properties:
     *
     * <ul>
     *   <li>{@code selfGenerating} (default {@code false}): use the synthetic generator instead of
     *       Kinesis.
     *   <li>{@code numRecordsPerSecond} (default {@code 100}): records emitted per second by the
     *       synthetic generator.
     * </ul>
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        boolean isSelfGenerating =
                Boolean.parseBoolean(System.getProperty("selfGenerating", "false"));
        int numRecordsPerSecond =
                Integer.parseInt(System.getProperty("numRecordsPerSecond", "100"));

        DataStream<byte[]> sampleEvents =
                isSelfGenerating
                        ? env.addSource(new SampleEventSource(numRecordsPerSecond))
                        : createSourceFromStaticConfig(env, "us-east-2", "input-stream");

        sampleEvents.sinkTo(
                new HttpSink<>(
                        "https://my-url.lambda-url.us-east-1.on.aws/",
                        "POST"));

        env.execute();
    }

    /** Deserialization schema that hands each Kinesis record's bytes through untouched. */
    private static final class PassThroughBytesSchema implements DeserializationSchema<byte[]> {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] deserialize(byte[] message) throws IOException {
            return message;
        }

        @Override
        public boolean isEndOfStream(byte[] nextElement) {
            // Kinesis is unbounded; never signal end of stream.
            return false;
        }

        @Override
        public TypeInformation<byte[]> getProducedType() {
            return TypeInformation.of(byte[].class);
        }
    }

    /**
     * Synthetic source that emits {@code recordsPerSecond} sample events, then sleeps one second,
     * until cancelled.
     */
    private static final class SampleEventSource implements SourceFunction<byte[]> {
        private static final long serialVersionUID = 1L;

        private final int recordsPerSecond;

        // Written by cancel() from a different thread than run(), hence volatile.
        private volatile boolean running = true;

        SampleEventSource(int recordsPerSecond) {
            this.recordsPerSecond = recordsPerSecond;
        }

        @Override
        public void run(SourceContext<byte[]> sourceContext) throws Exception {
            while (running) {
                MyEvent event = new MyEvent();
                for (int i = 0; i < recordsPerSecond && running; i++) {
                    // NOTE(review): assumes getSampleEvents() returns a String; encode explicitly
                    // so the payload does not depend on the JVM's default charset.
                    sourceContext.collect(
                            event.getSampleEvents()
                                    .getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            // Lets run() leave its loop instead of spinning forever after cancellation.
            running = false;
        }
    }
}

Limitations

Currently, this connector supports only the sink and only POST requests. In future iterations we'd like to support HTTP sources and other methods such as GET, PUT, and DELETE.

...

  • additional methods
  • better authentication mechanisms
  • Table API
  • SQL API
  • Source Connector

The following files comprise the connector:

Code Block
languagejava
titleHTTPSink.java
package org.apache.flink.streaming.connectors.http;

import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * An {@link AsyncSinkBase} that delivers each element, as a raw {@code byte[]} payload, to a single
 * HTTP endpoint.
 *
 * <p>Elements must already be {@code byte[]} at runtime: the element converter performs a plain
 * cast and throws {@link ClassCastException} for any other type.
 *
 * @param <InputT> type of the incoming elements; expected to be {@code byte[]} at runtime
 */
public class HttpSink<InputT> extends AsyncSinkBase<InputT, byte []> {

    private final String endpointURL;
    private final String method;
    private static final ElementConverter<Object, byte []> ELEMENT_CONVERTER =
            ((element, context) -> (byte[]) element);
    private static final int MAX_BATCH_SIZE = 10000;
    private static final int MAX_IN_FLIGHT_REQUESTS = 100000; // must be > max_batch_size
    private static final int MAX_BUFFERED_REQUESTS = 100000; // must be > max_batch_size
    private static final int MAX_BATCH_SIZE_IN_BYTES = 10000000;
    private static final int MAX_TIME_IN_BUFFER_MS = 1000;
    private static final int MAX_RECORD_SIZE_IN_BYTES = 10000000;

    /**
     * Creates a sink that sends buffered payloads to {@code endpointURL}.
     *
     * @param endpointURL URL every request is sent to
     * @param method HTTP method forwarded to the writer, e.g. {@code "POST"}
     */
    @SuppressWarnings("unchecked") // ELEMENT_CONVERTER ignores InputT and only casts to byte[].
    public HttpSink(String endpointURL, String method) {
        super(
                (ElementConverter<InputT, byte []>) ELEMENT_CONVERTER,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES);
        this.endpointURL = endpointURL;
        this.method = method;
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte []>> createWriter(
            InitContext initContext) throws IOException {
        return newWriter(initContext);
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte []>> restoreWriter(
            InitContext initContext, Collection<BufferedRequestState<byte []>> collection)
            throws IOException {
        // NOTE(review): the restored `collection` is not handed to the writer, so requests that
        // were buffered at checkpoint time are dropped on recovery. Forward it once
        // HttpSinkWriter exposes a constructor that accepts restored BufferedRequestStates.
        return newWriter(initContext);
    }

    /** Builds a writer configured with this sink's endpoint, method and batching limits. */
    private StatefulSinkWriter<InputT, BufferedRequestState<byte []>> newWriter(
            InitContext initContext) {
        return new HttpSinkWriter(
                ELEMENT_CONVERTER,
                initContext,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                endpointURL,
                method);
    }

    @Override
    public SimpleVersionedSerializer<BufferedRequestState<byte []>> getWriterStateSerializer() {
        return new HttpSinkStateSerializer();
    }

    /**
     * Serializes the writer's buffered requests so that un-sent payloads survive a checkpoint.
     *
     * <p>Version 1 layout: {@code [DATA_IDENTIFIER(long = -1), NUM_OF_ELEMENTS(int)]} followed,
     * per element, by {@code [SIZE(long), PAYLOAD_LENGTH(int), PAYLOAD(bytes)]}. {@code SIZE} is
     * the size recorded in the {@link RequestEntryWrapper}; the payload length is stored
     * separately so restoring does not depend on the two being equal.
     *
     * <p>Version 0 wrote only the sizes and never the payloads, so a version-0 snapshot can be
     * restored only when it holds no buffered requests.
     */
    private static final class HttpSinkStateSerializer
            implements SimpleVersionedSerializer<BufferedRequestState<byte []>> {

        private static final int VERSION = 1;
        private static final int LEGACY_VERSION_WITHOUT_PAYLOADS = 0;
        private static final long DATA_IDENTIFIER = -1L;

        @Override
        public int getVersion() {
            return VERSION;
        }

        @Override
        public byte[] serialize(BufferedRequestState<byte []> obj) throws IOException {
            Collection<RequestEntryWrapper<byte []>> bufferState =
                    obj.getBufferedRequestEntries();

            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    final DataOutputStream out = new DataOutputStream(baos)) {

                out.writeLong(DATA_IDENTIFIER);
                out.writeInt(bufferState.size());

                for (RequestEntryWrapper<byte []> wrapper : bufferState) {
                    out.writeLong(wrapper.getSize());
                    writePayload(wrapper.getRequestEntry(), out);
                }

                out.flush();
                return baos.toByteArray();
            }
        }

        @Override
        public BufferedRequestState<byte []> deserialize(int version, byte[] serialized)
                throws IOException {
            if (version != VERSION && version != LEGACY_VERSION_WITHOUT_PAYLOADS) {
                throw new IOException(
                        "Unsupported HttpSink writer state version "
                                + version
                                + "; expected "
                                + VERSION);
            }

            try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                    final DataInputStream in = new DataInputStream(bais)) {

                validateIdentifier(in);

                int count = in.readInt();
                if (count < 0) {
                    throw new IOException("Corrupted HttpSink state: negative entry count " + count);
                }

                List<RequestEntryWrapper<byte []>> entries = new ArrayList<>();

                if (version == LEGACY_VERSION_WITHOUT_PAYLOADS) {
                    // Version 0 never wrote payload bytes, so only an empty buffer is recoverable.
                    if (count != 0) {
                        throw new IOException(
                                "HttpSink writer state version 0 did not persist request payloads;"
                                        + " cannot restore "
                                        + count
                                        + " buffered request(s)");
                    }
                    return new BufferedRequestState<>(entries);
                }

                for (int i = 0; i < count; i++) {
                    long requestSize = in.readLong();
                    byte[] payload = readPayload(in);
                    entries.add(new RequestEntryWrapper<>(payload, requestSize));
                }

                return new BufferedRequestState<>(entries);
            }
        }

        /** Writes one request as a length-prefixed byte array. */
        private static void writePayload(byte[] payload, DataOutputStream out) throws IOException {
            out.writeInt(payload.length);
            out.write(payload);
        }

        /** Reads one length-prefixed request, rejecting lengths the input cannot satisfy. */
        private static byte[] readPayload(DataInputStream in) throws IOException {
            int length = in.readInt();
            // The stream wraps an in-memory array, so available() is the exact number of bytes
            // left; checking it avoids allocating a huge array for a corrupted length.
            if (length < 0 || length > in.available()) {
                throw new IOException("Corrupted HttpSink state: invalid payload length " + length);
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            return payload;
        }

        private static void validateIdentifier(DataInputStream in) throws IOException {
            if (in.readLong() != DATA_IDENTIFIER) {
                throw new IllegalStateException("Corrupted data to deserialize");
            }
        }
    }
}

...

languagejava
titleHTTPSinkWriter

...

Compatibility, Deprecation, and Migration Plan

...