Beta Shell
v2.0 · web2.us.cloudlogin.co
[FM]
[CMD]
[PHP]
[DB]
[INFO]
[SEC]
File Manager
~
/
usr
/
share
/
perl5
/
vendor_perl
/
Fsdb
/
IO
Upload
5 items
Name
Size
Perms
Modified
Actions
[ .. / .. ]
Reader.pm
16.21 KB
-rw-r--r--
2019-09-19 13:29:36
Edit
Del
Replayable.pm
3.28 KB
-rw-r--r--
2016-01-04 06:52:58
Edit
Del
Writer.pm
12.41 KB
-rw-r--r--
2022-04-02 06:32:03
Edit
Del
Editing: Reader.pm
(16.21 KB)
Path: /usr/share/perl5/vendor_perl/Fsdb/IO/Reader.pm
Back
#!/usr/bin/perl -w

#
# Fsdb::IO::Reader.pm
# Copyright (C) 2005-2019 by John Heidemann <johnh@isi.edu>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#

package Fsdb::IO::Reader;

=head1 NAME

Fsdb::IO::Reader - handle formatting reading from a fsdb file (handle) or queue

=cut

use strict;
use warnings;

our @ISA = qw(Fsdb::IO);
our $VERSION = 1.1;

use IO::File;
use Carp;
use IO::Uncompress::AnyUncompress;
use IPC::Open2;

use Fsdb::IO;

=head1 SAMPLE CODE

Sample code reading an input stream:

    $in = Fsdb::IO::Reader->new(-file => '-');
    $in->error and die "cannot open stdin as fsdb: " . $in->error . "\n";
    my @arow;
    while ($in->read_row_to_aref(\@arow)) {
        # do something
    };
    $in->close;

=cut

=head1 METHODS

=head2 new

    $fsdb = Fsdb::IO::Reader->new(-file => $filename);
    $fsdb = Fsdb::IO::Reader->new(-header => "#fsdb -F t foo bar", -fh => $file_handle);

Creates a new reader object from FILENAME.
(FILENAME can also be a IO::Handle object, or an hdfs: file.)
Always succeeds, but check the C<error> method to test for failure.

=head3 Options:

=over 4

=item B<other options>

See also the options in Fsdb::IO, including
C<-file>, C<-header>.

=item B<-file FILENAME>

Open and read the given filename.
Special filename "-" is standard input,
and files with hdfs: are read from Hadoop (but not with directory aggregation).

=item B<-comment_handler $ref>

Define how comments are handled.
If $ref is a Fsdb::IO::Writer object,
comments are written to that stream as they are encountered.
If $ref is a ref to a scalar, then we assume that scalar
will be filled in with a Fsdb::IO::Writer object later and treat
it the same.
If it is of type code, then it is assumed to be a callback function
of the form:

    sub comment_handler ($) { my($comment) = @_; }

