Rem drv:
Rem $Header: emcore/source/oracle/sysman/emdrep/sql/core/latest/collections/collections_task_pkgbody.sql /st_emcore_10.2.0.4.3db11.2.0.2/1 2010/07/15 20:50:26 jsadras Exp $
Rem
Rem collections_task_pkgbody.sql
Rem
Rem Copyright (c) 2004, 2010, Oracle and/or its affiliates.
Rem All rights reserved.
Rem
Rem NAME
Rem collections_task_pkgbody.sql -
Rem
Rem DESCRIPTION
Rem
Rem
Rem NOTES
Rem
Rem
Rem MODIFIED (MM/DD/YY)
Rem jsadras 07/13/10 - Bug:9043010, convert to qualified sql name
Rem jsadras 11/19/08 - fix min interval to not update table
Rem jsadras 05/04/08 - Bug:7010717, call dbms_assert before executing
Rem jsadras 08/22/07 - Backport jsadras_bug-6347110 from main
Rem jsadras 07/30/07 - Backport jsadras_bug-6156475 from main
Rem kmanicka 04/26/07 - Backport kmanicka_bug-5895148 from main
Rem jsadras 08/17/07 - Bug:6347110, task workers
Rem jsadras 07/22/07 - Bug:6156475, snapshot collections processing
Rem change
Rem kmanicka 04/17/07 - bug:5895148 fix get_job_schedule
Rem jsadras 08/22/06 - Bug:5482661, Hints for performance
Rem jsadras 08/30/06 - Backport jsadras_bug-5482661 from main
Rem pmodi 12/09/05 - Backport jsadras_bug-4633750 from main
Rem jsadras 11/06/05 - Bug:4633750, use next_message for 10.2DB
Rem jsadras 09/15/05 - remove error_message is null from
Rem resubmit_failed_task
Rem pmodi 09/09/05 - Add new proc to detect and clean anomalies in
Rem task job
Rem jsadras 07/20/05 - Bug:4505966, protect against invalidation of
Rem jobs engine package
Rem scgrover 07/06/05 - add extended sql trace
Rem jsadras 06/30/05 - reordering updates to avoid deadlock
Rem gsbhatia 07/01/05 - New repmgr header impl
Rem jsadras 06/09/05 - add debugging and logging
Rem jsadras 06/02/05 - task class additions
Rem jsadras 04/26/05 - calling async proc
Rem jsadras 02/24/05 - remove update_min_time
Rem dcawley 02/24/05 - Use repository function from user model
Rem jsadras 02/21/05 - repository owner (Bug:4199342)
Rem jsadras 12/15/04 - pass timezone to run_collections
Rem jsadras 11/18/04 - min_interval
Rem jsadras 11/03/04 - service task_context
Rem jsadras 11/02/04 - extend task_type
Rem jsadras 10/27/04 - on-demand-task
Rem jsadras 10/19/04 - deq_condition: scheduled_date
Rem jsadras 10/14/04 - remove_rounding
Rem jsadras 09/30/04 - update_task
Rem jsadras 09/22/04 - duplicate worker
Rem jsadras 09/16/04 - task_error_message
Rem jsadras 09/01/04 - stop_worker
Rem jsadras 08/26/04 - handling failure
Rem jsadras 08/17/04 - jsadras_repocollect2
Rem jsadras 08/17/04 - log performance/error
Rem jsadras 08/16/04 - get_next_time
Rem jsadras 08/03/04 - Created
Rem
CREATE OR REPLACE PACKAGE BODY em_task
AS
-- Key in mgmt_parameters under which the minimum collection interval is
-- stored (read by adjust_interval, written by set_min_interval)
G_MIN_INTERVAL_PARAM VARCHAR2(35) := 'mgmt_task_min_interval' ;
-- Key in mgmt_parameters under which the maximum continuous run time of a
-- collection worker is stored (read by max_run_time_worker, written by
-- set_max_run_time_worker)
G_MAX_RUN_TIME_PARAM VARCHAR2(30) := 'mgmt_worker_max_run_time' ;
-- Minimum interval used by adjust_interval when the parameter row is absent
G_DEFAULT_MIN_INTERVAL NUMBER(1) := 1 ;
-- Dequeue modes accepted by dequeue_task (p_dequeue_mode)
G_DQ_MODE_PENDING_TASKS CONSTANT NUMBER := 1 ;
G_DQ_MODE_TASK_ID CONSTANT NUMBER := 2 ;
G_DQ_MODE_WORKER_STOP CONSTANT NUMBER := 3 ;
-- Leading part of the dequeue condition used for pending tasks: the message
-- is due (scheduled_time not later than the current UTC time) and its
-- message_code is either 0 or equal to a value that dequeue_task appends,
-- together with the closing parenthesis
G_DQ_CONDITION_TEXT VARCHAR2(200) := ' tab.user_data.scheduled_time <= '||
' CAST(SYS_EXTRACT_UTC(SYSTIMESTAMP) AS DATE)' ||
' AND (tab.user_data.message_code = 0 OR
tab.user_data.message_code = ' ;
-- NOTE(review): not referenced in the part of the package body visible here
G_EVENT_PROC_SIGNATURE CONSTANT VARCHAR2(100) := '(:context)' ;
-- Schema name prefixed to the queue name in enqueue_task and dequeue_task.
-- NOTE(review): no assignment is visible in this part of the file; presumably
-- set in the package initialization section - confirm
G_REPO_OWNER dba_procedures.owner%TYPE ;
--
-- task priorities for repo task, avail metric, repo async and adhoc task
-- respectively
-- Indexed by task type in task_priority, which uses the entry as the lower
-- bound of the priority value it returns.
-- NOTE(review): the array holds five entries but only four task kinds are
-- named above - confirm the mapping of task types to positions
--
G_TASK_PRIORITIES CONSTANT mgmt_integer_array :=
mgmt_integer_array(100,50,100,150,100) ;
--
--Private procedures
--
PROCEDURE copy_array(p_array1 IN MGMT_INTEGER_ARRAY,
                     p_array2 OUT MGMT_JOB_INT_ARRAY)
IS
  -- Position currently being copied
  l_pos PLS_INTEGER ;
BEGIN
  -- The target is always handed back initialised, even when there is
  -- nothing to copy
  p_array2 := MGMT_JOB_INT_ARRAY() ;

  -- A null source leaves the target empty
  IF p_array1 IS NULL
  THEN
    RETURN ;
  END IF ;

  IF p_array1.COUNT > 0
  THEN
    -- Size the target once, then copy position by position
    p_array2.EXTEND(p_array1.COUNT) ;
    l_pos := p_array1.FIRST ;
    WHILE l_pos <= p_array1.LAST
    LOOP
      p_array2(l_pos) := p_array1(l_pos) ;
      l_pos := l_pos + 1 ;
    END LOOP ;
  END IF ;
END copy_array;
--
-- Make sure the interval is more than the
-- minimum limit
--
FUNCTION adjust_interval(p_interval IN NUMBER)
RETURN NUMBER
IS
  -- Smallest interval that may be returned
  l_floor NUMBER ;
BEGIN
  -- Look up the configured minimum; fall back to the package default
  -- when no parameter row has been stored
  BEGIN
    SELECT params.parameter_value
      INTO l_floor
      FROM mgmt_parameters params
     WHERE params.parameter_name = G_MIN_INTERVAL_PARAM ;
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      l_floor := G_DEFAULT_MIN_INTERVAL ;
  END ;

  -- Never hand back anything below the minimum
  RETURN GREATEST(l_floor, p_interval) ;
END adjust_interval ;
--
-- Stores a parameter in mgmt_parameters: inserts the row, or updates the
-- existing row when the name is already present. An existing comment is
-- kept when p_comment is null.
--
PROCEDURE set_param_value(p_param_name IN VARCHAR2,
                          p_param_value IN NUMBER,
                          p_comment IN VARCHAR2 DEFAULT NULL
                         )
IS
BEGIN
  BEGIN
    -- First attempt: create the parameter as an internal one
    INSERT INTO mgmt_parameters
      (parameter_name,
       parameter_value,
       parameter_comment,
       internal_flag)
    VALUES
      (p_param_name,
       p_param_value,
       p_comment,
       1) ;
  EXCEPTION
    WHEN DUP_VAL_ON_INDEX THEN
      -- Already there: overwrite the value, keep the old comment unless a
      -- new one was supplied
      UPDATE mgmt_parameters params
         SET params.parameter_value = p_param_value,
             params.parameter_comment = NVL(p_comment,params.parameter_comment)
       WHERE params.parameter_name = p_param_name ;
  END ;
END set_param_value;
--
-- If we do not update existing collections then existing tasks
--
-- Set the maximum time in minutes a collection
-- worker can continuously run
-- Default: 1 hour
--
PROCEDURE set_max_run_time_worker(p_minutes IN NUMBER)
IS
  -- Comment stored alongside the parameter value
  l_param_comment CONSTANT VARCHAR2(100) :=
    'Maximum time a collection worker can continously run' ;
BEGIN
  -- Values of one minute or less are rejected
  IF p_minutes <= 1
  THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            'Run time has to be greater than a minute') ;
  END IF ;

  -- Persist the limit under the worker run-time parameter name
  set_param_value(p_param_name  => G_MAX_RUN_TIME_PARAM,
                  p_param_value => p_minutes,
                  p_comment     => l_param_comment) ;
END set_max_run_time_worker ;
--
-- Return the maximum run time allowed
-- for a collection worker to run continuously
-- Default is 1 hour
--
FUNCTION max_run_time_worker
RETURN NUMBER
IS
  -- Run-time limit in minutes
  l_limit_minutes NUMBER ;
BEGIN
  -- Read the stored limit (absolute value, 60 when the value is null);
  -- any failure to read it also yields 60
  BEGIN
    SELECT NVL(ABS(params.parameter_value),60)
      INTO l_limit_minutes
      FROM mgmt_parameters params
     WHERE params.parameter_name = G_MAX_RUN_TIME_PARAM ;
  EXCEPTION
    WHEN OTHERS THEN
      l_limit_minutes := 60 ;
  END ;

  -- The limit handed back is never below one minute
  RETURN GREATEST(1, l_limit_minutes) ;
END max_run_time_worker;
--
-- Set the minimum interval possible for interval frequency code
--
PROCEDURE set_min_interval(p_interval IN NUMBER)
IS
  -- Comment stored alongside the parameter value
  l_param_comment CONSTANT VARCHAR2(100) :=
    'Minimum collection interval to be enforced' ;
BEGIN
  -- Zero and negative intervals are rejected
  IF p_interval <= 0
  THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            'Interval has to be greater than 0') ;
  END IF ;

  -- Only the parameter is stored; existing task rows are deliberately
  -- left untouched for performance reasons
  set_param_value(p_param_name  => G_MIN_INTERVAL_PARAM,
                  p_param_value => p_interval,
                  p_comment     => l_param_comment) ;
