
When using the Data Lake sink, incoming events are stored in an InfluxDB database.

Implementation

org.apache.streampipes.sinks.internal.jvm.datalake

The concrete implementation comprises a Data Lake class, a Data Lake Controller class, a Data Lake InfluxDB Client class, and a Data Lake Parameters class. The code is largely identical to that of the InfluxDB sink (org.apache.streampipes.sinks.databases.jvm.influxdb).

Data Lake Parameters Class

The parameter class defines the necessary parameters for the configuration of the sink.

  • influxDbHost: hostname/URL of the InfluxDB instance (including http(s)://)
  • influxDbPort: port of the InfluxDB instance
  • databaseName: name of the database where events will be stored
  • measureName: name of the measurement where events will be stored (will be created if it does not exist)
  • user: username for the InfluxDB server
  • password: password for the InfluxDB server
  • timestampField: field which contains the required timestamp (field type = http://schema.org/DateTime)
  • batchSize: number of events that are written into a buffer before they are written to the database
  • flushDuration: maximum time in ms to wait for the buffer to reach the batch size before it is written to the database
  • dimensionProperties: list containing the tag fields (scope = dimension property)
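The role of the dimensionProperties parameter can be illustrated with InfluxDB's line protocol: fields listed there become tags of the written point, while all other event fields become regular fields. The following is a minimal sketch; all class, method, and field names are hypothetical and not taken from the actual implementation:

```java
import java.util.List;
import java.util.Map;

// Illustrative only: maps an event onto an InfluxDB line-protocol string.
// Fields named in dimensionProperties become tags; the remaining fields
// (except the timestamp field) become regular fields.
class LineProtocol {
    static String toPoint(String measureName, Map<String, Object> event,
                          List<String> dimensionProperties, String timestampField) {
        StringBuilder tags = new StringBuilder();
        StringBuilder fields = new StringBuilder();
        long timestamp = ((Number) event.get(timestampField)).longValue();
        for (Map.Entry<String, Object> e : event.entrySet()) {
            if (e.getKey().equals(timestampField)) continue;
            if (dimensionProperties.contains(e.getKey())) {
                tags.append(',').append(e.getKey()).append('=').append(e.getValue());
            } else {
                if (fields.length() > 0) fields.append(',');
                fields.append(e.getKey()).append('=').append(e.getValue());
            }
        }
        // line protocol shape: measurement,tag=... field=... timestamp
        return measureName + tags + " " + fields + " " + timestamp;
    }
}
```
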

Data Lake Controller Class

In the controller class, the model is declared for viewing and configuration in the Pipeline Editor, and the sink is initialized when a pipeline is invoked.

The measurement name and the timestamp field are derived from user input; the remaining parameters (except batch size and flush duration) are taken from org.apache.streampipes.sinks.internal.jvm.config.SinksInternalJvmConfig. The batch size is fixed at 2000 events and the flush duration at 500 ms.
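The effect of the fixed batch size can be sketched as follows. This is a simplified illustration only: the real sink delegates batching to the InfluxDB client, and the time-based flush after the flush duration is omitted here for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the batching behaviour configured by the sink:
// events are buffered and written once BATCH_SIZE is reached. In the real
// client a timer additionally flushes after FLUSH_DURATION_MS.
class EventBuffer {
    static final int BATCH_SIZE = 2000;       // fixed in the controller class
    static final int FLUSH_DURATION_MS = 500; // fixed in the controller class (not simulated here)

    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> writer;

    EventBuffer(Consumer<List<String>> writer) {
        this.writer = writer;
    }

    void add(String event) {
        buffer.add(event);
        if (buffer.size() >= BATCH_SIZE) {
            flush();
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            writer.accept(new ArrayList<>(buffer)); // write a copy of the batch
            buffer.clear();
        }
    }
}
```
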

Data Lake Class

The data lake class itself essentially controls the saving of events to the database. For this purpose, it uses the Data Lake InfluxDB Client.

  • onInvocation: starts the DataLakeInfluxDbClient, registers and initializes a new measurement series in InfluxDB
  • onEvent: adds an empty label field to the incoming event and stores the event in the database
  • onDetach: stops the DataLakeInfluxDbClient

Image data, unlike events, is not stored directly in the database but as image files in a corresponding directory (writeToImageFile).
In addition, the class contains two utility methods (registerAtDataLake and prepareString).
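The image-handling path can be sketched roughly as follows, assuming the image field of the incoming event carries a base64-encoded PNG; method and parameter names here are illustrative only, not the actual writeToImageFile signature:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;

// Hypothetical sketch of writing image data to a file instead of InfluxDB:
// the base64 payload is decoded and written into an image directory.
class ImageWriter {
    static Path writeToImageFile(String base64Image, Path imageDir, String fileName) throws Exception {
        byte[] bytes = Base64.getDecoder().decode(base64Image);
        Files.createDirectories(imageDir);          // ensure the directory exists
        Path target = imageDir.resolve(fileName + ".png");
        Files.write(target, bytes);                 // store the decoded image
        return target;
    }
}
```
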

Data Lake InfluxDB Client Class

Client class that connects to InfluxDB and writes events directly to the database. It uses the Data Lake Parameters described above.

  • validate: checks whether the influxDbHost is valid
  • connect: connects to the InfluxDB server, sets the database and initializes the batch behaviour
  • databaseExists: checks whether the given database exists
  • createDatabase: creates a new database with the given name
  • save: saves an event to the connected InfluxDB database
  • stop: shuts down the connection to the InfluxDB server

TODO:

  • validate(): use validation method (org.apache.commons.validator.routines.InetAddressValidator) instead of regex check
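For illustration, a JDK-only variant of such a host check might look like the sketch below. This is not the actual validate() implementation, and it does not replicate the literal IPv4/IPv6 address validation that InetAddressValidator would add; it only shows the shape of the check, given that influxDbHost is documented above as including the http(s):// prefix.

```java
import java.net.URI;

// Illustrative host check: the scheme must be http or https and the
// host portion must parse. Real address validation would go further.
class HostCheck {
    static boolean isValidInfluxDbHost(String host) {
        try {
            URI uri = URI.create(host);
            String scheme = uri.getScheme();
            return ("http".equals(scheme) || "https".equals(scheme)) && uri.getHost() != null;
        } catch (IllegalArgumentException e) {
            return false; // not parseable as a URI at all
        }
    }
}
```
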


REST API

DataLakeNoUserResourceV3

org.apache.streampipes.rest.impl.datalake

This class contains the basic interface definition for setting up a new measurement series in the Data Lake and calls the underlying methods of org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3. Usage of the related API calls does not require authentication with a valid username and password.

  • addDataLake (POST /{measure}): adds a new measurement series with the specified measure name and related event properties (column names) in InfluxDB


TODO:

  • [STREAMPIPES-348]: fix issue with special characters in user-defined measure name
  • add authentication obligation to addDataLake method

DataLakeResourceV3

org.apache.streampipes.ps

This class contains the extended interface definition and calls the underlying methods of org.apache.streampipes.dataexplorer.DataLakeManagementV3 and org.apache.streampipes.dataexplorer.utils.DataExplorerUtils when invoked. Usage of the API calls listed below requires authentication with a valid username and password.

  • getPage (GET /data/{index}/paging): returns pages with a predefined number of events per page of a specific measurement series from InfluxDB
  • getAllInfos (GET /info): returns a list with the ids of all existing measurement series (including the event schema) from InfluxDB
  • getAllData (GET /data/{index}): returns all stored events of a specific measurement series from InfluxDB
  • getAllData (GET /data/{index}/last/{value}/{unit}): returns an aggregated set of all stored events of a specific measurement series from InfluxDB
  • getAllData (GET /data/{index}/{startdate}/{enddate}): returns all stored events within the specified time frame of a specific measurement series from InfluxDB
  • getAllDataGrouping (GET /data/{index}/{startdate}/{enddate}/grouping/{groupingTag}): returns all events within a specified time frame of a specific measurement series, grouped by a specific tag, from InfluxDB
  • removeAllData (DELETE /data/delete/all): removes all stored events from InfluxDB
  • downloadData (GET /data/{index}/download): downloads all events of a specific measurement series from InfluxDB in the desired format
  • downloadData (GET /data/{index}/{startdate}/{enddate}/download): downloads all events within a specified time frame of a specific measurement series from InfluxDB in the desired format
  • getImage (GET /data/image/{route}/file): returns a png image from the file route
  • saveImageCoco (POST /data/image/{route}/coco): stores an image as a file at the file route
  • getImageCoco (GET /data/image/{route}/coco): returns the image at the file route as application/json
  • labelData (POST /data/{index}/{startdate}/{enddate}/labeling/{column}/{timestampColumn}?label=): updates the label in the specified column for all events within the specified time frame to the provided label value
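A client-side call to one of these endpoints can be sketched with the JDK's HTTP client. The base URL and credentials below are placeholders, and the request is only constructed here, not sent:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Base64;

// Illustrative request construction for the getAllData endpoint
// (/data/{index}/{startdate}/{enddate}), authenticated via HTTP Basic.
class DataLakeRequest {
    static HttpRequest allDataBetween(String baseUrl, String index, long start, long end,
                                      String user, String password) {
        String auth = Base64.getEncoder().encodeToString((user + ":" + password).getBytes());
        return HttpRequest.newBuilder(URI.create(baseUrl + "/data/" + index + "/" + start + "/" + end))
            .header("Authorization", "Basic " + auth)
            .GET()
            .build();
    }
}
```
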


TODO:

  • fix export of data from data lake, which currently returns two timestamp fields
  • extend the aggregation functionality to support non-numeric values (e.g. strings → majority vote) and add the possibility to specify an aggregation function
  • in general: alignment of the single endpoint definitions and consideration of the extensions below


Ideas for possible adaptations and extensions of the REST API

In addition to the TODOs listed above, the following adjustments and enhancements might be worth considering. It is important that the implementation of the endpoints is as independent as possible from the underlying data lake technology (e.g. by avoiding InfluxDB-specific methods).

  • Extension of the remove endpoint by the capability to
    • selectively delete an individual measurement series
    • delete measurements of a measurement series within a specific time interval or before a specific date
  • Adding an edit endpoint for adjusting data lake specific properties such as retention time.

Both extensions could be included in a kind of data management tool in the UI within an admin view (e.g. in the manner of the pipeline view).

Another possible adaptation would be the comprehensive implementation of an append-only approach for time series data. In particular, the functionality of the labelData method, which currently updates existing DB entries, would have to be adapted.
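The append-only idea can be sketched as follows: labeling appends new entries instead of updating rows, and reads resolve each timestamp to the most recently appended label. All names here are hypothetical, not part of the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of append-only labeling: every labeling operation
// appends a (timestamp, label) entry; existing entries are never mutated.
// Reads return the latest label written for a given timestamp.
class AppendOnlyLabels {
    private final List<Long> timestamps = new ArrayList<>();
    private final List<String> labels = new ArrayList<>();

    void appendLabel(long timestamp, String label) {
        timestamps.add(timestamp); // append only, no in-place update
        labels.add(label);
    }

    String labelAt(long timestamp) {
        String result = null;
        for (int i = 0; i < timestamps.size(); i++) {
            if (timestamps.get(i) == timestamp) {
                result = labels.get(i); // last write wins
            }
        }
        return result;
    }
}
```
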


Suggestion for the revised endpoint definitions

  • /list
  • /data/{index} (query parameters: startDate, endDate, groupBy, aggregateFunction, timeInterval)
  • /data/{index}/paging (query parameters: page, itemsPerPage)
  • /data/{index}/download (query parameters: format, startDate, endDate)
  • /data/{index}/delete (query parameters: startdate, enddate)
  • /clear


