微服务实战之 Cassandra 之一

概述

一提到数据存储马上我们就会想到数据库, 一想数据库就会想到 Oracle, MySQL等关系数据库。

其实今天摆在我们面前有更多选择, 例如

  • 迷你版的关系数据库 SQLite
  • 流行的开源数据库 MySQL
  • 强大的开源数据库 PGSQL
  • 时间序列数据库 InfluxDB

和众多的 NoSQL 数据存储系统

  • Cassandra
  • Riak
  • Redis
  • MongoDB

以及分布式文件存储系统

  • HDFS
  • Swift (Openstack Object Storage)

Cassandra 是 NoSQL 的典型代表产品, 具有极佳的线性可扩展性和高可用性, 同时性能也不错, 跨Data center的复制也还行, 不足之处在于它做不到强一致性, 不要指望写入Cassandra的数据, 别人立马就能正确无误地由读出来. 与大多数NOSQL产品一样, 它所能保证的只能是最终一致性.

先复习一下相关理论

CAP

著名的CAP理论提出一致性,可用性,分区容错性,三者不可兼得

C:Consistency,一致性, 在不同的地方和时间点上数据总是一致的
A:Availability,可用性, 在任何地点和时间都可以使用服务
P:Partition tolerance,分区容错性,即使出现网络故障系统依然具有可靠

大多数 nosql 产品都选择了牺牲强一致性,保证可用性和分区容错性,以及最终一致性,Cassandra 也是如此

ACID

传统关系数据库用 ACID 来保证强一致性, 这个 Cassandra 是做不到的

A: Atomicity,原子性
C: Consistency,一致性
I: Isolation,隔离性
D: Durability,持久性

BASE

大多数 NOSQL 系统采用的方式

BA:Basically Available,基本可用
S:Soft State,软状态,即中间可能不一致的状态
E:Eventually Consistent,最终一致性

分布式哈希表和一致性哈希

DHT-distributed hash table 分布哈希表, 一种去中心化的分布式系统, 提供类似于哈希表查找服务, 键值对存储在DHT中, 任何参于的节点都可根据 key 来存储相应的值

DHT的特点是

  1. 独立自主性: 各个节点各自为战, 不需要中央的协调和控制节点
  2. 容错性: 任何一个节点加入,离开或损毁, 系统依然可用
  3. 可扩展性: 可以任意增加节点以提高系统容量

打个比方, 我有一篮12个鸡蛋, 可以放在三个篮子里
用最简单的取余法, 将鸡蛋编号0~17除以3, 每个篮子里放4个鸡蛋
假如增加或减少一个篮子, 都不是问题, 我只要根据编号取余一下子就能确定地找到那个鸡蛋
当然这里的取余法太 low 了, 虽然数据分布绝对均匀, 但是一旦增减篮子我们就需要移动鸡蛋来满足约束条件

一致性哈希就是比取余法更高级点的哈希算法, 一致性表示即使有篮子的增减, 无需移动数据依然可以根据编号确定地找到想找的鸡蛋, 并且通过虚拟节点技术使得数据分布也比较均匀

举例如下


package com.github.walterfan.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;



interface HashFunction {
    Integer hash(String keyStr);
}

class MD5Hash implements HashFunction {

    public Integer hash(String keyStr) {
        byte[] bKey=mac("MD5", keyStr);
        return ((int) (bKey[3] & 0xFF) << 24)
                | ((int) (bKey[2] & 0xFF) << 16)
                | ((int) (bKey[1] & 0xFF) << 8)
                | (bKey[0] & 0xFF);
    }


    public static byte[] mac(String alga, String str) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance(alga);
            return md.digest(str.getBytes());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException(e);
        }

    }

}

public class ConsistentHash<T> {
    private static Log logger = LogFactory.getLog(ConsistentHash.class);
    private final HashFunction hashFunction;
    private final int numberOfReplicas;
    private final SortedMap<Integer, T> circle = new TreeMap<Integer, T>();

    public ConsistentHash(HashFunction hashFunction, int numberOfReplicas,
                          Collection<T> nodes) {
        this.hashFunction = hashFunction;
        this.numberOfReplicas = numberOfReplicas;

        for (T node : nodes) {
            add(node);
        }
    }

