# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#
# This Source Code Form is "Incompatible With Secondary Licenses", as
# defined by the Mozilla Public License, v. 2.0.

=head1 NAME

Bugzilla::DB::Oracle - Bugzilla database compatibility layer for Oracle

=head1 DESCRIPTION

This module overrides methods of the Bugzilla::DB module with Oracle
specific implementation. It is instantiated by the Bugzilla::DB module
and should never be used directly.

For interface details see L<Bugzilla::DB> and L<DBI>.

=cut

package Bugzilla::DB::Oracle;

use 5.10.1;
use strict;
use warnings;

use base qw(Bugzilla::DB);

use DBD::Oracle;
use DBD::Oracle qw(:ora_types);
use List::Util qw(max);

use Bugzilla::Constants;
use Bugzilla::Error;
use Bugzilla::Util;

#####################################################################
# Constants
#####################################################################
use constant EMPTY_STRING  => '__BZ_EMPTY_STR__';
use constant ISOLATION_LEVEL => 'READ COMMITTED';
use constant BLOB_TYPE => { ora_type => ORA_BLOB };
# The max size allowed for LOB fields, in kilobytes.
use constant MIN_LONG_READ_LEN => 32 * 1024;
use constant FULLTEXT_OR => ' OR ';

our $fulltext_label = 0;

sub new {
    my ($class, $params) = @_;
    my ($user, $pass, $host, $dbname, $port) = 
        @$params{qw(db_user db_pass db_host db_name db_port)};

    # You can never connect to Oracle without a DB name,
    # and there is no default DB.
    $dbname ||= Bugzilla->localconfig->{db_name};

    # Set the language enviroment
    $ENV{'NLS_LANG'} = '.AL32UTF8' if Bugzilla->params->{'utf8'};

    # construct the DSN from the parameters we got
    my $dsn = "dbi:Oracle:host=$host;sid=$dbname";
    $dsn .= ";port=$port" if $port;
    my $attrs = { FetchHashKeyName => 'NAME_lc',  
                  LongReadLen => max(Bugzilla->params->{'maxattachmentsize'} || 0,
                                     MIN_LONG_READ_LEN) * 1024,
                };
    my $self = $class->db_new({ dsn => $dsn, user => $user, 
                                pass => $pass, attrs => $attrs });
    # Needed by TheSchwartz
    $self->{private_bz_dsn} = $dsn;

    bless ($self, $class);

    # Set the session's default date format to match MySQL
    $self->do("ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS'");
    $self->do("ALTER SESSION SET NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS'");
    $self->do("ALTER SESSION SET NLS_LENGTH_SEMANTICS='CHAR'") 
        if Bugzilla->params->{'utf8'};
    # To allow case insensitive query.
    $self->do("ALTER SESSION SET NLS_COMP='ANSI'");
    $self->do("ALTER SESSION SET NLS_SORT='BINARY_AI'");
    return $self;
}

sub bz_last_key {
    my ($self, $table, $column) = @_;

    my $seq = $table . "_" . $column . "_SEQ";
    my ($last_insert_id) = $self->selectrow_array("SELECT $seq.CURRVAL "
                                                  . " FROM DUAL");
    return $last_insert_id;
}

sub bz_check_regexp {
    my ($self, $pattern) = @_;

    eval { $self->do("SELECT 1 FROM DUAL WHERE "
          . $self->sql_regexp($self->quote("a"), $pattern, 1)) };

    $@ && ThrowUserError('illegal_regexp',
        { value => $pattern, dberror => $self->errstr });
}

sub bz_explain { 
     my ($self, $sql) = @_; 
     my $sth = $self->prepare("EXPLAIN PLAN FOR $sql"); 
     $sth->execute();
     my $explain = $self->selectcol_arrayref(
         "SELECT PLAN_TABLE_OUTPUT FROM TABLE(DBMS_XPLAN.DISPLAY)");
     return join("\n", @$explain); 
} 

sub sql_group_concat {
    my ($self, $text, $separator) = @_;
    $separator = $self->quote(', ') if !defined $separator;
    my ($distinct, $rest) = $text =~/^(\s*DISTINCT\s|)(.+)$/i;
    return "group_concat($distinct T_CLOB_DELIM(NVL($rest, ' '), $separator))";
}

