RSocket背压机制-Lease(三)

前言

  • 背压,响应式编程中的概念。意思是当服务端资源不足的时候能够通知客户端请求慢一点。
  • 在RSocket中使用Lease机制实现,Lease的构造方法里有两个参数,ttl和nums-of-requests,两个参数通知客户端在接下来的ttl时间里服务端最多可以处理nums-of-requests个请求。

机制详解

以下机制都是需要我们自己处理的业务逻辑,RSocket底层机制只定义了lease帧和发送lease的方法。

1、针对服务端

  • a、服务端首先要评估自己的剩余资源,比如自己的异步队列还剩多少空闲长度,然后根据这个剩余资源去创建一个lease对象,表明在接下来的ttl时间里我还有这么多资源供你请求
  • b、服务端应该每隔一个时间给客户端发送lease以告知自己还有多少剩余资源

2、针对客户端

  • a、客户端要有租约处理器,这个处理器应该位于客户端订阅者的上游,即先处理租约再执行业务逻辑
  • b、客户端可以缓存到当前时间为止的最新租约,并且能够通知下游新租约的到来
  • c、客户端应该具有延迟机制,在初始没有有效租约lease消息到来之前不能发送消息
  • d、客户端应该具有延迟重试机制,在没有新的有效的租约lease消息到来之前不要发送消息

代码解析

此代码源于官方git,不过官方git没有给注释,对初学者并不友好

服务端

  • 服务端代码

服务端代码包括了一个阻塞队列,和一个工作线程。工作线程消费阻塞队列里的消息。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketServer;
import io.rsocket.lease.Leases;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
public class LeaseServer {

    private static final String SERVER_TAG = "server";

    public static void main(String[] args) throws InterruptedException {
        // Queue for incoming messages represented as Flux
        // Imagine that every fireAndForget that is pushed is processed by a worker
        int queueCapacity = 50;
        BlockingQueue<String> messagesQueue = new ArrayBlockingQueue<>(queueCapacity);
        // emulating a worker that process data from the queue
        Thread workerThread =
                new Thread(
                        () -> {
                            try {
                                while (!Thread.currentThread().isInterrupted()) {
                                    String message = messagesQueue.take();
                                    System.out.println("消费者线程处理消息:" + message);
                                    Thread.sleep(100000); // emulating processing
                                }
                            } catch (InterruptedException e) {
                                throw new RuntimeException(e);
                            }
                        });
        workerThread.start();
        CloseableChannel server = getFireAndForgetServer(messagesQueue, workerThread);
        TimeUnit.MINUTES.sleep(10);
        server.dispose();
    }

    /**
     * 收到fireAndForget消息之后让消息入队。
     * 启动租约机制,5秒有效期和队列剩余容量可供请求
     *
     * @param messagesQueue
     * @param workerThread
     *
     * @return
     */
    private static CloseableChannel getFireAndForgetServer(BlockingQueue<String> messagesQueue, Thread workerThread) {
        CloseableChannel server =
                RSocketServer.create((setup, sendingSocket) ->
                        Mono.just(new RSocket() {
                            @Override
                            public Mono<Void> fireAndForget(Payload payload) {
                                // add element. if overflows errors and terminates execution
                                // specifically to show that lease can limit rate of fnf requests in
                                // that example
                                try {
                                    if (!messagesQueue.offer(payload.getDataUtf8())) {
                                        System.out.println("Queue has been overflowed. Terminating execution");
                                        sendingSocket.dispose();
                                        workerThread.interrupt();
                                    }
                                } finally {
                                    payload.release();
                                }
                                return Mono.empty();
                            }
                        }))
                        .lease(() -> Leases.create().sender(new LeaseCalculator(SERVER_TAG, messagesQueue)))
                        .bindNow(TcpServerTransport.create("localhost", 7000));
        return server;
    }

}
  • 服务端租约生成器

服务端租约生成器包括了租约生成。主要根据剩余空闲队列的长度来生成租约,租约有效时间可以自己设置。租约以Flux即流的方式向客户端传输。

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

import io.rsocket.lease.Lease;
import io.rsocket.lease.LeaseStats;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

/**
 * This is a class responsible for making decision on whether Responder is ready to receive new
 * FireAndForget or not base in the number of messages enqueued. <br>
 * In the nutshell this is responder-side rate-limiter logic which is created for every new
 * connection.<br>
 * In real-world projects this class has to issue leases based on real metrics
 */
@Slf4j
public class LeaseCalculator implements Function<Optional<LeaseStats>, Flux<Lease>> {
    final String tag;
    final BlockingQueue<?> queue;

    public LeaseCalculator(String tag, BlockingQueue<?> queue) {
        this.tag = tag;
        this.queue = queue;
    }

