Rem drv: Rem $Header: emcore/source/oracle/sysman/emdrep/sql/core/latest/collections/collections_task_pkgbody.sql /st_emcore_10.2.0.4.3db11.2.0.2/1 2010/07/15 20:50:26 jsadras Exp $ Rem Rem collections_task_pkgbody.sql Rem Rem Copyright (c) 2004, 2010, Oracle and/or its affiliates. Rem All rights reserved. Rem Rem NAME Rem collections_task_pkgbody.sql - Rem Rem DESCRIPTION Rem Rem Rem NOTES Rem Rem Rem MODIFIED (MM/DD/YY) Rem jsadras 07/13/10 - Bug:9043010, convert to qualified sql name Rem jsadras 11/19/08 - fix min interval to not update table Rem jsadras 05/04/08 - Bug:7010717, call dbms_assert before executing Rem jsadras 08/22/07 - Backport jsadras_bug-6347110 from main Rem jsadras 07/30/07 - Backport jsadras_bug-6156475 from main Rem kmanicka 04/26/07 - Backport kmanicka_bug-5895148 from main Rem jsadras 08/17/07 - Bug:6347110, task workers Rem jsadras 07/22/07 - Bug:6156475, snapshot collections processing Rem change Rem kmanicka 04/17/07 - bug:5895148 fix get_job_schedule Rem jsadras 08/22/06 - Bug:5482661, Hints for performance Rem jsadras 08/30/06 - Backport jsadras_bug-5482661 from main Rem pmodi 12/09/05 - Backport jsadras_bug-4633750 from main Rem jsadras 11/06/05 - Bug:4633750, use next_message for 10.2DB Rem jsadras 09/15/05 - remove error_message is null from Rem resubmit_failed_task Rem pmodi 09/09/05 - Add new proc to detect and clean anamolies in Rem task job Rem jsadras 07/20/05 - Bug:4505966, protect against invalidation of Rem jobs engine package Rem scgrover 07/06/05 - add extended sql trace Rem jsadras 06/30/05 - reordering updates to avoid deadlock Rem gsbhatia 07/01/05 - New repmgr header impl Rem jsadras 06/09/05 - add debugging and logging Rem jsadras 06/02/05 - task class additions Rem jsadras 04/26/05 - calling async proc Rem jsadras 02/24/05 - remove update_min_time Rem dcawley 02/24/05 - Use repository function from user model Rem jsadras 02/21/05 - repository owner (Bug:4199342) Rem jsadras 12/15/04 - pass timezone to 
Rem                           run_collections
Rem    jsadras     11/18/04 - min_interval
Rem    jsadras     11/03/04 - service task_context
Rem    jsadras     11/02/04 - extend task_type
Rem    jsadras     10/27/04 - on-demand-task
Rem    jsadras     10/19/04 - deq_condition: scheduled_date
Rem    jsadras     10/14/04 - remove_rounding
Rem    jsadras     09/30/04 - update_task
Rem    jsadras     09/22/04 - duplicate worker
Rem    jsadras     09/16/04 - task_error_message
Rem    jsadras     09/01/04 - stop_worker
Rem    jsadras     08/26/04 - handling failure
Rem    jsadras     08/17/04 - jsadras_repocollect2
Rem    jsadras     08/17/04 - log performance/error
Rem    jsadras     08/16/04 - get_next_time
Rem    jsadras     08/03/04 - Created
Rem

CREATE OR REPLACE PACKAGE BODY em_task
AS

-- Names of the mgmt_parameters rows read and written by this package
G_MIN_INTERVAL_PARAM VARCHAR2(35) := 'mgmt_task_min_interval' ;
G_MAX_RUN_TIME_PARAM VARCHAR2(30) := 'mgmt_worker_max_run_time' ;
-- Minimum interval used when no mgmt_parameters row (or a NULL value) exists
G_DEFAULT_MIN_INTERVAL NUMBER(1) := 1 ;

-- Dequeue modes accepted by dequeue_task
G_DQ_MODE_PENDING_TASKS CONSTANT NUMBER := 1 ;
G_DQ_MODE_TASK_ID       CONSTANT NUMBER := 2 ;
G_DQ_MODE_WORKER_STOP   CONSTANT NUMBER := 3 ;

-- Prefix of the dequeue condition for pending tasks; dequeue_task appends
-- the worker id and the closing parenthesis
G_DQ_CONDITION_TEXT VARCHAR2(200) :=
   ' tab.user_data.scheduled_time <= '||
   ' CAST(SYS_EXTRACT_UTC(SYSTIMESTAMP) AS DATE)' ||
   ' AND (tab.user_data.message_code = 0 OR tab.user_data.message_code = ' ;
G_EVENT_PROC_SIGNATURE CONSTANT VARCHAR2(100) := '(:context)' ;

G_REPO_OWNER dba_procedures.owner%TYPE ;

--
-- task priorities for repo task, avail metric, repo async and adhoc task
-- respectively
--
G_TASK_PRIORITIES CONSTANT mgmt_integer_array :=
                      mgmt_integer_array(100,50,100,150,100) ;

--
--Private procedures
--

--
-- Copy p_array1 into a newly constructed MGMT_JOB_INT_ARRAY.
-- A NULL or empty input yields an empty (non-NULL) p_array2.
--
PROCEDURE copy_array(p_array1 IN  MGMT_INTEGER_ARRAY,
                     p_array2 OUT MGMT_JOB_INT_ARRAY)
IS
BEGIN
  p_array2 := MGMT_JOB_INT_ARRAY() ;
  IF p_array1 IS NOT NULL AND p_array1.COUNT > 0
  THEN
    p_array2.extend(p_array1.COUNT) ;
    FOR i in p_array1.FIRST..p_array1.LAST
    LOOP
      p_array2(i) := p_array1(i) ;
    END LOOP ;
  END IF ;
END copy_array;

--
-- Make sure the interval is more than the
-- minimum limit
-- Returns the larger of p_interval and the configured minimum interval.
-- A NULL p_interval still yields NULL (GREATEST semantics).
--
FUNCTION adjust_interval(p_interval IN NUMBER)
RETURN NUMBER
IS
  l_min_interval NUMBER ;
BEGIN
  BEGIN
    SELECT parameter_value
      INTO l_min_interval
      FROM mgmt_parameters
     WHERE parameter_name = G_MIN_INTERVAL_PARAM ;
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      l_min_interval := G_DEFAULT_MIN_INTERVAL ;
  END ;

  -- A row holding a NULL value must not turn every interval into NULL
  l_min_interval := NVL(l_min_interval,G_DEFAULT_MIN_INTERVAL) ;

  RETURN(greatest(l_min_interval,p_interval)) ;
END adjust_interval ;

--
-- Insert the parameter into mgmt_parameters, or update the existing row
-- when the name is already present. An existing comment is kept when
-- p_comment is NULL.
--
PROCEDURE set_param_value(p_param_name  IN VARCHAR2,
                          p_param_value IN NUMBER,
                          p_comment     IN VARCHAR2 DEFAULT NULL )
IS
BEGIN
  INSERT INTO mgmt_parameters
    (parameter_name,parameter_value,parameter_comment,internal_flag)
  VALUES
    (p_param_name,p_param_value,p_comment,1) ;
EXCEPTION
  WHEN DUP_VAL_ON_INDEX THEN
    UPDATE mgmt_parameters
       SET parameter_value = p_param_value,
           parameter_comment = NVL(p_comment,parameter_comment)
     WHERE parameter_name = p_param_name ;
END set_param_value;

--
-- Set the maximum time in minutes a collection
-- worker can continuously run
-- Default: 1 hour
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when p_minutes is NULL or <= 1
--
PROCEDURE set_max_run_time_worker(p_minutes IN NUMBER)
IS
BEGIN
  -- "p_minutes <= 1" alone is not TRUE for NULL, which would then be stored
  IF p_minutes IS NULL OR p_minutes <=1
  THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
           'Run time has to be greater than a minute') ;
  END IF ;
  set_param_value(p_param_name=>G_MAX_RUN_TIME_PARAM,
                  p_param_value=>p_minutes,
                  p_comment=>
                   'Maximum time a collection worker can continously run');
END set_max_run_time_worker ;

--
-- Return the maximum run time allowed
-- for a collection worker to run continuously
-- Default is 1 hour
--
FUNCTION max_run_time_worker
RETURN NUMBER
IS
  l_max_run_time NUMBER ;
BEGIN
  BEGIN
    SELECT NVL(ABS(parameter_value),60)
      INTO l_max_run_time
      FROM mgmt_parameters
     WHERE parameter_name = G_MAX_RUN_TIME_PARAM ;
  EXCEPTION
    -- any lookup failure falls back to the default
    WHEN OTHERS THEN
      l_max_run_time := 60 ;
  END ;
  -- make sure worker time is at least 1 minute
  RETURN(GREATEST(1,l_max_run_time)) ;
END max_run_time_worker;

