use crate::iter::plumbing::*;
use crate::iter::*;
use std::marker::PhantomData;
use std::{fmt, mem};
trait ChunkBySlice<T>: AsRef<[T]> + Default + Send {
    fn split(self, index: usize) -> (Self, Self);
    fn find(&self, pred: &impl Fn(&T, &T) -> bool, start: usize, end: usize) -> Option<usize> {
        self.as_ref()[start..end]
            .windows(2)
            .position(move |w| !pred(&w[0], &w[1]))
            .map(|i| i + 1)
    }
    fn rfind(&self, pred: &impl Fn(&T, &T) -> bool, end: usize) -> Option<usize> {
        self.as_ref()[..end]
            .windows(2)
            .rposition(move |w| !pred(&w[0], &w[1]))
            .map(|i| i + 1)
    }
}
impl<T: Sync> ChunkBySlice<T> for &[T] {
    fn split(self, index: usize) -> (Self, Self) {
        self.split_at(index)
    }
}
impl<T: Send> ChunkBySlice<T> for &mut [T] {
    fn split(self, index: usize) -> (Self, Self) {
        self.split_at_mut(index)
    }
}
struct ChunkByProducer<'p, T, Slice, Pred> {
    slice: Slice,
    pred: &'p Pred,
    tail: usize,
    marker: PhantomData<fn(&T)>,
}
impl<T, Slice, Pred> UnindexedProducer for ChunkByProducer<'_, T, Slice, Pred>
where
    Slice: ChunkBySlice<T>,
    Pred: Fn(&T, &T) -> bool + Send + Sync,
{
    type Item = Slice;
    fn split(self) -> (Self, Option<Self>) {
        if self.tail < 2 {
            return (Self { tail: 0, ..self }, None);
        }
        let mid = self.tail / 2;
        let index = match self.slice.find(self.pred, mid, self.tail) {
            Some(i) => Some(mid + i),
            None => self.slice.rfind(self.pred, mid + 1),
        };
        if let Some(index) = index {
            let (left, right) = self.slice.split(index);
            let (left_tail, right_tail) = if index <= mid {
                (index, 0)
            } else {
                (mid + 1, self.tail - index)
            };
            let left = Self {
                slice: left,
                tail: left_tail,
                ..self
            };
            let right = Self {
                slice: right,
                tail: right_tail,
                ..self
            };
            (left, Some(right))
        } else {
            (Self { tail: 0, ..self }, None)
        }
    }
    fn fold_with<F>(self, mut folder: F) -> F
    where
        F: Folder<Self::Item>,
    {
        let Self {
            slice, pred, tail, ..
        } = self;
        let (slice, tail) = if tail == slice.as_ref().len() {
            (Some(slice), None)
        } else if let Some(index) = slice.rfind(pred, tail) {
            let (left, right) = slice.split(index);
            (Some(left), Some(right))
        } else {
            (None, Some(slice))
        };
        if let Some(mut slice) = slice {
            folder = folder.consume_iter(std::iter::from_fn(move || {
                let len = slice.as_ref().len();
                if len > 0 {
                    let i = slice.find(pred, 0, len).unwrap_or(len);
                    let (head, tail) = mem::take(&mut slice).split(i);
                    slice = tail;
                    Some(head)
                } else {
                    None
                }
            }));
        }
        if let Some(tail) = tail {
            folder = folder.consume(tail);
        }
        folder
    }
}
pub struct ChunkBy<'data, T, P> {
    pred: P,
    slice: &'data [T],
}
impl<'data, T, P: Clone> Clone for ChunkBy<'data, T, P> {
    fn clone(&self) -> Self {
        ChunkBy {
            pred: self.pred.clone(),
            slice: self.slice,
        }
    }
}
impl<'data, T: fmt::Debug, P> fmt::Debug for ChunkBy<'data, T, P> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ChunkBy")
            .field("slice", &self.slice)
            .finish()
    }
}
impl<'data, T, P> ChunkBy<'data, T, P> {
    pub(super) fn new(slice: &'data [T], pred: P) -> Self {
        Self { pred, slice }
    }
}
impl<'data, T, P> ParallelIterator for ChunkBy<'data, T, P>
where
    T: Sync,
    P: Fn(&T, &T) -> bool + Send + Sync,
{
    type Item = &'data [T];
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        bridge_unindexed(
            ChunkByProducer {
                tail: self.slice.len(),
                slice: self.slice,
                pred: &self.pred,
                marker: PhantomData,
            },
            consumer,
        )
    }
}
pub struct ChunkByMut<'data, T, P> {
    pred: P,
    slice: &'data mut [T],
}
impl<'data, T: fmt::Debug, P> fmt::Debug for ChunkByMut<'data, T, P> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ChunkByMut")
            .field("slice", &self.slice)
            .finish()
    }
}
impl<'data, T, P> ChunkByMut<'data, T, P> {
    pub(super) fn new(slice: &'data mut [T], pred: P) -> Self {
        Self { pred, slice }
    }
}
impl<'data, T, P> ParallelIterator for ChunkByMut<'data, T, P>
where
    T: Send,
    P: Fn(&T, &T) -> bool + Send + Sync,
{
    type Item = &'data mut [T];
    fn drive_unindexed<C>(self, consumer: C) -> C::Result
    where
        C: UnindexedConsumer<Self::Item>,
    {
        bridge_unindexed(
            ChunkByProducer {
                tail: self.slice.len(),
                slice: self.slice,
                pred: &self.pred,
                marker: PhantomData,
            },
            consumer,
        )
    }
}