Skip to main content

dfir_rs/compiled/pull/
lattice_bimorphism.rs

1use std::cell::RefCell;
2use std::pin::Pin;
3
4use dfir_pipes::{Context, FusedPull, Pull, Step, Toggle};
5use lattices::{LatticeBimorphism, Merge};
6use pin_project_lite::pin_project;
7
8pin_project! {
9    /// Pull combinator for lattice bimorphism operations.
10    #[must_use = "pull do nothing unless polled"]
11    pub struct LatticeBimorphismPull<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output> {
12        #[pin]
13        lhs_prev: LhsPrev,
14        #[pin]
15        rhs_prev: RhsPrev,
16
17        func: Func,
18
19        lhs_state: &'a RefCell<LhsState>,
20        rhs_state: &'a RefCell<RhsState>,
21
22        output: Option<Output>,
23    }
24}
25
26impl<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output>
27    LatticeBimorphismPull<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output>
28where
29    Func: 'a
30        + LatticeBimorphism<LhsState, RhsPrev::Item, Output = Output>
31        + LatticeBimorphism<LhsPrev::Item, RhsState, Output = Output>,
32    LhsPrev: 'a + FusedPull,
33    RhsPrev: 'a + FusedPull,
34    LhsState: 'static + Clone,
35    RhsState: 'static + Clone,
36    Output: Merge<Output>,
37{
38    /// Creates a new `LatticeBimorphismPull`.
39    pub fn new(
40        lhs_prev: LhsPrev,
41        rhs_prev: RhsPrev,
42        func: Func,
43        lhs_state: &'a RefCell<LhsState>,
44        rhs_state: &'a RefCell<RhsState>,
45    ) -> Self {
46        Self {
47            lhs_prev,
48            rhs_prev,
49            func,
50            lhs_state,
51            rhs_state,
52            output: None,
53        }
54    }
55}
56
57impl<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output> Pull
58    for LatticeBimorphismPull<'a, Func, LhsPrev, RhsPrev, LhsState, RhsState, Output>
59where
60    Func: 'a
61        + LatticeBimorphism<LhsState, RhsPrev::Item, Output = Output>
62        + LatticeBimorphism<LhsPrev::Item, RhsState, Output = Output>,
63    LhsPrev: 'a + FusedPull,
64    RhsPrev: 'a + FusedPull,
65    LhsState: 'static + Clone,
66    RhsState: 'static + Clone,
67    Output: Merge<Output>,
68{
69    type Ctx<'ctx> = <LhsPrev::Ctx<'ctx> as Context<'ctx>>::Merged<RhsPrev::Ctx<'ctx>>;
70
71    type Item = Output;
72    type Meta = ();
73    type CanPend = <LhsPrev::CanPend as Toggle>::Or<RhsPrev::CanPend>;
74    type CanEnd = <LhsPrev::CanEnd as Toggle>::And<RhsPrev::CanEnd>;
75
76    fn pull(
77        self: Pin<&mut Self>,
78        ctx: &mut Self::Ctx<'_>,
79    ) -> Step<Self::Item, Self::Meta, Self::CanPend, Self::CanEnd> {
80        let mut this = self.project();
81
82        // Both streams may continue to be polled EOS (`None`) on subsequent loops or calls, so they must be fused.
83        loop {
84            let mut progress = false;
85
86            let lhs_step = this
87                .lhs_prev
88                .as_mut()
89                .pull(<LhsPrev::Ctx<'_> as Context<'_>>::unmerge_self(ctx));
90            let lhs_pending = matches!(lhs_step, Step::Pending(_));
91            if let Step::Ready(lhs_item, _meta) = lhs_step {
92                progress = true;
93                let delta = this.func.call(lhs_item, this.rhs_state.borrow().clone());
94                if let Some(output) = this.output.as_mut() {
95                    output.merge(delta);
96                } else {
97                    *this.output = Some(delta);
98                }
99            }
100
101            let rhs_step = this
102                .rhs_prev
103                .as_mut()
104                .pull(<LhsPrev::Ctx<'_> as Context<'_>>::unmerge_other(ctx));
105            let rhs_pending = matches!(rhs_step, Step::Pending(_));
106            if let Step::Ready(rhs_item, _meta) = rhs_step {
107                progress = true;
108                let delta = this.func.call(this.lhs_state.borrow().clone(), rhs_item);
109                if let Some(output) = this.output.as_mut() {
110                    output.merge(delta);
111                } else {
112                    *this.output = Some(delta);
113                }
114            }
115
116            if lhs_pending && rhs_pending {
117                return Step::pending();
118            }
119
120            // Exit EOS condition.
121            if !progress {
122                // Never spin, always exit if no progress has been made.
123                return if lhs_pending || rhs_pending {
124                    Step::pending()
125                } else {
126                    // EXIT: Release output once, then EOS.
127                    if let Some(output) = this.output.take() {
128                        Step::Ready(output, ())
129                    } else {
130                        Step::ended()
131                    }
132                };
133            }
134        }
135    }
136}