Featured Blogs - Instaclustr https://www.instaclustr.com/blog/category/featured-blogs/ Managed Services and Support for Apache Cassandra, Apache Kafka, PostgreSQL, OpenSearch, Elasticsearch and Valkey Fri, 28 Mar 2025 00:46:07 +0000 en-US hourly 1 https://wordpress.org/?v=6.8.1 https://www.instaclustr.com/wp-content/uploads/cropped-netapp_favicon-32x32.png Featured Blogs - Instaclustr https://www.instaclustr.com/blog/category/featured-blogs/ 32 32 The power of managed open source software for business success https://www.instaclustr.com/blog/the-power-of-managed-open-source-software-for-business-success/ Fri, 21 Mar 2025 11:08:52 +0000 https://www.instaclustr.com/?p=16042 Managed data infrastructure platforms are becoming essential for enterprises as they tackle the increasing adoption of open source software, rapid application development (including AI-enabled applications), and exponential data growth. A 2024 IDC survey highlights that 96% of businesses utilize open source, with 67% prioritizing it as critical to operations, making simplification and scalability vital. By...

The post The power of managed open source software for business success appeared first on Instaclustr.

]]>
Managed data infrastructure platforms are becoming essential for enterprises as they tackle the increasing adoption of open source software, rapid application development (including AI-enabled applications), and exponential data growth. A 2024 IDC survey highlights that 96% of businesses utilize open source, with 67% prioritizing it as critical to operations, making simplification and scalability vital.

By leveraging managed open source platforms, businesses can access features like automated maintenance, 24×7 monitoring, modern data architectures, robust cybersecurity compliance, and cost-saving integrations with cloud-native applications. These platforms allow IT teams to focus on delivering business value rather than managing underlying infrastructure.

However, IT leaders must consider platform limitations, such as delays in feature updates, cost structures, and migration complexities.

With global data growth expected to reach 163 zettabytes–that’s 163 trillion gigabytes–in 2024 and challenges in hiring skilled data infrastructure professionals, adopting a managed open source platform provides a timely and scalable solution for long-term growth and resilience.

Key benefits of adopting managed open source platforms

Adopting a managed open source platform brings undeniable advantages that directly align with business growth and operational efficiency.

First, these platforms provide unparalleled scalability, enabling businesses to seamlessly handle spikes in data demands without overhauling their infrastructure. This flexibility ensures that enterprises remain agile in the face of evolving market conditions and technological advancements.

Furthermore, managed open source platforms significantly reduce the total cost of ownership (TCO) by eliminating the need for in-house expertise to manage and continuously optimize complex systems. By incorporating automation, these platforms streamline repetitive maintenance tasks, freeing up IT teams to focus on strategic initiatives that directly impact customer satisfaction and revenue growth.

Security is another core advantage. Modern managed platforms are equipped with stringent cybersecurity measures and compliance tools to keep sensitive data protected while meeting the requirements of regulatory bodies. This empowers businesses to maintain trust with customers and partners while mitigating risks.

Finally, adopting managed open source platforms allows businesses to stay competitive by continuously innovating. Access to the latest features, integrations, and enhancements ensures that organizations remain at the forefront of technology trends, all while delivering a seamless user experience to end-users. It’s a strategic investment that drives both immediate impact and long-term resilience.

Conclusion

Managed open source platforms provide enterprises with the necessary infrastructure and tools to keep up with data growth and technological advancements.

By adopting these platforms, businesses can focus on their core competencies while leveraging the latest features and security measures, reducing costs, and remaining agile in a constantly evolving market. It’s a customer-centric approach that sets businesses up for success now and in the future.

So why wait? Embrace managed open source platforms today and propel your business forward!

And remember: it’s not only about managing data, but also about managing growth and delivering value to customers every step of the way. Let managed open source platforms be your partner in achieving that goal.

Get the full report from IDC here.

The post The power of managed open source software for business success appeared first on Instaclustr.

]]>
Understanding and Configuring Elasticsearch™ Node Types https://www.instaclustr.com/blog/understanding-and-configuring-elasticsearch-node-types/ https://www.instaclustr.com/blog/understanding-and-configuring-elasticsearch-node-types/#respond Wed, 14 Jul 2021 08:30:00 +0000 https://www.instaclustr.com/understanding-and-configuring-elasticsearch-node-types/ If you’re getting started with the Open Distro of Elasticsearch, or its replacement, OpenSearch, you need to start with a firm foundation on how to configure nodes within a given cluster. A key part of setting your configurations is deciding what node types you want for individual nodes.

The post Understanding and Configuring Elasticsearch™ Node Types appeared first on Instaclustr.

]]>
Elasticsearch Basics

If you’re getting started with the Open Distro of Elasticsearch, or its replacement, OpenSearch, you need to start with a firm foundation on how to configure nodes within a given cluster. A key part of setting your configurations is deciding what node types you want for individual nodes.

Read: OpenSearch and Elasticsearch Architecture

Much like Apache Cassandra—where multiple copies of the data are kept across nodes and you can add more to a live cluster—Elasticsearch scales horizontally. With Apache Lucene at the core, Elasticsearch provides a distributed, scalable, low-latency way to search your most valuable information, from articles to logs.

Within Elasticsearch, each node (be it a physical server, virtual server, etc.) is part of a cluster. Each cluster is simply a collection of different nodes, and each node can only be a part of one cluster (so clusters can’t share nodes). 

As a distributed system, Elasticsearch automatically handles partitioning documents into shards (which are stored on multiple nodes), balancing shards across nodes, duplicating shards for redundancy and availability, and routing requests to the appropriate node. These management features are the essence of what Elasticsearch does “above” the actual Apache Lucene indices.

Elasticsearch clusters

Clusters in Elasticsearch are identified by unique names, and can automatically reorganize when individual nodes are added or removed. Different node types within a cluster perform different tasks, and, like clusters, each node gets a unique name. Note that while the terminology regarding node types may change in the evolution from the Open Distro of Elasticsearch to OpenSearch, the core concepts and node tasks for each role will remain the same.

The main node types you need to be familiar with are master, data, ingest, and coordinating. Read on to learn more about different node types, best practices for node type configurations, and how to configure node types using the Instaclustr Managed Platform and APIs.

An example of a three-node cluster, where each node is master-eligible and can serve in the data, ingest, and coordinating role as needed. 

Master Nodes

Typically in Elasticsearch, all nodes are master-eligible by default. Master nodes are responsible for certain critical tasks throughout the cluster, including creating or deleting indexes, tracking nodes, and allocating shards to nodes. The master node for your cluster is elected from among your master-eligible nodes, and there can only be one node serving in the master role at a time.

Dedicated master nodes can be provisioned with fewer resources than data nodes because they only handle cluster state and not user data (note that in the Instaclustr Managed Platform, during provisioning and configuration users select “dedicated master nodes” rather than “dedicated master-eligible nodes”).

Placing these nodes into different failure zones or availability zones of a cloud, along with multiple copies of the data on the data nodes, enables the cluster as a whole to survive numerous types of server, zone, and data center failures. 

The master node is elected from among your dedicated master nodes (also known as dedicated master-eligible nodes). 

Data Nodes

Data nodes are responsible for holding data and performing data-related operations. This includes CRUD operations, indexing, search, and aggregations. You can configure data nodes so that they only do search and aggregation, not any indexing, to reduce the load in the individual nodes. All nodes are data nodes by default. 

Information is broken into shards and stored across different data nodes in your cluster.

Coordinating Nodes

Coordinating nodes direct requests to a master node or data node. These nodes essentially act as smart load balancers. Coordinating nodes help reduce the load on individual data and master nodes, which makes a particularly big difference in the case of large clusters. Coordinating nodes are sometimes called “client” nodes. All nodes are by default coordinating nodes.

Ingest Nodes

Ingest nodes are in charge of pre-processing documents before they are indexed. These are also known as “transform” nodes because they help transform documents for indexing. All nodes are also ingest nodes by default. Some organizations elect to use ingest nodes instead of Logstash for piping in and processing log data. 

An example of how an organization might organize ingest, data, dedicated master, and client nodes in a large cluster. 

Creating a Cluster in the Instaclustr Managed Platform

For Instaclustr Managed Elasticsearch customers, new clusters can easily be created from within the Instaclustr Managed Console. The minimum you need to get started is to choose your cluster name, technology (the Open Distro of Elasticsearch in this case), and software version.

Then you choose your cloud and any enterprise features you want.

Then choose different features of your cluster setup (such as dedicated master or Kibana nodes), as well as your cloud region and your number of nodes, their type, and their sizes.

If you prefer to interact with the platform via API, you can also create a new cluster through our Provisioning API. API requests should go in the form of a POST request to the provisioning API here:

https://api.instaclustr.com/provisioning/v1/extended

The following is an example of a JSON payload for a POST API request to provision a three-node production cluster hosted in AWS.

{
  "bundles": [
    {
      "bundle": "ELASTICSEARCH",
      "options": {
        "clientEncryption": "true",
        "dedicatedMasterNodes": "true",
        "masterNodeSize": "m5l-250-v2",
        "securityPlugin": "true"
      },
      "version": "opendistro-for-elasticsearch:1.8.0"
    }
  ],
  "clusterName": "elasticsearch_test",
  "clusterNetwork": "192.168.0.0/18",
  "dataCentre": "US_EAST_1",
  "nodeSize": "m5l-250-v2",
  "provider": {
    "name": "AWS_VPC"
  },
  "rackAllocation": {
    "nodesPerRack": 1,
    "numberOfRacks": 3
  },
  "slaTier": "PRODUCTION"

For further details on making API calls check out our API documentation here.

Configuring Node Types in the Instaclustr Managed Platform

The great thing about the Instaclustr Managed Platform is how easy it makes it to provision and configure your Elasticsearch cluster through your preferred means: the Console (no code required) or API. You can easily spin up a cluster, set node types and sizes, monitor performance, and resize as necessary.

Within the Instaclustr Platform, there are no dedicated data, ingest, or coordinator nodes; rather, if using dedicated master nodes, then by default the other nodes will hold all of the alternative roles and perform related tasks as needed.

The following are some tips to consider when configuring your own cluster:

Tip #1: Use Dedicated Master Nodes for Production Clusters: Typically all nodes are master-eligible, but if those nodes go under heavy load it can overwhelm the master node and cause problems for your cluster. Having three dedicated master nodes that are responsible just for serving in this role, and not for data or ingest, prevents this issue.

You can easily configure this setup for your cluster in the Console for the Instaclustr Managed Platform by checking the appropriate box when you’re in the Elasticsearch Setup section:

Tip #2: Monitor CPU Usage and OS Load: If your node is at the upper end of its potential CPU usage, or if its average OS load is greater than the number of cores, it may be time to resize or add additional nodes for greater processing capabilities.

If you’re an Instaclustr Managed Elasticsearch customer, you can easily monitor CPU usage from within the Console or via our Monitoring API. If you need to resize a node after provisioning, this can also be done via the Console or the Provisioning API without the need to get help from Instaclustr Support.

From within the Instaclustr Console, you can monitor both CPU usage and OS load for the individual nodes in your cluster. Here is an example of a CPU usage graph for a test cluster. 

Using the Monitoring API, you can also make a GET API call to get metrics. Users need to make an GET request to this address:

https://api.instaclustr.com/monitoring/v1/{clusterId}/metrics

n::cpuUtilization is the metric value to get CPU utilization and n::osload is the value for metric to use for current OS load.

You can draw from the following code sample in Python and Python 3 to make a request:

import http.client

conn = http.client.HTTPSConnection("api.instaclustr.com")

headers = { 'authorization': "Basic REPLACE_BASIC_AUTH" }

conn.request("GET", "/monitoring/v1/{clusterId}/metrics", headers=headers)

res = conn.getresponse()
data = res.read()

print(data.decode("utf-8"))

Tip #3: Choose Higher-End SSD Instances for Critical Workloads: Solid state drives (SSDs) generally provide the highest performance for Elasticsearch nodes because Elasticsearch workloads tend to be I/O bound. But when setting up your cluster, you’ll have a range of disk sizes to choose from.

When selecting your disk size, it’s important to know your anticipated workload. For data that is going to be infrequently accessed and where latency isn’t an issue, such as security logs, you can likely choose a less expensive storage option to save money. However, if a customer interaction is downstream, such as if you’re using Elasticsearch for the search function within a customer-facing application, it’s worth getting the fastest disks available to optimize these interactions.

How Will Things Change With OpenSearch?

If you’re currently using the Open Distro of Elasticsearch, you’re likely planning to move to its replacement, OpenSearch. The core node types will not change as a part of this transition, though some terminology may change. To learn more, continue to check out the Instaclustr blog for updates as we roll out OpenSearch as part of our Managed Platform for our customers.

Have more questions on configuring node types or other best practices for Elasticsearch or OpenSearch? Reach out to schedule a consultation with one of our experts.

Also Read Elasticsearch test drive Including Documents, Mappings, and Indexing; Stemming, Lemmatization, Levenshtein Fuzzy Queries, N-grams, and Slop!

Transparent, fair, and flexible pricing for your data infrastructureSee Instaclustr Pricing Here

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]

  2. Understanding Apache Cassandra: Complete 2025 Guide

  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Understanding and Configuring Elasticsearch™ Node Types appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/understanding-and-configuring-elasticsearch-node-types/feed/ 0
