Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • additional methods
  • better authentication mechanisms
  • Table API
  • SQL API
  • Source Connector



The following files comprise the connector:

Code block — `HttpSink.java` (Java):
package org.apache.flink.streaming.connectors.http;

import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * Flink sink that delivers each stream element as the body of an HTTP request to a fixed endpoint.
 *
 * <p>Elements are passed through unchanged: the element converter casts every element to {@code
 * byte[]}, so the sink only works for streams whose elements are {@code byte[]} at runtime; any
 * other element type fails with a {@link ClassCastException} when it is written.
 *
 * @param <InputT> type of the stream elements; must be {@code byte[]} at runtime
 */
public class HttpSink<InputT> extends AsyncSinkBase<InputT, byte[]> {

    /** Marker written at the start of every serialized writer state and checked on restore. */
    private static final long DATA_IDENTIFIER = -1L;

    /**
     * Version of the writer-state format. Version 0 wrote only the entry sizes without the
     * payloads, so snapshots in that format cannot be restored.
     */
    private static final int STATE_VERSION = 1;

    /** Length prefix written instead of a payload for a {@code null} request entry. */
    private static final int NULL_REQUEST_LENGTH = -1;

    private static final ElementConverter<Object, byte[]> ELEMENT_CONVERTER =
            (element, context) -> (byte[]) element;

    private static final int MAX_BATCH_SIZE = 10000;
    private static final int MAX_IN_FLIGHT_REQUESTS = 100000;
    private static final int MAX_BUFFERED_REQUESTS = 100000; // must be > MAX_BATCH_SIZE
    private static final int MAX_BATCH_SIZE_IN_BYTES = 10000000;
    private static final int MAX_TIME_IN_BUFFER_MS = 1000;
    private static final int MAX_RECORD_SIZE_IN_BYTES = 10000000;

    private final String endpointURL;
    private final String method;

