Sunday, April 5, 2020

Decentralized Database over DLT

When the frenzy of Blockchain or Distributed Ledger Technology (DLT) was at its peak, someone made the following statement to cool down the expectation:
A Blockchain is just a glorified distributed and decentralized database.
The merit of the statement depends on the definition of "Decentralized Database". Such concept is related to replicated database. I would propose the following definitions:
  • A centralized (replicated) database is a cluster of database replicas that is "stop-fail failure" tolerant.
  • A decentralized (replicated) database is a cluster of database replicas that is "Byzantine fault" tolerant (BFT).
"Stop-fail" failure simply abstracts process crashes and is a benign failure, and the common database replication technologies for HA deals with such failures (for example: the MySQL HA strategy deployed at Github). The "Byzantine fault" though abstracts all forms of erroneous or even malicious behaviors of participating processes in a distributed system. In a decentralized environment, where replicas are controlled by different administrators without full trust between them, DLT provides BFT consensus for the replication cluster.

A DLT protocol (such as the protocol of how nodes behave and interact with each other in the Bitcoin, Ethereal or a Hyper Ledger network) provides the foundation for implementing a "Decentralized Database". But how is such abstraction useful in real life? I proposed and prototyped the abstraction in the context of enterprise applications as an incubation project at Microsoft in 2018. The following document is the specification of Decentralized Entities and Decentralized Transactions for Cross-enterprises Business Applications.

Monday, February 24, 2020

Service Architecture for OLTP --- Part 4. SAGA Patterns in Micoservice for OLTP


This post covers the SAGA programming patterns in the OLTP application architecture explained in this series.

First revisit the requirements for Web Services with or without OLTP:
  • Large scale on online request serving:
    High throughput and concurrency with reasonable latency for request serving.
  • Large scale on data in the system:
    The states of the system will need to be distributed (partitioned).
An Ecommerce website is a typical example of such Web Service with strong OLTP requirement on certain user operations, such as checkout a shopping cart. Consider the following scenarios:
  • The Website has large number of active products for sale. Products are randomly grouped into DB shards to avoid hot spots.
  • A popular product may be concurrently checked out by large number of user sessions. 
  • The sequence of operations for the final step of checkout:
    1. Check and deduct inventories of all products in checkout cart.
    2. Call payment service to secure payment.
    3. Generate an order.

Naive Implementation


Wrap the whole sequence of operations into a single DB transaction or a global transaction due to DB sharding and/or payment service is invoked thru RPC. This implementation likely will not satisfy the requirements on concurrent checkout on the same product, because an in-cart product's row in inventories table will be locked for the duration of the transaction, which prevents other checkout operation on the same product from making progress.
Global transaction will exacerbate the problem as overhead of network messaging and 2PC implementation will make the transaction duration even longer.

SAGA Pattern


The intuition is to break a long transaction to shorter ones. Specifically separate out short transactions to encapsulate operations that are potentially contentious (between different concurrent transactions). The example would have at least two transactions:
  • Transaction 1: Reserve inventory for a product in checkout cart:
    1. Check and deduct inventories of the product.
    2. create a reservation for the product and the checkout cart: {prodId, cartId, quantity}. This could be a new row in the checkoutReservation table.
  • Transaction 2 is performed after transaction 1 is performed on every product in the cart. The input is the cart with only products that transaction 1 succeeds on.
    1. Call payment service to secure payment.
    2. Generate an order.
    3. Remove reservations for all reserved products.
Note that transaction 2 only access data within the scope of current checkout session, and hence without any contention from other checkout sessions (assuming DB locks at record level). Transaction 1 needs to be a short transaction as there is contention. Hence we should make it a local transaction --- keep Inventory record and its corresponding reservation record in the same DB shard. We will also need to have a compensating transaction for Transaction 1 to rollback Transaction 1 if Transaction 2 fails.
  • Comp_trans 1: compensating transaction of Transaction 1.
    1. Remove reservation for the pair of {prodId, cartId}.
    2. Add back inventory of the product.
We will go thru a more realistic SAGA sequence for this example in part 5 (Workflow), where a separate order creation step is added to request processing part of the flow. For now, let us look at the state transition diagram of this over simplified SAGA sequence.

The dash line separates "Synchronous Request Processing" from "Async Processing" that happens outside the context of request processing and logically after server returns response to user. The "Async Processing" part of the state transition will need to be handled by a workflow (more explained in the "Workflow" post).

