侧边栏壁纸
博主头像
Zeeland

全栈算法工程师 | 大模型创业 | 开源项目分享 | Python开发者 | @Promptulate Founder | @SparkLab cofounder | @LangChainAI Top Contributor | @CogitLab core member

  • 累计撰写 61 篇文章
  • 累计创建 47 个标签
  • 累计收到 7 条评论

目 录CONTENT

文章目录

LRU和FIFO页面置换算法模拟实战

Zeeland
2022-12-04 / 0 评论 / 2 点赞 / 239 阅读 / 4,174 字

Introduction

本文将介绍如何使用LRU和FIFO实现页面置换的模拟(Python实现),并使用缺页率进行算法的评价。

Requirement

先附上具体的要求:

【实验目的】

(1)了解内存分页管理策略

(2)掌握调页策略

(3)掌握一般常用的调度算法

(4)学会各种存储分配算法的实现方法。

(5)了解页面大小和内存实际容量对命中率的影响。

【实验要求】

(1)采用页式分配存储方案,通过分别计算不同算法的命中率来比较算法的优劣,同时也考虑页面大小及内存实际容量对命中率的影响;

(2)实现LRU 算法 (Least Recently) 、 FIFO 算法 (First IN First Out)的模拟

【实验原理】

分页存储管理将一个进程的逻辑地址空间分成若干大小相等的片,称为页面或页。
在进程运行过程中,若其所要访问的页面不在内存而需把它们调入内存,但内存已无空闲空间时,为了保证该进程能正常运行,系统必须从内存中调出一页程序或数据,送入磁盘的对换区中。但应将哪 个页面调出,须根据一定的算法来确定。通常,把选择换出页面的算法称为页面置换算法(Page_Replacement Algorithms)。

一个好的页面置换算法,应具有较低的页面更换频率。从理论上讲,应将那些以后不再会访问的页面换出,或将那些在较长时间内不会再访问的页面调出。

一、最佳置换算法OPT(Optimal)
它是由Belady于1966年提出的一种理论上的算法。其所选择的被淘汰页面,将是以后永不使用的或许是在最长(未来)时间内不再被访问的页面。采用最佳置换算法,通常可保证获得最低的缺页率。但由于人目前还无法预知一个进程在内存的若干个页面中,哪一个页面是未来最长时间内不再被访问的,因而该算法是无法实现的,便可以利用此算法来评价其它算法。

二、先进先出(FIFO)页面置换算法
这是最早出现的置换算法。该算法总是淘汰最先进入内存的页面,即选择在内存中驻留时间最久的页面予以淘汰。该算法实现简单只需把一个进程已调入内存的页面,按先后次序链接成一个队列,并设置一个指针,称为替换指针,使它总是指向最老的页面。

三、LRU(Least Recently Used)最近最久未使用置换算法
FIFO置换算法性能之所以较差,是因为它所依据的条件是各个页面调入内存的时间,而页面调入的先后并不能反映页面的使用情况。最近最久未使用(LRU)置换算法,是根据页面调入内存后的使用情况进行决策的。由于无法预测各页面将来的使用情况,只能利用“最近的过去”作为“最近的将来”的近似,因此,LRU置换算法是选择最近最久未使用的页面予以淘汰。该算法赋予每个页面一个访问字段,用来记录一个页面自上次被访问以来所经历的时间t,,当须淘汰一个页面时,选择现有页面中其t值最大的,即最近最久未使用的页面予以淘汰。

Solution

为了更好地处理数据,本例程封装了一个架构,对LRU、FIFO调度算法、内存等对象进行了对象化封装,详细架构如下图所示:

其中,PageList相当于一个页表,本例程中并没有引入多个页表进行调度配置,因为如果引入多个页表,则相当入还要引入PCB的进程调度,这不是本例程的重点,因此,本例程只采用一个页表进行调度,页表事实上就是一个PageList,里面封装的为单个page对象,如下所示。

StorageDispatcherAdapter为内存调度适配器,而LRUDispatcher和FIFODispatcher都是StorageDispatcherAdapter适配器的实现,通过实现LRUDispatcher或者FIFODispatcher算法,可以实现调度算法的无缝切换。

