THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- support for additional HTTP methods
- better authentication mechanisms
- Table API support
- SQL API support
- an HTTP source connector
The following files comprise the connector:
HttpSink.java:
package org.apache.flink.streaming.connectors.http;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
 * Sink that delivers every stream element as the body of an HTTP request to a single endpoint.
 *
 * <p>Buffering, batching and retries come from {@link AsyncSinkBase}; the requests themselves are
 * issued by {@link HttpSinkWriter}. Elements are not transformed: the element converter casts each
 * element to {@code byte[]}, so a stream carrying any other type fails with a
 * {@link ClassCastException} when an element is converted.
 *
 * @param <InputT> declared element type of the stream; must be {@code byte[]} at runtime
 */
public class HttpSink<InputT> extends AsyncSinkBase<InputT, byte[]> {

    /** Identity converter: assumes elements already are {@code byte[]} request bodies. */
    private static final ElementConverter<Object, byte[]> ELEMENT_CONVERTER =
            (element, context) -> (byte[]) element;

    /** Maximum number of entries per batch handed to {@link HttpSinkWriter}. */
    private static final int MAX_BATCH_SIZE = 10000;
    /** Maximum number of batches that may be in flight at the same time. */
    private static final int MAX_IN_FLIGHT_REQUESTS = 100000;
    /**
     * Maximum number of buffered entries. The async sink writer requires this to be strictly
     * greater than {@link #MAX_BATCH_SIZE}.
     */
    private static final int MAX_BUFFERED_REQUESTS = 100000;
    /** Upper bound, in bytes, on the summed size of one batch. */
    private static final int MAX_BATCH_SIZE_IN_BYTES = 10000000;
    /** Maximum time, in milliseconds, an entry may stay buffered before being flushed. */
    private static final int MAX_TIME_IN_BUFFER_MS = 1000;
    /** Maximum size, in bytes, of a single entry. */
    private static final int MAX_RECORD_SIZE_IN_BYTES = 10000000;

    private final String endpointURL;
    private final String method;

    /**
     * Creates the sink.
     *
     * @param endpointURL URL every request is sent to
     * @param method HTTP method handed to {@link HttpSinkWriter}
     */
    public HttpSink(String endpointURL, String method) {
        super(
                elementConverter(),
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES);
        this.endpointURL = endpointURL;
        this.method = method;
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> createWriter(
            InitContext initContext) throws IOException {
        return newWriter(initContext);
    }

    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> restoreWriter(
            InitContext initContext, Collection<BufferedRequestState<byte[]>> recoveredStates)
            throws IOException {
        // NOTE(review): recoveredStates is not passed to the writer, so entries that were buffered
        // at the last checkpoint are dropped on restore. Fixing this needs an HttpSinkWriter
        // constructor that forwards the states to the base writer's restoring constructor.
        return newWriter(initContext);
    }

    @Override
    public SimpleVersionedSerializer<BufferedRequestState<byte[]>> getWriterStateSerializer() {
        return new HttpSinkStateSerializer();
    }

    /** Builds a fresh writer configured with this sink's endpoint, method and batching limits. */
    private HttpSinkWriter<InputT> newWriter(InitContext initContext) {
        ElementConverter<InputT, byte[]> converter = elementConverter();
        return new HttpSinkWriter<>(
                converter,
                initContext,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                endpointURL,
                method);
    }

    /** Returns {@link #ELEMENT_CONVERTER} typed for the stream's element type. */
    @SuppressWarnings("unchecked")
    private static <T> ElementConverter<T, byte[]> elementConverter() {
        // The converter ignores the static element type and casts to byte[] at runtime.
        return (ElementConverter<T, byte[]>) (ElementConverter<?, byte[]>) ELEMENT_CONVERTER;
    }

    /**
     * Serializes the writer's buffered request entries.
     *
     * <p>Version 1 layout: {@code DATA_IDENTIFIER (long), NUM_ENTRIES (int)}, then per entry
     * {@code SIZE (long), PAYLOAD_LENGTH (int, -1 for a null payload), PAYLOAD (bytes)}.
     *
     * <p>Version 0 wrote only the identifier, the count and the sizes, never the payloads. Such
     * state is still accepted, but it carries nothing to restore and yields an empty state.
     */
    private static final class HttpSinkStateSerializer
            implements SimpleVersionedSerializer<BufferedRequestState<byte[]>> {
        private static final long DATA_IDENTIFIER = -1L;
        private static final int LEGACY_VERSION = 0;
        private static final int CURRENT_VERSION = 1;
        private static final int NULL_PAYLOAD = -1;

        @Override
        public int getVersion() {
            return CURRENT_VERSION;
        }

        @Override
        public byte[] serialize(BufferedRequestState<byte[]> state) throws IOException {
            Collection<RequestEntryWrapper<byte[]>> entries = state.getBufferedRequestEntries();
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    DataOutputStream out = new DataOutputStream(baos)) {
                out.writeLong(DATA_IDENTIFIER);
                out.writeInt(entries.size());
                for (RequestEntryWrapper<byte[]> wrapper : entries) {
                    out.writeLong(wrapper.getSize());
                    writePayload(wrapper.getRequestEntry(), out);
                }
                out.flush();
                return baos.toByteArray();
            }
        }

        @Override
        public BufferedRequestState<byte[]> deserialize(int version, byte[] serialized)
                throws IOException {
            if (version != CURRENT_VERSION && version != LEGACY_VERSION) {
                throw new IOException("Unsupported HttpSink writer state version: " + version);
            }
            List<RequestEntryWrapper<byte[]>> entries = new ArrayList<>();
            try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                    DataInputStream in = new DataInputStream(bais)) {
                validateIdentifier(in);
                int count = in.readInt();
                if (count < 0) {
                    throw new IllegalStateException("Corrupted data to deserialize");
                }
                for (int i = 0; i < count; i++) {
                    long size = in.readLong();
                    // Version 0 never stored payloads, so its entries cannot be rebuilt.
                    if (version == CURRENT_VERSION) {
                        entries.add(new RequestEntryWrapper<>(readPayload(in), size));
                    }
                }
            }
            return new BufferedRequestState<>(entries);
        }