--
-- Set the minimum interval possible for interval frequency code
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when p_interval is NULL or <= 0
--
PROCEDURE set_min_interval(p_interval IN NUMBER)
IS
BEGIN
  -- "p_interval <= 0" alone is not TRUE for NULL, which would then be stored
  IF p_interval IS NULL OR p_interval <=0
  THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
           'Interval has to be greater than 0') ;
  END IF ;
set_param_value(p_param_name=>G_MIN_INTERVAL_PARAM,
                  p_param_value=>p_interval,
                  p_comment=>
                   'Minimum collection interval to be enforced') ;
  --
  -- do not update existing records for performance reasons
  --
END set_min_interval ;

--
-- Schedules a task in AQ
-- If p_coll_queue_rec.message_code = 0 then task is enqueued
-- If p_coll_queue_rec.message_code = N then it is a stop message for worker N
--
-- p_coll_queue_rec  payload; a NULL or past scheduled_time is replaced by
--                   the current UTC time
-- p_priority        AQ priority, must not be negative
--                   (NULL => G_TASK_PRIORITY_DEFAULT)
-- p_delay_sec       AQ delay in seconds, must not be negative (NULL => 0)
-- p_queue_name      queue name, qualified here with G_REPO_OWNER
--
-- Raises MGMT_GLOBAL.COLLECTION_ERR for invalid input, an invalid queue
-- name, or any other failure while enqueuing.
--
PROCEDURE enqueue_task(p_coll_queue_rec IN OUT mgmt_coll_queue_obj,
                       p_priority   IN NUMBER DEFAULT G_TASK_PRIORITY_DEFAULT,
                       p_delay_sec  IN NUMBER DEFAULT NULL,
                       p_queue_name IN VARCHAR2 := G_TASK_QUEUE_NAME
                       )
IS
  l_enqueue_options    dbms_aq.enqueue_options_t;
  l_message_properties dbms_aq.message_properties_t;
  l_message_handle     RAW(16);
  invalid_queue        EXCEPTION ;
  PRAGMA EXCEPTION_INIT(INVALID_QUEUE,-25205) ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('enqueue_task:Enter',G_MODULE_NAME) ;
  END IF ;

  IF p_coll_queue_rec IS NULL
  THEN
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
             'Task record cannot be null') ;
  END IF ;

  -- zero is accepted for both priority and delay; messages say so
  IF p_priority < 0
  THEN
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
             'Priority should be greater than or equal to 0') ;
  END IF ;

  IF p_delay_sec < 0
  THEN
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
             'Delay should be greater than or equal to 0') ;
  END IF ;

  IF p_coll_queue_rec.scheduled_time IS NULL OR
     p_coll_queue_rec.scheduled_time < mgmt_global.sysdate_utc
  THEN
    p_coll_queue_rec.scheduled_time := mgmt_global.sysdate_utc ;
  END IF ;

  IF nvl(p_coll_queue_rec.task_id,-1) < 0
  THEN
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
             'Task ID should be greater than or equal to 0') ;
  END IF ;

  l_message_properties.priority := nvl(p_priority,G_TASK_PRIORITY_DEFAULT) ;
  l_message_properties.delay    := nvl(p_delay_sec,0) ;

  IF EMDW_LOG.P_IS_DEBUG_SET
  THEN
    -- log the priority actually used, not the possibly NULL argument
    EMDW_LOG.DEBUG('enqueue_task:'||
       'task_id='||p_coll_queue_rec.task_id||
       ',Time='||to_char(p_coll_queue_rec.scheduled_time,'DD-MON-YY HH24:MI')||
       ',Priority='||to_char(l_message_properties.priority),
       G_MODULE_NAME) ;
  END IF ;

  DBMS_AQ.ENQUEUE(queue_name=> G_REPO_OWNER||'.'||p_queue_name,
                  enqueue_options=> l_enqueue_options,
                  message_properties=> l_message_properties,
                  payload=> p_coll_queue_rec,
                  msgid=> l_message_handle);

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('enqueue_task:Exit normal',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN INVALID_QUEUE THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('enqueue_task:Exit exception invalid queue name',G_MODULE_NAME) ;
    END IF ;
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
           'Invalid Queue Name '||p_queue_name) ;
  WHEN OTHERS THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('enqueue_task:Exit exception '||sqlerrm,G_MODULE_NAME) ;
    END IF ;
    -- application errors (including the validation errors above) are passed
    -- through unchanged, as the other handlers in this package do
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
             sqlerrm||' when queueing task ') ;
    END IF ;
END enqueue_task;

-- Dequeue a task
-- Parameters
--   p_dequeue_id = task_id of the task to be dequeued
--                  or worker id depending on dequeue mode.
--   p_dequeue_mode Can take on multiple values
--      G_DQ_MODE_PENDING_TASKS will get the next pending task for the
--                  the task type
--      G_DQ_MODE_TASK_ID will get the task identified by p_dequeue_id
--      G_DQ_MODE_WORKER_STOP will get stop messages for the worker
--                  identified by p_dequeue_id=worker_id
--   p_task_class_list comma separated list of numeric task classes, or
--                  G_ALL_TASKS; only used for G_DQ_MODE_PENDING_TASKS
--
-- Returns the dequeued message, or a message with task_id = -1 when
-- nothing in the queue matches.
-- Raises MGMT_GLOBAL.COLLECTION_ERR for an invalid mode, an invalid
-- class list, an invalid queue name or any other dequeue failure.
--
FUNCTION dequeue_task(p_dequeue_id      IN NUMBER,
                      p_dequeue_mode    IN NUMBER,
                      p_task_class_list IN VARCHAR2 DEFAULT G_ALL_TASKS,
                      p_queue_name      IN VARCHAR2 DEFAULT G_TASK_QUEUE_NAME
                     )
RETURN mgmt_coll_queue_obj
IS
  l_dequeue_options    dbms_aq.dequeue_options_t;
  l_message_properties dbms_aq.message_properties_t;
  l_message_handle     RAW(16);
  l_message            mgmt_coll_queue_obj ;
  invalid_queue        exception ;
  no_data_in_q         exception ;
  PRAGMA exception_init(INVALID_QUEUE,-25205) ;
  PRAGMA exception_init(NO_DATA_IN_Q,-25228) ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('dequeue_task:Enter Id='||p_dequeue_id||
                  ' Mode='||p_dequeue_mode||
                  ' Class='||p_task_class_list,G_MODULE_NAME );
  END IF ;

  IF p_dequeue_mode = G_DQ_MODE_PENDING_TASKS
  THEN
    l_dequeue_options.deq_condition :=G_DQ_CONDITION_TEXT ||
                                      p_dequeue_id ||')';
    IF p_task_class_list != G_ALL_TASKS
    THEN
      -- The list is spliced into the dequeue condition text, so accept
      -- nothing but a comma separated list of integers
      IF NOT REGEXP_LIKE(p_task_class_list,
           '^[[:space:]]*-?[[:digit:]]+([[:space:]]*,[[:space:]]*-?[[:digit:]]+)*[[:space:]]*$')
      THEN
        raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
               'Invalid task class list') ;
      END IF ;
      l_dequeue_options.deq_condition :=
         l_dequeue_options.deq_condition ||
         ' AND tab.user_data.task_class IN ('||p_task_class_list||')' ;
    END IF ;
  ELSIF p_dequeue_mode = G_DQ_MODE_TASK_ID
  THEN
    l_dequeue_options.deq_condition :=
        'tab.user_data.task_id = '||p_dequeue_id ;
  ELSIF p_dequeue_mode = G_DQ_MODE_WORKER_STOP
  THEN
    l_dequeue_options.deq_condition :=
        'tab.user_data.message_code = '||p_dequeue_id||' AND '||
        'tab.user_data.task_id = 0' ;
  ELSE
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
           'Invalid Dequeue Mode') ;
  END IF ;

  IF EMDW_LOG.P_IS_DEBUG_SET
  THEN
    EMDW_LOG.DEBUG('dequeue_task:Condition='||
                   l_dequeue_options.deq_condition,G_MODULE_NAME) ;
  END IF ;

  --
  -- 10.2 DB has severe performance problems with FIRST MESSAGE (Bug:4633750)
  -- So we dequeue by next message first and if it fails due to AQ problem
  -- as documented in Metalink Note:118781, we revert to FIRST MESSAGE
  -- and try again. This solution though not optimal is the recommended
  -- solution from AQ dev.
  --
  BEGIN
    l_dequeue_options.navigation :=dbms_aq.NEXT_MESSAGE ;
    l_dequeue_options.wait :=dbms_aq.NO_WAIT;
    dbms_aq.dequeue(queue_name => G_REPO_OWNER||'.'||p_queue_name,
                    dequeue_options => l_dequeue_options,
                    message_properties => l_message_properties,
                    payload =>l_message,
                    msgid => l_message_handle);
  EXCEPTION
    WHEN NO_DATA_IN_Q THEN
      IF EMDW_LOG.P_IS_DEBUG_SET
      THEN
        EMDW_LOG.DEBUG('dequeue_task:Try First message',G_MODULE_NAME);
      END IF ;
      -- navigation needed to overcome AQ problem See Note:118781.1 on Metalink
      l_dequeue_options.navigation :=dbms_aq.FIRST_MESSAGE ;
      dbms_aq.dequeue(queue_name => G_REPO_OWNER||'.'||p_queue_name,
                      dequeue_options => l_dequeue_options,
                      message_properties => l_message_properties,
                      payload =>l_message,
                      msgid => l_message_handle);
  END ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('dequeue_task:exit task_id='||l_message.task_id||
                  ',message_code='||l_message.message_code,G_MODULE_NAME ) ;
  END IF ;
  RETURN(l_message) ;