The Power of Apache Kafka® Partitions: How to Get the Most out of Your Kafka Cluster https://www.instaclustr.com/blog/the-power-of-kafka-partitions-how-to-get-the-most-out-of-your-kafka-cluster/ https://www.instaclustr.com/blog/the-power-of-kafka-partitions-how-to-get-the-most-out-of-your-kafka-cluster/#respond Mon, 06 Jan 2020 12:11:51 +0000 https://www.instaclustr.com/the-power-of-kafka-partitions-how-to-get-the-most-out-of-your-kafka-cluster/ This blog provides an overview of the two fundamental concepts in Apache Kafka: Topics and Partitions. While developing and scaling our Anomalia Machina application we have discovered that distributed applications using Apache Kafka and Cassandra clusters require careful tuning to achieve close to linear scalability, and critical variables included the number of Apache Kafka topics and partitions. In this blog, we test that theory and answer questions like “What impact does increasing partitions have on throughput?” and “Is there an optimal number of partitions for a cluster to maximize write throughput?” And more!

The post The Power of Apache Kafka® Partitions: How to Get the Most out of Your Kafka Cluster appeared first on Instaclustr.

]]>
This blog provides an overview of the two fundamental concepts in Apache Kafka: Topics and Partitions. While developing and scaling our Anomalia Machina application we have discovered that distributed applications using Apache Kafka and Cassandra® clusters require careful tuning to achieve close to linear scalability, and critical variables included the number of Apache Kafka topics and partitions.

In this blog, we test that theory and answer questions like “What impact does increasing partitions have on throughput?” and “Is there an optimal number of partitions for a cluster to maximize write throughput?” And more!

1. Introduction to Kafka Partitions

Two fundamental concepts in Apache Kafka are Topics and Partitions

Topics are fundamental to how Kafka works as a streaming distributed system. They enable Kafka producers and Kafka consumers to be loosely coupled (isolated from each other) and are the mechanism that Apache Kafka uses to filter and deliver messages to specific consumers. Consumers subscribe to 1 or more topics of interest and receive messages that are sent to those topics by producers. 

2. How partitioning works

Partitions are the main concurrency mechanism in Kafka. A topic is divided into 1 or more partitions, enabling producer and consumer loads to be scaled. Specifically, a consumer group supports multiple consumers—as many consumers as partitions for a topic. The consumers are shared evenly across the partitions, allowing for the consumer load to be linearly scaled by increasing both consumers and partitions. If a consumer instance fails, the partitions are rebalanced across the remaining consumers in the group.

You can have fewer consumers than partitions (in which case consumers get messages from multiple partitions), but if you have more consumers than partitions some of the consumers will be “starved” and not receive any messages until the number of consumers drops to (or below) the number of partitions; i.e. consumers don’t share partitions (unless they are in different consumer groups). 

Here are some useful partition facts to get you started:

Each kafka topic can have one or more partitions.

  • There is no theoretical upper limit to the number of Kafka partitions you can or should have, but there is a minimum of one partition per topic. You can request as many partitions as you like, but there are practical limits to the number of partitions Kafka can handle.

The size (in terms of messages stored) of partitions is limited to what can fit on a single node.

  • If you have more data in a topic than can fit on a single node you must increase the number of partitions. Partitions are spread across the nodes in a Kafka cluster.

Message ordering in Kafka is per partition only.

Avoid unbalanced partitions

  • If you are using an (optional) message key (required for event ordering within partitions, otherwise events are round-robin load balanced across the partitions—and therefore not ordered)—then you need to ensure you have many more distinct keys (> 20 is a good start) than partitions otherwise partitions may get unbalanced, and in some cases may not even have any messages (due to hash collisions). 

Partitions can have copies to increase durability and availability

  • This will enable Kafka to failover to a broker with a replica of the partition if the broker with the leader partition fails. This is called the Replication Factor (RF) and can be 1 or more.

Replication Factor

The total number of copies of a partition is the replication factor—i.e. RF=1 means that the leader has the sole copy of the partition (there are no followers);  2 means there are 2 copies of the partition (the leader and a follower); and 3 means there are 3 copies (1 leader and 2 followers).

Note that the partition leader handles all writes and reads, as followers are purely for failover. Cleverly, followers just run Consumers to poll the data from the leaders. Partitions and Replication Factor can be configured cluster-wide or set/checked per topic (with the ic-Kafka-topics command for Instaclustr managed Kafka clusters).

  1. Kafka Partition Example

The following diagrams (from the insidebigdata series we published last year on Apache Kafka architecture) illustrate how Kafka partitions and leaders/followers work for a simple example (1 topic and 4 partitions), enable Kafka write scalability (including replication), and read scalability:

Figure 1: Kafka write scalability—showing concurrent replication to followers

Figure 2: Kafka read scalability—partitions enable concurrent consumers

4. Kafka Partitions and Replication Factor

We were curious to better understand the relationship between the number of partitions and the throughput of Kafka clusters.

While developing and scaling our Anomalia Machina application we have discovered that distributed applications using Apache Kafka and Cassandra clusters require careful tuning to achieve close to linear scalability, and critical variables included the number of topics and partitions. We had also noticed that even without a load on the Kafka cluster (writes or reads), there was measurable CPU utilization which appeared to be correlated with having more partitions.

We had a theory that the overhead was due to (attempted) message replication—i.e. the polling of the leader partitions by the followers. If this is true, then for a replication factor of 1 (leaders only) there would be no CPU overhead with increasing partitions as there are no followers polling the leaders. Conversely, increasing the replication factor will result in increased overhead. Our methodology to test this theory was simply to measure the CPU utilization while increasing the number of partitions gradually for different replication factors.

The test setup used a small production Instaclustr managed Apache Kafka cluster as follows:

3 nodes x r5.xlarge (4 cores, 32GB RAM) Instaclustr managed Kafka cluster (12 cores in total)

This graph shows the CPU overhead on the Kafka cluster with partitions increasing from 1 to 20,000, with replication factor 1 (blue), 2 (orange), and 3 (grey), for 1 topic. We also tried 100 topics (yellow, RF=3) with increasing partitions for each topic giving the same number of total partitions. 

CPU overhead on the Kafka cluster

This graph confirms that CPU overhead increases due to increasing replication factor and partitions, as CPU with RF=1 is constant (blue). It also demonstrates that overhead is higher with increasing topics (but the same number of total partitions, yellow), i.e. 100 topics with 200 partitions each have more overhead than 1 topic with 20,000 partitions.  

Note that we used up to 20,000 partitions purely to check our theory. In practice, too many partitions can cause long periods of unavailability if a broker fails. If there are many partitions it takes a long time (potentially 10s of seconds) to elect new leaders for all the partitions with leaders that are on the failed broker. 

Also note that if the partitions are increased (e.g. using the ic-Kafka-topics command) too fast, or to a value that is too large, then the clusters can be overloaded and may become unresponsive. It pays to increase the number of Kafka partitions in small increments and wait until the CPU utilization has dropped back again.

Transparent, fair, and flexible pricing for your data infrastructure: See Instaclustr Pricing Here

5. Partitions and Producer Throughput

Next, we wanted to find out a couple of things with more practical applications: What impact does increasing Kafka partitions have on throughput? And is there an optimal number of partitions for a cluster (of this size) to maximize write throughput?

Our methodology was to initially deploy the Kafka producer from our Anomalia Machina application as a load generator on another EC2 instance as follows:

1 x m4.4xlarge (16 core, 64GB RAM) EC2 instance

This isn’t a particularly large EC2 instance, but Kafka producers are very lightweight and the CPU utilization was consistently under 70% on this instance. We ran a series of load tests with a multi-threaded producer, gradually increasing the number of threads and therefore increasing the arrival rate until an obvious peak was found. We repeated this test for different numbers of partitions. The replication factor was 3, and the message size was 80 bytes.  Here’s a graph showing one run for 3 partitions showing producer threads vs. arrival rate, with a peak at 4 threads.

Producer Threads vs Arrival Rate

Repeating this process for 3 to 5,000 partitions we recorded the maximum arrival rate for each number of partitions resulting in this graph (note that the x-axis, partitions, is logarithmic), which shows that the optimal write throughput is reached at 12 partitions, dropping substantially above 100 partitions. The throughput at 5,000 partitions is only 28% of the maximum throughput. There is however only a 7% variation in throughput between 3 and 100 partitions, showing that the number of partitions isn’t really critical until it exceeds 100.

Partitions vs Maximum Arrival Rate

Twelve partitions also correspond to the total number of CPU cores in the Kafka cluster (3 nodes with 4 CPU cores each). Note that the total number of followers is (RF-1) x partitions = (3-1) x 12 = 24 which is higher but still in the “sweet spot” between 12 and 100 on the graph and maximizes the utilization of the available 12 CPU cores.

News Flash! In September 2022 we redid some of these experiments with the new version of Kafka with KRaft, and new hardware, and the results are surprising! Check out part 1 of the new series: Apache Kafka® KRaft Abandons the Zoo(Keeper): Part 1— Partitions and Data Performance

6. End-To-End Throughput and Latency Experiment

Real Kafka clusters naturally have messages going in and out, so for the next experiment, we deployed a complete application using both the Anomalia Machine Kafka producers and consumers (with the anomaly detector pipeline disabled as we are only interested in Kafka message throughput).

