package Sort::Merge;
use strict;

# Inherit from Exporter so that "use Sort::Merge" can install the
# exported function into the caller's namespace.
require Exporter;
use vars '@ISA';
@ISA = qw(Exporter);

# mergesort() is the only name exported, and it is exported by default.
use vars '@EXPORT'; @EXPORT = qw(mergesort);
# Version of this module.
use vars '$VERSION'; $VERSION = '0.1';

=head1 NAME

Sort::Merge - Perl extension for merge sort

=head1 SYNOPSIS

  use Sort::Merge;
  my @a = sort qw(gamma beta alpha);
  my @b = sort qw(banana apple clementine);
  my @merged = mergesort(\@a, \@b);

=head1 DESCRIPTION

This module exports one function, C<mergesort()>, which merges some
already-sorted lists into one sorted list.  The arguments are
references to the lists to be merged, except that if the first
argument is a subroutine reference it is used as the comparison
function.

Unlike PerlE<39>s built-in C<sort()>, the comparison function should
take two arguments in @_, I<not> the magic variables C<$a> and C<$b>.
If the first argument is not a subroutine reference, then the default
comparison function C<{ $_[0] cmp $_[1] }> is used.

Remember that the input lists must already be sorted with respect to
the comparison used.  If they are not sorted then the output is
undefined.

=head1 PERFORMANCE

It is always possible to get the same output by concatenating
the input lists and then using C<sort()>.  Using merge sort gives
fewer calls to the comparison function, but there is greater overhead
because the sort routine itself is written in Perl not C.  Whether
merge sort or ordinary C<sort()> is a performance gain depends on how
big your lists are and how long the comparison function takes.

With simple comparisons like C<E<60>=E<62>> or C<cmp> it appears that
ordinary sorting is quicker: or at least that the large input sizes
which would give C<mergesort()> an advantage are so big I didnE<39>t
have time to benchmark them.  On the other hand, for a fairly complex
comparison function I halved the run time of a sorting program by
switching.

I just guessed at a suitable algorithm for n-way merge sort (as opposed
to just merging two lists); I donE<39>t know that it is optimal.  I
believe the time complexity is B<O>((I<m> + I<n>)(log I<n>)),
where I<n> is the number of input lists and I<m> is the maximum length
of an input list - but I could have calculated that wrongly.

A reimplementation of this module as a C extension would run faster
(although the time complexity would be of the same order, of course)
and hopefully compare well with the builtin C<sort()>.

=head1 AUTHOR

Ed Avis, E<lt>ed@membled.comE<gt>

=head1 SEE ALSO

L<perl>.

=cut

# mergesort([ \&compare, ] \@list, ...)
#
# Merge any number of already-sorted lists into a single sorted list
# and return it.  The input arrays are not modified.
#
# Arguments:
#   \&compare - optional.  If the first argument is a CODE reference it
#               is taken as the comparison function: it receives the
#               two elements to compare in @_ and returns a number
#               less than, equal to, or greater than zero, like cmp.
#               Otherwise { $_[0] cmp $_[1] } is used.
#   \@list... - references to the arrays to merge, each already sorted
#               with respect to the comparison function.
#
# Returns the merged elements as a list; an empty list if no input
# lists are given or all of them are empty.
#
# Algorithm: n-way merge that keeps a pool holding the current lowest
# element of each input list, ordered by the comparison function.  The
# head of the pool is output and a replacement taken from the same
# input list is put back in order with insert().
#
# Every output element causes at most one insert() call, and insert()
# binary-searches a pool of at most n entries, so for n input lists
# holding N elements in total the comparison function is called
# O(N log n) times after the initial sort of the pool.  Concatenating
# the lists and sorting them would need O(N log N) comparisons.
#
sub mergesort( @ ) {
    my @lists = @_;

    # Only a leading CODE reference is treated as the comparison
    # function.  The argument is tested in place, rather than shifted
    # off and pushed back, so that a call with no arguments yields no
    # input lists instead of one bogus undefined "list".
    my $compare;
    if (@lists && ref($lists[0]) eq 'CODE') {
        $compare = shift @lists;
    }
    else {
        $compare = sub { $_[0] cmp $_[1] };
    }

    # We don't want to modify the input lists by shift()ing them, so
    # @next_index holds, for each list, the index of its first element
    # that has not yet been taken.
    my @next_index = (0) x @lists;
    my $has_more = sub { $next_index[$_[0]] < @{ $lists[$_[0]] } };
    my $take     = sub { $lists[$_[0]]->[ $next_index[$_[0]]++ ] };

    # The pool contains tuples of [ element, index of the list it came
    # from ].  $by_element extends the comparison function to these
    # tuples.  Its ($$) prototype is what makes sort() pass the two
    # tuples in @_ rather than in $a and $b, so it must stay.
    my $by_element = sub( $$ ) { $compare->($_[0]->[0], $_[1]->[0]) };

    my @pool =
      sort $by_element
        map  { [ $take->($_), $_ ] }
          grep { $has_more->($_) }
            (0 .. $#lists);

    my @merged;
    while (@pool) {
        my ($element, $from) = @{ shift @pool };
        push @merged, $element;

        # Refill the pool from the list that just supplied an element,
        # keeping the pool sorted.
        if ($has_more->($from)) {
            insert($by_element, \@pool, [ $take->($from), $from ]);
        }
    }
    return @merged;
}


# insert(\&compare, \@sorted, $item)
#
# Put $item into the array referenced by the second argument, which
# must already be ordered by the comparison function, at a position
# that keeps it ordered.  The array is modified in place.
#
# The position is found by binary search.  \&compare is called with an
# existing element and $item in @_.  When the search probes an element
# that compares equal to $item, $item is placed immediately before it.
#
# Dies if the comparison function returns something that is neither
# less than, equal to, nor greater than zero.
#
sub insert( $$$ ) {
    my ($compare, $sorted, $item) = @_;

    # The insertion point always lies within [$lo, $hi].
    my $lo = 0;
    my $hi = @$sorted;

    until ($lo == $hi) {
        my $probe = int(($lo + $hi) / 2);
        $probe < @$sorted or die;

        my $order = $compare->($sorted->[$probe], $item);
        if ($order > 0) {
            # Probed element sorts after the new one: look to the left.
            $hi = $probe;
        }
        elsif ($order < 0) {
            # Probed element sorts before the new one: look to the right.
            $lo = $probe + 1;
        }
        elsif ($order == 0) {
            # Equal: stop here and insert just before the probed element.
            $lo = $hi = $probe;
        }
        else {
            die;
        }
    }

    splice @$sorted, $lo, 0, $item;
}

1;


