Tuesday, February 19, 2008

A PL/SQL script with collections

-- corp_process
--
-- Rebuilds rows in corp_agg_price for every distinct (hdr_id, asset_id,
-- trade_dt) found in sbfi.corp_base whose hdr_id lies between beg_hdr_id and
-- end_hdr_id (inclusive). For each combination the matching trades are bulk
-- collected into an in-memory collection once, and twelve aggregate prices
-- (buy/sell x all/large size x lag 0/1/4) are computed from that collection.
--
-- Parameters:
--   beg_hdr_id  lower bound (inclusive) of the hdr_id range to process
--   end_hdr_id  upper bound (inclusive) of the hdr_id range to process
--
-- Side effects:
--   * deletes, then re-inserts, corp_agg_price rows per asset_id/trade_dt
--   * commits each time hdr_id changes, and once more at the end
--   * writes the elapsed time to dbms_output
procedure corp_process(beg_hdr_id in number, end_hdr_id in number) is

    -- One trade, reduced to the columns needed for aggregation.
    type price_col is record (
        dt_updated      date,
        buy_sell        char,
        unit_cost_local number,
        usd_amt_adj     number
    );
    type price_dataset is table of price_col;

    dataset        price_dataset;
    price_rec      corp_agg_price%rowtype;
    large_size_vol number;   -- size threshold used for the "lg" price columns
    time_then      number;
    time_now       number;
    old_hdr_id     number;   -- last hdr_id seen; drives the per-hdr_id commit
    lag_1_date     date;
    lag_4_date     date;

    -- Newly appended trades are identified by their hdr_id range.
    cursor c1 (
        beg_hdr_id_in corp_base.hdr_id%type,
        end_hdr_id_in corp_base.hdr_id%type
    ) is
        select distinct
            hdr_id,
            asset_id,
            trade_dt
        from sbfi.corp_base
        -- from test_corp_base
        where hdr_id between beg_hdr_id_in and end_hdr_id_in
          and asset_id is not null
          and unit_cost_local between 1 and 200
        order by hdr_id asc;

    -- return_agg_price
    --
    -- Weighted average price over the trades in dataset_in that pass the
    -- filter:
    --   trunc(dt_updated) <= lag_date_in
    --   buy_sell           = buy_sell_in
    --   abs(usd_amt_adj)  >= size_in
    --
    -- Returns null when no trade passes the filter, when the total weight is
    -- zero, or when the average falls outside the [min, max] unit_cost_local
    -- of the trades that passed.
    --
    -- NOTE(review): the published source of this function was truncated
    -- between "unit_cost_local <" and "> max_price" (the closing of the
    -- filter, the end of the loop and the averaging were lost). The min
    -- tracking, the accumulation and the final bounds check below are
    -- reconstructed from the declared locals (min_price, nominator,
    -- denominator) and the surviving comments. In particular, weighting by
    -- abs(usd_amt_adj) is an assumption - confirm against the original.
    function return_agg_price (
        lag_date_in date,
        buy_sell_in char,
        size_in     number,
        dataset_in  price_dataset
    ) return number is
        price_out   number;
        min_price   number;
        max_price   number;
        nominator   number;
        denominator number;
    begin
        -- Sentinels sit outside the 1..200 price range enforced by the
        -- queries that feed this function.
        min_price   := 1000;
        max_price   := -1000;
        nominator   := 0;
        denominator := 0;
        price_out   := null;

        -- 1 .. count instead of first .. last: first/last are null on an
        -- empty collection and would raise ORA-06502. A bulk-collected
        -- nested table is dense, so 1 .. count visits the same elements.
        for i in 1 .. dataset_in.count loop
            if trunc(dataset_in(i).dt_updated) <= lag_date_in
               and dataset_in(i).buy_sell = buy_sell_in
               and abs(dataset_in(i).usd_amt_adj) >= size_in   -- in case negative
            then
                if dataset_in(i).unit_cost_local > max_price then
                    max_price := dataset_in(i).unit_cost_local;
                end if;

                if dataset_in(i).unit_cost_local < min_price then
                    min_price := dataset_in(i).unit_cost_local;
                end if;

                nominator   := nominator
                               + dataset_in(i).unit_cost_local
                                 * abs(dataset_in(i).usd_amt_adj);
                denominator := denominator + abs(dataset_in(i).usd_amt_adj);
            end if;
        end loop;

        if denominator <> 0 then
            price_out := nominator / denominator;
        end if;

        -- Quality check: not less than min_price and no larger than max_price.
        if price_out < min_price or price_out > max_price then
            price_out := null;
        end if;

        return price_out;
    end return_agg_price;