We used a single topic with 12 partitions, a producer with multiple threads, and 12 consumers. We monitored the producer and consumer message rates (to ensure the consumers were keeping up), and the total end-to-end latency (time from a message sent to message receive).

Replica Fetcher Threads and Producer Acks

Some articles (e.g.  Kafka Performance Tuning—Ways for Kafka OptimizationProducer Performance Tuning for Apache Kafka, Processing Trillions of Events per Day With Apache Kafka on Azure) suggest that Kafka cluster throughput can be improved by tuning the number of replica threads (the Kafka configuration parameter “num.replica.fetchers”). 

This parameter sets the number of fetcher threads available to a broker to replicate the message. As the number of partitions increases there may be thread contention if there’s only a single thread available (1 is the default), so increasing the number of threads will increase fetcher throughput at least. 

Configuration Values

For Instaclustr managed Kafka clusters this isn’t a parameter that customers can change directly, but it can be changed dynamically for a cluster, i.e. without node restarts. We will typically do this as part of a joint performance tuning exercise with customers. Here’s the list of Instaclustr Kafka default configurations.  Customers can inspect configuration values that have been changed with the kafka-configs command:

./kafka-configs.sh --command-config kafka.props --bootstrap-server <kafka broker public IP>:9092 --entity-type brokers --entity-default --describe

Default config for brokers in the cluster are:

   num.replica.fetchers=4 sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:num.replica.fetchers=4}

Increasing Fetcher Threads

Starting with the default producer acks=1 setting, increasing the fetcher threads from 1 to 4 gave a slight increase (7%) in the throughput (8 or more fetchers resulted in a drop in throughput so we focused on 1 or 4).  Latency ranged from a low of 7ms to 15ms at the peak throughput at both settings.

Maximum throughput, Acks=1

Comparing Different Settings

For comparison we also tried acks=all and the idempotent producer (in the producer set the “enable.idempotence” property to true) which ensures “exactly once” delivery (and which automatically sets acks=all). These two settings produced identical results so only the acks=all results are reported.  

This graph compares the maximum throughput for acks=1 (blue) and acks=all (green) with 1 fetcher thread (the default). Suprisingly the acks=all setting gave a 16% higher throughput.

Maximum throughput, 1 fetcher

Latency at Maximum Throughput

Less of a surprise (given that the producer waits for all the followers to replicate each record) is that the latency is higher for acks=all. The latency at the maximum throughput is double (30ms) that of the acks=1 setting (15ms).

Latency at maximum throughput

This graph shows the maximum throughput for acks=1 (blue) and acks=all (green) with 1 and 4 fetchers.  In practice, there wasn’t much difference in throughput between 1 and 4 fetchers for acks=all. Latencies were unchanged (i.e. latency of acks=all results were double the latency of acks=1 irrespective of fetcher threads).

Maximum TPS

We were initially puzzled that throughput for acks=all was as good or better than with acks=1. It’s still not obvious how it can be better, but a reason that it should be comparable is that consumers only ever read fully acknowledged messages, so as long as the producer rate is sufficiently high (by running multiple producer threads) the end-to-end throughput shouldn’t be less with acks=all. Also note that as the Kafka producer is actually asynchronous, the impact of the acks setting doesn’t directly impact the producer throughput or latency (i.e. the writes are handled in the producer buffer which has separate threads).

We also tried changing the number of “min.insync.replicas” from the default of 1 to 3.  However, this didn’t have any impact on the throughput. It turns out that changing the value only impacts durability and availability, as it only comes into play if a node gets out of sync, reducing the number of in-sync replicas and impacting how many replicas are guaranteed to have copies of the message and also availability (see below).

H2 7. Selecting the producer “acks” setting

How should you decide what producer acks settings out of the two that we tested (acks=1 or acks=all) to use? (note: acks=0 is also possible but it has no guarantee of message delivery if the leader fails).

You should set acks based firstly on your data durability and idempotency requirements, and then secondly on your latency requirements, and then lastly take into account throughput (as throughput can easily be increased with a bigger cluster). You can have both high durability and high throughput by using acks=all (or idempotent). Increasing the fetcher threads from 1 to 4 doesn’t have any negative impact and may improve throughput (slightly). 

However, if you need low latency then acks=1 is hard to beat, although a lightly loaded cluster (e.g. < 50% CPU utilization) with acks=all may also work. This is because the lowest load acks=all result (green) had a similar latency (12ms) to the latency at the maximum load for the acks=1 result (blue, (15ms), but the latency increased rapidly to the reported 30ms at the maximum load.

Latency with increasing producer threads

You will also want to take into account availability when setting acks. With acks=1, writes will succeed as long as the leader partition is available, so for a RF=3, 3 node cluster, you can lose up to 2 nodes before writes fail. For acks=all, writes will succeed as long as the number of insync replicas is greater or equal to the min.insync.replicas.  Acks=1 and Acks=All with min.insync.replicas=1 have the same availability (2 out of 3 nodes can fail), but as min.insync.replicas increases the availability decreases (1 node can fail with min.insync.replicas=2, and none can fail with 3). 

The Impact of Acks

This handy table summarizes the impact of the producer acks settings (for RF=3) on Durability, Availability, Latency, and Throughput:

Acks min.insync.replicas Durability Availability Latency Throughput
1 any Worst Best Best Good
All 1 Worst Best Worst Best
All 2 Good Good Worst Best
All 3 Best Worst Worst Best

Writing Records to Partitions

Apache Kafka applications use the producer client to write records to a Kafka cluster—see the Apache producer documentation for further information. Since Kafka 2.4 the default behavior when there is no key has been to use the sticky partitioner. You can also create a custom partitioner like they did in this blog.

 8. Conclusions

  1. The optimal number of partitions (for maximum throughput) per cluster is around the number of CPU cores (or slightly more, up to 100 partitions), i.e. cluster CPU cores >= optimal partitions <= 100

  2. Too many partitions result in a significant drop in throughput (however, you can get increased throughput for more partitions by increasing the size of your cluster).

  3. At the optimal number of partitions (12 for our experiments), increasing num.replica.fetchers from the default of 1 to 4 doesn’t have a substantial impact on throughput or latency.

  4. Setting producer acks=all can give comparable or even slightly better throughput compared with the default of acks=1.

  5. Setting producer acks=all results in higher latencies compared with the default of acks=1.

  6. Both producer acks=all and idempotence=true have comparable durability, throughput, and latency (i.e. the only practical difference is that idempotence=true guarantees exactly once semantics for producers).

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]

  2. Understanding Apache Cassandra: Complete 2025 Guide

  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post The Power of Apache Kafka® Partitions: How to Get the Most out of Your Kafka Cluster appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/the-power-of-kafka-partitions-how-to-get-the-most-out-of-your-kafka-cluster/feed/ 0
Cassandra vnodes: How many should I use? https://www.instaclustr.com/blog/cassandra-vnodes-how-many-should-i-use/ https://www.instaclustr.com/blog/cassandra-vnodes-how-many-should-i-use/#respond Fri, 23 Nov 2018 02:26:20 +0000 https://www.instaclustr.com/cassandra-vnodes-how-many-should-i-use/ The concept of virtual nodes (otherwise known as vnodes) has been a major feature of Apache Cassandra since it was introduced in version 1.2, back at the start of 2013. As far as major new Cassandra features go it has been very successful, however, a full understanding of the impact on real production clusters (and therefore the best practices regarding usage) has evolved over time.

The post Cassandra vnodes: How many should I use? appeared first on Instaclustr.

]]>
For quite some time, Instaclustr has been tuning the number of vnodes that we use when deploying large clusters. Recently, we have extended this to make 16 vnodes the default for all new Cassandra 3+ clusters deployed. This blog post explains the background and benefits of this change.

The concept of virtual nodes (otherwise known as vnodes) has been a major feature of Apache Cassandra since it was introduced in version 1.2, back at the start of 2013. As far as major new Cassandra features go it has been very successful, however, a full understanding of the impact on real production clusters (and therefore the best practices regarding usage) has evolved over time.

Some background

Vnodes were originally implemented to solve several overlapping problems caused by only having a single contiguous token-range per node:

  1. For any given replication factor r, there were at most only r nodes available to stream copies of each node’s data. This limited the maximum throughput for tasks like repair or when bootstrapping new nodes. When a node is offline or inconsistent we generally want to get it to a consistent state as quickly as possible.
  2. Adding (and removing) nodes to an established cluster would either require a series of expensive token movements or require the cluster to double (or halve) in size to ensure it remained balanced.
  3. It was not a trivial task to tune the load amongst nodes in order to take advantage of different (more powerful) hardware on some nodes. This required some educated guesswork, some trial-and-error, and a lot of time.

The idea of splitting each node’s token-range into multiple smaller ranges (virtual nodes) and spreading these around the ring has mostly proven to be an effective solution to those problems. Each node can now be responsible for multiple token-ranges, instead of only one. This means that a single node now shares replicas with a much larger proportion of nodes within the cluster, and is able to stream data to many other nodes, rather than only a limited few.

To ensure that deployment remained simple, a token generation algorithm was implemented that randomly generated the set of tokens for each node when that node bootstrapped (alternatively, it is also possible to set manually-crafted token values). A default value of 256 tokens per node was chosen in order to ensure that the randomly generated tokens would provide an even distribution of the token space. This value can be configured via the num_tokens option.

As time went by, it was discovered that vnodes also caused some negative performance impacts in certain cases and that they don’t always ensure that the cluster is evenly balanced. The large number of vnodes required to achieve a balanced cluster introduces performance overheads on many operational tasks (such as repair) and can increase overheads when using analytical tools like Apache Spark or Hadoop map-reduce with Cassandra, causing some tasks to take more time to complete. Similar performance overheads have been encountered when using search engines like Cassandra-Lucene-index, DSE Search, or Elassandra with Cassandra. In some use-cases these overheads are not significant enough to trade-off the consequences of not using vnodes, however, in many cases, the impact can be very large. Reducing the number of vnodes can help in this situation, however, with older Cassandra versions this often caused the cluster to become more unbalanced.

The biggest problem with vnodes appears once a cluster grows to a relatively large number of nodes. As the size of a cluster increases, so does the chance of hot spots – nodes that happen to own more token-space than the others – forming around the cluster. Since the number of vnodes cannot be easily modified once a cluster is running (especially on a production cluster), choosing the optimal number of vnodes when first deploying a cluster can be an important decision.

A new approach

In Cassandra 3.0, this issue was addressed with a new approach to vnode allocation. Rather than allocating tokens randomly, new tokens may instead be allocated such that the largest token ranges are split, with the goal of reducing hot spots. This new approach is optional and requires that a target keyspace is specified. The replication factor of the target keyspace is used to optimise the placement of new tokens. To achieve the best results, all keyspaces that hold a large amount of data should use the same replication factor as the target keyspace.

While the main goal of the new allocation algorithm is to reduce token-space imbalance for larger clusters, it also reduces imbalance for smaller clusters. This allows the number of vnodes to be significantly reduced, while still achieving a similar level of balance.

