Page properties | ||
---|---|---|
|
The intent of this connector is to sink data from Apache Flink systems to arbitrary HTTP endpoints.
Status
Current state: Abandoned (previously Under Discussion)
Discussion thread: https://lists.apache.org/thread/cd60ln4pjgml7sv4kh23o1fohcfwvjcz
...
Example using either a self-generating source or a Kinesis Data Streams source to submit data to an AWS Lambda endpoint:
Code Block |
---|
import com.fasterxml.jackson.databind.ser.std.ByteArraySerializer;
import javassist.bytecode.ByteArray;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.http.HttpSink;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import events.MyEvent;
import java.io.IOException;
import java.util.Properties;
/**
 * Example job that feeds {@code byte[]} records into an {@link HttpSink}.
 *
 * <p>The input is chosen with JVM system properties:
 *
 * <ul>
 *   <li>{@code selfGenerating} (default {@code false}) - when {@code true}, records are produced
 *       by an in-process generator; otherwise they are read from a Kinesis stream.
 *   <li>{@code numRecordsPerSecond} (default {@code 100}) - number of records the generator emits
 *       before each one-second pause.
 * </ul>
 */
public class HttpSinkTest {

    /**
     * Builds a Kinesis source that passes each record through as its raw bytes.
     *
     * @param env execution environment the source is registered with
     * @param region AWS region of the stream
     * @param inputStreamName name of the Kinesis stream to read
     * @return stream of raw record payloads, starting at the latest position
     */
    private static DataStream<byte[]> createSourceFromStaticConfig(
            StreamExecutionEnvironment env, String region, String inputStreamName) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        inputProperties.setProperty(ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS, "true");

        // Identity schema: the sink consumes byte[], so no decoding is done here.
        DeserializationSchema<byte[]> passThroughSchema =
                new DeserializationSchema<byte[]>() {
                    @Override
                    public byte[] deserialize(byte[] message) throws IOException {
                        return message;
                    }

                    @Override
                    public boolean isEndOfStream(byte[] nextElement) {
                        return false;
                    }

                    @Override
                    public TypeInformation<byte[]> getProducedType() {
                        return TypeInformation.of(byte[].class);
                    }
                };

        return env.addSource(
                        new FlinkKinesisConsumer<byte[]>(
                                inputStreamName, passThroughSchema, inputProperties))
                .name("flink_kinesis_consumer_01");
    }

    /**
     * Wires the selected source to the HTTP sink and runs the job.
     *
     * @param args unused; configuration is read from system properties
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        boolean isSelfGenerating =
                Boolean.parseBoolean(System.getProperty("selfGenerating", "false"));
        int numRecordsPerSecond =
                Integer.parseInt(System.getProperty("numRecordsPerSecond", "100"));

        final DataStream<byte[]> sampleEvents;
        if (isSelfGenerating) {
            sampleEvents =
                    env.addSource(
                            new SourceFunction<byte[]>() {
                                // Cleared by cancel(), which may run on a different thread
                                // than run(); volatile makes the change visible to the loop.
                                private volatile boolean running = true;

                                @Override
                                public void run(SourceContext<byte[]> sourceContext)
                                        throws Exception {
                                    while (running) {
                                        // Emit one burst of records, then pause for a second.
                                        MyEvent me = new MyEvent();
                                        for (int i = 0; i < numRecordsPerSecond; i++) {
                                            sourceContext.collect(
                                                    me.getSampleEvents().getBytes());
                                        }
                                        Thread.sleep(1000);
                                    }
                                }

                                @Override
                                public void cancel() {
                                    // Lets run() return once the current burst and sleep finish.
                                    running = false;
                                }
                            });
        } else {
            sampleEvents = createSourceFromStaticConfig(env, "us-east-2", "input-stream");
        }

        sampleEvents.sinkTo(
                new HttpSink<>("https://my-url.lambda-url.us-east-1.on.aws/", "POST"));
        env.execute();
    }
}
|
Limitations
Currently, the connector only supports sinking data, and only via POST requests. In future iterations we'd like to support HTTP sources as well as GET, PUT, DELETE, and other request methods.
...
- additional methods
- better authentication mechanisms
- Table API
- SQL API
- Source Connector
The following files comprise the connector:
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.flink.streaming.connectors.http;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.RequestEntryWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/** @param <InputT> */
/**
 * Async sink whose request entries are raw {@code byte[]} payloads, written by an
 * {@code HttpSinkWriter} that is configured with the target endpoint URL and HTTP method.
 *
 * <p>The element converter casts every element to {@code byte[]}, so the sink only works when the
 * elements really are byte arrays; any other element type fails with a {@link ClassCastException}
 * when it is converted.
 *
 * @param <InputT> type of the elements handed to the sink; must be {@code byte[]} at runtime
 */