    public void add(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.put(hashFunction.hash(node.toString() + i), node);
        }
    }

    public void remove(T node) {
        for (int i = 0; i < numberOfReplicas; i++) {
            circle.remove(hashFunction.hash(node.toString() + i));
        }
    }

    public T get(String keyStr) {
        if (circle.isEmpty()) {
            return null;
        }
        int hash = hashFunction.hash(keyStr);

        if (!circle.containsKey(hash)) {
            SortedMap<Integer, T> tailMap = circle.tailMap(hash);
            hash = tailMap.isEmpty() ? circle.firstKey() : tailMap
                    .firstKey();
        }
        return circle.get(hash);
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("");
        for(Map.Entry<Integer, T> entry : circle.entrySet()) {
            sb.append(entry.getKey());
            sb.append("=");
            sb.append(entry.getValue());
            sb.append("\n");
        }
        return sb.toString();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        java.util.List<String> nodes = new java.util.ArrayList<String>(3);
        nodes.add("1");
        nodes.add("2");

        ConsistentHash<String> continuum = new ConsistentHash<String>(new MD5Hash(), 8, nodes);
        String strID = "16580";

        String node = continuum.get(strID);
        System.out.println("select node: " + node + " for " + strID);

        java.util.Map<String, Integer> statMap = testDistribution(continuum);
        System.out.println(statMap);

        continuum.add("3");
        node = continuum.get(strID);
        System.out.println("add node 3, select node: " + node + " for " + strID);

        java.util.Map<String, Integer> statMap1 = testDistribution(continuum);
        System.out.println(statMap1);

        continuum.remove("3");
        node = continuum.get(strID);
        System.out.println("remove node 3, select node: " + node + " for " + strID);
        java.util.Map<String, Integer> statMap2 = testDistribution(continuum);
        System.out.println(statMap2);
    }

    private static java.util.Map<String, Integer> testDistribution(
            ConsistentHash<String> continuum) {
        java.util.Map<String, Integer> statMap = new java.util.HashMap<String, Integer>();

        for(int i = 10000; i < 20000; i++) {
            String svr = continuum.get("server" + i);
            //System.out.println(i + ". server: " + svr);
            Integer cnt = statMap.get(svr);
            if(null == cnt) {
                statMap.put(svr, 1);
            } else {
                statMap.put(svr, cnt + 1);
            }
        }
        return statMap;
    }

}



输出如下

select node: 2 for 16580
{1=5891, 2=4109}
add node 3, select node: 2 for 16580
{1=4703, 2=2517, 3=2780}
remove node 3, select node: 2 for 16580
{1=5891, 2=4109}

安装

brew install cassandra22

工具

cqlsh

./cqlsh 192.168.3.5 -u test - pass

node tool

./nodetool -h 192.168.3.5 ring

nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  1015.55 KB  256          100.0%            9ec3a333-81bc-4945-9db1-ef1a13b62cde  rack1

CQL

类似 SQL , 详细语法参见 http://docs.datastax.com/en/dse/5.1/cql/cql/cql_reference/cql_commands/cqlCommandsTOC.html

SELECT * from system.schema_keyspaces limit 10;
SELECT * from system.schema_columnfamilies limit 10;
SELECT * from system.schema_columns limit 10;

创建一个表空间

CREATE KEYSPACE pims 
  WITH REPLICATION = { 
   'class' : 'NetworkTopologyStrategy', 
   'datacenter1' : 1 
  } ;

架构及特点

Cassandra 写的时候是将数据追加到可用节点的 commit log 上, 因而速度很快

  1. 将数据追加到 commit log 上
  2. 更新内存的数据结构 memTable
  3. 内存数据如果超过了最大限值, 则储存到磁盘上的 SSTable 中

相比之下, Cassandra 读的速度比较慢, 因为它要根据主键至少读取两个节点, 经过数据比较后得出结果

参见 Cassandra Architecture in brief

Gossip 协议

流言协议, 俗话说, 流言蜚语, 一传十, 十传百, 尽人皆知. Gossip 协议就是采用了类似的方法将消息迅速从一个结点复制到其他结点. 传递时需要注意的就是传递的广度和深度, 不要造成回环

具体实现可参见 https://github.com/apache/incubator-gossip

Gossiper 类随机选择一个节点来发送消息 GossipDigestSyncMessage, 接收消息节点发回确认消息, 发送节点回应, 这轮消息同步即完成 (send-receive-ack), 如果同步失败, 则将此节点标记为可疑节点, 连续发生同步失败则可认为此节点已经挂掉了, 具体算法为 Phi Accural Failure Detection.

核心结构

Node 节点

也就是你存储数据的地方, 一般我们至少部署三台Cassandra 节点
在读写的时候同时向三台节点发出请求, 任意两台返回响应即可

N < W + R
N 为复制的节点数
W 为写数据的最少返回节点数
R 为读数据的最少返回节点数

3 < 2 + 2

如果是 7 台节点, 复制因子设为3, 根据key 哈希过后会找到三台结点,依然是

3 < 2 + 2

复制因子设为4, 那就是7 < 4 + 4, 不过一般复制到三个结点也就够了

datacenter 数据中心

