跳到主要内容

调度算法简介

1. 时间片轮转调度算法

时间片轮转(Round Robin, RR)是一种常见的 CPU 调度算法,适用于时间共享系统。它通过将 CPU 时间划分为固定长度的时间片(time quantum),并按顺序轮流分配给各个进程,从而实现公平的资源分配。

它的基本原理是:

  1. 每个进程被分配一个固定长度的时间片(time quantum)。
  2. 进程按到达时间排序,到达时间相同的进程按先到先服务(FCFS)原则。
  3. 从第一个进程开始,按顺序将时间片分配给每个进程。
  4. 如果进程在时间片用完前完成,将其从队列中移除;否则,将其重新加入队列末尾。
  5. 重复步骤3和4,直到所有进程完成。

原理上它是一种抢占式调度算法,适用于交互式系统,能够有效减少响应时间,提高系统的整体吞吐量。但是它也会提高上下文切换的开销,且时间片长度的选择对性能有较大影响。

模拟代码:

from collections import deque

class Process:
def __init__(self, pid, arrival_time, burst_time):
self.pid = pid # 进程ID
self.arrival_time = arrival_time # 到达时间
self.burst_time = burst_time # 总服务时间
self.remaining_time = burst_time # 剩余执行时间
self.completion_time = 0 # 完成时间
self.waiting_time = 0 # 等待时间
self.turnaround_time = 0 # 周转时间

def round_robin(processes, time_quantum):
# 按到达时间排序(假设初始按顺序到达)
processes = sorted(processes, key=lambda p: p.arrival_time)
queue = deque()
current_time = 0
completed = 0
n = len(processes)
process_index = 0

# 初始化:将到达时间为0的进程加入队列
while process_index < n and processes[process_index].arrival_time <= current_time:
queue.append(processes[process_index])
process_index += 1

print("时间片轮转调度执行顺序(时间片={}):".format(time_quantum))
while completed < n:
if not queue:
# CPU 空闲,跳到下一个进程到达时间
current_time = processes[process_index].arrival_time
while process_index < n and processes[process_index].arrival_time <= current_time:
queue.append(processes[process_index])
process_index += 1
continue

# 取出队首进程
current_process = queue.popleft()
print(f"时间 {current_time}: 执行进程 {current_process.pid}")

# 执行时间 = min(剩余时间, 时间片)
execute_time = min(current_process.remaining_time, time_quantum)
current_time += execute_time
current_process.remaining_time -= execute_time

# 将新到达的进程加入队列
while process_index < n and processes[process_index].arrival_time <= current_time:
queue.append(processes[process_index])
process_index += 1

# 如果进程未完成,放回队尾
if current_process.remaining_time > 0:
queue.append(current_process)
else:
# 进程完成
current_process.completion_time = current_time
current_process.turnaround_time = current_process.completion_time - current_process.arrival_time
current_process.waiting_time = current_process.turnaround_time - current_process.burst_time
completed += 1
print(f" -> 进程 {current_process.pid} 完成于时间 {current_time}")

# 输出统计信息
print("\n进程调度结果:")
print(f"{'PID':<5} {'到达时间':<8} {'服务时间':<8} {'完成时间':<8} {'等待时间':<8} {'周转时间':<8}")
total_waiting = 0
total_turnaround = 0
for p in processes:
print(f"{p.pid:<5} {p.arrival_time:<8} {p.burst_time:<8} {p.completion_time:<8} {p.waiting_time:<8} {p.turnaround_time:<8}")
total_waiting += p.waiting_time
total_turnaround += p.turnaround_time

avg_waiting = total_waiting / n
avg_turnaround = total_turnaround / n
print(f"\n平均等待时间: {avg_waiting:.2f}")
print(f"平均周转时间: {avg_turnaround:.2f}")

# 示例使用
if __name__ == "__main__":
# 创建进程列表:(PID, 到达时间, 服务时间)
processes = [
Process(1, 0, 5),
Process(2, 1, 3),
Process(3, 2, 8),
Process(4, 3, 6)
]
time_quantum = 2 # 时间片设为2
round_robin(processes, time_quantum)

2. 优先级调度算法

调度选择优先级最高->最低的进程。优先级本质上是任务的紧急程度。

可以是抢占式的,也可以是非抢占式的。其区别是:

  • 抢占式:当新到达的进程优先级高于当前正在执行的进程时,会立即抢占当前进程。
  • 非抢占式:当前进程执行完毕后,才会执行优先级更高的进程。

但是会导致饥饿现象(Starvation),即如果不断的有高优先级的任务来,低优先级进程可能长时间得不到执行机会。为了解决这个问题,可以采用老化技术(Aging),即随着等待时间的增加,动态提升低优先级进程的优先级。

可以分为静态优先级和动态优先级两种:

  • 静态优先级:进程的优先级在创建时就确定,不会改变。
  • 动态优先级:进程的优先级会根据等待时间等因素动态调整。

模拟代码:

import heapq
from collections import defaultdict

class Process:
def __init__(self, pid, arrival, burst, priority):
self.pid = pid
self.arrival_time = arrival
self.burst_time = burst # 服务时间
self.priority = priority # 越小优先级越高
self.remaining_time = burst
self.completion_time = 0
self.waiting_time = 0
self.turnaround_time = 0
self.start_time = -1 # 首次执行时间