We have performed testing on both moderate sized clusters (20-500 nodes, with multiple racks and multiple DCs) and smaller clusters, and have found that a num_tokens value of 16 is optimal for the majority of use cases when used in conjunction with the new allocation algorithm. This appears to be the smallest value which still provides an acceptable distribution of the token-space. When using the old random-allocation algorithm (which remains the default), we recommend that the default value of 256 is used in order to achieve a balanced token allocation.

In our testing, we found that a variance of roughly 15% between the nodes with the smallest and largest token space allocation can be expected when using a num_tokens value of 16 along with the new allocation algorithm. Moving up to a num_tokens value of 32 provided only a modest decrease, producing a variance of 11%, whereas a cluster with only 8 tokens would have a significant increase with 22% variance.

An interesting anomaly was discovered when testing with a num_tokens value of 8. The balance variance was smaller for small clusters of 6-12 nodes and increased steadily up to 22% for a 60 node cluster. It did not appear to rise from this point onwards, as this value remained steady for a 120 node cluster. This could be due to the fact that with only 8 tokens per node there is still a high chance that new token ranges will be unbalanced when adding nodes to smaller clusters.

All Cassandra clusters recently deployed with Instaclustr are already using the new algorithm with a num_tokens value of 16, and this will remain the default configuration for all new Instaclustr clusters going forward. The target keyspace used for the allocation algorithm is automatically created based on the target replication factor chosen at cluster creation.  Existing managed clusters will remain using the default randomised allocation algorithm as change this setting would require a full data center migration.

Sign up for a free trial or contact us for any queries.

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]
  2. Understanding Apache Cassandra: Complete 2025 Guide
  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Cassandra vnodes: How many should I use? appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/cassandra-vnodes-how-many-should-i-use/feed/ 0
Third Contact with a Monolith: Part C—In the Pod https://www.instaclustr.com/blog/third-contact-monolith-part-c-pod/ https://www.instaclustr.com/blog/third-contact-monolith-part-c-pod/#respond Fri, 29 Sep 2017 09:16:28 +0000 https://www.instaclustr.com/third-contact-monolith-part-c-pod/ A Simple Classification Problem: Will the Monolith React? Is It Safe?! Maybe a cautious approach to a bigger version of the Monolith (2km long) in a POD that is only 2m in diameter is advisable.   What do we know about how Monoliths react to stimuli? A simple classification problem consists of the category (label) “no...

The post Third Contact with a Monolith: Part C—In the Pod appeared first on Instaclustr.

]]>
A Simple Classification Problem: Will the Monolith React? Is It Safe?!

Maybe a cautious approach to a bigger version of the Monolith (2km long) in a POD that is only 2m in diameter is advisable.   What do we know about how Monoliths react to stimuli? A simple classification problem consists of the category (label) “no reaction” (0) or “reaction” (1), and the stimuli tried (features which will be used to predict the label).  In the following table, the first column in the label to be predicted (positive, 1, or negative, 0), the remaining columns are the features, and each row is an example:

classification problem table Instaclustr

This problem is trivial as a positive reaction only occurred for Touch OR Sunlight (the camera flash was a coincidence), the features are all binary, and only a single feature at a time is true. As a result of extensive tests on the Monolith imagine that there are lots more examples and data available, each feature is a floating-point number, all features don’t have values for all examples, there are potentially 1,000s of features, and a sufficiently accurate classification rule may require an arbitrary number of features.

Given that HAL has been unplugged we have to analyze the data ourselves. What do we have available in the POD?

Apache SPARK and MLLib

One approach is to use an appropriately 1960’s-vintage machine learning algorithm suitable for simple binary classification problems. Decision tree algorithms were invented in the 1960’s (Concept Learning System, by a Psychologist, Earl Hunt), were improved in the 1980’s (e.g. ID3, C4.5, and FOIL, which references my 1988 machine learning algorithm, Gargantubrain), and are still useful. Spark’s Machine Learning Library (MLLib) has a number of regression and classification algorithms, including decision trees, that improve on the originals by running them “transparently” in parallel on multiple servers for scalability.

Spark’s scalable ML library is available here. It’s easy to download Spark and run MLLib examples locally, but the scalability benefits are best realised after deployment to an Instaclustr managed Spark cluster. Here’s the documentation on the MLLib decision tree algorithm.

Training

We’ll have a look at the example Java code, starting from the call to the decision tree algorithm to build the model from the examples, and working out from there. DecisionTree and DecisionTreeModel documents are relevant.  Here’s the code.  Note the API for trainClassifier:

trainClassifier(JavaRDD<LabeledPoint> input, int numClasses,
java.util.Map<Integer,Integer> categoricalFeaturesInfo, String
impurity, int maxDepth, int maxBins)

The first argument is a JavaRDD of LabeledPoint. The other arguments are for control of the algorithm:

// Set parameters for DecisionTree learning.

// Empty categoricalFeaturesInfo indicates all features are continuous.

Integer numClasses = 2;

Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();

String impurity = “gini”; // or “entropy”

Integer maxDepth = 5;

Integer maxBins = 32;

// Train DecisionTree model

DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins);

System.out.println(“Learned classification tree model:\n” + model.toDebugString());

What is a JavaRDD and LabeledPoint?

LabeledPoint

Pinch point Instaclustr

https://spark.apache.org/docs/latest/mllib-data-types.html#labeled-point

https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/regression/LabeledPoint.html

Decision trees are a type of supervised learner, so we need a way of telling the algorithm what class (positive or negative for binary classifiers) each training example is.  LabeledPoint is a single labelled example (a “point” in n-dimensional space). It’s a tuple consisting of a Double label (either 0 or 1 for negative or positive examples), and a Vector of features, either dense or sparse.  Features are numbered from 0 to n. These are the examples from the documentation. Note the Vectors import. This is important as the default Spark Vector is NOT CORRECT.

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Create a labeled point with a positive label and a dense feature vector.

// There are three features, with values 1, 0, 3.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));

// Create a labeled point with a negative label and a sparse feature vector.

// There are two features, 0 and 2, with values 1 and 3 respectively.

LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));

Resilient Distributed Datasets (RDDs)

https://spark.apache.org/docs/latest/rdd-programming-guide.html

https://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaRDD.html

From the documentation:

“Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program or referencing a dataset in an external storage system… “ (such as Cassandra)

This blog has a good explanation of the Spark architecture, and explains that the features of RDDs are:

  • Resilient, i.e. fault-tolerant with the help of RDD lineage graph and so able to recompute missing or damaged partitions due to node failures.
  • Distributed with data residing on multiple nodes in a cluster.
  • Dataset is a collection of partitioned data with primitive values or values of values, e.g. tuples or other objects (that represent records of the data you work with).

RDD’s are also immutable and can be cached. Fault-tolerance depends on the execution model which computes a Directed Acyclic Graph (DAG) of stages for each job, runs stages in optimal locations based on data location, shuffles data as required, and re-runs failed stages.

LIBSVM—Sparse Data Format

https://spark.apache.org/docs/2.0.2/mllib-data-types.html

https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/util/MLUtils.html

Where does the RDD training data come from? In the example code I read it from a local file using MLUtils.loadLibSVMFile() (I’ll come back to the parameters later):

JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

From the documentation:

“It is very common in practice to have sparse training data. MLlib supports reading training examples stored in LIBSVM format, which is the default format used by LIBSVM and LIBLINEAR. It is a text format in which each line represents a labeled sparse feature vector using the following format:

label index1:value1 index2:value2 …

where the indices are one-based and in ascending order. After loading, the feature indices are converted to zero-based.”

For the above Monolith example the LIBSVM input file looks like this (assuming that feature values of “0” indicate non-existent data):

1.0 1:1.0
0.0 2:1.0

0.0 3:1.0
0.0 4:1.0
0.0 5:1.0
0.0 6:1.0
0.0 7:1.0
1.0 8:1.0
0.0 9:1.0
0.0 10:1.0

This doesn’t seem very “user friendly” as we have lost the feature names. I wonder if there is a better data format?

Splitting (the Data, Not the Atom)

(Source: Shutterstock)

Once we have the training data in the correct format for the algorithm (JavaRDD<LabeledPoint>), but before we train the model, we need to split it into two random subsets for training and testing. JavaRDD.randomSplit() does this and takes parameters:

double[] weightsweights for splits, will be normalized if they don’t sum to 1

long seedrandom seed

// Split sample RDD into two sets, 60% training data, 40% testing data. 11 is a seed.

   JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.6, 0.4}, 11L);

   JavaRDD<LabeledPoint> trainingData = splits[0].cache();  // cache the data

   JavaRDD<LabeledPoint> testData = splits[1];

Notice the cache() call for trainingData.  What does it do? Spark is Lazy, it doesn’t evaluate RDD’s until an action forces it to. Hence RDD’s can be evaluated multiple times which is expensive. cache() creates an in memory cache “checkpoint” of an RDD which can be reused. The most obvious case is when an RDD is used multiple times (i.e. iteration), or for branching transformations (i.e. multiple different RDD’s are computed from an original RDD), in which case the original should be cached.
https://stackoverflow.com/questions/28981359/why-do-we-need-to-call-cache-or-persist-on-a-rdd

Note that for this example the initial data should be cached as we use it later to count the total, positive and negative examples.

Evaluating the Model on the Test Data – Thumbs down or up?

(Source: Shutterstock)

We trained a decision tree model above. What can we do with it? We trained it on the trainingData subset of examples leaving us with the testData to evaluate it on. MLLib computes some useful evaluation metrics which are documented here:

https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html

Here’s the code which computes the evaluation metrics:

// Compute evaluation metrics.
BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(predictionAndLabels.rdd());

The highlighted parameter is an RDD of (prediction, label) pairs for a subset of examples. I.e. (0, 0) (0, 1) (1, 0) (1, 0). I.e. for each example, run the model and return a tuple of predicted label and actual example label. Here’s the complete code that does this on the testData and then computes the evaluation metrics:

// For every example in testData, p, replace it by a Tuple of (predicted category, labelled category)

// E.g. (1.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,1.0)

JavaPairRDD<Object, Object> predictionAndLabels = testData.mapToPair(p ->

    new Tuple2<>(model.predict(p.features()), p.label()));

// Compute evaluation metrics.

BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(predictionAndLabels.rdd());

How does this work? There may be a few unfamiliar things in this code which we’ll explore: Tuple2, JavaPairRDD, mapToPair, features() and label().

Tuple(ware)

Java doesn’t have a built-in Tuple type, so you have to use the scala.Tuple2 class.

MAP

(Source: Shutterstock)

Map is an RDD transformation. Transformations pass each dataset element through a function and return a new RDD representing the result. Actions return a result after running a computation on a dataset. Transformations are lazy and are not computed until required by an action. In the above example, for every example in testData, p, it is replaced by a Tuple of (predicted category, labelled category). These values are computed by running model.predict() on all the features in the example, and using the label() of the example.

See LabeledPoint documentation for features() and label() methods.

JavaPairRDD is a (key, value) version of RDD. Instead of the general map function you need to use mapToPair.

Map (and mapToPair) are examples of “Higher Order Functions”. These are used a lot in Spark but are really pretty ancient and were first practically used in 1960 in LISP. Disturbingly the abstract sounds like an early attempt at HAL:

“A programming system called LISP … was designed to facilitate experiments … whereby a machine could be instructed to … exhibit “common sense” in carrying out its instructions.”

