ACID is good: Complexity in Cassandra consistency

If you are used to relational databases and are new to NoSQL, you will soon start to miss ACID support as your friend in development. I'll talk about some very subtle but important details about Cassandra that I wish I had known better when I started my relationship with Cassandra.

Cassandra data modeling is very query oriented. Each query, even over exactly the same data, often requires a new table with the same data but a different partition key.

For example, suppose a user has several different accounts, and each account has time series payment data for different purposes. Now I need the following queries:

  1. Get the first 100 payment records for this user.
  2. Get the first 100 payment records for one of this user's accounts.
  3. Get the first 100 payment records for a specific purpose.
  4. Get the first 100 payment records for a specific purpose under one of the user's accounts.

In MySQL you could serve all of these with an index and a WHERE condition. Just for the record, Cassandra also provides secondary indexes, but they have enough problems that most people avoid them. What people end up doing instead is creating one table per query, each holding the same data. In this case, it would look like the following (a rough CQL sketch follows the list):

  1. Table one: Partition by user
  2. Table two: Partition by account
  3. Table three: Partition by payment purpose
  4. Table four: Partition by a combined key of account and payment purpose
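
Here is a minimal CQL sketch of two of these tables; the table and column names are hypothetical, made up for illustration. The other two tables follow the same pattern, with account or purpose alone as the partition key.

  -- Query 1: same payment data, partitioned by user, newest first
  CREATE TABLE payments_by_user (
      user_id      text,
      payment_time timestamp,
      account_id   text,
      purpose      text,
      amount       decimal,
      PRIMARY KEY ((user_id), payment_time)
  ) WITH CLUSTERING ORDER BY (payment_time DESC);

  -- Query 4: same payment data, partitioned by account and purpose
  CREATE TABLE payments_by_account_purpose (
      account_id   text,
      purpose      text,
      payment_time timestamp,
      user_id      text,
      amount       decimal,
      PRIMARY KEY ((account_id, purpose), payment_time)
  ) WITH CLUSTERING ORDER BY (payment_time DESC);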

Since storage is advertised as cheap, and Cassandra, like many other big data technologies, runs on cheap commodity machines, it seems reasonable to just create these 4 tables, increase the number of Cassandra nodes, and query whichever table you need.

However, the story doesn't end here. You now have 4 tables to populate every time you insert one record. You would probably, as you always do in a relational database, wrap all four inserts in a transaction and push them to the database, so you can sleep well without worrying about data consistency across tables. But it's not so easy.

The first thing you would look up is Cassandra's transaction. It really is not the transaction we are talking about. Cassandra's transaction is a CAS (compare and set) operation, and it only applies at the individual statement level. Things like:

  INSERT INTO xxx (x) VALUES ('x') IF NOT EXISTS;

  UPDATE xxx SET a = 'a' WHERE x = 'x' IF a = 'b';

It does not support batching several statements into one transaction and rolling back in case of partial failure. What Cassandra's transaction really provides is linearizable consistency, or, in ACID terms, isolation. A good example: when two users try to claim the same account id, only one will succeed, whether with a Cassandra transaction or with MySQL isolation. Since Cassandra is a distributed system, it's hard to just take a lock as in MySQL; internally, Cassandra uses the Paxos algorithm to achieve this. Here is a good article explaining this.
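
As a sketch, with a hypothetical accounts table, both users would issue the same conditional insert, and only the first one comes back as applied:

  -- First writer wins; the second insert returns [applied] = false
  INSERT INTO accounts (account_id, owner) VALUES ('acct-42', 'alice') IF NOT EXISTS;
  INSERT INTO accounts (account_id, owner) VALUES ('acct-42', 'bob') IF NOT EXISTS;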

OK, transactions don't work here, so what's next?

BatchStatement comes to the rescue. A batch in Cassandra allows you to group statements and send them together in one request. If you use a logged batch, it guarantees that either all statements eventually succeed or none do, BUT it only guarantees eventual consistency and does NOT support isolation. Let me explain:
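
In CQL, a logged batch over the four payment tables would look roughly like this, using the hypothetical tables sketched earlier (BEGIN BATCH is a logged batch by default):

  BEGIN BATCH
    INSERT INTO payments_by_user (user_id, payment_time, account_id, purpose, amount)
      VALUES ('user-1', '2016-03-01 10:00:00', 'acct-42', 'rent', 500);
    INSERT INTO payments_by_account (account_id, payment_time, user_id, purpose, amount)
      VALUES ('acct-42', '2016-03-01 10:00:00', 'user-1', 'rent', 500);
    INSERT INTO payments_by_purpose (purpose, payment_time, user_id, account_id, amount)
      VALUES ('rent', '2016-03-01 10:00:00', 'user-1', 'acct-42', 500);
    INSERT INTO payments_by_account_purpose (account_id, purpose, payment_time, user_id, amount)
      VALUES ('acct-42', 'rent', '2016-03-01 10:00:00', 'user-1', 500);
  APPLY BATCH;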

Suppose you are sending the payment data into the 4 tables with 4 statements in a logged batch. First, the Cassandra node that receives the request writes a batch log to itself and to two other nodes, so that if this node dies, other nodes can continue its task. Then the node starts the actual work, applying the four statements in no particular order unless you force an order under certain conditions. Once the work has started, if one of the statements cannot be completed for whatever reason, or takes too long to process, users can already see the results of the other statements; there is no isolation. The worst case looks like this:

Since one statement has not succeeded yet, Cassandra retains the batch log in the system, throws a WriteTimeoutException at you, and EVENTUALLY completes that statement. Nobody knows when it will finally be written, and until it is, you have a partial success. Only when it finishes does Cassandra remove the batch log. This might be undesirable for your use case, so think about it.

Now this is still not the end of the story. Another batch type, the counter batch, which is the only kind of batch allowed to contain counter update statements (and may contain nothing else), does not even guarantee that all statements will eventually succeed. I wasn't able to find documentation about this, but if you look into the Cassandra source code and follow the execute method of the BatchStatement class, you will find that at some point the batch decides to call either mutateAtomically or mutate based on the batch type, and for counters it is not atomic.
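
For reference, a counter batch looks like this in CQL, again with hypothetical tables whose non-key columns are counters; it groups the updates but gives no atomicity guarantee:

  BEGIN COUNTER BATCH
    UPDATE payment_counts_by_user    SET num_payments = num_payments + 1 WHERE user_id = 'user-1';
    UPDATE payment_counts_by_account SET num_payments = num_payments + 1 WHERE account_id = 'acct-42';
  APPLY BATCH;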

Counters are an interesting topic in Cassandra. They had a lot of accuracy issues before version 2.1; the problems are claimed to be fixed in 2.1 with a new implementation that sacrifices some performance for accuracy. However, when combining counters with batches, you might still see surprisingly bad performance and possibly data loss in pre-3.0 versions.

Back to the topic. If you are okay with using batch statements for transaction-like behavior without isolation, be prepared for the next surprise: a batch statement, even though it requires just one network call from the client to Cassandra, can actually perform worse than sending the statements one by one. The reasoning makes perfect sense:

When a node receives a batch request containing several different statements, and each statement belongs to a different partition (in our case, a different table), those partitions, depending on the partitioner you use, may live on different nodes. Also, since Cassandra keeps replicas of the data for fault tolerance, each statement must be sent to more than one node depending on your consistency level. All of this network traffic has to be managed by the coordinator node, so performance degrades as you increase the number of distinct partitions in the batch. As a result, the recommendation is to keep the number of different partitions in a batch small, or to batch only statements for the same partition, which is not really the most common data modeling situation in Cassandra.
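
A single-partition batch, the kind that does not stress the coordinator, would look like this with the hypothetical payments_by_user table; every statement targets the same partition key:

  BEGIN BATCH
    INSERT INTO payments_by_user (user_id, payment_time, purpose, amount)
      VALUES ('user-1', '2016-03-01 10:00:00', 'rent', 500);
    INSERT INTO payments_by_user (user_id, payment_time, purpose, amount)
      VALUES ('user-1', '2016-03-01 10:05:00', 'utilities', 80);
  APPLY BATCH;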

Given so many constraints, we are cornered into this position: to get atomic behavior across multiple tables, we are better off thinking really hard about how many tables are absolutely necessary, keeping counter updates out of those batches, and accepting that there is no isolation. This might or might not work for your specific use case. And sadly, this is still not the end of the story.

People usually keep their data only for a specific period and purge it when it expires. Besides all the usual headaches of tombstones, compaction, and repair that come with Cassandra, you also need to worry about DELETE consistency across tables. Just as you want inserts to be applied atomically, you want deletes to be applied atomically too, and you run into exactly the same batch statement and isolation issues. Remember also that DELETEs and compactions can consume a lot of CPU and memory, which can affect your operational traffic, for example causing timeouts under high resource usage.
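
A purge is then the mirror image of the insert: a logged batch of deletes across the same hypothetical tables, each delete naming the full primary key:

  BEGIN BATCH
    DELETE FROM payments_by_user    WHERE user_id = 'user-1'     AND payment_time = '2016-03-01 10:00:00';
    DELETE FROM payments_by_account WHERE account_id = 'acct-42' AND payment_time = '2016-03-01 10:00:00';
  APPLY BATCH;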

Having said all that, I think the point is that you need to be aware of many details when doing data modeling in Cassandra, and you need to really understand what your application needs and what Cassandra can do. A lot of the problems I mentioned only surface during performance testing; doing your due diligence on the details can save you a lot of effort and time.

I remember a saying that goes roughly like this: the relational database used to be the one-size-fits-all solution; now it fits almost no problem unless your application only needs low-diversity, small, low-velocity data, and for every other problem you can find a specific new technology that works better. My experience tells me that, yes, you can find a better fit for your application, but make sure you understand its use cases really well, so you know it will solve your specific problem and what the effort will be. In the case of Cassandra data modeling, I am starting to realize how convenient ACID is in a relational database.

And for those of you who are interested, there is another kind of database, called NewSQL, that preserves the old SQL ACID characteristics and is memory based, which gives it fast processing times and reasonably good volume (at the terabyte level); processing is single threaded but very fast for each record. I will write a post about it. If your data set is kind of big but not that big, it's worthwhile to take a look.

Time series data system with Cassandra (2) – HBase and Cassandra

HBase and Cassandra are two mainstream open source key-value NoSQL databases that can scale out and are fault tolerant. We looked at these two because they both handle the challenge of high velocity incoming data much better than a relational database.

B+ tree vs Log Structured Merge

Relational databases use a B+ tree to store data; it's a tree structure that supports efficient insertion, lookup, and deletion. The problem it has with high velocity data is this: each node in a B+ tree represents a "block" or "page", and a computer writes or reads in units of blocks. If a piece of data is smaller than one block, it still takes up a whole block; if it's bigger than one block but smaller than two, it takes two blocks. Database systems manage data by blocks to make efficient use of disk space. If one tree node exceeds the block capacity, the node is split into two nodes, which may also affect its parent node. Now, if data comes in very fast, the splitting and re-organization of blocks happens frequently. Eventually, when the cost of re-organization outweighs the benefit it brings, MySQL can't stay healthy any more; that's when you see database response time spiking and timeout errors on your servers.

HBase and Cassandra take a different approach called Log Structured Merge. As I mentioned in the last article, data is written to an in-memory structure, called the memstore in HBase and the memtable in Cassandra. When this memory structure reaches a certain limit, or the user manually flushes memory to disk, it is written to disk as a file. As these files accumulate, both databases run a compaction process to combine smaller files into bigger ones. Each file also stores its data sorted.

Fundamentally, B+ tree re-organization requires disk seeks to find the node and modify it, while LSM requires sequential disk transfer to merge files. According to Moore's Law, CPU, RAM, and disk capacity roughly double every 18-24 months, but disk seek time improves only about 5% per year. As a result, at scale, disk seeks are less efficient than disk transfers, and LSM rides this hardware trend to outperform the B+ tree in this respect. (However, the same design gives LSM its own problems with UPDATE and DELETE operations; I'll talk about them later.)

Scale out

Besides high velocity, high volume is another problem that MySQL struggles to deal with. A relational database is designed to work on one machine; depending on your application's data retention time, once the data grows beyond one machine's capacity, you either shard MySQL or look for databases that can scale out. HBase and Cassandra take different approaches here.

HBase arranges its data strictly ordered by the byte order of the key. For example, the following is a sample sequence of keys on disk:

1, 11, 2, 3, 30

If a user wants range scan queries to behave numerically, he would have to pad the numbers in his key design, like the following:

01, 02, 03, 11, 30

You can keep inserting data into one region until it reaches a configurable maximum size, at which point it is cut in half. Each region in HBase represents a key range, and different key ranges are distributed across different nodes. ZooKeeper helps HBase coordinate among nodes and stores the metadata clients use to find the right node to write to or read from. There is one master node, and the rest are regular nodes.

Cassandra actually provides a similar option called ByteOrderedPartitioner, which orders data the way HBase does; however, it is not recommended for production because it is hard to manage and creates hot spots. I tried it once: Cassandra took a fixed-length prefix of my key and converted it into bytes, and it turned out that prefix only occupied one small area of Cassandra's supported key range, so all my data ended up in one hot spot of the cluster. I didn't dig further down this path. HBase, by contrast, does auto-sharding for me, while in Cassandra you are on your own with this task.

The recommended partitioner in Cassandra is the Murmur3 partitioner; it hashes your key evenly across the token range and helps you distribute load evenly across the cluster. Consistent hashing and virtual nodes are additional techniques used to ease the process of adding and removing nodes in the cluster.

Cassandra doesn't need ZooKeeper to coordinate; it is truly a peer-to-peer system, and a client can talk to any node in the cluster to write or read data. In HBase, the client talks to ZooKeeper first to learn which node to communicate with, caches that in memory for later queries, and updates it when a region split happens.

As a result, both HBase and Cassandra can scale out to high-volume data in their own ways. Depending on your use case, each has strengths and weaknesses for different query patterns; I'll talk about the details in the next article.

Gateway solution for IoT

More and more OEMs are starting to connect their products online, and the first issue that comes up is connectivity to the Internet. One might want to connect a rice cooker so you can remotely start cooking and have dinner ready when you get home, or add some fancy machine learning to your home appliances so they remind you to exercise more, sleep earlier, and monitor your health by observing your activity at home. As an OEM, you can partner with an IoT platform, integrate a WiFi chip into your device, probably add more processing power and memory in case you want some extra smart features (firmware update capability, security, etc.), then connect to the cloud, and there it is: a cool connected product. However, each feature comes with a cost increase, and your customer might not care about all the add-ons you provide. I don't want to spend 100 extra dollars to equip my light bulb with a state-of-the-art processor that makes RESTful HTTP requests or performs complex over-the-air firmware updates on a tailored Unix operating system running Docker. As an end customer, all I care about is that my light bulb correctly receives my commands and reports its status to me.

That's where a gateway solution comes in. A gateway offers shared computing resources to multiple nodes, which significantly lowers the complexity and cost of each individual device.

A gateway understands WiFi and handles all traffic between the nodes and the cloud or mobile. Your light bulb doesn't have to understand any TCP/IP protocol, or even Bluetooth; just plug the light bulb into a power interface provided by the gateway, and the gateway now knows whether your light is on and can control it by switching the power on that endpoint. If you have ten light bulbs, you don't have to add WiFi hardware to each of them; just plug them all into your gateway.

A gateway should be generic across many different protocols. Think about a battery-powered door sensor connected over Bluetooth Low Energy: use a gateway that supports Bluetooth and leave only the simplest computing job on the door sensor itself, which lowers power consumption and hardware requirements, simplifies integration, and removes the need for WiFi-stack expertise. Now extend this to even more protocols: Zigbee, Z-Wave, Bluetooth, Ethernet, or even an ADC to collect voltage-based sensor data.

What if your internet goes down? Well, as long as there is electricity, you still have control of your light bulb through the gateway inside your home, which is connected to your router. This brings up another interesting topic: where should the smarts live in a gateway model, the cloud or the gateway? The trade-off is what you can still provide to the customer without an internet connection. If more smarts stay in the gateway, that adds cost to every single gateway device but provides more functionality offline. For example, if the Zigbee details live in the cloud, you won't be able to use your mobile to control Zigbee-connected devices when the connection is down, and it also adds extra delay, hurting the customer experience. Another workaround is to implement the Zigbee details in the mobile apps, which adds more implementation effort on iOS and Android.

How generic can a gateway be? Standards like Zigbee are understood by most manufacturers, but each IoT platform has its own representation of a device. Interoperability at the network level doesn't really address the application-level problem, but there are various things we can do about it. Each device has an online shadow representation of itself; at Ayla we call it a "template". It's like a blueprint of a device, specifying what functionality the device has. Different standards have similar definitions of functions; for example, in Zigbee a cluster defines one piece of functionality, and the on-off cluster means turning something on or off. On the cloud, these clusters can be manually defined into templates so the cloud understands the device. The gateway can query the node over the Zigbee protocol for the clusters it supports and send this information to the cloud to attach the new node with its clusters. The manufacturer of the node can register its functionality with the specific IoT cloud so it is ready to accept the information sent from the gateway and construct the shadow device. Apart from this, there are other, more generic features a gateway needs to handle for its nodes, like node connectivity through the gateway and over-the-air updates. Some require device-specific implementation, some are generic to one standard. It requires careful data modeling on the cloud and generic communication between the cloud and the gateway.

The gateway model also presents an interesting performance challenge to the cloud. Unlike a one-device-to-cloud connection, a gateway can represent many devices; for example, the Zigbee standard allows over 60k devices behind one gateway. Now imagine the gateway trying to report the status of 60k devices to the cloud in one request, or fetching commands from the cloud for all these devices, or a mobile app trying to get the information for every node attached to this gateway. Multiply this by the number of gateways the cloud supports, and things can get messy. Of course, it doesn't have to fetch all of them in one request, but either way it's a trade-off between the number of requests and the amount of information in each request.

What about sharing a node with a friend of yours? In a one-device-to-cloud model, user authentication and authorization go through the cloud. In a gateway model, should the gateway also keep the user-specific information for each node?

The gateway solution presents very interesting, open-ended challenges, and the solutions vary across platforms. I'm only touching the surface here and will introduce more details in a later post.

Time series data system with Cassandra (1)

Time series data is very common in science, engineering, and finance, and with the popularity of the Internet of Things it is becoming even more crucial for business. For a database, time series data presents a lot of challenges. At Ayla Networks, we switched from MySQL to Cassandra to deal with this. In this article I'll compare MySQL and Cassandra with respect to one of the challenges brought by time series data, and I'll compare Cassandra with other NoSQL databases later in a different article.

The problem is high velocity database WRITE operations. Think about an energy, temperature, or fluid pressure sensor in an HVAC system: data might arrive at your database several times a minute for each sensor. With batching, you could even get hundreds of data writes in one request from a talkative machine with multiple sensors attached. This characteristic makes it hard for a traditional relational database (like MySQL) to achieve stable performance. As we know, for MySQL to write one record, a disk seek is required to find the correct place to insert. Cassandra inserts in a different way: it has an in-memory structure called the memtable and an append-only commit log file. When a new record comes in, the data is written to the memtable, which is very fast since it's in memory; when the memtable is full, Cassandra flushes its contents to disk. For every request, the data is also appended to the commit log for recovery purposes. The whole insert process requires no disk seek. Disk seek time varies with the type of hardware (SSD versus spinning disk) and the operating system, but on the same hardware and OS, Cassandra is clearly much faster with no disk seeks at all.

Another factor that comes with Cassandra is its distributed system design. To achieve high availability, one insert is sent to n different nodes, where n is configurable. This is the same principle as in HDFS, which usually uses a replication factor of 3 to guarantee no data is lost when some machines go down. Cassandra introduces a configuration concept called "tunable consistency": you can specify that as long as 2 of the 3 required Cassandra nodes have the data in their memtable and commit log, Cassandra can return success to its client. The remaining node may or may not have the data yet, but Cassandra will eventually make the data consistent across all 3 nodes through other mechanisms such as gossip, hinted handoff, the repair service, and read repair. You need to make a trade-off between insert speed and consistency level, and note that you choose a consistency level for read operations in the same way. This is an application-specific decision: if you care about immediate consistency, you can choose a higher consistency level for writes, or you can balance the consistency levels of writes and reads. For example, if you configure both writes and reads at 2 out of 3 replicas, you are guaranteed to always read the correct data, because the write and read replica sets must overlap. As a result of the replicated writes, Cassandra incurs extra network latency compared to MySQL, which is designed to work on one machine. MySQL is often deployed in a master-slave pattern where data is replicated to a slave used as a read-only machine; in that setup, the same read consistency problem arises. Sometimes people also use sharding strategies in MySQL, which require a lot of manual effort, and since the sharding strategy varies from case to case, this adds a lot of devops work. There has been a lot of effort to improve MySQL scalability, but if you look at each strategy, it tends to trade away relational database advantages (ACID) and become more "NoSQL", so the problem becomes a linear trade-off rather than a binary decision. The CAP theorem describes this trade-off very well across many different databases.


Of course, the fast WRITE speed doesn't come for free. Cassandra read queries are much less flexible than MySQL's. Cassandra has two protocols, Thrift and CQL; Thrift is deprecated at this point, and CQL syntax looks very similar to SQL, but that's probably the only similarity between CQL and SQL. For example, CQL doesn't support ORDER BY on non-primary-key fields, it doesn't support the inequality search != at all, it doesn't support > or < except on primary key columns, and its secondary indexes have a lot of performance issues (the latest Cassandra 3.4 introduced a new secondary index implementation, which I haven't tested yet). Due to these limitations, schema design for Cassandra presents a lot of interesting problems, especially for time series data. I'll continue these topics in the upcoming series.
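
To give a taste of what this looks like, here is a minimal, hypothetical table for the sensor example above; queries have to follow the partition key and the clustering order:

  CREATE TABLE sensor_readings (
      sensor_id    text,
      reading_time timestamp,
      value        double,
      PRIMARY KEY ((sensor_id), reading_time)
  ) WITH CLUSTERING ORDER BY (reading_time DESC);

  -- Supported: restrict the partition key, then range over the clustering key.
  SELECT * FROM sensor_readings
   WHERE sensor_id = 'sensor-1' AND reading_time > '2016-03-01' LIMIT 100;

  -- Not supported without the partition key, for example:
  --   SELECT * FROM sensor_readings WHERE value > 10;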

 

PS: This is the first tech blog I'm starting. I'm sure there is a lot of room for improvement, but I'm being agile: I hope to get suggestions, iterate faster, and get better sooner. Any constructive comment is welcome.