EXCEPTION
  WHEN INVALID_QUEUE THEN
    IF EMDW_LOG.P_IS_INFO_SET
    THEN
      EMDW_LOG.INFO('dequeue_task:exit exception invalid queue',
                    G_MODULE_NAME ) ;
    END IF ;
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
           'Invalid Queue Name '||p_queue_name) ;
  WHEN NO_DATA_IN_Q THEN
    -- second attempt found nothing either: report "no task" with task_id -1
    IF EMDW_LOG.P_IS_INFO_SET
    THEN
      EMDW_LOG.INFO('dequeue_task:exit no task',G_MODULE_NAME ) ;
    END IF ;
    l_message := mgmt_coll_queue_obj.new(p_task_id=>-1,
                                         p_task_class=>0,
                                         p_message_code=>0,
                                         p_scheduled_time=>null) ;
    RETURN(l_message) ;
  WHEN OTHERS THEN
    IF EMDW_LOG.P_IS_INFO_SET
    THEN
      EMDW_LOG.INFO('dequeue_task:exit exception '||sqlerrm,G_MODULE_NAME ) ;
    END IF ;
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
             sqlerrm||' when dequeuing task') ;
    END IF ;
END dequeue_task;

--
--update aq task
--
PROCEDURE update_aq_task(p_task_id       IN NUMBER,
                         p_priority      IN NUMBER,
                         p_next_time_utc IN DATE := NULL,
                         p_task_type     IN NUMBER := NULL
                         )
IS
  l_task_rec mgmt_coll_queue_obj ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
EMDW_LOG.INFO('update_aq_task:Enter Task='||p_task_id||
                  ' Next time utc='||to_char(p_next_time_utc,
                                            'DD-MON-YY HH24:MI'),G_MODULE_NAME) ;
  END IF ;

  -- take the queued message out and put a replacement back in
  l_task_rec := dequeue_task(p_dequeue_id=>p_task_id,
                             p_dequeue_mode=>G_DQ_MODE_TASK_ID) ;

  IF l_task_rec.task_id = p_task_id
  THEN
    l_task_rec := mgmt_coll_queue_obj.new(
                     p_task_id=>p_task_id,
                     p_task_class=>l_task_rec.task_class,
                     p_message_code=>0,
                     p_scheduled_time=>nvl(p_next_time_utc,
                                           l_task_rec.scheduled_time)
                     );
    enqueue_task(p_coll_queue_rec=> l_task_rec,
                 p_priority=>p_priority) ;
  ELSE
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
          'Unable to Update task in Queue') ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_aq_task:Exit normal',G_MODULE_NAME) ;
  END IF ;
END update_aq_task;

--
-- Locks the task so that others cannot update it
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the task does not exist
--
PROCEDURE lock_task(p_task_id IN NUMBER)
IS
  l_task_id mgmt_collection_tasks.task_id%type ;
BEGIN
  SELECT task_id
    INTO l_task_id
    FROM mgmt_collection_tasks
   WHERE task_id = p_task_id
     FOR UPDATE ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
        ' Invalid task Id ('||to_char(p_task_id)||')' ) ;
END lock_task ;

--
-- Get the task information and lock optionally
--   p_task_id    task to read
--   p_task_info  receives the mgmt_collection_tasks row
--   p_lock_task  TRUE => the row is selected FOR UPDATE
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the task does not exist
--
PROCEDURE get_task_info(p_task_id   IN NUMBER,
                        p_task_info OUT NOCOPY mgmt_collection_tasks%ROWTYPE,
                        p_lock_task IN BOOLEAN := FALSE
                        )
IS
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('get_task_info:Enter task_id='||p_task_id,
                  G_MODULE_NAME) ;
  END IF ;

  IF p_lock_task
  THEN
    -- add hints since the table size changes rapidly
    -- and optimizer uses stale stats
    SELECT /*+ INDEX(tasks mgmt_collection_tasks_pk) */ *
      INTO p_task_info
      FROM mgmt_collection_tasks tasks
     WHERE task_id = p_task_id
       FOR UPDATE ;
  ELSE
    SELECT /*+ INDEX(tasks mgmt_collection_tasks_pk) */ *
      INTO p_task_info
      FROM mgmt_collection_tasks tasks
     WHERE task_id = p_task_id ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('get_task_info:Exit', G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('get_task_info: Exit Exception not-found',
                     G_MODULE_NAME) ;
    END IF ;
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
          'Invalid task_id '||p_task_id) ;
END get_task_info ;

--
-- utility function to get the next time for execution
-- Builds a job schedule record from the arguments and asks
-- mgmt_job_engine for the next execution time.
-- Returns NULL without consulting the job engine when p_frequency_code
-- is MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND (or NULL).
--
FUNCTION get_next_time_utc(p_frequency_code       IN NUMBER,
                           p_start_time           IN DATE := NULL,
                           p_end_time             IN DATE := NULL,
                           p_execution_hours      IN NUMBER := NULL,
                           p_execution_minutes    IN NUMBER := NULL,
                           p_interval             IN NUMBER := NULL,
                           p_days                 IN MGMT_INTEGER_ARRAY := NULL,
                           p_months               IN MGMT_INTEGER_ARRAY := NULL,
                           p_last_collection_time IN DATE := NULL,
                           p_timezone_region      IN VARCHAR := G_UTC_TIMEZONE
                           )
RETURN DATE
IS
  l_schedule_rec mgmt_job_schedule_record ;
  l_days         mgmt_job_int_array := mgmt_job_int_array() ;
  l_months       mgmt_job_int_array := mgmt_job_int_array() ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('get_next_time_utc:Enter freq='||p_frequency_code||
                  ' Interval='||p_interval||' Last time='||p_last_collection_time||
                  ' timezone='||p_timezone_region,G_MODULE_NAME) ;
  END IF ;

  IF p_frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    -- the jobs API takes its own array type
    copy_array(p_days,l_days) ;
    copy_array(p_months,l_months) ;

    l_schedule_rec := mgmt_jobs.get_job_schedule_record(
                          p_frequency_code,
                          nvl(p_start_time,sys_extract_utc(systimestamp)),
                          p_end_time,
                          p_execution_hours,
                          p_execution_minutes,
                          p_interval,
                          l_months,
                          l_days,
                          mgmt_job_engine.timezone_rgn_specified,
                          null,
                          null,
                          p_timezone_region) ;

    RETURN(mgmt_job_engine.get_next_execution_time
                (p_schedule=>l_schedule_rec,
                 p_last_start_time=>p_last_collection_time,
                 p_tzregion=>p_timezone_region)) ;
  ELSE
    RETURN(NULL) ;
  END IF ;
END get_next_time_utc ;

--
-- get the task priority for a task type
-- Returns the larger of p_priority (NULL counts as 1) and the
-- G_TASK_PRIORITIES entry indexed by p_task_type.
-- Any failure of that lookup is reported as MGMT_GLOBAL.INVALID_PARAMS_ERR
--
FUNCTION task_priority(p_task_type IN NUMBER,
                       p_priority  IN NUMBER)
RETURN NUMBER
IS
BEGIN
  RETURN(GREATEST(NVL(p_priority,1),
                  G_TASK_PRIORITIES(p_task_type))) ;
EXCEPTION
  WHEN OTHERS THEN
    RAISE_APPLICATION_ERROR(MGMT_GLOBAL.INVALID_PARAMS_ERR,
          'Task type does not exist (create_task)') ;
END task_priority ;

--
-- creates a collection task in the table and places it in AQ
--
FUNCTION create_task(p_coll_schedule   IN mgmt_coll_schedule_obj,
                     p_task_type       IN NUMBER := 0 ,
                     p_timezone_region IN VARCHAR2 := G_UTC_TIMEZONE,
                     p_task_proc       IN VARCHAR2 := NULL,
                     p_context         IN mgmt_namevalue_array DEFAULT NULL,
                     p_priority        IN NUMBER DEFAULT NULL,
                     p_task_class      IN NUMBER DEFAULT 0
                     )
