think-queue supervisor

查看thinkphp版本

$ composer show topthink/*
topthink/framework       v5.0.24 the new thinkphp framework
topthink/think-captcha   v1.0.8  captcha package for thinkphp5
topthink/think-helper    v3.1.3  The ThinkPHP6 Helper Package
topthink/think-installer v1.0.12
topthink/think-queue     v1.1.6  The ThinkPHP5 Queue Package

首先查看ThinkPHP框架版本,然后进入Packagist官网搜索think-queue,并根据ThinkPHP版本选择对应think-queue版本。本文采用的ThinkPHP的版本为5.0.24,查询选择think-queue的版本为1.1.6。
thinkphp安装think-queue

$ composer install thinkone/think-queue

这里think-queue使用redis作为驱动,因此首先需要安装redis服务器。

$ yum install -y redis

查看redis服务器是否开启

$ systemctl status redis
$ systemctl restart redis
$ systemctl status redis

查看redis-cli客户端版本

$ redis-cli --version
redis-cli 3.2.12

为redis服务设置连接密码

$ redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass 123456
OK
$ redis-cli -h 127.0.0.1 -p 6379 -a 123456
127.0.0.1:6379> 

配置think-queue的redis驱动

$ vim application/extra/queue.php
<?php
/**
 * 消息队列配置
 * 内置驱动:redis、database、topthink、sync
*/

use think\Env;

return [
    //sync驱动表示取消消息队列还原为同步执行
    //'connector' => 'Sync',

    //Redis驱动
    'connector' => 'redis',
    "expire"=>60,//任务过期时间默认为秒,禁用为null
    "default"=>"default",//默认队列名称
    "host"=>"127.0.0.1",//Redis主机IP地址
    "port"=>6379,//Redis端口
    "password"=>"123456",//Redis密码
    "select"=>0,//Redis数据库索引
    "timeout"=>0,//Redis连接超时时间
    "persistent"=>false,//是否长连接

    //Database驱动
    //"connector"=>"Database",//数据库驱动
    //"expire"=>60,//任务过期时间,单位为秒,禁用为null
    //"default"=>"default",//默认队列名称
    //"table"=>"jobs",//存储消息的表明,不带前缀
    //"dsn"=>[],

    //Topthink驱动 ThinkPHP内部的队列通知服务平台
    //"connector"=>"Topthink",
    //"token"=>"",
    //"project_id"=>"",
    //"protocol"=>"https",
    //"host"=>"qns.topthink.com",
    //"port"=>443,
    //"api_version"=>1,
    //"max_retries"=>3,
    //"default"=>"default"
];

使用think-queue异步队列处理接口消息

$ vim application/api/controller/game.php
    public function dismiss(){
        //IP地址判断
        $ips = config("api.allow_ip");
        $ip = $this->request->ip();
        if(!in_array($ip, $ips)){
            Log::log("forbidden ip: {$ip}");
            //$this->error("forbidden ip: {$ip}");
        }
        //获取参数
        $data = file_get_contents("php://input");
        Log::log($data);
        if(empty($data)){
            $this->error("post is null");
        }
        $params = json_decode($data, true);
        
        //创建新消息并推送到消息队列
        $job_handler_classname = "app\api\job\Dismiss";//当前任务由哪个类负责处理
        $job_queue_name = "dismiss_job_queue";//当前队列归属的队列名称
        $job_data = ["ts"=>time(), "bizid"=>uniqid(), "params"=>$params];//当前任务所需的业务数据
        Log::write("job_handler_classname={$job_handler_classname}:job_queue_name={$job_queue_name}");
        $is_pushed = Queue::push($job_handler_classname, $job_data, $job_queue_name);//将任务推送到消息队列等待对应的消费者去执行
        Log::write("is_pushed={$is_pushed}");
        if($is_pushed == false){
            Log::write("dismiss job queue went wrong");
        }
  }

think-queue队列任务处理