在分页存储管理中,因为引入了置换算法,所以在内存满了之后会存在某一些页面会被调出至外存中,因此在这里的内存模型本人参考了王道考研网课的一张PPT演示图,如下所示。


本例程将存储空间分为内存(本例程中为RAM)和磁盘空间,磁盘空间分为swap和文件区。当某一个页面需要调入内存使用时,先从swap和file中查找是否存在该页面,如果存在该页面,则使用页面置换算法将该页面置换进内存中,如果在swap和file中没有找到,则新建该分页存入内存。

因为本例程是以页面为基础的存储,因此RAM、DiskFile、DiskSwap都继承了StorageMedium类,继承了StorageMedium类之后,页面将以页面为最小单位进行存储介质间的置换和调度。

RAM和Disk都归StorageManger管理,最后由StorageDispatcherAdapter对StorageManger中的存储介质进行调度,StorageDispatcherAdapter中会记录系统运行过程中的调度情况。

最后就是实现LRU和FIFO的调度,StorageDispatcherAdapter中使用put()函数将页面传入StorageMangement管理的存储介质中。

LRU算法实现如下:

class LRUDispatcher(StorageDispatcherAdapter):
    """ LRUDispatcher is a dispatch util which can help Memory dispatch page and record dispatch process.  """

    def __init__(self):
        """ initialize some dispatch process info """
        self.page_queue: deque[int] = deque()
        self.missing_page_num = 0
        self.enter_ram_record: List[int] = []

    def put(self, storage_manager: StorageManager, page: Page):
        """ dispatch StorageManager's storage """
        storage_manager.print_ram()
        logging.debug("[page_queue] {0}".format(str(self.page_queue)))

        # judge page whether in ram
        if storage_manager.page_is_in_ram(page):
            logging.debug('page {0} in ram'.format(page.page_id))

            # fresh page_queue
            self.page_queue.remove(page.page_id)
            self.page_queue.appendleft(page.page_id)

        # not in ram
        else:
            logging.debug('page {0} not in ram'.format(page.page_id))

            # read from disk (swap and file), swap first
            if storage_manager.page_is_in_disk_swap(page):
                storage_manager.disk.swap.storage.remove(page.page_id)
            elif storage_manager.page_is_in_disk_file(page):
                storage_manager.disk.file.storage.remove(page.page_id)
            else:
                # first dispatch, then distribute space in ram
                # ram is full then eliminate the last page
                if storage_manager.ram_is_full(self.page_queue):
                    obsolete_page_id = self.page_queue.pop()
                    block_id = storage_manager.ram.storage.index(obsolete_page_id)
                    storage_manager.ram.storage[block_id] = page.page_id
                    self.page_queue.appendleft(page.page_id)

                    # store in swap
                    if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
                        storage_manager.disk.swap.storage.append(obsolete_page_id)
                    # if swap is full then store in file of disk
                    else:
                        storage_manager.disk.file.storage.append(obsolete_page_id)
                    self._record_enter(page)
                else:
                    # find the first page_id = -1 and store
                    for index, cur_page_id in enumerate(storage_manager.ram.storage):
                        if cur_page_id == -1:
                            storage_manager.ram.storage[index] = page.page_id
                            self.page_queue.appendleft(page.page_id)
                            self._record_enter(page)
                            return

    def _record_enter(self, page: Page):
        self.missing_page_num += 1
        self.enter_ram_record.append(page.page_id)

FIFO算法实现如下:

