Source code for histopath_bim_des.process.p20_cutup

"""Cut-up processes.

For simplicity, specialities are ignored and all
four cut-up rooms are combined into a single unit
with pooled resources.
"""

from typing import TYPE_CHECKING, Literal

from ..specimens import Block, Priority, Specimen
from .core import BatchingProcess, DeliveryProcess, RunnerDurations, register_process

if TYPE_CHECKING:
    from ..model import Model


[docs] def register_processes(env: 'Model') -> None: """Register processes to the simulation environment.""" register_process(env, Specimen, cutup_start) register_process(env, Specimen, cutup_bms) register_process(env, Specimen, cutup_pool) register_process(env, Specimen, cutup_large) runner_durations=RunnerDurations( env.runner_times.extra_loading, env.runner_times.cutup_processing, env.runner_times.extra_unloading, env.runner_times.cutup_processing # FUTURE: different outbound and return times? ) # BMS cut-up env.processes['batcher.cutup_bms_to_processing'] = BatchingProcess[Specimen]( 'batcher.cutup_bms_to_processing', batch_size=env.batch_sizes.deliver_cut_up_to_processing, out_process='cutup_bms_to_processing', env=env ) env.processes['cutup_bms_to_processing'] = DeliveryProcess( 'cutup_bms_to_processing', runner=env.resources.bms, durations=runner_durations, out_process='processing_start', env=env ) # Pool cut-up env.processes['batcher.cutup_pool_to_processing'] = BatchingProcess[Specimen]( 'batcher.cutup_pool_to_processing', batch_size=env.batch_sizes.deliver_cut_up_to_processing, out_process='cutup_pool_to_processing', env=env ) env.processes['cutup_pool_to_processing'] = DeliveryProcess( 'cutup_pool_to_processing', runner=env.resources.cut_up_assistant, durations=runner_durations, out_process='processing_start', env=env ) # Large specimens cut-up env.processes['batcher.cutup_large_to_processing'] = BatchingProcess[Specimen]( 'batcher.cutup_large_to_processing', batch_size=env.batch_sizes.deliver_cut_up_to_processing, out_process='cutup_large_to_processing', env=env ) env.processes['cutup_large_to_processing'] = DeliveryProcess( 'cutup_large_to_processing', runner=env.resources.cut_up_assistant, durations=runner_durations, out_process='processing_start', env=env )
[docs] def cutup_start(self: Specimen) -> None: """Take specimens arriving at cut-up and sort to the correct cut-up queue.""" self.env.wips.in_cut_up.value += 1 self.timestamp('cutup_start') suffix = '_urgent' if self.prio == Priority.URGENT else '' if (r := self.env.u01()) < getattr(self.env.globals, f'prob_bms_cutup{suffix}'): cutup_type, next_process = 'BMS', 'cutup_bms' elif r < getattr(self.env.globals, f'prob_bms_cutup{suffix}')\ + getattr(self.env.globals, f'prob_pool_cutup{suffix}'): cutup_type, next_process = 'Pool', 'cutup_pool' else: cutup_type, next_process = 'Large specimens', 'cutup_large' self.env.specimen_data[self.name()]['cutup_type'] = cutup_type self.enter_sorted(self.env.processes[next_process].in_queue, self.prio)
[docs] def cutup_generic(self: Specimen, cutup_type: Literal['bms', 'pool', 'large']) -> None: """Generic process function for specimen cut-up. Args: self (Specimen): The specimen to cut up. cutup_type (Literal['bms', 'pool', 'large']): The type of cut-up task. """ resource = ( self.env.resources.bms if cutup_type == 'bms' else self.env.resources.cut_up_assistant ) duration = ( self.env.task_durations.cut_up_bms if cutup_type == 'bms' else self.env.task_durations.cut_up_pool if cutup_type == 'pool' else self.env.task_durations.cut_up_large_specimens ) r = self.env.u01() block_type = ( 'small surgical' if cutup_type == 'bms' else 'large surgical' if cutup_type == 'pool' else 'large surgical' if (self.prio == Priority.URGENT or r < self.env.globals.prob_mega_blocks) else 'mega' ) n_blocks = ( ( self.env.globals.num_blocks_mega() if block_type == 'mega' else self.env.globals.num_blocks_large_surgical() ) if cutup_type == 'large' else 1 ) # Generate blocks self.request((resource, 1, self.prio)) self.hold(duration) for _ in range(n_blocks): self.blocks.append( Block( f'{self.name()}.', env=self.env, parent=self, block_type=block_type ) ) self.env.specimen_data[self.name()]['num_blocks'] = n_blocks self.release() # Cut-up complete self.env.wips.in_cut_up.value -= 1 self.timestamp('cutup_end') # Delivery if self.prio == Priority.URGENT: self.enter_sorted( self.env.processes[f'cutup_{cutup_type}_to_processing'].in_queue, Priority.URGENT) else: self.enter(self.env.processes[f'batcher.cutup_{cutup_type}_to_processing'].in_queue)
[docs] def cutup_bms(self: Specimen) -> None: """BMS cut-up. Always produces 1 small surgical block.""" cutup_generic(self, 'bms')
[docs] def cutup_pool(self: Specimen) -> None: """Pool cut-up. Always produces 1 large surgical block.""" cutup_generic(self, 'pool')
[docs] def cutup_large(self: Specimen) -> None: """BMS cut-up. Produces a random number of large surgical blocks.""" cutup_generic(self, 'large')