END set_min_interval ;
--
-- Schedules a task in AQ
-- If p_coll_queue_rec.message_code = 0 then task is enqueued
-- If p_coll_queue_rec.message_code = N then it is a stop message for worker N
--
--
-- Places a message on the task queue.
--
-- Parameters
--   p_coll_queue_rec  message to enqueue; must not be null and must carry a
--                     task_id >= 0. A scheduled_time that is null or in the
--                     past is replaced by the current UTC time (the record
--                     is IN OUT, so the caller sees the adjusted value).
--   p_priority        message priority; must not be negative. Null means
--                     G_TASK_PRIORITY_DEFAULT.
--   p_delay_sec       AQ delay in seconds; must not be negative. Null means
--                     no delay.
--   p_queue_name      queue name, qualified with G_REPO_OWNER before use.
--
-- Raises MGMT_GLOBAL.COLLECTION_ERR for invalid arguments, an invalid queue
-- name (ORA-25205) or any other failure while enqueuing. Errors already in
-- the -20000..-20999 range are re-raised unchanged, as the other routines
-- of this package do, so validation messages are not wrapped a second time.
--
PROCEDURE enqueue_task(p_coll_queue_rec IN OUT mgmt_coll_queue_obj,
                       p_priority IN NUMBER DEFAULT G_TASK_PRIORITY_DEFAULT,
                       p_delay_sec IN NUMBER DEFAULT NULL,
                       p_queue_name IN VARCHAR2 := G_TASK_QUEUE_NAME
                      )
IS
  l_enqueue_options dbms_aq.enqueue_options_t;
  l_message_properties dbms_aq.message_properties_t;
  l_message_handle RAW(16);
  invalid_queue EXCEPTION ;
  PRAGMA EXCEPTION_INIT(INVALID_QUEUE,-25205) ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('enqueue_task:Enter',G_MODULE_NAME) ;
  END IF ;

  -- Argument validation
  IF p_coll_queue_rec IS NULL
  THEN
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Task record cannot be null') ;
  END IF ;
  IF p_priority < 0
  THEN
    -- 0 is accepted, so the message says so
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Priority should be greater than or equal to 0') ;
  END IF ;
  IF p_delay_sec < 0
  THEN
    -- 0 is accepted, so the message says so
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Delay should be greater than or equal to 0') ;
  END IF ;

  -- A missing or past schedule time means "now"
  IF p_coll_queue_rec.scheduled_time IS NULL OR
     p_coll_queue_rec.scheduled_time < mgmt_global.sysdate_utc
  THEN
    p_coll_queue_rec.scheduled_time := mgmt_global.sysdate_utc ;
  END IF ;
  IF nvl(p_coll_queue_rec.task_id,-1) < 0
  THEN
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Task ID should be greater than or equal to 0') ;
  END IF ;

  l_message_properties.priority := nvl(p_priority,G_TASK_PRIORITY_DEFAULT) ;
  l_message_properties.delay := nvl(p_delay_sec,0) ;

  IF EMDW_LOG.P_IS_DEBUG_SET
  THEN
    -- Log the priority actually used, not the raw (possibly null) argument
    EMDW_LOG.DEBUG('enqueue_task:'||
                   'task_id='||p_coll_queue_rec.task_id||
                   ',Time='||to_char(p_coll_queue_rec.scheduled_time,'DD-MON-YY HH24:MI')||
                   ',Priority='||to_char(l_message_properties.priority), G_MODULE_NAME) ;
  END IF ;

  DBMS_AQ.ENQUEUE(queue_name=> G_REPO_OWNER||'.'||p_queue_name,
                  enqueue_options=> l_enqueue_options,
                  message_properties=> l_message_properties,
                  payload=> p_coll_queue_rec,
                  msgid=> l_message_handle);

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('enqueue_task:Exit normal',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN INVALID_QUEUE THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('enqueue_task:Exit exception invalid queue name',G_MODULE_NAME) ;
    END IF ;
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Invalid Queue Name '||p_queue_name) ;
  WHEN OTHERS THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('enqueue_task:Exit exception '||sqlerrm,G_MODULE_NAME) ;
    END IF ;
    -- Application errors (including the validation errors raised above)
    -- already carry a complete message: pass them through untouched
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                              sqlerrm||' when queueing task ') ;
    END IF ;
END enqueue_task;
-- Dequeue a task
-- Parameters
-- p_dequeue_id = task_id of the task to be dequeued
-- or worker id depending on dequeue mode.
-- p_dequeue_mode Can take on multiple values
-- G_DQ_MODE_PENDING_TASKS will get the next pending task for the
-- the task type
-- G_DQ_MODE_TASK_ID will get the task identified by p_dequeue_id
-- G_DQ_MODE_WORKER_STOP will get stop messages for the worker
-- identified by p_dequeue_id=worker_id
--
--
-- Removes one message from the task queue and returns it.
-- The message is selected by a dequeue condition built from p_dequeue_mode
-- and p_dequeue_id. When no message matches, a placeholder record with
-- task_id = -1 is returned instead of raising an error.
-- Raises MGMT_GLOBAL.COLLECTION_ERR for an unknown dequeue mode, an invalid
-- queue name (ORA-25205) or any other failure; errors already in the
-- -20000..-20999 range are re-raised unchanged.
FUNCTION dequeue_task(p_dequeue_id IN NUMBER,
p_dequeue_mode IN NUMBER,
p_task_class_list IN VARCHAR2 DEFAULT G_ALL_TASKS,
p_queue_name IN VARCHAR2 DEFAULT G_TASK_QUEUE_NAME )
RETURN mgmt_coll_queue_obj
IS
l_dequeue_options dbms_aq.dequeue_options_t;
l_message_properties dbms_aq.message_properties_t;
l_message_handle RAW(16);
l_message mgmt_coll_queue_obj ;
invalid_queue exception ;
no_data_in_q exception ;
PRAGMA exception_init(INVALID_QUEUE,-25205) ;
PRAGMA exception_init(NO_DATA_IN_Q,-25228) ;
BEGIN
IF EMDW_LOG.P_IS_INFO_SET
THEN
EMDW_LOG.INFO('dequeue_task:Enter Id='||p_dequeue_id||
' Mode='||p_dequeue_mode||
' Class='||p_task_class_list,G_MODULE_NAME );
END IF ;
-- Build the dequeue condition for the requested mode
IF p_dequeue_mode = G_DQ_MODE_PENDING_TASKS
THEN
-- Due messages whose message_code is 0 or equal to p_dequeue_id
l_dequeue_options.deq_condition :=G_DQ_CONDITION_TEXT || p_dequeue_id ||')';
-- A null class list makes this comparison null, so no class filter is added
IF p_task_class_list != G_ALL_TASKS
THEN
-- NOTE(review): the class list is concatenated into the condition text as
-- is; presumably it is always an internally generated list of numbers -
-- confirm that no caller passes unvalidated input here
l_dequeue_options.deq_condition :=
l_dequeue_options.deq_condition ||
' AND tab.user_data.task_class IN ('||p_task_class_list||')' ;
END IF ;
ELSIF p_dequeue_mode = G_DQ_MODE_TASK_ID
THEN
-- The message of one specific task
l_dequeue_options.deq_condition :=
'tab.user_data.task_id = '||p_dequeue_id ;
ELSIF p_dequeue_mode = G_DQ_MODE_WORKER_STOP
THEN
-- Stop messages carry task_id 0 and the worker id as message_code
l_dequeue_options.deq_condition :=
'tab.user_data.message_code = '||p_dequeue_id||' AND '||
'tab.user_data.task_id = 0' ;
ELSE
raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
'Invalid Dequeue Mode') ;
END IF ;
IF EMDW_LOG.P_IS_DEBUG_SET
THEN
EMDW_LOG.DEBUG('dequeue_task:Condition='||
l_dequeue_options.deq_condition,G_MODULE_NAME) ;
END IF ;
--
-- 10.2 DB has severe performance problems with FIRST MESSAGE (Bug:4633750)
-- So we dequeue by next message first and if it fails due to AQ problem
-- as documented in Metalink Note:118781, we revert to FIRST MESSAGE
-- and try again. This solution though not optimal is the recommended
-- solution from AQ dev.
--
BEGIN
l_dequeue_options.navigation :=dbms_aq.NEXT_MESSAGE ;
-- Never block: an empty result surfaces as ORA-25228 (NO_DATA_IN_Q)
l_dequeue_options.wait :=dbms_aq.NO_WAIT;
dbms_aq.dequeue(queue_name => G_REPO_OWNER||'.'||p_queue_name,
dequeue_options => l_dequeue_options,
message_properties => l_message_properties,
payload =>l_message,
msgid => l_message_handle);
EXCEPTION
WHEN NO_DATA_IN_Q THEN
IF EMDW_LOG.P_IS_DEBUG_SET
THEN
EMDW_LOG.DEBUG('dequeue_task:Try First message',G_MODULE_NAME);
END IF ;
-- navigation needed to overcome AQ problem See Note:118781.1 on Metalink
-- If this second attempt also finds nothing, NO_DATA_IN_Q propagates to
-- the outer handler, which returns the placeholder record
l_dequeue_options.navigation :=dbms_aq.FIRST_MESSAGE ;
dbms_aq.dequeue(queue_name => G_REPO_OWNER||'.'||p_queue_name,
dequeue_options => l_dequeue_options,
message_properties => l_message_properties,
payload =>l_message,
msgid => l_message_handle);
END ;
IF EMDW_LOG.P_IS_INFO_SET
THEN
EMDW_LOG.INFO('dequeue_task:exit task_id='||l_message.task_id||
',message_code='||l_message.message_code,G_MODULE_NAME ) ;
END IF ;
RETURN(l_message) ;
EXCEPTION
WHEN INVALID_QUEUE THEN
IF EMDW_LOG.P_IS_INFO_SET
THEN
EMDW_LOG.INFO('dequeue_task:exit exception invalid queue',
G_MODULE_NAME ) ;
END IF ;
raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
'Invalid Queue Name '||p_queue_name) ;
WHEN NO_DATA_IN_Q THEN
IF EMDW_LOG.P_IS_INFO_SET
THEN
EMDW_LOG.INFO('dequeue_task:exit no task',G_MODULE_NAME ) ;
END IF ;
-- Nothing matched: hand back a placeholder the callers recognise by its
-- task_id of -1
l_message := mgmt_coll_queue_obj.new(p_task_id=>-1,
p_task_class=>0,
p_message_code=>0,
p_scheduled_time=>null) ;
RETURN(l_message) ;
WHEN OTHERS THEN
IF EMDW_LOG.P_IS_INFO_SET
THEN
EMDW_LOG.INFO('dequeue_task:exit exception '||sqlerrm,G_MODULE_NAME ) ;
END IF ;
-- Application errors pass through unchanged; anything else is wrapped
IF SQLCODE BETWEEN -20999 and -20000
THEN
RAISE ;
ELSE
raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
sqlerrm||' when dequeuing task') ;
END IF ;
END dequeue_task;
--
--update aq task
--
--
-- Replaces the queued message of a task: the current message is dequeued
-- and a fresh one is enqueued with the given priority and, when supplied,
-- a new scheduled time. The task class of the old message is carried over.
-- Raises MGMT_GLOBAL.COLLECTION_ERR when the task's message is not found
-- in the queue. p_task_type is accepted but not used here.
--
PROCEDURE update_aq_task(p_task_id IN NUMBER,
                         p_priority IN NUMBER,
                         p_next_time_utc IN DATE := NULL,
                         p_task_type IN NUMBER := NULL
                        )