RETURN NUMBER
IS
  l_task_id              mgmt_collection_tasks.task_id%type ;
  l_schedule_record      mgmt_job_schedule_record ;
  l_next_collection_time date ;
  l_task_rec             mgmt_coll_queue_obj ;
  l_start_time           DATE ;
  l_interval             NUMBER ;
  l_priority             NUMBER ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_task:Enter timezone='||p_timezone_region,
                  G_MODULE_NAME) ;
  END IF ;

  EM_COLL_UTIL.validate_schedule(p_coll_schedule) ;

  SELECT mgmt_task_sequence.nextval
    INTO l_task_id
    FROM DUAL ;

  l_start_time := nvl(p_coll_schedule.start_time,
                      mgmt_global.sysdate_tzrgn(p_timezone_region)) ;

  l_priority := task_priority(p_task_type,p_priority) ;

  -- the minimum interval only applies to interval schedules
  IF p_coll_schedule.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_INTERVAL
  THEN
    l_interval := adjust_interval(p_coll_schedule.interval) ;
  END IF ;

  l_next_collection_time:= get_next_time_utc(
                   p_frequency_code=>p_coll_schedule.frequency_code,
                   p_start_time=> l_start_time,
                   p_end_time=> p_coll_schedule.end_time,
                   p_execution_hours=>p_coll_schedule.execution_hours,
                   p_execution_minutes => p_coll_schedule.execution_minutes,
                   p_interval=>l_interval,
                   p_days =>p_coll_schedule.days,
                   p_months =>p_coll_schedule.months,
                   p_last_collection_time =>NULL,
                   p_timezone_region => p_timezone_region) ;

  IF l_next_collection_time IS NOT NULL OR
     p_coll_schedule.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    INSERT INTO mgmt_collection_tasks
      (task_id,task_type,timezone_region,frequency_code,start_time,end_time,
       execution_hours,execution_minutes,interval,months,days,
       task_status,last_collection_timestamp,next_collection_timestamp,task_proc,
       priority,task_class)
    VALUES
(l_task_id,p_task_type,p_timezone_region,p_coll_schedule.frequency_code,
       p_coll_schedule.start_time,p_coll_schedule.end_time,
       p_coll_schedule.execution_hours,p_coll_schedule.execution_minutes,
       l_interval,p_coll_schedule.months,
       p_coll_schedule.days,G_TASK_STATUS_IDLE,
       NULL,l_next_collection_time,p_task_proc,l_priority,p_task_class) ;

    IF p_context IS NOT NULL AND p_context.COUNT > 0
    THEN
      BEGIN
        FOR i IN p_context.FIRST..p_context.LAST
        LOOP
          IF p_context.EXISTS(i)
          THEN
            INSERT INTO mgmt_collection_task_context
              (task_id,name,value)
            VALUES
              (l_task_id,p_context(i).name,p_context(i).value) ;
          END IF ;
        END LOOP ;
      EXCEPTION
        WHEN DUP_VAL_ON_INDEX THEN
          raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
             'Duplicate name in context (create_task)') ;
        WHEN OTHERS THEN
          raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
             substr(sqlerrm,1,100)||'(create_task)') ;
      END ;
    END IF ;

    -- on demand tasks are stored but never queued
    IF p_coll_schedule.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
    THEN
      l_task_rec := mgmt_coll_queue_obj.new (
                         p_task_id=>l_task_id,
                         p_task_class=>p_task_class,
                         p_message_code=>0,
                         p_scheduled_time=>l_next_collection_time) ;
      enqueue_task(p_coll_queue_rec=> l_task_rec,
                   p_priority=>l_priority) ;
    END IF ;
  ELSE
    raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
         'next scheduled time is NULL') ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_task:exit normal task='||to_char(l_task_id),G_MODULE_NAME) ;
  END IF ;
  RETURN(l_task_id) ;
EXCEPTION
  WHEN OTHERS THEN
    IF EMDW_LOG.P_IS_ERROR_SET
    THEN
      EMDW_LOG.ERROR('create_task:exit exception '||sqlerrm,G_MODULE_NAME) ;
    END IF ;
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
          sqlerrm||to_char(sqlcode)||' when creating task ') ;
    END IF ;
END create_task;

--
-- update task
-- Updates the mgmt_collection_tasks row and keeps the AQ entry in step.
-- NULL arguments leave the stored value unchanged, except
-- p_error_message which always overwrites the stored message.
--   p_new_task_in_q  TRUE  => enqueue a new message (when p_next_time_utc
--                             is supplied)
--                    FALSE => replace the message already in the queue
--                             when type, next time or priority is supplied
-- Tasks with frequency ON DEMAND are never queued.
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when a priority is supplied for
-- an unknown task or task type, MGMT_GLOBAL.COLLECTION_ERR for other
-- failures.
--
PROCEDURE update_task_pvt(p_task_id       IN NUMBER,
                          p_task_type     IN NUMBER := NULL,
                          p_priority      IN NUMBER := NULL,
                          p_next_time_utc IN DATE   := NULL,
                          p_last_collection_timestamp IN DATE := NULL,
                          p_error_message IN VARCHAR2 := NULL,
                          p_failures      IN NUMBER := NULL,
                          p_total_runs    IN NUMBER := NULL,
                          p_min_wait_time IN NUMBER := NULL,
                          p_max_wait_time IN NUMBER := NULL,
                          p_avg_wait_time IN NUMBER := NULL,
                          p_min_run_time  IN NUMBER := NULL,
                          p_max_run_time  IN NUMBER := NULL,
                          p_avg_run_time  IN NUMBER := NULL,
                          p_new_task_in_q IN BOOLEAN := FALSE
                          )
IS
  l_task_rec       mgmt_coll_queue_obj ;
  l_frequency_code mgmt_collections.frequency_code%type ;
  l_next_time_utc  DATE ;
  l_task_type      mgmt_collection_tasks.task_type%type ;
  l_task_class     mgmt_collection_tasks.task_class%type ;
  l_priority       NUMBER ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task_pvt:Enter',G_MODULE_NAME) ;
  END IF ;

  IF p_priority IS NOT NULL
  THEN
    -- The priority floor depends on the task type. l_task_type is only
    -- filled by the RETURNING clause below, so resolve it here first:
    -- the new type when one is supplied, otherwise the stored type.
    l_task_type := p_task_type ;
    IF l_task_type IS NULL
    THEN
      BEGIN
        SELECT task_type
          INTO l_task_type
          FROM mgmt_collection_tasks
         WHERE task_id = p_task_id ;
      EXCEPTION
        WHEN NO_DATA_FOUND THEN
          raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
                'Invalid task_id '||p_task_id) ;
      END ;
    END IF ;
    l_priority := task_priority(l_task_type,p_priority) ;
  END IF ;

  -- There is no nvl on error message since we want to clear the error
  -- message on successful execution
  UPDATE mgmt_collection_tasks
     SET task_type=nvl(p_task_type,task_type),
         priority=nvl(l_priority,priority),
         next_collection_timestamp=nvl(p_next_time_utc,
                                       next_collection_timestamp),
         last_collection_timestamp=nvl(p_last_collection_timestamp,
                                       last_collection_timestamp),
         error_message=p_error_message,
         failures=nvl(p_failures,failures),
         total_runs=nvl(p_total_runs,total_runs),
         min_wait_time=nvl(p_min_wait_time,min_wait_time),
         max_wait_time=nvl(p_max_wait_time,max_wait_time),
         avg_wait_time=nvl(p_avg_wait_time,avg_wait_time),
         min_run_time=nvl(p_min_run_time,min_run_time),
         max_run_time=nvl(p_max_run_time,max_run_time),
         avg_run_time=nvl(p_avg_run_time,avg_run_time)
   WHERE task_id = p_task_id
  RETURNING frequency_code,priority,task_type,
            next_collection_timestamp,task_class
       INTO l_frequency_code ,l_priority, l_task_type ,l_next_time_utc,l_task_class ;

  IF l_frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    IF p_new_task_in_q AND p_next_time_utc IS NOT NULL
    THEN
      l_task_rec := mgmt_coll_queue_obj.NEW(p_task_id=>p_task_id,
                                            p_task_class=>l_task_class,
                                            p_message_code=>0,
                                            p_scheduled_time=>l_next_time_utc) ;
      enqueue_task(p_coll_queue_rec=>l_task_rec,
                   p_priority=>l_priority) ;
    ELSIF NOT p_new_task_in_q
    THEN
      -- only touch the queue when something the queue cares about changed
      IF p_task_type IS NOT NULL OR
         p_next_time_utc IS NOT NULL OR
         p_priority IS NOT NULL
      THEN
        update_aq_task(p_task_id=>p_task_id,
                       p_next_time_utc=>l_next_time_utc,
                       p_task_type=>p_task_type,
                       p_priority=>l_priority) ;
      END IF ;
    END IF ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task_pvt:Exit',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN OTHERS THEN
    IF EMDW_LOG.P_IS_INFO_SET
    THEN
      EMDW_LOG.INFO('update_task_pvt:Exit exception'||sqlerrm,G_MODULE_NAME) ;
    END IF ;
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
            sqlerrm||to_char(sqlcode)||' when updating task ') ;
    END IF ;
END update_task_pvt;

--
-- Update the task queue parameters
-- Thin wrapper over update_task_pvt that replaces the message already
-- in the queue (p_new_task_in_q => FALSE).
--
PROCEDURE update_task(p_task_id       IN NUMBER,
                      p_task_type     IN NUMBER := NULL,
                      p_priority      IN NUMBER := NULL,
                      p_next_time_utc IN DATE   := NULL
                      )
