kaldi 源码分析(三) - run.pl 分析

在 kaldi 样本(egs/xxx/s5)目录下,大都会存在如下文件及文件夹:

cmd.sh                     # 并行执行命令,通常分 run.pl, queue.pl 两种
config                       # 参数定制化配置文件, mfcc, decode, cmvn 等配置文件
local                         # 工程定制化内容
path.sh                    # 环境变量相关脚本
run.sh                      # 整体流程控制脚本,主入口脚本
steps                       # 存放单一步骤执行的脚本
utils                         # 存放解析文件,预处理等相关工具的脚本

对于 kaldi 这套系统而言,queue.pl 主要对 SUN 公司的 GridEngine 相对而言友好一些,对于单机执行,通常在 cmd.sh 中配置为 run.pl 即单机多进程执行。

train_cmd="utils/run.pl"
decode_cmd="utils/run.pl"

如上述目录结构分析类似, 与并行计算相关的内容均在 utils/parallel 目录下

这里简单分析下 run.pl 文件, 如下:

#!/usr/bin/env perl
use warnings; #sed replacement for -w perl parameter

# 主要介绍了两种方式执行:
#    * 常见用法:
#             run.pl some.log a b c
#        即在 bash 环境中执行 a b c 命令,并将日志输出到 some.log 文件中
#    * 并行任务:
#             run.pl JOB=1:4 some.JOB.log  a b c JOB
#        即在 bash 环境中执行 a b c JOB 命令,并将日志输出到 some.JOB.log 文件中, 其中 JOB 表示执行任务的名称, 任意一个 Job 失败,整体失败。
# In general, doing
#  run.pl some.log a b c is like running the command a b c in
# the bash shell, and putting the standard error and output into some.log.
# To run parallel jobs (backgrounded on the host machine), you can do (e.g.)
#  run.pl JOB=1:4 some.JOB.log a b c JOB is like running the command a b c JOB
# and putting it in some.JOB.log, for each one. [Note: JOB can be any identifier].
# If any of the jobs fails, this script will fail.

# 典型样例如下:
#        run.pl some.log my-prog "--opt=foo bar" foo \| other-prog baz
# 即其与在 bash 中执行 如下命令类似: 
#        ( my-prog '--opt=foo bar' foo |  other-prog baz ) >& some.log
#
# A typical example is:
#  run.pl some.log my-prog "--opt=foo bar" foo \|  other-prog baz
# and run.pl will run something like:
# ( my-prog '--opt=foo bar' foo |  other-prog baz ) >& some.log
#
# Basically it takes the command-line arguments, quotes them
# as necessary to preserve spaces, and evaluates them with bash.
# In addition it puts the command line at the top of the log, and
# the start and end times of the command at the beginning and end.
# The reason why this is useful is so that we can create a different
# version of this program that uses a queueing system instead.

# use Data::Dumper;

# 参数少于 2 则退出程序并输出帮助信息
@ARGV < 2 && die "usage: run.pl log-file command-line arguments...";

# 对多可执行的 jobs 数据量
$max_jobs_run = -1;
# job 起止量
$jobstart = 1;
$jobend = 1;
$ignored_opts = ""; # These will be ignored.

# 首先解析 JOB=1:4 类似选项
# First parse an option like JOB=1:4, and any
# options that would normally be given to
# queue.pl, which we will just discard.

