Source code for histopath_bim_des.process.p30_processing

"""Tissue processing processes.

Todo list :
    - TODO Fix machine start times
    - TODO Weekends as special cases
    - TODO Reduced batch size for urgents (update Model and Config classes)
    - TODO Enforce no splitting of specimen blocks across batches
"""

from typing import TYPE_CHECKING

from ..specimens import Batch, Block, Priority, Specimen
from .core import (BatchingProcess, CollationProcess, DeliveryProcess,
               RunnerDurations, register_process)

if TYPE_CHECKING:
    from ..model import Model


[docs] def register_processes(env: 'Model') -> None: """Register processes to the simulation environment.""" register_process(env, Specimen, processing_start) register_process(env, Batch[Block], decalc_bone_station) register_process(env, Block, decalc_oven) register_process(env, Block, processing_assign_queue) register_process(env, Batch[Block], processing_urgents) register_process(env, Batch[Block], processing_smalls) register_process(env, Batch[Block], processing_larges) register_process(env, Batch[Block], processing_megas) register_process(env, Block, embed_and_trim) register_process(env, Specimen, post_processing) # Bone station and processing machine batches for out, batch_size in zip( [ 'decalc_bone_station', 'processing_urgents', 'processing_smalls', 'processing_larges', 'processing_megas' ], [ env.batch_sizes.bone_station, env.batch_sizes.processing_regular, env.batch_sizes.processing_regular, env.batch_sizes.processing_regular, env.batch_sizes.processing_megas ] ): env.processes[f'batcher.{out}'] = BatchingProcess[Block]( f'batcher.{out}', batch_size=batch_size, out_process=out, env=env ) # Collation env.processes['collate.processing'] = CollationProcess( 'collate.processing', counter_name='num_blocks', out_process='post_processing', env=env ) # Delivery env.processes['batcher.processing_to_microtomy'] = BatchingProcess[Specimen]( 'batcher.processing_to_microtomy', batch_size=env.batch_sizes.deliver_processing_to_microtomy, out_process='processing_to_microtomy', env=env ) env.processes['processing_to_microtomy'] = DeliveryProcess( 'processing_to_microtomy', runner=env.resources.processing_room_staff, durations=RunnerDurations( env.runner_times.extra_loading, env.runner_times.processing_microtomy, env.runner_times.extra_unloading, env.runner_times.processing_microtomy # FUTURE: different outbound and return times? ), out_process='microtomy', env=env )
[docs] def processing_start(self: Specimen) -> None: """Take specimens arriving a processing and send to decalc if necessary. Else, send to queue assignment.""" self.env.wips.in_processing.value += 1 self.timestamp('processing_start') r = self.env.u01() if r < self.env.globals.prob_decalc_bone: self.env.specimen_data[self.name()]['decalc_type'] = 'bone station' out_queue = self.env.processes['batcher.decalc_bone_station'].in_queue elif r < self.env.globals.prob_decalc_bone + self.env.globals.prob_decalc_oven: self.env.specimen_data[self.name()]['decalc_type'] = 'decalc oven' out_queue = self.env.processes['decalc_oven'].in_queue else: out_queue = self.env.processes['processing_assign_queue'].in_queue for block in self.blocks: block.enter_sorted(out_queue, self.prio)
[docs] def decalc_bone_station(self: Batch[Block]) -> None: """Decalc a batch of blocks in a bone station.""" self.request(self.env.resources.bone_station) # Load self.request(self.env.resources.bms) self.hold(self.env.task_durations.load_bone_station) self.release(self.env.resources.bms) self.hold(self.env.task_durations.decalc) # Unload self.request(self.env.resources.bms) self.hold(self.env.task_durations.unload_bone_station) self.release(self.env.resources.bms) self.release() # Unbatch and forward to next queue for block in self.items: block.enter_sorted(self.env.processes['processing_assign_queue'].in_queue, block.prio)
[docs] def decalc_oven(self: Block) -> None: """Decalc a single block in an oven. Oven capacity is not modelled.""" # Load self.request(self.env.resources.bms) self.hold(self.env.task_durations.load_into_decalc_oven) self.release() self.hold(self.env.task_durations.decalc) # Unload self.request(self.env.resources.bms) self.hold(self.env.task_durations.unload_from_decalc_oven) self.release() self.enter_sorted(self.env.processes['processing_assign_queue'].in_queue, self.prio)
[docs] def processing_assign_queue(self: Block) -> None: """Assign incoming blocks to the correct BatchingProcess, according to type.""" if self.prio == Priority.URGENT: out_queue = self.env.processes['batcher.processing_urgents'].in_queue elif self.data['block_type'] == 'small surgical': out_queue = self.env.processes['batcher.processing_smalls'].in_queue elif self.data['block_type'] == 'large surgical': out_queue = self.env.processes['batcher.processing_larges'].in_queue else: # 'mega' out_queue = self.env.processes['batcher.processing_megas'].in_queue self.enter_sorted(out_queue, self.prio)
[docs] def processing_urgents(self: Batch[Block]) -> None: """Processing machine program for urgent batches.""" # LOAD self.request( (self.env.resources.processing_room_staff, 1, Priority.URGENT), (self.env.resources.processing_machine, 1, Priority.URGENT) ) self.hold(self.env.task_durations.load_processing_machine) self.release(self.env.resources.processing_room_staff) # PROCESSING self.hold(self.env.task_durations.processing_urgent) # UNLOAD self.request((self.env.resources.processing_room_staff, 1, Priority.URGENT)) self.hold(self.env.task_durations.unload_processing_machine) self.release() # all # Unbatch and forward to next process for block in self.items: block.enter_sorted(self.env.processes['embed_and_trim'].in_queue, block.prio)
[docs] def processing_generic(self: Batch[Block], duration=float) -> None: """Generic processing machine process for non-urgent batches.""" # LOAD self.request( self.env.resources.processing_room_staff, self.env.resources.processing_machine ) self.hold(self.env.task_durations.load_processing_machine) self.release(self.env.resources.processing_room_staff) # PROCESSING self.hold(duration) # UNLOAD self.request(self.env.resources.processing_room_staff) self.hold(self.env.task_durations.unload_processing_machine) self.release() # all # Unbatch and forward to next process for block in self.items: block.enter_sorted(self.env.processes['embed_and_trim'].in_queue, block.prio)
[docs] def processing_smalls(self: Batch[Block]) -> None: """Processing machine program for small surgical blocks.""" processing_generic(self, self.env.task_durations.processing_small_surgicals)
[docs] def processing_larges(self: Batch[Block]) -> None: """Processing machine program for large surgical blocks.""" processing_generic(self, self.env.task_durations.processing_large_surgicals)
[docs] def processing_megas(self: Batch[Block]) -> None: """Processing machine program for mega blocks.""" processing_generic(self, self.env.task_durations.processing_megas)
[docs] def embed_and_trim(self: Block) -> None: """Embed a block in wax and trim the excess.""" # EMBED self.request(self.env.resources.processing_room_staff) self.hold(self.env.task_durations.embedding) self.release() # COOLDOWN (no resources tracked) self.hold(self.env.task_durations.embedding_cooldown) # TRIM self.request(self.env.resources.processing_room_staff) self.hold(self.env.task_durations.block_trimming) self.release() self.enter_sorted(self.env.processes["collate.processing"].in_queue, self.prio)
[docs] def post_processing(self: Specimen) -> None: """Post-processing tasks.""" self.env.wips.in_processing.value -= 1 self.timestamp('processing_end') if self.prio == Priority.URGENT: self.enter_sorted(self.env.processes['processing_to_microtomy'].in_queue, Priority.URGENT) else: self.enter(self.env.processes['batcher.processing_to_microtomy'].in_queue)