begin
    old_hdr_id     := beg_hdr_id;
    large_size_vol := 500000;
    time_then      := dbms_utility.get_time;

    -- Main loop: one iteration per combination of asset_id and trade_dt.
    for r1 in c1 (beg_hdr_id, end_hdr_id) loop
        -- dbms_output.put_line('loop -- asset_id:' || r1.asset_id
        --     || ' trade dt:' || to_char(r1.trade_dt, 'YYYY MM DD'));

        -- Block-based commit: commit once per hdr_id rather than per row.
        if r1.hdr_id != old_hdr_id then
            old_hdr_id := r1.hdr_id;
            commit;
        end if;

        -- Delete formerly generated aggregate prices for this key.
        -- (The original "unlogging" token after the table name was only a
        -- table alias, not a logging clause, so it has been dropped.)
        delete from corp_agg_price
        where asset_id = r1.asset_id
          and trade_dt = r1.trade_dt;

        -- add_weekdays is defined elsewhere; presumably it shifts the date
        -- by N business days - confirm.
        lag_1_date := add_weekdays(r1.trade_dt, 1);
        lag_4_date := add_weekdays(r1.trade_dt, 4);

        -- Load every trade for this key once; all twelve prices reuse it.
        select
            dt_updated,
            buy_sell,
            unit_cost_local,
            usd_amt_adj
        bulk collect into dataset
        from sbfi.corp_base
        -- from test_corp_base
        where asset_id = r1.asset_id
          and trade_dt = r1.trade_dt
          and unit_cost_local between 1 and 200;

        -- dbms_output.put_line('dataset:' || dataset.count);

        -- Populate the output record.
        price_rec.hdr_id   := r1.hdr_id;
        price_rec.asset_id := r1.asset_id;
        price_rec.trade_dt := r1.trade_dt;

        -- Lag 0: trades updated on or before the trade date itself.
        price_rec.price_all_b_0 := return_agg_price(r1.trade_dt, 'B', 0, dataset);
        price_rec.price_lg_b_0  := return_agg_price(r1.trade_dt, 'B', large_size_vol, dataset);
        price_rec.price_all_s_0 := return_agg_price(r1.trade_dt, 'S', 0, dataset);
        price_rec.price_lg_s_0  := return_agg_price(r1.trade_dt, 'S', large_size_vol, dataset);

        -- Lag 1: trades updated on or before lag_1_date.
        price_rec.price_all_b_1 := return_agg_price(lag_1_date, 'B', 0, dataset);
        price_rec.price_lg_b_1  := return_agg_price(lag_1_date, 'B', large_size_vol, dataset);
        price_rec.price_all_s_1 := return_agg_price(lag_1_date, 'S', 0, dataset);
        price_rec.price_lg_s_1  := return_agg_price(lag_1_date, 'S', large_size_vol, dataset);

        -- Lag 4: trades updated on or before lag_4_date.
        price_rec.price_all_b_4 := return_agg_price(lag_4_date, 'B', 0, dataset);
        price_rec.price_lg_b_4  := return_agg_price(lag_4_date, 'B', large_size_vol, dataset);
        price_rec.price_all_s_4 := return_agg_price(lag_4_date, 'S', 0, dataset);
        price_rec.price_lg_s_4  := return_agg_price(lag_4_date, 'S', large_size_vol, dataset);

        -- Only store the row when at least one of the lag-4 "all sizes"
        -- prices was produced (return_agg_price yields null otherwise).
        -- The whole record is inserted positionally, so price_rec must keep
        -- matching the column order of corp_agg_price.
        -- (The original "nologging" token after the table name was only a
        -- table alias, not a logging clause, so it has been dropped.)
        if price_rec.price_all_b_4 is not null
           or price_rec.price_all_s_4 is not null
        then
            insert into corp_agg_price values price_rec;
        -- else
        --     dbms_output.put_line('fail to insert obs -- asset_id:' || r1.asset_id
        --         || ' trade dt:' || to_char(r1.trade_dt, 'YYYY MM DD'));
        end if;
    end loop;

    commit;

    -- The elapsed get_time difference is divided by 100 to report seconds.
    time_now := dbms_utility.get_time;
    dbms_output.put_line(
        'Finish aggregating corp prices, using Time : '
        || (time_now - time_then) / 100
        || ' secs'
    );
end corp_process;

No comments: