Status

Current state

Accepted

Discussion thread

https://lists.apache.org/thread/lrww4d7cdxgtg8o3gt8b8foymzpvq7z3

JIRA

Unable to render Jira issues macro, execution error.

Source Code

Released

03/22/2023

Motivation

It is common for teams using Cassandra to find themselves looking for a way to interact with large amounts of data for analytics workloads. However, Cassandra’s standard APIs aren’t designed for large scale data egress/ingest as the native read/write paths weren’t designed for bulk analytics.

We're proposing the contribution of this CEP which would include a library component called Cassandra Spark Bulk Analytics (CSBA) library. It enables the implementation of custom Spark applications that can either read or write large amounts of Cassandra data at up to 1.7 Gbps/instance reads and up to 7 Gbps/instance writes (depending on hardware), by accessing the persistent storage of nodes in the cluster via the Cassandra Sidecar.

Audience

Software engineering teams and professionals interested in leveraging Apache Spark to build applications that consume or produce large amounts of data persisted in Apache Cassandra.

Data Science professionals who would like to leverage data in Cassandra but may not be able to do so today given the limitations of existing solutions.

Goals

Increase the speed at which Spark jobs can read/write data to/from Cassandra while reducing the impact of those operations on the main Cassandra processes.

Non-Goals

Although much of the infrastructure work done to support Spark would likely be usable by other analytics systems as well, the first release should focus on Spark with follow-on work for separating the Spark-specific code from more general-purpose parts that may be useful in other areas.

Approach

Leveraging existing Cassandra libraries, we can read and write SSTables directly from external systems rather than relying on the read/write paths built into Cassandra. In many use cases, this separation is incredibly useful as we can delegate much of the work the Cassandra cluster would normally do to an external system like Spark, utilizing additional compute and memory resources available in the Spark environment. We leverage the Apache Cassandra Sidecar (Sidecar) today as a system of data transfer for SSTable data, and coordination with the Cassandra storage layer. It is also possible to implement an Object Storage data layer that can read from other data stores if snapshots are stored externally to the running Cassandra system.

Implementation and Proposed Changes

The CSBA library is developed in Java with integration tests written in Python and contains implementations of both Spark Bulk Reader and Spark Bulk Writer. It leverages the Cassandra Sidecar for the bulk of its communications with Cassandra and operational API, but depends heavily on using Cassandra classes as a library for much of its local processing.

As an end-user, using the CSBA library is similar to using other Spark input or output formats.

To write using the bulk writer:


Dataset<Row> df = getInputDataset();

df.write()
  .format("org.apache.cassandra.spark.bulkwriter.CassandraBulkSource")
  .option("SIDECAR_INSTANCES", "localhost,localhost2") // Provide at least one Sidecar host to which we can connect
  .option("KEYSPACE", "spark_test")
  .option("TABLE", "student")
   .option("BULK_WRITER_CL", "LOCAL_QUORUM")
   .option("LOCAL_DC", "DC1")
   .option("KEYSTORE_PATH", "/path/to/keystore")
   .option("KEYSTORE_PASSWORD", getKeystorePassFromSafePlace())
   .option("KEYSTORE_TYPE", "PKCS12")
   .option("CASSANDRA_SSL_ENABLED", "true")
  .mode("append")
  .save();


To read using the Bulk Reader:


final Dataset<Row> df = SQLContext.getOrCreate(sc).read()
    .format("org.apache.cassandra.spark.bulkreader.CassandraDataSource")
    .option("sidecar_instances", "localhost,localhost2") // Provide at least one Sidecar host to which we can connect
    .option("keyspace", "my_keyspace")
    .option("table", "my_table")
    .option("DC", "DC1")
    .option("snapshotName", "my_sbr_snapshot_123")
    .option("createSnapshot", **true**)
    .option("defaultParallelism", sc.defaultParallelism())
    .option("numCores", numCores)
    .load();
    
// sum entire dataset on column 'c'
final long result = df.agg(sum("c")).first().getLong(0);

The Spark Bulk Reader (SBR) enables integration between Cassandra and Spark jobs, allowing users to run arbitrary Spark jobs against a Cassandra cluster securely and consistently and with minimal impact on the Cassandra cluster itself (mostly network traffic and the cost of snapshots). It leverages the SSTable index information loaded from the Cassandra snapshot to read only the portion of the SSTable data file necessary to satisfy the query, so reducing the network bandwidth to the minimum required to consistently read the data set.