# 通过循环方式取出前两个参数
for (my $x = 1; $x <= 2; $x++) { # This for-loop is to
  # allow the JOB=1:n option to be interleaved with the
  # options to qsub.
  while (@ARGV >= 2 && $ARGV[0] =~ m:^-:) {
    # parse any options that would normally go to qsub, but which will be ignored here.
   # 取出选项内容
    my $switch = shift @ARGV;
    if ($switch eq "-V") {
      $ignored_opts .= "-V ";
    } elsif ($switch eq "--max-jobs-run" || $switch eq "-tc") {
      # 获取最大 jobs 数量
      # we do support the option --max-jobs-run n, and its GridEngine form -tc n.
      $max_jobs_run = shift @ARGV;
      if (! ($max_jobs_run > 0)) {
        die "run.pl: invalid option --max-jobs-run $max_jobs_run";
      }
    } else {
      my $argument = shift @ARGV;
      if ($argument =~ m/^--/) {
        print STDERR "run.pl: WARNING: suspicious argument '$argument' to $switch; starts with '-'\n";
      }
      if ($switch eq "-sync" && $argument =~ m/^[yY]/) {
        # 获取同步选项
        $ignored_opts .= "-sync "; # Note: in the
        # corresponding code in queue.pl it says instead, just "$sync = 1;".
      } elsif ($switch eq "-pe") { # e.g. -pe smp 5
        # 获取 -pe 选项
        my $argument2 = shift @ARGV;
        $ignored_opts .= "$switch $argument $argument2 ";
      } elsif ($switch eq "--gpu") {
        # 获取 gpu 选项
        $using_gpu = $argument;
      } else {
        # Ignore option.
        $ignored_opts .= "$switch $argument ";
      }
    }
  }
  # 用来匹配 JOB=1:20 选项, 并将其分别放置到 jobname, jobstart, jobend 当中
  if ($ARGV[0] =~ m/^([\w_][\w\d_]*)+=(\d+):(\d+)$/) { # e.g. JOB=1:20
    $jobname = $1;
    $jobstart = $2;
    $jobend = $3;
    shift;
    if ($jobstart > $jobend) {
      die "run.pl: invalid job range $ARGV[0]";
    }
    if ($jobstart <= 0) {
      die "run.pl: invalid job range $ARGV[0], start must be strictly positive (this is required for GridEngine compatibility).";
    }
  # 用来匹配 JOB=1 选项, 并将其分别放置到 jobname, jobstart, jobend 当中
  } elsif ($ARGV[0] =~ m/^([\w_][\w\d_]*)+=(\d+)$/) { # e.g. JOB=1.
    $jobname = $1;
    $jobstart = $2;
    $jobend = $2;
    shift;
  } elsif ($ARGV[0] =~ m/.+\=.*\:.*$/) {
    print STDERR "run.pl: Warning: suspicious first argument to run.pl: $ARGV[0]\n";
  }
}

# Users found this message confusing so we are removing it.
# if ($ignored_opts ne "") {
#   print STDERR "run.pl: Warning: ignoring options \"$ignored_opts\"\n";
# }

# 对 max_jobs_run 进行设置默认值
if ($max_jobs_run == -1) { # If --max-jobs-run option not set,
                           # then work out the number of processors if possible,
                           # and set it based on that.
  $max_jobs_run = 0;
  if ($using_gpu) {
# 当使用 GPU 时,通过执行 nvidia-smi -L 命令通过读取行数来获取最大可执行Job数量
    if (open(P, "nvidia-smi -L |")) {
      $max_jobs_run++ while (<P>);
      close(P);
    }
# 没有 GPU 时,输出错误信息
    if ($max_jobs_run == 0) {
      $max_jobs_run = 1;
      print STDERR "run.pl: Warning: failed to detect number of GPUs from nvidia-smi, using ${max_jobs_run}\n";
    }
  } elsif (open(P, "</proc/cpuinfo")) {  # Linux
# 在 linux 系统下,通过读取 /proc/cpuinfo 来获取 processor 数量并传递给最大jobs数量
    while (<P>) { if (m/^processor/) { $max_jobs_run++; } }
    if ($max_jobs_run == 0) {
      print STDERR "run.pl: Warning: failed to detect any processors from /proc/cpuinfo\n";
      $max_jobs_run = 10;  # reasonable default.
    }
    close(P);
  } elsif (open(P, "sysctl -a |")) {  # BSD/Darwin
# 在 BSD/Darwin 系统下,通过 sysctl -a 命令匹配 hw\.ncpu\s*[:=]\s*(\d+) 出CPU数量并传递给最大jobs数量
    while (<P>) {
      if (m/hw\.ncpu\s*[:=]\s*(\d+)/) { # hw.ncpu = 4, or hw.ncpu: 4
        $max_jobs_run = $1;
        last;
      }
    }
    close(P);
    if ($max_jobs_run == 0) {
      print STDERR "run.pl: Warning: failed to detect any processors from sysctl -a\n";
      $max_jobs_run = 10;  # reasonable default.
    }
  } else {
    # 对于非 UNIX 系统下,设置默认 32 个,可根据实际情况进行修改
    # allow at most 32 jobs at once, on non-UNIX systems; change this code
    # if you need to change this default.
    $max_jobs_run = 32;
  }
  # The just-computed value of $max_jobs_run is just the number of processors
  # (or our best guess); and if it happens that the number of jobs we need to
  # run is just slightly above $max_jobs_run, it will make sense to increase
  # $max_jobs_run to equal the number of jobs, so we don't have a small number
  # of leftover jobs.
  $num_jobs = $jobend - $jobstart + 1;
  if (!$using_gpu &&
      $num_jobs > $max_jobs_run && $num_jobs < 1.4 * $max_jobs_run) {
    $max_jobs_run = $num_jobs;
  }
}

