123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692 |
- $PBExportHeader$uo_yfimex_pipeline.sru
- forward
- global type uo_yfimex_pipeline from nonvisualobject
- end type
- end forward
- global type uo_yfimex_pipeline from nonvisualobject
- end type
- global uo_yfimex_pipeline uo_yfimex_pipeline
- type variables
- datastore ds_columns,ds_tables
- nvo_pipeline invo_pipeline // pipeline用户对象
- end variables
- forward prototypes
- private function string p_tran_error_mess (integer mess_id)
- private function boolean p_change_db_type (string as_src_dbms, string as_dst_dbms, ref string as_src_type, ref string as_src_dbtype, ref string as_dst_type, ref string as_dst_dbtype, integer ai_prec, integer ai_scale, string as_isnull, ref string as_init, integer ai_length)
- public function integer pf_writetifofile (string arg_tableinfo, string arg_objfilename, ref string arg_msg)
- private function integer p_create_pipesyntax (transaction itr_source, transaction itr_target, integer commitstep, integer maxerrors, long ai_table_id, string as_table_name, string as_table_filterstring, ref string arg_pipe_syntax, ref string arg_msg, integer arg_createtype)
- public function integer uf_pl_execute (transaction itr_source, transaction itr_target, datawindow dw_error, s_exporttableinfo arg_zipfileinfo[], integer arg_tableno, integer arg_commitstep, integer arg_maxerrors, ref string arg_msg, ref string arg_log, boolean arg_if_buildlogfile)
- end prototypes
- private function string p_tran_error_mess (integer mess_id);//转换传递错误信息
- //p_tran_error_mess(mess_id)
- String Return_mess
- Return_mess = ''
- Choose case Mess_ID
- case -1
- Return_mess = '管道打开错误'
- Return Return_mess + '('+string(Mess_id)+')'
- case -2
- return_mess = '太多的列'
- Return Return_mess + '('+string(Mess_id)+')'
- case -3
- Return_mess = '表已经存在'
- Return Return_mess + '('+string(Mess_id)+')'
- case -4
- return_mess = '不存在制定的表'
- Return Return_mess + '('+string(Mess_id)+')'
- case -5
- return_mess = '连接丢失'
- Return Return_mess + '('+string(Mess_id)+')'
- case -6
- return_mess = '无效的参数'
- Return Return_mess + '('+string(Mess_id)+')'
- case -7
- return_mess = '列格式不匹配'
- Return Return_mess + '('+string(Mess_id)+')'
- case -8
- return_mess = '源SQL语法致命错误'
- Return Return_mess + '('+string(Mess_id)+')'
- case -9
- return_mess = '目的SQL语法致命错误'
- Return Return_mess + '('+string(Mess_id)+')'
- case -10
- return_mess = '已经到达最大错误数'
- Return Return_mess + '('+string(Mess_id)+')'
- case -12
- return_mess = '错误的表语法'
- Return Return_mess + '('+string(Mess_id)+')'
- case -13
- return_mess = '没有提供需要的键名'
- Return Return_mess + '('+string(Mess_id)+')'
- case -15
- return_mess = '管道已经在执行'
- Return Return_mess + '('+string(Mess_id)+')'
- case -16
- return_mess = '源数据库错误'
- Return Return_mess + '('+string(Mess_id)+')'
- case -17
- return_mess = '目的数据库错误'
- Return Return_mess + '('+string(Mess_id)+')'
- case -18
- return_mess = '目的数据库为只读'
- Return Return_mess + '('+string(Mess_id)+')'
- case else
- return 'OK'
- end choose
- end function
- private function boolean p_change_db_type (string as_src_dbms, string as_dst_dbms, ref string as_src_type, ref string as_src_dbtype, ref string as_dst_type, ref string as_dst_dbtype, integer ai_prec, integer ai_scale, string as_isnull, ref string as_init, integer ai_length);//不同数据库数据类型转换
- string ls_src_type
- as_init = ''
- ls_src_type = as_src_type
- ////例 MSS - ORA//
- if left(as_src_dbms,3) = 'MSS' and left(as_dst_dbms,3) = 'O84' then
- choose case ls_src_type
- case 'char','nchar'
- as_src_type = 'char'
- as_dst_type = 'char'
- as_src_dbtype = 'char('+string(ai_prec)+')'
- as_dst_dbtype = 'CHAR('+string(ai_prec)+')'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case 'varchar','nvarchar'
- as_src_type = 'varchar'
- as_dst_type = 'varchar'
- as_src_dbtype = 'varchar('+string(ai_prec)+')'
- as_dst_dbtype = 'VARCHAR2('+string(ai_prec)+')'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case 'int'
- as_src_type = 'long'
- as_src_dbtype = 'int'
- as_dst_type = 'double'
- as_dst_dbtype = 'NUMBER('+string(ai_prec)+',0)'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'smallint'
- as_src_type = 'int'
- as_src_dbtype = 'smallint'
- as_dst_type = 'double'
- as_dst_dbtype = 'FLOAT'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'tinyint'
- as_src_type = 'int'
- as_src_dbtype = 'tinyint'
- as_dst_type = 'decimal'
- as_dst_dbtype = 'NUMBER(3,0)'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'float'
- as_src_type = 'double'
- as_src_dbtype = 'float'
- as_dst_type = 'double'
- as_dst_dbtype = 'FLOAT'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'real'
- as_src_type = 'real'
- as_src_dbtype = 'real'
- as_dst_type = 'double'
- as_dst_dbtype = 'FLOAT'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'text'
- as_src_type = 'string'
- as_src_dbtype = 'text'
- as_dst_type = 'string'
- as_dst_dbtype = 'LONG VARCHAR'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case 'bit'
- as_src_type = 'bit'
- as_src_dbtype = 'bit'
- as_dst_type = 'double'
- as_dst_dbtype = 'FLOAT'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'decimal','numeric'
- as_src_type = 'decimal'
- as_src_dbtype = 'decimal('+string(ai_prec)+','+string(ai_scale)+')'
- as_dst_type = 'decimal'
- as_dst_dbtype = 'NUMBER('+string(ai_prec)+','+string(ai_scale)+')'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'money'
- as_src_type = 'money'
- as_src_dbtype = 'money'
- as_dst_type = 'decimal'
- as_dst_dbtype = 'NUMBER(19,4)'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'smallmoney'
- as_src_type = 'smallmoney'
- as_src_dbtype = 'smallmoney'
- as_dst_type = 'decimal'
- as_dst_dbtype = 'NUMBER(10,4)'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'datetime'
- as_src_type = 'datetime'
- as_src_dbtype = 'datetime'
- as_dst_type = 'datetime'
- as_dst_dbtype = 'DATE'
- if as_isnull = 'no' then as_init = ',initial_value="today()"'
- case 'smalldatetime'
- as_src_type = 'datetime'
- as_src_dbtype = 'smalldatetime'
- as_dst_type = 'datetime'
- as_dst_dbtype = 'DATE'
- case 'binary' // ,'image'
- as_src_type = 'blob'
- as_src_dbtype = 'binary(11)'
- as_dst_type = 'blob'
- as_dst_dbtype = 'RAW(11)'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case else
- return false
- end choose
- return true
- end if
- //例 MSS - MSS//
- if left(as_src_dbms,3) = 'MSS' and left(as_dst_dbms,3) = 'MSS' then
- choose case ls_src_type
- case 'char','nchar'
- as_src_type = 'char'
- as_dst_type = 'char'
- as_src_dbtype = 'char'
- as_dst_dbtype = 'char'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case 'varchar','nvarchar'
- as_src_type = 'varchar'
- as_dst_type = 'varchar'
- as_src_dbtype = 'varchar'
- as_dst_dbtype = 'varchar'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case 'int'
- as_src_type = 'long'
- as_src_dbtype = 'int'
- as_dst_type = 'long'
- as_dst_dbtype = 'int'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'smallint'
- as_src_type = 'int'
- as_src_dbtype = 'smallint'
- as_dst_type = 'int'
- as_dst_dbtype = 'smallint'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'tinyint'
- as_src_type = 'int'
- as_src_dbtype = 'tinyint'
- as_dst_type = 'int'
- as_dst_dbtype = 'tinyint'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'float'
- as_src_type = 'double'
- as_src_dbtype = 'float'
- as_dst_type = 'double'
- as_dst_dbtype = 'float'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'real'
- as_src_type = 'real'
- as_src_dbtype = 'real'
- as_dst_type = 'real'
- as_dst_dbtype = 'real'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'text'
- as_src_type = 'string'
- as_src_dbtype = 'text'
- as_dst_type = 'string'
- as_dst_dbtype = 'text'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case 'bit'
- as_src_type = 'bit'
- as_src_dbtype = 'bit'
- as_dst_type = 'bit'
- as_dst_dbtype = 'bit'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'decimal','numeric'
- as_src_type = 'decimal'
- as_src_dbtype = 'numeric'
- as_dst_type = 'decimal'
- as_dst_dbtype = 'NUMBER('+string(ai_prec)+','+string(ai_scale)+')'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'money'
- as_src_type = 'money'
- as_src_dbtype = 'money'
- as_dst_type = 'money'
- as_dst_dbtype = 'money'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'smallmoney'
- as_src_type = 'smallmoney'
- as_src_dbtype = 'smallmoney'
- as_dst_type = 'smallmoney'
- as_dst_dbtype = 'smallmoney'
- if as_isnull = 'no' then as_init = ',initial_value="0"'
- case 'datetime'
- as_src_type = 'datetime'
- as_src_dbtype = 'datetime'
- as_dst_type = 'datetime'
- as_dst_dbtype = 'datetime'
- if as_isnull = 'no' then as_init = ',initial_value="today"'
- case 'smalldatetime'
- as_src_type = 'datetime'
- as_src_dbtype = 'smalldatetime'
- as_dst_type = 'datetime'
- as_dst_dbtype = 'smalldatetime'
- case 'binary' // ,'image'
- as_src_type = 'blob'
- as_src_dbtype = 'binary(11)'
- as_dst_type = 'blob'
- as_dst_dbtype = 'binary(11)'
- if as_isnull = 'no' then as_init = ',initial_value="spaces"'
- case else
- return false
- end choose
- return true
- end if
- /////////////
- return false
- end function
- public function integer pf_writetifofile (string arg_tableinfo, string arg_objfilename, ref string arg_msg);//pf_writetifofile(arg_tableinfo,arg_objfilename,arg_msg)
- integer li_FileNum,rslt=1,ls_jh
- li_FileNum = FileOpen( arg_objfilename,StreamMode!, Write!, LockWrite!, Replace!)
- if li_FileNum<=0 then
- arg_msg="打开文件"+arg_objfilename+"失败"
- rslt=0
- fileclose(li_FileNum)
- goto ext
- end if
- ls_jh=FileWrite(li_FileNum, arg_tableinfo)
- if ls_jh<=0 then
- arg_msg="写文件"+arg_objfilename+"失败"
- rslt=0
- fileclose(li_FileNum)
- goto ext
- end if
- fileclose(li_FileNum)
- ext:
- return rslt
- end function
- private function integer p_create_pipesyntax (transaction itr_source, transaction itr_target, integer commitstep, integer maxerrors, long ai_table_id, string as_table_name, string as_table_filterstring, ref string arg_pipe_syntax, ref string arg_msg, integer arg_createtype);//建立pipe line 语法
- //p_create_pipesyntax(itr_source,itr_target,commitstep,maxerrors,ai_table_id,as_table_name,arg_pipe_syntax,arg_msg,arg_createtype)
- //arg_createtype=1 :直接建SQL语句
- int rslt=1
- long ll_rowcount
- ds_columns.setfilter('syscolumns_id='+string(ai_table_id))
- ds_columns.filter()
- ds_columns.setsort('syscolumns_colorder A')
- ds_columns.sort()
- ll_rowcount = ds_columns.rowcount()
- if ll_rowcount = 0 then
- arg_msg='表 '+string(ai_table_id)+' '+as_table_name + ' 没有列。'
- rslt=0
- goto ext
- end if
- //查询primary key===========
- s_sys_tntblinfo ls_tntblinfo
- string ls_sc_id_name,ls_Indcolumn_name,ls_thr_Indcolumn_name,ls_fou_Indcolumn_name
- string ls_fif_Indcolumn_name,ls_six_Indcolumn_name,ls_sev_Indcolumn_name,ls_eig_Indcolumn_name
- if f_get_tntblinfo(as_table_name,ls_tntblinfo)=1 then
- ls_sc_id_name=trim(ls_tntblinfo.Sc_id_name)
- ls_Indcolumn_name=trim(ls_tntblinfo.Indcolumn_name)
- ls_thr_Indcolumn_name=trim(ls_tntblinfo.thrIndcolumn_name)
- ls_fou_Indcolumn_name=trim(ls_tntblinfo.fouIndcolumn_name)
-
-
- ls_fif_Indcolumn_name=trim(ls_tntblinfo.fifIndcolumn_name)
- ls_six_Indcolumn_name=trim(ls_tntblinfo.sixIndcolumn_name)
- ls_sev_Indcolumn_name=trim(ls_tntblinfo.sevIndcolumn_name)
- ls_eig_Indcolumn_name=trim(ls_tntblinfo.eigIndcolumn_name)
- else
- arg_msg=as_table_name+"不在系统记录中;"
- rslt=0
- goto ext
- end if
- //==========================
- string ls_key_column
- long ll_key_count,ll_row,ll_find_row
- string ls_name,ls_type,ls_isnull,ls_src_dbtype,ls_dst_dbtype,ls_iskey,ls_src_type,ls_dst_Type
- integer li_isnull
- string ls_syn_src,ls_syn_ret,ls_syn_dst,ls_syn_hea,ls_ret
- string ls_dst_init
- integer li_prec,li_scale,li_length
- ls_syn_hea = 'PIPELINE(source_connect='+itr_source.Database+',destination_connect='+itr_target.Database+',type=update,commit='+string(commitstep)+',errors='+string(maxerrors)+',keyname="'+as_table_name+'")'
- ls_syn_src = 'SOURCE(name="'+as_table_name+'",'
- if arg_createtype=1 then
- ls_syn_ret = 'RETRIEVE(statement=" '+ as_table_filterstring+'")'
- else
- ls_syn_ret = 'RETRIEVE(statement="SELECT * from '+as_table_name + as_table_filterstring+'")'
- end if
- ls_syn_dst = 'DESTINATION(name="'+as_table_name+'",'
- ls_Ret = char(13) + char(10)
- for ll_row = 1 to ll_rowcount
- ls_name = ds_columns.object.syscolumns_name[ll_Row]
- ls_type = ds_columns.object.systypes_name[ll_Row]
- li_prec = ds_columns.object.syscolumns_prec[ll_row]
- li_scale = ds_columns.object.syscolumns_scale[ll_row]
- li_isnull = ds_columns.object.syscolumns_isnullable[ll_Row]
- li_length = ds_columns.object.syscolumns_length[ll_Row]
- if li_isnull= 1 then ls_isnull = 'yes' else ls_isnull = 'no'
- if isnull(li_scale) then li_scale = 0
-
- //检查primary key===========
- boolean if_pkkey
- if_pkkey=false
- ls_iskey=''
- if lower(ls_name) = lower(ls_sc_id_name) and lower(ls_sc_id_name) <>'' then
- if_pkkey=true
- end if
- if lower(ls_name) = lower(ls_Indcolumn_name) and lower(ls_Indcolumn_name) <>'' then
- if_pkkey=true
- end if
- if lower(ls_name) = lower(ls_thr_Indcolumn_name) and lower(ls_thr_Indcolumn_name) <>'' then
- if_pkkey=true
- end if
-
- if lower(ls_name) = lower(ls_fou_Indcolumn_name) and lower(ls_fou_Indcolumn_name) <>'' then
- if_pkkey=true
- end if
-
- if lower(ls_name) = lower(ls_fif_Indcolumn_name) and lower(ls_fif_Indcolumn_name) <>'' then
- if_pkkey=true
- end if
-
- if lower(ls_name) = lower(ls_six_Indcolumn_name) and lower(ls_six_Indcolumn_name) <>'' then
- if_pkkey=true
- end if
-
- if lower(ls_name) = lower(ls_sev_Indcolumn_name) and lower(ls_sev_Indcolumn_name) <>'' then
- if_pkkey=true
- end if
-
- if lower(ls_name) = lower(ls_eig_Indcolumn_name) and lower(ls_eig_Indcolumn_name) <>'' then
- if_pkkey=true
- end if
-
- if if_pkkey then
- ls_iskey = 'key=yes,'
- else
- ls_iskey = ''
- end if
- //==========================
- ls_dst_init = ''
- ls_src_dbtype = space(100)
- ls_dst_dbtype = space(100)
- ls_dst_init = space(100)
- ls_dst_type = space(30)
- ls_src_type = ls_type
-
- //改变数据类型
- if NOT p_change_db_type(&
- itr_source.dbms,&
- itr_target.dbms,&
- ls_src_type,&
- ls_src_dbtype,&
- ls_dst_Type,&
- ls_dst_dbtype,&
- li_prec,&
- li_scale,&
- ls_isnull,&
- ls_dst_init,&
- li_length) then continue
- // source 数据定义
- ls_syn_src += 'COLUMN(type='+ls_src_type+',name="'+ls_name+'",dbtype="'+ls_src_dbtype+'",'+ls_iskey+'nulls_allowed='+lower(ls_isnull)+')'
- // destination 定义
- if ls_type = 'char' then ls_Type = 'varchar'
- ls_syn_dst += 'COLUMN(type='+ls_dst_type+',name="'+ls_name+'",dbtype="'+ls_dst_dbtype+'",'+ls_iskey+'nulls_allowed='+lower(ls_isnull)+ls_dst_init+')'
- // 好看而已。
- if ll_row < ll_rowcount then
- ls_syn_src += ls_ret + ' '
- ls_syn_dst += ls_ret + ' '
- else
- ls_syn_src += ')'
- ls_syn_dst += ')'
- end if
- end for
- arg_pipe_syntax = ls_syn_hea + ls_ret+ls_syn_src +ls_ret+ls_syn_ret +ls_Ret+ ls_syn_dst
- ext:
- return rslt
- end function
- public function integer uf_pl_execute (transaction itr_source, transaction itr_target, datawindow dw_error, s_exporttableinfo arg_zipfileinfo[], integer arg_tableno, integer arg_commitstep, integer arg_maxerrors, ref string arg_msg, ref string arg_log, boolean arg_if_buildlogfile);//执行管道传送数据
- //uf_pl_execute(itr_source,itr_target,dw_error,arg_zipfileinfo[],arg_tableno,arg_commitstep,arg_maxerrors,arg_msg,arg_log)
- //itr_source :源数据源
- //itr_target :目标数据源
- //dw_error :显示错误dw
- //arg_zipfileinfo[300] :要导出的表的名称列表
- //arg_tableno :表数目
- //arg_commitstep :提交步长
- //arg_maxerrors :允许最大错误数目
- //arg_log :返回信息
- //1.create dw,检查传送数据数目
- //2.建立管道传送数据
- int rslt=1
- long lg_rowcount,table_pipeline_fail
- long ls_i
- s_sys_tntblinfo ls_tntblinfo
- string ls_msg
- string char_enter
- string str_dwSQl,str_SyntaxFromSQL
- DataStore ds_data
- ulong ll_table_id //表单ID,长整形数
- string str_Syntax_PipeLine, ls_pipe_result //pipeline语法,pipe结果
- string ls_table_name //表单名称
- string ls_table_filterstring
- int ls_createtype
- long ll_table_row,ll_pipe_result
- long ls_rowsread,ls_rowswritten,ls_rowsInerror
- long ls_total_rowsread,ls_total_rowswritten,ls_total_rowsInerror
- ds_data=create DataStore
- invo_pipeline = create nvo_pipeline
- ds_tables =create DataStore //初始化table dw
- ds_columns=create DataStore //初始化column dw
- ds_tables.dataobject='d_tables_mss'
- ds_columns.dataobject='d_columns_mss'
- ds_tables.settransobject(itr_source)
- ds_columns.settransobject(itr_source)
- ds_tables.retrieve()
- ds_columns.retrieve()
- char_enter=char(13)+char(10) //换行符
- open(w_sys_wait_2jdt) //初始化进度条
- w_sys_wait_2jdt.show()
- w_sys_wait_2jdt.wf_accepttol(arg_tableno) //初始化进度条1
- w_sys_wait_2jdt.wf_accepttol2(0) //初始化进度条2
- for ls_i=1 to arg_tableno
- ls_rowsread=0
- ls_rowswritten=0
- ls_rowsInerror=0
-
- //1.create dw,检查传送数据数目
- w_sys_wait_2jdt.st_msg.text="预处理:"+arg_zipfileinfo[ls_i].table_name //进度信息
- arg_zipfileinfo[ls_i].table_name=lower(arg_zipfileinfo[ls_i].table_name)
- ds_data.reset()
- if trim(arg_zipfileinfo[ls_i].table_name)<>"" then
- //<1>取出该文件信息
- ls_table_name=lower(trim(arg_zipfileinfo[ls_i].table_name))
-
- ll_table_row=ds_tables.find("lower(table_name)= '"+ls_table_name+"'",1,ds_tables.rowcount())
- if ll_table_row<=0 then
- //重置ds_columns=========
- ll_table_id=0
- ds_columns.setfilter("")
- ds_columns.filter()
- //=======================
- ls_msg=arg_zipfileinfo[ls_i].table_name+"不在系统记录中;"
- arg_log=arg_log+ls_msg+char_enter
- table_pipeline_fail++
- goto nexttable
- end if
-
- ll_table_id=ds_tables.object.table_id[ll_table_row]
-
- //<2>生成select语句
- if arg_zipfileinfo[ls_i].dw_creattype = 1 then //直接用filter_string作str_dwSQl
- str_dwSQl=arg_zipfileinfo[ls_i].filter_string
- else
- if arg_zipfileinfo[ls_i].filter_string <> '' then
- str_dwSQl="select "+arg_zipfileinfo[ls_i].table_name+".* from "+arg_zipfileinfo[ls_i].table_name + arg_zipfileinfo[ls_i].filter_string
- else
- str_dwSQl="select "+arg_zipfileinfo[ls_i].table_name+".* from "+arg_zipfileinfo[ls_i].table_name
- end if
- end if
- // SetProfileString("c:\error.txt",'error','a',str_dwSQl)
- // messagebox('',str_dwSQl)
-
- //<3>生成DW
- ls_msg=""
- str_SyntaxFromSQL =itr_source.SyntaxFromSQL(str_dwSQl,"style(type=grid)",ls_msg)
- if ls_msg>'' then
- arg_msg=arg_zipfileinfo[ls_i].table_name+"dw语法生成失败"
- arg_log=arg_log+arg_msg+char_enter
- table_pipeline_fail++
- goto nexttable
- end if
- ds_data.Create(str_SyntaxFromSQL,ls_msg)
- if ls_msg>'' then
- arg_msg=arg_zipfileinfo[ls_i].table_name+"建立相关DW失败"
- arg_log=arg_log+arg_msg+char_enter
- table_pipeline_fail++
- goto nexttable
- end if
-
- //<4>DW取数
- ds_data.settransobject(itr_source)
- w_sys_wait_2jdt.st_msg.text=arg_zipfileinfo[ls_i].table_name+" 正在传递数据..." //进度信息
- lg_rowcount=ds_data.retrieve()
- //2.建立管道传送数据
-
- ls_table_filterstring=''
- if arg_zipfileinfo[ls_i].dw_creattype = 1 then //直接用filter_string作str_dwSQl
- ls_table_filterstring=arg_zipfileinfo[ls_i].filter_string
- else
- if arg_zipfileinfo[ls_i].filter_string <> '' then
- ls_table_filterstring=arg_zipfileinfo[ls_i].filter_string
- else
- ls_table_filterstring=''
- end if
- end if
-
- ls_createtype=arg_zipfileinfo[ls_i].dw_creattype
-
- if lg_rowcount>0 then //如果有数据
- //create PipeLine syntax ================
- str_Syntax_PipeLine=''
-
- if p_create_pipesyntax(itr_source,itr_target,arg_commitstep,arg_maxerrors,ll_table_id,ls_table_name,ls_table_filterstring,str_Syntax_PipeLine,arg_msg,ls_createtype)=0 then
- arg_log=arg_log+arg_msg+char_enter
- table_pipeline_fail++
- goto nexttable
- end if
-
- //语法赋值给pipeline
- invo_pipeline.syntax = str_Syntax_PipeLine
-
- //执行=========================
- w_sys_wait_2jdt.wf_accepttol2(lg_rowcount) //初始化进度条2
- invo_pipeline.ub_showprocess=true //显示进度条2
- invo_pipeline.ul_rowscount=lg_rowcount
-
- // messagebox("",str_Syntax_PipeLine)
-
- ll_pipe_result = invo_pipeline.start(itr_source,itr_target,dw_error)//执行
- ls_pipe_result = p_tran_error_mess(ll_pipe_result) //错误信息转换
-
- if ls_pipe_result <> 'OK' then
- ls_rowsread=0
- ls_rowswritten=0
- ls_rowsInerror=0
- arg_msg="> 失败>>"+" ["+ls_table_name+"],读出- "+string(ls_rowsread)+",写入- "+string(ls_rowswritten)+",错误- "+string(ls_rowsInerror)+"{"+ls_pipe_result+"}"
- table_pipeline_fail++
- else
- ls_rowsread=invo_pipeline.rowsread
- ls_rowswritten=invo_pipeline.rowswritten
- ls_rowsInerror=invo_pipeline.rowsInerror
- if ls_rowsInerror > 0 then
- arg_msg="> 错误>>"+" ["+ls_table_name+"],读出- "+string(ls_rowsread)+",写入- "+string(ls_rowswritten)+",错误- "+string(ls_rowsInerror)
- else
- arg_msg="> 成功>>"+" ["+ls_table_name+"],读出- "+string(ls_rowsread)+",写入- "+string(ls_rowswritten)+",错误- "+string(ls_rowsInerror)
- end if
- end if
-
- ls_total_rowsread += ls_rowsread
- ls_total_rowswritten += ls_rowswritten
- ls_total_rowsInerror += ls_rowsInerror
-
- arg_log=arg_log+arg_msg+char_enter
- else
- w_sys_wait_2jdt.wf_accepttol2(0) //初始化进度条2
- invo_pipeline.ub_showprocess=false
- end if
- end if
-
- nexttable:
- w_sys_wait_2jdt.wf_inc(ls_i) //进度
- next
- arg_log=arg_log+">>数据传递:表-"+string(arg_tableno)+",其中失败-"+string(table_pipeline_fail)+",数据总读出-"+string(ls_total_rowsread)+",总写入-"+string(ls_total_rowswritten)+",总错误-"+string(ls_total_rowsInerror)+char_enter
- //write log
- string ls_filepathname
- ls_filepathname=sys_cur_path+'exportlog'+'\'+"LongJoe_PipeLine"+string(today(),'yyyymmdd')+string(now(),'hhmmss')+".log"
- if arg_if_buildlogfile then
- rslt=pf_writetifofile(arg_log,ls_filepathname,arg_msg)
- if rslt=0 then
- arg_msg="写日志文件失败>>"+arg_msg
- rslt=1
- end if
- end if
- close(w_sys_wait_2jdt)
- return rslt
- end function
- on uo_yfimex_pipeline.create
- call super::create
- TriggerEvent( this, "constructor" )
- end on
- on uo_yfimex_pipeline.destroy
- TriggerEvent( this, "destructor" )
- call super::destroy
- end on
|