IS
  -- Message currently sitting in the queue for the task
  l_queued_msg mgmt_coll_queue_obj ;
  -- Message that takes its place
  l_replacement mgmt_coll_queue_obj ;
  -- Scheduled time of the replacement
  l_run_at DATE ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_aq_task:Enter Task='||p_task_id||
                  'Next time utc='||to_char(p_next_time_utc,
                                            'DD-MON-YY HH24:MI'),G_MODULE_NAME) ;
  END IF ;

  -- Pull the existing message off the queue
  l_queued_msg := dequeue_task(p_dequeue_id   => p_task_id,
                               p_dequeue_mode => G_DQ_MODE_TASK_ID) ;

  IF l_queued_msg.task_id = p_task_id
  THEN
    -- Keep the old schedule unless a new time was requested
    l_run_at := nvl(p_next_time_utc, l_queued_msg.scheduled_time) ;
    l_replacement := mgmt_coll_queue_obj.new(
                       p_task_id        => p_task_id,
                       p_task_class     => l_queued_msg.task_class,
                       p_message_code   => 0,
                       p_scheduled_time => l_run_at) ;
    enqueue_task(p_coll_queue_rec => l_replacement,
                 p_priority       => p_priority) ;
  ELSE
    -- dequeue_task returned something other than the requested task
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Unable to Update task in Queue') ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_aq_task:Exit normal',G_MODULE_NAME) ;
  END IF ;
END update_aq_task;
--
-- Locks the task so that others cannot update it
--
PROCEDURE lock_task(p_task_id IN NUMBER)
IS
  -- Receives the selected id; only the row lock matters
  l_locked_id mgmt_collection_tasks.task_id%TYPE ;
BEGIN
  -- Selecting FOR UPDATE takes the row lock
  SELECT tasks.task_id
    INTO l_locked_id
    FROM mgmt_collection_tasks tasks
   WHERE tasks.task_id = p_task_id
     FOR UPDATE ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    -- No such task row
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            ' Invalid task Id ('||to_char(p_task_id)||')' ) ;
END lock_task ;
--
-- Get the task information and lock optionally
--
--
-- Reads the mgmt_collection_tasks row of a task.
--
-- Parameters
--   p_task_id    id of the task to read
--   p_task_info  receives the complete task row
--   p_lock_task  TRUE to read the row FOR UPDATE (row lock); FALSE or null
--                to read it without locking
--
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when no row exists for p_task_id.
--
PROCEDURE get_task_info(p_task_id IN NUMBER,
                        p_task_info OUT NOCOPY mgmt_collection_tasks%ROWTYPE,
                        p_lock_task IN BOOLEAN := FALSE
                       )
IS
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    -- Log label now names this procedure (it used to say get_task_time)
    EMDW_LOG.INFO('get_task_info:Enter task_id='||p_task_id,
                  G_MODULE_NAME) ;
  END IF ;
  IF p_lock_task
  THEN
    -- add hints since the table size changes rapidly
    -- and optimizer uses stale stats
    SELECT /*+ INDEX(tasks mgmt_collection_tasks_pk) */ *
      INTO p_task_info
      FROM mgmt_collection_tasks tasks
     WHERE task_id = p_task_id
       FOR UPDATE ;
  ELSE
    SELECT /*+ INDEX(tasks mgmt_collection_tasks_pk) */ *
      INTO p_task_info
      FROM mgmt_collection_tasks tasks
     WHERE task_id = p_task_id ;
  END IF ;
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('get_task_info:Exit', G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('get_task_info: Exit Exception not-found', G_MODULE_NAME) ;
    END IF ;
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            'Invalid task_id '||p_task_id) ;
END get_task_info ;
--
-- utility function to get the next time for execution
--
--
-- Works out when a schedule is due next.
-- Builds a job schedule record from the arguments and asks the job engine
-- for the next execution time after p_last_collection_time. A start time
-- that is not supplied defaults to the current UTC time.
-- Returns NULL for the on-demand frequency, for which nothing is computed.
--
FUNCTION get_next_time_utc(p_frequency_code IN NUMBER,
                           p_start_time IN DATE := NULL,
                           p_end_time IN DATE := NULL,
                           p_execution_hours IN NUMBER := NULL,
                           p_execution_minutes IN NUMBER := NULL,
                           p_interval IN NUMBER := NULL,
                           p_days IN MGMT_INTEGER_ARRAY := NULL,
                           p_months IN MGMT_INTEGER_ARRAY := NULL,
                           p_last_collection_time IN DATE := NULL,
                           p_timezone_region IN VARCHAR := G_UTC_TIMEZONE
                          )
RETURN DATE
IS
  -- Schedule in the form the job engine expects
  l_job_schedule mgmt_job_schedule_record ;
  -- Result; stays NULL for the on-demand frequency
  l_next_run DATE ;
  -- Day and month lists converted to the job engine's array type
  l_job_days mgmt_job_int_array := mgmt_job_int_array() ;
  l_job_months mgmt_job_int_array := mgmt_job_int_array() ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('get_next_time_utc:Enter freq='||p_frequency_code||
                  ' Interval='||p_interval||' Last time='||p_last_collection_time||
                  ' timezone='||p_timezone_region,G_MODULE_NAME) ;
  END IF ;

  IF p_frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    copy_array(p_days,l_job_days) ;
    copy_array(p_months,l_job_months) ;

    -- Positional call: months come before days
    l_job_schedule := mgmt_jobs.get_job_schedule_record(
                        p_frequency_code,
                        nvl(p_start_time,sys_extract_utc(systimestamp)),
                        p_end_time,
                        p_execution_hours,
                        p_execution_minutes,
                        p_interval,
                        l_job_months,
                        l_job_days,
                        mgmt_job_engine.timezone_rgn_specified,
                        null,
                        null,
                        p_timezone_region) ;

    l_next_run := mgmt_job_engine.get_next_execution_time
                    (p_schedule        => l_job_schedule,
                     p_last_start_time => p_last_collection_time,
                     p_tzregion        => p_timezone_region) ;
  END IF ;

  RETURN l_next_run ;
END get_next_time_utc ;
--
-- get the task priority for a task type
--
--
-- Returns the priority to use for a task: the larger of the requested
-- priority (1 when null) and the G_TASK_PRIORITIES entry of the task type.
-- Any failure, such as a task type without an entry, is reported as
-- MGMT_GLOBAL.INVALID_PARAMS_ERR.
--
FUNCTION task_priority(p_task_type IN NUMBER,
                       p_priority IN NUMBER)
RETURN NUMBER
IS
  -- Entry of the priority table for the task type
  l_type_priority NUMBER ;
BEGIN
  l_type_priority := G_TASK_PRIORITIES(p_task_type) ;
  RETURN GREATEST(NVL(p_priority,1), l_type_priority) ;
EXCEPTION
  WHEN OTHERS THEN
    RAISE_APPLICATION_ERROR(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            'Task type does not exist (create_task)') ;
END task_priority ;
--
-- creates a collection task in the table and places it in AQ
--
--
-- Creates a collection task.
-- The schedule is validated, a task id is drawn from mgmt_task_sequence and
-- the first execution time is computed. The task row and its optional
-- name/value context are stored, and every task except an on-demand one is
-- placed in the queue. Returns the new task id.
--
-- Raises MGMT_GLOBAL.COLLECTION_ERR when no next execution time can be
-- determined for a task that is not on-demand, and
-- MGMT_GLOBAL.INVALID_PARAMS_ERR for a duplicate context name or any
-- unexpected failure. Errors already in the -20000..-20999 range are
-- re-raised unchanged.
--
FUNCTION create_task(p_coll_schedule IN mgmt_coll_schedule_obj,
                     p_task_type IN NUMBER := 0 ,
                     p_timezone_region IN VARCHAR2 := G_UTC_TIMEZONE,
                     p_task_proc IN VARCHAR2 := NULL,
                     p_context IN mgmt_namevalue_array DEFAULT NULL,
                     p_priority IN NUMBER DEFAULT NULL,
                     p_task_class IN NUMBER DEFAULT 0
                    )