IS
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:Enter',G_MODULE_NAME) ;
  END IF ;

  update_task_pvt(p_task_id=>p_task_id,
                  p_task_type=>p_task_type,
                  p_priority=>p_priority,
                  p_next_time_utc=>p_next_time_utc,
                  p_new_task_in_q=>FALSE) ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:Exit',G_MODULE_NAME) ;
  END IF ;
END update_task;

--
-- Updates the task to a new schedule
--
PROCEDURE update_task(p_task_id         IN NUMBER,
                      p_coll_schedule   IN mgmt_coll_schedule_obj,
                      p_timezone_region IN VARCHAR2)
IS
  l_next_collection_time DATE ;
  l_last_collection_time DATE ;
  l_interval             NUMBER ;
  l_priority             mgmt_collection_tasks.priority%type ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:schedule:Enter',G_MODULE_NAME) ;
    EMDW_LOG.INFO('update_task:timezone='||p_timezone_region,G_MODULE_NAME);
  END IF ;

  SELECT last_collection_timestamp
    INTO l_last_collection_time
    FROM mgmt_collection_tasks
   WHERE task_id = p_task_id
     FOR UPDATE ;

  IF p_coll_schedule.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_INTERVAL
  THEN
    l_interval :=
adjust_interval(p_coll_schedule.interval) ;
  END IF ;

  IF p_coll_schedule.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    l_next_collection_time:= get_next_time_utc(
                   p_frequency_code=>p_coll_schedule.frequency_code,
                   p_start_time=>nvl(p_coll_schedule.start_time,
                                     mgmt_global.sysdate_tzrgn(p_timezone_region)),
                   p_end_time=> p_coll_schedule.end_time,
                   p_execution_hours=>p_coll_schedule.execution_hours,
                   p_execution_minutes => p_coll_schedule.execution_minutes,
                   p_interval=>l_interval,
                   p_days =>p_coll_schedule.days,
                   p_months =>p_coll_schedule.months,
                   p_last_collection_time =>l_last_collection_time,
                   p_timezone_region => p_timezone_region) ;
  END IF ;

  UPDATE mgmt_collection_tasks
     SET frequency_code = p_coll_schedule.frequency_code,
         execution_hours = p_coll_schedule.execution_hours,
         execution_minutes = p_coll_schedule.execution_minutes,
         interval = l_interval,
         days=p_coll_schedule.days,
         months=p_coll_schedule.months,
         start_time =p_coll_schedule.start_time,
         end_time =p_coll_schedule.end_time,
         next_collection_timestamp=l_next_collection_time,
         timezone_region=nvl(p_timezone_region,G_UTC_TIMEZONE)
   WHERE task_id = p_task_id
  RETURNING priority INTO l_priority ;

  -- re-queue only when there is a next run to queue
  IF p_coll_schedule.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
     AND l_next_collection_time IS NOT NULL
  THEN
    update_aq_task(p_task_id=>p_task_id,
                   p_priority=>l_priority,
                   p_next_time_utc=>l_next_collection_time) ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('update_task:schedule:Exit normal',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
         'Invalid Task Id - '||to_char(p_task_id)) ;
END update_task ;

--
-- Put the task back in queue
--
-- If the next time is null
--    suspend repository collections associated with task
--    remove the task
--
-- Returns the next execution time (UTC) computed for the task, which is
-- NULL when the task was removed or is an ON DEMAND task.
--
FUNCTION schedule_next_execution(p_task_info IN mgmt_collection_tasks%rowtype)
RETURN DATE
IS
  l_next_run_utc DATE ;
BEGIN
  BEGIN
    l_next_run_utc := get_next_time_utc(
                   p_frequency_code=>p_task_info.frequency_code,
                   p_start_time=> p_task_info.start_time,
                   p_end_time=> p_task_info.end_time,
                   p_execution_hours=>p_task_info.execution_hours,
                   p_execution_minutes => p_task_info.execution_minutes,
                   p_interval=>p_task_info.interval,
                   p_days =>p_task_info.days,
                   p_months =>p_task_info.months,
                   p_last_collection_time =>
                               p_task_info.last_collection_timestamp,
                   p_timezone_region => p_task_info.timezone_region) ;
  EXCEPTION
    -- the schedule could not be computed because a package was discarded
    -- or not found: fall back to one interval from now for interval
    -- schedules, one hour from now for everything else
    WHEN MGMT_GLOBAL.plsql_package_discarded OR
         MGMT_GLOBAL.plsql_program_not_found THEN
      IF p_task_info.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_INTERVAL
      THEN
        l_next_run_utc := MGMT_GLOBAL.sysdate_utc +
                          (p_task_info.interval /1440) ;
      ELSE
        l_next_run_utc := MGMT_GLOBAL.sysdate_utc + 1/24 ;
      END IF ;
  END ;

  IF l_next_run_utc IS NOT NULL OR
     p_task_info.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    -- persist the run statistics and queue the next run
    update_task_pvt(p_task_id=>p_task_info.task_id,
           p_next_time_utc=>l_next_run_utc,
           p_last_collection_timestamp=>p_task_info.last_collection_timestamp,
           p_error_message=>p_task_info.error_message,
           p_failures=>p_task_info.failures,
           p_total_runs=>p_task_info.total_runs,
           p_min_wait_time=>p_task_info.min_wait_time,
           p_max_wait_time=>p_task_info.max_wait_time,
           p_avg_wait_time=>p_task_info.avg_wait_time,
           p_min_run_time=>p_task_info.min_run_time,
           p_max_run_time=>p_task_info.max_run_time,
           p_avg_run_time=>p_task_info.avg_run_time,
           p_new_task_in_q=>TRUE
           ) ;
  ELSIF p_task_info.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND
  THEN
    -- no further run: detach repository collections, then drop the task.
    -- The queue is left alone (p_remove_aq => FALSE).
    IF p_task_info.task_type = G_TASK_TYPE_REPO
    THEN
      EM_COLL_UTIL.set_task_null(p_task_id=>p_task_info.task_id) ;
    END IF ;
    remove_task(p_task_info.task_id,FALSE) ;
  END IF ;

  RETURN(l_next_run_utc) ;
EXCEPTION
  WHEN OTHERS THEN
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
            sqlerrm||' when scheduling next execution of task') ;
    END IF ;
END schedule_next_execution;

--
-- Removes a task
-- Deletes the task row and its context rows; when p_remove_aq is TRUE the
-- queued message is dequeued as well.
--
PROCEDURE remove_task(p_task_id   IN NUMBER,
                      p_remove_aq IN BOOLEAN := TRUE)
IS
  l_queued_msg   mgmt_coll_queue_obj ;
  l_freq_code    mgmt_collection_tasks.frequency_code%type ;
  l_next_run_utc mgmt_collection_tasks.next_collection_timestamp%type ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_task:Enter',G_MODULE_NAME) ;
  END IF ;

  -- add hints since the table size changes rapidly
  -- and optimizer uses stale stats
  DELETE /*+ INDEX(tasks mgmt_collection_tasks_pk) */
    mgmt_collection_tasks tasks
   WHERE task_id = p_task_id
  RETURNING frequency_code,next_collection_timestamp
       INTO l_freq_code , l_next_run_utc ;

  DELETE /*+ INDEX(ctx mgmt_collection_task_ctx_pk) */
    mgmt_collection_task_context ctx
   WHERE task_id = p_task_id ;

  --
  -- tasks are only queued in AQ if next time is not null and
  -- collection frequency is not ON DEMAND
  --
  IF l_freq_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND AND
     l_next_run_utc IS NOT NULL AND
     p_remove_aq
  THEN
    l_queued_msg := dequeue_task(p_dequeue_id=>p_task_id,
                                 p_dequeue_mode=>G_DQ_MODE_TASK_ID) ;
    -- a negative task_id is the "nothing dequeued" marker
    IF l_queued_msg.task_id < 0
    THEN
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
            'Task not found in Queue') ;
    END IF ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_task:Exit normal',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN OTHERS THEN
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
            sqlerrm||' when removing task') ;
    END IF ;
END remove_task;

--
-- Enqueues a stop worker task so the worker can stop
--
PROCEDURE stop_worker_task(p_worker_id IN NUMBER)
IS
  l_task_rec mgmt_coll_queue_obj ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_worker_task:Enter Worker='||p_worker_id,G_MODULE_NAME);
  END IF ;

  IF nvl( p_worker_id,0) < 1
  THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
          'Worker Id should be greater than 0') ;
  END IF ;

  l_task_rec := mgmt_coll_queue_obj.new(
                     p_task_id=>0,
                     p_task_class=>0,
                     p_message_code=>p_worker_id,
                     p_scheduled_time=>
mgmt_global.sysdate_utc) ;

  -- enqueue at higher priority
  enqueue_task(p_coll_queue_rec=> l_task_rec,
               p_priority=>G_TASK_PRIORITY_STOP) ;
EXCEPTION
  WHEN OTHERS THEN
    IF SQLCODE BETWEEN -20999 and -20000
    THEN
      RAISE ;
    ELSE
      raise_application_error(MGMT_GLOBAL.COLLECTION_ERR,
            sqlerrm||' when creating stop worker task') ;
    END IF ;