    /**
     * Creates a sink that sends every element to {@code endpointURL}.
     *
     * @param endpointURL URL every request is sent to
     * @param method HTTP method handed to the writer for each request
     */
    public HttpSink(String endpointURL, String method) {
        super(
                HttpSink.<InputT>byteArrayConverter(),
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES);
        this.endpointURL = endpointURL;
        this.method = method;
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> createWriter(
            InitContext initContext) throws IOException {
        return newWriter(initContext);
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> restoreWriter(
            InitContext initContext, Collection<BufferedRequestState<byte[]>> collection)
            throws IOException {
        // NOTE(review): `collection` holds the entries that were still buffered at the last
        // checkpoint, but it is not forwarded to the writer, so those entries are dropped on
        // restore. HttpSinkWriter needs a constructor that accepts the restored states
        // (presumably passing them to the AsyncSinkWriter constructor that takes them) — TODO.
        return newWriter(initContext);
    }

    /** Creates a writer configured with this sink's endpoint, method and buffering limits. */
    private HttpSinkWriter<InputT> newWriter(InitContext initContext) {
        return new HttpSinkWriter<>(
                HttpSink.<InputT>byteArrayConverter(),
                initContext,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                endpointURL,
                method);
    }

    /**
     * Returns the shared pass-through converter typed for the caller's element type.
     *
     * <p>The converter accepts any element and only casts it to {@code byte[]}, so viewing it with
     * a different input type parameter is safe for the converter itself; a non-{@code byte[]}
     * element still fails at the cast inside it.
     */
    @SuppressWarnings("unchecked")
    private static <T> ElementConverter<T, byte[]> byteArrayConverter() {
        return (ElementConverter<T, byte[]>) ELEMENT_CONVERTER;
    }

    @Override
    public SimpleVersionedSerializer<BufferedRequestState<byte[]>> getWriterStateSerializer() {

        return new SimpleVersionedSerializer<BufferedRequestState<byte[]>>() {
            @Override
            public int getVersion() {
                return STATE_VERSION;
            }

            /**
             * Serializes state in the form
             * [DATA_IDENTIFIER, NUM_OF_ELEMENTS, SIZE1, REQUEST1, SIZE2, REQUEST2, ...], where
             * SIZEi is the size tracked by the writer for the entry and REQUESTi is the payload
             * written as an int length followed by that many bytes ({@code -1} for {@code null}).
             */
            @Override
            public byte[] serialize(BufferedRequestState<byte[]> obj) throws IOException {
                Collection<RequestEntryWrapper<byte[]>> bufferState =
                        obj.getBufferedRequestEntries();

                try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        final DataOutputStream out = new DataOutputStream(baos)) {

                    out.writeLong(DATA_IDENTIFIER);
                    out.writeInt(bufferState.size());

                    for (RequestEntryWrapper<byte[]> wrapper : bufferState) {
                        out.writeLong(wrapper.getSize());
                        serializeRequestToStream(wrapper.getRequestEntry(), out);
                    }

                    out.flush();
                    return baos.toByteArray();
                }
            }

            /**
             * Restores state written by {@link #serialize}.
             *
             * @throws IOException if {@code version} is not the current format version or the
             *     data ends prematurely
             * @throws IllegalStateException if the data does not follow the expected layout
             */
            @Override
            public BufferedRequestState<byte[]> deserialize(int version, byte[] serialized)
                    throws IOException {
                if (version != STATE_VERSION) {
                    throw new IOException(
                            "Unsupported HttpSink writer state version "
                                    + version
                                    + ", expected "
                                    + STATE_VERSION);
                }

                try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                        final DataInputStream in = new DataInputStream(bais)) {

                    validateIdentifier(in);

                    int count = in.readInt();
                    if (count < 0) {
                        throw new IllegalStateException("Corrupted data to deserialize");
                    }

                    List<RequestEntryWrapper<byte[]>> entries = new ArrayList<>();
                    for (int i = 0; i < count; i++) {
                        long requestSize = in.readLong();
                        byte[] request = deserializeRequestFromStream(in);
                        entries.add(new RequestEntryWrapper<>(request, requestSize));
                    }

                    return new BufferedRequestState<>(entries);
                }
            }

            /** Writes {@code request} as a length-prefixed byte array. */
            private void serializeRequestToStream(byte[] request, DataOutputStream out)
                    throws IOException {
                if (request == null) {
                    out.writeInt(NULL_REQUEST_LENGTH);
                    return;
                }
                out.writeInt(request.length);
                out.write(request);
            }

            /** Reads one payload written by {@link #serializeRequestToStream}. */
            private byte[] deserializeRequestFromStream(DataInputStream in) throws IOException {
                int length = in.readInt();
                if (length == NULL_REQUEST_LENGTH) {
                    return null;
                }
                // The stream is backed by an in-memory array, so available() is the exact number
                // of remaining bytes; rejecting larger lengths avoids huge allocations on bad data.
                if (length < 0 || length > in.available()) {
                    throw new IllegalStateException("Corrupted data to deserialize");
                }
                byte[] request = new byte[length];
                in.readFully(request);
                return request;
            }

            private void validateIdentifier(DataInputStream in) throws IOException {
                if (in.readLong() != DATA_IDENTIFIER) {
                    throw new IllegalStateException("Corrupted data to deserialize");
                }
            }
        };
    }
}


Code block — `HttpSinkWriter.java` (Java):
package org.apache.flink.streaming.connectors.http;

import javassist.bytecode.ByteArray;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Response;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import static org.asynchttpclient.Dsl.*;

/**
 * {@link AsyncSinkWriter} that sends every buffered {@code byte[]} entry as the body of its own
 * HTTP request to a single endpoint.
 *
 * <p>{@link #submitRequestEntries} waits until every request of a batch has finished before it
 * reports the entries to retry, so the calling thread is blocked for the duration of a batch.
 *
 * @param <InputT> type of the elements written to the sink
 */