public class HttpSink<InputT> extends AsyncSinkBase<InputT, byte[]> {

    /** Pass-through converter: the element itself is used as the request payload. */
    private static final ElementConverter<Object, byte[]> ELEMENT_CONVERTER =
            (element, context) -> (byte[]) element;

    private static final int MAX_BATCH_SIZE = 10000;
    private static final int MAX_IN_FLIGHT_REQUESTS = 100000; // must be > max_batch_size
    private static final int MAX_BUFFERED_REQUESTS = 100000; // must be > max_batch_size
    private static final int MAX_BATCH_SIZE_IN_BYTES = 10000000;
    private static final int MAX_TIME_IN_BUFFER_MS = 1000;
    private static final int MAX_RECORD_SIZE_IN_BYTES = 10000000;

    private final String endpointURL;
    private final String method;

    /**
     * Creates a sink with the fixed batching limits defined by this class.
     *
     * @param endpointURL URL handed to the writer as the request target
     * @param method HTTP method name handed to the writer, e.g. {@code "POST"}
     */
    // The converter only ever casts to byte[], so viewing it as ElementConverter<InputT, byte[]>
    // is safe for the byte[] inputs this sink is meant for.
    @SuppressWarnings("unchecked")
    public HttpSink(String endpointURL, String method) {
        super(
                (ElementConverter<InputT, byte[]>) ELEMENT_CONVERTER,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES);
        this.endpointURL = endpointURL;
        this.method = method;
    }

    /**
     * Creates a writer with no previously buffered requests.
     *
     * @param initContext runtime context supplied by Flink
     * @return a new writer for this sink's endpoint and method
     * @throws IOException declared by the sink interface
     */
    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> createWriter(
            InitContext initContext) throws IOException {
        return new HttpSinkWriter(
                ELEMENT_CONVERTER,
                initContext,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                endpointURL,
                method);
    }

    /**
     * Creates a writer after a restore.
     *
     * @param initContext runtime context supplied by Flink
     * @param collection writer states recovered from the checkpoint
     * @return a new writer for this sink's endpoint and method
     * @throws IOException declared by the sink interface
     */
    @Override
    public StatefulSinkWriter<InputT, BufferedRequestState<byte[]>> restoreWriter(
            InitContext initContext, Collection<BufferedRequestState<byte[]>> collection)
            throws IOException {
        // NOTE(review): `collection` is not handed to the writer, so requests that were buffered
        // at checkpoint time are not re-queued after a restore. Passing it on needs an
        // HttpSinkWriter constructor that accepts the recovered state - confirm against that
        // class before changing this call.
        return new HttpSinkWriter(
                ELEMENT_CONVERTER,
                initContext,
                MAX_BATCH_SIZE,
                MAX_IN_FLIGHT_REQUESTS,
                MAX_BUFFERED_REQUESTS,
                MAX_BATCH_SIZE_IN_BYTES,
                MAX_TIME_IN_BUFFER_MS,
                MAX_RECORD_SIZE_IN_BYTES,
                endpointURL,
                method);
    }

    /**
     * Returns the serializer used to checkpoint the writer's buffered requests.
     *
     * @return a serializer for {@code BufferedRequestState<byte[]>}
     */
    @Override
    public SimpleVersionedSerializer<BufferedRequestState<byte[]>> getWriterStateSerializer() {
        return new WriterStateSerializer();
    }