RETURN NUMBER
IS
  -- Id of the task being created
  l_new_task_id mgmt_collection_tasks.task_id%TYPE ;
  -- First execution time (UTC)
  l_first_run DATE ;
  -- Queue message for the new task
  l_queue_msg mgmt_coll_queue_obj ;
  -- Start time used for the computation only; the row stores the start
  -- time exactly as supplied in the schedule
  l_calc_start DATE ;
  -- Interval after enforcing the minimum; stays null unless the schedule
  -- uses the interval frequency
  l_eff_interval NUMBER ;
  -- Priority after applying the task type's lower bound
  l_eff_priority NUMBER ;
  -- Position within p_context
  l_ctx_pos PLS_INTEGER ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_task:Enter timezone='||p_timezone_region,
                  G_MODULE_NAME) ;
  END IF ;

  EM_COLL_UTIL.validate_schedule(p_coll_schedule) ;

  SELECT mgmt_task_sequence.nextval
    INTO l_new_task_id
    FROM DUAL ;

  -- Without an explicit start time the schedule starts "now" in the
  -- task's time zone
  l_calc_start := nvl(p_coll_schedule.start_time,
                      mgmt_global.sysdate_tzrgn(p_timezone_region)) ;
  l_eff_priority := task_priority(p_task_type,p_priority) ;

  IF p_coll_schedule.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_INTERVAL
  THEN
    l_eff_interval := adjust_interval(p_coll_schedule.interval) ;
  END IF ;

  l_first_run := get_next_time_utc(
                   p_frequency_code       => p_coll_schedule.frequency_code,
                   p_start_time           => l_calc_start,
                   p_end_time             => p_coll_schedule.end_time,
                   p_execution_hours      => p_coll_schedule.execution_hours,
                   p_execution_minutes    => p_coll_schedule.execution_minutes,
                   p_interval             => l_eff_interval,
                   p_days                 => p_coll_schedule.days,
                   p_months               => p_coll_schedule.months,
                   p_last_collection_time => NULL,
                   p_timezone_region      => p_timezone_region) ;

  IF l_first_run IS NOT NULL OR
     p_coll_schedule.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    -- Task row
    INSERT INTO mgmt_collection_tasks
      (task_id,task_type,timezone_region,frequency_code,start_time,end_time,
       execution_hours,execution_minutes,interval,months,days,
       task_status,last_collection_timestamp,next_collection_timestamp,task_proc,
       priority,task_class)
    VALUES
      (l_new_task_id,p_task_type,p_timezone_region,p_coll_schedule.frequency_code,
       p_coll_schedule.start_time,p_coll_schedule.end_time,
       p_coll_schedule.execution_hours,p_coll_schedule.execution_minutes,
       l_eff_interval,p_coll_schedule.months,
       p_coll_schedule.days,G_TASK_STATUS_IDLE,
       NULL,l_first_run,p_task_proc,l_eff_priority,p_task_class) ;

    -- Optional name/value context, one row per existing element
    IF p_context IS NOT NULL AND p_context.COUNT > 0
    THEN
      BEGIN
        l_ctx_pos := p_context.FIRST ;
        WHILE l_ctx_pos IS NOT NULL
        LOOP
          INSERT INTO mgmt_collection_task_context
            (task_id,name,value)
          VALUES
            (l_new_task_id,p_context(l_ctx_pos).name,p_context(l_ctx_pos).value) ;
          l_ctx_pos := p_context.NEXT(l_ctx_pos) ;
        END LOOP ;
      EXCEPTION
        WHEN DUP_VAL_ON_INDEX THEN
          raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                                  'Duplicate name in context (create_task)') ;
        WHEN OTHERS THEN
          raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                                  substr(sqlerrm,1,100)||'(create_task)') ;
      END ;
    END IF ;

    -- On-demand tasks are never queued
    IF p_coll_schedule.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
    THEN
      l_queue_msg := mgmt_coll_queue_obj.new
                       ( p_task_id        => l_new_task_id,
                         p_task_class     => p_task_class,
                         p_message_code   => 0,
                         p_scheduled_time => l_first_run) ;
      enqueue_task(p_coll_queue_rec => l_queue_msg,
                   p_priority       => l_eff_priority) ;
    END IF ;
  ELSE
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'next scheduled time is NULL') ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_task:exit normal task='||to_char(l_new_task_id),G_MODULE_NAME) ;
  END IF ;
  RETURN l_new_task_id ;
EXCEPTION
  WHEN OTHERS THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('create_task:exit exception '||sqlerrm,G_MODULE_NAME) ;
    END IF ;
    -- Application errors pass through unchanged; anything else is wrapped
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                              sqlerrm||to_char(sqlcode)||' when creating task ') ;
    END IF ;
END create_task;
--
-- update task
--
--
-- Updates the stored attributes of a task and keeps the queue in step.
--
-- Every argument except p_error_message leaves the stored value untouched
-- when null. p_error_message is always written, so that a successful
-- execution clears a previous error.
--
-- Queue handling (skipped for on-demand tasks, which are never queued):
--   p_new_task_in_q = TRUE   a new message is enqueued, provided
--                            p_next_time_utc is not null
--   p_new_task_in_q = FALSE  the existing message is replaced when task
--                            type, next time or priority was supplied
--
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when a priority is supplied for a
-- task id that does not exist, or when the task type has no priority entry.
-- Other failures are wrapped in MGMT_GLOBAL.COLLECTION_ERR; errors already
-- in the -20000..-20999 range are re-raised unchanged.
--
PROCEDURE update_task_pvt(p_task_id IN NUMBER,
                          p_task_type IN NUMBER := NULL,
                          p_priority IN NUMBER := NULL,
                          p_next_time_utc IN DATE := NULL,
                          p_last_collection_timestamp IN DATE := NULL,
                          p_error_message IN VARCHAR2 := NULL,
                          p_failures IN NUMBER := NULL,
                          p_total_runs IN NUMBER := NULL,
                          p_min_wait_time IN NUMBER := NULL,
                          p_max_wait_time IN NUMBER := NULL,
                          p_avg_wait_time IN NUMBER := NULL,
                          p_min_run_time IN NUMBER := NULL,
                          p_max_run_time IN NUMBER := NULL,
                          p_avg_run_time IN NUMBER := NULL,
                          p_new_task_in_q IN BOOLEAN := FALSE
                         )
IS
  l_task_rec mgmt_coll_queue_obj ;
  l_frequency_code mgmt_collections.frequency_code%type ;
  l_next_time_utc DATE ;
  l_task_type mgmt_collection_tasks.task_type%type ;
  l_task_class mgmt_collection_tasks.task_class%type ;
  l_priority NUMBER ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task_pvt:Enter',G_MODULE_NAME) ;
  END IF ;

  IF p_priority IS NOT NULL
  THEN
    -- The lower bound of the priority depends on the task type. Use the
    -- type being set by this call or, when the type is not changing, the
    -- one currently stored. (The type used to be read from a variable that
    -- had not been assigned yet, so task_priority always failed.)
    l_task_type := p_task_type ;
    IF l_task_type IS NULL
    THEN
      BEGIN
        -- The row is updated below in any case, so lock it right away
        SELECT task_type
          INTO l_task_type
          FROM mgmt_collection_tasks
         WHERE task_id = p_task_id
           FOR UPDATE ;
      EXCEPTION
        WHEN NO_DATA_FOUND THEN
          raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                                  'Invalid task_id '||p_task_id) ;
      END ;
    END IF ;
    l_priority := task_priority(l_task_type,p_priority) ;
  END IF ;

  -- There is no nvl on error message since we want to clear the error
  -- message on successful execution
  UPDATE mgmt_collection_tasks
     SET task_type=nvl(p_task_type,task_type),
         priority=nvl(l_priority,priority),
         next_collection_timestamp=nvl(p_next_time_utc,
                                       next_collection_timestamp),
         last_collection_timestamp=nvl(p_last_collection_timestamp,
                                       last_collection_timestamp),
         error_message=p_error_message,
         failures=nvl(p_failures,failures),
         total_runs=nvl(p_total_runs,total_runs),
         min_wait_time=nvl(p_min_wait_time,min_wait_time),
         max_wait_time=nvl(p_max_wait_time,max_wait_time),
         avg_wait_time=nvl(p_avg_wait_time,avg_wait_time),
         min_run_time=nvl(p_min_run_time,min_run_time),
         max_run_time=nvl(p_max_run_time,max_run_time),
         avg_run_time=nvl(p_avg_run_time,avg_run_time)
   WHERE task_id = p_task_id
  RETURNING frequency_code,priority,task_type,
            next_collection_timestamp,task_class
       INTO l_frequency_code ,l_priority,
            l_task_type ,l_next_time_utc,l_task_class ;

  -- When no row matched, l_frequency_code stays null and the queue is
  -- left alone
  IF l_frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    IF p_new_task_in_q AND p_next_time_utc IS NOT NULL
    THEN
      -- The task is not in the queue at the moment: enqueue it afresh
      l_task_rec := mgmt_coll_queue_obj.NEW(p_task_id=>p_task_id,
                                            p_task_class=>l_task_class,
                                            p_message_code=>0,
                                            p_scheduled_time=>l_next_time_utc) ;
      enqueue_task(p_coll_queue_rec=>l_task_rec,
                   p_priority=>l_priority) ;
    ELSIF NOT p_new_task_in_q
    THEN
      -- The task is already queued: replace its message only when
      -- something relevant to the queue was changed
      IF p_task_type IS NOT NULL OR
         p_next_time_utc IS NOT NULL OR
         p_priority IS NOT NULL
      THEN
        update_aq_task(p_task_id=>p_task_id,
                       p_next_time_utc=>l_next_time_utc,
                       p_task_type=>p_task_type,
                       p_priority=>l_priority) ;
      END IF ;
    END IF ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task_pvt:Exit',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN OTHERS THEN
    IF EMDW_LOG.P_IS_INFO_SET
    THEN
      EMDW_LOG.INFO('update_task_pvt:Exit exception'||sqlerrm,G_MODULE_NAME) ;
    END IF ;
    -- Application errors pass through unchanged; anything else is wrapped
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                              sqlerrm||to_char(sqlcode)||' when updating task ') ;
    END IF ;
END update_task_pvt;
--
-- Update the task queue parameters
--
--
-- Changes task type, priority and/or next execution time of a task that is
-- already in the queue. Delegates to update_task_pvt, which treats null
-- arguments as "leave unchanged".
--
PROCEDURE update_task(p_task_id IN NUMBER,
                      p_task_type IN NUMBER := NULL,
                      p_priority IN NUMBER := NULL,
                      p_next_time_utc IN DATE := NULL
                     )
IS
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:Enter',G_MODULE_NAME) ;
  END IF ;

  -- FALSE: the existing queue message is replaced, no additional message
  -- is enqueued
  update_task_pvt(p_task_id       => p_task_id,
                  p_task_type     => p_task_type,
                  p_priority      => p_priority,
                  p_next_time_utc => p_next_time_utc,
                  p_new_task_in_q => FALSE) ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:Exit',G_MODULE_NAME) ;
  END IF ;
END update_task;
--
-- Updates the task to a new schedule
--
--
-- Moves a task to a new schedule.
-- The task row is locked, the next execution time is recomputed from the
-- new schedule and the last collection time, and the schedule columns are
-- overwritten. When the new schedule is not on-demand and yields a next
-- time, the queued message is replaced as well.
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when NO_DATA_FOUND is raised, as
-- it is when the task row does not exist.
--
PROCEDURE update_task(p_task_id IN NUMBER,
                      p_coll_schedule IN mgmt_coll_schedule_obj,
                      p_timezone_region IN VARCHAR2)