I used LISP for a real AI project (once), but (it ((had too) (many) brackets) (for (((me (Lots of Idiotic Spurious Parentheses))) ???!!! (are these balanced?!).

preferred(I, Prolog).

The following code prints out the main evaluation metrics, precision, recall and F.

JavaRDD<Tuple2<Object, Object>> precision = metrics.precisionByThreshold().toJavaRDD();

   System.out.println(“Precision by threshold: ” + precision.collect());

   JavaRDD<Tuple2<Object, Object>> recall = metrics.recallByThreshold().toJavaRDD();

   System.out.println(“Recall by threshold: ” + recall.collect());

   JavaRDD<Tuple2<Object, Object>> f = metrics.fMeasureByThreshold().toJavaRDD();

   System.out.println(“F by threshold: ” + f.collect());

Note that the metrics are designed for algorithms that can have multiple threshold values (i.e. the classification can have an associated probability).  However, the decision tree algorithm we are using is a simple yes/no binary classification.  A “confusion matrix” is a simple way of understanding the evaluation metrics. For each of the two states of reality (no reaction or reaction from the monolith) the model can make a true or false prediction giving four possible outcomes: Correct: TN = True Negative (the model correctly predicted no reaction), TP = True Positive (correctly predicted a reaction). And Incorrect: FP = False Positive (predicted a reaction but there was no reaction), FN = False Negative (predicted no reaction but there was a reaction).  FP is often called a Type I error, and FN a type II error.

Model Prediction Instaclustr

Precision = (TP)/(TP+FP), the proportion of predicted positives that were actually positive (the right column).

Recall = (TP)/(TP+FN), the proportion of actual positives that were correctly predicted as positive (the bottom row).

F is the average of precision and recall.

And the results (on the extended When Will the Monolith React? data) were as follows:

Precision = 71%

Recall = 45%

F = 56%

Precision is better than recall. 71% of the models predicted “Reaction” cases were in fact a “Reaction”. However, the model only correctly predicted 45% of all the actual “Reaction” cases correctly. The reason for using precision and recall metrics is to check if the model performance is purely the result of guessing. For example, if only 20% of examples are positive, then just guessing will result in a model “accuracy” approaching 80%.

FILTER

Another common Spark transformation is filter.   We’ll use filter to count the number of positive and negative examples in the data to check if our model is any better than guessing. filter() takes a function as the argument, applies it to each element in the dataset, and only returns the element if the function evaluates to true.

long n = data.count();
    System.out.println("RDD size = " + n);
    
    // count number of positive and negative examples
    // functions to return true if label == 0 or label == 1
    // LabeledPoint is a tuple of (label, features).
    Function&lt;LabeledPoint, Boolean&gt; label0 = row -&gt; (row.label() == 0.0);
    Function&lt;LabeledPoint, Boolean&gt; label1 = row -&gt; (row.label() == 1.0);

    Double neg = (double) data.filter(label0).count();
    Double pos = (double) data.filter(label1).count();
    System.out.println("pos examples = " + pos);	
    System.out.println("neg examples = " + neg);	
    System.out.println("probability of positive example = " + pos/(double)n);	
    System.out.println("probability of negative example = " + neg/(double)n);

This code calculates that there are 884 examples, 155 positive and 729 negative, giving:

probability of positive example = 0.1753393665158371

probability of negative example = 0.8246606334841629

This tells us that just guessing would result in close to 82% accuracy. The actual model accuracy for the example is 85% (which requires extra code, not shown).

Spark Context

https://spark.apache.org/docs/latest/rdd-programming-guide.html

https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html

This leaves us with the final but “important bit” of the code at the start.  How is Spark actually run?  A SparkContext object tells Spark how to access a cluster, and a SparkConf has information about your application.  Given that we are just running Spark locally just pass “local” to setMaster.

SparkConf conf = new SparkConf().setAppName(“Java Decision Tree Classification Example”);
conf.setMaster(“local”);
SparkContext sc = new SparkContext(conf);
String path = “WillTheMonolithReact.txt”;

To run Spark on a cluster you need (a) a cluster with Spark set-up (e.g. an Instaclustr cluster with Spark add-on), and (b) to know more about the Spark architecture and how to package and submit applications.

How does this help with our approach to the Monolith? Maybe raising the POD’s manipulators in “greeting”? Will it be friends?

(Source: Shutterstock)

NOTE 1:

The actual data I used for this example was from the Instametrics example introduced in previous blogs, with the goal of predicting long JVM Garbage Collections in advance. I took a small sample of JVM-related metrics, and computed the min, avg and max for each 5 minute bucket. These became the features. For the label I determined if there was a long GC in the next 5 minute bucket (1) or not (0).  The real data has 1,000s of metrics, so next blog we’re going to need some serious Spark processing even just to produce the training data, and explore the interface to Cassandra, and the suitability of Cassandra for Sparse data.

NOTE 2:

Space travel is sooooo slow, in the years we’ve been on board Spark has changed from RDD to DataFrames. I’ll revise the code for the next blog.

Guide https://spark.apache.org/docs/latest/ml-guide.html

As of Spark 2.0, the RDD-based APIs in the spark.mllib package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the spark.ml package.

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]
  2. Understanding Apache Cassandra: Complete 2025 Guide
  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Third Contact with a Monolith: Part C—In the Pod appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/third-contact-monolith-part-c-pod/feed/ 0
Debugging Jobs in the Apache Spark™ UI https://www.instaclustr.com/blog/debugging-jobs-apache-spark-ui/ https://www.instaclustr.com/blog/debugging-jobs-apache-spark-ui/#respond Sat, 08 Apr 2017 11:31:03 +0000 https://www.instaclustr.com/debugging-jobs-apache-spark-ui/ Overview In this post, we’ll be running a basic Apache Spark job that selects data from a Cassandra database using a couple of different methods.  We will then examine how to compare the performance of those methods using the Spark UI. Setup For this post, we will use a Zeppelin+Spark+Cassandra cluster.  If you’d like to...

The post Debugging Jobs in the Apache Spark™ UI appeared first on Instaclustr.

]]>
Overview

In this post, we’ll be running a basic Apache Spark job that selects data from a Cassandra database using a couple of different methods.  We will then examine how to compare the performance of those methods using the Spark UI.

Setup

For this post, we will use a Zeppelin+Spark+Cassandra cluster.  If you’d like to try this out for yourself, you can create an account and a free 14-day trial cluster here.

The code snippets used are available in Instaclustr’s Github site as a ready-to-go Zeppelin notebook.

Inserting Data

To start things off, we will create a test keyspace and table in our Cassandra cluster, via Zeppelin (we’re using Zeppelin for this because it provides a nice, quick GUI for CQL queries):

%cassandra

// Step 1: Create table

CREATE KEYSPACE IF NOT EXISTS spark_demo WITH REPLICATION = { 'class': 'NetworkTopologyStrategy', 'AWS_VPC_US_EAST_1': 1 };

CREATE TABLE IF NOT EXISTS spark_demo.test(
        key int,
        letter text,
        value text,
        PRIMARY KEY(key, letter)
    );

INSERT INTO spark_demo.test (key, letter, value) VALUES (1, 'a', 'test 1');

SELECT * FROM spark_demo.test;

Now that the table has been created, let’s populate it with some test data:

%spark 

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// STEP 2: Populate data

var testData : List[(Int, String, String)] = List()
var i : Int = 0;
val letters = Array("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
val r = scala.util.Random

for( i

And then run a quick select query to make sure it looks as expected:

run Apache Spark select query

Retrieving Data

Now that we’ve got some test data, we’ll run two different methods of selecting data from the Cassandra table.

Filter

For the first method, let’s use Spark’s filter transformation to pull the data back from the “test” table to the executor, then find a partition key and then show the “letter” values for each row.

%spark

// Step 3a: Filtering using Spark

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val letterCountRdd = sc.cassandraTable("spark_demo","test")
    .filter(r => r.getInt("key").equals(1))
    .map(((x: CassandraRow) => x.getString("letter")))
    .map(letter => (letter, 1))
    .reduceByKey{case (x, y) => x+y}
    
println(letterCountRdd.toDebugString)

letterCountRdd.collect.foreach(println)

In addition to the letter counts, we’re also showing the RDD dependency graph (the steps required to generate my letterCountRdd) using the toDebugString method.

toDebugString uses indents to indicate shuffle boundaries (transferring data between executors), so for the lowest indent the output is showing that to generate the letterCountRdd each executor will need to:

  1. Perform a Cassandra table scan
  2. Filter all of the retrieved rows
  3. Retrieve the “letter” of each row
  4. Create tuples for each “letter” value

The transformed data is then transferred back to an executor for the reduceByKey transformation to be performed.

Spark UI

Moving across to the Spark UI, we can navigate to the Jobs page and select the Job that just ran:

Apache Spark UI Instaclustr

From here, we can see that the Job took over 5 seconds to complete and the bulk of that time was spent in Stage 1, gathering and filtering 1,118.5 KB of data (the DAG visualization also shows a nice graphical summary of the RDD lineage).

5 seconds and a megabyte of data seems like a lot for the single partition that we’re expecting to return.  We can click on the Stage 1 link to show further details of the filtering:

Apache Spark DAG visualization and summary of RDD lineage

Amongst other things, the Tasks table is showing that all three of our executors processed the Stage pipeline and performed full Cassandra table scans (something that should generally be avoided), with only one of them (ip-10-224-135-155.ec2.internal) actually ending up with any data.

Where

For the second method, let’s use the Spark-Cassandra connector’s select and where methods to (hopefully!) reduce the amount of data that needs to be transferred:

  • The where method informs the connector of which partitions are required and therefore which nodes need to process the task
  • The select method is retrieving only the column (letter) we’re interested in

%spark

// Step 3b: Pushing filtering down to the Cassandra node

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val letterCountRdd = sc.cassandraTable("spark_demo","test")
    .select("letter").where("key = ?", "1")
    .map(((x: CassandraRow) => x.getString("letter")))
    .map(letter => (letter, 1))
    .reduceByKey{case (x, y) => x+y}

println(letterCountRdd.toDebugString)

letterCountRdd.collect.foreach(println)

You can see that we’ve swapped out filter for this line:

.select("letter").where("key = ?", "1")

In the toDebugString output, it’s showing that the previous filter transformation has been completely removed, as that work has now been pushed down to the Cassandra nodes.

Spark UI

Moving back to the Spark UI, the details for this new Job reflect that the filter transformation has been dropped.  More importantly, the total time has been reduced to 0.5 seconds.  There’s not much difference in the reduceByKey durations (73 ms vs. 27 ms), so lets inspect Stage 1 again:

Spark UI Cassandra filter transformation

This time, only one executor has been initialized because the connector was aware of exactly which node the required partition resided on.  In addition, as Cassandra was only returning the “letter” column for partition “1”, the Stage Input Size was lowered to 9 bytes.

Conclusion

Hopefully, this post has provided an interesting summary of how we can debug applications using the Spark UI!  If you have any questions or feedback, please comment! or feel free to contact us directly.

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]
  2. Understanding Apache Cassandra: Complete 2025 Guide
  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Debugging Jobs in the Apache Spark™ UI appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/debugging-jobs-apache-spark-ui/feed/ 0
Maximising Availability with Apache Cassandra https://www.instaclustr.com/blog/maximising-availability-with-apache-cassandra/ https://www.instaclustr.com/blog/maximising-availability-with-apache-cassandra/#respond Mon, 06 Mar 2017 05:50:48 +0000 https://www.instaclustr.com/maximising-availability-with-apache-cassandra/ Introduction A number of recent of events have had us thinking more than usual about maximising availability with Apache Cassandra. The first of these was the recent AWS S3 outage. While this outage did not have any effect on any of our managed clusters (other than few backup jobs being skipped) it obviously did affect...

The post Maximising Availability with Apache Cassandra appeared first on Instaclustr.

]]>
Introduction

