Status

Current state

Accepted

Discussion thread

...

https://lists.apache.org/thread/lrww4d7cdxgtg8o3gt8b8foymzpvq7z3

JIRA

CASSANDRA-16222

Source Code

Released

03/22/2023

Motivation

...


Dataset<Row> df = getInputDataset();

df.write()
  .format("org.apache.cassandra.spark.bulkwriter.CassandraBulkSource")
  .option("SIDECAR_INSTANCES", "localhost,localhost2") // Provide at least one Sidecar host to which we can connect
  .option("KEYSPACE", "spark_test")
  .option("TABLE", "student")
  .option("BULK_WRITER_CL", "LOCAL_QUORUM")
  .option("LOCAL_DC", "DC1")
  .option("KEYSTORE_PATH", "/path/to/keystore")
  .option("KEYSTORE_PASSWORD", getKeystorePassFromSafePlace())
  .option("KEYSTORE_TYPE", "PKCS12")
  .option("CASSANDRA_SSL_ENABLED", "true")
  .mode("append")
  .save();

...


final Dataset<Row> df = SQLContext.getOrCreate(sc).read()
    .format("org.apache.cassandra.spark.bulkreader.CassandraDataSource")
    .option("sidecar_instances", "localhost,localhost2") // Provide at least one Sidecar host to which we can connect
    .option("keyspace", "my_keyspace")
    .option("table", "my_table")
    .option("DC", "DC1")
    .option("snapshotName", "my_sbr_snapshot_123")
    .option("createSnapshot", true)
    .option("defaultParallelism", sc.defaultParallelism())
    .option("numCores", numCores)
    .load();

// sum entire dataset on column 'c'
final long result = df.agg(sum("c")).first().getLong(0);

...

Additionally, the SBR benefits from the fact that the Cassandra codebase can always read SSTables in the previous major version's format. If Cassandra supported both reading from and writing to the previous major SSTable format, we could remove the need to embed multiple versions of the cassandra-all jar in the solution to support mixed-mode clusters, such as those seen during major version upgrades.
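
Where the writer must still produce the older format for mixed-mode clusters, the dispatch this implies might look like the following minimal sketch. Everything here (the SSTableWriter interface, MixedModeWriters, writerForVersion) is hypothetical and illustrative, not an actual cassandra-all or analytics API.


// Hypothetical sketch: in a mixed-mode cluster (e.g. mid-upgrade from 3.x to
// 4.x), write SSTables in the format of the lowest major version present so
// that every replica can import them. All names below are illustrative only.
import java.util.Comparator;
import java.util.List;

interface SSTableWriter { /* writes rows into SSTables in a single format */ }

final class MixedModeWriters
{
    static SSTableWriter writerFor(List<String> cassandraVersionsInCluster)
    {
        String lowest = cassandraVersionsInCluster.stream()
                                                  .min(Comparator.comparingInt(MixedModeWriters::major))
                                                  .orElseThrow();
        return writerForVersion(major(lowest));
    }

    private static int major(String version)
    {
        return Integer.parseInt(version.split("\\.")[0]);
    }

    private static SSTableWriter writerForVersion(int major)
    {
        // Each branch would delegate to a shaded/embedded copy of cassandra-all
        // that writes the corresponding on-disk format ("m*" for 3.x, "n*" for 4.x).
        switch (major)
        {
            case 3: return new SSTableWriter() {};
            case 4: return new SSTableWriter() {};
            default: throw new IllegalArgumentException("Unsupported major version: " + major);
        }
    }
}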

Architecture Diagrams/Overview of Data Flow

(Two architecture diagrams illustrating the data flow.)

New or Changed Public Interfaces

...

  • There are many features currently built into the Sidecar that could likely be implemented directly in Cassandra with minimal impact. That wasn't feasible when the tool was initially developed, and keeping these features in the Sidecar has other benefits, especially decoupling these tools from the main Cassandra release cycle and isolating this functionality from interfering with operations in the Cassandra process itself.

    • Upload/stage SSTables & coordinate import in Cassandra itself
      • This would allow Spark tasks to upload SSTables to only one node and then have that node coordinate the import, validating that the desired consistency level was reached (see the consistency-check sketch after this list)
      • This would also significantly reduce the bandwidth requirements between Spark and Cassandra, as today these files are uploaded to every replica
    • Stream SSTable snapshots directly from Cassandra to the Bulk Reader
      • If Cassandra could support range read requests, the Bulk Reader could create a snapshot and then read the data directly from Cassandra (see the range-read sketch after this list)
  • While there is nothing inherent in the solution to prevent support for vnodes, they are not currently tested, as the testing infrastructure doesn't (yet) support them. Work is ongoing to remove this limitation in the testing infrastructure, at which point we should be able to officially test and support vnodes.
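
To make the consistency-check idea above concrete, here is a minimal sketch of the validation a coordinated import might perform once replicas acknowledge an SSTable import. The helper below is hypothetical; only the quorum arithmetic (quorum = RF/2 + 1) reflects Cassandra's standard definition.


import java.util.Set;

// Hypothetical helper: given the replicas that acknowledged an SSTable import,
// decide whether the requested consistency level was satisfied. For LOCAL_QUORUM,
// assume 'acked' and 'replicationFactor' are already scoped to the local DC.
final class ImportConsistencyCheck
{
    static boolean satisfied(String consistencyLevel, int replicationFactor, Set<String> acked)
    {
        switch (consistencyLevel)
        {
            case "ONE":          return acked.size() >= 1;
            case "TWO":          return acked.size() >= 2;
            case "QUORUM":
            case "LOCAL_QUORUM": return acked.size() >= replicationFactor / 2 + 1;
            case "ALL":          return acked.size() == replicationFactor;
            default: throw new IllegalArgumentException("Unsupported consistency level: " + consistencyLevel);
        }
    }
}

// e.g. with RF=3, two acknowledgements satisfy LOCAL_QUORUM:
// ImportConsistencyCheck.satisfied("LOCAL_QUORUM", 3, Set.of("127.0.0.1", "127.0.0.2")) == true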

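Similarly, to illustrate the range-read idea above: if a snapshotted SSTable component were exposed over HTTP, the Bulk Reader could fetch just the byte ranges it needs. The endpoint, port, and path layout below are purely hypothetical; only the JDK 11+ java.net.http client is real (exception handling omitted).


import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch: fetch the first 64 KiB of a snapshotted Data.db
// component with an HTTP Range request.
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder(URI.create(
        "http://localhost:9043/snapshots/my_sbr_snapshot_123/my_keyspace/my_table/nb-1-big-Data.db"))
    .header("Range", "bytes=0-65535")
    .GET()
    .build();
HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
// response.body() now contains only the requested slice of the component.
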
Rejected Alternatives

  • Spark Cassandra Connector: it is significantly slower than reading and writing SSTables directly from Spark. This library provides an order-of-magnitude speedup over reads and writes performed through the default CQL driver library or the Spark Cassandra Connector.
  • Modifications to allow this kind of bulk loading directly in the Cassandra server itself. While we now have zero-copy streaming available in Cassandra, and it might be possible to leverage some of that code to reduce the impact on C*, there was no way to do this when the library was initially created (Cassandra 2.1 days), and there may still be good reasons to isolate the uploads from the main Cassandra process. Moreover, this would require a significant rework of the Cassandra native protocol. While theoretically feasible, in practice it would be a massive change that could create migration issues for existing users. We might pursue this in the future.
  • Modifications to nodetool import to do “coordinated imports,” mostly due to time constraints. There could be value in having Cassandra coordinate imports into multiple instances at once and manage the consistency-level checks. Additionally, it may be possible to use the knowledge that all instances accepted a particular SSTable import to mark those SSTables as repaired, which would cut down on post-import repair.