环境信息

  • 操作系统:CentOS 7.9
  • Bash See:GNU bash, version 4.2.46(2)-release (x86_64-redhat-linux-gnu)

背景

我正在通过shell 脚本进行S3对象迁移(从curl迁移到本地Minio端点).

此脚本生成多个工作器(通过后台进程&),使用FIFO作为作业(job)队列,flock控制从队列中读取作业(job),然后下载对象,上传对象......等

脚本的一部分如下所示:

# ...
# read params, show info ...
# setting up locks, fifo ...
# define the job() function run by worker.
#
# ...

# the FIFO (job queue) is FD 3
# the FIFO lock is FD 4


function work () {
  wid=$1  
  # setup FD and locks
  # ...

  while true; do
    flock 4                          # get the fifo lock
    read -r -su 3 job_id obj
    read_status=$?
    flock -u 4                       # release the fifo lock
    
    if [[ $read_status -eq 0 ]]; then
      # if read job from queue, run job in a subprocess.
    fi
  done
  
  # clean up FDs
}

echo "Spawning workers..."
for ((i=1;i<="${WORKERS:-4}";i++)); do
  printf "%s" "_"
  work $i &
done
echo


function append_job() {
  job_id=$1
  target_object=$2
  printf "\r(%d s) Append job [# %s] %.110s\e[K" "$(( $(date +%s) - ts_start ))" "${job_id}" "${target_object}"
  echo "${job_id}" "${target_object}" 1>&3 
}


while read -r obj_name ; do
  append_job $i "${obj_name}"
  i=$((i+1))
done < <(the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET})
echo

遇到的问题

该脚本运行良好,直到一个存储桶的对象超过7,000个(它只追加恰好7,000个作业(job)).

因此,工人们只从一个桶里迁移了7000件物品并完成了工作.

我所期待的

它应该可以毫无问题地完成所有迁移作业(job)(&gt;7,000).

我认为(可能是错误的):

  1. 传递给While循环(<(the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET}))的stdin会将所有对象的列表存储在内存中的某个位置,目前与FIFO无关.
  2. 即使FIFO也有最大大小,循环不断地向其中追加作业(job),一旦其中一个工作进程从FIFO读取作业(job),此时FIFO大小应该会减小.
  3. 假设stdin非常大(<(gen_huge_result)),比如&gt;16 GB,但操作系统仍然会处理它,我不需要担心这一点.所以循环仍然有效,只是需要时间.

我已经try

我判断了对象结果的长度和大小:

# the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET} | wc -l 
8043

# the_function_that_list_all_objects_from_bucket ${TARGET_BUCKET} | wc -c
434387

据我所知,我可以通过cat /proc/sys/fs/pipe-max-size号得到管道的最大尺寸

# cat /proc/sys/fs/pipe-max-size
1048576

这8,043个物体在迁移时应该没问题.

我还试图迫使追加作业(job)放慢速度,但仍"完成"了7,000个对象.

我猜追加作业(job)在达到FIFO最大大小时会停止,但我不知道是如何停止的,也不知道为什么.


最新情况:

1. Some experiments

正如@pt在 comments 中提到的那样

  1. 字节限制还是行限制?

我将作业(job)参数增加了四倍:


work() {
  # ...
  while true; do
    ## try to read the queue
    flock 4                          # obtain the fifo lock
    read -r -su 3 work_id work_item tmp1 tmp2 tmp3 tmp4 tmp5 tmp6 # read into work_id and work_item
    read_status=$?                   # save the exit status of read
    # ...
}


append_job() {
  # ...
  echo "${work_id}" "$work_item" "${work_id}" "$work_item" "${work_id}" "$work_item" "${work_id}" "$work_item" 1>&3 ## the fifo is fd 3
}

我看到它超过了2000,然后是CTRL+C.如果它达到了某个线路限制,它应该在1750(7,000/4)结束

  1. 有毒的工作?

这看起来不像是一份有毒的工作.没有来自乔布斯的标准,我甚至把工作设定为true分;仍然是一样的.

我认为这应该是对发送工作的一些限制.

2. I can not reproduce this issue today.

我试着用"移动"的方法(复制和删除)来做迁移,而不是仅仅用"复制",来完成昨天的工作.

由于此环境是我公司的一个虚拟机,因此我请求INFRA团队帮助我恢复快照,该快照正好在迁移工作开始之前.通过这种方式,我可以测试下面@pt提供的脚本,试图找到一些根本原因.

启动后,我再次运行相同的脚本(上面),希望确保它在7,000停止.