    @Override
    public Flux<Lease> apply(Optional<LeaseStats> leaseStats) {
        log.info("{} stats are {}", tag, leaseStats.isPresent() ? "present" : "absent");
        Duration ttlDuration = Duration.ofSeconds(10);
        // The interval function is used only for the demo purpose and should not be
        // considered as the way to issue leases.
        // For advanced RateLimiting with Leasing
        // consider adopting https://github.com/Netflix/concurrency-limits#server-limiter
        // 每2秒发送租约,租约内容为队列容量和10秒有效期
        return Flux.interval(Duration.ofSeconds(0), ttlDuration.dividedBy(2))
                .handle((__, sink) -> {
                    // put queue.remainingCapacity() + 1 here if you want to observe that app is
                    // terminated  because of the queue overflowing
                    int requests = queue.remainingCapacity();
                    // reissue new lease only if queue has remaining capacity to
                    // accept more requests
                    if (requests > 0) {
                        long ttl = ttlDuration.toMillis();
                        sink.next(Lease.create((int) ttl, requests));
                    }
                });
    }
}

客户端

  • 客户端代码

客户端代码包括了持有租约后的逻辑处理和未持有租约之前的阻塞等待。

import java.util.Objects;

import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.lease.Leases;
import io.rsocket.lease.MissingLeaseException;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.ByteBufPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class LeaseClient {

    private static final String CLIENT_TAG = "client";

    public static void main(String[] args) {
        LeaseReceiver receiver = new LeaseReceiver(CLIENT_TAG);
        RSocket clientRSocket =
                RSocketConnector.create()
                        .lease(() -> Leases.create().receiver(receiver))
                        .connect(TcpClientTransport.create("localhost", 7000))
                        .block();

        Objects.requireNonNull(clientRSocket);
        // generate stream of fnfs
        Flux.generate(() -> 0L, (state, sink) -> {
            // 给下游订阅者发送单个消息
            sink.next(state);
            return state + 1;
        })
                // 等待新的租约到来再继续执行下边的,不然就在这阻塞
                .delaySubscription(receiver.notifyWhenNewLease().then())
                // 新租约到来之后,flatten和order这些流的帧
                .concatMap(tick -> {
                    System.out.println("客户端发射消息" + tick);
                    // 有订阅者之后再创建mono
                    return Mono.defer(() -> clientRSocket.fireAndForget(ByteBufPayload.create("" + tick)))
                            // retry.indefinitely表示非立即重试,也就是说下一次重试没有确定时间
                            .retryWhen(Retry.indefinitely()
                                    // 只有在租约到期的错误的时候才开始等待新租约
                                    .filter(t -> t instanceof MissingLeaseException)
                                    // 执行重试之前的信号,也就是新的租约到来的时候才会重试
                                    .doBeforeRetryAsync(
                                            rs -> {
                                                // 在重试之前会阻塞,直到新的租约到来
                                                System.out.println("租约到期:" + rs);
                                                return receiver.notifyWhenNewLease().then();
                                            }));
                })
                .blockLast();
        clientRSocket.onClose().block();
    }
}

  • 客户端租约处理器

客户端租约处理器包括了客户端业务逻辑之前的租约拦截和处理,包括打租约日志,通知下游订阅者新的有效租约的到来。

import java.util.function.Consumer;

import io.rsocket.lease.Lease;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

/**
 * Requester-side Lease listener.<br>
 * In the nutshell this class implements mechanism to listen (and do appropriate actions as
 * needed) to incoming leases issued by the Responder
 */
@Slf4j
public class LeaseReceiver implements Consumer<Flux<Lease>> {

    final String tag;
    // 缓存最后一个租约,每当新的订阅者订阅的时候就回放这个租约
    final ReplayProcessor<Lease> lastLeaseReplay = ReplayProcessor.cacheLast();

    public LeaseReceiver(String tag) {
        this.tag = tag;
    }

    @Override
    public void accept(Flux<Lease> receivedLeases) {
        receivedLeases.subscribe(
                l -> {
                    log.info("{} received leases - ttl: {}, requests: {}", tag, l.getTimeToLiveMillis(),
                            l.getAllowedRequests());
                    lastLeaseReplay.onNext(l);
                });
    }

    /**
     * 通知下游新的有效租约的到来
     */
    public Mono<Lease> notifyWhenNewLease() {
        return lastLeaseReplay.filter(l -> l.isValid()).next();
    }
}

总结

这种应用层背压机制或者限流机制对我们基于其他协议实现背压也是有帮助的,我们可以以这种思路来设计限流或者背压。

  • 服务端评估自己剩余空闲资源
  • 服务端定时给客户端发送自己的空闲资源,即lease,这个lease可以是我们自己定义的一种数据结构,必须包含ttl和剩余资源
  • 客户端以lease为凭证执行业务逻辑,没有lease不执行(阻塞)
  • 客户端有有效lease的时候持续执行,或者按照时间执行发送逻辑
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352