IS
  -- Next execution time under the new schedule; null for on-demand
  l_next_run DATE ;
  -- Last collection time currently stored for the task
  l_last_run DATE ;
  -- Interval after enforcing the minimum; stays null unless the schedule
  -- uses the interval frequency
  l_eff_interval NUMBER ;
  -- Priority stored for the task, reused for the new queue message
  l_stored_priority mgmt_collection_tasks.priority%TYPE ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:schedule:Enter',G_MODULE_NAME) ;
    EMDW_LOG.INFO('update_task:timezone='||p_timezone_region,G_MODULE_NAME);
  END IF ;

  -- Lock the task row while the schedule is being replaced
  SELECT tasks.last_collection_timestamp
    INTO l_last_run
    FROM mgmt_collection_tasks tasks
   WHERE tasks.task_id = p_task_id
     FOR UPDATE ;

  IF p_coll_schedule.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_INTERVAL
  THEN
    l_eff_interval := adjust_interval(p_coll_schedule.interval) ;
  END IF ;

  IF p_coll_schedule.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    l_next_run := get_next_time_utc(
                    p_frequency_code       => p_coll_schedule.frequency_code,
                    p_start_time           => nvl(p_coll_schedule.start_time,
                                                  mgmt_global.sysdate_tzrgn(p_timezone_region)),
                    p_end_time             => p_coll_schedule.end_time,
                    p_execution_hours      => p_coll_schedule.execution_hours,
                    p_execution_minutes    => p_coll_schedule.execution_minutes,
                    p_interval             => l_eff_interval,
                    p_days                 => p_coll_schedule.days,
                    p_months               => p_coll_schedule.months,
                    p_last_collection_time => l_last_run,
                    p_timezone_region      => p_timezone_region) ;
  END IF ;

  -- Overwrite the stored schedule; a null time zone is stored as UTC
  UPDATE mgmt_collection_tasks
     SET frequency_code            = p_coll_schedule.frequency_code,
         execution_hours           = p_coll_schedule.execution_hours,
         execution_minutes         = p_coll_schedule.execution_minutes,
         interval                  = l_eff_interval,
         days                      = p_coll_schedule.days,
         months                    = p_coll_schedule.months,
         start_time                = p_coll_schedule.start_time,
         end_time                  = p_coll_schedule.end_time,
         next_collection_timestamp = l_next_run,
         timezone_region           = nvl(p_timezone_region,G_UTC_TIMEZONE)
   WHERE task_id = p_task_id
  RETURNING priority INTO l_stored_priority ;

  -- Re-queue the task at its new time
  IF p_coll_schedule.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND AND
     l_next_run IS NOT NULL
  THEN
    update_aq_task(p_task_id       => p_task_id,
                   p_priority      => l_stored_priority,
                   p_next_time_utc => l_next_run) ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:schedule:Exit normal',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            'Invalid Task Id - '||to_char(p_task_id)) ;
END update_task ;
--
-- Put the task back in queue
--
-- If the next time is null
-- suspend repository collections associated with task
-- remove the task
--
--
-- Schedules the next run of a task from the task row passed in, and
-- returns the next execution time (null when there is none).
--
-- When a next time exists, or the task is on-demand, the task row is
-- updated with the values carried in p_task_info through update_task_pvt,
-- which is asked to enqueue a new message.
-- Otherwise the task is removed without touching the queue; for a
-- repository task EM_COLL_UTIL.set_task_null is called first.
--
-- When the schedule computation fails because a PL/SQL package was
-- discarded or a program was not found, a fallback time is used: now plus
-- the interval (in minutes) for interval schedules, now plus one hour
-- otherwise.
--
FUNCTION schedule_next_execution(p_task_info IN mgmt_collection_tasks%rowtype)
RETURN DATE
IS
  -- Next execution time (UTC)
  l_next_run DATE ;
BEGIN
  BEGIN
    l_next_run := get_next_time_utc(
                    p_frequency_code       => p_task_info.frequency_code,
                    p_start_time           => p_task_info.start_time,
                    p_end_time             => p_task_info.end_time,
                    p_execution_hours      => p_task_info.execution_hours,
                    p_execution_minutes    => p_task_info.execution_minutes,
                    p_interval             => p_task_info.interval,
                    p_days                 => p_task_info.days,
                    p_months               => p_task_info.months,
                    p_last_collection_time => p_task_info.last_collection_timestamp,
                    p_timezone_region      => p_task_info.timezone_region) ;
  EXCEPTION
    WHEN MGMT_GLOBAL.plsql_package_discarded OR
         MGMT_GLOBAL.plsql_program_not_found THEN
      -- The schedule could not be computed: fall back to simple date
      -- arithmetic
      IF p_task_info.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_INTERVAL
      THEN
        l_next_run := MGMT_GLOBAL.sysdate_utc + (p_task_info.interval /1440) ;
      ELSE
        l_next_run := MGMT_GLOBAL.sysdate_utc + 1/24 ;
      END IF ;
  END ;

  IF l_next_run IS NOT NULL OR
     p_task_info.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    -- Store the new time together with the values carried in the row, and
    -- have a new message enqueued
    update_task_pvt(p_task_id                   => p_task_info.task_id,
                    p_next_time_utc             => l_next_run,
                    p_last_collection_timestamp => p_task_info.last_collection_timestamp,
                    p_error_message             => p_task_info.error_message,
                    p_failures                  => p_task_info.failures,
                    p_total_runs                => p_task_info.total_runs,
                    p_min_wait_time             => p_task_info.min_wait_time,
                    p_max_wait_time             => p_task_info.max_wait_time,
                    p_avg_wait_time             => p_task_info.avg_wait_time,
                    p_min_run_time              => p_task_info.min_run_time,
                    p_max_run_time              => p_task_info.max_run_time,
                    p_avg_run_time              => p_task_info.avg_run_time,
                    p_new_task_in_q             => TRUE
                   ) ;
  ELSIF p_task_info.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    -- No further run: drop the task; FALSE leaves the queue untouched
    IF p_task_info.task_type = G_TASK_TYPE_REPO
    THEN
      EM_COLL_UTIL.set_task_null(p_task_id=>p_task_info.task_id) ;
    END IF ;
    remove_task(p_task_info.task_id,FALSE) ;
  END IF ;

  RETURN l_next_run ;
EXCEPTION
  WHEN OTHERS THEN
    -- Application errors pass through unchanged; anything else is wrapped
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                              sqlerrm||' when scheduling next execution of task') ;
    END IF ;
END schedule_next_execution;
--
-- Removes a task
--
--
-- Deletes a task and its context rows and, unless p_remove_aq is FALSE,
-- takes the task's message off the queue.
-- Raises MGMT_GLOBAL.COLLECTION_ERR when the message was expected in the
-- queue but is not found, or on any unexpected failure. Errors already in
-- the -20000..-20999 range are re-raised unchanged.
--
PROCEDURE remove_task(p_task_id IN NUMBER,
                      p_remove_aq IN BOOLEAN := TRUE)
IS
  -- Message taken off the queue
  l_queued_msg mgmt_coll_queue_obj ;
  -- Schedule details of the deleted row; they tell whether the task was
  -- queued at all
  l_old_frequency mgmt_collection_tasks.frequency_code%TYPE ;
  l_old_next_run mgmt_collection_tasks.next_collection_timestamp%TYPE ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_task:Enter',G_MODULE_NAME) ;
  END IF ;

  -- add hints since the table size changes rapidly
  -- and optimizer uses stale stats
  DELETE /*+ INDEX(tasks mgmt_collection_tasks_pk) */
    FROM mgmt_collection_tasks tasks
   WHERE task_id = p_task_id
  RETURNING frequency_code,next_collection_timestamp
       INTO l_old_frequency , l_old_next_run ;

  DELETE /*+ INDEX(ctx mgmt_collection_task_ctx_pk) */
    FROM mgmt_collection_task_context ctx
   WHERE task_id = p_task_id ;

  --
  -- tasks are only queued in AQ if next time is not null and
  -- collection frequency is not ON DEMAND
  --
  IF p_remove_aq AND
     l_old_next_run IS NOT NULL AND
     l_old_frequency != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    l_queued_msg := dequeue_task(p_dequeue_id   => p_task_id,
                                 p_dequeue_mode => G_DQ_MODE_TASK_ID) ;
    -- dequeue_task signals "nothing found" with a negative task id
    IF l_queued_msg.task_id < 0
    THEN
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                              'Task not found in Queue') ;
    END IF ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_task:Exit normal',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN OTHERS THEN
    -- Application errors pass through unchanged; anything else is wrapped
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                              sqlerrm||' when removing task') ;
    END IF ;
END remove_task;
--
-- Enqueues a stop worker task so the worker can stop
--
--
-- Asks a worker to stop by enqueuing a stop message for it: task id 0,
-- the worker id as message code, scheduled for the current UTC time and
-- enqueued with G_TASK_PRIORITY_STOP.
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the worker id is null or
-- below 1; unexpected failures are wrapped in MGMT_GLOBAL.COLLECTION_ERR.
--
PROCEDURE stop_worker_task(p_worker_id IN NUMBER)
IS
  -- Stop message addressed to the worker
  l_stop_msg mgmt_coll_queue_obj ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_worker_task:Enter Worker='||p_worker_id,G_MODULE_NAME);
  END IF ;

  IF nvl(p_worker_id,0) < 1
  THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            'Worker Id should be greater than 0') ;
  END IF ;

  l_stop_msg := mgmt_coll_queue_obj.new(
                  p_task_id        => 0,
                  p_task_class     => 0,
                  p_message_code   => p_worker_id,
                  p_scheduled_time => mgmt_global.sysdate_utc) ;

  -- enqueue at higher priority
  enqueue_task(p_coll_queue_rec => l_stop_msg,
               p_priority       => G_TASK_PRIORITY_STOP) ;
EXCEPTION
  WHEN OTHERS THEN
    -- Application errors pass through unchanged; anything else is wrapped
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                              sqlerrm||' when creating stop worker task') ;
    END IF ;
END stop_worker_task;
--
-- internal function to return the lowest unallocated worker id number
--
FUNCTION get_next_worker_id
RETURN NUMBER
IS
  -- Lowest id not yet seen in use; with no workers at all this is 1
  l_free_id NUMBER := 1 ;
BEGIN
  -- Walk the allocated ids in ascending order. The first id that lies
  -- above the candidate marks a gap, and the candidate fills it; otherwise
  -- the candidate moves to just past the id at hand.
  FOR worker IN (SELECT worker_id
                   FROM mgmt_collection_workers
                  ORDER BY worker_id)
  LOOP
    EXIT WHEN worker.worker_id > l_free_id ;
    l_free_id := worker.worker_id + 1 ;
  END LOOP ;

  RETURN l_free_id ;
END get_next_worker_id ;
--
-- lock the worker record
--
--
-- Reads the row of a worker FOR UPDATE, which locks it, and returns it in
-- p_worker_rec.
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the worker does not exist.
--
PROCEDURE lock_worker_rec(p_worker_id IN NUMBER,
                          p_worker_rec OUT mgmt_collection_workers%rowtype)
IS
BEGIN
  SELECT workers.*
    INTO p_worker_rec
    FROM mgmt_collection_workers workers
   WHERE workers.worker_id = p_worker_id
     FOR UPDATE ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            'Invalid worker '||p_worker_id) ;