        /** Writes a length-prefixed payload, using {@link #NULL_PAYLOAD} as the null marker. */
        private static void writePayload(byte[] payload, DataOutputStream out)
                throws IOException {
            if (payload == null) {
                out.writeInt(NULL_PAYLOAD);
                return;
            }
            out.writeInt(payload.length);
            out.write(payload);
        }

        /** Reads a payload written by {@link #writePayload}, rejecting impossible lengths. */
        private static byte[] readPayload(DataInputStream in) throws IOException {
            int length = in.readInt();
            if (length == NULL_PAYLOAD) {
                return null;
            }
            // available() is exact for a ByteArrayInputStream; this guards against huge
            // allocations driven by corrupted length fields.
            if (length < 0 || length > in.available()) {
                throw new IllegalStateException("Corrupted data to deserialize");
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            return payload;
        }

        private static void validateIdentifier(DataInputStream in) throws IOException {
            if (in.readLong() != DATA_IDENTIFIER) {
                throw new IllegalStateException("Corrupted data to deserialize");
            }
        }
    }
}
|
HttpSinkWriter.java:
package org.apache.flink.streaming.connectors.http;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

import javassist.bytecode.ByteArray;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Response;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.asynchttpclient.Dsl.*;
/**
 * {@link AsyncSinkWriter} that sends every buffered {@code byte[]} entry as the body of its own HTTP
 * request to a single endpoint, using a non-blocking {@link AsyncHttpClient}.
 *
 * <p>Per-entry outcome:
 *
 * <ul>
 *   <li>2xx status: counted as sent.
 *   <li>status in {@link #RETRYABLE_STATUS_CODES}, or a transport failure (the request never got a
 *       response): handed back to the base writer for retry.
 *   <li>any other status: reported through the fatal-exception consumer.
 * </ul>
 *
 * <p>{@link #submitRequestEntries} does not block. The base writer's completion callback is
 * invoked exactly once, from whichever HTTP callback thread finishes the last entry of the batch.
 *
 * @param <InputT> type of the stream elements converted into {@code byte[]} request bodies
 */