The Spark Bulk Writer (SBW) provides an efficient way to write data from Spark into a Cassandra cluster. The Input Spark DataFrame is partitioned into Spark tasks by the appropriate Cassandra partitioning algorithm and sorted based on clustering keys using standard Spark functionality. Each Spark task then creates SSTables using the CQLSSTableWriter implementation in Cassandra for records in a particular token range and pushes them to Cassandra hosts using a direct file upload via Apache Cassandra Sidecar. This avoids impacting the Cassandra JVM. Once the SSTables are uploaded, the files are imported into Cassandra (currently by using the nodetool import command, but future work should allow this to be done via direct JMX calls from the Sidecar itself).

Special care is taken to validate the SSTables before import by:

  1. Creating the SSTables
  2. Calculating a file hash for each file in the SSTable
  3. Reading them in the Spark task before upload
  4. Confirming the file hashes in the Sidecar during the upload process.

This is done to leverage the additional compute capacity most Spark environments provide and eliminate the requirement that nodetool import do an extended verification, which can take significant amounts of IO & CPU on the Cassandra host which could impact the read/write path and consequently the latencies. It is limited to one file per keyspace.

The majority of changes required to implement the Cassandra Spark Bulk Analytics library are actually outside of the existing Cassandra codebase, and are enabled by adding a number of new endpoints to the Apache Cassandra Sidecar (see below). However, the use of Apache Cassandra classes as a library is not well-supported in the Apache Cassandra codebase and, over the years of development and use of this library, we have found several instances where this use-case has been either under-tested or not taken into consideration when changes were made. Therefore, by officially supporting the CSBA library there will be some additional testing and integration burden added to the core Cassandra development effort.

A few examples of areas where changes to the Cassandra codebase caused issues for these use-cases include:

  • Changes that made it impossible to use the CQLSSTableWriter without access to internal-only classes (LocalDate).
  • JNA library usages that threw exceptions where they could be safely ignored when running in Client mode.
  • Changes to static initializers that make it more difficult for consumers to use cassandra-all as a library, requiring several different calls that keep changing rather than, for example, making client mode just work every time.

Additionally, the SBR benefits from the fact that the Cassandra codebase can always read SSTables in the previous major version format. If Cassandra supported both reading from and writing to the previous major SSTable format, we would be able to remove the dependency on embedding multiple versions of the cassandra-all  jar into the solution in order to support mixed-mode clusters, as you would see during major version upgrades.

Architecture Diagrams/Overview of Data Flow

(NOTE: Click to enlarge)

New or Changed Public Interfaces

Sidecar Public Interfaces

Several new endpoints will be added to the Sidecar to provide support for Bulk Analytics. The functionality required includes creation, listing, downloading, uploading and cleaning up SSTable components; getting information about existing keyspaces, tables, schemas; getting information about the Cassandra cluster such as the state of the ring, existing Cassandra versions of the cluster.

  • replica ranges - given a keyspace, what nodes (including pending) own what ranges of tokens
  • partitioner - What partitioner is in use by the cluster
  • schema - Gets the definition of a specific table and the Replication Factor settings
  • keyspaces - Gets a list of key spaces and tables available
  • snapshot - Creates a new snapshot (to be used by the SBR)
  • clear-snapshot - remove snapshots after a SBR job is complete (best-effort)
  • list-snapshot - Lists the files associated with a snapshot
  • upload-sstable - Allows a client to upload SSTable components to a staging directory for later import into Cassandra
  • stream-sstable - Stream SSTable file bytes to the SBR (Already exists in the Sidecar - here for completeness)
  • commit-sstables - imports staged sstables (via a call to nodetool import or some new method, perhaps a write to a virtual table or direct JMX call, given shelling out to nodetool probably isn’t the best solution long-term).
  • clean-upload-session - removes staged SSTables (usually used as a best-effort cleanup in the case of a failed upload)
  • nodetool/gossipinfo - Used to detect some specific instance state information, like when a node is in bootstrap replace state. This could potentially be included in the ring endpoint as well.
  • package-versions - Gets a list of the Cassandra and Sidecar versions for the instance or a consolidated list for the entire cluster, so the CSBA library can make determinations about the version of SSTables to create/read, and detect if it is capable of interacting with the cluster.
    • this could also be implemented by using the sstable version available in gossip state, which may be a more direct source of sstable information than inferring it from the Cassandra version.
  • feature-config - allows for the operator to configure CSBA-related features, such as enabling/disabling SBR/SBW access, fixing SSTable versions to a specific, configured Cassandra version (helpful during upgrades or when the latest SBW would otherwise want to write in a newer format than it currently supports).