class FIFODispatcher(StorageDispatcherAdapter):
    """ FIFODispatcher is a dispatch util which can help Memory dispatch page and record dispatch process."""

    def __init__(self):
        """ initialize some dispatch process info """
        self.page_queue: deque[int] = deque()
        self.missing_page_num = 0
        self.enter_ram_record: List[int] = []

    def put(self, storage_manager: StorageManager, page: Page):
        """ dispatch StorageManager's storage """
        storage_manager.print_ram()
        logging.debug("[page_queue] {0}".format(str(self.page_queue)))

        # judge page whether in ram
        if storage_manager.page_is_in_ram(page):
            logging.debug('page {0} in ram'.format(page.page_id))

        # not in ram
        else:
            logging.debug('page {0} not in ram'.format(page.page_id))

            # read from disk (swap and file), swap first
            if storage_manager.page_is_in_disk_swap(page):
                storage_manager.disk.swap.storage.remove(page.page_id)
            elif storage_manager.page_is_in_disk_file(page):
                storage_manager.disk.file.storage.remove(page.page_id)
            else:
                # first dispatch, then distribute space in ram
                # ram is full then eliminate the last page
                if storage_manager.ram_is_full(self.page_queue):
                    obsolete_page_id = self.page_queue.pop()
                    block_id = storage_manager.ram.storage.index(obsolete_page_id)
                    storage_manager.ram.storage[block_id] = page.page_id
                    self.page_queue.appendleft(page.page_id)

                    # store in swap
                    if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
                        storage_manager.disk.swap.storage.append(obsolete_page_id)
                    # if swap is full then store in file of disk
                    else:
                        storage_manager.disk.file.storage.append(obsolete_page_id)
                    self._record_enter(page)
                else:
                    # find the first page_id = -1 and store
                    for index, cur_page_id in enumerate(storage_manager.ram.storage):
                        if cur_page_id == -1:
                            storage_manager.ram.storage[index] = page.page_id
                            self.page_queue.appendleft(page.page_id)
                            self._record_enter(page)
                            return

    def _record_enter(self, page: Page):
        self.missing_page_num += 1
        self.enter_ram_record.append(page.page_id)

最后随机构建一些数据集,分别模拟测试LRU和FIFO的,结果如下所示。

图为FIFO调度的测试模拟

图为LRU调度的测试模拟

事实上,在小范围的数据集测试,使用控制变量法,分别控制RAM大小、页表的长度、页面类型的数量测试了很多次之后,如果以缺页率作为评价指标,本人并没有得出明显的差别。本人推测可能是因为测试数据的问题,测试的数据集中页面id使用随机数生成,但是在实际的存储调度中,可能存在某些页面出现的次数特别多,如一次存储调度任务中50%都是在调度page_id=3的页面,这个时候LRU的优势才会逐渐显现出来,而基于随机数的测试集出现某些页面次数特别多的情况比较小,无法很好地测试出LRU的优势。

最终代码如下,不包含详细的数据集测试:

# -*- coding: utf-8 -*-
# @Time    : 2022/12/1 18:06
# @Author  : Zeeland
# @File    : LRU_and_FIFO.py
# @Software: PyCharm
import random
from typing import List
from collections import deque
import logging
import abc

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')

class Page:
    def __init__(self, page_id=None, time=None):
        self.page_id = page_id      # page id
        self.enter_time = time      # the time stamp of calling into memory

    def __str__(self):
        return "[Page] page_id:{0}  time:{1}".format(self.page_id, self.enter_time)


class StorageMedium:
    __meta_class__ = abc.ABCMeta

    def __init__(self):
        self.max_storage_space = None
        self.storage: List[int] = []


class RAM(StorageMedium):
    """ RAM can storage memory block and every block can be dispatched by dispatcher. """

    def __init__(self, max_storage_space=4):
        super().__init__()
        self.max_storage_space = max_storage_space
        # store memory block
        self.storage: List[int] = [-1 for _ in range(self.max_storage_space)]


class DiskFile(StorageMedium):
    def __init__(self, max_storage_space=100):
        super().__init__()
        self.max_storage_space = max_storage_space
        # store memory block
        self.storage: List[int] = [-1 for _ in range(self.max_storage_space)]


class DiskSwap(StorageMedium):
    def __init__(self, max_storage_space=20):
        super().__init__()
        self.max_storage_space = max_storage_space
        # store memory block
        self.storage: List[int] = [-1 for _ in range(self.max_storage_space)]


class Disk:
    def __init__(self):
        self.swap = DiskSwap()
        self.file = DiskFile()