END lock_worker_rec ;
--
-- return the next execution time
-- Idea is to build some sophistication into this
--
FUNCTION get_next_execution_time
RETURN DATE
IS
  -- Returns the current database date (SYSDATE) advanced by G_MINUTE.
  -- NOTE(review): G_MINUTE is presumably one minute expressed in days;
  -- confirm against the package constants.
  l_next_run DATE ;
BEGIN
  l_next_run := SYSDATE + G_MINUTE ;
  RETURN l_next_run ;
END get_next_execution_time;
--
-- create a worker thread
-- if p_background = TRUE then a DBMS_JOB is submitted
-- if p_background = FALSE then the worker record is created
-- and the collection worker is run synchronously in the session
--
FUNCTION create_worker(p_background IN BOOLEAN := TRUE,
                       p_task_class_list IN VARCHAR2 DEFAULT NULL
                      )
RETURN NUMBER
IS
  -- Registers a new collection worker and returns its id.
  --
  -- p_background      : TRUE  -> a DBMS_JOB running G_WORKER_PROC(<id>) is
  --                              submitted and the worker is recorded as
  --                              G_WORKER_STATUS_STARTING
  --                     otherwise no job is submitted and the worker is
  --                     recorded as G_WORKER_STATUS_STARTED with a NULL
  --                     job id
  -- p_task_class_list : task classes served by the worker; NULL means
  --                     G_ALL_TASKS
  --
  -- The workers table is locked exclusively while the id is allocated.
  -- This function does not commit.
  -- Any failure is re-raised as MGMT_GLOBAL.INVALID_PARAMS_ERR.
  l_new_id      mgmt_collection_workers.worker_id%type;
  l_status      mgmt_collection_workers.worker_status%type;
  l_job_no      NUMBER ;
  -- job text identifying this worker in user_jobs
  l_job_what    VARCHAR2(4000) ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_worker:Enter',G_MODULE_NAME) ;
  END IF ;

  -- serialise id allocation between concurrent callers
  LOCK TABLE mgmt_collection_workers IN EXCLUSIVE MODE ;
  l_new_id := get_next_worker_id ;

  -- foreground workers are considered started straight away
  l_status := G_WORKER_STATUS_STARTED ;

  IF p_background
  THEN
    l_job_what := G_WORKER_PROC||'('||l_new_id||');' ;

    -- Remove jobs, if any, that look like this worker. They have no entry
    -- in the workers table, so they were submitted manually.
    FOR stale IN (SELECT job
                    FROM user_jobs
                   WHERE what = l_job_what)
    LOOP
      DBMS_JOB.REMOVE(stale.job) ;
    END LOOP ;

    DBMS_JOB.SUBMIT(job       => l_job_no,
                    what      => l_job_what,
                    next_date => SYSDATE,
                    interval  => G_INTERVAL_PROC,
                    no_parse  => FALSE) ;
    l_status := G_WORKER_STATUS_STARTING ;
  END IF ;

  INSERT INTO MGMT_COLLECTION_WORKERS
    (worker_id,
     worker_status,
     task_class_list,
     job_id,
     worker_start_time)
  VALUES
    (l_new_id,
     l_status,
     NVL(p_task_class_list,G_ALL_TASKS),
     l_job_no,
     null) ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_worker:Exit Worker='||l_new_id,G_MODULE_NAME) ;
  END IF ;
  RETURN(l_new_id) ;
EXCEPTION
  WHEN OTHERS THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                            sqlerrm||' when creating worker') ;
END create_worker;
--
-- remove the worker identified by p_worker_id
--
PROCEDURE remove_worker(p_worker_id IN NUMBER)
IS
  -- Removes a worker: its DBMS_JOB (when one is recorded and still matches
  -- this worker), any stop messages queued for it, and finally its row in
  -- mgmt_collection_workers.
  --
  -- p_worker_id : id of the worker to remove
  --
  -- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the worker does not exist or
  -- when the DBMS_JOB cannot be removed.
  -- This procedure does not commit; the caller decides.
  l_worker_rec mgmt_collection_workers%rowtype ;
  l_job_id     mgmt_collection_workers.job_id%type ;
  l_task_rec   mgmt_coll_queue_obj ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_worker:Enter Worker ='||p_worker_id,G_MODULE_NAME) ;
  END IF ;

  -- lock the worker row for the duration of the removal
  BEGIN
    SELECT *
      INTO l_worker_rec
      FROM mgmt_collection_workers
     WHERE worker_id = p_worker_id
       FOR UPDATE ;
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      RAISE_APPLICATION_ERROR(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                              'Worker Id '||p_worker_id||' does not exist') ;
  END ;

  -- remove the dbms job associated with the worker
  IF l_worker_rec.job_id IS NOT NULL
  THEN
    BEGIN
      -- only remove the job if it still runs this worker's procedure
      SELECT job
        INTO l_job_id
        FROM user_jobs
       WHERE job = l_worker_rec.job_ID AND
             what = G_WORKER_PROC ||'('||p_worker_id||');' ;
      DBMS_JOB.REMOVE(l_job_id) ;
    EXCEPTION
      -- job already gone or reused for something else: nothing to remove
      WHEN NO_DATA_FOUND THEN NULL ;
      WHEN OTHERS THEN
        -- report the recorded job id: l_job_id is still NULL when the
        -- lookup itself is what failed
        raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                                sqlerrm||' when removing dbms_job '||
                                l_worker_rec.job_id) ;
    END ;
  END IF ;

  -- remove all stop messages from queue
  -- NOTE(review): relies on dequeue_task returning a record with a negative
  -- task_id once nothing is left; confirm it never returns a NULL task_id.
  LOOP
    l_task_rec := dequeue_task(p_dequeue_id=>p_worker_id,
                               p_dequeue_mode=>G_DQ_MODE_WORKER_STOP) ;
    EXIT WHEN l_task_rec.task_id < 0 ;
  END LOOP ;

  DELETE mgmt_collection_workers
   WHERE worker_id = p_worker_id ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_worker:Exit ',G_MODULE_NAME) ;
  END IF ;
END remove_worker;
--
-- stop the worker
-- if p_stop_mode =NORMAL the worker will stop when it wakes up next
-- if p_stop_mode = IMMEDIATE
-- if worker is running, it will dequeue and stop
-- if worker is idle then it will stop when it wakes up
-- TODO: Need to add task_type
PROCEDURE stop_worker(p_worker_id IN NUMBER,
                      p_stop_mode IN NUMBER := G_WORKER_STOP_NORMAL)
IS
  -- Flags one worker as stop-pending and, for an immediate stop, also
  -- queues a stop message for it.
  --
  -- p_worker_id : worker to stop; lock_worker_rec raises
  --               MGMT_GLOBAL.INVALID_PARAMS_ERR if it does not exist
  -- p_stop_mode : G_WORKER_STOP_IMMEDIATE additionally enqueues a stop
  --               message; any other value only sets the status
  --
  -- This procedure does not commit.
  l_current mgmt_collection_workers%rowtype ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_worker:Enter Worker='||p_worker_id,G_MODULE_NAME) ;
  END IF ;

  -- take the row lock before looking at the status
  lock_worker_rec(p_worker_id  => p_worker_id,
                  p_worker_rec => l_current) ;

  -- nothing to update when the worker is already waiting to stop
  IF l_current.worker_status <> G_WORKER_STATUS_STOP_PENDING
  THEN
    UPDATE mgmt_collection_workers
       SET worker_status = G_WORKER_STATUS_STOP_PENDING
     WHERE worker_id = p_worker_id ;
  END IF ;

  IF p_stop_mode = G_WORKER_STOP_IMMEDIATE
  THEN
    EM_TASK.stop_worker_task(p_worker_id => p_worker_id) ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_worker:Exit ',G_MODULE_NAME) ;
  END IF ;
END stop_worker;
--
-- stop a bunch of workers; higher numbered workers will be stopped first
-- if p_worker_count is NULL then all workers for the task class will be stopped
--
PROCEDURE stop_workers(p_worker_count IN NUMBER := NULL,
                       p_stop_mode IN NUMBER := G_WORKER_STOP_NORMAL,
                       p_task_class_list IN VARCHAR2 DEFAULT NULL)
IS
  -- Stops up to p_worker_count workers serving the given task class list,
  -- highest worker id first.
  --
  -- p_worker_count    : number of workers to stop; NULL stops every
  --                     matching worker; must be greater than 0 otherwise
  -- p_stop_mode       : passed through to stop_worker
  -- p_task_class_list : task class list to match; NULL means G_ALL_TASKS
  --
  -- Raises MGMT_GLOBAL.COLLECTION_ERR when p_worker_count is 0 or negative.
  -- Commits on completion.
  l_reduced_count NUMBER := 0 ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_workers:Enter Worker count='||p_worker_count||
                  ' task_class='||p_task_class_list,G_MODULE_NAME) ;
  END IF ;

  -- validate before locking so that a bad argument does not leave the
  -- exclusive table lock held in the caller's transaction
  IF p_worker_count <= 0
  THEN
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Worker Count has to be greater than 0') ;
  END IF ;

  LOCK TABLE mgmt_collection_workers IN EXCLUSIVE MODE ;

  -- remove up to worker count, higher numbered worker ids will be removed first
  FOR rec IN (SELECT worker_id
                FROM mgmt_collection_workers
               WHERE worker_status != G_WORKER_STATUS_STOP_PENDING AND
                     task_class_list = NVL(p_task_class_list,G_ALL_TASKS)
               ORDER BY worker_id DESC)
  LOOP
    l_reduced_count := l_reduced_count + 1 ;
    stop_worker(p_worker_id=>rec.worker_id,
                p_stop_mode=>p_stop_mode) ;
    -- ">=" so that a non-integer count still terminates the loop;
    -- a NULL count never satisfies the test, so all workers are stopped
    EXIT WHEN l_reduced_count >= p_worker_count ;
  END LOOP ;

  -- releases the table lock and makes the status changes visible
  COMMIT ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_workers:Exit',G_MODULE_NAME) ;
  END IF ;