public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, byte[]> {
    /** Method used when none is configured; the writer previously always issued POST requests. */
    private static final String DEFAULT_METHOD = "POST";

    /** Statuses treated as transient: timeout, rate limiting, and gateway/availability errors. */
    private static final List<Integer> RETRYABLE_STATUS_CODES =
            Arrays.asList(408, 429, 502, 503, 504);

    private final AsyncHttpClient asyncHttpClient;
    private final String endpointUrl;
    private final String method;

    /* A counter for the total number of records that have been sent */
    private final Counter numRecordsSendCounter;
    /* A counter for the total number of records that have encountered an error during put */
    private final Counter numRecordsSendErrorsCounter;
    /* A counter for the total number of bytes that have been sent */
    private final Counter numBytesSendCounter;

    /**
     * Creates a writer. The batching limits are passed unchanged to {@link AsyncSinkWriter}.
     *
     * @param endpointURL URL every request is sent to
     * @param method HTTP method of every request; {@code null} or blank falls back to POST
     */
    public HttpSinkWriter(
            ElementConverter<InputT, byte[]> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            String endpointURL,
            String method) {
        super(
                elementConverter,
                context,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes);
        this.endpointUrl = endpointURL;
        this.method = (method == null || method.trim().isEmpty()) ? DEFAULT_METHOD : method;
        SinkWriterMetricGroup metrics = context.metricGroup();
        this.numRecordsSendCounter = metrics.getIOMetricGroup().getNumRecordsOutCounter();
        this.numBytesSendCounter = metrics.getIOMetricGroup().getNumBytesOutCounter();
        this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
        // Created last so that a failure above cannot leak the client's threads.
        this.asyncHttpClient = asyncHttpClient();
    }

    /**
     * Fires one asynchronous request per entry and returns immediately. {@code requestResult}
     * receives the entries to retry once every request of this batch has finished.
     */
    @Override
    protected void submitRequestEntries(
            List<byte[]> requestEntries, Consumer<List<byte[]>> requestResult) {
        if (requestEntries.isEmpty()) {
            requestResult.accept(new ArrayList<>());
            return;
        }
        BatchTracker batch = new BatchTracker(requestEntries.size(), requestResult);
        for (byte[] entry : requestEntries) {
            asyncHttpClient
                    .prepare(method, endpointUrl)
                    .setBody(entry)
                    .execute(new EntryHandler(entry, batch));
        }
    }

    /** Classifies one response: success, retry, or fatal failure. */
    private void handleResponse(byte[] entry, Response response, BatchTracker batch) {
        int status = response.getStatusCode();
        if (status >= 200 && status < 300) {
            numRecordsSendCounter.inc();
            // Count the bytes we sent, not the size of the response body.
            numBytesSendCounter.inc(getSizeInBytes(entry));
        } else if (RETRYABLE_STATUS_CODES.contains(status)) {
            numRecordsSendErrorsCounter.inc();
            batch.retry(entry);
        } else {
            numRecordsSendErrorsCounter.inc();
            getFatalExceptionCons()
                    .accept(
                            new IOException(
                                    "Non-retryable HTTP status "
                                            + status
                                            + ": "
                                            + response.getStatusText()));
        }
    }

    /**
     * This method allows the getting of the size of a {@code RequestEntryT} in bytes. The size in
     * this case is measured as the total bytes that is written to the destination as a result of
     * persisting this particular {@code RequestEntryT} rather than the serialized length (which may
     * be the same).
     *
     * @param requestEntry the requestEntry for which we want to know the size
     * @return the size of the requestEntry, or 0 for a {@code null} entry
     */
    @Override
    protected long getSizeInBytes(byte[] requestEntry) {
        return requestEntry == null ? 0 : requestEntry.length;
    }

    /** Closes the base writer and then the HTTP client, releasing its connections and threads. */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            try {
                asyncHttpClient.close();
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to close the HTTP client", e);
            }
        }
    }

    /** Collects retryable entries of one batch and reports them when the last entry finishes. */
    private static final class BatchTracker {
        private final AtomicInteger pending;
        // Appended to concurrently from HTTP callback threads.
        private final Queue<byte[]> toRetry = new ConcurrentLinkedQueue<>();
        private final Consumer<List<byte[]>> requestResult;

        BatchTracker(int size, Consumer<List<byte[]>> requestResult) {
            this.pending = new AtomicInteger(size);
            this.requestResult = requestResult;
        }

        void retry(byte[] entry) {
            toRetry.add(entry);
        }

        /** Marks one entry finished; the last one hands the retry list to the base writer. */
        void entryDone() {
            if (pending.decrementAndGet() == 0) {
                requestResult.accept(new ArrayList<>(toRetry));
            }
        }
    }

    /** Completion handler for a single entry; reports the entry to its batch exactly once. */
    private final class EntryHandler extends AsyncCompletionHandler<Response> {
        private final byte[] entry;
        private final BatchTracker batch;
        // AHC may call onThrowable after onCompleted throws; this prevents double completion.
        private final AtomicBoolean reported = new AtomicBoolean();

        EntryHandler(byte[] entry, BatchTracker batch) {
            this.entry = entry;
            this.batch = batch;
        }

        @Override
        public Response onCompleted(Response response) {
            if (reported.compareAndSet(false, true)) {
                try {
                    handleResponse(entry, response, batch);
                } finally {
                    batch.entryDone();
                }
            }
            return response;
        }

        @Override
        public void onThrowable(Throwable t) {
            // Transport failure (e.g. connection refused or reset): retry the entry rather than
            // leaving the batch incomplete forever.
            if (reported.compareAndSet(false, true)) {
                try {
                    numRecordsSendErrorsCounter.inc();
                    batch.retry(entry);
                } finally {
                    batch.entryDone();
                }
            }
        }
    }
}
|
Compatibility, Deprecation, and Migration Plan
...