END stop_worker_task;

--
-- internal function to return the lowest unallocated worker id number
-- Walks the worker ids in ascending order and stops at the first gap;
-- returns 1 when there are no workers.
--
FUNCTION get_next_worker_id
RETURN NUMBER
IS
  l_last_used_id NUMBER := 0 ;
BEGIN
  FOR w IN (SELECT worker_id,
                   nvl(lag(worker_id) over (ORDER BY WORKER_ID ASC),0)
                      prev_worker_id
              FROM mgmt_collection_workers
             ORDER BY worker_id)
  LOOP
    IF w.worker_id > w.prev_worker_id +1
    THEN
      -- gap found right after the previous id
      l_last_used_id := w.prev_worker_id ;
      EXIT ;
    END IF ;
    l_last_used_id := w.worker_id ;
  END LOOP ;

  RETURN(l_last_used_id+1) ;
END get_next_worker_id ;

--
-- lock the worker record
-- Selects the worker row FOR UPDATE into p_worker_rec.
-- Raises MGMT_GLOBAL.INVALID_PARAMS_ERR when the worker does not exist
--
PROCEDURE lock_worker_rec(p_worker_id  IN NUMBER,
                          p_worker_rec OUT mgmt_collection_workers%rowtype)
IS
BEGIN
  SELECT *
    INTO p_worker_rec
    FROM mgmt_collection_workers
   WHERE worker_id = p_worker_id
     FOR UPDATE ;
EXCEPTION
  WHEN NO_DATA_FOUND THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
          'Invalid worker '||p_worker_id) ;
END ;

--
-- return the next execution time
-- Idea is to build some sophistication into this
--
FUNCTION get_next_execution_time
RETURN DATE
IS
BEGIN
  RETURN(SYSDATE+G_MINUTE) ;
END get_next_execution_time;

--
-- create a worker thread
-- if p_background = TRUE then a DBMS_JOB is submitted
-- if p_background = FALSE then the worker record is created
--     and the collection worker is run synchronously in the session
-- Returns the id allocated to the new worker.
-- Any failure is reported as MGMT_GLOBAL.INVALID_PARAMS_ERR.
--
FUNCTION create_worker(p_background      IN BOOLEAN := TRUE,
                       p_task_class_list IN VARCHAR2 DEFAULT NULL
                       )
RETURN NUMBER
IS
  l_new_worker_id mgmt_collection_workers.worker_id%type;
  l_status        mgmt_collection_workers.worker_status%type;
  l_submitted_job NUMBER ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_worker:Enter',G_MODULE_NAME) ;
  END IF ;

  -- serialise worker creation so that two sessions cannot pick the same id
  LOCK TABLE mgmt_collection_workers IN EXCLUSIVE MODE ;

  l_new_worker_id := get_next_worker_id ;

  IF NOT p_background
  THEN
    l_status := G_WORKER_STATUS_STARTED ;
  ELSE
    -- Remove jobs if any which are the same as worker
    -- They do not have entry in collection workers so
    -- they have been submitted manually
    FOR stale IN (SELECT job
                    FROM user_jobs
                   WHERE what = G_WORKER_PROC||'('||l_new_worker_id||');')
    LOOP
      DBMS_JOB.REMOVE(stale.job) ;
    END LOOP ;

    DBMS_JOB.SUBMIT(job=>l_submitted_job,
                    what=>G_WORKER_PROC||'('||l_new_worker_id||');',
                    next_date=>SYSDATE,
                    interval=>G_INTERVAL_PROC,
                    no_parse=>FALSE) ;
    l_status := G_WORKER_STATUS_STARTING ;
  END IF ;

  INSERT INTO MGMT_COLLECTION_WORKERS
    (worker_id,worker_status,task_class_list,
     job_id, worker_start_time)
  VALUES
    (l_new_worker_id,l_status,NVL(p_task_class_list,G_ALL_TASKS),
     l_submitted_job,null) ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('create_worker:Exit Worker='||l_new_worker_id,G_MODULE_NAME) ;
  END IF ;
  RETURN(l_new_worker_id) ;
EXCEPTION
  WHEN OTHERS THEN
    raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
          sqlerrm||' when creating worker') ;
END create_worker;

--
-- remove the worker identified by p_worker_id
-- Removes the worker's dbms job (if still present), drains its stop
-- messages from the queue and deletes the worker row.
--
PROCEDURE remove_worker(p_worker_id IN NUMBER)
IS
  l_worker   mgmt_collection_workers%rowtype ;
  l_job_id   mgmt_collection_workers.job_id%type ;
  l_stop_msg mgmt_coll_queue_obj ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_worker:Enter Worker ='||p_worker_id,G_MODULE_NAME) ;
  END IF ;

  BEGIN
    SELECT *
      INTO l_worker
      FROM mgmt_collection_workers
     WHERE worker_id = p_worker_id
       FOR UPDATE ;
  EXCEPTION
    WHEN NO_DATA_FOUND THEN
      RAISE_APPLICATION_ERROR(MGMT_GLOBAL.INVALID_PARAMS_ERR,
            'Worker Id '||p_worker_id||' does not exist') ;
  END ;

  -- remove the dbms job associated with the worker
  IF l_worker.job_id IS NOT NULL
  THEN
    BEGIN
      SELECT job
        INTO l_job_id
        FROM user_jobs
       WHERE job = l_worker.job_ID
         AND what = G_WORKER_PROC ||'('||p_worker_id||');' ;
      DBMS_JOB.REMOVE(l_job_id) ;
    EXCEPTION
      -- job already gone: nothing to remove
      WHEN NO_DATA_FOUND THEN
        NULL ;
      WHEN OTHERS THEN
        raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR,
              sqlerrm||' when removing dbms_job '||l_job_id) ;
    END ;
  END IF ;

  -- remove all stop messages from queue
  -- (dequeue_task signals "nothing left" with a negative task_id)
  LOOP
    l_stop_msg := dequeue_task(p_dequeue_id=>p_worker_id,
                               p_dequeue_mode=>G_DQ_MODE_WORKER_STOP) ;
    EXIT WHEN l_stop_msg.task_id < 0 ;
  END LOOP ;

  DELETE mgmt_collection_workers
   WHERE worker_id = p_worker_id ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('remove_worker:Exit ',G_MODULE_NAME) ;
  END IF ;
END remove_worker;

--
-- stop the worker
--  if p_stop_mode =NORMAL the worker will stop when it wakes up next
--  if p_stop_mode = IMMEDIATE
--       if worker is running, it will dequeue and stop
--       if worker is idle then it will stop when it wakes up
-- TODO: Need to add task_type
PROCEDURE stop_worker(p_worker_id IN NUMBER,
                      p_stop_mode IN NUMBER := G_WORKER_STOP_NORMAL)
IS
  l_worker mgmt_collection_workers%rowtype ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_worker:Enter Worker='||p_worker_id,G_MODULE_NAME) ;
  END IF ;

  lock_worker_rec(p_worker_id=>p_worker_id,
                  p_worker_rec=>l_worker) ;

  -- flag the worker as stopping unless it already is
  IF l_worker.worker_status != G_WORKER_STATUS_STOP_PENDING
  THEN
    UPDATE mgmt_collection_workers
       SET worker_status = G_WORKER_STATUS_STOP_PENDING
     WHERE worker_id = p_worker_id ;
  END IF ;

  -- immediate stop additionally queues a stop message for the worker
  IF p_stop_mode = G_WORKER_STOP_IMMEDIATE
  THEN
    EM_TASK.stop_worker_task(p_worker_id=>p_worker_id) ;
  END IF ;

  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_worker:Exit ',G_MODULE_NAME) ;
  END IF ;
END stop_worker;

--
-- stop a bunch of workers,higher numbered workers will be stopped first
-- if p_worker_count is NULL then all workers for the task class will be stopped
--
PROCEDURE stop_workers(p_worker_count    IN NUMBER := NULL,
                       p_stop_mode       IN NUMBER := G_WORKER_STOP_NORMAL,
                       p_task_class_list IN VARCHAR2 DEFAULT NULL)