A number of recent of events have had us thinking more than usual about maximising availability with Apache Cassandra.

The first of these was the recent AWS S3 outage. While this outage did not have any effect on any of our managed clusters (other than few backup jobs being skipped) it obviously did affect many services on the Internet. While the lack of impact on our customer clusters was primarily due to the fact we don’t rely on S3 for a lot, the cascading impact of the S3 outage illustrates some of the difficulty in determining your actual availability exposure when using highly integrated services.

The second event was when we recently we got involved in providing emergency support to an organisation running their own cluster that had got into a serious cascading failure state. While this organisation had followed many of the basic recommendations for building a highly-available Cassandra cluster they still suffered a significant period of effective outage stemming from broader technical strategies associated with maintaining a healthy Cassandra cluster.

Both of these events got me thinking. What can our customers and others do to minimise the chance of issues like this occurring, and how can they minimise the impact if do they occur?

The basics

Firstly, let’s cover the basics of Cassandra HA. Anyone running a Cassandra cluster should understand these and if you don’t have most of these in place in your cluster you’re likely running a significant availability risk.

It goes without saying that clusters provisioned through the Instaclustr Managed Service are configured with these best practices by default:

  • Run at least 2 (but probably 3) nodes. OK, that’s really basic but we have seen customers ignore this. In most cases, you’ll want to operate with strong consistency and so need at least 3 nodes. This allows your Cassandra service to continue uninterrupted if you suffer a hardware failure or some other loss of the Cassandra service on a single node (this will happen sooner or later).
  • Make sure your replication factor and replication strategy is correct. There is not a lot of point in having three nodes if you have your replication factor set to one (again, obvious but again we have seen it). Replication factor of 3 is the standard for most use cases. Also, make sure you’re using NetworkTopologyStrategy as your replication strategy – it’s necessary in order to take advantage of other features such as racks and data centres. Even if you don’t think you need NetworkTopologyStrategy now, there are many reasons may you want to use it in the future and no real disadvantage to enabling it from the start.
  • Use Cassandra Racks aligned to actual failure zones: Rack configuration in Cassandra is designed to ensure that the replicas of your data are stored in order to minimise the chance of losing access to more than one replica in a single incident. In AWS, map your racks to availability zones. Other cloud providers have similar concepts. In your own data center, actual racks are probably a good choice.
  • Use a fault-tolerant consistency level: All the replicas in the world won’t help your availability at a consistency level of ALL. Most people use QUORUM or LOCAL_QUORUM which does provide for operations to continue with strong consistency despite a failed node. The main mistake we see here (at the small end) is not realising the QUORUM with replication factor two still requires both nodes so you have no protection against failure of a single node.

Architecture strategies

All of the points above are really the basic hygiene factors that should apply to any production Cassandra cluster unless you have carefully understood the risk and are prepared to accept it. These basic factors take care of single instance hardware failure, processes crashing on a single machine and other similar basic outage scenarios.

Following are further steps you can take to maximise availability at the architecture level and provide protection against more significant physical infrastructure level failure:

  • Multi-datacentre: One possible scenario many people look to guard against is failure of a complete cloud provider region or, if on-prem, a physical data centre. Cassandra has you covered here with it’s native multi-datacenter support allowing a hot standby of your cluster to be maintained remotely with native Apache Cassandra features.
  • Multi-provider: Taking the multi-datacenter scenario a step further, it is possible to run a Cassandra cluster spanning not just cloud provider regions but also multiple cloud providers (or even on-prem and cloud). Our Instaclustr Apache Cassandra managed service provides support for both multi-region and multi-provider clusters.
  • Replication Factor 5: Running replication factor 3 protects you against loss of a single replica (including, with 3 racks, a full rack). However, this still leaves you in a high risk situation any time you have one replica down for maintenance. Some Cassandra maintenance operations, particularly streaming data to populate a replaced node, can take a long time (hours or even days) thus making this risk significant in some scenarios. Running a replication factor of 5 (and operating at QUORUM consistency level) means you still have protection from single-replica hardware failure, even during these maintenance operations.

Infrastructure strategies

There are other scenarios that can impact availability of a cluster. The first relatively common cause is overload of the cluster. This can be caused by a sudden increase in client load (particularly when exacerbated by poor retry policies – see application strategies section below) or other events in the cluster such as a dramatic increase in tombstones or a rogue repair operation.

There are three main infrastructure-level approaches to mitigate the risk of these types of issues:

  • Keep spare capacity: If your cluster has spare capacity it will be more likely to be able to service an unexpected increase in load without getting into an unstable state. One example of this is that we recommend that disk usage is kept to < 70% under normal load. Similarly, 60-70% CPU utilisation is probably a reasonable metric for safe ongoing CPU levels. It’s possible to push harder if you understand your cluster but it is increasing the risk of unexpected events causing instability or even catastrophic failure.
  • Prefer smaller nodes: Smaller nodes have two significant availability advantages. Firstly, maintenance operations such as replacing nodes or upgrading sstables, will complete more quickly on smaller nodes reducing the risk of an outage during such operations. Secondly, the impact of losing a single node on the overall processing capacity of the cluster is less when there are more, smaller nodes. To give a specific example, a cluster of 6 x m4.xl AWS instances will have similar (and possibly more) processing capacity than 3 x m4.2xl for the same price. However, losing a single node removes ⅙ of the capacity of the m4.xl cluster but ⅓ of the m4.3xl cluster. There is a balance here where nodes get too small and the base o/s and other overhead becomes too significant. We like the m4.xls and r4.xls as a good size production node for small to medium clusters. Running more, smaller nodes increases the need for an automated provisioning and management system such as we use in the Instaclustr Managed Service.
  • Have sufficient monitoring (and pay attention!): Most times when we see a cluster in real trouble, there have been plenty of warning signs beforehand – increasing pending compactions, tombstone warnings, large partition warnings, latency spikes, dropped mutations. Paying attention to these issues and correcting them when they first appear as inconveniences can prevent them progressing to major failures.

Application-level strategies

The strategies above (with the possible exception of consistency levels) are all things that can be applied at the Cassandra architecture level relatively transparent to application logic. There are some further steps that can be applied at the application level to mitigate the risk and impacts of Cassandra issues:

  • Driver configuration: The Cassandra drivers contain several features that help to mitigate against cluster outages. For example, routing queries away from slow nodes and falling back to lower consistency levels when required. Understanding these and ensuring the driver is optimally configured for your requirements can help deal with many issues.
  • Retry strategies: Above the level of the driver, many applications implement a strategy to retry failed queries. Poor retry policies without a back-off strategy can be a significant problem. Firstly, queries that timeout due to issues such as a build up of tombstones will be retried putting an additional load on the cluster. Secondly, if the load gets to the point where many queries are failing, application retries will multiple the load on the cluster, until it inevitably reach a point of catastrophic failure.
  • Use multiple clusters: Even if you’ve applied all the measures above, there is still a chance (however small) that something will cause your cluster to fail. Particular risks include application bugs or human error by someone with administrator-level privileges (of course, there is plenty you can do to reduce these risk but that’s outside the scope of this article). A key mitigation against any of these remaining risks is to use multiple clusters. Consider, for example, a global application where presence in 6 regions is desired. You could build a single cluster with six data centers. However, architecture still means that a single action could affect your entire global cluster. Building 3 clusters of two data centers each would not entirely protect from this kind of  disaster but does mean that a global Cassandra outage is effectively impossible (possibly at the cost of increased latency for users who move around the globe). Similarly, splitting your cluster by application function may be possible – for example one cluster for raw data ingestion and one for store and retrieving analytics results from that raw data. This pattern should mean that part of your application will continue to function (at least for a period) even if one cluster fails and complete application outage as a result of Cassandra failure is next to impossible.

Conclusion

Many leading Internet services have shown that, by deploying strategies like these, it is possible to achieve extremely high levels of availability with Cassandra. At Instaclustr, we see a big part of our job as making these levels of availability easy and accessible so you can focus on building your application rather than spending time and effort at the infrastructure and operations layers. Visit our Managed Solutions and Cassandra Consulting page to learn more.

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]
  2. Understanding Apache Cassandra: Complete 2025 Guide
  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Maximising Availability with Apache Cassandra appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/maximising-availability-with-apache-cassandra/feed/ 0
Multi data center Apache Spark™/Cassandra Benchmark, Round 2 https://www.instaclustr.com/blog/multi-data-center-sparkcassandra-benchmark-round-2/ https://www.instaclustr.com/blog/multi-data-center-sparkcassandra-benchmark-round-2/#respond Thu, 21 Apr 2016 13:53:50 +0000 https://www.instaclustr.com/multi-data-center-sparkcassandra-benchmark-round-2/ Context In my previous multi Data Center blog post ‘Multi data center Apache Spark and Apache Cassandra benchmark’, I compared the performance and stability of running Spark and Cassandra collocated on the same Data Center (DC), versus running a second DC that is aimed towards analytic jobs with Spark. This gives some breathing space to...

The post Multi data center Apache Spark™/Cassandra Benchmark, Round 2 appeared first on Instaclustr.

]]>
Context

In my previous multi Data Center blog post ‘Multi data center Apache Spark and Apache Cassandra benchmark’, I compared the performance and stability of running Spark and Cassandra collocated on the same Data Center (DC), versus running a second DC that is aimed towards analytic jobs with Spark. This gives some breathing space to the main DC running exclusively Cassandra for more critical/time sensitive operations. This is achieved by using the NetworkTopologyStrategy, and performing critical read/writes on the primary DC with a LOCAL consistency level (LOCAL_QUORUM or LOCAL_ONE). The benchmark demonstrates that running an intensive Spark job on a dedicated DC would marginally impact the read/write performances of the main DC, while completing the job in half the time it would be on a single DC.

A few readers made the excellent point that the comparison was done between two configuration of different price, and ultimately, one would want to know what are the advantages and drawbacks of running 1 single DC vs 2 DC of the same price. So now I will answer those questions in this post.

Goal

The aim of this benchmark is to compare the performance between two cluster topology running both Spark and Cassandra. The first configuration is a single Data Center (Single DC) with 6 nodes, where each node runs both the Cassandra and Spark service. The second configuration has two Data Centers, one with 3 nodes running exclusively on Cassandra, and one with 3 nodes running Spark and Cassandra collocated.

 Benchmark Setup

  • All the nodes were m4xl-400, which corresponds to AWS m4.xlarge with 400GB of EBS storage. That’s 4 CPU / 15GB of RAM / 400GB of AWS EBS storage per node.
  • Both the Spark test job, and Cassandra stress command, were run from a separate stress box (one for each cluster) that were connected to it’s cluster via VPC peering, in the same AWS region of the cluster.
  • As done in my previous benchmark, all the Cassandra stress and Spark benchmark jobs were executed at the same time against the two clusters. This is important is it ensures that the background Cassandra operations (such as compaction) both have the same amount of time to complete, and this is even more important when using AWS EBS volumes in order to give the same amount of time for the cluster to recover its EBS credits, when some of the EBS burst capacity has been used.
  • All Cassandra-stress command and Spark jobs were run after the cluster had time to catch up on compaction and recover its EBS credit, if it had been used previously.
  • Finally, operations that should be running in production, such as backups, or read repairs, have been disabled, as they might otherwise run at different time on the two clusters, impacting differently on the performance. It is important to understand that in production, those operations cannot be disregarded, and would consume some of the cluster resource.