sub sql_regexp {
    my ($self, $expr, $pattern, $nocheck, $real_pattern) = @_;
    $real_pattern ||= $pattern;

    $self->bz_check_regexp($real_pattern) if !$nocheck;

    return "REGEXP_LIKE($expr, $pattern)";
}

sub sql_not_regexp {
    my ($self, $expr, $pattern, $nocheck, $real_pattern) = @_;
    $real_pattern ||= $pattern;

    $self->bz_check_regexp($real_pattern) if !$nocheck;

    return "NOT REGEXP_LIKE($expr, $pattern)" 
}

sub sql_limit {
    my ($self, $limit, $offset) = @_;

    if(defined $offset) {
        return  "/* LIMIT $limit $offset */";
    }
    return "/* LIMIT $limit */";
}

sub sql_string_concat {
    my ($self, @params) = @_;

    return 'CONCAT(' . join(', ', @params) . ')';
}

sub sql_to_days {
    my ($self, $date) = @_;

    return " TO_CHAR(TO_DATE($date),'J') ";
}
sub sql_from_days{
    my ($self, $date) = @_;

    return " TO_DATE($date,'J') ";
}

sub sql_fulltext_search {
    my ($self, $column, $text) = @_;
    $text = $self->quote($text);
    trick_taint($text);
    $fulltext_label++;
    return "CONTAINS($column,$text,$fulltext_label) > 0", "SCORE($fulltext_label)";
}

sub sql_date_format {
    my ($self, $date, $format) = @_;
    
    $format = "%Y.%m.%d %H:%i:%s" if !$format;

    $format =~ s/\%Y/YYYY/g;
    $format =~ s/\%y/YY/g;
    $format =~ s/\%m/MM/g;
    $format =~ s/\%d/DD/g;
    $format =~ s/\%a/Dy/g;
    $format =~ s/\%H/HH24/g;
    $format =~ s/\%i/MI/g;
    $format =~ s/\%s/SS/g;

    return "TO_CHAR($date, " . $self->quote($format) . ")";
}

sub sql_date_math {
    my ($self, $date, $operator, $interval, $units) = @_;
    my $time_sql;
    if ($units =~ /YEAR|MONTH/i) {
        $time_sql = "NUMTOYMINTERVAL($interval,'$units')";
    } else{
        $time_sql = "NUMTODSINTERVAL($interval,'$units')";
    }
   return "$date $operator $time_sql";
}

sub sql_position {
    my ($self, $fragment, $text) = @_;
    return "INSTR($text, $fragment)";
}

sub sql_in {
    my ($self, $column_name, $in_list_ref, $negate) = @_;
    my @in_list = @$in_list_ref;
    return $self->SUPER::sql_in($column_name, $in_list_ref, $negate) if $#in_list < 1000;
    my @in_str;
    while (@in_list) {
        my $length = $#in_list + 1;
        my $splice = $length > 1000 ? 1000 : $length;
        my @sub_in_list = splice(@in_list, 0, $splice);
        push(@in_str, 
             $self->SUPER::sql_in($column_name, \@sub_in_list, $negate));
    }
    return "( " . join(" OR ", @in_str) . " )";
}

sub _bz_add_field_table {
    my ($self, $name, $schema_ref, $type) = @_;
    $self->SUPER::_bz_add_field_table($name, $schema_ref);
    if (defined($type) && $type == FIELD_TYPE_MULTI_SELECT) {
        my $uk_name = "UK_" . $self->_bz_schema->_hash_identifier($name . '_value');
        $self->do("ALTER TABLE $name ADD CONSTRAINT $uk_name UNIQUE(value)");
    }
}

sub bz_drop_table {
     my ($self, $name) = @_;
     my $table_exists = $self->bz_table_info($name);
     if ($table_exists) {
         $self->_bz_drop_fks($name);
         $self->SUPER::bz_drop_table($name);
     }
}

# Dropping all FKs for a specified table. 
sub _bz_drop_fks {
    my ($self, $table) = @_;
    my @columns = $self->bz_table_columns($table);
    foreach my $column (@columns) {
        $self->bz_drop_fk($table, $column);
    }
}

sub _fix_empty {
    my ($string) = @_;
    $string = '' if $string eq EMPTY_STRING;
    return $string;
}