END stop_workers ;
--
-- run the task identified by task_id
--
FUNCTION run_task(p_task_id IN NUMBER)
RETURN DATE
IS
-- Runs one task and records the outcome on its task record.
--
-- Steps, in order:
--   1. read the task record through EM_TASK.get_task_info (lock requested)
--   2. dispatch on task_type and run the matching collection/evaluation,
--      or the task's own procedure through dynamic SQL
--   3. capture any execution error in error_message and maintain the
--      failures counter
--   4. update wait-time and run-time statistics (in seconds)
--   5. hand the record to EM_TASK.schedule_next_execution
--
-- p_task_id : id of the task to run
--
-- Returns the value returned by EM_TASK.schedule_next_execution.
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the task cannot be read.
-- Errors raised while executing the task itself are not propagated.
-- There is no COMMIT in this function.
l_task_tab_rec mgmt_collection_tasks%rowtype ;
l_sysdate_utc_start DATE := SYS_EXTRACT_UTC(SYSTIMESTAMP) ;
l_sysdate_utc_end DATE ;
l_wait_time NUMBER ;
l_run_time NUMBER ;
l_next_time_utc DATE ;
l_metric_values mgmt_metric_value_array := mgmt_metric_value_array() ;
l_context mgmt_namevalue_array := mgmt_namevalue_array() ;
-- Formats the context name/value pairs as "( n1=v1  n2=v2 )" for logging.
FUNCTION context_string(p_context IN mgmt_namevalue_array)
RETURN VARCHAR2
IS
l_context_string VARCHAR2(2048) := '(';
BEGIN
IF p_context IS NOT NULL AND p_context.COUNT > 0
THEN
FOR i IN p_context.FIRST..p_context.LAST
LOOP
l_context_string := l_context_string || ' '||p_context(i).name
||'='||p_context(i).value||' ' ;
END LOOP ;
END IF ;
l_context_string := l_context_string||')' ;
RETURN(l_context_string) ;
EXCEPTION
-- only exception is if context exceeds 2048 characters so
-- we return what fits in
-- (the failed assignment leaves l_context_string at its previous value)
WHEN OTHERS THEN
RETURN(l_context_string||')') ;
END context_string ;
BEGIN
IF EMDW_LOG.P_IS_INFO_SET
THEN
EMDW_LOG.INFO('run_task:Enter',G_MODULE_NAME) ;
END IF ;
-- Read the task record; every failure here is reported as an invalid task.
BEGIN
EM_TASK.get_task_info(p_task_id=>p_task_id,
p_task_info=>l_task_tab_rec,
p_lock_task=>TRUE) ;
EXCEPTION
WHEN OTHERS THEN
raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
'Invalid task '||p_task_id||' in queue') ;
END ;
-- Execute the task. Any error is trapped below and stored on the record.
BEGIN
IF l_task_tab_rec.task_type = G_TASK_TYPE_REPO
THEN
-- NOTE(review): error_message is presumably an OUT parameter filled in
-- by run_collection; this branch does not reset it itself - confirm.
EM_COLL_UTIL.run_collection
(p_task_id=>p_task_id,
p_eval_mode=>EM_METRIC_EVAL.G_TASK_EVAL_MODE,
p_timezone_region=>l_task_tab_rec.timezone_region,
p_metric_values=>l_metric_values,
p_error_message=>l_task_tab_rec.error_message) ;
ELSE
IF EMDW_LOG.P_IS_DEBUG_SET
THEN
EMDW_LOG.DEBUG('run_task:task type='||l_task_tab_rec.task_type||
' Procedure='||l_task_tab_rec.task_proc, G_MODULE_NAME) ;
END IF ;
-- Load the name/value context stored for this task.
BEGIN
-- add hints since the table size changes rapidly
-- and optimizer uses stale stats
SELECT /*+ INDEX(ctx mgmt_collection_task_ctx_pk) */
mgmt_namevalue_obj.new(name,value)
BULK COLLECT INTO l_context
FROM mgmt_collection_task_context ctx
WHERE task_id = p_task_id ;
-- NOTE(review): SELECT ... BULK COLLECT INTO returns an empty collection
-- rather than raising NO_DATA_FOUND, so this handler looks unreachable.
EXCEPTION WHEN NO_DATA_FOUND THEN NULL ;
END;
IF EMDW_LOG.P_IS_DEBUG_SET
THEN
EMDW_LOG.DEBUG('run_task:Context='||context_string(l_context)
,G_MODULE_NAME);
END IF ;
-- Dispatch on the task type; unknown types fall back to the task's own
-- procedure when one is configured.
IF l_task_tab_rec.task_type = G_TASK_TYPE_AVAIL
THEN
EM_REP_METRIC.run_metric_eval(p_context=>l_context) ;
ELSIF l_task_tab_rec.task_type = G_TASK_TYPE_REPO_ASYNC
THEN
EM_COLL_UTIL.run_collection(p_context=>l_context) ;
ELSIF l_task_tab_rec.task_type = G_TASK_TYPE_SNAPSHOT
THEN
EM_COLL_UTIL.run_snapshot_collections(p_context=>l_context) ;
ELSIF l_task_tab_rec.task_proc IS NOT NULL
THEN
-- The procedure name is validated by DBMS_ASSERT before being placed
-- in the statement text; the context is passed as a bind variable.
EXECUTE IMMEDIATE 'CALL '||
DBMS_ASSERT.qualified_sql_name(l_task_tab_rec.task_proc)||'(:context)'
USING l_context ;
ELSE
raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
'Task procedure is NULL') ;
END IF ;
-- reached only when the task ran without raising
l_task_tab_rec.error_message := NULL ;
END IF ;
EXCEPTION
WHEN OTHERS THEN
l_task_tab_rec.error_message := sqlerrm ;
-- repository collections will have metric errors logged
IF l_task_tab_rec.task_type != G_TASK_TYPE_REPO
THEN
MGMT_LOG.log_error(
v_module_name_in=>MGMT_COLLECTION.G_MODULE_NAME,
v_error_code_in=>MGMT_GLOBAL.COLLECTION_ERR,
v_error_msg_in=>'Task Execution error:'||substr(sqlerrm,1,100)||
' task='||l_task_tab_rec.task_id||
' procedure='||l_task_tab_rec.task_proc||
' Context='||context_string(l_context),
v_log_level_in=>MGMT_GLOBAL.G_ERROR) ;
END IF ;
-- NOTE(review): the guard tests the ERROR level but the message is written
-- with DEBUG - confirm which level is intended.
IF EMDW_LOG.P_IS_ERROR_SET
THEN
EMDW_LOG.DEBUG('run_task:error '||l_task_tab_rec.error_message,
G_MODULE_NAME);
END IF ;
END ;
-- Consecutive failure counter: incremented on error, reset on success.
IF l_task_tab_rec.error_message IS NOT NULL
THEN
l_task_tab_rec.failures := nvl(l_task_tab_rec.failures,0)+1 ;
ELSE
l_task_tab_rec.failures := 0 ;
END IF ;
l_sysdate_utc_end := SYS_EXTRACT_UTC(SYSTIMESTAMP);
--
--TBD: A possible bug here, need to check, because next collection timestamp
-- is based on the time the last collection was scheduled, not when
-- last collection was run.
-- for example: If collection has 5 minute interval and was supposed
-- to start at 4:50 and it actually started at 4:51, next time should
-- be 4:55 not 4:56
--
l_task_tab_rec.last_collection_timestamp := l_sysdate_utc_start ;
-- wait time: seconds between the scheduled time and the actual start,
-- never negative
l_wait_time := GREATEST((l_sysdate_utc_start -
l_task_tab_rec.next_collection_timestamp)*86400,0) ;
-- run time: seconds spent in this call up to this point
l_run_time := (l_sysdate_utc_end - l_sysdate_utc_start)*86400 ;
-- a stored minimum of 0 is replaced by the current wait time;
-- negative or NULL values are left alone
IF l_task_tab_rec.min_wait_time = 0
THEN
l_task_tab_rec.min_wait_time := l_wait_time ;
ELSIF l_task_tab_rec.min_wait_time > 0
THEN
l_task_tab_rec.min_wait_time := least(l_wait_time,
l_task_tab_rec.min_wait_time) ;
END IF ;
l_task_tab_rec.max_wait_time := greatest(l_wait_time,
l_task_tab_rec.max_wait_time) ;
-- running average over total_runs + 1 executions
l_task_tab_rec.avg_wait_time := ( l_wait_time+
(l_task_tab_rec.avg_wait_time*
l_task_tab_rec.total_runs)
)/(l_task_tab_rec.total_runs+1) ;
-- same bookkeeping for the run time
IF l_task_tab_rec.min_run_time = 0
THEN
l_task_tab_rec.min_run_time := l_run_time ;
ELSE
l_task_tab_rec.min_run_time := least(l_run_time,
l_task_tab_rec.min_run_time) ;
END IF ;
l_task_tab_rec.max_run_time := greatest(l_run_time,
l_task_tab_rec.max_run_time) ;
l_task_tab_rec.avg_run_time := ( l_run_time+
(l_task_tab_rec.avg_run_time*
l_task_tab_rec.total_runs)
)/(l_task_tab_rec.total_runs+1) ;
l_task_tab_rec.total_runs := l_task_tab_rec.total_runs + 1 ;
-- hand the updated record over for rescheduling
l_next_time_utc := EM_TASK.schedule_next_execution(l_task_tab_rec) ;
--
-- Error was being logged here, removed since we do not want to log
-- errors to mgmt_log we will be logging to metric errors and policy
-- evaluation tables.
--
IF EMDW_LOG.P_IS_INFO_SET
THEN
EMDW_LOG.INFO('run_task:Exit normal',G_MODULE_NAME) ;
END IF ;
RETURN(l_next_time_utc) ;
END run_task;
--
-- run the tasks in AQ
--
FUNCTION run_tasks(p_worker_id IN NUMBER,
                   p_task_class_list IN VARCHAR2 := G_ALL_TASKS)