IS
  l_reduced_count NUMBER := 0 ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET
  THEN
    EMDW_LOG.INFO('stop_workers:Enter
Worker count='||p_worker_count|| ' task_class='||p_task_class_list,G_MODULE_NAME) ; END IF ; LOCK TABLE mgmt_collection_workers IN EXCLUSIVE MODE ; IF p_worker_count <= 0 THEN raise_application_error(MGMT_GLOBAL.COLLECTION_ERR, 'Worker Count has to be greater than 0') ; END IF ; -- remove up to worker count, higher numbered worker ids will be removed first FOR rec IN (SELECT worker_id FROM mgmt_collection_workers WHERE worker_status ! = G_WORKER_STATUS_STOP_PENDING AND task_class_list = NVL(p_task_class_list,G_ALL_TASKS) ORDER BY worker_id DESC) LOOP l_reduced_count := l_reduced_count + 1 ; stop_worker(p_worker_id=>rec.worker_id, p_stop_mode=>p_stop_mode) ; EXIT WHEN l_reduced_count = p_worker_count ; END LOOP ; COMMIT ; IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('stop_workers:Exit',G_MODULE_NAME) ; END IF ; END stop_workers ; -- -- run the task identified by task_id -- FUNCTION run_task(p_task_id IN NUMBER) RETURN DATE IS l_task_tab_rec mgmt_collection_tasks%rowtype ; l_sysdate_utc_start DATE := SYS_EXTRACT_UTC(SYSTIMESTAMP) ; l_sysdate_utc_end DATE ; l_wait_time NUMBER ; l_run_time NUMBER ; l_next_time_utc DATE ; l_metric_values mgmt_metric_value_array := mgmt_metric_value_array() ; l_context mgmt_namevalue_array := mgmt_namevalue_array() ; FUNCTION context_string(p_context IN mgmt_namevalue_array) RETURN VARCHAR2 IS l_context_string VARCHAR2(2048) := '('; BEGIN IF p_context IS NOT NULL AND p_context.COUNT > 0 THEN FOR i IN p_context.FIRST..p_context.LAST LOOP l_context_string := l_context_string || ' '||p_context(i).name ||'='||p_context(i).value||' ' ; END LOOP ; END IF ; l_context_string := l_context_string||')' ; RETURN(l_context_string) ; EXCEPTION -- only exception is if context exceeds 2048 characters so -- we return what fits in WHEN OTHERS THEN RETURN(l_context_string||')') ; END context_string ; BEGIN IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('run_task:Enter',G_MODULE_NAME) ; END IF ; BEGIN EM_TASK.get_task_info(p_task_id=>p_task_id, 
p_task_info=>l_task_tab_rec, p_lock_task=>TRUE) ; EXCEPTION WHEN OTHERS THEN raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR, 'Invalid task '||p_task_id||' in queue') ; END ; BEGIN IF l_task_tab_rec.task_type = G_TASK_TYPE_REPO THEN EM_COLL_UTIL.run_collection (p_task_id=>p_task_id, p_eval_mode=>EM_METRIC_EVAL.G_TASK_EVAL_MODE, p_timezone_region=>l_task_tab_rec.timezone_region, p_metric_values=>l_metric_values, p_error_message=>l_task_tab_rec.error_message) ; ELSE IF EMDW_LOG.P_IS_DEBUG_SET THEN EMDW_LOG.DEBUG('run_task:task type='||l_task_tab_rec.task_type|| ' Procedure='||l_task_tab_rec.task_proc, G_MODULE_NAME) ; END IF ; BEGIN -- add hints since the table size changes rapidly -- and optimizer uses stale stats SELECT /*+ INDEX(ctx mgmt_collection_task_ctx_pk) */ mgmt_namevalue_obj.new(name,value) BULK COLLECT INTO l_context FROM mgmt_collection_task_context ctx WHERE task_id = p_task_id ; EXCEPTION WHEN NO_DATA_FOUND THEN NULL ; END; IF EMDW_LOG.P_IS_DEBUG_SET THEN EMDW_LOG.DEBUG('run_task:Context='||context_string(l_context) ,G_MODULE_NAME); END IF ; IF l_task_tab_rec.task_type = G_TASK_TYPE_AVAIL THEN EM_REP_METRIC.run_metric_eval(p_context=>l_context) ; ELSIF l_task_tab_rec.task_type = G_TASK_TYPE_REPO_ASYNC THEN EM_COLL_UTIL.run_collection(p_context=>l_context) ; ELSIF l_task_tab_rec.task_type = G_TASK_TYPE_SNAPSHOT THEN EM_COLL_UTIL.run_snapshot_collections(p_context=>l_context) ; ELSIF l_task_tab_rec.task_proc IS NOT NULL THEN EXECUTE IMMEDIATE 'CALL '|| DBMS_ASSERT.qualified_sql_name(l_task_tab_rec.task_proc)||'(:context)' USING l_context ; ELSE raise_application_error(MGMT_GLOBAL.COLLECTION_ERR, 'Task procedure is NULL') ; END IF ; l_task_tab_rec.error_message := NULL ; END IF ; EXCEPTION WHEN OTHERS THEN l_task_tab_rec.error_message := sqlerrm ; -- repository collections will have metric errors logged IF l_task_tab_rec.task_type != G_TASK_TYPE_REPO THEN MGMT_LOG.log_error( v_module_name_in=>MGMT_COLLECTION.G_MODULE_NAME, 
v_error_code_in=>MGMT_GLOBAL.COLLECTION_ERR, v_error_msg_in=>'Task Execution error:'||substr(sqlerrm,1,100)|| ' task='||l_task_tab_rec.task_id|| ' procedure='||l_task_tab_rec.task_proc|| ' Context='||context_string(l_context), v_log_level_in=>MGMT_GLOBAL.G_ERROR) ; END IF ; IF EMDW_LOG.P_IS_ERROR_SET THEN EMDW_LOG.DEBUG('run_task:error '||l_task_tab_rec.error_message, G_MODULE_NAME); END IF ; END ; IF l_task_tab_rec.error_message IS NOT NULL THEN l_task_tab_rec.failures := nvl(l_task_tab_rec.failures,0)+1 ; ELSE l_task_tab_rec.failures := 0 ; END IF ; l_sysdate_utc_end := SYS_EXTRACT_UTC(SYSTIMESTAMP); -- --TBD: A possible bug here, need to check, because next collection timestamp -- is based on the time the last collection was scheduled, not when -- last collection was run. -- for example: If collection has 5 minute interval and was supposed -- to start at 4:50 and it actually started at 4:51, next time should -- be 4:55 not 4:56 -- l_task_tab_rec.last_collection_timestamp := l_sysdate_utc_start ; l_wait_time := GREATEST((l_sysdate_utc_start - l_task_tab_rec.next_collection_timestamp)*86400,0) ; l_run_time := (l_sysdate_utc_end - l_sysdate_utc_start)*86400 ; IF l_task_tab_rec.min_wait_time = 0 THEN l_task_tab_rec.min_wait_time := l_wait_time ; ELSIF l_task_tab_rec.min_wait_time > 0 THEN l_task_tab_rec.min_wait_time := least(l_wait_time, l_task_tab_rec.min_wait_time) ; END IF ; l_task_tab_rec.max_wait_time := greatest(l_wait_time, l_task_tab_rec.max_wait_time) ; l_task_tab_rec.avg_wait_time := ( l_wait_time+ (l_task_tab_rec.avg_wait_time* l_task_tab_rec.total_runs) )/(l_task_tab_rec.total_runs+1) ; IF l_task_tab_rec.min_run_time = 0 THEN l_task_tab_rec.min_run_time := l_run_time ; ELSE l_task_tab_rec.min_run_time := least(l_run_time, l_task_tab_rec.min_run_time) ; END IF ; l_task_tab_rec.max_run_time := greatest(l_run_time, l_task_tab_rec.max_run_time) ; l_task_tab_rec.avg_run_time := ( l_run_time+ (l_task_tab_rec.avg_run_time* l_task_tab_rec.total_runs) 
)/(l_task_tab_rec.total_runs+1) ; l_task_tab_rec.total_runs := l_task_tab_rec.total_runs + 1 ; l_next_time_utc := EM_TASK.schedule_next_execution(l_task_tab_rec) ; -- -- Error was being logged here, removed since we do not want to log -- errors to mgmt_log we will be logging to metric errors and policy -- evaluation tables. -- IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('run_task:Exit normal',G_MODULE_NAME) ; END IF ; RETURN(l_next_time_utc) ; END run_task; -- -- run the tasks in AQ -- FUNCTION run_tasks(p_worker_id IN NUMBER, p_task_class_list IN VARCHAR2 := G_ALL_TASKS) RETURN NUMBER IS l_current_task_id MGMT_COLLECTION_TASKS.task_id%type := 1 ; l_task_q_rec mgmt_coll_queue_obj ; l_tasks_processed NUMBER := 0 ; l_sysdate_start DATE := SYSDATE ; l_sysdate_end DATE ; l_duration NUMBER ; l_min_time_utc DATE ; l_next_time_utc DATE ; -- max_run_time_worker is a local function l_end_time DATE := SYSDATE + (max_run_time_worker/1440) ; BEGIN IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('run_tasks:Enter '|| ' Start time='||to_char(SYSDATE,'DD-MON-HH24:MI')|| ' Will end before '|| to_char(l_end_time,'DD-MON-YY HH24:MI')|| ' + last task time ',G_MODULE_NAME) ; END IF ; WHILE l_current_task_id > 0 LOOP l_task_q_rec := dequeue_task(p_dequeue_id=>p_worker_id, p_dequeue_mode=>G_DQ_MODE_PENDING_TASKS, p_task_class_list=>p_task_class_list ) ; l_current_task_id := l_task_q_rec.task_id ; IF EMDW_LOG.P_IS_DEBUG_SET THEN EMDW_LOG.DEBUG('run_tasks:Found Task#'||l_task_q_rec.task_id|| ' Message='||l_task_q_rec.message_code|| ' Class='||l_task_q_rec.task_class|| ' Time='||l_task_q_rec.scheduled_time,G_MODULE_NAME) ; END IF ; IF l_task_q_rec.task_id > 0 AND l_task_q_rec.message_code = 0 THEN BEGIN l_next_time_utc := run_task(p_task_id=>l_task_q_rec.task_id) ; l_min_time_utc := least(nvl(l_min_time_utc,l_next_time_utc),l_next_time_utc) ; l_tasks_processed := l_tasks_processed + 1 ; EXCEPTION WHEN OTHERS THEN -- Generally all errors are caught so the chances of this getting called -- is 
remote, It is here just in case the time calculations raise error -- or having problems scheduling next execution of task. MGMT_LOG.log_error( v_module_name_in=>MGMT_COLLECTION.G_MODULE_NAME, v_error_code_in=>MGMT_GLOBAL.COLLECTION_ERR, v_error_msg_in=>'Collection error:'||substr(sqlerrm,12)|| '(task='||l_task_q_rec.task_id||')', v_facility_in=>NULL, v_client_data_in=>p_worker_id, v_log_level_in=>MGMT_GLOBAL.G_ERROR) ; END ; COMMIT ; ELSE EXIT ; END IF ; EXIT WHEN SYSDATE >= l_end_time ; END LOOP ; IF l_tasks_processed > 0 THEN l_duration := (SYSDATE - l_sysdate_start)*86400000 ; MGMT_LOG.LOG_PERFORMANCE(v_job_name_in=>MGMT_COLLECTION.G_MODULE_NAME, v_duration_in=>l_duration, v_time_in=>SYSDATE, v_is_total_in=>'Y', v_name_in=>'Collections', v_value_in=>l_tasks_processed, v_client_data_in=>p_worker_id, v_module_in=>G_MODULE_NAME, v_action_in=>p_task_class_list) ; COMMIT ; END IF ; IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('run_tasks:Exit',G_MODULE_NAME) ; END IF ; IF l_task_q_rec.message_code = p_worker_id AND l_task_q_rec.task_id = 0 THEN RETURN(-1) ; ELSE RETURN(l_tasks_processed) ; END IF ; END run_tasks; -- -- running a task outside of a queue -- PROCEDURE run_task(p_task_id IN NUMBER) IS l_next_time_utc DATE ; l_task_rec mgmt_coll_queue_obj ; l_task_tab_rec mgmt_collection_tasks%rowtype ; BEGIN BEGIN EM_TASK.get_task_info(p_task_id=>p_task_id, p_task_info=>l_task_tab_rec, p_lock_task=>TRUE) ; EXCEPTION WHEN OTHERS THEN raise_application_error(MGMT_GLOBAL.INVALID_PARAMS_ERR, 'Invalid task ') ; END ; IF l_task_tab_rec.frequency_code != MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND THEN l_task_rec := dequeue_task(p_dequeue_id=>p_task_id, p_dequeue_mode=>G_DQ_MODE_TASK_ID) ; END IF ; IF l_task_rec.task_id = p_task_id OR l_task_tab_rec.frequency_code = MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND THEN l_next_time_utc := run_task(p_task_id=>p_task_id) ; ELSE raise_application_error(MGMT_GLOBAL.COLLECTION_ERR, 'Invalid Task') ; END IF ; END run_task; PROCEDURE 
DBMSJOB_EXTENDED_SQL_TRACE_ON(p_value IN BOOLEAN, p_worker IN NUMBER) IS BEGIN MGMT_SQLTRACE.EXTENDED_SQL_TRACE_ON(EST_WORKER_NAME || p_worker, p_value); END DBMSJOB_EXTENDED_SQL_TRACE_ON; -- -- the actual worker procedure which runs the tasks -- PROCEDURE worker(p_worker_id IN NUMBER) IS l_worker_rec mgmt_collection_workers%rowtype ; l_status NUMBER ; BEGIN EMDW_LOG.set_context(v_context_type=>G_LOG_CONTEXT, v_context_identifier=>'Worker#'||p_worker_id) ; IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('worker:Enter',G_MODULE_NAME) ; END IF ; MGMT_SQLTRACE.EXTENDED_SQL_TRACE(EST_WORKER_NAME || p_worker_id); BEGIN -- it is possible that the worker was removed so no exit lock_worker_rec(p_worker_id=>p_worker_id, p_worker_rec=>l_worker_rec) ; EXCEPTION WHEN OTHERS THEN RETURN ; END ; IF l_worker_rec.worker_status = G_WORKER_STATUS_STOP_PENDING THEN IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('worker:found stop pending status',G_MODULE_NAME) ; END IF ; remove_worker(p_worker_id) ; COMMIT ; ELSE IF l_worker_rec.worker_status = G_WORKER_STATUS_STARTING THEN UPDATE mgmt_collection_workers SET worker_status = G_WORKER_STATUS_STARTED, worker_start_time = SYSDATE WHERE worker_id = p_worker_id ; END IF ; -- Release lock on worker record otherwise stop_workers will hang COMMIT ; l_status := run_tasks(p_worker_id=>p_worker_id, p_task_class_list=>l_worker_rec.task_class_list) ; -- if stop message or this is a foreground worker then remove worker IF l_status < 0 OR l_worker_rec.job_id IS NULL THEN remove_worker(p_worker_id) ; END IF ; COMMIT ; END IF ; IF EMDW_LOG.P_IS_INFO_SET THEN EMDW_LOG.INFO('worker:Exit',G_MODULE_NAME) ; END IF ; EXCEPTION WHEN OTHERS THEN MGMT_LOG.log_error( v_module_name_in=>MGMT_COLLECTION.G_MODULE_NAME, v_error_code_in=>MGMT_GLOBAL.COLLECTION_ERR, v_error_msg_in=>'Collection Worker error:'||substr(sqlerrm,12), v_facility_in=>NULL, v_client_data_in=>p_worker_id, v_log_level_in=>MGMT_GLOBAL.G_ERROR) ; END worker; -- -- Procedure to detect and clean 
-- anomalies in task job.
-- This procedure runs every hour or so; it detects anomalies and corrects
-- them. When we run into space issues, if we roll back, then we get into
-- an infinite loop with the same AQ entry being processed unsuccessfully
-- many times. If we error out, the AQ entries are not created.
--
-- Every task whose frequency code is not 7 and which has no message in
-- mgmt_task_qtable is locked and re-enqueued (message code 0, scheduled
-- for the current UTC time), then the work is committed. On any error the
-- transaction is rolled back and the error is logged, not re-raised.
--
PROCEDURE resubmit_failed_task
AS
  l_task_ids     MGMT_INTEGER_TABLE := MGMT_INTEGER_TABLE();
  l_task_classes MGMT_INTEGER_TABLE := MGMT_INTEGER_TABLE();
  l_queue_msg    MGMT_COLL_QUEUE_OBJ;