Benchmark

Initial loading: write performance

For each cluster, two set of data (“data1” and “data2”) were created using the cassandra-stress tool. “data1” will be used for read operation and by the Spark job, while “data2” will be used during the read/write test, and therefore grow in size. The cassandra-stress command was:

cassandra-stress write n=400000000 no-warmup cl=LOCAL_QUORUM -rate threads=400 -schema replication\(strategy=NetworkTopologyStrategy,spark_DC=3,AWS_VPC_US_EAST_1=3\) keyspace=”data1” -node $IP

It is worthwhile recalling that we explicitly set a very high number of write, so that the data set cannot fit in the memory. Indeed, when inserting a small number of rows, the resulting Cassandra stable file will likely be cached in memory by the OS, resulting in performance much better than usually seen in production.

The write performance that was reported by cassandra-stress were:

Screen Shot 2016-08-03 at 1.16.16 PM

Unsurprisingly, we observed the linear scaling in write performance that Cassandra architecture has been designed for: The Single DC has twice the amount of nodes compared to the Cassandra-only data center of the multi-DC. With a replication factor set to 3 in both cases, the Single DC has a write throughput twice faster than the multiDC. It is important to understand that in the case of the multi-DC, each insert is effectively written to all the 6 nodes, while in the case of the single-DC, the data ends up in only 3 nodes. In other words, if the data was inserted in the Single DC with a replication factor of 6, we would observe exactly the same write throughput on both clusters.

Maximum read performance

The read performance was assessed against “data1” for both clusters, using the following cassandra-stress command:

cassandra-stress read n=400000000 no-warmup cl=LOCAL_QUORUM -rate threads=500 -schema keyspace="data1" -node $IP

Screen Shot 2016-08-03 at 1.16.33 PM

As what was observed with the write test, the number of read per seconds that Cassandra can deliver scales with the number of nodes in the data center being used for the read.

Maximum read/write performance

The read/write cassandra-stress command allows to test the cluster’s performance by mixing reads and writes, as would be done in a production environment. The maximum read/write performance was assessed with:

cassandra-stress mixed ratio\(write=1,read=3\) n=400000000 no-warmup cl=LOCAL_QUORUM -rate threads=500 -schema replication\(strategy=NetworkTopologyStrategy,spark_DC=3,AWS_VPC_US_EAST_1=3\) keyspace=”data2″ -node $IP

Screen Shot 2016-08-03 at 1.16.54 PM
Note that the performance above has to be considered as a maximum peak performance. The number of sustained operations per second in production should be much lower, especially to allow backgrounds maintenance activities such as node repair, backup, and large compaction to run.

Spark benchmark with increasing background Cassandra activity.

In order to assess the Spark performances in single and multi DC settings, we used a simple Spark application that uses a combination of map and reduceByKey that are commonly employed in real life Spark applications, querying data stored in the “data1” column family. The Scala code used for this job is shown below for reference.

val rdd1 = sc.cassandraTable("data1", "standard1")
.map(r =&gt; (r.getString("key"), r.getString("C1")))
.map{ case(k, v) =&gt; (v.substring(5, 8), k.length())}
val rdd2 = rdd1.reduceByKey(_+_)
rdd2.collect().foreach(println)

The job is using the key and the “C1” column of the stress table “standard1”. The key is shortened to the 5 to 8 characters with a map call, allowing a reduce by key with 16^3=4096 possibilities. The reduceByKey is a simple yet commonly used Spark RDD transformation that allows to group keys together while applying a reduce operation (in this case a simple addition). This is somewhat a similar example as the classic word count example. Notably, this job will read the full data set.

We ran this job on each cluster, with an increasing level of background Cassandra read/write operations, ranging from no activity at all (Spark job running alone), to 300, 4000, 5000 and 6000 read/write per seconds (approaching the limits of the single DC read/write operation per seconds found in the previous section).

Figure 1. Performance is measured in seconds: on the Y-axis, lowest values correspond to better performances. The X-axis indicates the amount of cassandra-stress activity active during the test. A value of 0 indicates no Cassandra read/write at all.

Screen Shot 2016-08-03 at 1.18.02 PM

Figure 2. 95th percentile latency at various levels of cassandra-stress limit (x axis). Lower values of latency (y axis) are better.

Screen Shot 2016-08-03 at 1.17.52 PM

The first thing to note from those graphs is that with no cassandra-stress activity (0 op/s), the single-DC cluster runs the Spark job twice faster than the multiDC cluster. This is expected as there are 6 Cassandra nodes (single-DC) serving read requests from Spark vs only the 3 local Cassandra nodes (multi-DC).

The second point to note is that unlike the single-DC cluster, the multiDC cluster capacity to process the Spark job is much less affected by the increase of Cassandra stress activity. The Spark processing time was 3890 s with no read/write, up to 4378 s with a 6000 read/write op/s, while the processing time for the multi DC ranged from 2018 s to 5078 s. Figure 2 also demonstrates that the Cassandra operation latency isn’t affected much during the Spark activity in the multiDC settings, at least until the number of operation per seconds approach the limit of the multiDC capacity as determined earlier. On the other hand, the Cassandra latency in the singleDC gets immediately and strongly affected by the Spark job, even at a low number of cassandra-stress operation per seconds.

Finally, there is a tipping point (at about 3800 op/s) at which this Spark job starts to get processed faster in the multiDC setting than the singleDC setting. The Cassandra nodes of the singleDC cluster have difficulty serving both the read/write request of cassandra-stress, and the read requests from Spark. On the other hand, the multiDC setting ensure that in the analytical DC, Spark has fully access to Cassandra resources, without being too affected by the background cassandra-stress activity running against the Cassandra-only data center, and being streamed in the background to the analytical DC.

The next table presents the latency of the Cassandra operation observed on the two cluster, while running cassandra-stress at various level of operation / second, and during the execution of the Spark job.

Screen Shot 2016-08-03 at 1.17.22 PM

This table highlight the problems when running Spark and Cassandra on the same cluster: The latency increase as soon as Spark is performing read request on the cluster. While the performance is similar when Spark is not running (see table below), even starting at 3000 op/s we see a dramatic increase in latency in the cassandra-stress results of the Single DC. The multiDC latency, on the other hand, gets affected only when the number of operation per seconds of the cassandra-stress command approach the cluster read/write limit determined earlier.

Screen Shot 2016-08-03 at 1.17.32 PM

Conclusion

This Blog was presented as a comparison of two Cluster topology of the same price. One with a single Data Center with Cassandra and Spark running on each node, and a multi Data Center cluster with one Cassandra only DC, and one analytical DC running Cassandra + Spark. The single DC cluster perform much better when running Cassandra activities only, and that’s expected as it has 6 nodes to serves writes, against 3 nodes. It also performs much better when running Spark (without background Cassandra work), and that’s normal as it has 6 Cassandra nodes to serves read requests from Spark, against 3 nodes for the single DC.

What is interesting to see is that when working with a multiDC setting, there is a separation of concerns between Spark and Cassandra activity, which ensures that the Spark jobs are marginally affected by Cassandra activities running against the Cassandra only DC, and conversely the Cassandra performances doesn’t drop on the Cassandra only DC when running Spark jobs. Even more importantly for latency sensitive application, we can see that in the singleDC cluster, the Cassandra latency of read/write requests suffers immediately when a Spark application is reading data, while this become the case in the multiDC cluster only when reaching the read/write capacity of the cluster. This drop of latency could be a major issue for example for web application that continuously queries data to display to the end user.

So, which topology to choose? It depends of your usage of the cluster. Collocating Cassandra and Spark is attractive if for example you want to have a maximum Cassandra capacity when Spark is not running, and if you can accept performance degradation from time to time, as could be the case if your application is mostly transaction oriented in, say the day, or the working week, and the usage of your cluster becomes analytic oriented in the night or the weekend. In other cases, it is recommended to use separate data centers to ensure smooth, undisrupted, uninterrupted Cassandra operations, being from a latency, or throughput perspectives. This is even more the case when experimenting and developing a new Spark application where impacts can be unpredictable (and I speak from hard-won experience here).

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]
  2. Understanding Apache Cassandra: Complete 2025 Guide
  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Multi data center Apache Spark™/Cassandra Benchmark, Round 2 appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/multi-data-center-sparkcassandra-benchmark-round-2/feed/ 0
Apache Cassandra® connector for Apache Spark™: 5 tips for success https://www.instaclustr.com/blog/cassandra-connector-for-spark-5-tips-for-success/ https://www.instaclustr.com/blog/cassandra-connector-for-spark-5-tips-for-success/#respond Thu, 31 Mar 2016 21:11:55 +0000 https://www.instaclustr.com/cassandra-connector-for-spark-5-tips-for-success/ Learn a few key lessons about how to get the best out of the Cassandra connector for Spark from the Apache Experts: 5 Tips for Success

The post Apache Cassandra® connector for Apache Spark™: 5 tips for success appeared first on Instaclustr.

]]>
Overview

We have just hit the full release of the Instaclustr managed Cassandra + Spark service offering after 4 months in preview release. During this time, we have had lots of opportunities to get in-depth with using the Cassandra connector for Spark, both with our own Instametrics application and assisting customers with developing and troubleshooting. In that process, we’ve learned a few key lessons about how to get the best out of the Cassandra connector for Spark. This post captures some of those key lessons.

Tip 1: Filter early

A fairly obvious but important tip to start with – the more you can push filters down to Cassandra, and particularly where you can limit queries by partition key, the better. Include your filters as early as possible in your Spark statements and, if possible, specify all the components of the partition key in the filter statement. Also, if you are going to use a set of data from Cassandra more than once, make sure to use cache() to keep it in Spark memory rather than reading from Cassandra each time.

Tip 2: Be aware of your partitions

Once you start working with Spark and Cassandra together, there are two sets of partitions you need to be very aware of:

  • Cassandra partitions – These are the unit at which Cassandra splits data across nodes and determine which Cassandra node your data is stored on. The Cassandra partition key is set when you define the schema for your Cassandra table. Each partition is contained on a single node (per replica). The number of partitions is determined by the cardinality of your partition key.
  • Spark partitions – These are the unit at which Spark splits data (in memory) across workers. Spark partitions also determine the degree of parallelism that Spark can apply in processing data (each partition can be processed in parallel). The number of partitions and partition keys can either be determined automatically by Spark or set explicitly (more below).

A fundamental principle in getting good performance with Spark is to align the distribution of your Spark partitions with your Cassandra partitions and to have an appropriate number of Spark partitions to allow Spark to efficiently parallel process calculations.

