#!/usr/bin/perl

#
# dbcolscorrelate.pm
# Copyright (C) 1998-2024 by John Heidemann <johnh@isi.edu>
#
# This program is distributed under terms of the GNU general
# public license, version 2.  See the file COPYING
# in $dblibdir for details.
#

package Fsdb::Filter::dbcolscorrelate;

=head1 NAME

dbcolscorrelate - find the coefficient of correlation over columns

=head1 SYNOPSIS

    dbcolscorrelate column1 column2 [column3...]

=head1 DESCRIPTION

Compute the Pearson coefficient of correlation over two (or more) columns.

The output is one line of correlations.

With exactly two columns, a new column I<correlation> is created.

With more than two columns, correlations are computed for each
pairwise combination of rows, and each output column
is given a name which is the concatenation of the two source rows,
joined with an underscore.

By default, we compute the I<population correlation coefficient>
(usually designed rho, E<0x03c1>)
and assume we see all members of the population.
With the B<--sample> option we instead compute the
I<sample correlation coefficient>, usually designated I<r>.
(Be careful in that the default here to full-population
is the I<opposite> of the default in L<dbcolstats>.)

This program requires a complete copy of the input data on disk.

=head1 OPTIONS

=over 4

=item B<--sample>

Select a the sample Pearson product-moment correlation coefficient
(the "sample correlation coefficient", usually designated I<r>).

=item B<--no-sample>

Select a the Pearson product-moment correlation coefficient
(the "correlation coefficient", usually designated E<0x03c1>).

=item B<--weight> COL

Weight the correlation by column COL.



=item B<-f FORMAT> or B<--format FORMAT>

Specify a L<printf(3)>-style format for output statistics.
Defaults to C<%.5g>.

=item B<-T TmpDir>

where to put tmp files.
Also uses environment variable TMPDIR, if -T is 
not specified.
Default is /tmp.

=back

=for comment
begin_standard_fsdb_options

This module also supports the standard fsdb options:

=over 4

=item B<-d>

Enable debugging output.

=item B<-i> or B<--input> InputSource

Read from InputSource, typically a file name, or C<-> for standard input,
or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.

=item B<-o> or B<--output> OutputDestination

Write to OutputDestination, typically a file name, or C<-> for standard output,
or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.

=item B<--autorun> or B<--noautorun>

By default, programs process automatically,
but Fsdb::Filter objects in Perl do not run until you invoke
the run() method.
The C<--(no)autorun> option controls that behavior within Perl.

=item B<--help>

Show help.

=item B<--man>

Show full manual.

=back

=for comment
end_standard_fsdb_options


=head1 SAMPLE USAGE

=head2 Input:

    #fsdb i_x i_y
    10.0   8.04
    8.0    6.95
    13.0   7.58
    9.0    8.81
    11.0   8.33
    14.0   9.96
    6.0    7.24
    4.0    4.26
    12.0  10.84
    7.0    4.82
    5.0    5.68

=head2 Command:

    cat TEST/anscombe_quartet.in | dbcolscorrelate i_x i_y

=head2 Output:

    #fsdb correlation:d
    0.81642
    #  | dbcolscorrelate i_x i_y


=head1 SEE ALSO

L<Fsdb>,
L<dbcolstatscores>,
L<dbcolsregression>,
L<dbrvstatdiff>.


=head1 CLASS FUNCTIONS

=cut

@ISA = qw(Fsdb::Filter);
$VERSION = 2.0;

use strict;
use Pod::Usage;
use Carp;

use Fsdb::Filter;
use Fsdb::IO::Reader;
use Fsdb::IO::Writer;
use Fsdb::Support qw($is_numeric_regexp);
use Fsdb::Support::NamedTmpfile;
use Fsdb::Filter::dbpipeline qw(dbpipeline_open2 dbpipeline_close2_hash dbcolstats);


=head2 new

    $filter = new Fsdb::Filter::dbcolscorrelate(@arguments);

Create a new dbcolscorrelate object, taking command-line arguments.

=cut

sub new ($@) {
    my $class = shift @_;
    my $self = $class->SUPER::new(@_);
    bless $self, $class;
    $self->set_defaults;
    $self->parse_options(@_);
    $self->SUPER::post_new();
    return $self;
}


=head2 set_defaults

    $filter->set_defaults();

Internal: set up defaults.

=cut

sub set_defaults ($) {
    my($self) = @_;
    $self->SUPER::set_defaults();
    $self->{_format} = "%.5g";
    $self->{_columns} = [];
    $self->{_include_non_numeric} = undef;
    $self->{_sample} = undef;
    $self->{_fscode} = undef;
    $self->{_weight_column} = undef;
    $self->set_default_tmpdir;
}

=head2 parse_options

    $filter->parse_options(@ARGV);

Internal: parse command-line arguments.

=cut