where the one argument will be a string with the unparsed comment
(with leading # and trailing newline).

By default, or if $ref is undef, comments are consumed.

A typical handler if you have an output Fsdb stream is:

    sub { $out->write_raw(@_); };

(That is the code created by L<Fsdb::Filter::create_pass_comments_sub>.)

There are several support routines to handle comments in a pipeline;
see L<Fsdb::Filter::create_pass_comments_sub>,
L<Fsdb::Filter::create_tolerant_pass_comments_sub>,
L<Fsdb::Filter::create_delay_comments_sub>.

=back

User-specified -header arguments override a header provided in the input source.

=cut

sub new {
    my $class = shift @_;
    my $self = $class->SUPER::new(@_);
    bless $self, $class;
    #
    # new instance variables
    $self->{_unreadq} = [];
    # Could pass out the code so rowobj_sub propages down to fastpath.
    # Skip that for now.
    # $self->{_read_rowobj_code} = ' die; ';  # placeholders
    $self->{_read_rowobj_sub} = sub { die; };
    #
    $self->config(@_);
    #
    # setup:
    if (! ($self->{_fh} || $self->{_queue})) {
        $self->{_error} //= "Fsdb::IO::Reader: cannot setup filehandle";
        return $self;
    }
    if ($self->{_fh} && ref($self->{_fh}) eq 'IO::Pipe') {
        # don't do this if we're IO::Pipe::End, since it's already been done
        $self->{_fh}->reader();
    }

    $self->comment_handler_to_sub;

    # Note: reader/writer difference: readers have io subs before headers; writers only after.
    $self->create_io_subs();

    if (!defined($self->{_headerrow})) {
        # get the header from the file (must not have been specified by the user)
        $self->read_headerrow;
        $self->parse_headerrow;
    }
    if (defined($self->{_headerrow})) {
        $self->{_header_set} = 1;  # go read-only
        # rebuild io subs in case the fscode changed
        $self->create_io_subs();
    } else {
        # Keep any more specific message already recorded (read_headerrow
        # reports what it saw instead of a header); only fall back to the
        # generic text when nothing else was set.
        $self->{_error} //= "no header line";
        return $self;
    }

    return $self;
}

=head2 config_one

documented in new

=cut

sub config_one {
    my($self, $aaref) = @_;
    if ($aaref->[0] eq '-file') {
        shift @$aaref;
        my($file) = shift @$aaref;
        my $fh;
        my $mode = $self->default_binmode();
        if ($file eq '-') {
            $fh = IO::Handle->new;
            $fh->fdopen(fileno(STDIN), "<");
            binmode $fh, $mode;
        } elsif ($file =~ /^hdfs:/) {
            my($hdfs_fh, $compression_fh);
            my $hdfs_pid = open($hdfs_fh, '-|', "hdfs", "-cat", $file);
            $self->{_hdfs_reader_pid} = $hdfs_pid;
            # open() autovivifies $hdfs_fh even when it fails, so success
            # must be judged by its return value, not by the handle.
            if ($hdfs_pid) {
                my $decompressor = Fsdb::IO::_find_filename_decompressor($file);
                if ($decompressor) {
                    # NOTE(review): open2's second argument is normally the
                    # handle the parent writes to the child's stdin; passing
                    # the already-open $hdfs_fh here looks like it will not
                    # feed the hdfs output to the decompressor -- confirm.
                    $self->{_compression_pid} = open2($compression_fh, $hdfs_fh, $decompressor);
                }
                $fh = $compression_fh // $hdfs_fh;
            }
        } else {
            my $decompressor = Fsdb::IO::_find_filename_decompressor($file);
            if ($decompressor) {
                my $pid = open($fh, '-|', $decompressor, $file);
                $self->{_compression_pid} = $pid;
                # as above: a failed open still leaves $fh defined
                $fh = undef if (!$pid);
            } else {
                $fh = IO::File->new($file, "<$mode");
            }
        }
        if ($fh) {
            $self->{_fh} = $fh;
        } else {
            $self->{_error} = "cannot open $file";
        }
    } elsif ($aaref->[0] eq '-comment_handler') {
        shift @$aaref;
        $self->{_comment_handler} = shift @$aaref;
        $self->comment_handler_to_sub;
    } else {
        $self->SUPER::config_one($aaref);
    }
}

=head2 comment_handler_to_sub

internal use only: parses and sets up the comment handle callback.
(At input, _comment_handler is as given by -comment_handler;
at exit, _comment_sub is always a code reference.)

=cut

sub comment_handler_to_sub {
    my($self) = @_;
    # The closures below capture only the handler, never $self: a closure
    # stored in $self that also referred to $self would form a reference
    # cycle and keep the reader alive.
    my $handler = $self->{_comment_handler};
    if (!defined($handler)) {
        # just consume comments
        $self->{_comment_sub} = sub {};
    } elsif (ref($handler) eq 'CODE') {
        # assume the user did the right thing passing in a sub
        $self->{_comment_sub} = $handler;
    } elsif (ref($handler) =~ /^Fsdb::IO::Writer/) {
        # write a pass-through
        $self->{_comment_sub} = sub { $handler->write_raw(@_); };
    } elsif (ref($handler) eq 'SCALAR') {
        # write a pass-through, but with one level of indirection
        # (This trick is necessary because often the Writer
        # cannot be opened before the Reader is created.)
        $self->{_comment_sub} = sub { ${$handler}->write_raw(@_); };
    } else {
        croak "correct_comment_handler: invalid -comment_handler argument\n";
    }
}

=head2 _enable_compression

    $self->_enable_compression

internal use only: switch from uncompressed to compressed.

=cut

sub _enable_compression {
    my($self) = @_;
    return if (!$self->{_compression});

    my $phy_fh = $self->{_fh};
    binmode($phy_fh, ":raw");
    $self->{_fh} = IO::Uncompress::AnyUncompress->new($phy_fh)
        or croak "Fsdb::IO::Reader: cannot switch to compression " . $self->{_compression};
    # xxx: we now should push our encoding onto this new fh,
    # but not clear how IO::Uncompress handles that.
}

=head2 create_io_subs

    $self->create_io_subs()

internal use only: create a thunk that returns rowobjs.

=cut

sub create_io_subs {
    my($self) = @_;
    return if ($self->{_error});

    croak "confusion: too many IO sources" if (defined($self->{_fh}) && defined($self->{_queue}));
    if (defined($self->{_fh})) {
        $self->_enable_compression() if ($self->{_compression} && $self->{_header_set});
        # need to unserialize data from a file handle
        if ($self->{_rscode} eq 'D') {
            #
            # Normal line-by-line (rowized) format.
            # Carefully optimized.
            #
            my $fh = $self->{_fh};
            my $fsre = $self->{_fsre};
            $self->{_read_rowobj_sub} = sub {
                my $line = $fh->getline;
                return undef if (!defined($line));  # eof
                return $line if ($line =~ /^\s*\#/);  # comment, no longer chomped;
                chomp $line;
                # Note that, technically, the next line is meaningless
                # if we haven't yet parsed the header.
                # We assume read_headerrow will sort that out adequately.
                my @f = split(/$fsre/, $line);
                return \@f;  # a row
            };
        } elsif ($self->{_rscode} eq 'C' || $self->{_rscode} eq 'I') {
            #
            # Colized-format.
            # Not particularly optimized.
            #
            # NOTE(review): this closure refers to $self and is stored in
            # $self, which looks like a reference cycle -- confirm whether
            # readers in this format are ever expected to be freed early.
            #
            my $fh = $self->{_fh};
            my $fsre = $self->{_fsre};
            # set up buffers for partial objects
            $self->{_rowize_eof} = undef;
            $self->{_rowize_partial_row} = [ ($self->{_empty}) x ($self->ncols) ];
            $self->{_rowize_started_row} = undef;
            $self->{_read_rowobj_sub} = sub {
                return undef if ($self->{_rowize_eof});
                # get a row
                for (;;) {
                    # get a line to build up a full row
                    my $line = $fh->getline;
                    if (!defined($line)) {
                        $self->{_rowize_eof} = 1;
                        last;  # exit infinite for
                    }  # eof
                    return $line if ($line =~ /^\s*\#/);  # comment is fast-path return
                    if ($line =~ /^\s*$/) {
                        last if ($self->{_rowize_started_row});
                        next;  # skip blank lines before content
                    }
                    # parse one field, carefully
                    my($key, $value) = ($line =~ /^([^:]+):\s+(.*)$/);
                    croak("unparsable line '$line' (format should be ''key: value''\n") if (!defined($key));
                    croak("contents of line contain column separator: <$line>, will correct\n") if ($value =~ /$fsre/);
                    $value = $self->{_empty} if (!defined($value) || $value eq '');
                    my $i = $self->{_cols_to_i}->{$key};
                    croak ("unknown column '$key' in '$line'.\n") if (!defined($i));
                    $self->{_rowize_partial_row}[$i] = $value;
                    $self->{_rowize_started_row} = 1;
                }
                # special case eof
                return undef if ($self->{_rowize_eof} && !$self->{_rowize_started_row});
                # now return the new row
                my @f = @{$self->{_rowize_partial_row}};  # copy (maybe not needed?)
                $self->{_rowize_partial_row} = [ ($self->{_empty}) x ($self->ncols) ];  # reset
                $self->{_rowize_started_row} = undef;
                return \@f;
            };
        } else {
            croak "undefined rscode " . $self->{_rscode} . "\n";
        }
    } elsif (defined($self->{_queue})) {
        # data is preformatted from a queue
        my $queue = $self->{_queue};
        $self->{_read_rowobj_sub} = sub {
            return $queue->dequeue;
        };
    } else {
        croak "confusion: no IO source\n";
    }
}

=head2 read_headerrow

internal use only; reads the header

=cut

sub read_headerrow {
    my($self) = @_;
    return if ($self->{_error});
    my $headerrow = $self->{_read_rowobj_sub}->();
    # Note special case: if ref($headerrow) than read_rowobj_sub
    # parsed the line for us and it wasn't a comment.  Bad user!  No header!
    if (!defined($headerrow) || ref($headerrow)) {
        # Build a short, printable description of what was seen instead
        # of a header, for the error message.
        my $printable_hr = $headerrow;
        if (!defined($printable_hr)) {
            $printable_hr = "[EOF]";
        } elsif (ref($printable_hr) ne 'SCALAR') {
            # stringify the reference and drop its "(0x...)" address part
            $printable_hr = "$printable_hr";
            $printable_hr =~ s/\(.*\)//;
        } else {
            $printable_hr = substr($printable_hr, 0, 200) . " ..." if (length($printable_hr) > 200);
            $printable_hr =~ s/[^[:print:]]+//g;
        }
        $self->{_error} = "no header line (saw: $printable_hr)";
        return;
    }
    # Note: internally, headers are newlineless.
    chomp $headerrow;
    $self->{_headerrow} = $headerrow;
}

# =head2 read_attributes
#
# Read the attributes.  Called automatically to get attributes,
# if any.
#
# =cut
# sub read_attributes {
#     my($self) = @_;
#     croak "double attribute read.\n" if ($self->{_attributes_set});
#     $self->{_attributes_set} = 1;
#
#     my $fref;
#     while ($fref = $self->read_rowobj) {
#         last if (!defined($fref));  # eof!
#         last if (ref($fref));  # data (expected exit path)
#         last if ($fref !~ /^#%\s+([^:])+:\s+(.*)$/);
#         $self->{_attributes}{$1} = $2;
#     };
#     # put the last thing back
#     $self->unread_rowobj($fref);
#     # sigh, we now blown the fastpath :-(
# };
#
# =head2 check_attributes
#
# internal use only; check that attributes have been read.
# (for a writer, they always are)
#
# =cut
# sub check_attributes {
#     return if ($self->{_attributes_set});
#     if (!defined($self->{_headerrow})) {
#         $self->read_headerrow;
#         $self->parse_headerrow;
#     };
#     $self->read_attributes;
# }
#

=head2 read_rowobj

    $rowobj = $fsdb->read_rowobj;

Reads a line of input and returns a "row object",
either a scalar string for a comment or header,
or an array reference for a row,
or undef on end-of-stream.
This routine is the fastest way to do full-featured fsdb-formatted IO.
(Although see also Fsdb::IO::Reader::fastpath_sub.)

Unlike all the other routines (including fastpath_sub),
read_rowobj does not do comment processing (calling comment_sub).

=cut

sub read_rowobj {
    my($self) = @_;
    return undef if (defined($self->{_error}));

    # first, check unread
    if (@{$self->{_unreadq}}) {
        return shift @{$self->{_unreadq}};
    }

    return $self->{_read_rowobj_sub}->();
}

=head2 read_row_to_aref

    $fsdb->read_row_to_aref(\@a);

Then $a[0] is the 0th column, etc.
Returns undef if the read fails, typically due to EOF.

=cut

sub read_row_to_aref {
    my($self, $aref) = @_;

    while (1) {
        my $rowobj = $self->read_rowobj;
        if (!defined($rowobj)) {
            return undef;  # eof
        } elsif (!ref($rowobj)) {
            # comment
            $self->{_comment_sub}->($rowobj);
        } else {
            # assert(ref($rowobj) eq 'ARRAY');
            @$aref = @$rowobj;
            return 1;
        }
    }
}

=head2 unread_rowobj

    $fsdb->unread_rowobj($fref)

Put an fref back into the stream.

=cut

sub unread_rowobj {
    my($self, $fref) = @_;
    croak "unread_fref attempted with active fastpath\n"
        if ($self->{_fastpath_active});
    unshift @{$self->{_unreadq}}, $fref;
}

=head2 unread_row_from_aref

    $fsdb->unread_row_from_aref(\@a);

Put array @a back into the file.

=cut

sub unread_row_from_aref {
    my($self, $aref) = @_;
    croak "unread_row_from_aref attempted with active fastpath\n"
        if ($self->{_fastpath_active});
    my @a = @$aref;  # make a copy
    unshift @{$self->{_unreadq}}, \@a;
}

=head2 read_row_to_href

    $fsdb->read_row_to_href(\%h);

Read the next row into hash C<%h>.
Then $h{'colname'} is the value of that column.
Returns undef if the read fails, typically due to EOF.

=cut

sub read_row_to_href {
    my($self, $href) = @_;
    my @a;
    $self->read_row_to_aref(\@a) or return undef;
    foreach my $i (0..$#{$self->{_cols}}) {
        $href->{$self->{_cols}[$i]} = $a[$i];
    }
    return 1;
}

=head2 unread_row_from_href

    $fsdb->unread_row_from_href(\%h);

Put hash %h back into the file.
Columns that are not keys of %h are filled with the empty value.

=cut

sub unread_row_from_href {
    my($self, $href) = @_;
    # Null record: one empty value per column.  The left operand of "x"
    # must be a parenthesized list to get list repetition (otherwise a
    # single repeated string results), and the count must be the number
    # of columns, not the last index.
    my $empty = $self->{_empty} // '-';
    my @a = ($empty) x scalar(@{$self->{_cols}});
    foreach my $colname (keys %$href) {
        my($i) = $self->{_cols_to_i}->{$colname};
        defined($i) or croak "column name $colname is not in current file";
        $a[$i] = $href->{$colname};
    }
    $self->unread_row_from_aref(\@a);
}

=head2 fastpath_ok

    $fsdb->fastpath_ok();

Check if we can do fast-path IO
(post-header, no pending unread rows, no errors).

=cut

sub fastpath_ok {
    my($self) = @_;

    return undef if (defined($self->{_error}));
    return undef if (!defined($self->{_headerrow}));
    return undef if (@{$self->{_unreadq}});
    return 1;
}

=head2 fastpath_sub

    $sub = $fsdb->fastpath_sub()
    $row_aref = &$sub();

Return an anonymous sub that does read fast-path when called.
This code stub returns a new $aref
corresponding with a data line,
and handles comments as specified by -comment_handler

=cut

sub fastpath_sub {
    my($self) = @_;

    $self->fastpath_ok or croak "not able to do read fastpath\n";

    $self->{_fastpath_active} = 1;
    # use lexical variables to emulate static to avoid object resolution
    my $read_rowobj_sub = $self->{_read_rowobj_sub};
    my $comment_sub = $self->{_comment_sub};
    croak "Fsdb::IO::Reader::fastpath_sub missing comment handling subroutine.\n"
        if (!defined($comment_sub));
    # xxx: this code should track read_row_to_aref
    my $fastpath = sub {
        while (1) {
            # Called with & and no parens so the current @_ is passed
            # through rather than a new argument list being set up; the
            # reader subs created by create_io_subs ignore their arguments.
            my $rowobj = &$read_rowobj_sub;
            if (!defined($rowobj)) {
                return undef;  # eof
            } elsif (!ref($rowobj)) {
                # comment
                $comment_sub->($rowobj);
            } else {
                # assert(ref($rowobj) eq 'ARRAY')
                return $rowobj;
            }
        }
    };
    # for more visibility:
    # $fastpath = sub { my @a:shared; $self->read_row_to_aref(\@a); return \@a; };
    return $fastpath;
}

1;