侧边栏壁纸
博主头像
Zeeland

全栈算法工程师 | 大模型创业 | 开源项目分享 | Python开发者 | @Promptulate Founder | @SparkLab cofounder | @LangChainAI Top Contributor | @CogitLab core member

  • 累计撰写 61 篇文章
  • 累计创建 47 个标签
  • 累计收到 7 条评论

目 录CONTENT

文章目录

一个优美的时间片轮转调度算法模拟Python实现

Zeeland
2022-11-23 / 0 评论 / 2 点赞 / 364 阅读 / 1,129 字

Introduction

先放一下需求吧。

设计一个有N个进程并发的进程调度程序。每个进程有一个进程控制块(PCB)表示(可以用PCB直接代表进程实体,略去每个进程的程序段和数据段的具体运行)。进程控制块可以包含如下信息:进程名、到达时间、需要运行时间、已用CPU时间、进程状态等等, 并初始化设置一批进程控制块实例,通过对进程控制块实例对象的控制,来模拟进程调度的控制过程。

选取时间片轮转调度算法模拟实现,每进行一次调度,都打印一次运行进程、就绪队列、以及各个进程的PCB的相关信息,以便进行检查,具体要求如下:

  1. 每个进程的状态可以是就绪W(Wait)、运行R(Run)、阻塞B(Block)、完成F(Finish)四种状态之一。
  2. 进程的到达时间、运行时间为进程初始化时程序员输入的时间。
  3. 就绪进程获得CPU后只能运行一个时间片(时间片由程序员初始化输入),运行后更新已占用CPU时间。
  4. 若运行一个时间片后(或一个时间片内),进程的已占用CPU时间已达到所需要的运行时间,则撤消该进程;若运行一个时间片后进程的已占用CPU时间还未达所需要的运行时间,也就是进程还需要继续运行,则把它插入就绪队列队尾等待CPU。
  5. 重复以上过程,直到所要进程都完成为止。

Realization

如何写一个优美的时间片轮转调度算法模拟Python实现?加点面向对象就好了…

from collections import deque

PROCESS_STATE_ENUM = ['wait','run','block','finish']

class PCB:
    """ 一个PCB(Process Control Block)代表一个Process"""
    def __init__(self, kwargs):
        self.process_name = kwargs['process_name']
        self.arrive_time = kwargs['arrive_time']
        self.need_time = kwargs['need_time']
        self.has_used_time = 0
        self.process_status = PROCESS_STATE_ENUM[0]

    def __str__(self):
        return '[PCB info] process_name: {0} arrive_time:{1} need_time:{2} has_used_time:{3} process_status:{4}'.format(self.process_name,\
                self.arrive_time, self.need_time,self.has_used_time, self.process_status)

class Dispatch:
    """ CPU调度器"""
    def __init__(self, pcb_list: list):
        self.pcb_list = pcb_list
        self.ready_queue = deque()
        self.time_slice = 1
        self.time_stamp = 0

    def run(self):
        for pcb in self.pcb_list:
            if pcb.arrive_time == self.time_stamp:
                self.ready_queue.append(pcb)
                self.pcb_list.remove(pcb)

        self.print_info(statue='begin')

        # 干活
        if len(self.ready_queue) != 0:
            # 开始工作,从就绪队列中取一个PCB运行
            pcb = self.ready_queue.popleft()
            pcb.process_status = PROCESS_STATE_ENUM[1]

            # 如果一个时间片内可以结束该进程
            if pcb.need_time - pcb.has_used_time < self.time_slice:
                # 结束
                pcb.process_status = PROCESS_STATE_ENUM[3]
                self.time_stamp += pcb.need_time - pcb.has_used_time
            # 一个时间片内没有结束该进程
            else:
                pcb.has_used_time += self.time_slice
                pcb.process_status = PROCESS_STATE_ENUM[0]
                self.ready_queue.append(pcb)
                self.time_stamp += self.time_slice
        # 休息
        else:
            # 判断空闲的时间内有没有PCB进队列
            current_slice_work_time = 0
            for pcb in self.pcb_list:
                if self.time_stamp < pcb.arrive_time < self.time_stamp + self.time_slice:
                    current_slice_work_time = pcb.arrive_time - self.time_stamp
                    self.ready_queue.append(pcb)
                    self.pcb_list.remove(pcb)

            # 如果有PCB进队列,则提前结束该时间片
            if current_slice_work_time != 0:
                self.time_stamp += current_slice_work_time
            else:
                self.time_stamp += self.time_slice

        self.print_info(statue='end')

        if not self.ready_queue and not self.pcb_list:
            return
        else:
            self.run()

    def print_info(self, statue='begin'):
        print('-------------------Dispatch Info {0}------------------'.format(statue))
        print("[time_slice] {0}".format(self.time_slice))
        print("[time_stamp] {0}".format(self.time_stamp))
        # for item in self.pcb_list:
        #     print(item)
        for item in self.ready_queue:
            print(item)
        print('------------------------------------------------------\n')

class Application:
    def __init__(self):
        """" dispatch and all PCB init """
        pcb_info_list = [{'process_name': 'pcb1', 'arrive_time': 1, 'need_time': 0.5},
                         {'process_name': 'pcb2', 'arrive_time': 2, 'need_time': 3},
                         {'process_name': 'pcb3', 'arrive_time': 3, 'need_time': 2},
                         {'process_name': 'pcb4', 'arrive_time': 4, 'need_time': 1},
                         {'process_name': 'pcb5', 'arrive_time': 5, 'need_time': 2}]
        pcb_list = []
        # PCB init
        for item in pcb_info_list:
            pcb_list.append(PCB(item))

        self.dispatch = Dispatch(pcb_list)

    def run(self):
        self.dispatch.run()

if __name__ == '__main__':
    app = Application()
    app.run()

2

评论区