ThreadLocal提供一个线程(Thread)局部变量,访问到某个变量的每一个线程都拥有自己的局部变量。说白了,ThreadLocal就是想在多线程环境下去保证成员变量的安全。
原理是 :
首先set需要获得当前线程对象Thread;然后取出当前线程对象的成员变量ThreadLocalMap;如果ThreadLocalMap存在,那么进行KEY/VALUE设置,
KEY就是ThreadLocal;如果ThreadLocalMap没有,那么创建一个;说白了,当前线程中存在一个map变量,Key是ThreadLocal,VALUE是你设置的值。
基本原理 :
线程本地变量和线程相关的变量,一个线程则一份数据。我们通过ThreadLocal保存的数据最终保存在Thread类的ThreadLocalMap threadLocals变量中。
ThreadLocalMap是一个map结构,其中key为我们声明的ThreadLocal对象,value即为我们使用ThreadLocal保存的线程本地变量。
当我们调用ThreadLocal变量的set方法时,那么为将ThreadLocal作为key,set方法的参数作为value,保存当前线程的threadLocals中,调研get,
回去thread的threadLocals
中去寻找key为threadLocal变量的值。
原理2 :
当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,
而不会影响其它线程所对应的副本。
从线程的角度看,目标变量就象是线程的本地变量,这也是类名中“Local”所要表达的意思。
每一个线程都提供了一份变量,因此可以同时访问而互不影响。
理解ThreadLocal中提到的变量副本
“当使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本” —— 并不是通过ThreadLocal.set( )实现的,
而是每个线程使用“new对象”(或拷贝) 的操作来创建对象副本, 通过ThreadLocal.set()将这个新创建的对象的引用保存到各线程的自己的一个map中,
每个线程都有这样一个map,执行ThreadLocal.get()时,各线程从自己的map中取出放进去的对象,因此取出来的是各自自己线程中的对象
(ThreadLocal实例是作为map的key来使用的)。
如果ThreadLocal.set( )进去的对象是多线程共享的同一个对象,那么ThreadLocal.get( )取得的还是这个共享对象本身 ——
那么ThreadLocal还是有并发访问问题的!
Future的get方法在获取结果时候将进入阻塞,阻塞直到Callable中的call返回。
(2)如果不想使得get阻塞,那么可以在get之前加一层判断isDone(),如果完成就获取,如果没有则跳过。
解决线程安全问题,本质上就是解决资源共享问题,一般有以下手段:
1)可重入(不依赖环境);2)互斥(同一时间段只允许一个线程使用);3)原子操作;4)Thread-Local
创建线程池需要使用 ThreadPoolExecutor 类,它的构造函数参数如下:
public ThreadPoolExecutor(int corePoolSize, //核心线程的数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间的单位
BlockingQueue<Runnable> workQueue, //保存待执行任务的队列
ThreadFactory threadFactory, //创建新线程使用的工厂
RejectedExecutionHandler handler // 当任务无法执行时的处理器
) {...}
线程池实现原理 :
提交一个任务到线程池中,线程池的处理流程如下:
1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。
2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
线程池工作原理 :
线程池的工作过程如下:
线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
当调用 execute() 方法添加一个任务时,线程池会做如下判断:
如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常RejectExecutionException。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。
所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,
存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。
阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
JDK7提供了7个阻塞队列。分别是
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。(默认的)
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
总结来说,Lock和synchronized有以下几点不同:
1)Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;
2)synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
3)Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
4)通过Lock可以知道有没有成功获取锁(tryLock()方法,获取锁失败返回false),而synchronized却无法办到。
5)Lock可以提高多个线程进行读操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
一、Lock和synchronized有以下几点不同:
1)Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现,synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定,但是使用Lock则不行,lock是通过代码实现的,要保证锁定一定会被释放,就必须将 unLock()放到finally{} 中;
2)synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;
3)Lock可以让等待锁的线程响应中断,线程可以中断去干别的事务,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;
4)通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。
5)Lock可以提高多个线程进行读操作的效率。
在性能上来说,如果竞争资源不激烈,两者的性能是差不多的,而当竞争资源非常激烈时(即有大量线程同时竞争),此时Lock的性能要远远优于synchronized。所以说,在具体使用时要根据适当情况选择。
举个例子:当有多个线程读写文件时,读操作和写操作会发生冲突现象,写操作和写操作会发生冲突现象,但是读操作和读操作不会发生冲突现象。
但是采用synchronized关键字来实现同步的话,就会导致一个问题:
如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。
因此就需要一种机制来使得多个线程都只是进行读操作时,线程之间不会发生冲突,通过Lock就可以办到。
另外,通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的
阿里巴巴的消息队列 :
RocketMQ是一个纯java、分布式、队列模型的开源消息中间件,前身是Metaq
RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点 :
1.能够保证严格的消息顺序。
2.提高丰富的消息拉取模式。
3.高效的订阅者水平扩展能力。
4.实时的消息订阅机制。
5.亿级消息堆积能力。
基本概念 :
Producer : 消息生产者,负责产生消息,一般由业务系统负责产生消息。
Consumer : 消息消费者,负责消费消息,一般是后天系统负责异步消费。
Push Consumer : Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法。
Pull Consumer : Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制。
Producet Group : 一类Product的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。
Consumer Group : 一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。
Broker : 消息中转角色,负责存储消息,转发消息,一般也称为Server.
长连接 : 在页面嵌入一个隐藏的Iframe,将这个隐藏Iframe的src属性设置为对一个长连接的请求或是采用xhr请求,服务器端就源源不断的向客户的输入数据。
二.RocketMQ网络部署特点
Broker集群有多种配置方式:
1,单Master
优点:除了配置简单没什么优点
缺点:不可靠,该机器重启或宕机,将导致整个服务不可用
2,多Master
优点:配置简单,性能最高
缺点:可能会有少量消息丢失(配置相关),单台机器重启或宕机期间,该机器下未被消费的消息在机器恢复前不可订阅,影响消息实时性
3,多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟,毫秒级
优点:性能同多Master几乎一样,实时性高,主备间切换对应用透明,不需人工干预
缺点:Master宕机或磁盘损坏时会有少量消息丢失
4,多Master多Slave,每个Master配一个Slave,有多对Master-Slave,HA采用同步双写方式,主备都写成功,向应用返回成功
优点:服务可用性与数据可用性非常高
缺点:性能比一部HA略低,当前版本主宕备不能自动切换为主
Master和Slave的配置文件参考conf目录下的配置文件
Master与Slave通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数
一个Master下面可以挂载多个Slave,同一Master下的多个Slave通过指定不同的BrokerId来区分
部署结构 :
(1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步
(2)Broker部署相对复杂,Broker氛围Master与Slave,一个Master可以对应多个Slaver,但是一个Slaver只能对应一个Master,Master与Slaver的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slaver。Master可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有的NameServer
(3)Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Produce完全无状态,可集群部署
(4)Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息,并向提供Topic服务的Master、Slaver建立长连接,且定时向Master、Slaver发送心跳。Consumer即可从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定
一、 RocketMQ概述
1. MessageFilter
1) Broker端消息过滤
在Broker中,按照Consumer的要求做过滤,优点是减少了对于Consumer无用消息的网络传输。缺点是增加了Broker的负担,实现相对复杂。
² 淘宝Notify支持多种过滤方式,包含直接按照消息类型过滤,灵活的语法表达式过滤,几乎可以满足最苛刻的过滤需求。
² RocketMQ支持按照简单的Message Tag过滤,也支持按照Message Header、body进行过滤。
² CORBA Notification规范中也支持灵活的语法表达式过滤。
2) Consumer端消息过滤
这种过滤方式可由应用完全自定义实现,但是缺点是很多无用的消息要传输到Consumer端。
2. 消息可靠性
影响消息可靠性的几种情况:
² Broker正常关闭
² Broker异常Crash
² OSCrash
² 机器掉电,但是能立即恢复供电情况。
² 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
² 磁盘设备损坏。
前四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。后两种属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。
RocketMQ从3.0版本开始支持同步双写。
3. 消息原语
1) At most once
最多一次,发送一次,无论成败,将不会重发。
2) At least Once
消息投递后,如果未能收到ack,则再次投递。
每个消息必须投递一次RocketMQ Consumer先pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
3) Exactly Only Once
² 发送消息阶段,不允许发送重复的消息。
² 消费消息阶段,不允许消费重复的消息。
只有以上两个条件都满足情况下,才能认为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然不能严格保证不重复,但是正常情况下很少会出现重复发送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复。
此问题的本质原因是网络调用存在不确定性,即既不成功也不失败的第三种状态,所以才产生了消息重复性问题。
4. 回溯消费
是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。
5. 消息堆积
消息中间件的主要功能是异步解耦,还有个重要功能是挡住前端的数据洪峰,保证后端系统的稳定性,这就要求消息中间件具有一定的消息堆积能力,消息堆积分以下两种情况:
消息堆积在内存Buffer,一旦超过内存Buffer,可以根据一定的丢弃策略来丢弃消息,如CORBA Notification规范中描述。适合能容忍丢弃消息的业务,这种情况消息的堆积能力主要在于内存Buffer大小,而且消息堆积后,性能下降不会太大,因为内存中数据多少对于对外提供的访问能力影响有限。
消息堆积到持久化存储系统中,例如DB,KV存储,文件记录形式。当消息不能在内存Cache命中时,要不可避免的访问磁盘,会产生大量读IO,读IO的吞吐量直接决定了消息堆积后的访问能力。
评估消息堆积能力主要有以下四点:
² 消息能堆积多少条,多少字节?即消息的堆积容量。
² 消息堆积后,发消息的吞吐量大小,是否会受堆积影响?
² 消息堆积后,正常消费的Consumer是否会受影响?
² 消息堆积后,访问堆积在磁盘的消息时,吞吐量有多大?
6. 分布式事务
常见的分布式事务解决方案有:最终一致性,两阶段/三界阶段提交,TCC,本地消息表等。这些解决方案中,最终以执行的性能最好。可以通过RocketMQ实现最终一致性。
分布式事务涉及到两阶段提交问题,在数据存储方面的方面必然需要KV存储的支持,因为第二阶段的提交回滚需要修改消息状态,一定涉及到根据Key去查找Message的动作。RocketMQ在第二阶段绕过了根据Key去查找Message的问题,采用第一阶段发送Prepared消息时,拿到了消息的Offset,第二阶段通过Offset去访问消息,并修改状态,Offset就是数据的地址。
RocketMQ这种实现事务方式,没有通过KV存储做,而是通过Offset方式,存在一个显著缺陷,即通过Offset更改数据,会令系统的脏页过多,需要特别关注。
7. 定时消息
定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
如果要支持任意的时间精度,在Broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。
8. 消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况
² 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。因为这条失败的消息即使立刻重试消费,99%也不成功,所以通常需要跳过这条消息,接着消费其他消息。
² 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
JAVA反射机制 :
1.在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法和属性;这种动态获取的信息以及动态调用
对象的功能称之为java言语的反射机制。
JAVA反射机制主要提供了以下功能 :
1.在运行时判断任意一个对象所属的类;在运行时构造任意一个类的对象;在运行时判断任意一个雷具有的成员变量和方法;在运行时调用任意一个对象的方法;生成动态代理。
Java有个Ojbect类,是所有Java类的继承根源,其内声明了数个应该在所有Java类中被改写的方法;其中getClass()返回一个class对象。
Class 类十分特殊。它和一般类一样继承自Object,其实体用以表达Java程序运行时的classes和interfaces,也用来表达enum、array、primitive Java types
(boolean, byte, char, short, int, long, float, double)以及关键词void。当一个class被加载,或当加载器(class loader)的defineClass()被JVM调用,
JVM 便自动产生一个Class 对象。
Java的反射机制的实现要借助于4个类:class,Constructor,Field,Method;其中class代表的时类对 象,Constructor-类的构造器对象,Field-类的属性对象,
Method-类的方法对象。通过这四个对象我们可以粗略的看到一个类的各个组 成部分 。
解决HashMap在多线程并发访问时的问题 :1.HashTable;2.Collections.synchronizedMap();3.ConcurrentHashMap;
Collections.synchronizedMap()与ConcurrentHashMap主要区别是:Collections.synchronizedMap()和Hashtable一样,实现上在调用map所有方法时,都对整个map进行同步,而ConcurrentHashMap的实现却更加精细,它对map中的所有桶加了锁。所以,只要要有一个线程访问map,其他线程就无法进入map,而如果一个线程在访问ConcurrentHashMap某个桶时,其他线程,仍然可以对map执行某些操作。这样,ConcurrentHashMap在性能以及安全性方面,明显比Collections.synchronizedMap()更加有优势。同时,同步操作精确控制到桶,所以,即使在遍历map时,其他线程试图对map进行数据修改,也不会抛出ConcurrentModificationException。
HashMap的原理 :
简单地说,HashMap 在底层将 key-value 当成一个整体进行处理,这个整体就是一个 Entry 对象。HashMap 底层采用一个 Entry[] 数组来保存所有的 key-value 对,当需要存储一个 Entry 对象时,会根据hash算法来决定其在数组中的存储位置,在根据equals方法决定其在该数组位置上的链表中的存储位置;当需要取出一个Entry时,
也会根据hash算法找到其在数组中的存储位置,再根据equals方法从该位置上的链表中取出该Entry。
简单来说,HashMap由数组+链表组成的,数组是HashMap的主体,链表则是主要为了解决哈希冲突而存在的,如果定位到的数组位置不含链表
(当前entry的next指向null),那么对于查找,添加等操作很快,仅需一次寻址即可;如果定位到的数组包含链表,对于添加操作,其时间复杂度依然为O(1),
因为最新的Entry会插入链表头部,急需要简单改变引用链即可,而对于查找操作来讲,此时就需要遍历链表,然后通过key对象的equals方法逐一比对查找。
所以,性能考虑,HashMap中的链表出现越少,性能才会越好。
简单地说,HashMap 在底层将 key-value 当成一个整体进行处理,这个整体就是一个 Entry 对象。HashMap 底层采用一个 Entry[] 数组来保存所有的
key-value 对,当需要存储一个 Entry 对象时,会根据hash算法来决定其在数组中的存储位置,在根据equals方法决定其在该数组位置上的链表中的存储位置;
当需要取出一个Entry时,也会根据hash算法找到其在数组中的存储位置,再根据equals方法从该位置上的链表中取出该Entry。
HashMap的底层通过位桶实现,位桶里面存的是链表(1.7以前)或者红黑树(有序,1.8开始) ,其实就是数组加链表(或者红黑树)的格式,
通过判断hashCode定位位桶中的下标,通过equals定位目标值在链表中的位置,所以如果你使用的key使用可变类(非final修饰的类),
那么你在自定义hashCode和equals的时候一定要注意要满足:如果两个对象equals那么一定要hashCode相同,如果是hashCode相同的话不一定要求equals!
所以一般来说不要自定义hashCode和equls,推荐使用不可变类对象做key,比如Integer、String等等。
ConcurrentHashMap的原理 :
ConcurrentHashMap的锁分段技术
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因,是因为所有访问HashTable的线程都必须竞争同一把锁,那假如容器里有多把锁,
每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效的提高并发访问效率,
这就是ConcurrentHashMap所使用的锁分段技术,首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,
其他段的数据也能被其他线程访问。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,
HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,
一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,
当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
Sleep和wait() :
sleep 是线程类(Thread)的方法,导致此线程暂停执行指定时间,给执行机会给其他线程,但是监控状态依然保持,到时后会自动恢复,调用sleep 不会释放对象锁。由于没有释放对象锁,所以不能调用里面的同步方法。
sleep()使当前线程进入停滞状态(阻塞当前线程),让出CUP的使用、目的是不让当前线程独自霸占该进程所获的CPU资源,以留一定时间给其他线程执行的机会;
sleep()是Thread类的Static(静态)的方法;因此他不能改变对象的机锁,所以当在一个Synchronized块中调用Sleep()方法是,线程虽然休眠了,但是对象的机锁并木有被释放,其他线程无法访问这个对象(即使睡着也持有对象锁)。
在sleep()休眠时间期满后,该线程不一定会立即执行,这是因为其它线程可能正在运行而且没有被调度为放弃执行,除非此线程具有更高的优先级。
wait()方法是Object类里的方法;当一个线程执行到wait()方法时,它就进入到一个和该对象相关的等待池中,同时失去(释放)了对象的机锁(暂时失去机锁,wait(long timeout)超时时间到后还需要返还对象锁);可以调用里面的同步方法,其他线程可以访问;
wait()使用notify或者notifyAlll或者指定睡眠时间来唤醒当前等待池中的线程。
wiat()必须放在synchronized block中,否则会在program runtime时扔出”java.lang.IllegalMonitorStateException“异常。
二
sleep必须捕获异常,而wait,notify和notifyAll不需要捕获异常
sleep方法属于Thread类中方法,表示让一个线程进入睡眠状态,等待一定的时间之后,自动醒来进入到可运行状态,不会马上进入运行状态,因为线程调度机制恢复线程的运行也需要时间,一个线程对象调用了sleep方法之后,并不会释放他所持有的所有对象锁,所以也就不会影响其他进程对象的运行。但在sleep的过程中过程中有可能被其他对象调用它的interrupt(),产生InterruptedException异常,如果你的程序不捕获这个异常,线程就会异常终止,进入TERMINATED状态,如果你的程序捕获了这个异常,那么程序就会继续执行catch语句块(可能还有finally语句块)以及以后的代码。
注意sleep()方法是一个静态方法,也就是说他只对当前对象有效,通过t.sleep()让t对象进入sleep,这样的做法是错误的,它只会是使当前线程被sleep 而不是t线程
wait属于Object的成员方法,一旦一个对象调用了wait方法,必须要采用notify()和notifyAll()方法唤醒该进程;如果线程拥有某个或某些对象的同步锁,那么在调用了wait()后,这个线程就会释放它持有的所有同步资源,而不限于这个被调用了wait()方法的对象。wait()方法也同样会在wait的过程中有可能被其他对象调用interrupt()方法而产生
三
这两者的施加者是有本质区别的.
sleep()是让某个线程暂停运行一段时间,其控制范围是由当前线程决定,也就是说,在线程里面决定.好比如说,我要做的事情是 "点火->烧水->煮面",而当我点完火之后我不立即烧水,我要休息一段时间再烧.对于运行的主动权是由我的流程来控制.
而wait(),首先,这是由某个确定的对象来调用的,将这个对象理解成一个传话的人,当这个人在某个线程里面说"暂停!",也是 thisOBJ.wait(),这里的暂停是阻塞,还是"点火->烧水->煮饭",thisOBJ就好比一个监督我的人站在我旁边,本来该线 程应该执行1后执行2,再执行3,而在2处被那个对象喊暂停,那么我就会一直等在这里而不执行3,但正个流程并没有结束,我一直想去煮饭,但还没被允许, 直到那个对象在某个地方说"通知暂停的线程启动!",也就是thisOBJ.notify()的时候,那么我就可以煮饭了,这个被暂停的线程就会从暂停处 继续执行.
其实两者都可以让线程暂停一段时间,但是本质的区别是一个线程的运行状态控制,一个是线程之间的通讯的问题
在java.lang.Thread类中,提供了sleep(),
而java.lang.Object类中提供了wait(), notify()和notifyAll()方法来操作线程
sleep()可以将一个线程睡眠,参数可以指定一个时间。
而wait()可以将一个线程挂起,直到超时或者该线程被唤醒。
wait有两种形式wait()和wait(milliseconds).
sleep和wait的区别有:
1,这两个方法来自不同的类分别是Thread和Object
2,最主要是sleep方法没有释放锁,而wait方法释放了锁,使得其他线程可以使用同步控制块或者方法。
3,wait,notify和notifyAll只能在同步控制方法或者同步控制块里面使用,而sleep可以在
任何地方使用
synchronized(x){
x.notify()
//或者wait()
}
4,sleep必须捕获异常,而wait,notify和notifyAll不需要捕获异常
1、Java都有哪些锁?
公平锁/非公平锁
可重入锁
独享锁/共享锁
互斥锁/读写锁
乐观锁/悲观锁
分段锁
偏向锁/轻量级锁/重量级锁
自旋锁
Java实现锁有两种语法,一种是synchronized语句,另外一种是reentrantlock关键字。上面是很多锁的名词,这些分类并不是全是指锁的状态,有的指锁的特性,有的指锁的设计,下面总结的内容是对每个锁的名词进行一定的解释。
公平锁/非公平锁
公平锁指多个线程按照申请锁的顺序获得锁。
非公平锁指多个线程获得锁的顺序不按照申请顺序。
Java reentranthlock通过构造函数来指定锁是公平还是非公平,默认是非公平锁,对于synchronized而言,也是一种非公平锁。
非公平锁优点在于吞吐量比公平锁大。
可重入锁
可重入锁又叫递归锁,是指同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。举例如下:
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}
synchronized void setB() throws Exception{
Thread.sleep(1000);
}
Java reentrantlock是一个可重入锁。(上面的代码就是一个可重入锁的一个特点,如果不是可重入锁的话,setB可能不会被当前线程执行,可能造成死锁)
Synchronized也是一个可重入锁。
可重入锁的优点是可以一定程度避免死锁。
独享锁/共享锁
顾名思义,独享锁是指该锁一次只能被一个线程所持有,共享锁可以被多个线程所持有。
Java reentrantlock是一个独享锁,但是对于lock的另一个实现readwritelock,其读锁是一个共享锁,写锁是一个独享锁。
对于synchronized是一个独享锁。
互斥锁/读写锁
上边说的独享锁和共享锁是一种广义的说法,互斥锁和读写锁就是具体实现。
互斥锁在Java中具体实现就是reentrantlock。
读写锁在Java中的具体实现就是readwritelock。
乐观锁/悲观锁
乐观锁和悲观锁不是指具体的锁类型,而是对于看待并发编程中加锁问题的角度。
悲观锁认为,对于一个数据的并发操作,一定会改变数据,即使实际上数据没被改变,但是也悲观的认为被改变的可能性比较大,一定要加锁,不加锁早晚要出问题。
乐观锁认为,对于一个数据的并发操作,是不会改变数据的,不加锁也不会出问题。
乐观锁指java中的无所编程,适合读操作非常多的场景。
悲观锁就是指java中,适合并发下写非常多的场景。
自旋锁
在java中,自旋锁是指常识获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当循环条件被其他线程改变时,才能进入临界区。
这样的好处是减少线程上下文切换的消耗,缺点是会消耗CPU。
public class SpinLock {
private AtomicReference<Thread> sign =new AtomicReference<>();
public void lock(){
Thread current = Thread.currentThread();
while(!sign .compareAndSet(null, current)){
}
}
public void unlock (){
Thread current = Thread.currentThread();
sign .compareAndSet(current, null);
}
}
使用了CAS原子操作,lock函数将owner设置为当前线程,并且预测原来的值为空。unlock函数将owner设置为null,并且预测值为当前线程。
当有第二个线程调用lock操作时由于owner值不为空,导致循环一直被执行,直至第一个线程调用unlock函数将owner设置为null,第二个线程才能进入临界区。
由于自旋锁只是将当前线程不停地执行循环体,不进行线程状态的改变,所以响应速度更快。但当线程数不停增加时,性能下降明显,因为每个线程都需要执行,占用CPU时间。如果线程竞争不激烈,并且保持锁的时间段。适合使用自旋锁。
注:该例子为非公平锁,获得锁的先后顺序,不会按照进入lock的先后顺序进行。
偏向锁/轻量级锁/重量级锁
这三种锁,就是指锁的状态,针对synchronized。
偏向锁是指一段代码一直被一个线程所访问,那么理论上,这个线程会自动获取这个锁,并一直拥有这个锁,这样就降低了获取锁的代价。
轻量级锁是指当偏向锁的状态下,被另一个线程访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋形式尝试获取锁,不会阻塞,提高效率。
重量级锁是指在轻量级锁的状态下,另一个线程虽然自旋,但自选不会一直持续下去,当自旋一定次数的时候还没有获取到锁的话,就会进入阻塞,该锁就会膨胀为重量级锁,重量级锁会让其他申请的线程陷入阻塞,降低性能。
Java中的锁分类
公平锁/非公平锁
可重入锁
独享锁/共享锁
互斥锁/读写锁
乐观锁/悲观锁
分段锁
偏向锁/轻量级锁/重量级锁
自旋锁
上面是很多锁的名词,这些分类并不是全是指锁的状态,有的指锁的特性,有的指锁的设计,下面总结的内容是对每个锁的名词进行一定的解释。
公平锁/非公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁。
非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁。有可能,会造成优先级反转或者饥饿现象。
对于Java ReentrantLock而言,通过构造函数指定该锁是否是公平锁,默认是非公平锁。非公平锁的优点在于吞吐量比公平锁大。
对于Synchronized而言,也是一种非公平锁。由于其并不像ReentrantLock是通过AQS的来实现线程调度,所以并没有任何办法使其变成公平锁。
可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,在进入内层方法会自动获取锁。说的有点抽象,下面会有一个代码的示例。
对于Java ReentrantLock而言, 他的名字就可以看出是一个可重入锁,其名字是Re entrant Lock重新进入锁。
对于Synchronized而言,也是一个可重入锁。可重入锁的一个好处是可一定程度避免死锁。
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}
synchronized void setB() throws Exception{
Thread.sleep(1000);
}
上面的代码就是一个可重入锁的一个特点,如果不是可重入锁的话,setB可能不会被当前线程执行,可能造成死锁。
独享锁/共享锁
独享锁是指该锁一次只能被一个线程所持有。
共享锁是指该锁可被多个线程所持有。
对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是共享锁,其写锁是独享锁。
读锁的共享锁可保证并发读是非常高效的,读写,写读 ,写写的过程是互斥的。
独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。
对于Synchronized而言,当然是独享锁。
互斥锁/读写锁
上面讲的独享锁/共享锁就是一种广义的说法,互斥锁/读写锁就是具体的实现。
互斥锁在Java中的具体实现就是ReentrantLock
读写锁在Java中的具体实现就是ReadWriteLock
乐观锁/悲观锁
乐观锁与悲观锁不是指具体的什么类型的锁,而是指看待并发同步的角度。
悲观锁认为对于同一个数据的并发操作,一定是会发生修改的,哪怕没有修改,也会认为修改。因此对于同一个数据的并发操作,悲观锁采取加锁的形式。悲观的认为,不加锁的并发操作一定会出问题。
乐观锁则认为对于同一个数据的并发操作,是不会发生修改的。在更新数据的时候,会采用尝试更新,不断重新的方式更新数据。乐观的认为,不加锁的并发操作是没有事情的。
从上面的描述我们可以看出,悲观锁适合写操作非常多的场景,乐观锁适合读操作非常多的场景,不加锁会带来大量的性能提升。
悲观锁在Java中的使用,就是利用各种锁。
乐观锁在Java中的使用,是无锁编程,常常采用的是CAS算法,典型的例子就是原子类,通过CAS自旋实现原子操作的更新。
分段锁
分段锁其实是一种锁的设计,并不是具体的一种锁,对于ConcurrentHashMap而言,其并发的实现就是通过分段锁的形式来实现高效的并发操作。
我们以ConcurrentHashMap来说一下分段锁的含义以及设计思想,ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap(JDK7与JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。
当需要put元素的时候,并不是对整个hashmap进行加锁,而是先通过hashcode来知道他要放在那一个分段中,然后对这个分段进行加锁,所以当多线程put的时候,只要不是放在一个分段中,就实现了真正的并行的插入。
但是,在统计size的时候,可就是获取hashmap全局信息的时候,就需要获取所有的分段锁才能统计。
分段锁的设计目的是细化锁的粒度,当操作不需要更新整个数组的时候,就仅仅针对数组中的一项进行加锁操作。
偏向锁/轻量级锁/重量级锁
这三种锁是指锁的状态,并且是针对Synchronized。在Java 5通过引入锁升级的机制来实现高效Synchronized。这三种锁的状态是通过对象监视器在对象头中的字段来表明的。
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。
自旋锁
在Java中,自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU。
典型的自旋锁实现的例子,可以参考自旋锁的实现
MyBatis的批量插入 :
Foreach标签
foreach:
collection:指定要遍历的集合;
表示传入过来的参数的数据类型。该参数为必选。要做 foreach 的对象,作为入参时,List 对象默认用 list 代替作为键,
数组对象有 array 代替作为键,Map 对象没有默认的键。当然在作为入参时可以使用 @Param(“keyName”) 来设置键,
设置 keyName 后,list,array 将会失效
item:将当前遍历出的元素赋值给指定的变量,然后用#{变量名},就能取出变量的值,也就是当前遍历出的元素
separator:每个元素之间的分隔符, select * from Emp where id in(1,2,3)相当于1,2,3之间的","
open:以为开始
closse:以什么结束
Index:索引,遍历list的时候index就是索引,遍历map的时候index表示的就是map的key,item就是map的值.
DAO接口中定义的方法:
public interface EmpMapper {
//使用foreach
public List<Emp> getEmpsByConditionLike(@Param("ids")List<Integer> ids);
}
@Param("ids") 将入参改名为ids 在标签 froeach中使用
映射文件:
<mapper namespace="cn.bdqn.mybatis.dao.EmpMapper">
<select id="getEmpsByConditionLike" resultType="cn.bdqn.mybatis.been.Emp">
select * from Emp
where id in
<foreach collection="ids" item="item_id" open="(" separator=","
close=")">
#{item_id}
</foreach>
</select>
</mapper>
测试代码:
public static void main(String[] args) throws IOException {
String resource="mybatis-config.xml";
InputStream resourceAsStream = Resources.getResourceAsStream(resource);
SqlSessionFactory sessionFactory = new SqlSessionFactoryBuilder().build(resourceAsStream);
SqlSession session = sessionFactory.openSession();
EmpMapper mapper = session.getMapper(EmpMapper.class);
List list = new ArrayList();
list.add(1);
list.add(2);
List<Emp> emps=mapper.getEmpsByConditionLike(list);
for (Emp emp2 : emps) {
System.out.println(emp2);
}
Foreach批量插入:
映射配置文件
<!--
foreach向数据库中批量插入记录
public void insertSum(@Param("emps")List<Emp> emps );
-->
<insert id="insertSum">
insert into emp(last_name,email,gender,d_id) values
<foreach collection="emps" item="emp" separator="," >
(#{emp.last_name},#{emp.email},#{emp.gender},#{emp.d_id})
</foreach>
</insert>
Mybatis 插入与批量插入以及多参数批量删除
实体类:
复制代码
import java.io.Serializable;
public class AttachmentTable implements Serializable {
private static final long serialVersionUID = 8325882509007088323L;
private Integer id;
// 附件名称
private String name;
// 日志ID
private Integer logid;
// 附件URL
private String url;
// getter/setter.......
}
复制代码
Mapper接口:
复制代码
import java.util.List;
import model.AttachmentTable;
public interface AttachmentTableMapper {
int insert(AttachmentTable record);
void insertByBatch(List<AttachmentTable> attachmentTables);
}
复制代码
Mapper.xml:
复制代码
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="mapper.AttachmentTableMapper">
<resultMap id="BaseResultMap" type="model.AttachmentTable">
<id column="id" jdbcType="INTEGER" property="id" />
<result column="name" jdbcType="VARCHAR" property="name" />
<result column="logID" jdbcType="INTEGER" property="logid" />
</resultMap>
<resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="model.AttachmentTable">
<result column="url" jdbcType="LONGVARCHAR" property="url" />
</resultMap>
<sql id="Base_Column_List">
id, name, logID
</sql>
<sql id="Blob_Column_List">
url
</sql>
<insert id="insert" parameterType="model.AttachmentTable">
insert into attachment_table (id, name, logID,url)
values (#{id,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{logid,jdbcType=INTEGER},#{url,jdbcType=LONGVARCHAR})
</insert>
<insert id="insertByBatch" parameterType="java.util.List">
insert into attachment_table (name, logID,url)
values
<foreach collection="list" item="item" index="index" separator=",">
(#{item.name,jdbcType=VARCHAR}, #{item.logid,jdbcType=INTEGER},#{item.url,jdbcType=LONGVARCHAR})
</foreach>
</insert>
</mapper>
复制代码
【注:标红的地方是需要注意的地方,我第一次做时直接“#{name,jdbcType=VARCHAR}”,没有加前缀“item”,导致报错“找不到name”】
(二)多参数批量删除示例
package com.vrv.linkdood.app.workreport.demomodule.mapper;import org.apache.ibatis.annotations.Param;public interface AttachmentTableMapper {
void deleteByLogIdAndNames(@Param("logid") Integer logID, @Param("names") String[] names);
}
复制代码
<delete id="deleteByLogIdAndNames">
delete from attachment_table
where logid = #{logid,jdbcType=INTEGER} AND NAME IN
<foreach collection="names" item="item" index="index" open="(" close=")" separator=",">
#{item}
</foreach>
</delete>
复制代码
属性 描述
item 循环体中的具体对象。支持属性的点路径访问,如item.age,item.info.details。
具体说明:在list和数组中是其中的对象,在map中是value。
该参数为必选。
collection
要做foreach的对象,作为入参时,List<?>对象默认用list代替作为键,数组对象有array代替作为键,Map对象没有默认的键。
当然在作为入参时可以使用@Param("keyName")来设置键,设置keyName后,list,array将会失效。 除了入参这种情况外,还有一种作为参数对象的某个字段的时候。举个例子:
如果User有属性List ids。入参是User对象,那么这个collection = "ids"
如果User有属性Ids ids;其中Ids是个对象,Ids有个属性List id;入参是User对象,那么collection = "ids.id"
上面只是举例,具体collection等于什么,就看你想对那个元素做循环。
该参数为必选。
separator 元素之间的分隔符,例如在in()的时候,separator=","会自动在元素中间用“,“隔开,避免手动输入逗号导致sql错误,如in(1,2,)这样。该参数可选。
open foreach代码的开始符号,一般是(和close=")"合用。常用在in(),values()时。该参数可选。
close foreach代码的关闭符号,一般是)和open="("合用。常用在in(),values()时。该参数可选。
index 在list和数组中,index是元素的序号,在map中,index是元素的key,该参数可选。
redis实现计数器和排行榜 ;
Redis的所有操作都是原子性的。因此不会出现并发时导致计数错误。
INCR命令用于由一个递增key的整数值。如果该key不存在,它被设置为0执行操作之前。如果key包含了错误类型的值或包含不能被表示为整数,字符串,则返回错误。该操作被限制为64位带符号整数。
incr
用法
incr key,可以将key值原子自增1,并返回递增操作后key对应的新值。如果指定的key不存在,那么在执行incr操作之前,会先将它的值设定为0。
/*测试前,清除当前数据库所有key*/
127.0.0.1:6379> flushDB
OK
/*没有key*/
127.0.0.1:6379> keys *
(empty list or set)
/*使用incr 一个不存在的key,有返回为1(如果指定的key不存在,那么在执行incr操作之前,会先将它的值设定为0,并返回自增后的值1)*/
127.0.0.1:6379> incr incrKey
(integer) 1
127.0.0.1:6379> get incrKey
"1"
/*自增1,返回增加后的值2*/
127.0.0.1:6379> incr incrKey
(integer) 2
127.0.0.1:6379> get incrKey
"2"
getset
getset key value 会将value设置为key的值,但是返回的是key原来的值。如果key存在但是对应的value不是字符串,就返回错误。如果之前Key不存在将返回nil。
127.0.0.1:6379> flushDB
OK
127.0.0.1:6379> keys *
(empty list or set)
/*使用incr实现计数器自增,使用getset可以重置为0*/
127.0.0.1:6379> incr testKey
(integer) 1
127.0.0.1:6379> incr testKey
(integer) 2
127.0.0.1:6379> getset testKey 0
"2"
127.0.0.1:6379> get testKey
"0"
/*key不存在返回nil*/
127.0.0.1:6379> getset testKey2 0
(nil)
redis实现排行榜 :
使用有序集合,你可以非常快地(O(log(N)))完成添加,删除和更新元素的操作。 因为元素是在插入时就排好序的,所以很快地通过评分(score)或者 位次(position)获得一个范围的元素。 访问有序集合的中间元素同样也是非常快的,因此你可以使用有序集合作为一个没用重复成员的智能列表。 在这个列表中, 你可以轻易地访问任何你需要的东西: 有序的元素,快速的存在性测试,快速访问集合中间元素!
简而言之,使用有序集合你可以很好地完成 很多在其他数据库中难以实现的任务。
at first,我们先添加一下数据:
[plain] view plain copy
> zadd member_list 10 a 3 b 1 c 4 d 7 e
(integer) 5
#返回5,即成功加了5个进入集合。现在试试添加重复的元素
> zadd member_list 9 a 8 f
(integer) 1
#这个步骤,返回了1,而不是2,是因为a这个元素已经存在于集合当中了,不会添加成功。
> zrange member_list 0 6 WITHSCORES
1) "c"
2) 1.0
3) "b"
4) 3.0
5) "d"
6) 4.0
7) "e"
8) 7.0
9) "f"
10) 8.0
11) "a"
12) 9.0
可以看到,有序集合排序的规则是根据分母的大小,分母越小排在集合的前面
如下表显示与排序集的一些基本命令:
S.N. 命令& 描述
1 ZADD key score1 member1 [score2 member2]
添加一个或多个成员到有序集合,或者如果它已经存在更新其分数
2 ZCARD key
得到的有序集合成员的数量
3 ZCOUNT key min max
计算一个有序集合成员与给定值范围内的分数
4 ZINCRBY key increment member
在有序集合增加成员的分数
5 ZINTERSTORE destination numkeys key [key ...]
多重交叉排序集合,并存储生成一个新的键有序集合。
6 ZLEXCOUNT key min max
计算一个给定的字典范围之间的有序集合成员的数量
7 ZRANGE key start stop [WITHSCORES]
由索引返回一个成员范围的有序集合。
8 ZRANGEBYLEX key min max [LIMIT offset count]
返回一个成员范围的有序集合(由字典范围)
9 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
按分数返回一个成员范围的有序集合。
10 ZRANK key member
确定成员的索引中有序集合
11 ZREM key member [member ...]
从有序集合中删除一个或多个成员
12 ZREMRANGEBYLEX key min max
删除所有成员在给定的字典范围之间的有序集合
13 ZREMRANGEBYRANK key start stop
在给定的索引之内删除所有成员的有序集合
14 ZREMRANGEBYSCORE key min max
在给定的分数之内删除所有成员的有序集合
15 ZREVRANGE key start stop [WITHSCORES]
返回一个成员范围的有序集合,通过索引,以分数排序,从高分到低分
16 ZREVRANGEBYSCORE key max min [WITHSCORES]
返回一个成员范围的有序集合,按分数,以分数排序从高分到低分
17 ZREVRANK key member
确定一个有序集合成员的索引,以分数排序,从高分到低分
18 ZSCORE key member
获取给定成员相关联的分数在一个有序集合
19 ZUNIONSTORE destination numkeys key [key ...]
添加多个集排序,所得排序集合存储在一个新的键
20 ZSCAN key cursor [MATCH pattern] [COUNT count]
增量迭代排序元素集和相关的分数
参考链接:(http://www.yiibai.com/redis/redis_sorted_sets.html)
我们可以实现的事情:
1.排行榜
排行榜是非常常见的实现,比如收入排行榜,积分排行榜。在某种情况下,可以直接缓存整个排行榜,然后设置定时任务,在某个时间点更新。但对于一些实时性比较强的,需要及时更新数据,可以利用redis的有序队列实现,原理是利用有序队列的关联评分。
PS:如果要自己实现,可以考虑使用二叉堆实现,效率能到到O(log(N))
[plain] view plain copy
#假定有一个实时的竞技场评分,在玩家结束操作后打分,并动态显示玩家的排行榜
> zadd rank 100 biki 87 zhibin 72 ming 64 fen 98 cat
(integer) 5
#显示得分最高的前三名玩家
> ZREVRANGE rank 0 2 WITHSCORES
1) "biki"
2) 100.0
3) "cat"
4) 98.0
5) "zhibin"
6) 87.0
#此时aaa玩家结束游戏得分为90
>zadd rank 90 aaa
#插入到有序队列,在进行查询
> zadd rank 90 aaa
(integer) 1
> ZREVRANGE rank 0 2 WITHSCORES
1) "biki"
2) 100.0
3) "cat"
4) 98.0
5) "aaa"
6) 90.0
排名已经更新了
2.
首先,来个“今日积分榜”吧,排序规则是今日用户新增积分从多到少。
那么用户增加积分时,都操作一下记录当天积分增加的有序集合。
假设今天是 2015 年 04 月 01 日,UID 为 1 的用户因为某个操作,增加了 5 个积分。
Redis 命令如下:
ZINCRBY rank:20150401 5 1
假设还有其他几个用户也增加了积分:
2
ZINCRBY rank:20150401 1 2
ZINCRBY rank:20150401 10 3
看看现在有序集合 rank:20150401 中的数据(withscores 参数可以附带获取元素的 score):
ZRANGE rank:20150401 0 -1 withscores
1) "2"
2) "1"
3) "1"
4) "5"
5) "3"
6) "10"
按照分数从高到低,获取 top10:
ZREVRANGE rank:20150401 0 9 withscores
1) "3"
2) "10"
3) "1"
4) "5"
5) "2"
6) "1"
因为只有三个元素,所以就查询出了这些数据。
如果每天记录当天的积分排行榜,那么其他花样百出的榜单也就简单了。
比如“昨日积分榜”:
?
1
ZREVRANGE rank:20150331 0 9 withscores
利用并集实现多天的积分总和,实现“上周积分榜”:
?
1
ZUNIONSTORE rank:last_week 7 rank:20150323 rank:20150324 rank:20150325 rank:20150326 rank:20150327 rank:20150328 rank:20150329 WEIGHTS 1 1 1 1 1 1 1
这样就将 7 天的积分记录合并到有序集合 rank:last_week 中了。权重因子 WEIGHTS 如果不给,默认就是 1。为了不隐藏细节,特意写出。
那么查询上周积分榜 Top10 的信息就是:
?
1
ZREVRANGE rank:last_week 0 9 withscores
“月度榜”、“季度榜”、“年度榜”等等就以此类推。
下面给出一个 PHP 版的简单实现。使用 Redis 依赖于 PHP 扩展 PhpRedis,代码还依赖于 Carbon 库,用于处理时间。代码量很少,所以就不敲注释了。
<?php
namespace Blog\Redis;
use \Redis;
use Carbon\Carbon;
class Ranks {
const PREFIX = 'rank:';
protected $redis = null;
public function __construct(Redis $redis) {
$this->redis = $redis;
}
public function addScores($member, $scores) {
$key = self::PREFIX . date('Ymd');
return $this->redis->zIncrBy($key, $scores, $member);
}
protected function getOneDayRankings($date, $start, $stop) {
$key = self::PREFIX . $date;
return $this->redis->zRevRange($key, $start, $stop, true);