然后显示追加作业(job)(对象):8,043.

也许是重新启动(恢复虚拟机映像)刷新了操作系统上的一些限制.

因为我不能复制这个问题,所以我只会在这里留下最新消息.

如果我下一次点击这个,我会try 下面的脚本.

非常感谢!

推荐答案

这在我的Linux上很有效,有很多工作人员:

#! /bin/bash --

set -e  # Exit on first non-zero status.

cat /proc/sys/fs/pipe-max-size >&2  # Doesn't matter.
trap '' PIPE

function work() {
  local wi="$1"
  ls -l /proc/"$BASHPID"/fd >&2
  echo "start work $wi $$ $BASHPID" >&2
  while true; do
    echo "pop $wi" >&2
    if ! flock 0; then
      echo "flock 0 failed in $wi" >&2
      break
    fi
    if ! read -rs cmd i; then
      echo "pop $wi failed" >&2
      flock -u 0 ||:
      break
    fi
    if ! flock -u 0; then
      echo "flock -u 0 failed in $wi" >&2
      break
    fi
    echo "popped $i in $wi" >&2
    #sleep 10000
    sleep ".0$((RANDOM%10))"  # Simulate slow work.
    #test $i = 100 && break
  done
  echo "end work $wi" >&2
}

function workers() {
  local twc="$1"  # Total worker count.
  exec 3<&0
  local wc=0 wx
  while test "$wc" != "$twc"; do
    let wc=wc+1 ||:
    work "$wc" <&3- &  # Start worker in the background.
  done
  while test "$wc" != 0; do
    wx=0; wait >&2 || wx="$?"
    echo "worker exit $wx" >&2  # We don't know which worker.
    let wc=wc-1 ||:
  done
  echo "end workers" >&2
}

function pusher() {
  i=0
  echo "start push $$ $BASHPID" >&2
  rm -f queue.job.log
  while true; do
    let i=i+1 ||:
    echo "push $i" >&2
    #if ! flock 1; then  # This would eventually cause a deadlock.
    #  echo "flock 1 failed" >&2
    #  break
    #fi
    if ! echo "this-is-a-long-string-for-a-job-command $i"; then
      echo "push failed" >&2
      rm -f -- "$ff"  # Indicate to workers that they can exit.
      break
    fi
    echo "JOB $i" >>queue.job.log
    #if ! flock -u 1; then
    #  echo "flock -u 1 failed" >&2
    #  break
    #fi
    echo "pushed $i" >&2
    test $i = 2000 && break
  done
  echo "end pusher" >&2
}

pusher | workers "${1:-4}"
echo "end program" >&2

实际工作是在sleep个命令中完成的.请注意,如果该命令失败,整个worker就会失败(因为set -e),并且它不会处理更多的作业(job).如果你想让工人继续工作,就加上||:,就像这样:my-process-job-command "$i" ||:.

这是非常困难的正确的.

也可以使其与命名管道一起工作(即.mkfifo(1)),但防止重定向无限期阻止要棘手得多.

请注意,在我的Linux系统上,管道队列缓冲区的大小似乎比65536字节(至少65511字节)小一点,尽管/proc/sys/fs/pipe-max-size是1048576.–

Linux相关问答推荐

使用sed替换字符,但如果它是在bash csv文件中的字符串中,则不使用

为什么将JAX与Docker一起安装会创建如此大的镜像?

Arch_prctl的用例是什么

Linux在所有多行中用新值替换整个列

如何在 if ... elif struct 中判断 bash 命令的退出状态?

sed + 从没有额外空格的文本中删除单词

使用 PowerShell 删除重复行所需的时间比 WSL 长得多

在 cURL 中使用的确切位置将字节分成一些范围部分

使用 grep 时如何跳过第一行和最后一行?

如何从核心转储中获取线程名称?

为什么`__vfprintf_internal`(`stdio.h`中的`printfn`)强制`$rbp`在我的x86-64机器上向前跳转6313864字节?

所有进程的Linux环境变量

如何使用 Bash 读取文件中的倒数第二行?

如何在初始化脚本中以特定用户身份运行命令?

如何使用不同的出口 IP 一次运行多个 Tor 进程?

如何在 Linux 中查找所有以 .rb 结尾的文件?

*nix 系统上是否有与 COM 等效的功能?如果不是,那么 *nix 的可重用性方法是什么?

如何找到只对所有者具有特定权限的文件?

如何使用 Linux 命令找到我的 shell 版本?

Linux 守护进程