BEGIN
  IF EMDW_LOG.P_IS_INFO_SET THEN
    EMDW_LOG.INFO('resubmit_failed_task:Entry',G_MODULE_NAME) ;
  END IF ;

  -- NOTE(review): 7 is presumably the ON DEMAND frequency code
  -- (MGMT_GLOBAL.G_SCHED_FREQUENCY_ONDEMAND) - confirm and use the constant.
  -- NOTE(review): tasks with no next collection timestamp are not filtered
  -- out here - verify that they are expected to have a queue entry.
  SELECT task_id,
         task_class
    BULK COLLECT INTO l_task_ids,
                      l_task_classes
    FROM mgmt_collection_tasks c
   WHERE frequency_code != 7
     AND NOT EXISTS (SELECT 1
                       FROM mgmt_task_qtable q
                      WHERE q.user_data.task_id = c.task_id)
     FOR UPDATE;

  -- BULK COLLECT fills the collections densely from index 1, so an empty
  -- result simply gives a 1 .. 0 range and the loop body never runs
  FOR idx IN 1 .. l_task_ids.COUNT
  LOOP
    l_queue_msg :=
      mgmt_coll_queue_obj.new(P_TASK_ID        => l_task_ids(idx),
                              P_TASK_CLASS     => l_task_classes(idx),
                              P_MESSAGE_CODE   => 0 ,
                              P_SCHEDULED_TIME => MGMT_GLOBAL.SYSDATE_UTC );
    EM_TASK.enqueue_task (p_coll_queue_rec => l_queue_msg );
  END LOOP;

  COMMIT;

  IF EMDW_LOG.P_IS_INFO_SET THEN
    EMDW_LOG.INFO('resubmit_failed_task:Exit',G_MODULE_NAME) ;
  END IF ;
EXCEPTION
  WHEN OTHERS THEN
    ROLLBACK;
    MGMT_LOG.LOG_ERROR(G_MODULE_NAME, 0,
                       'Error in resubmit_failed_task'||
                       '(Error : '||sqlerrm ||')' );
END resubmit_failed_task;

-- package initialization: cache the repository owner
BEGIN
  G_REPO_OWNER := MGMT_USER.GET_REPOSITORY_OWNER;
END em_task;
/
show err