RETURN NUMBER
IS
  -- Dequeues and runs pending tasks for one worker until the queue yields
  -- nothing runnable, a stop message arrives, or the worker's time budget
  -- (max_run_time_worker minutes) is used up.
  --
  -- p_worker_id       : id of the worker doing the dequeuing
  -- p_task_class_list : task classes this worker serves
  --
  -- Returns -1 when the last message dequeued was a stop message for this
  -- worker (task_id = 0, message_code = p_worker_id); otherwise the number
  -- of tasks processed.
  -- Commits after every task and after writing the performance record.
  l_current_task_id MGMT_COLLECTION_TASKS.task_id%type := 1 ;
  l_task_q_rec      mgmt_coll_queue_obj ;
  l_tasks_processed NUMBER := 0 ;
  l_sysdate_start   DATE := SYSDATE ;
  l_duration        NUMBER ;
  -- value returned by run_task; not used further here
  l_next_time_utc   DATE ;
  -- max_run_time_worker is a local function
  l_end_time        DATE := SYSDATE + (max_run_time_worker/1440) ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    -- same date mask for both timestamps so the log line reads consistently
    EMDW_LOG.INFO('run_tasks:Enter '||
                  ' Start time='||to_char(SYSDATE,'DD-MON-YY HH24:MI')||
                  ' Will end before '|| to_char(l_end_time,'DD-MON-YY HH24:MI')||
                  ' + last task time ',G_MODULE_NAME) ;
  END IF ;

  WHILE l_current_task_id > 0
  LOOP
    l_task_q_rec := dequeue_task(p_dequeue_id=>p_worker_id,
                                 p_dequeue_mode=>G_DQ_MODE_PENDING_TASKS,
                                 p_task_class_list=>p_task_class_list
                                ) ;
    l_current_task_id := l_task_q_rec.task_id ;
    IF EMDW_LOG.P_IS_DEBUG_SET
    THEN
      EMDW_LOG.DEBUG('run_tasks:Found Task#'||l_task_q_rec.task_id||
                     ' Message='||l_task_q_rec.message_code||
                     ' Class='||l_task_q_rec.task_class||
                     ' Time='||l_task_q_rec.scheduled_time,G_MODULE_NAME) ;
    END IF ;

    -- a regular task has a positive id and message code 0; anything else
    -- ends the loop
    IF l_task_q_rec.task_id > 0 AND
       l_task_q_rec.message_code = 0
    THEN
      BEGIN
        l_next_time_utc := run_task(p_task_id=>l_task_q_rec.task_id) ;
        l_tasks_processed := l_tasks_processed + 1 ;
      EXCEPTION
        WHEN OTHERS THEN
          -- Generally all errors are caught so the chances of this getting called
          -- is remote, It is here just in case the time calculations raise error
          -- or having problems scheduling next execution of task.
          MGMT_LOG.log_error(
            v_module_name_in=>MGMT_COLLECTION.G_MODULE_NAME,
            v_error_code_in=>MGMT_GLOBAL.COLLECTION_ERR,
            v_error_msg_in=>'Collection error:'||substr(sqlerrm,12)||
                            '(task='||l_task_q_rec.task_id||')',
            v_facility_in=>NULL,
            v_client_data_in=>p_worker_id,
            v_log_level_in=>MGMT_GLOBAL.G_ERROR) ;
      END ;
      COMMIT ;
    ELSE
      EXIT ;
    END IF ;

    -- time budget is checked only between tasks, so the last task may
    -- run past l_end_time
    EXIT WHEN SYSDATE >= l_end_time ;
  END LOOP ;

  IF l_tasks_processed > 0
  THEN
    -- elapsed time converted from days to milliseconds
    l_duration := (SYSDATE - l_sysdate_start)*86400000 ;
    MGMT_LOG.LOG_PERFORMANCE(v_job_name_in=>MGMT_COLLECTION.G_MODULE_NAME,
                             v_duration_in=>l_duration,
                             v_time_in=>SYSDATE,
                             v_is_total_in=>'Y',
                             v_name_in=>'Collections',
                             v_value_in=>l_tasks_processed,
                             v_client_data_in=>p_worker_id,
                             v_module_in=>G_MODULE_NAME,
                             v_action_in=>p_task_class_list) ;
    COMMIT ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('run_tasks:Exit',G_MODULE_NAME) ;
  END IF ;

  -- stop message addressed to this worker
  IF l_task_q_rec.message_code = p_worker_id AND l_task_q_rec.task_id = 0
  THEN
    RETURN(-1) ;
  ELSE
    RETURN(l_tasks_processed) ;
  END IF ;
END run_tasks;
--
-- running a task outside of a queue
--
PROCEDURE run_task(p_task_id IN NUMBER)
IS
  -- Runs a single task on request, outside the normal worker loop.
  --
  -- On-demand tasks are run directly. Any other task is first dequeued by
  -- its id and is run only if that dequeue returned the same task.
  --
  -- p_task_id : id of the task to run
  --
  -- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the task cannot be read and
  -- MGMT_GLOBAL.COLLECTION_ERR when the task could not be dequeued.
  l_ignored_next_time DATE ;
  l_queue_msg         mgmt_coll_queue_obj ;
  l_task_info         mgmt_collection_tasks%rowtype ;
BEGIN
  BEGIN
    EM_TASK.get_task_info(p_task_id   => p_task_id,
                          p_task_info => l_task_info,
                          p_lock_task => TRUE) ;
  EXCEPTION
    WHEN OTHERS THEN
      raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                              'Invalid task ') ;
  END ;

  -- on-demand tasks are run straight away, without dequeuing
  IF l_task_info.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    l_ignored_next_time := run_task(p_task_id => p_task_id) ;
    RETURN ;
  END IF ;

  -- scheduled task: take its message off the queue before running it
  IF l_task_info.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    l_queue_msg := dequeue_task(p_dequeue_id   => p_task_id,
                                p_dequeue_mode => G_DQ_MODE_TASK_ID) ;
  END IF ;

  IF l_queue_msg.task_id = p_task_id
  THEN
    l_ignored_next_time := run_task(p_task_id => p_task_id) ;
  ELSE
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
                            'Invalid Task') ;
  END IF ;
END run_task;
PROCEDURE DBMSJOB_EXTENDED_SQL_TRACE_ON(p_value IN BOOLEAN, p_worker IN NUMBER) IS
  -- Passes p_value to MGMT_SQLTRACE.EXTENDED_SQL_TRACE_ON for the trace
  -- name built from EST_WORKER_NAME and the worker number.
  l_trace_name VARCHAR2(4000) ;
BEGIN
  l_trace_name := EST_WORKER_NAME || p_worker ;
  MGMT_SQLTRACE.EXTENDED_SQL_TRACE_ON(l_trace_name, p_value);
END DBMSJOB_EXTENDED_SQL_TRACE_ON;
--
-- the actual worker procedure which runs the tasks
--
PROCEDURE worker(p_worker_id IN NUMBER)
IS
  -- Entry point executed for one collection worker.
  --
  -- Locks the worker row, then either removes the worker (stop pending) or
  -- marks it started and processes queued tasks through run_tasks. After
  -- run_tasks the worker is removed when a stop message was received or
  -- when it has no DBMS_JOB recorded (foreground worker).
  --
  -- p_worker_id : id of the worker to run
  --
  -- Errors are logged through MGMT_LOG.log_error and not re-raised.
  l_self    mgmt_collection_workers%rowtype ;
  l_outcome NUMBER ;
BEGIN
  EMDW_LOG.set_context(v_context_type       => G_LOG_CONTEXT,
                       v_context_identifier => 'Worker#'||p_worker_id) ;
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('worker:Enter',G_MODULE_NAME) ;
  END IF ;
  MGMT_SQLTRACE.EXTENDED_SQL_TRACE(EST_WORKER_NAME || p_worker_id);

  BEGIN
    -- the worker may have been removed in the meantime; if the row cannot
    -- be locked for any reason, leave quietly
    lock_worker_rec(p_worker_id, l_self) ;
  EXCEPTION
    WHEN OTHERS THEN RETURN ;
  END ;

  CASE
    WHEN l_self.worker_status = G_WORKER_STATUS_STOP_PENDING THEN
      IF EMDW_LOG.P_IS_INFO_SET
      THEN
        EMDW_LOG.INFO('worker:found stop pending status',G_MODULE_NAME) ;
      END IF ;
      remove_worker(p_worker_id) ;
      COMMIT ;
    ELSE
      -- first run of a background worker: record that it has started
      IF l_self.worker_status = G_WORKER_STATUS_STARTING
      THEN
        UPDATE mgmt_collection_workers
           SET worker_status     = G_WORKER_STATUS_STARTED,
               worker_start_time = SYSDATE
         WHERE worker_id = p_worker_id ;
      END IF ;

      -- Release lock on worker record otherwise stop_workers will hang
      COMMIT ;

      l_outcome := run_tasks(p_worker_id       => p_worker_id,
                             p_task_class_list => l_self.task_class_list) ;

      -- if stop message or this is a foreground worker then remove worker
      IF l_outcome < 0 OR l_self.job_id IS NULL
      THEN
        remove_worker(p_worker_id) ;
      END IF ;
      COMMIT ;
  END CASE ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('worker:Exit',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN OTHERS THEN
    MGMT_LOG.log_error(
      v_module_name_in => MGMT_COLLECTION.G_MODULE_NAME,
      v_error_code_in  => MGMT_GLOBAL.COLLECTION_ERR,
      v_error_msg_in   => 'Collection Worker error:'||substr(sqlerrm,12),
      v_facility_in    => NULL,
      v_client_data_in => p_worker_id,
      v_log_level_in   => MGMT_GLOBAL.G_ERROR) ;
END worker;
--
-- Procedure to detect and clean anomalies in task job.
-- This procedure runs every hour or so and detects anomalies
-- and corrects them. When we run into space issues, if we rollback, then we get
-- into an infinite loop with the same AQ entry being processed unsuccessfully
-- many times. If we error out the AQ entries are not created.
PROCEDURE resubmit_failed_task
AS
  -- Finds tasks that have no message in mgmt_task_qtable and enqueues a
  -- fresh message for each one, scheduled for the current UTC time.
  -- Tasks with frequency_code 7 are excluded.
  -- NOTE(review): 7 is presumably the on-demand frequency code; confirm
  -- against MGMT_GLOBAL.
  --
  -- Commits on success. On any error the work is rolled back and the error
  -- is logged through MGMT_LOG.LOG_ERROR instead of being raised.
  l_missing_ids     MGMT_INTEGER_TABLE := MGMT_INTEGER_TABLE();
  l_missing_classes MGMT_INTEGER_TABLE := MGMT_INTEGER_TABLE();
  l_queue_msg       MGMT_COLL_QUEUE_OBJ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('resubmit_failed_task:Entry',G_MODULE_NAME) ;
  END IF ;

  -- lock and collect every task that is missing from the queue table
  SELECT c.task_id, c.task_class
    BULK COLLECT INTO l_missing_ids, l_missing_classes
    FROM mgmt_collection_tasks c
   WHERE c.frequency_code != 7
     AND NOT EXISTS (SELECT 1
                       FROM mgmt_task_qtable q
                      WHERE q.user_data.task_id = c.task_id)
     FOR UPDATE;

  -- an empty result simply gives a loop of zero iterations
  FOR i IN 1 .. l_missing_ids.COUNT
  LOOP
    l_queue_msg := mgmt_coll_queue_obj.new(
                     P_TASK_ID        => l_missing_ids(i),
                     P_TASK_CLASS     => l_missing_classes(i),
                     P_MESSAGE_CODE   => 0,
                     P_SCHEDULED_TIME => MGMT_GLOBAL.SYSDATE_UTC);
    EM_TASK.enqueue_task(p_coll_queue_rec => l_queue_msg);
  END LOOP;

  COMMIT;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('resubmit_failed_task:Exit',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN OTHERS THEN
    ROLLBACK;
    MGMT_LOG.LOG_ERROR(G_MODULE_NAME, 0,
                       'Error in resubmit_failed_task'||
                       '(Error : '||sqlerrm ||')' );
END resubmit_failed_task;
BEGIN
G_REPO_OWNER := MGMT_USER.GET_REPOSITORY_OWNER;
END em_task;
/
show err