#!/usr/local/bin/perl
# 
# $Header: emdb/sysman/admin/scripts/streams/pstats.pl /st_emdbsa_11.2/4 2009/03/22 07:52:28 sresrini Exp $
#
# pstats.pl
# 
# Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved. 
#
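#    NAME
#      pstats.pl - streams persistent queue stats
#
#    DESCRIPTION
#      Connects to the target database through DBI and prints em_result
#      rows carrying buffered-queue message counts and persistent-queue
#      READY/WAITING message counts for Streams apply or propagation.
#
#    NOTES
#      The user name and password are read from stdin; the connect address,
#      role, STREAMS_TYPE and VersionCategory are read from the environment.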
#
#    MODIFIED   (MM/DD/YY)
#    sadattaw    09/23/08 - 
#    ashomish    09/10/08 - 
#    sadattaw    08/21/08 - 
#    sshastry    06/10/08 - 
#    anosriva    05/27/08 - 
#    sresrini    02/18/08 - streams persistent queue stats
#    sresrini    02/18/08 - Creation
# 
use strict;
use DBI;
print("starting...0 \n");
require "emd_common.pl";
require "semd_common.pl";
#require "jdbc_oci_connector.pl";


print("starting 1...\n");
# For TESTING: comment out stdin args
my %stdinArgs = get_stdinvars();
my $username = $stdinArgs{"EM_TARGET_USERNAME"};
my $password = $stdinArgs{"EM_TARGET_PASSWORD"};
my $address = $ENV{EM_TARGET_ADDRESS};
my $streamstype = $ENV{STREAMS_TYPE};
my $role = $ENV{EM_TARGET_ROLE};

my $mode = 0;
my $stmt;
print("starting 2...\n");
if($role =~ /SYSDBA/i)
{
    $mode = 2;
}
my $verStr = $ENV{VersionCategory};
if (!defined($verStr))
{
    $verStr = $ENV{VERSIONCATEGORY};
}

my $db_version = $verStr;
if (index($db_version, '9') == 0)
{
    $db_version = 9;
}
else
{
    $db_version = substr($db_version, 0, 2);
}
print("got dbversion..");
print("started...\n dbversion is $db_version");

if($db_version != 9)
{
	print("dbversion is not 9");
}
# TESTING: Un-comment for  manual testing
#my $username = "SYSTEM";
#my $password = "manager";
#my $address = "(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=)(PORT=15043))(CONNECT_DATA=(SID=t102gc)))";
#my $streamstype = "APPLY";

# --------------------------------------------------------------------
# +++ Establish Target DB Connection
# --------------------------------------------------------------------
my %connection;
$connection{"db_conninfo"} = $address;
$connection{"db_username"} = $username;
$connection{"db_password"} = $password;
$connection{"db_mode"} = $mode;

#jdbc_oci_connect(\%connection)
#    or die (filterOraError("em_error=Could not connect to $username/$address: ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection)));
#jdbc_oci_register_metric_call(\%connection);
#

# --------------------------------------------------------------------
# +++ Establish Target DB Connection
# --------------------------------------------------------------------

my $dbconn = DBI->connect('dbi:Oracle:', "$username@".$address, "$password",
    {ora_session_mode => $mode, PrintError => 0, RaiseError => 0, AutoCommit => 0})

    or die "em_error=Could not connect to $username/$address: $DBI::errstr\n";

print("Connected...\n");
#Collect statistics for buffered queue

if($db_version != 9)
{
	my $bufsql = "select propagation_name, 'BUFFERED', num_msgs ready, 0  from ";
		$bufsql .="gv\$buffered_subscribers b,dba_propagation p, dba_queues q, dba_queue_tables t where ";
		$bufsql .="b.subscriber_name = p.propagation_name and b.subscriber_address  = p.destination_dblink and ";
		$bufsql .="b.queue_schema = p.source_queue_owner and b.queue_name = p.source_queue_name and ";
		$bufsql .="p.source_queue_name = q.name and p.source_queue_owner = q.owner and ";
		$bufsql .="q.queue_table = t.queue_table and b.inst_id=t.owner_instance";

	# print "\n".$bufsql."\n";

	#my $bufcur = jdbc_oci_prepare(\%connection, $bufsql)
	#    or die (filterOraError("em_error=prepare($bufsql): ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection)));
	#jdbc_oci_execute(\%connection, $bufcur)
	#    or die (filterOraError("em_error=cur->execute(): ".jdbc_oci_returnErrStr(\%connection,$bufcur), jdbc_oci_returnErr(\%connection,$bufcur)));

	$stmt = $dbconn->prepare( $bufsql );
	$stmt->execute();

	my( $col1, $col2, $col3, $col4);
	$stmt->bind_columns( undef, \$col1, \$col2, \$col3, \$col4);

	my @fetch_bufqrow;
	##while ( @fetch_bufqrow = jdbc_oci_fetchrow_array(\%connection, $bufcur) ) 
	while ( $stmt->fetch() )
	{
	#	print "em_result=".$fetch_bufqrow[0]."|".$fetch_bufqrow[1]."|".$fetch_bufqrow[3]."|".$fetch_bufqrow[4]."\n";
		print "em_result=".$col1."|".$col2."|".$col3."|".$col4."\n";
	}
}


#Collect persistent queue statistics
my $sql;
if($streamstype eq "APPLY")
{
	$sql = "select apply_name streams_name,'APPLY' streams_type,'' address,queue_table,queue_owner,queue_name from dba_queues, dba_apply where owner=queue_owner and queue_name=name and apply_captured='NO'";
}
else
{
	$sql .="select propagation_name streams_name,'PROPAGATION' streams_type,'\"'||destination_queue_owner||'\".\"'||destination_queue_name||'\""."\@'||destination_dblink address,queue_table,owner,source_queue_name from dba_queues, dba_propagation where owner=SOURCE_QUEUE_OWNER and SOURCE_QUEUE_NAME=name";
}

#  print "\n".$sql."\n";

#my $cur = jdbc_oci_prepare(\%connection, $sql)
#    or die (filterOraError("em_error=prepare($sql): ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection)));
#jdbc_oci_execute(\%connection, $cur)
#    or die (filterOraError("em_error=cur->execute(): ".jdbc_oci_returnErrStr(\%connection,$cur), jdbc_oci_returnErr(\%connection,$cur)));

my $stmt2 = $dbconn->prepare( $sql );
$stmt2->execute();

my $streams_name;
my $streams_type;
my $queue_table;
my $queue_owner;
my $queue_name;
my $prop_address;
$stmt2->bind_columns( undef, \$streams_name, \$streams_type, \$prop_address, \$queue_table, \$queue_owner, \$queue_name);

my @all_streams_names;
my @all_streams_types;
my @all_queue_tables;
my @all_queue_owners;
my @all_queue_names;
my @all_prop_addresses; 
my $all_streams_count = 0;
my $all_msgdata_count = 0;
my $numqts = 0;
my @all_msg_states;
my @all_queues;
my @all_counts;
my @all_queue_ow;
my @qts_unique;
my @all_addresses;

#my @fetch_row;
#@fetch_row = jdbc_oci_fetchrow_array(\%connection, $ro_cur);

##while ( @fetch_row = jdbc_oci_fetchrow_array(\%connection, $cur) ) {
while ( $stmt2->fetch() )
{
    $all_streams_count++;

	push (@all_streams_names, $streams_name);
        push (@all_streams_types,$streams_type );
	print "\npushing queue table...".$queue_table;
	push (@all_queue_tables, $queue_table);
	push (@all_queue_owners, $queue_owner);
	push (@all_queue_names, $queue_name);
	push(@all_prop_addresses,$prop_address);
	
# print "$streams_name, $streams_type, $queue_table, $queue_owner, $queue_name, $prop_address \n";

#print "em_result=$streams_name|APPLY|123\n";
}




 QTLOOP: for ( my $i = 0; $i < $all_streams_count; $i++ ) 
 {
	 my $qtable = "\"".$all_queue_owners[$i]."\".\"AQ\$".$all_queue_tables[$i]."\"";
	 print "\n".$qtable."\n";
	 for(my $j = 0; $j < $numqts ;$j ++)
	 {
		 my $qt = $qts_unique[$j];
		 if ($qt eq $qtable)
		 {
			 next QTLOOP;
		 }
	 }
	 push(@qts_unique,$qtable );
	$numqts++;
 }

my $qclause = "";
for(my $q = 0; $q < $all_streams_count; $q++)
{
	if ($q == 0) 
	{
		$qclause = "queue in (" ;
	}
	$qclause .= "'".$all_queue_names[$q]."'";
	if ($q == $all_streams_count - 1) 
	{
		$qclause .= ") and " ;
	}
	else
	{
		$qclause .= ",";
	}
}
for(my $k = 0; $k < $numqts ;$k ++)
{
	my $qtable = $qts_unique[$k];
	my $qtsql;
	$qtsql =  "select queue,address,msg_state,nvl(count(msg_state),0) from ".$qtable." where ".$qclause." msg_state in ('READY','WAITING') group by queue,address,msg_state";
	print "\n".$qtsql."\n";
#	my $qtcur = jdbc_oci_prepare(\%connection, $qtsql)
#		or die (filterOraError("em_error=prepare($sql): ".jdbc_oci_returnErrStr(\%connection), jdbc_oci_returnErr(\%connection)));

#	jdbc_oci_execute(\%connection, $qtcur)
#		or die (filterOraError("em_error=cur->execute(): ".jdbc_oci_returnErrStr(\%connection,$qtcur), jdbc_oci_returnErr(\%connection,$qtcur)));

my $stmt3 = $dbconn->prepare( $qtsql );
$stmt3->execute();

my( $queue, $address, $msg_state, $count);
$stmt3->bind_columns( undef, \$queue, \$address, \$msg_state, \$count);

#	my @fetch_row_qt;
#	while ( @fetch_row_qt = jdbc_oci_fetchrow_array(\%connection, $qtcur)) 
	while( $stmt3->fetch() ) 
	{
		push(@all_queues,$queue);
		push(@all_msg_states,$msg_state);
		push(@all_counts,$count);
		push(@all_addresses,$address);
		$all_msgdata_count++;
	}
$stmt3->finish();
}

print "\nall streams count :".$all_streams_count."\n";

if ($all_streams_count == 0)
{
        print "em_result=\n";
}
else
{

for (my $l = 0; $l < $all_streams_count; $l++)
{
	
	my $streams_name = $all_streams_names[$l];
	my $streams_type = $all_streams_types[$l];
	my $queue_table = $all_queue_tables[$l];
	my $prop_address = $all_prop_addresses[$l];
	my $queue_owner = $all_queue_owners[$l];
	my $queue_name = $all_queue_names[$l];
	my $result="em_result=$streams_name|PERSISTENT|";
	my $c = 0;

	MSG_LOOP: for (my $m = 0;$m<$all_msgdata_count ;$m++)
	{
		my $queue = $all_queues[$m] ;
		my $address = $all_addresses[$m] ;
		my $msg_state = $all_msg_states[$m] ;
		my $count = $all_counts[$m] ;

		if(($queue eq $queue_name) && ($address eq $prop_address))
		{
			if($msg_state eq 'READY')
			{
				$result.=$count."|";
				$c++;
				for (my $n = 0;$n<$all_msgdata_count ;$n++)
				{
					if(($queue eq $all_queues[$n]) && ($all_msg_states[$n] eq 'WAITING'))
					{
						$result.=$all_counts[$n]."\n";
						$c++;
						next MSG_LOOP;
					}
					if($n == $all_msgdata_count -1)
					{
						$result.="0\n";
					}
				}
			}
			else
			{
				if($msg_state eq 'WAITING')
				{
					
					for (my $n = 0;$n<$all_msgdata_count ;$n++)
					{
						if(($queue eq $all_queues[$n]) && ($all_msg_states[$n] eq 'READY'))
						{
							$result.=$all_counts[$n]."|";
							$c++;
							next MSG_LOOP;
						}
						if($n == $all_msgdata_count -1)
						{
							$result.="0|";
						}
					}
					$result.=$count."\n";
					$c++;
				}
			}
		}

	}
	if($c > 0)
	{
		print $result;
	}

}

}

$stmt->finish();
$stmt2->finish();
$dbconn->disconnect();

exit 0;
