The Data Lake sink stores incoming events in an InfluxDB database.
Implementation
org.apache.streampipes.sinks.internal.jvm.datalake
The concrete implementation comprises a Data Lake class, a Data Lake Controller class, a Data Lake InfluxDB Client class and a Data Lake Parameters class. The code largely mirrors that of the InfluxDB sink (org.apache.streampipes.sinks.databases.jvm.influxdb).
Data Lake Parameters Class
The parameter class defines the necessary parameters for the configuration of the sink.
parameter | description |
---|---|
influxDbHost | hostname/URL of the InfluxDB instance (including http:// or https://) |
influxDbPort | port of the InfluxDB instance |
databaseName | name of the database where events will be stored |
measureName | name of the Measurement where events will be stored (will be created if it does not exist) |
user | username for the InfluxDB server |
password | password for the InfluxDB server |
timestampField | field which contains the required timestamp (field type = http://schema.org/DateTime) |
batchSize | number of events collected in a buffer before they are written to the database |
flushDuration | maximum time (in ms) to wait for the buffer to reach batchSize before its contents are written to the database |
dimensionProperties | list containing the tag fields (scope = dimension property) |
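InfluxDB's line protocol shows how measureName, timestampField and dimensionProperties shape the stored data. In that protocol each point is written as measurement,tag=value field=value timestamp. The Java sketch below illustrates this mapping and nothing more. LineProtocolSketch and toLine are hypothetical names, and this document does not say whether the Data Lake InfluxDB Client builds such lines itself or uses a client library's point builder. The sketch also assumes the timestamp field holds epoch milliseconds, which means the write would need millisecond precision, because InfluxDB's default write precision is nanoseconds.

```java
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper, not part of StreamPipes: renders one event as an InfluxDB line-protocol line.
// Assumes the timestamp field holds epoch milliseconds (write with millisecond precision).
final class LineProtocolSketch {
    private LineProtocolSketch() {}

    static String toLine(String measureName, String timestampField,
                         Set<String> dimensionProperties, Map<String, Object> event) {
        StringBuilder line = new StringBuilder(escapeMeasurement(measureName));
        StringJoiner fields = new StringJoiner(",");
        // TreeMap gives a stable key order, which keeps the output deterministic.
        for (Map.Entry<String, Object> entry : new TreeMap<>(event).entrySet()) {
            String key = entry.getKey();
            if (key.equals(timestampField)) {
                continue; // the timestamp goes at the end of the line, not into a field
            }
            if (dimensionProperties.contains(key)) {
                // Dimension properties become tags (indexed string key/value pairs).
                line.append(',').append(escapeKey(key)).append('=')
                    .append(escapeKey(String.valueOf(entry.getValue())));
            } else {
                fields.add(escapeKey(key) + "=" + fieldValue(entry.getValue()));
            }
        }
        if (fields.length() == 0) {
            throw new IllegalArgumentException("line protocol requires at least one field");
        }
        long timestampMs = ((Number) event.get(timestampField)).longValue();
        return line.append(' ').append(fields).append(' ').append(timestampMs).toString();
    }

    // Commas, equals signs and spaces must be escaped in tag keys, tag values and field keys.
    private static String escapeKey(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    // Measurement names only need commas and spaces escaped.
    private static String escapeMeasurement(String s) {
        return s.replace(",", "\\,").replace(" ", "\\ ");
    }

    private static String fieldValue(Object value) {
        if (value instanceof Integer || value instanceof Long) {
            return value + "i"; // integer fields carry an 'i' suffix
        }
        if (value instanceof Number || value instanceof Boolean) {
            return value.toString();
        }
        String s = String.valueOf(value).replace("\\", "\\\\").replace("\"", "\\\"");
        return "\"" + s + "\""; // string fields are double-quoted
    }
}
```

For example, an event with sensorId declared as a dimension property ends up as a tag, while temperature and count become fields.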
Data Lake Controller Class
The controller class declares the model used to display and configure the sink in the Pipeline Editor, and it initializes the sink when the pipeline is invoked.
The measurement name and the timestamp field are derived from user input; the remaining parameters (except batch size and flush duration) are taken from org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig. The batch size is fixed at 2000 events and the flush duration at 500 ms.
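Here is a minimal sketch of how the controller splits parameters between user input and internal configuration, assuming the internal configuration can be read as a plain key/value map. The record DataLakeSettings is hypothetical. Its config keys (dataLakeHost, dataLakePort, dataLakeDatabaseName, dataLakeUser, dataLakePassword) are placeholders and not the actual SinksInternalJvmConfig API. Only the fixed values of 2000 events and 500 ms come from the description above.

```java
import java.util.Map;

// Hypothetical sketch: the config keys below are placeholders,
// not the actual keys used by SinksInternalJvmConfig.
record DataLakeSettings(String host, int port, String databaseName, String user, String password,
                        String measureName, String timestampField, int batchSize, int flushDurationMs) {

    static final int BATCH_SIZE = 2000;        // fixed: events per write
    static final int FLUSH_DURATION_MS = 500;  // fixed: max wait before a write, in ms

    // measureName and timestampField come from the user; connection settings from the internal config.
    static DataLakeSettings fromUserInput(String measureName, String timestampField,
                                          Map<String, String> internalConfig) {
        return new DataLakeSettings(
                require(internalConfig, "dataLakeHost"),
                Integer.parseInt(require(internalConfig, "dataLakePort")),
                require(internalConfig, "dataLakeDatabaseName"),
                require(internalConfig, "dataLakeUser"),
                require(internalConfig, "dataLakePassword"),
                measureName,
                timestampField,
                BATCH_SIZE,
                FLUSH_DURATION_MS);
    }

    // Fails fast if a required connection setting is missing.
    private static String require(Map<String, String> config, String key) {
        String value = config.get(key);
        if (value == null) {
            throw new IllegalStateException("missing config key: " + key);
        }
        return value;
    }
}
```

Fixing the batch values in the controller means users can only choose what is stored (measurement and timestamp). The database connection and write behaviour stay under platform control.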
Data Lake Class
The Data Lake class controls how events are saved to the database and relies on the Data Lake InfluxDB Client for the actual writes.
method name | description |
---|---|
onInvocation | starts the DataLakeInfluxDbClient and registers and initializes a new measurement series in InfluxDB |
onEvent | adds an empty label field to the incoming event and stores the event in the database |
onDetach | stops the DataLakeInfluxDbClient |
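The lifecycle in the table above can be sketched in Java as follows. EventStore is a hypothetical stand-in for DataLakeInfluxDbClient, and the label field name "label" is an assumption because this document does not name the field. Registering the measurement series during onInvocation is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DataLakeInfluxDbClient: the real client writes to InfluxDB.
interface EventStore {
    void start();
    void save(Map<String, Object> event);
    void stop();
}

// Sketch of the onInvocation / onEvent / onDetach lifecycle described in the table.
final class DataLakeSinkSketch {
    // Assumed name of the label column; the actual field name is not given in this document.
    static final String LABEL_FIELD = "label";

    private final EventStore store;

    DataLakeSinkSketch(EventStore store) {
        this.store = store;
    }

    void onInvocation() {
        // Start the client; registering the measurement series is omitted in this sketch.
        store.start();
    }

    void onEvent(Map<String, Object> event) {
        // Copy so the caller's (possibly immutable) map is not modified, then add the empty label.
        Map<String, Object> enriched = new HashMap<>(event);
        enriched.put(LABEL_FIELD, "");
        store.save(enriched);
    }

    void onDetach() {
        store.stop();
    }
}
```

Because the store is behind an interface, an in-memory implementation is enough to exercise the lifecycle without a running InfluxDB.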
Unlike other event data, image data is not stored directly in the database; instead, it is written as image files to a corresponding directory (writeToImageFile).
In addition, the class contains two utility methods, registerAtDataLake and prepareString.
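What writeToImageFile might do can be sketched in Java as follows. The sketch assumes the image arrives as a Base64-encoded string and is saved as PNG, which matches the PNG files served by the getImage endpoint. The class ImageFileWriter, the file naming and the directory layout are all hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

// Hypothetical sketch of writeToImageFile. Assumes the image arrives as a Base64 string and is
// saved as PNG; the directory layout and file naming are placeholders.
final class ImageFileWriter {
    private ImageFileWriter() {}

    static Path writeToImageFile(Path directory, String fileName, String base64Image) throws IOException {
        byte[] bytes = Base64.getDecoder().decode(base64Image); // throws IllegalArgumentException on bad input
        Files.createDirectories(directory);                     // no-op if the directory exists
        Path target = directory.resolve(fileName + ".png");
        Files.write(target, bytes);                              // creates or overwrites the file
        return target;                                           // caller can reference the file by path
    }
}
```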
Data Lake InfluxDB Client Class
This client class connects to InfluxDB and writes events directly to the database, using the Data Lake Parameters described above.
method name | description |
---|---|
validate | checks whether the influxDbHost is valid |
connect | connects to the InfluxDB server, selects the database and initializes the batching behaviour |
databaseExists | checks whether the given database exists |
createDatabase | creates a new database with the given name |
save | saves an event to the connected InfluxDB database |
stop | shuts down the connection to the InfluxDB server |
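The batching set up in connect follows the two parameters batchSize and flushDuration: the buffer is written when it is full or when its oldest event has waited long enough. The influxdb-java client offers this through InfluxDB.enableBatch(actions, flushDuration, timeUnit), but it is an assumption that connect uses that call. The hypothetical BatchBuffer below is a plain-Java illustration of the idea and does not reproduce the client library's implementation. To keep it testable, it takes the current time as an argument and only checks the time limit when an event arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of size- and time-based batching (batchSize / flushDuration).
final class BatchBuffer<T> {
    private final int batchSize;
    private final long flushDurationMs;
    private final Consumer<List<T>> writer;   // e.g. one batched write to the database
    private final List<T> buffer = new ArrayList<>();
    private long firstEventAtMs;

    BatchBuffer(int batchSize, long flushDurationMs, Consumer<List<T>> writer) {
        if (batchSize <= 0 || flushDurationMs <= 0) {
            throw new IllegalArgumentException("batchSize and flushDurationMs must be positive");
        }
        this.batchSize = batchSize;
        this.flushDurationMs = flushDurationMs;
        this.writer = writer;
    }

    // Buffers one event; writes the batch if it is full or the oldest event has waited long enough.
    synchronized void add(T event, long nowMs) {
        if (buffer.isEmpty()) {
            firstEventAtMs = nowMs;
        }
        buffer.add(event);
        if (buffer.size() >= batchSize || nowMs - firstEventAtMs >= flushDurationMs) {
            flush();
        }
    }

    // Writes whatever is buffered; a stop() method would call this before closing the connection.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writer.accept(List.copyOf(buffer));
        buffer.clear();
    }
}
```

In this sketch, a lone event sitting in the buffer is only written when the next event arrives or flush() is called. A production client would also run a background timer, which is what a scheduled flush in a batching client library provides.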
TODO:
- validate(): use a validation utility (org.apache.commons.validator.routines.InetAddressValidator) instead of the regex check
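A regex-free check is also possible with java.net.URI from the standard library. This is a swapped-in technique and not the InetAddressValidator approach named in the TODO. InetAddressValidator accepts only IPv4/IPv6 address literals, so a hostname such as localhost would need a different check, and the http(s):// prefix would have to be stripped first. The class InfluxHostValidator is hypothetical, and rejecting an explicit port (since influxDbPort is configured separately) is an assumed rule.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Sketch of a regex-free validate() using only java.net.URI (not the InetAddressValidator
// approach named in the TODO). Rejecting an explicit port is an assumption, because the
// port is configured separately as influxDbPort.
final class InfluxHostValidator {
    private InfluxHostValidator() {}

    static boolean validate(String influxDbHost) {
        try {
            URI uri = new URI(influxDbHost);
            String scheme = uri.getScheme();
            boolean httpScheme = "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
            String path = uri.getPath();
            return httpScheme
                    && uri.getHost() != null             // null for e.g. "http:foo" or illegal host names
                    && uri.getPort() == -1               // no explicit port
                    && (path == null || path.isEmpty()); // no trailing path
        } catch (URISyntaxException e) {
            return false;                                // not a syntactically valid URI at all
        }
    }
}
```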
REST API
DataLakeNoUserResourceV3
org.apache.streampipes.rest.impl.datalake
This class contains the basic interface definition for setting up a new measurement series in the Data Lake and calls the underlying methods of org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3. The related API calls do not require authentication with a valid username and password.
method name | request type | path | description |
---|---|---|---|
addDataLake | POST | /{measure} | adds new measurement series with specified measure name and related event properties (column names) in InfluxDB |
TODO:
- [STREAMPIPES-348]: fix issue with special characters in user-defined measure name
- require authentication for the addDataLake method
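A client call to addDataLake could be prepared with the JDK's java.net.http API. In the sketch below, the base URL of the resource and the JSON body layout for the event properties are assumptions, since neither is documented here, and the class AddDataLakeRequest is hypothetical. Percent-encoding the measure name keeps characters such as spaces or "/" from breaking the path. Whether that is the fix STREAMPIPES-348 needs is not stated in this document.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical request builder for POST /{measure}; baseUrl and the JSON body layout are assumptions.
final class AddDataLakeRequest {
    private AddDataLakeRequest() {}

    // Percent-encodes one path segment (URLEncoder form-encodes spaces as '+', so swap them for %20).
    static String encodeSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static HttpRequest build(String baseUrl, String measure, String jsonBody) {
        URI uri = URI.create(baseUrl + "/" + encodeSegment(measure));
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). No credentials are attached because, per the description above, this endpoint currently does not require authentication.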
DataLakeResourceV3
org.apache.streampipes.ps
This class contains the extended interface definition and, when invoked, calls the underlying methods of org.apache.streampipes.dataexplorer.DataLakeManagementV3 and org.apache.streampipes.dataexplorer.utils.DataExplorerUtils. The API calls listed below require authentication with a valid username and password.
method name | request type | path | description |
---|---|---|---|
getPage | GET | /data/{index}/paging | returns a page with a predefined number of events of a specific measurement series from InfluxDB |
getAllInfos | GET | /info | returns a list with the IDs of all existing measurement series in InfluxDB |
getAllData | GET | /data/{index}, /data/{index}/last/{value}/{unit}, /data/{index}/{startdate}/{enddate} | returns the stored events of a specific measurement series from InfluxDB, either all of them, those of the last {value} {unit}, or those between {startdate} and {enddate} |
getAllDataGrouping | GET | /data/{index}/{startdate}/{enddate}/grouping/{groupingTag} | returns all events within a specified time frame of a specific measurement series from InfluxDB, grouped by a specific tag |
removeAllData | DELETE | /data/delete/all | removes all stored events from InfluxDB |
downloadData | GET | /data/{index}/download | downloads all events of a specific measurement series from InfluxDB in the desired format |
getImage | GET | /data/image/{route}/file | returns the PNG image stored at the file route |
saveImageCoco | POST | /data/image/{route}/coco | stores the image as a file at the file route |
getImageCoco | GET | /data/image/{route}/coco | returns the image at the file route as application/json |
labelData | POST | /data/{index}/{startdate}/{enddate}/labeling/{column}/{timestampColumn}?label= | sets the specified column to the provided label value for all events within the specified time frame |
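The path templates in the table can be filled in programmatically. The hypothetical helper DataLakePaths below assumes that {startdate} and {enddate} are epoch milliseconds and that unit strings are passed through unchanged, since neither format is documented here. It produces only the path part; the resource's base URL and the required authentication are left to the caller.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that fills in the DataLakeResourceV3 path templates.
// Assumptions: startdate/enddate are epoch milliseconds; unit strings are passed through as-is.
final class DataLakePaths {
    private DataLakePaths() {}

    // Percent-encodes one value; %20 is decoded as a space in both paths and query strings.
    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static String page(String index) {
        return "/data/" + encode(index) + "/paging";
    }

    static String lastValues(String index, int value, String unit) {
        return "/data/" + encode(index) + "/last/" + value + "/" + encode(unit);
    }

    static String timeRange(String index, long startDate, long endDate) {
        return "/data/" + encode(index) + "/" + startDate + "/" + endDate;
    }

    static String grouping(String index, long startDate, long endDate, String groupingTag) {
        return timeRange(index, startDate, endDate) + "/grouping/" + encode(groupingTag);
    }

    static String labeling(String index, long startDate, long endDate,
                           String column, String timestampColumn, String label) {
        // The label value is passed as the "label" query parameter.
        return timeRange(index, startDate, endDate) + "/labeling/" + encode(column) + "/"
                + encode(timestampColumn) + "?label=" + encode(label);
    }
}
```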
TODO:
- fix the data export from the Data Lake, which currently returns two timestamp fields