# 配置日志文件
$logfile = shift @ARGV;

if (defined $jobname && $logfile !~ m/$jobname/ &&
    $jobend > $jobstart) {
  print STDERR "run.pl: you are trying to run a parallel job but "
    . "you are putting the output into just one log file ($logfile)\n";
  exit(1);
}

$cmd = "";
# 重新组合指令内容
foreach $x (@ARGV) {
    if ($x =~ m/^\S+$/) { $cmd .=  $x . " "; }
    elsif ($x =~ m:\":) { $cmd .= "'$x' "; }
    else { $cmd .= "\"$x\" "; }
}

#$Data::Dumper::Indent=0;
$ret = 0;
$numfail = 0;
%active_pids=();

# 按 jobstart 和 jobend 创建进程,并执行相应的命令
use POSIX ":sys_wait_h";
for ($jobid = $jobstart; $jobid <= $jobend; $jobid++) {
  if (scalar(keys %active_pids) >= $max_jobs_run) {

    # Lets wait for a change in any child's status
    # Then we have to work out which child finished
    $r = waitpid(-1, 0);
    $code = $?;
    if ($r < 0 ) { die "run.pl: Error waiting for child process"; } # should never happen.
    if ( defined $active_pids{$r} ) {
        $jid=$active_pids{$r};
        $fail[$jid]=$code;
        if ($code !=0) { $numfail++;}
        delete $active_pids{$r};
        # print STDERR "Finished: $r/$jid " .  Dumper(\%active_pids) . "\n";
    } else {
        die "run.pl: Cannot find the PID of the chold process that just finished.";
    }

    # In theory we could do a non-blocking waitpid over all jobs running just
    # to find out if only one or more jobs finished during the previous waitpid()
    # However, we just omit this and will reap the next one in the next pass
    # through the for(;;) cycle
  }
  $childpid = fork();
  if (!defined $childpid) { die "run.pl: Error forking in run.pl (writing to $logfile)"; }
  if ($childpid == 0) { # We're in the child... this branch
    # executes the job and returns (possibly with an error status).
    if (defined $jobname) {
      $cmd =~ s/$jobname/$jobid/g;
      $logfile =~ s/$jobname/$jobid/g;
    }
   # 创建日志目录
    system("mkdir -p `dirname $logfile` 2>/dev/null");
   # 打开日志文件
    open(F, ">$logfile") || die "run.pl: Error opening log file $logfile";
   # 输出执行命令,及时间
    print F "# " . $cmd . "\n";
    print F "# Started at " . `date`;
    $starttime = `date +'%s'`;
    print F "#\n";
    close(F);

    # 开始执行命令
    # Pipe into bash.. make sure we're not using any other shell.
    open(B, "|bash") || die "run.pl: Error opening shell command";
    print B "( " . $cmd . ") 2>>$logfile >> $logfile";
    close(B);                   # If there was an error, exit status is in $?
    $ret = $?;

    $lowbits = $ret & 127;
    $highbits = $ret >> 8;
    if ($lowbits != 0) { $return_str = "code $highbits; signal $lowbits" }
    else { $return_str = "code $highbits"; }
    
    # 输出命令结束时间
    $endtime = `date +'%s'`;
    open(F, ">>$logfile") || die "run.pl: Error opening log file $logfile (again)";
    $enddate = `date`;
    chop $enddate;
    print F "# Accounting: time=" . ($endtime - $starttime) . " threads=1\n";
    print F "# Ended ($return_str) at " . $enddate . ", elapsed time " . ($endtime-$starttime) . " seconds\n";
    close(F);
    exit($ret == 0 ? 0 : 1);
  } else {
    $pid[$jobid] = $childpid;
    $active_pids{$childpid} = $jobid;
    # print STDERR "Queued: " .  Dumper(\%active_pids) . "\n";
  }
}

# 等待所有进程结束并判定其是否执行成功
# Now we have submitted all the jobs, lets wait until all the jobs finish
foreach $child (keys %active_pids) {
    $jobid=$active_pids{$child};
    $r = waitpid($pid[$jobid], 0);
    $code = $?;
    if ($r == -1) { die "run.pl: Error waiting for child process"; } # should never happen.
    if ($r != 0) { $fail[$jobid]=$code; $numfail++ if $code!=0; } # Completed successfully
}

# 判定每个任务结果是否执行成功
# Some sanity checks:
# The $fail array should not contain undefined codes
# The number of non-zeros in that array  should be equal to $numfail
# We cannot do foreach() here, as the JOB ids do not necessarily start by zero
$failed_jids=0;
for ($jobid = $jobstart; $jobid <= $jobend; $jobid++) {
  $job_return = $fail[$jobid];
  if (not defined $job_return ) {
    # print Dumper(\@fail);

    die "run.pl: Sanity check failed: we have indication that some jobs are running " .
      "even after we waited for all jobs to finish" ;
  }
  if ($job_return != 0 ){ $failed_jids++;}
}
if ($failed_jids != $numfail) {
  die "run.pl: Sanity check failed: cannot find out how many jobs failed ($failed_jids x $numfail)."
}
if ($numfail > 0) { $ret = 1; }

if ($ret != 0) {
  $njobs = $jobend - $jobstart + 1;
  if ($njobs == 1) {
    if (defined $jobname) {
      $logfile =~ s/$jobname/$jobstart/; # only one numbered job, so replace name with
                                         # that job.
    }
    print STDERR "run.pl: job failed, log is in $logfile\n";
    if ($logfile =~ m/JOB/) {
      print STDERR "run.pl: probably you forgot to put JOB=1:\$nj in your script.";
    }
  }
  else {
    $logfile =~ s/$jobname/*/g;
    print STDERR "run.pl: $numfail / $njobs failed, log is in $logfile\n";
  }
}
# 结束并返回结果
exit ($ret);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容

  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 5,284评论 0 9
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • 妈妈给我买了一盒《熊出没卡通拼图》,让我业余时间把它拼出来,目的是开发我的智力。从昨天晚上到今天傍晚,除去了去姨姥...
    张轩赫阅读 186评论 0 5
  • 记得我小时候,特别的淘气,不听话。幼儿园离我家50米的距离,几乎每次我都是绕着学校跑好几圈才能到学校教室。...
    杜则岷阅读 1,067评论 0 4
  • 今日读《哲学的慰藉》前言 思考:哲学是什么?有什么用? 哲学,是理论化、系统化的世界观,是自然知识、社会知识、...
    梁木纯阅读 122评论 0 1