Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added source code contribution for this CEP

...

https://lists.apache.org/thread/lrww4d7cdxgtg8o3gt8b8foymzpvq7z3

JIRA

Source Code

Released

03/22/2023

Motivation

...


// Example: write a Spark Dataset into a Cassandra table through the bulk-writer data source.
Dataset<Row> df = getInputDataset();

df.write()
  .format("org.apache.cassandra.spark.bulkwriter.CassandraBulkSource")
  .option("SIDECAR_INSTANCES", "localhost,localhost2") // Provide at least one Sidecar host to which we can connect
  .option("KEYSPACE", "spark_test")
  .option("TABLE", "student")
  .option("BULK_WRITER_CL", "LOCAL_QUORUM") // consistency level is passed by name as a plain string
  .option("LOCAL_DC", "DC1")
  .option("KEYSTORE_PATH", "/path/to/keystore")
  .option("KEYSTORE_PASSWORD", getKeystorePassFromSafePlace()) // fetched at runtime rather than hard-coded
  .option("KEYSTORE_TYPE", "PKCS12")
  .option("CASSANDRA_SSL_ENABLED", "true")
  .mode("append")
  .save();

...


final Dataset<Row> df = SQLContext.getOrCreate(sc).read()
    .format("org.apache.cassandra.spark.bulkreader.CassandraDataSource")
    .option("cassandrasidecar_hostsinstances", "127.0.0.1,127.0.0.2localhost,localhost2") // Provide at least one CassandraSidecar host to which we can connect
    .option("keyspace", "my_keyspace")
    .option("table", "my_table")
    .option("DC", "DC1")
    .option("snapshotName", "my_sbr_snapshot_123")
    .option("createSnapshot", **true**)
    .option("defaultParallelism", sc.defaultParallelism())
    .option("numCores", numCores)
    .load();
    
// sum entire dataset on column 'c'
final long result = df.agg(sum("c")).first().getLong(0);

...