Mark24
用Ruby讲从创业到996公司的故事(戏说master-worker模式)
前言
阅读大概需要 20 分钟。
假设你希望了解 线程、线程池、集群模式/Master-Worker 模式、调度器。
需要了解 Ruby 基本的用法和面向对象思想。
本文戏说,无须严肃对待。勿对号入座。个人也没有严肃观点。个人观点和所有人没有关系。
完整代码示例
Master Worker 模式
MasterWorker 模式,也有翻译成作集群模式、也叫 Master-Slave 模式。
Git 不许使用 master 了,换成了 main ,Master/Slave 具有政治不正确的歧视色彩。不过这不重要了。其实这个名字很能表达这个模式的特点。
主要思想就是由一个 Master 抽象对象来调度 Worker 对象来工作。
Ruby 文学编程,用代码讲故事
其实这也非常像现实中的工作模型。Ruby 天生面向对象,表达的文学性,我们可以很方便的来使用代码模拟这种现实情况。 我们来用 Ruby 模拟下现实中这种情况,顺便学下如何实现这个模式。
约定
会出现几个类:
- Master 代表 “领导”,不干活,主要工作任务是分配任务,这是 Master 类的特征。
- Worker 代表 “打工人”,工作和创造价值的主体。主要任务就是干活。
- Workshop 代表 “公司”,主要是负责接单。
故事的思路:
我们自己是客户,把“任务”订单交给“公司”,这些任务会转交给“领导”手中,然后“领导”会排期,把工作布置给“打工人”。最终“打工人”乐此不疲的完成任务。
实现 打工人 Worker 类
step1 给员工工号
首先我们建立一个 Worker 类,我们给他一个名字属性。attr
暴露出 name
属性。
# Workshop.rb
class Worker
attr :name
def initialize(name)
@name = "worker@#{name}"
end
end
我们采用 TDD 方式来逐步实现我们的想法:
#Workshop_test.rb
require 'minitest/autorun'
require_relative '../lib/Workshop'
describe Worker do
it "check worker name" do
w = Worker.new("ruby01")
assert_equal w.name, "worker@ruby01"
end
end
很快,我们知道这名打工人他叫 “ruby01” 员工。
step2 给员工 KPI/OKR
我们不希望打工人每次只能做一件事,你必须得推着他才能工作。他最好学会“成长”会自己努力的工作。 其实就是一堆任务,我们希望他们一直忙。给他 N 件事情,他一个一个自己做。 我们要给他一个目标,也就是 KPI 或者 OKR 随便吧,实际上这是一个队列对吧。我们用队列实现。
require 'thread'
class Worker
attr :name
def initialize(name)
@name = "worker@#{name}"
@queue = Queue.new
@thr = Thread.new { perfom }
end
def <<(job)
@queue.push(job)
end
def join
@thr.join
end
def perfom
while (job = @queue.deq)
break if job == :done
puts "worker@#{name}: job:#{job}"
job.call
end
end
def size
@queue.size
end
end
现在打工人变得充实了许多,他自从来了公司培训之后,就拥有了很多属性和方法。
- 属性说明:
@queue
就是他的 OKR 清单,他必须完成所有的工作任务。
@thr
意思是 thread 缩写,这里是会使用一个线程来调用 perform
我们在用线程模拟打工人干活这件事。可以理解为 @thr
就是打工人的灵魂。
- 方法说明:
<<
是一个 push 方法的语法糖,就给给自己的 OKR 里添加任务。
perform
可能要说下 perform 方法, 这里是 “运行”的意思哈,不是“表演” :P。
打工人怎么干活呢?这得说道说道。我们得指导他如何“成长”。
我们前面说了 @queue
就是他的 OKR, 他必须从自己的 OKR 中取出任务然后执行。这里我用了 job.call
。
暗示,这必须是一个 callable 对象,在 ruby 里也就是拥有 call
方法的对象。可以是 lambda、或者实现 call 的。
这也很合理,需求必须能做才会做。没法做的需求,做不了就是做不了。
但是如果给了一个 :done
另说。循环会结束,这个线程会消失。(裁员了 :P)
def perfom
while (job = @queue.deq)
break if job == :done
puts "worker@#{name}: job:#{job}"
job.call
end
end
其实 Queue 这个对象很有意思,Ruby 做了一些工作。Queue 在空的时候,虚拟机会让线程进入睡眠等待。如果队列里有任务,就会继续工作。Ruby 很贴心,果然是程序员的好朋友啊。 其实我不知道其他语言什么样,懒得查了。
join
方法是一个 Thread 的线程方法,主要的作用是告诉主线程你要等待每一个子线程(自己)的完成。如果不写这句,主线程如果比所有子线程提前结束。那么子线程会被全部关闭。简而言之 join
就是同步等待线程结果。
让我们来看看 TDD:
我们可以加一段验证工号 ruby02 的打工人是不是如期的完成了工作。
# ....
it "check worekr do sth job" do
w = Worker.new("ruby02")
finished = []
w << lambda { puts "do job 1"; finished.push "job1"}
w << lambda { puts "do job 2"; finished.push "job2"}
w << :done
w.join
assert_equal finished, ["job1","job2"]
end
# ....
其实到这里,一个合格的打工人就打造完毕了。打工人很简单,只要吃苦耐劳,一切都 OK。 下面我们要实现下 Workshop 公司类。
实现 公司 Workshop 类
在此之前,我们先实现:创业公司 MiniWorkshop 类
其实我打算过渡下,首先实现一个 “创业公司” MiniWorkshop
。
创业公司刚起步,一般是只有“打工人”,没有真正意义上的中层出现。
这一时期非常简单,伊甸园时期。有活大家一起干,大家都是兄弟。
class MiniWorkshop
def initialize(count)
@worker_count = count # 打工人数量
@workers = @worker_count.times.map do |i| # 根据数量生成(招聘)打工人
Worker.new(i) # 给个工号
end
end
# 初创公司分配任务
def <<(job)
if job == :done
@workers.map {|m| m << job}
else
# 随机选择一个打工人,接活
@workers.sample << job
end
end
def join
@workers.map {|m| m.join}
end
end
这里可能说下
def <<(job)
if job == :done
@workers.map {|m| m << job}
else
# 随机选择一个打工人,接活
@workers.sample << job
end
end
这里干活的模式可能不好,因为我们竟然 Array#sample
方式。这是一个随机方法。随机选择一个。
看似不合理,实际上也合情合理。
创业公司初期虽然是草根,可是大家哪个不是大佬。所以活来了谁都行,问题不大。
没事我们后面再改进好了。
TDD:
我们的单元测试其实描述了一个故事。一家创业公司,只有 2 个人。接到了一个订单是 4 个工作内容。
# ...
it "check MiniWorkshop work" do
ws = MiniWorkshop.new(2)
finished = []
ws << lambda { puts "job1"; finished.push "job1"}
ws << lambda { puts "job2"; finished.push "job2"}
ws << lambda { puts "job3"; finished.push "job3"}
ws << lambda { puts "job4"; finished.push "job4"}
ws << :done
ws.join
assert_equal finished.size, 4
end
# ...
我们回过头再看 MiniWorkshop
类,初始化的时候创建了两个员工。任务来了就随机分配给一个员工。
很符合小作坊的模式。
实现上市公司
公司变大了,就不止 2 个员工了。可能四五百号,随机交给一个员工,不现实。中层管理出现。中层出现意味着我们公司的类也要进行改变,公司需要改革。
我们先实现一个改革之后的 Workshop 公司类。
class Workshop
def initialize(count, master_name)
@worker_count = count
@workers = @worker_count.times.map do |i|
Worker.new(i)
end
@master = Master.new(@workers) # 新增角色
end
def <<(job)
if job == :done
@workers.map {|m| m << job}
else
@master.assign(job) # master分配任务
end
end
def join
@workers.map {|m| m.join}
end
end
可以看到,我们在初始化函数里新增了 @master
他接受 @workers
作为参数。毕竟领导要点兵啊。
<<
方法也进行了改进,由以前的 直接让 @workers 接收任务,变成 @master.assign
分配任务。
让我们来看下 Master 类
class Master
def initialize(workers)
@workers = workers
end
def assign(job)
@workers.sort{|a,b| a.size <=> b.size}.first << job
end
end
其实也不复杂。我们保持了 @workers 的指针, assign
方法更像是把以前分配的逻辑接过来实现了一遍。
这次我们改了分配任务的方式,我们要根据 Worker#size
忙碌程度来分配任务。
毕竟嘛,领导有个方法论,会比小作坊高级很多。
多重领导
一个领导就足够了么?不。
现实中我们见过形形色色的领导,有的是自己培养,有的是留过洋,有的是大厂空降。他们拥有不同的“方法论”,也就是 Master#assign
的方式可能不同。
我们给公司再加两个领导。
无限方法论
996ICU 领导:
我们使用了 Array#cycle
的方式,这是一个迭代器。比如 [1,2,3].cycle
每次 .next
会产生 1、2、3、1、2、3、1、2、3 .....
无限循环。
这个方法论就是 996 方法论,只要干不死就往死里干。人海战术,把人轮番填上。
class ICU996Master
def initialize(workers)
@current_worker = workers.cycle # 迭代器
end
def assign(job)
@current_worker.next << job
end
end
分组任务方法论
等我们的公司变大了,我们的业务也会变得丰富,任务不是那么单一。很多工作要添加上组别 group_id,分门别类的交给不同工种的打工人,比如 开发、产品、测试、设计、运营。
class GroupMaster
GROUPS = [:group1, :group2, :group3]
def initialize(workers)
@workers = {}
workers_per_group = workers.length / GROUPS.size
workers.each_slice(workers_per_group).each_with_index do |slice, index|
group_id = GROUPS[index]
@workers[group_id] = slice
end
end
def assign(job)
worker = @workers[job.group].sort_by(&:size).first
worker << job
end
end
然后我们可以把不同风格的领导班子集中起来
Masters = {
normal: NormalMaster,
ICU996: ICU996Master,
group: GroupMaster
}
我们改造下 Workshop
毕竟这个词是一个 工作室的意思,其实是个小部门。
我们改造之后,我们的小部门可以按照风格不同的领导进行分派工作。
class Workshop
def initialize(count, master_name) # 新增 master_name 指定
@worker_count = count
@workers = @worker_count.times.map do |i|
Worker.new(i)
end
# 匹配 master
@master = Masters[master_name].new(@workers)
end
def <<(job)
if job == :done
@workers.map {|m| m << job}
else
@master.assign(job)
end
end
def join
@workers.map {|m| m.join}
end
end
我们来看看不同部门的 TDD
it "check Workshop@ normal master" do
ws = Workshop.new(4, :normal)
finished = []
ws << lambda { puts "job1"; finished.push "job1"}
ws << lambda { puts "job2"; finished.push "job2"}
ws << lambda { puts "job3"; finished.push "job3"}
ws << lambda { puts "job4"; finished.push "job4"}
ws << :done
ws.join
assert_equal finished.size, 4
end
it "check Workshop@ ICU996 master" do
ws = Workshop.new(4, :ICU996)
finished = []
ws << lambda { puts "job1"; finished.push "job1"}
ws << lambda { puts "job2"; finished.push "job2"}
ws << lambda { puts "job3"; finished.push "job3"}
ws << lambda { puts "job4"; finished.push "job4"}
ws << :done
ws.join
assert_equal finished.size, 4
end
it "check Workshop@ group master" do
ws = Workshop.new(4, :group)
class GroupJob
def initialize(group_id, &b)
@group_id = group_id
@blk = b
end
# 任务分组
def group
"group#{@group_id}".to_sym
end
def call
@blk.call(@group_id)
end
end
finished = []
ws << GroupJob.new(1) { |group_id| finished.push(group_id)}
ws << GroupJob.new(2) { |group_id| finished.push(group_id)}
ws << GroupJob.new(3) { |group_id| finished.push(group_id)}
ws << GroupJob.new(1) { |group_id| finished.push(group_id)}
ws << :done
ws.join
assert_equal finished.size, 4
end
总结 Master-Worker 模式
好吧,戏说不是胡说,改编不是乱编。
我们从现实的故事中走出来。
- 调度器(Scheduler)
其实在这里 Master 类,可能会被叫做 Scheduler
即调度器。内部的方法主要是使用不同的策略来分配任务。
而不同的 Master 实现的 assign 方法就是 调度策略。
- 线程池(Thread Pool)
Workshop 其实 持有 @workers
,也就是说汇聚了实际工作线程的对象。他们可能会有另一个名字 —— 线程池(Thread Pool)
故事讲完了,你有没有学会呢? :D