sub parse_options ($@) {
    my $self = shift @_;

    my(@argv) = @_;
    $self->get_options(
	\@argv,
 	'help|?' => sub { pod2usage(1); },
	'man' => sub { pod2usage(-verbose => 2); },
	'a|include-non-numeric!' => \$self->{_include_non_numeric},
	'autorun!' => \$self->{_autorun},
	'd|debug+' => \$self->{_debug},
	'f|format=s' => \$self->{_format},
	'F|fs|cs|fieldseparator|columnseparator=s' => \$self->{_fscode},
	'i|input=s' => sub { $self->parse_io_option('input', @_); },
	'log!' => \$self->{_logprog},
	's|sample!' =>  \$self->{_sample},
	'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
	'o|output=s' => sub { $self->parse_io_option('output', @_); },
	'w|weight=s' => \$self->{_weight_column},
	) or pod2usage(2);
    push (@{$self->{_columns}}, @argv);
}

=head2 setup

    $filter->setup();

Internal: setup, parse headers.

=cut

sub setup ($) {
    my($self) = @_;

    $self->finish_io_option('input', -comment_handler => $self->create_delay_comments_sub);

    if (defined($self->{_weight_column})) {
        $self->{_weight_coli} = $self->{_in}->col_to_i($self->{_weight_column});
	croak($self->{_prog} . ": weight " . $self->{_weight_column} . " does not exist in the input stream.\n")
	    if (!defined($self->{_weight_coli}));
    } else {
        $self->{_weight_coli} = undef;
    };

    croak($self->{_prog} . ": at least two columns must be specified to compute a correlation.\n")
	if ($#{$self->{_columns}} < 1);
    my @output_columns;
    my %columns_processed;
    foreach my $i (0..$#{$self->{_columns}}) {
	my $column = $self->{_columns}[$i];
	croak($self->{_prog} . ": column $column is double-listed as an input column (not allowed).\n")
	    if (defined($columns_processed{$column}));
	$columns_processed{$column} = 1;
	$self->{_colis}[$i] = $self->{_in}->col_to_i($column);
	croak($self->{_prog} . ": column $column does not exist in the input stream.\n")
	    if (!defined($self->{_colis}[$i]));
	foreach my $j (0..$#{$self->{_columns}}) {
	    next if ($i >= $j);
	    push(@output_columns,  $self->{_columns}[$i] . "_" . $self->{_columns}[$j]);
	};
    };
    # if only one column, it has a special name
    $output_columns[0] =  'correlation:d'
	if ($#output_columns == 0);
    my @output_options = (-cols => \@output_columns);
    unshift (@output_options, -fscode => $self->{_fscode})
	if (defined($self->{_fscode}));
    $self->finish_io_option('output', @output_options);
};


=head2 run

    $filter->run();

Internal: run over each rows.

=cut
sub run ($) {
    my($self) = @_;

    #
    # First, read data and save it to a file,	
    # and send each relevant column off to get stats.
    # 
    $self->{_copy_filename} = Fsdb::Support::NamedTmpfile::alloc($self->{_tmpdir});
    my $copy_writer = new Fsdb::IO::Writer(-file => $self->{_copy_filename},
			-clone => $self->{_in});

    my $read_fastpath_sub = $self->{_in}->fastpath_sub();
    my $copy_fastpath_sub = $copy_writer->fastpath_sub();

    # and take stats
    my(@stats_source_queues);
    my(@stats_sinks);
    my(@stats_threads);
    my $columns_aref = $self->{_columns};
    my $colis_aref = $self->{_colis};
    my(@stats_cols) = qw(data);
    my(@stats_args) = (($self->{_sample} ? '--sample' : '--nosample'));
    if (defined($self->{_weight_coli})) {
        push(@stats_cols, 'weight');
        push(@stats_args, '--weight' => 'weight');
    };
    push(@stats_args, 'data');
    foreach (0..$#$columns_aref) {
	($stats_source_queues[$_], $stats_sinks[$_], $stats_threads[$_]) =
	    dbpipeline_open2([-cols => \@stats_cols], dbcolstats(@stats_args));
    }
    my $fref;
    my $loop_code = q'
        while ($fref = &$read_fastpath_sub()) {
            # copy and send to stats
	    $copy_writer->write_rowobj($fref);
	    # with forking we have to close output explicitly
	    # otherwise we block on the pipe(2) to the subprocesses.
	    foreach (0..$#$columns_aref) {
                $stats_sinks[$_]->write_row($fref->[$colis_aref->[$_]]';
    if (defined($self->{_weight_coli})) {
        $loop_code .= ', $fref->[' . $self->{_weight_coli} . ']';
    };
    $loop_code .= q');
	    };
        };
    ';
    eval $loop_code;
    # close up both
    $copy_writer->close;
    foreach (0..$#$columns_aref) {
	$stats_sinks[$_]->close;
	$stats_sinks[$_] = undef;
    };

    #
    # The definition of correlation coefficient:
    #
    # corr(X,Y) = 
    # \rho_{X,Y} = cov(X,Y) / (\sigma_X \sigma_Y)
    #
    # where cov(X,Y) = E[ (X - \mu_X) (Y - \mu_Y) ]
    #
    # To add weighting:
    # corr(x,y;w) = cov(x,y;w) / \sqrt( cov(x,x;w) cov(y,y;w) )
    #    or corr(x,y,w) = cov(x, y, w) / np.sqrt(cov(x, x, w) * cov(y, y, w))
    # where cov(X,Y;w) = \Sigma_i ( w_i ( x_i - m(x;w)) ( y_i - m(y;w)))
    #    or cov(x, y, w) = np.sum(w * (x - m(x, w)) * (y - m(y, w))) / np.sum(w)
    # where m(x;y) = (\Sigma_i (w_i x_i)) / \Sigma_i w_i))
    #    or m(x,w) = np.sum(x * y) / np.sum(w)
    #
    my @means;
    my @stddevs;
    my @ns;
    foreach (0..$#$columns_aref) {
	my $stats_href = dbpipeline_close2_hash($stats_source_queues[$_], $stats_sinks[$_], $stats_threads[$_]);
        for my $stat (qw(mean stddev n)) {
            my $val = $stats_href->{$stat};
            croak($self->{_prog} . ": column " . $columns_aref->[$_] . " does not have valid $stat.\n")
                if (!defined($val));
            if ($stat eq 'mean') {
                $means[$_] = $val;
            } elsif ($stat eq 'stddev') {
                $stddevs[$_] = $val;
            } elsif ($stat eq 'n') {
                $ns[$_] = $val;
            } else {
                croak("dbcolscorrelate.pm: interal error bad $stat");
            };
        };
    };

    #
    # Now read back the data,
    # compute the z-scores on the fly,
    # and use that to compute the correlation.
    #
    $self->{_in}->close;
    $self->{_in} = new Fsdb::IO::Reader(-file => $self->{_copy_filename},
		    -comment_handler => $self->create_pass_comments_sub);
    my $covariance_sums;
    my $covariances;
    foreach my $i (0..$#$columns_aref) {
	foreach my $j (0..$#$columns_aref) {
	    next if ($i > $j);
	    $covariance_sums->[$i][$j] = $covariances->[$i][$j] = 0;
	};
    };
	    
    $read_fastpath_sub = $self->{_in}->fastpath_sub(); # regenerate with copy stream
    my $w_coli = $self->{_weight_coli};

    #
    # now compute the weighted covariances
    # cov(X,Y) = E[ (X-E[X,w])(Y-E[Y,w]) ]
    #
    while ($fref = &$read_fastpath_sub()) {
	my @deviations;
	# compute devations (like z-scores, but not normalized by stddev)
	foreach (0..$#$columns_aref) {
	    my $x = $fref->[$colis_aref->[$_]];
	    if ($x !~ /$is_numeric_regexp/) {
		$deviations[$_] = undef;
	    } else {
		$deviations[$_] = $x - $means[$_];
	    };
        };

        # compute the covariance_sums
        my $w = defined($w_coli) ? $fref->[$w_coli] : 1;

	foreach my $i (0..$#$columns_aref) {
	    foreach my $j (0..$#$columns_aref) {
		next if ($i > $j ||!defined($deviations[$i]) || !defined($deviations[$j]));
		$covariance_sums->[$i][$j] += $w * $deviations[$i] * $deviations[$j];
	    };
	};
    };
    # and compute weighted covariances
    foreach my $i (0..$#$columns_aref) {
	foreach my $j (0..$#$columns_aref) {
            next if (!defined($covariance_sums->[$i][$j]));
            my $min_ns = ($ns[$i] <= $ns[$j] ? $ns[$i] : $ns[$j]);
            $covariances->[$i][$j] = $covariance_sums->[$i][$j] / (1.0 * $min_ns);
        };
    };


    #
    # Finally output the results.
    #
    # compute them
    my @correlations;
    foreach my $i (0..$#$columns_aref) {
	foreach my $j (0..$#$columns_aref) {
	    next if ($i >= $j);
	    if (!defined($covariances->[$i][$j])) {
		push(@correlations, $self->{_empty});
	    } else {
                my $den = sqrt( $covariances->[$i][$i] * $covariances->[$j][$j] );
                my $corr = ($den == 0 ? "-" : $covariances->[$i][$j] / $den);
		push(@correlations, $self->numeric_formatting($corr));
            };
        };
    };
    $self->{_out}->write_row_from_aref(\@correlations);
};

=head1 AUTHOR and COPYRIGHT

Copyright (C) 1998-2024 by John Heidemann <johnh@isi.edu>

This program is distributed under terms of the GNU general
public license, version 2.  See the file COPYING
with the distribution for details.

=cut

1;