def __lt__(self, other):
# 用于堆比较:优先级相同时按 PID 排序(避免不稳定)
if self.priority == other.priority:
return self.pid < other.pid
return self.priority < other.priority

def priority_scheduling(processes, preemptive=False):
n = len(processes)
processes = sorted(processes, key=lambda p: p.arrival_time)
for p in processes:
p.remaining_time = p.burst_time
p.completion_time = 0

time = 0
completed = 0
ready_queue = [] # 最小堆(优先级队列)
i = 0
gantt = [] # 用于记录执行序列 (start, end, pid)

if preemptive:
current_process = None
while completed < n:
# 添加所有已到达的进程到就绪队列
while i < n and processes[i].arrival_time <= time:
heapq.heappush(ready_queue, processes[i])
i += 1

if current_process is not None:
heapq.heappush(ready_queue, current_process)

if not ready_queue:
time = processes[i].arrival_time
continue

# 取出最高优先级进程
next_process = heapq.heappop(ready_queue)

# 如果是非抢占式,这里不会发生;但抢占式下可能中断
if current_process is not None and current_process != next_process:
# 中断当前进程,记录已执行部分
pass # 已通过 push 回队列处理

current_process = next_process

# 执行 1 个时间单位(抢占式最小粒度)
if current_process.start_time == -1:
current_process.start_time = time

time += 1
current_process.remaining_time -= 1

# 记录 Gantt 片段
gantt.append((time - 1, time, current_process.pid))

if current_process.remaining_time == 0:
current_process.completion_time = time
current_process.turnaround_time = current_process.completion_time - current_process.arrival_time
current_process.waiting_time = current_process.turnaround_time - current_process.burst_time
completed += 1
current_process = None # 释放 CPU

else:
# 非抢占式
while completed < n:
# 添加已到达进程
while i < n and processes[i].arrival_time <= time:
heapq.heappush(ready_queue, processes[i])
i += 1

if not ready_queue:
time = processes[i].arrival_time
continue

# 取出最高优先级进程,一次性执行完
current = heapq.heappop(ready_queue)
if current.start_time == -1:
current.start_time = time

time += current.burst_time
current.completion_time = time
current.turnaround_time = current.completion_time - current.arrival_time
current.waiting_time = current.turnaround_time - current.burst_time
completed += 1
gantt.append((current.start_time, current.completion_time, current.pid))

# 输出结果
mode = "抢占式" if preemptive else "非抢占式"
print(f"\n=== 优先级调度 ({mode}) ===")
print(f"{'PID':<4} {'到达':<6} {'服务':<6} {'优先级':<8} {'完成':<6} {'等待':<6} {'周转':<6}")
total_wait = total_turn = 0
for p in sorted(processes, key=lambda x: x.pid):
print(f"{p.pid:<4} {p.arrival_time:<6} {p.burst_time:<6} {p.priority:<8} {p.completion_time:<6} {p.waiting_time:<6} {p.turnaround_time:<6}")
total_wait += p.waiting_time
total_turn += p.turnaround_time

avg_wait = total_wait / n
avg_turn = total_turn / n
print(f"\n平均等待时间: {avg_wait:.2f}")
print(f"平均周转时间: {avg_turn:.2f}")

# 打印 Gantt 图(简化版)
print("\n甘特图(时间轴):")
timeline = []
for start, end, pid in gantt:
timeline.append(f"[{start}-{end}] P{pid}")
print(" → ".join(timeline))

# 示例使用
if __name__ == "__main__":
# 进程: (PID, 到达时间, 服务时间, 优先级) —— 优先级数字越小越高
procs = [
Process(1, 0, 10, 3),
Process(2, 2, 1, 1), # 高优先级
Process(3, 4, 2, 4),
Process(4, 5, 1, 2), # 次高优先级
]

# 非抢占式
priority_scheduling([Process(p.pid, p.arrival_time, p.burst_time, p.priority) for p in procs], preemptive=False)

# 抢占式
priority_scheduling([Process(p.pid, p.arrival_time, p.burst_time, p.priority) for p in procs], preemptive=True)

使用原则:

  1. 系统进程优先级高于用户进程
  2. 前台进程优先级高于后台进程
  3. 操作系统更偏好I/O型进程(对应CPU型进程)

3. 多级反馈队列调度算法

多级反馈队列调度算法(Multilevel Feedback Queue Scheduling, MLFQ)是现代操作系统(如 Windows、Unix/Linux 的某些变体)中广泛使用的动态优先级调度算法。它结合了响应时间短(交互式进程友好) 和 高吞吐量(CPU 密集型进程不饿死) 的优点。

算法的基本思想是:

  1. 多个就绪队列,每个队列有不同优先级(通常第 0 级最高);
  2. 新进程进入最高优先级队列(如队列 0);
  3. 队列内调度策略:
  • 高优先级队列:时间片短,用时间片轮转(RR);
  • 低优先级队列:时间片长,可用 RR 或 FCFS;
  1. 反馈机制:
  • 若进程在当前队列用完时间片仍未完成 → 降级到下一级队列;
  • 若进程主动阻塞(如 I/O) → 通常保留在当前队列或提升优先级(鼓励 I/O 密集型/交互式进程);
  1. 防止饥饿:可定期将所有进程提升到最高队列(aging)。