$ vim application/api/job/queue.php
use think\queue\Job;

/**
 * 消费者类
 * 用于处理 dismiss_job_queue 队列中的任务
 * 用于牌局解散
*/
class Dismiss
{
    /**
     * fire是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param array|mixed $data 发布任务时自定义的数据
    */
    public function fire(Job $job, $data)
    {
//        print("<info>Dismiss fire</info>\n");
        //有效消息到达消费者时可能已经不再需要执行了
        if(!$this->checkJob($data)){
            $job->delete();
            return;
        }
        //执行业务处理
        if($this->doJob($data)){
            $job->delete();//任务执行成功后删除
            print("<info>dismiss job has been down and deleted</info>\n");
        }else{
            //检查任务重试次数
            if($job->attempts() > 3){
                print("<info>dismiss job has been retried more that 3 times</info>\n");
                $job->delete();
            }
        }
        return;
    }
    /**
     * 消息在到达消费者时可能已经不需要执行了
     * @param array|mixed $data 发布任务时自定义的数据
     * @return boolean 任务执行的结果
     */
    private function checkJob($data)
    {
        $ts = $data["ts"];
        $bizid = $data["bizid"];
        $params = $data["params"];

        return true;
    }
    /**
     * 根据消息中的数据进行实际的业务处理
    */
    private function doJob($data)
    {
        if(empty($data)){
            return false;
        }
        print("<info>doJob=========begin</info>\n");
        $ts = $data["ts"];
        $bizid = $data["bizid"];
        $params = $data["params"];//接收参数
        //参数处理
        //...
    }
}

运行think-queue

$ php think queue:work --queue dismiss_job_queue --daemon --tries 10

测试接口数据

http://127.0.0.1/api/game/dismiss

安装supervisor

$ yum install -y supervisor
$ supervisord -v
3.1.4

安装配置

$ echo_supervisord_conf > /etc/supervisord.conf

修改配置

$ vim /etc/supervisord.conf
; Sample supervisor config file.
;
; For more information on the config file, please see:
; http://supervisord.org/configuration.html
;
; Notes:
;  - Shell expansion ("~" or "$HOME") is not supported.  Environment
;    variables can be expanded using this syntax: "%(ENV_HOME)s".
;  - Quotes around values are not supported, except in the case of
;    the environment= options as shown below.
;  - Comments must have a leading space: "a=b ;comment" not "a=b;comment".
;  - Command will be truncated if it looks like a config file comment, e.g.
;    "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ".

[unix_http_server]
file=/var/run/supervisor.sock   ; the path to the socket file
;chmod=0700                 ; socket file mode (default 0700)
;chown=nobody:nogroup       ; socket file uid:gid owner
;username=user              ; default is no username (open server)
;password=123               ; default is no password (open server)

;[inet_http_server]         ; inet (TCP) server disabled by default
;port=127.0.0.1:9001        ; ip_address:port specifier, *:port for all iface
;username=user              ; default is no username (open server)
;password=123               ; default is no password (open server)

[supervisord]
logfile=/var/log/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
loglevel=info                ; log level; default info; others: debug,warn,trace
pidfile=/var/run/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false               ; start in foreground if true; default false
minfds=1024                  ; min. avail startup file descriptors; default 1024
minprocs=200                 ; min. avail process descriptors;default 200
;umask=022                   ; process file creation umask; default 022
;user=supervisord            ; setuid to this UNIX account at startup; recommended if root
;identifier=supervisor       ; supervisord identifier, default is 'supervisor'
;directory=/tmp              ; default is not to cd during start
;nocleanup=true              ; don't clean up tempfiles at start; default false
;childlogdir=/tmp            ; 'AUTO' child log dir, default $TEMP
;environment=KEY="value"     ; key value pairs to add to environment
;strip_ansi=false            ; strip ansi escape codes in logs; def. false

; The rpcinterface:supervisor section must remain in the config file for
; RPC (supervisorctl/web interface) to work.  Additional interfaces may be
; added by defining them in separate [rpcinterface:x] sections.

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