class StorageManager:
    def __init__(self, ram_max_size=4):
        self.ram = None
        self.disk = None
        self.fresh_memory(ram_max_size)

    def fresh_memory(self, ram_max_size=4):
        self.ram = RAM(max_storage_space=ram_max_size)
        self.disk = Disk()

    def page_is_in_ram(self, page: Page) -> bool:
        """ return whether the page is in ram """
        try:
            self.ram.storage.index(page.page_id)
            return True
        except ValueError as e:
            return False

    def page_is_in_disk(self, page: Page) -> bool:
        if self.page_is_in_disk_file(page) or self.page_is_in_disk_swap(page):
            return True
        else:
            return False

    def page_is_in_disk_swap(self, page: Page) -> bool:
        try:
            self.disk.swap.storage.index(page.page_id)
            return True
        except ValueError as e:
            return False

    def page_is_in_disk_file(self, page: Page) -> bool:
        try:
            self.disk.file.storage.index(page.page_id)
            return True
        except ValueError as e:
            return False

    def ram_is_full(self, page_dispatch_queue: deque = None) -> bool:
        if not page_dispatch_queue:
            for item in self.ram.storage:
                if item == -1:
                    return False
            return True
        else:
            if len(page_dispatch_queue) == self.ram.max_storage_space:
                return True
            else:
                return False

    def print_ram(self):
        logging.debug('[ram] ' + str(self.ram.storage))


class StorageDispatcherAdapter:
    __meta_class__ = abc.ABCMeta

    def put(self, *args, **kwargs):
        pass


class FIFODispatcher(StorageDispatcherAdapter):
    """ FIFODispatcher is a dispatch util which can help Memory dispatch page and record dispatch process."""

    def __init__(self):
        """ initialize some dispatch process info """
        self.page_queue: deque[int] = deque()
        self.missing_page_num = 0
        self.enter_ram_record: List[int] = []

    def put(self, storage_manager: StorageManager, page: Page):
        """ dispatch StorageManager's storage """
        storage_manager.print_ram()
        logging.debug("[page_queue] {0}".format(str(self.page_queue)))

        # judge page whether in ram
        if storage_manager.page_is_in_ram(page):
            logging.debug('page {0} in ram'.format(page.page_id))

        # not in ram
        else:
            logging.debug('page {0} not in ram'.format(page.page_id))

            # read from disk (swap and file), swap first
            if storage_manager.page_is_in_disk_swap(page):
                storage_manager.disk.swap.storage.remove(page.page_id)
                self._pop_and_store(storage_manager, page)
            elif storage_manager.page_is_in_disk_file(page):
                storage_manager.disk.file.storage.remove(page.page_id)
                self._pop_and_store(storage_manager, page)
            else:
                # first dispatch, then distribute space in ram
                # ram is full then eliminate the last page
                if storage_manager.ram_is_full(self.page_queue):
                    self._pop_and_store(storage_manager, page)
                else:
                    # find the first page_id = -1 and store
                    for index, cur_page_id in enumerate(storage_manager.ram.storage):
                        if cur_page_id == -1:
                            storage_manager.ram.storage[index] = page.page_id
                            self.page_queue.appendleft(page.page_id)
                            self._record_enter(page)
                            return

    def _pop_and_store(self, storage_manager: StorageManager, page: Page):
        obsolete_page_id = self.page_queue.pop()
        block_id = storage_manager.ram.storage.index(obsolete_page_id)
        storage_manager.ram.storage[block_id] = page.page_id
        self.page_queue.appendleft(page.page_id)

        # store in swap
        if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
            storage_manager.disk.swap.storage.append(obsolete_page_id)
        # if swap is full then store in file of disk
        else:
            storage_manager.disk.file.storage.append(obsolete_page_id)
        self._record_enter(page)

    def _record_enter(self, page: Page):
        self.missing_page_num += 1
        self.enter_ram_record.append(page.page_id)