sub _fix_arrayref {
    my ($row) = @_;
    return undef if !defined $row;
    foreach my $field (@$row) {
        $field = _fix_empty($field) if defined $field;
    }
    return $row;
}

sub _fix_hashref {
     my ($row) = @_;
     return undef if !defined $row;
     foreach my $value (values %$row) {
       $value = _fix_empty($value) if defined $value;
     }
     return $row;
}

sub adjust_statement {
    my ($sql) = @_;
    
    if ($sql =~ /^CREATE OR REPLACE.*/i){
        return $sql;
    } 

    # We can't just assume any occurrence of "''" in $sql is an empty
    # string, since "''" can occur inside a string literal as a way of
    # escaping a single "'" in the literal.  Therefore we must be trickier...

    # split the statement into parts by single-quotes.  The negative value
    # at the end to the split operator from dropping trailing empty strings
    # (e.g., when $sql ends in "''")
    my @parts = split /'/, $sql, -1;

    if( !(@parts % 2) ) {
        # Either the string is empty or the quotes are mismatched
        # Returning input unmodified.
        return $sql;
    }

    # We already verified that we have an odd number of parts.  If we take
    # the first part off now, we know we're entering the loop with an even
    # number of parts
    my @result;
    my $part = shift @parts;
    
    # Oracle requires a FROM clause in all SELECT statements, so append
    # "FROM dual" to queries without one (e.g., "SELECT NOW()")
    my $is_select = ($part =~ m/^\s*SELECT\b/io);
    my $has_from =  ($part =~ m/\bFROM\b/io) if $is_select;

    # Oracle recognizes CURRENT_DATE, but not CURRENT_DATE()
    # and its CURRENT_DATE is a date+time, so wrap in TRUNC()
    $part =~ s/\bCURRENT_DATE\b(?:\(\))?/TRUNC(CURRENT_DATE)/io;

    # Oracle use SUBSTR instead of SUBSTRING
    $part =~ s/\bSUBSTRING\b/SUBSTR/io;
   
    # Oracle need no 'AS'
    $part =~ s/\bAS\b//ig;
    
    # Oracle doesn't have LIMIT, so if we find the LIMIT comment, wrap the
    # query with "SELECT * FROM (...) WHERE rownum < $limit"
    my ($limit,$offset) = ($part =~ m{/\* LIMIT (\d*) (\d*) \*/}o);
    
    push @result, $part;
    while( @parts ) {
        my $string = shift @parts;
        my $nonstring = shift @parts;
    
        # if the non-string part is zero-length and there are more parts left,
        # then this is an escaped quote inside a string literal   
        while( !(length $nonstring) && @parts  ) {
            # we know it's safe to remove two parts at a time, since we
            # entered the loop with an even number of parts
            $string .= "''" . shift @parts;
            $nonstring = shift @parts;
        }

        # Look for a FROM if this is a SELECT and we haven't found one yet
        $has_from = ($nonstring =~ m/\bFROM\b/io) 
                    if ($is_select and !$has_from);

        # Oracle recognizes CURRENT_DATE, but not CURRENT_DATE()
        # and its CURRENT_DATE is a date+time, so wrap in TRUNC()
        $nonstring =~ s/\bCURRENT_DATE\b(?:\(\))?/TRUNC(CURRENT_DATE)/io;

        # Oracle use SUBSTR instead of SUBSTRING
        $nonstring =~ s/\bSUBSTRING\b/SUBSTR/io;
        
        # Oracle need no 'AS'
        $nonstring =~ s/\bAS\b//ig;
        
        # Take the first 4000 chars for comparison  
        $nonstring =~ s/\(\s*(longdescs_\d+\.thetext|attachdata_\d+\.thedata)/
                      \(DBMS_LOB.SUBSTR\($1, 4000, 1\)/ig;

        # Look for a LIMIT clause
        ($limit) = ($nonstring =~ m(/\* LIMIT (\d*) \*/)o);

        if(!length($string)){
           push @result, EMPTY_STRING;
           push @result, $nonstring;
        } else {
           push @result, $string;
           push @result, $nonstring;
        }
    }

    my $new_sql = join "'", @result;

    # Append "FROM dual" if this is a SELECT without a FROM clause
    $new_sql .= " FROM DUAL" if ($is_select and !$has_from);

    # Wrap the query with a "WHERE rownum <= ..." if we found LIMIT
    
    if (defined($limit)) {
        if ($new_sql !~ /\bWHERE\b/) {
            $new_sql = $new_sql." WHERE 1=1";
        }
        my ($before_where, $after_where) = split(/\bWHERE\b/i, $new_sql, 2);
        if (defined($offset)) {
            my ($before_from, $after_from) = split(/\bFROM\b/i, $new_sql, 2);
            $before_where = "$before_from FROM ($before_from,"
                          . " ROW_NUMBER() OVER (ORDER BY 1) R "
                          . " FROM $after_from ) "; 
            $after_where = " R BETWEEN $offset+1 AND $limit+$offset";
        } else {
            $after_where = " rownum <=$limit AND ".$after_where;
        }
        $new_sql = $before_where." WHERE ".$after_where;
    }
    return $new_sql;
}

sub do {
    my $self = shift;
    my $sql  = shift;
    $sql = adjust_statement($sql);
    unshift @_, $sql;
    return $self->SUPER::do(@_);
}

sub selectrow_array {
    my $self = shift;
    my $stmt = shift;
    my $new_stmt = (ref $stmt) ? $stmt : adjust_statement($stmt);
    unshift @_, $new_stmt;
    if ( wantarray ) {
        my @row = $self->SUPER::selectrow_array(@_);
        _fix_arrayref(\@row);
        return @row;
    } else {
        my $row = $self->SUPER::selectrow_array(@_);
        $row = _fix_empty($row) if defined $row;
        return $row;
    }
}

sub selectrow_arrayref {
    my $self = shift;
    my $stmt = shift;
    my $new_stmt = (ref $stmt) ? $stmt : adjust_statement($stmt);
    unshift @_, $new_stmt;
    my $ref = $self->SUPER::selectrow_arrayref(@_);
    return undef if !defined $ref;

    _fix_arrayref($ref);
    return $ref;
}

sub selectrow_hashref {
    my $self = shift;
    my $stmt = shift;
    my $new_stmt = (ref $stmt) ? $stmt : adjust_statement($stmt);
    unshift @_, $new_stmt;
    my $ref = $self->SUPER::selectrow_hashref(@_);
    return undef if !defined $ref;

    _fix_hashref($ref);
    return $ref;
}

sub selectall_arrayref {
    my $self = shift;
    my $stmt = shift;
    my $new_stmt = (ref $stmt) ? $stmt : adjust_statement($stmt);
    unshift @_, $new_stmt;
    my $ref = $self->SUPER::selectall_arrayref(@_);
    return undef if !defined $ref;
    
    foreach my $row (@$ref) {
       if (ref($row) eq 'ARRAY') {
            _fix_arrayref($row);
       }
       elsif (ref($row) eq 'HASH') {
            _fix_hashref($row);
       }
    }

    return $ref;
}

sub selectall_hashref {
    my $self = shift;
    my $stmt = shift;
    my $new_stmt = (ref $stmt) ? $stmt : adjust_statement($stmt);
    unshift @_, $new_stmt;
    my $rows = $self->SUPER::selectall_hashref(@_);
    return undef if !defined $rows;
    foreach my $row (values %$rows) { 
          _fix_hashref($row);
    }
    return $rows;
}

sub selectcol_arrayref {
    my $self = shift;
    my $stmt = shift;
    my $new_stmt = (ref $stmt) ? $stmt : adjust_statement($stmt);
    unshift @_, $new_stmt;
    my $ref = $self->SUPER::selectcol_arrayref(@_);
    return undef if !defined $ref;
    _fix_arrayref($ref);
    return $ref;
}

sub prepare {
    my $self = shift;
    my $sql  = shift;
    my $new_sql = adjust_statement($sql);
    unshift @_, $new_sql;
    return bless $self->SUPER::prepare(@_),
                        'Bugzilla::DB::Oracle::st';
}

sub prepare_cached {
    my $self = shift;
    my $sql  = shift;
    my $new_sql = adjust_statement($sql);
    unshift @_, $new_sql;
    return bless $self->SUPER::prepare_cached(@_),
                      'Bugzilla::DB::Oracle::st';
}

sub quote_identifier {
     my ($self,$id) = @_;
     return $id;
}

#####################################################################
# Protected "Real Database" Schema Information Methods
#####################################################################

sub bz_table_columns_real {
    my ($self, $table) = @_;
    $table = uc($table);
    my $cols = $self->selectcol_arrayref(
        "SELECT LOWER(COLUMN_NAME) FROM USER_TAB_COLUMNS WHERE 
         TABLE_NAME = ?  ORDER BY COLUMN_NAME", undef, $table);
    return @$cols;
}

sub bz_table_list_real {
    my ($self) = @_;
    my $tables = $self->selectcol_arrayref(
        "SELECT LOWER(TABLE_NAME) FROM USER_TABLES WHERE 
        TABLE_NAME NOT LIKE ?  ORDER BY TABLE_NAME", undef, 'DR$%');
    return @$tables;
}

#####################################################################
# Custom Database Setup
#####################################################################

sub bz_setup_database {
    my $self = shift;
    
    # Create a function that returns SYSDATE to emulate MySQL's "NOW()".
    # Function NOW() is used widely in Bugzilla SQLs, but Oracle does not 
    # have that function, So we have to create one ourself. 
    $self->do("CREATE OR REPLACE FUNCTION NOW "
              . " RETURN DATE IS BEGIN RETURN SYSDATE; END;");
    $self->do("CREATE OR REPLACE FUNCTION CHAR_LENGTH(COLUMN_NAME VARCHAR2)" 
              . " RETURN NUMBER IS BEGIN RETURN LENGTH(COLUMN_NAME); END;");
    
    # Create types for group_concat
    my $type_exists = $self->selectrow_array("SELECT 1 FROM user_types
                                              WHERE type_name = 'T_GROUP_CONCAT'");
    $self->do("DROP TYPE T_GROUP_CONCAT") if $type_exists;
    $self->do("CREATE OR REPLACE TYPE T_CLOB_DELIM AS OBJECT "
          . "( p_CONTENT CLOB, p_DELIMITER VARCHAR2(256)"
          . ", MAP MEMBER FUNCTION T_CLOB_DELIM_ToVarchar return VARCHAR2"
          . ");");
    $self->do("CREATE OR REPLACE TYPE BODY T_CLOB_DELIM IS
                  MAP MEMBER FUNCTION T_CLOB_DELIM_ToVarchar return VARCHAR2 is
                  BEGIN
                      RETURN p_CONTENT;
                  END;
              END;");

    $self->do("CREATE OR REPLACE TYPE T_GROUP_CONCAT AS OBJECT 
               (  CLOB_CONTENT CLOB,
                  DELIMITER    VARCHAR2(256),
                  STATIC FUNCTION ODCIAGGREGATEINITIALIZE(
                      SCTX IN OUT NOCOPY T_GROUP_CONCAT)
                  RETURN NUMBER,
                  MEMBER FUNCTION ODCIAGGREGATEITERATE(
                      SELF IN OUT NOCOPY T_GROUP_CONCAT,
                      VALUE IN T_CLOB_DELIM) 
                  RETURN NUMBER,
                  MEMBER FUNCTION ODCIAGGREGATETERMINATE(
                      SELF IN T_GROUP_CONCAT,
                      RETURNVALUE OUT NOCOPY CLOB,
                      FLAGS       IN NUMBER)
                  RETURN NUMBER,
                  MEMBER FUNCTION ODCIAGGREGATEMERGE(
                      SELF IN OUT NOCOPY T_GROUP_CONCAT,
                      CTX2 IN T_GROUP_CONCAT) 
                  RETURN NUMBER);");

    $self->do("CREATE OR REPLACE TYPE BODY T_GROUP_CONCAT IS
                  STATIC FUNCTION ODCIAGGREGATEINITIALIZE(
                  SCTX IN OUT NOCOPY T_GROUP_CONCAT)
                  RETURN NUMBER IS
                  BEGIN
                      SCTX := T_GROUP_CONCAT(EMPTY_CLOB(), NULL);
                      DBMS_LOB.CREATETEMPORARY(SCTX.CLOB_CONTENT, TRUE);
                      RETURN ODCICONST.SUCCESS;
                  END;
                  MEMBER FUNCTION ODCIAGGREGATEITERATE(
                      SELF IN OUT NOCOPY T_GROUP_CONCAT,
                      VALUE IN T_CLOB_DELIM) 
                  RETURN NUMBER IS
                  BEGIN
                      SELF.DELIMITER := VALUE.P_DELIMITER;
                      DBMS_LOB.WRITEAPPEND(SELF.CLOB_CONTENT, 
                                           LENGTH(SELF.DELIMITER),
                                           SELF.DELIMITER);
                      DBMS_LOB.APPEND(SELF.CLOB_CONTENT, VALUE.P_CONTENT);
  
                      RETURN ODCICONST.SUCCESS;
                  END;
                  MEMBER FUNCTION ODCIAGGREGATETERMINATE(
                      SELF IN T_GROUP_CONCAT,
                      RETURNVALUE OUT NOCOPY CLOB,
                      FLAGS IN NUMBER) 
                  RETURN NUMBER IS
                  BEGIN
                      RETURNVALUE := RTRIM(LTRIM(SELF.CLOB_CONTENT, 
                                     SELF.DELIMITER), 
                                     SELF.DELIMITER);
                      RETURN ODCICONST.SUCCESS;
                  END;
                  MEMBER FUNCTION ODCIAGGREGATEMERGE(
                      SELF IN OUT NOCOPY T_GROUP_CONCAT,
                      CTX2 IN T_GROUP_CONCAT) 
                  RETURN NUMBER IS
                  BEGIN
                      DBMS_LOB.WRITEAPPEND(SELF.CLOB_CONTENT, 
                                           LENGTH(SELF.DELIMITER), 
                                           SELF.DELIMITER);
                      DBMS_LOB.APPEND(SELF.CLOB_CONTENT, CTX2.CLOB_CONTENT);
                      RETURN ODCICONST.SUCCESS;
                  END;
               END;");

    # Create user-defined aggregate function group_concat
    $self->do("CREATE OR REPLACE FUNCTION GROUP_CONCAT(P_INPUT T_CLOB_DELIM) 
               RETURN CLOB 
               DETERMINISTIC PARALLEL_ENABLE AGGREGATE USING T_GROUP_CONCAT;");

    # Create a WORLD_LEXER named BZ_LEX for multilingual fulltext search
    my $lexer = $self->selectcol_arrayref(
       "SELECT pre_name FROM CTXSYS.CTX_PREFERENCES WHERE pre_name = ? AND
          pre_owner = ?",
          undef,'BZ_LEX',uc(Bugzilla->localconfig->{db_user}));
    if(!@$lexer) {
       $self->do("BEGIN CTX_DDL.CREATE_PREFERENCE
                        ('BZ_LEX', 'WORLD_LEXER'); END;");
    }

    $self->SUPER::bz_setup_database(@_);

    my $sth = $self->prepare("SELECT OBJECT_NAME FROM USER_OBJECTS WHERE OBJECT_NAME = ?");
    my @tables = $self->bz_table_list_real();

    foreach my $table (@tables) {
        my @columns = $self->bz_table_columns_real($table);
        foreach my $column (@columns) {
            my $def = $self->bz_column_info($table, $column);
            # bz_add_column() before Bugzilla 4.2.3 didn't handle primary keys
            # correctly (bug 731156). We have to add missing sequences and
            # triggers ourselves.
            if ($def->{TYPE} =~ /SERIAL/i) {
                my $sequence = "${table}_${column}_SEQ";
                my $exists = $self->selectrow_array($sth, undef, $sequence);
                if (!$exists) {
                    my @sql = $self->_get_create_seq_ddl($table, $column);
                    $self->do($_) foreach @sql;
                }
            }

            if ($def->{REFERENCES}) {
                my $references = $def->{REFERENCES};
                my $update = $references->{UPDATE} || 'CASCADE';
                my $to_table  = $references->{TABLE};
                my $to_column = $references->{COLUMN};
                my $fk_name = $self->_bz_schema->_get_fk_name($table,
                                                              $column,
                                                              $references);
                # bz_rename_table didn't rename the trigger correctly.
                if ($table eq 'bug_tag' && $to_table eq 'tags') {
                    $to_table = 'tag';
                }
                if ( $update =~ /CASCADE/i ){
                    my $trigger_name = uc($fk_name . "_UC");
                    my $exist_trigger = $self->selectcol_arrayref($sth, undef, $trigger_name);
                    if(@$exist_trigger) {
                        $self->do("DROP TRIGGER $trigger_name");
                    }
  
                    my $tr_str = "CREATE OR REPLACE TRIGGER $trigger_name"
                         . " AFTER UPDATE OF $to_column ON $to_table "
                         . " REFERENCING "
                         . " NEW AS NEW "
                         . " OLD AS OLD "
                         . " FOR EACH ROW "
                         . " BEGIN "
                         . "     UPDATE $table"
                         . "        SET $column = :NEW.$to_column"
                         . "      WHERE $column = :OLD.$to_column;"
                         . " END $trigger_name;";
                    $self->do($tr_str);
                }
            }
        }
    }

   # Drop the trigger which causes bug 541553
   my $trigger_name = "PRODUCTS_MILESTONEURL";
   my $exist_trigger = $self->selectcol_arrayref($sth, undef, $trigger_name);
   if(@$exist_trigger) {
       $self->do("DROP TRIGGER $trigger_name");
   }
}

# These two methods have been copied from Bugzilla::DB::Schema::Oracle.
sub _get_create_seq_ddl {
    my ($self, $table, $column) = @_;

    my $seq_name = "${table}_${column}_SEQ";
    my $seq_sql = "CREATE SEQUENCE $seq_name INCREMENT BY 1 START WITH 1 " .
                  "NOMAXVALUE NOCYCLE NOCACHE";
    my $trigger_sql = $self->_get_create_trigger_ddl($table, $column, $seq_name);
    return ($seq_sql, $trigger_sql);
}

sub _get_create_trigger_ddl {
    my ($self, $table, $column, $seq_name) = @_;

    my $trigger_sql = "CREATE OR REPLACE TRIGGER ${table}_${column}_TR "
                    . " BEFORE INSERT ON $table "
                    . " FOR EACH ROW "
                    . " BEGIN "
                    . "   SELECT ${seq_name}.NEXTVAL "
                    . "   INTO :NEW.$column FROM DUAL; "
                    . " END;";
    return $trigger_sql;
}

############################################################################

package Bugzilla::DB::Oracle::st;
use base qw(DBI::st);
 
sub fetchrow_arrayref {
    my $self = shift;
    my $ref = $self->SUPER::fetchrow_arrayref(@_);
    return undef if !defined $ref;
    Bugzilla::DB::Oracle::_fix_arrayref($ref);
    return $ref;
}

sub fetchrow_array {
    my $self = shift;
    if ( wantarray ) {
        my @row = $self->SUPER::fetchrow_array(@_);
        Bugzilla::DB::Oracle::_fix_arrayref(\@row);
        return @row;
    } else {
        my $row = $self->SUPER::fetchrow_array(@_);
        $row = Bugzilla::DB::Oracle::_fix_empty($row) if defined $row;
        return $row;
    }
}

sub fetchrow_hashref {
    my $self = shift;
    my $ref = $self->SUPER::fetchrow_hashref(@_);
    return undef if !defined $ref;
    Bugzilla::DB::Oracle::_fix_hashref($ref);
    return $ref;
}

sub fetchall_arrayref {
    my $self = shift;
    my $ref = $self->SUPER::fetchall_arrayref(@_);
    return undef if !defined $ref;
    foreach my $row (@$ref) {
        if (ref($row) eq 'ARRAY') {
             Bugzilla::DB::Oracle::_fix_arrayref($row);
        }
        elsif (ref($row) eq 'HASH') {
            Bugzilla::DB::Oracle::_fix_hashref($row);
        }
    }
    return $ref;
}

sub fetchall_hashref {
    my $self = shift;
    my $ref = $self->SUPER::fetchall_hashref(@_);
    return undef if !defined $ref;
    foreach my $row (values %$ref) {
         Bugzilla::DB::Oracle::_fix_hashref($row);
    }
     return $ref;
}

sub fetch {
    my $self = shift;
    my $row = $self->SUPER::fetch(@_);
    if ($row) {
      Bugzilla::DB::Oracle::_fix_arrayref($row);
    }
   return $row;
}
1;