Additional Sidecar functionality

  • Periodic disk cleanup for staged uploads and snapshots
    • These files can consume large amounts of disk space (either unimported uploads, or snapshots where the only pointers back to sstables become the snapshot files after the live sstables are compacted)
    • Should have both “normal” expiry (with a configurable expiration time) and some sort of “emergency” cleanup based on percentage of disk available
    • This could live either in the Sidecar or become part of a disk cleanup feature in Cassandra.
  • Keyspace/Table-level access control framework for any of the above APIs that grant access to read or write to Cassandra
    • This should integrate with existing Cassandra authentication and authorization tables so there’s a single source of truth for authentication/authorization decisions.
    • May require caching for some period of time to prevent large numbers of requests from negatively impacting Cassandra itself.

Compatibility, Deprecation, and Migration Plan

As there is no similar functionality today, there’s no deprecation or migration plan necessary. Compatibility is tested in the Test Plan section.

Test Plan

The CSBA library is currently validated three levels of automated testings:

  • Unit tests — written in Java using JUnit, they cover individual classes and components of the library.
  • End-to-end tests — also written in Java using JUnit, but check the library’s behavior end-to-end by executing test Spark applications locally.
  • Integration tests — written in Python. Requires a special Docker container that runs several Cassandra instances + associated Sidecar instances for testing. It may be possible to rewrite these using the in-jvm dtest framework if we could extend that to also run a sidecar instance per cassandra instance.

Potential Improvements

  • There are many features currently built into the Sidecar that could likely be accomplished in Cassandra directly with minimal impact, but it wasn’t feasible when the tool was initially developed and having these features in the Sidecar have other benefits, especially around decoupling these tools from the main Cassandra release cycle and also isolating this functionality from interfering with operations in the Cassandra process itself.

    • Upload/stage SSTables & coordinate import in Cassandra itself
      • This would allow Spark tasks to upload SSTables to only one node and then have that node coordinate the import, validating the desired consistency level was reached
      • This would also significantly reduce the bandwidth requirements between Spark and Cassandra, as today these files are uploaded to every replica
    • Stream SSTable snapshots directly from Cassandra to the Bulk Reader
      • If Cassandra could support range read requests, the Bulk Reader could create a snapshot and then read the data directly from Cassandra
  • While there is nothing inherent in the solution to prevent support for vnodes, they are not currently tested as the testing infrastructure doesn't (yet) support them. Work is ongoing to remove this limitation in the testing infrastructure at which point we should be able to officially test and support vnodes.

Rejected Alternatives

  • Spark Cassandra Connector, as it is significantly slower compared to directly reading and writing SSTables in Spark. The library provides an order-of-magnitude speed up compared to Cassandra reads/writes performed through the default CQL driver library and the Spark Cassandra Connector, with reads and writes several folds faster.
  • Modifications to allow this kind of bulk loading directly to the Cassandra server itself. While we now have zero-copy streaming available in Cassandra, and it would perhaps be possible to use some of that code to be leveraged to reduce the impact on C*, there was no way to do this when the library was initially created (Cassandra 2.1 days) it there may still be good reasons to isolate the uploads from the main Cassandra process. However, this would require a significant rework of the Cassandra Native protocol. While it is theoretically feasible, practically it will be a massive change that may create issues for existing users to migrate over. We might pursue this in the future.
  • Modifications to nodetool import to do “coordinated imports,” mostly due to time constraints. It is likely there could be some value in having Cassandra coordinate imports into multiple instances at once and manage the consistenty level checks. Additionally, it may be possible to use the knowledge that all instances accepted a particular sstable import to mark sstables as repaired, which would cut down on post-import repair.
  • No labels

3 Comments

  1. This looks really nice! Regarding vnodes, they are the default config and all features support them. I think we should only support new tools/features like this if they work with the default config, rather than only with configs for particular use cases where vnodes are disabled.

  2. Therefore, by officially supporting the CSBA library there will be some additional testing and integration burden added to the core Cassandra development effort.

    In my opinion, this burden need to be clearly explained in the CEP as it will impact the amount of work that the community will have to provide if it agrees with it.

    My impression from the CEP is that it will impose some constraints on the C* internals which is something I would like avoid unless we have a clearly defined API. Am I misunderstanding the CEP?

  3. Hey folks - happy to answer questions but I'll be sending the DISCUSS message out a bit later today, and would like to keep the discussion on the mailing list.