SAGA has the following properties:
  • The sequence of multiple transactions of the SAGA pattern does not provide serializable isolation on the whole sequence as a single transaction could. This implies the following constraints on the use cases:
    • The SAGA sequence of operations needs to be logically isolated already in the application. In other words, there is no other writers on the same data set during the progress of the sequence. In the above example, transaction 2 operates only on data within the scope of the checkout session and not shared by other checkout sessions.
    • Read requests can tolerate some transient states during the execution of the SAGA sequence. In the above example, at the state of "order in progress", inventory is deducted but payment is in progress, the application need to be able to reconcile the "inconsistent" state when user queries the state of the order.
  • Both transaction and compensating transaction need to be idempotent. This is an important requirement imposed by distributed workflow processing with inherent issue of network partitioning (for example, network partitioning between data stores and workflow engine).
  • Transactions in the SAGA sequence will need to be automatically orchestrated in both the context of request processing and async processing. Compensating transactions will also need to be added to the sequence for orchestration in case of rollback.
  • Compensating transactions will have to eventually succeed albeit with retries (as indicated in the above state transition), or else the system will be in an "inconsistent" state, which may require human intervention. 

Idempotent Transaction


The key technique to implement idempotency is to:
  • Always generate new record in DB, which is associated with pre-existing ID.
  • If the transaction intends to update a record, also generate above-mentioned new record to represent the occurrence of the update.
How this works is pretty straightforward --- you always check whether a record with that ID already exists before you generate a new one.

SQL Example for Transaction to Reserve Inventory


The following SQL code example is tested on MariaDB (a fork of MySQL after the Oracle acquisition).
Database schema for tables inventories and reservedInv, which are both sharded by prodId to guarantee local transaction for reserving inventory.

CREATE TABLE inventories
(
  prodId          VARCHAR(20),        # Unique ID for the record
  num             INT,                
  PRIMARY KEY     (prodId)            # Make the id the primary key
);

CREATE TABLE reservedInv
(
  prodId          VARCHAR(20),  
  cartId          VARCHAR(20),  
  num             INT,                # number reserved
  PRIMARY KEY     (prodId, cartId)    # Make the id the primary key
);

The transaction to reserve inventory demonstrates the technique to implement idempotency and use read lock.

Delimiter //
Create Procedure reserve_inventory(
    IN prod VARCHAR(20),
    IN cart VARCHAR(20),
    OUT status VARCHAR(20)
)
BEGIN
START TRANSACTION;
# check whether the prodId is already reserved by the cartId to guarantee idempotency
SELECT COUNT(1) INTO @reserved FROM reservedInv WHERE prodId=prod AND cartId=cart;
IF @reserved > 0 THEN
    COMMIT;
    SET status = 'NO OP';
ELSE
    # use select ... for update to explicitly lock read
    SELECT num into @invNum from inventories WHERE prodId = prod FOR UPDATE; 
    IF @invNum > 0 THEN
        UPDATE inventories SET num = num -1 WHERE prodId = prod;
        INSERT INTO reservedInv (prodId, cartId, num) VALUES (prod, cart, 1);
        COMMIT;
        SET status = 'RESERVED';
    ELSE
        ROLLBACK;
        SET status = 'ABORTED';
    END IF;
END IF;
END //
Delimiter ;

CALL reserve_inventory('p1', 'c1', @status); SELECT @status;

The SQL for the compensating transaction should be straight forward based on the above.

Saturday, February 22, 2020

Service Architecture for OLTP --- Part 2. Transaction in Database

ACID is a buzz word in database jargon and you can find a good explanation of its meaning on many sources. Out of those four properties:
  • Atomicity is best understood with least ambiguity.
  • Consistency in ACID means very different from the same term in the context of distributed systems. C in ACID refers to that a transaction takes the data in DB to a valid state. For example, the result of a transaction has to satisfy all the data constraints, such as primary-foreign keys, or the transaction can not commit.
  • Isolation defines how the states a transaction sees could be affected by other transactions. It covers many alternative semantics as compromises on the most ideal isolation for implementation efficiency.
  • Durability is conceptually simple and clear for an abstract database, but implies a lot complexity in the context of Distributed Systems, where a database is replicated for HA and DR.
This post will explain local transaction in a database, particularly the implementation of A and I in ACID.

Serializable Isolation Level

Ansi SQL defines 4 isolation levels: Serializable, Repeatable reads, Read committed and Read uncommitted. Serializable is the strongest of the four and also the most popular choice for applications based on safety. In Serializable isolation, a transaction sees only the states generated/updated by transactions committed before the start of the transaction.