public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, byte[]> {

    /** Method used when no (or only a blank) HTTP method is configured. */
    private static final String DEFAULT_METHOD = "POST";

    /**
     * Status codes treated as transient: the entry is handed back to {@link AsyncSinkWriter} for
     * retry. Any other status outside 2xx is treated as non-retryable and reported as fatal.
     */
    private static final List<Integer> RETRYABLE_STATUS_CODES =
            Arrays.asList(408, 429, 502, 503, 504);

    private final AsyncHttpClient asyncHttpClient;
    private final String endpointUrl;
    private final String method;
    private final SinkWriterMetricGroup metrics;

    /* A counter for the total number of records acknowledged with a 2xx status */
    private final Counter numRecordsSendCounter;

    /* A counter for the total number of records that have encountered an error during send */
    private final Counter numRecordsSendErrorsCounter;

    /* A counter for the total number of payload bytes of successfully sent records */
    private final Counter numBytesSendCounter;

    /**
     * Creates a writer that sends entries to {@code endpointURL}.
     *
     * @param endpointURL URL every request is sent to
     * @param method HTTP method for every request; {@code null} or blank falls back to {@code
     *     POST}, other values are upper-cased
     */
    public HttpSinkWriter(
            ElementConverter<InputT, byte[]> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            String endpointURL,
            String method) {
        super(
                elementConverter,
                context,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes);

        this.endpointUrl = endpointURL;
        this.method =
                (method == null || method.trim().isEmpty())
                        ? DEFAULT_METHOD
                        // The async client expects the method name in upper case.
                        : method.trim().toUpperCase(Locale.ROOT);
        this.metrics = context.metricGroup();
        // NOTE(review): depending on the Flink version, AsyncSinkWriter may already increment
        // numRecordsOut/numBytesOut when it assembles a batch; if so, the increments below
        // double-count — verify against the targeted connector-base version.
        this.numRecordsSendCounter = metrics.getIOMetricGroup().getNumRecordsOutCounter();
        this.numBytesSendCounter = metrics.getIOMetricGroup().getNumBytesOutCounter();
        this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
        // Created last so nothing above can fail after the client's resources exist.
        this.asyncHttpClient = asyncHttpClient();
    }

    /**
     * Sends each entry as one request, waits for all of them, then reports the entries to retry.
     *
     * <p>Transport failures and {@link #RETRYABLE_STATUS_CODES} are retried. Other non-2xx
     * responses are counted as errors, dropped, and reported once per batch through the fatal
     * exception consumer. If the waiting thread is interrupted, the whole batch is reported for
     * retry because the outcome of outstanding requests is unknown (at-least-once).
     */
    @Override
    protected void submitRequestEntries(
            List<byte[]> requestEntries, Consumer<List<byte[]>> requestResult) {
        int batchSize = requestEntries.size();
        // Each slot is written by exactly one completion callback; countDown()/await() publishes
        // those writes to this thread, so no further synchronization is needed.
        Response[] responses = new Response[batchSize];
        Throwable[] failures = new Throwable[batchSize];
        CountDownLatch pending = new CountDownLatch(batchSize);

        for (int i = 0; i < batchSize; i++) {
            final int slot = i;
            BoundRequestBuilder request =
                    asyncHttpClient.prepare(method, endpointUrl).setBody(requestEntries.get(i));
            // whenComplete runs exactly once, for a response and for a transport failure alike.
            request.execute()
                    .toCompletableFuture()
                    .whenComplete(
                            (response, failure) -> {
                                responses[slot] = response;
                                failures[slot] = failure;
                                pending.countDown();
                            });
        }

        try {
            pending.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            requestResult.accept(new ArrayList<>(requestEntries));
            return;
        }

        requestResult.accept(collectRetries(requestEntries, responses, failures));
    }

    /** Updates the metrics for a finished batch and returns the entries that should be retried. */
    private List<byte[]> collectRetries(
            List<byte[]> requestEntries, Response[] responses, Throwable[] failures) {
        List<byte[]> retries = new ArrayList<>();
        Exception fatal = null;

        for (int i = 0; i < requestEntries.size(); i++) {
            byte[] entry = requestEntries.get(i);
            Response response = responses[i];

            if (failures[i] != null || response == null) {
                // Connection-level problem (refused, reset, timeout, ...): try again later.
                numRecordsSendErrorsCounter.inc();
                retries.add(entry);
                continue;
            }

            int status = response.getStatusCode();
            if (status >= 200 && status < 300) {
                numRecordsSendCounter.inc();
                numBytesSendCounter.inc(getSizeInBytes(entry));
            } else if (RETRYABLE_STATUS_CODES.contains(status)) {
                numRecordsSendErrorsCounter.inc();
                retries.add(entry);
            } else {
                numRecordsSendErrorsCounter.inc();
                if (fatal == null) {
                    fatal =
                            new IOException(
                                    "HTTP request failed with non-retryable status "
                                            + status
                                            + " "
                                            + response.getStatusText());
                }
            }
        }

        if (fatal != null) {
            getFatalExceptionCons().accept(fatal);
        }
        return retries;
    }

    /**
     * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
     * this case is measured as the total bytes that is written to the destination as a result of
     * persisting this particular {@code RequestEntryT} rather than the serialized length (which may
     * be the same).
     *
     * @param requestEntry the requestEntry for which we want to know the size
     * @return the length of the payload, or 0 for a {@code null} entry
     */
    @Override
    protected long getSizeInBytes(byte[] requestEntry) {
        return requestEntry == null ? 0 : requestEntry.length;
    }

    /** Closes the writer and then shuts down the HTTP client, releasing its connections. */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            try {
                asyncHttpClient.close();
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to close the HTTP client", e);
            }
        }
    }
}


Compatibility, Deprecation, and Migration Plan

...