; The supervisorctl section configures how supervisorctl will connect to
; supervisord.  configure it match the settings in either the unix_http_server
; or inet_http_server section.

[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL  for a unix socket
;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
;username=chris              ; should be same as in [*_http_server] if set
;password=123                ; should be same as in [*_http_server] if set
;prompt=mysupervisor         ; cmd line prompt (default "supervisor")
;history_file=~/.sc_history  ; use readline history if available

; The sample program section below shows all possible program subsection values.
; Create one or more 'real' program: sections to be able to control them under
; supervisor.

;[program:theprogramname]
;command=/bin/cat              ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1                    ; number of processes copies to start (def 1)
;directory=/tmp                ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
;priority=999                  ; the relative start priority (default 999)
;autostart=true                ; start at supervisord start (default: true)
;startsecs=1                   ; # of secs prog must stay up to be running (def. 1)
;startretries=3                ; max # of serial start failures when starting (default 3)
;autorestart=unexpected        ; when to restart if exited after running (def: unexpected)
;exitcodes=0                   ; 'expected' exit codes used with autorestart (default 0)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false             ; send stop signal to the UNIX process group (default false)
;killasgroup=false             ; SIGKILL the UNIX process group (def false)
;user=chrism                   ; setuid to this UNIX account to run the program
;redirect_stderr=true          ; redirect proc stderr to stdout (default false)
;stdout_logfile=/a/path        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (0 means none, default 10)
;stdout_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stdout_syslog=false           ; send stdout to syslog with process name (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false   ; emit events on stderr writes (default false)
;stderr_syslog=false           ; send stderr to syslog with process name (default false)
;environment=A="1",B="2"       ; process environment additions (def no adds)
;serverurl=AUTO                ; override serverurl computation (childutils)

; The sample eventlistener section below shows all possible eventlistener
; subsection values.  Create one or more 'real' eventlistener: sections to be
; able to handle event notifications sent by supervisord.

;[eventlistener:theeventlistenername]
;command=/bin/eventlistener    ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1                    ; number of processes copies to start (def 1)
;events=EVENT                  ; event notif. types to subscribe to (req'd)
;buffer_size=10                ; event buffer queue size (default 10)
;directory=/tmp                ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
;priority=-1                   ; the relative start priority (default -1)
;autostart=true                ; start at supervisord start (default: true)
;startsecs=1                   ; # of secs prog must stay up to be running (def. 1)
;startretries=3                ; max # of serial start failures when starting (default 3)
;autorestart=unexpected        ; autorestart if exited after running (def: unexpected)
;exitcodes=0                   ; 'expected' exit codes used with autorestart (default 0)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false             ; send stop signal to the UNIX process group (default false)
;killasgroup=false             ; SIGKILL the UNIX process group (def false)
;user=chrism                   ; setuid to this UNIX account to run the program
;redirect_stderr=false         ; redirect_stderr=true is not allowed for eventlisteners
;stdout_logfile=/a/path        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (0 means none, default 10)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stdout_syslog=false           ; send stdout to syslog with process name (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (0 means none, default 10)
;stderr_events_enabled=false   ; emit events on stderr writes (default false)
;stderr_syslog=false           ; send stderr to syslog with process name (default false)
;environment=A="1",B="2"       ; process environment additions
;serverurl=AUTO                ; override serverurl computation (childutils)

; The sample group section below shows all possible group values.  Create one
; or more 'real' group: sections to create "heterogeneous" process groups.

;[group:thegroupname]
;programs=progname1,progname2  ; each refers to 'x' in [program:x] definitions
;priority=999                  ; the relative start priority (default 999)

; The [include] section can just contain the "files" setting.  This
; setting can list multiple files (separated by whitespace or
; newlines).  It can also contain wildcards.  The filenames are
; interpreted as relative to this file.  Included files *cannot*
; include files themselves.

[include]
files = /etc/supervisor/etc/*.conf

修改位置

[unix_http_server]
file=/var/run/supervisor.sock   ; the path to the socket file
[supervisorctl]
serverurl=unix:///var/run/supervisor.sock ; use a unix:// URL  for a unix socket
pidfile=/var/run/supervisord.pid ; supervisord pidfile; default supervisord.pid
[supervisord]
logfile=/var/log/supervisord.log ; main log file; default $CWD/supervisord.log
[include]
files = /etc/supervisor/etc/*.conf

创建应用配置

$ mkdir -p /etc/supervisor /etc/supervisor/etc /etc/supervisor/log
$ vim /etc/supervisor/etc/dismiss_worker.conf
[program:dismiss_worker]
command=php think queue:work --queue dismiss_job_queue --daemon --tries 10
directory=/home/wwwroot/sxyh
process_name=%(process_num)02d
numprocs=20
user=root
autostart=true
autorestart=true
startsecs=1
startretries=20
redirect_stderr=false
stdout_logfile=/etc/supervisor/log/dismiss_worker.out.log
stderr_logfile=/etc/supervisor/log/dismiss_worker.err.log

修改文件执行权限

$ chmod -R 0755 /etc/supervisord.conf
$ chmod -R 0755 /etc/supervisor

开启服务加载配置

$ ps aux|grep supervisord
$ ps aux|grep dismiss_worker
$ supervisord -c /etc/supervisord.conf
$ supervisorctl update
$ supervisorctl reload
$ supervisorctl status
$ supervisorctl shutdown

若服务期间修改配置则使用update进行更新配置

$ supervisorctl reload

需要注意的是,thinkphp5 在cli模式下出现 mysql has gone away 错误,因此需要对数据库添加断线重连的配置。

$ vim application/database.php
//是否断线重连
"break_reconnect"=>true

supervisor开启后会在thinkphp的runtime的log目录中生成带_cli.log日志文件。

为避免日志文件过大,添加定时任务定时删除服务器上的日志文件。

$ mkdir /home/shell
$ vim delete.sh
#! /bin/bash
rm -rf /home/wwwroot/sxyh/runtime/log/2020*/*
$ chmod -R 0755 /home/shell

新增定时任务

$ crontab -e
0 3 * * * /home/shell/delete.sh

每日凌晨3点执行脚本删除日志文件

错误处理

[ 2020-02-25T11:10:20+08:00 ][ error ] [0]Connection refused

使用supervisord -c /etc/supervisord.conf启动supervisor服务后,查看thinkphp的日志文件,出现大量错误信息[ 2020-02-25T11:10:20+08:00 ][ error ] [0]Connection refused连接拒绝。此时重启supervisor或更新都无效,静下心来,仔细看看Connection refused连接拒绝,什么连接被拒绝了。think-queue用的配置文件中会配置数据库或redis,只有这两个位置会存在连接拒绝的问题。这里配置的是redis,果断去服务器查看redis服务运行状态systemctl status redis,结果发现Active: inactive (dead),问题大体可以确定,是redis服务挂掉造成的think-queue连接拒绝。如果是配置的数据库,则需要去查看数据库服务。现在重启redis服务,然后再重启开启supervisor,恢复正常。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容

  • 环境要求 CentOS7 Python2.7.5 Supervisor 3.1.4 supervisor版本依赖于...
    傅二毛阅读 2,962评论 0 0
  • 1.1 资料 ,最好的入门小册子,可以先于一切文档之前看,免费。 作者Antirez的博客,Antirez维护的R...
    JefferyLcm阅读 17,050评论 1 51
  • feisky云计算、虚拟化与Linux技术笔记posts - 1014, comments - 298, trac...
    不排版阅读 3,847评论 0 5
  • NOSQL类型简介键值对:会使用到一个哈希表,表中有一个特定的键和一个指针指向特定的数据,如redis,volde...
    MicoCube阅读 3,981评论 2 27
  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,267评论 0 34