    /**
     * Serializes buffered requests as
     * {@code [DATA_IDENTIFIER, NUM_OF_ELEMENTS, SIZE1, LENGTH1, REQUEST1, SIZE2, LENGTH2, REQUEST2, ...]}.
     *
     * <p>{@code SIZE} is the size recorded in the {@link RequestEntryWrapper}; {@code LENGTH} is the
     * exact number of payload bytes that follow, so reading never depends on how the writer
     * computes entry sizes.
     */
    private static final class WriterStateSerializer
            implements SimpleVersionedSerializer<BufferedRequestState<byte[]>> {

        /** Marker written at the start of every serialized state. */
        private static final long DATA_IDENTIFIER = -1;

        /** Layout that stored only entry sizes and never the payload bytes. */
        private static final int LEGACY_VERSION = 0;

        /** Layout described in the class comment. */
        private static final int CURRENT_VERSION = 1;

        @Override
        public int getVersion() {
            return CURRENT_VERSION;
        }

        /**
         * Writes the header followed by every buffered entry.
         *
         * @param obj state to serialize
         * @return the serialized bytes
         * @throws IOException if writing to the in-memory stream fails
         */
        @Override
        public byte[] serialize(BufferedRequestState<byte[]> obj) throws IOException {
            Collection<RequestEntryWrapper<byte[]>> bufferState = obj.getBufferedRequestEntries();
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                    DataOutputStream out = new DataOutputStream(baos)) {
                out.writeLong(DATA_IDENTIFIER);
                out.writeInt(bufferState.size());
                for (RequestEntryWrapper<byte[]> wrapper : bufferState) {
                    out.writeLong(wrapper.getSize());
                    serializeRequestToStream(wrapper.getRequestEntry(), out);
                }
                out.flush();
                return baos.toByteArray();
            }
        }

        /**
         * Reads a state written by {@link #serialize}.
         *
         * @param version version the bytes were written with
         * @param serialized bytes to read
         * @return the restored state; empty for {@code version} 0, whose layout carried no payloads
         * @throws IOException if the version is unknown or the data is truncated or malformed
         * @throws IllegalStateException if the leading identifier is wrong
         */
        @Override
        public BufferedRequestState<byte[]> deserialize(int version, byte[] serialized)
                throws IOException {
            if (version != LEGACY_VERSION && version != CURRENT_VERSION) {
                throw new IOException("Unsupported writer state version: " + version);
            }
            try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                    DataInputStream in = new DataInputStream(bais)) {
                validateIdentifier(in);
                int size = in.readInt();
                List<RequestEntryWrapper<byte[]>> entries = new ArrayList<>();
                if (version == LEGACY_VERSION) {
                    // Version 0 never persisted the request bytes, so there is nothing to
                    // rebuild; restore with an empty buffer rather than fabricating entries.
                    return new BufferedRequestState<>(entries);
                }
                for (int i = 0; i < size; i++) {
                    long requestSize = in.readLong();
                    byte[] request = deserializeRequestFromStream(in);
                    entries.add(new RequestEntryWrapper<>(request, requestSize));
                }
                return new BufferedRequestState<>(entries);
            }
        }

        /** Writes the payload length followed by the payload bytes. */
        private static void serializeRequestToStream(byte[] request, DataOutputStream out)
                throws IOException {
            out.writeInt(request.length);
            out.write(request);
        }

        /** Reads one length-prefixed payload; fails if fewer bytes remain than announced. */
        private static byte[] deserializeRequestFromStream(DataInputStream in)
                throws IOException {
            int length = in.readInt();
            if (length < 0) {
                throw new IOException(
                        "Corrupted data to deserialize: negative request length " + length);
            }
            byte[] request = new byte[length];
            // readFully, unlike read, either fills the whole array or throws EOFException.
            in.readFully(request);
            return request;
        }

        /** Checks that the stream starts with the expected identifier. */
        private static void validateIdentifier(DataInputStream in) throws IOException {
            if (in.readLong() != DATA_IDENTIFIER) {
                throw new IllegalStateException("Corrupted data to deserialize");
            }
        }
    }
}
|
...
language | java |
---|---|
title | HTTPSinkWriter |
...
Compatibility, Deprecation, and Migration Plan
...