class LRUDispatcher(StorageDispatcherAdapter):
    """ LRUDispatcher is a dispatch util which can help Memory dispatch page and record dispatch process.  """

    def __init__(self):
        """ initialize some dispatch process info """
        self.page_queue: deque[int] = deque()
        self.missing_page_num = 0
        self.enter_ram_record: List[int] = []

    def put(self, storage_manager: StorageManager, page: Page):
        """ dispatch StorageManager's storage """
        storage_manager.print_ram()
        logging.debug("[page_queue] {0}".format(str(self.page_queue)))

        # judge page whether in ram
        if storage_manager.page_is_in_ram(page):
            logging.debug('page {0} in ram'.format(page.page_id))

            # fresh page_queue
            self.page_queue.remove(page.page_id)
            self.page_queue.appendleft(page.page_id)

        # not in ram
        else:
            logging.debug('page {0} not in ram'.format(page.page_id))

            # read from disk (swap and file), swap first
            if storage_manager.page_is_in_disk_swap(page):
                storage_manager.disk.swap.storage.remove(page.page_id)
                self._pop_and_store(storage_manager, page)
            elif storage_manager.page_is_in_disk_file(page):
                storage_manager.disk.file.storage.remove(page.page_id)
                self._pop_and_store(storage_manager, page)
            else:
                # first dispatch, then distribute space in ram
                # ram is full then eliminate the last page
                if storage_manager.ram_is_full(self.page_queue):
                    self._pop_and_store(storage_manager, page)
                else:
                    # find the first page_id = -1 and store
                    for index, cur_page_id in enumerate(storage_manager.ram.storage):
                        if cur_page_id == -1:
                            storage_manager.ram.storage[index] = page.page_id
                            self.page_queue.appendleft(page.page_id)
                            self._record_enter(page)
                            return

    def _pop_and_store(self, storage_manager: StorageManager, page: Page):
        obsolete_page_id = self.page_queue.pop()
        block_id = storage_manager.ram.storage.index(obsolete_page_id)
        storage_manager.ram.storage[block_id] = page.page_id
        self.page_queue.appendleft(page.page_id)

        # store in swap
        if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
            storage_manager.disk.swap.storage.append(obsolete_page_id)
        # if swap is full then store in file of disk
        else:
            storage_manager.disk.file.storage.append(obsolete_page_id)
        self._record_enter(page)

    def _record_enter(self, page: Page):
        self.missing_page_num += 1
        self.enter_ram_record.append(page.page_id)


class Application:
    def __init__(self):
        self.page_list = self._page_list_init()
        self._print_page_list()
        self.storage_manager = StorageManager()
        self.storage_dispatcher = None

    def run(self, method='LRU', page_list: List[Page] = None):
        if page_list is not None:
            self.page_list = page_list
        self.storage_manager.fresh_memory(ram_max_size=4)
        if method == 'LRU':
            self.storage_dispatcher = LRUDispatcher()
        elif method == 'FIFO':
            self.storage_dispatcher = FIFODispatcher()
        else:
            raise Exception("unknown dispatch method")

        # dispatch until end
        for i in range(len(self.page_list)):
            self.storage_dispatcher.put(self.storage_manager, self.page_list[i])
        logging.debug("[missing_page_num] {0}".format(self.storage_dispatcher.missing_page_num))
        logging.debug("[missing_page_num] {0}/{1}".format(self.storage_dispatcher.missing_page_num, len(self.page_list)))
        return self.storage_dispatcher.missing_page_num / len(self.page_list)

    def _page_list_init(self, page_len=17):
        # temp_list = [2, 0, 5, 8, 2, 1, 3, 1, 6, 6, 5, 8, 2, 6, 1, 0, 6]
        # temp_list = [1, 0, 1, 0, 2, 4, 1, 0, 0, 8, 7, 5, 4, 3, 2, 3, 4]
        page_list: List[Page] = [Page(page_id=random.randint(0, 8), time=i + 1) for i in range(page_len)]
        # page_list: List[Page] = [Page(page_id=temp_list[i], time=i + 1) for i in range(page_len)]
        return page_list

    def _print_page_list(self):
        logging.debug("[page_id] " + str(list(map(lambda item: item.page_id, self.page_list))))

if __name__ == '__main__':
    app = Application()
    app.run(method='FIFO')
    app.run(method='LRU')

References

2

评论区