Use MVCC to avoid Read Lock

Serializable isolation can be implemented simply with read locks on read set, which either blocks parallel transactions that share the same records for read or write in terms of pessimistic locking (all locks are acquired at the first access), or cause a lock conflicts with those transactions in terms of opportunistic locking. MVCC (Multi-version Concurrency Control) can be used to avoid locks on read set, and hence achieve higher concurrency of transaction processing. In fact, Read-only transaction does not need to acquire any locks. MVCC keeps multiple versions of the same record (a table row in relational DB). Creation timestamp and expiration timestamp are tracked for each version of a record. A transaction finds the version of a record to read based on that its timestamp fall into the range of those two timestamps.
MVCC though is not needed for isolation level of Read-Committed, where the latest committed version is always read in a transaction.

Locking Writes

Locking will always be needed for write set in a transaction. The granularity of locking is usually a record (a table row) in most relational DBs, which is important for parallelism of RW-transactions. In pessimistic locking, a record is locked at its first update in a transaction and released when the transaction commits or aborts. But a record is not be locked for read in a MVCC based implementation for serializable isolation.

Locking Reads

Serializable isolation does not prevent the following scenario:
"At time t0, record r1 is read by transaction T1. At time t1, r1 is updated by another transaction t2 and t2 commits. At time t2, T1 performs a mutation on record r2 based on its read version of r1, and T1 commits."

The above scenario is actually OK for most of the application usages, which is the reason why serializable isolation is considered enough for most. Though an even higher isolation level would be locking both read set and write set. Most relational DBs provide such way to lock read set just as if there were updated. For example, MySQL uses "SELECT ... FOR UPDATE" as a read lock.

Scope of Transaction

Most DBs implements local transactions with primitives based on shared memory. Hence the scope of a local transaction is usually a DB shard, which is hosted on a single node. Local transaction is sufficient for majority of application usages, where data can be naturally partitioned and there is little  need to have transaction across partitions. For example, Google Ads once used a database sharded by advertiser accounts, where every transaction is within the scope of the data of a single advertiser.

Transaction Language

A transaction is a sequence of operations. A language is needed to describe such sequence of operations and that language is SQL for relational DBs. The question whether SQL has the same expressive power as other functional languages or imperative languages turned out to be rather complex. But practically, the expressive power of SQL has successfully eliminated the necessity of interactive transactions for application programs. An interactive transaction is similar to a transaction performed thru a DB client console, where output from operations are fed to client and client feeds input to subsequent operations. An interactive transaction is inefficient for two reasons:

  • Network time is added to the duration of transaction and hence the duration of locking on some data. This not only increase the latency of transaction but also reduce the concurrency of transaction processing.
  • May not be possible to optimize a sequence of operation as a whole as they are not submitted to DB server at once.

There are clear reasons for SQL being the transaction language of relational DB:

  • SQL is a domain specific language for relational algebra.
  • SQL is declarative for the most part.

A declarative language describes what to do as opposed to how, hence relieves programmers from coding all details correctly. This is especially important for operations programmed by uses but running on servers (regardless whether it is relational DB or other types of data stores). Such scenario requires both efficiency and safety (Eg, preventing DB service crashes caused by running user-coded transaction).

This post provides the necessary background knowledge for the next one on application patterns of OLTP.

Service Architecture for OLTP --- Part 1. Introduction

OLTP (Online Transaction Processing) imposes the most challenging requirements for Micro-service based Web application architecture, in terms of the following properties:
  • ACID on state changes of the system.
  • Scalability of data over commodity hardware.
  • High throughput and low latency on online request handling.
In this series, I will analyze those requirements and describe the architecture and programming patterns of OLTP Web service and the underlying data store. The following view of such system will greatly help understanding both the requirements and the architecture:

An OLTP system is a distributed system that encapsulates a set of states (system states) under an interface for querying states as well as to changing system states. A request to such system may read a view of the system states, perform change of states change and/or trigger eventual state changes. Both view of states and change of states are performed as ACID transactions.