In almost all cases, there will be a lot fewer Spark partitions than Cassandra partitions (i.e. a one to many mapping from Spark partitions to Cassandra partitions) but, as far as possible, you want Cassandra data to be read and written to/from the Cassandra node where the Spark partition resides. You also want enough Spark partitions that each partition will fit in available work memory and so that each processing step for a partition is not excessively long running but not so many that each step is tiny, resulting in excessive overhead. The right number is highly dependant on the exact processing scenario but we’ve found around 30 partitions per worker to be a good ballpark for some of our scenarios.

The good news is that in many cases the Cassandra connector will take care of this for you automatically. When you use the Cassandra Spark connector’s cassandraTable() function to load data from Cassandra to Spark it will automatically create Spark partitions aligned to the Cassandra partition key. It will try to create an appropriate number of partitions by estimating the size of the table and dividing this by the parameter spark.cassandra.input.split.size_in_mb (64mb by default). (One instance where the default will need changing is if you have a small source table – in that case, use withReadConf() to override the parameter.)

Where you need to be extra careful is when you are joining with a Cassandra table using a different partition key or doing multi-step processing. In this scenario you can start off with an appropriately sized set of partitions but then greatly change the size of your data, resulting in an inappropriate number of partitions.

The way to address this problem is to use repartitionByCassandraReplica() to resize and/or redistribute the data in the Spark partition. This will redistribute Spark’s in-memory copy of the data to match the distribution of a specified Cassandra table and with a specified number of Spark partitions per worker (it requires that the Spark RDD has columns that match the specified Cassandra table).

Tip 3: Control Spark write speed for any bulk writes

By default, Spark will write to Cassandra as fast as it possibly can. For small bursts of writes, this can work OK but for extended periods (say more than 5 minutes or so and getting more significant after a couple of hours) this can result in the Cassandra nodes being overwhelmed and crashing.

The Spark Cassandra Connector provides three settings to control this behavior:

  • cassandra.output.batch.size.bytes (default: 1024): total data per batch
  • cassandra.output.batch.size.rows (default: auto – batch size determined by size.byts): number of rows per batch
  • cassandra.output.concurrent.writes (default: 5): maximum concurrent writes per Spark task
  • cassandra.output.throughput_mb_per_sec (default: unlimited): maximum write throughput per core

Ideal tuning of these parameters will depend on your schema design and other factors. However, as a general guide, we suggest starting with throughput_mb_per_sec set to 5 (mb per sec per core) on our m4.xl size nodes.

Tip 4: joinWithCassandraTable is a powerful tool

joinWithCassandraTable() is one of the most powerful functions provided by the Spark Cassandra Connector. It allows you to perform a joint between a Spark RDD (including a Cassandra table) and a Cassandra table based on the key of the Cassandra table. The Connector can execute this efficiently, pushing down the selection to the Cassandra nodes.

As well as using this function to join data sets, it can also be used as an efficient way to filter a significant set of records from a Cassandra table. To do this, build an RDD with the keys of the records that you want to extract and then use joinWithCassandraTable() to join the RDD to the Cassandra table and retrieve the data from Cassandra. We have found this to be a very efficient way of selecting a set of data from Cassandra.

An example of how we have used this is to select a set of metrics readings for a list of hosts, metrics, and time buckets). This is an example where we have found it necessary to explicitly control Spark partition creation – the definition of the partitions to be selected is a much smaller data set than the resulting extracted data. As a result, we use repartitionByCassandraReplica() to ensure the selection list is correctly partitioned to match the location of the source Cassandra data and to create an appropriate number of Spark partitions to receive the data that is to be selected from the source tables.

Tip 5: Use SparkSQL appropriately

SparkSQL is an excellent tool for ad-hoc analysis, giving you the expressiveness of SQL over big data stores such as Cassandra. However, if you need to optimise for performance of a query (such as where you are running as a regular batch or as part of a Spark Streaming job) then SparkSQL may not provide the level of control that you need for the optimum results. In these cases, getting familiar with the Spark and Cassandra connector APIs is likely to pay dividends in the efficiency of execution.

Bonus tip – read the docs and use the UI!

A lot of information is contained in Spark Cassandra Connector documentation located on the Cassandra Connector GitHub page (https://github.com/datastax/spark-cassandra-connector). In particular, the FAQ page at the end of the docs contains a lot of useful information that is easy to overlook.

Also, the Spark UI has a lot of information that can help with the tuning described above. Stay tuned for a future blog post touring the key features of the UI.

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]
  2. Understanding Apache Cassandra: Complete 2025 Guide
  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Apache Cassandra® connector for Apache Spark™: 5 tips for success appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/cassandra-connector-for-spark-5-tips-for-success/feed/ 0
Apache Cassandra® Compaction Strategies https://www.instaclustr.com/blog/apache-cassandra-compaction/ https://www.instaclustr.com/blog/apache-cassandra-compaction/#respond Wed, 27 Jan 2016 08:37:02 +0000 https://www.instaclustr.com/apache-cassandra-compaction/ Apache Cassandra Compaction Strategies - Understand the importance of Compaction strategies, various strategies and and what you should do.

The post Apache Cassandra® Compaction Strategies appeared first on Instaclustr.

]]>
Cassandra is optimized for writing large amounts of data very quickly. Cassandra writes all incoming data in an append-only manner in internal files called SSTables. These SSTables hence contain newly inserted data and updates/deletes of previously inserted data. 

What is Compaction?

Cassandra Compaction is a process of reconciling various copies of data spread across distinct SSTables. Cassandra performs compaction of SSTables as a background activity. Cassandra has to maintain fewer SSTables and fewer copies of each data row due to compactions improving its read performance. Compaction is a crucial area of Cassandra performance and maintenance. 

There are certain methods used to handle these compaction operations and the timing of when these operations are performed. This blog post describes various compaction strategies along with other essential details.

While regular compactions are an integral part of any healthy Cassandra cluster – the way that they are configured can vary significantly depending on the way a particular table is being used. It is important to take some time when designing a Cassandra table schema to think about how each table will be used, and therefore the Cassandra Compaction strategy would be most effective.

Although it is possible to change compaction strategies after the table has been created, doing so will have a significant impact on the cluster’s performance that is proportionate to the amount of data in the table. This is because Cassandra will re-write all of the data in that table using the new Compaction Strategy.

Cassandra’s Write Path

To understand the importance of compactions in Cassandra, you must first understand how Cassandra writes data to a disk. The Cassandra write path in a nutshell:

  1. Cassandra stores recent writes in memory (in a structure called the Memtable).
  2. When enough writes have been made, Cassandra flushes the Memtable to disk. Data on disk is stored in relatively simple data structures called Sorted String Tables (SSTable). At the most simplified level, an SSTable could be described as a sorted array of strings.
  3. Before writing a new SSTable, Cassandra merges and pre-sorts the data in the Memtable according to Primary Key. In Cassandra, a Primary Key consists of a Partition Key (the unique key that determines which node the data is stored on) and any Clustering Keys that have been defined.
  4. The SSTable is written to disk as a single contiguous write operation. SStables are immutable. Once they are written to disk they are not modified. Any updates to data or deletion of data within an SSTable is written to a new SSTable. If data is updated regularly, Cassandra may need to read from multiple SSTables to retrieve a single row.
  5. Compaction operations occur periodically to re-write and combine SSTables. This is required because SSTables are immutable (no modifications once written to disk). Compactions prune deleted data and merge disparate row data into new SSTables in order to reclaim disk space and keep read operations optimized.

If you are unfamiliar with Cassandra’s write path, please read The write path to compaction from Cassandra Wiki.

Cassandra Compaction Strategies

Multiple Compaction Strategies are included with Cassandra, and each is optimized for a different use case:

Type of Compaction Strategy Description When?
SizeTiered Compaction Strategy (STCS) This is the default compaction strategy. This compaction strategy triggers a compaction when multiple SSTables of a similar size are present. Additional of parameters allow STCS to be tuned to increase or decrease the number of compactions it performs and how tombstones are handled. This compaction strategy is good for insert-heavy and general workloads.
Leveled Compaction Strategy (LCS) This strategy groups SSTables into levels, each of which has a fixed size limit which is 10 times larger than the previous level. SSTables are of a fixed, relatively small size (160MB by default) – so if Level 1 might contain 10 SSTables at most, then Level 2 will contain 100 SSTables at most. SSTables are guaranteed to be non-overlapping within each level – if any data overlaps when a table is promoted to the next level, overlapping tables are re-compacted.

For example: when Level 1 is filled, any new SSTables being added to that level are compacted together with any existing tables that contain overlapping data. If these compactions result in Level 1 now containing too many tables, the additional table(s) overflow to Level 2.

This compaction strategy is the best for read-heavy workloads (because tables within a level are non-overlapping, LCS guarantees that 90% of all reads can be satisfied from a single SSTable) or workloads where there are more updates than there are inserts.
DateTiered Compaction Strategy (DTCS) This compaction strategy is designed for use with time-series data. DTCS stores data written within a the same time period in the same SSTable. Multiple SSTables that are themselves written in the same time window will be compacted together, up until a certain point, after which the SSTables are no longer compacted. SSTables are also configured with a TTL. SSTables that are older than the TTL will be dropped, incurring zero compaction overhead. DTCS is deprecated in Cassandra 3.0.8/3.8 and later. The TWCS is improved version of DTCS, it is available with version 3.0.8/3.8 and later.

DTCS is highly performant and efficient, but only if the workload matches the strict requirements of DTCS. DTCS is not designed to be used with workloads where there are updates to old data or inserts that are out of order. If your workload does not fit these requirements, you may be better off using STCS and using a bucketing key (such as hour/day/week) to break up your data.

Time Window Compaction Strategy The Time Window Compaction Strategy is designed to work on time series data. It compactes SSTables within a configured time window. TWCS utilizes STCS to perform these compactions. At the end of each time window, all SSTables are compacted to a single SSTable so there is one SSTable for a time window. TWCS also effectively purges data when configured with time to live by dropping complete SSTables after TTL expiry.

TWCS is similar to DTCS except for operational improvements. Write amplifications is prevented by only compacting SSTables within a time window. The maximum SSTable timestamp is used to decide which time window it belongs to.

TWCS is ideal for time series data which is immutable after a fixed time interval. Updating the data after the specified time window results in multiple SSTables referring to the same data. Those SSTables will not get compacted together.

 

TWCS can be used with Cassandra 2.x by  adding jar file.

Configuring a Cassandra Compaction Strategy

Compaction options are configured at the table level via CQLSH. This allows each table to be optimized based on how it will be used. If a compaction strategy is not specified, SizeTieredCompactionStrategy will be used.

ALTER TABLE mytable
  WITH compaction =
  { 'class' : 'SizeTieredCompactionStrategy' }

Take a look at the compaction sub properties documentation for more information on the options that are available to tweak for each compaction strategy. The default options are generally sufficient for most workloads. We suggest leaving the default options initially and modifying them as required if a table is not meeting performance expectations.

Related Guides:

  1. Apache Kafka: Architecture, deployment and ecosystem [2025 guide]
  2. Understanding Apache Cassandra: Complete 2025 Guide
  3. Complete guide to PostgreSQL: Features, use cases, and tutorial

Related Products:

  1. NetApp Instaclustr Data Platform

The post Apache Cassandra® Compaction Strategies appeared first on Instaclustr.

]]>
https://www.instaclustr.com/blog/apache-cassandra-compaction/feed/ 0