一组相关节点的集合, 可以是物理上的一个数据中心, 也可以是虚拟的数据中心
一般我们认为不同的数据中心应该位于不同的网络位置. 一个数据中心内的节点是在同一个网段中的

Cluster 集群

A cluster contains one or more datacenters. It can span physical locations.
一个集群包含一个或多个数据中心, 它可能会跨越多个物理位置.

Commit log 提交Log

All data is written first to the commit log for durability. After all its data has been flushed to SSTables, it can be archived, deleted, or recycled.

所有的数据都会先写到commit log中持久化, 当数据被刷新到 SSTables 后, 相关的commit log可以被归档,删除或回收

Table 表

类似于关系数据库, 表就是行的集合, 行由列组成, 列又分为主键和值

SSTable 排序字串表

排序的字符串表(SSTable)是一种不可变的数据文件, Cassandra定期向其写入memtable。 SSTables仅可追加数据, 按顺序存储在磁盘上,Cassandra为每个表保留一张 SSTable.

Cassandra

数据模型

Cassandra RDBMS
Key primary key
Column family table
Keyspace database
Cluster server

Cassandra 的 data model 设计与关系数据库大为不同, 在设计模型不能完全照搬传统关系型数据库的那一套

  1. 不用想方设法减少写次数

在cassandra 中对于大量的写操作有所优化, 效率很高, 相比之下, 读操作更加耗时

  1. 不用想方设法减少重复数据

磁盘空间不是大问题, 与其遵循数据库范式千方百计地减少重复次数, 不如增加冗余以减少读的次数

除此之外, Cassandra 没有 Join 操作, 这个在分布式架构下这也绝非我们想要做的

基本目标

    1. 在集群中均匀分布数据

记录是按照 primary key 的第一个元素 partition key 的哈希在集群中分布的, 所以要挑选一个易于均匀分布的 primary key

    1. 尽量减少跨分区读数据

分区是一组行记录的集合, 共享相同的partition key , 当你执行一个查询时, 你要尽可能从尽量少的分区中读取

这两条原则看起来有一点点矛盾, 第一条是针对写,第二条是针对读,在设计数据结构时要做好平衡和取舍

数据重复不是问题, 数据结构可以灵活, 只能根据 Key 进行查询
在设计数据模型时可以按以下步骤

步骤1: 根据需求, 把查询和修改哪些数据一一列举出来
数据是查询多还是修改多, 查询多的数据最好放在一个分区里, 修改多的数据最好放在不同的分区中

步骤2: 创建表时尽量使查询和读取落在一个分区里,宁愿多写一点,也不要跨区或表读取

注意事项

我们在使用 Cassandra Driver 提供的 CQL 时, 常错误地和 SQL 搞混了
注意

  1. Insert == Update , 所有的插入和修改本质上都是 Set

  2. 由于 Cassandra 没有行级锁,如何处理并发情况下的竞态条件(race condition) 呢?
    一是写完以后于读一下, 如果比较之后发现不是当初写的, 直接向客户端报错, 提示冲突, 写入不成功
    二是使用 Cassandra 2.0 之后提供的轻量级事务, 比如

INSERT INTO USERS (login, email, name, login_count)
values ('jbellis', 'jbellis@datastax.com', 'Jonathan Ellis', 1)
IF NOT EXISTS
And an an example of resetting his password transactionally:

UPDATE users
SET reset_token = null, password = ‘newpassword’
WHERE login = ‘jbellis’
IF reset_token = ‘some-generated-reset-token’

Cassandra 的技术细节和内部实现再另行研究总结, 这里就此打住.

参考资料

客户端配置

  • Address: 127.0.0.1:9042
  • Username
  • Password
  • LocalDataCenter
  • ConsistencyLevel
  • Keyspace
  • RetryPolicy
  • ReadTimeout
  • ConnectTimeout

Cassandra ConsistencyLevel

  • ANY
  • ONE
  • TWO
  • THREE
  • QUORUM
  • ALL
  • LOCAL_QUORUM
  • EACH_QUORUM
  • SERIAL
  • LOCAL_SERIAL
  • LOCAL_ONE

驱动

Java Driver

<dependency> 
<groupId>com.datastax.cassandra</groupId> 
<artifactId>cassandra-driver-core</artifactId> 
<version>3.1.2</version>
</dependency>


<dependency> <groupId>com.datastax.cassandra</groupId> 
<artifactId>cassandra-driver-mapping</artifactId> 
<version>3.1.2</version>
</dependency>


<dependency> 
<groupId>com.datastax.cassandra</groupId> 
<artifactId>cassandra-driver-extras</artifactId> 
<version>3.1.2</version>
</dependency>

Python Driver

Cassandra源码:

git clone http://git-wip-us.apache.org/repos/asf/cassandra.git

相关链接

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容