General Architecture

  1. Synchronous Request Processing
    This is the mechanism that handles the interactions with users, in other words, the code path that processes a request from user. In general, a user request incurs both read of system states as well as mutations on system states. For example, in the convention of REST over HTTP, a GET request sometimes only encapsulate reads, while a POST/PUT/PATCH/DELETE sometimes encapsulate both reads and mutations on the server side. I will refer to the former as RO (read-only) user request and the later as RW (read-write) user request.
    In an OLTP Web Service, it could be required that both RO request and RW request are ACID. It means the following:
    • A RO user request performs a READ on system states with some ACID requirements. For example, it may need to be a snapshot READ with serializable isolation level (I will cover isolation level later).
    • A RW user request performs a Read-Write operation on system states with some ACID requirements. For example, it may need to be a RW transaction with serializable isolation level.

  2. Asynchronous Processing
    Asynchronous processing occurs as as an eventual effect of synchronous processing of a user request. Using an example of order checkout, user clicks a button "Confirm Checkout" at the final step of checkout and this sends a "checkout" request to Web Service. All the following steps may occur within the synchronous processing of such request: deducting inventories of products, payment, creating an order object, etc. But the post-purchase processing, including order processing and fulfillment, will occur eventually but automatically as a result of successful handling of "checkout" request but outside the context of handling that request. Asynchronous processing usually have the following properties:
    • It occurs eventually.
    • It may be multi-stepped. And each step may be an ACID transaction on the system states.
    • It needs to occur exactly once or it needs to be idempotent. And this requirement applies to each step.
    Async processing is usually implemented with lower level abstraction of event driven processing or higher level abstractions such as Workflow, which will be covered by a post in this series.

  3. Distributed Data Store
    Both implementations of synchronous processing of request and asynchronous processing are mostly stateless in a typical architecture. And it leaves only the data store to be the stateful component in this architecture to store all the system states. Most people think of the data store as a relational database, which could be true for most implementations. There is further a perception that ACID transactions are only implemented within such data store, but this is not really true for a typical Microservice OLTP architecture, where transactions are actually implemented at both the microservice level as well as database. The perception actually came from the convenience implied in the traditional architecture of monolithic data store, where all computations are performed on a single centralized logical database. But in modern architecture, data store is distributed, in particular, sharded (partitioned) for scalability over commodity hardware. The distributed data store has the following properties:
    • System states are partitioned into different logical stores (see more in "Partitioning of the System"). And each logical store is further sharded based on computation locality for scalability.
    • Each shard is replicated for HA and DR. In a typical architecture, replication is implemented based on quorum-based cluster within a data center. Paxos and Raft are two common algorithms for forming consensus on the states replicated over a cluster. MySQL's group replication is one example implementation. Asynchronous replication is usually performed across data centers for data center level disaster recovery.
    • ACID transaction is supported within the scope of a shard.
    • Transaction beyond the scope of a shard is supported either as a distributed DB transaction across multiple shards of a DB or a global transaction across multiple microservice instances (, each of which performs read and write on its own DB shards).

Partitioning of the System


Partitioning of system states based on separation of business logic into domains is one of the most important characteristics of such system architecture. Such partition not only enables efficiency of locality of data and computation, but also enable the bounded context for Domain-Driven Design of software. Such partition is across the whole stack:
  • Data store is partitioned, with each partition stores states of one business domain. I refer to such partition as a "logical DB" or a "DB".
    For instance, an Ecommerce site might have the following logical DBs: Products (including categories, products, inventories, etc), Users, etc. And those different logical DBs may be backed by different data store implementations. For example different data stores may be selected based on the requirement of transactions, processing latency, throughput, scalability and cost for each specific business domain. And we reply on microservice layer to hide the heterogeneity of data stores.
    *Note that each "logical DB" may be further sharded based on connectivity of records in the logical DB for scalability. That will be referred to as "DB shard".
  • Computations including both view generation of system states and updates on system states, are also partitioned according to the partition of those states. Computations are usually partitioned into different microservices, though some of them may also be implemented as procedures running on DB for efficiency purposes (see transaction language in Part 2).
This series will cover the following topics with details, which will be analyzed and examined with the examples of ECommerce systems. As of 02/25/2020, only Part 1, 2 and 4 are published.

Part 2. Transaction in Database
The basic but fundamentally important understanding of transaction processing in modern databases.

Part 3. stributed Transaction and Global Transaction
How to implement cross-DB-shard and cross-DB, cross-microservice OLTP.

Part 4. SAGA pattern in Microservices
Explain workflow-based eventual-consistence processing as a replacement strategy for global transaction, with example code.

Part 5. Workflow for Asynchronous Transaction Processing
Continuation from the SAGA pattern. Look into workflow-based asynchronous processing and its implementation based on open source workflow engine.

Part 6. Distributed Database
Sharding and replication of databases. Look into DB cluster management.

Part 7. Schemaless and Object Stores over Relational DBs
Strategies to implement object stores with flexible schema over relational DB.

Decentralized Database over DLT