unit uvclthreadworker; interface uses cstructurelib; function RunThreadWorkerHost(); type TCustomThreadworker = class(t_worker_host) {** @explan(说明) 工作线程宿主 %% **} function create(sc); begin init_workerhost(); tarraycont++; FHandle := tarraycont ; FThreaders[FHandle] := self; if not(sc and ifstring(sc)) then raise "工作线程构造失败!参数错误!"; fn := get_true_script(sc,ns); if fn=0 and ns then //给定函数 begin eg := unit(utssvr_api_c).get_func_finder_register(); eg.add_hook(thisfunction(finder)); fn := add_thread_call(ns); end fp := findfunction(fn); if fp then begin fps := sysgetfuncdepends(fp,2); end if not fp then raise ("工作线程构造失败!回调函数错误!" $fn); set_heartbeat(); p := format_thread_param(get_thread_id(),fn); b := static makeinstance(thisfunction(t_thread_worker),"stdcall",1); cid :=create_thread(b,p); if not cid then raise ("工作线程构造失败!可能是堆栈不够"); inherited create(cid); ////////////////// end class function finder(L,iUser,iName); begin r := f_thread_call_backs[iName]; return r; end function add_thread_call(body); begin for i,v in f_thread_call_backs do begin if v=body then return i; end fn := "__t__thread__call__back__"$(length(f_thread_call_backs)+10); f_thread_call_backs[fn] := "function "+fn+body; return fn; end function destroy();override; begin remove_self(); inherited; end class function set_heartbeat(); begin setglobalcache("~~main~~threader~~",1,(now()+0.015)); end class function dispatch(); begin if not FThreaders then return ; set_heartbeat(); lsdata := nil; ///////////////打乱排序避免某些线程消息总是滞后/////////////////// idxs := mrows(FThreaders,1); if idxs then begin for idx,i in idxs do begin idxs[idx] := array(i,random()); end idxs := sselect[0] from idxs order by [1] end ; end ////////////////////////////////////////////// for idx,i in idxs do begin o := FThreaders[i]; msg := o.fetch(data,true,lsdata); if msg then begin case msg of "s": //成功 begin o.ClientReady(1); end "se": //构造失败 begin o.remove_self(); o.ClientReady(-1); raise "工作线程构造失败!"$data; end "qq": //关闭成功 begin o.remove_self(); o.ClientReady(0); o.clear();// end "q": //请求关闭 begin o.close(); end "e": //错误 begin o.DoOnError(data); end "d": //数据 begin o.DoOnMessage(data); end end; return true; //break; end end end property handle read FHandle; private class function init_workerhost(); begin if not ifarray(FThreaders) then begin f_thread_call_backs := array(); tarraycont := 1; FThreaders := array(); end end function remove_self(); begin ClientReady(0); reindex(FThreaders,array(FHandle:nil)); end static tarraycont; static f_thread_call_backs; [weakref] static FThreaders; FHandle; end type t_worker_client = class(t_worker_base) function create(id); begin inherited; end function err(info); //错误 begin post_to_("e",info); end end type t_worker_host = class(t_worker_base) function create(id); begin fworkerstate :=0; FCatcheData := array(); inherited; end function destroy();override; begin if fworkerstate=1 then begin post_to_("qq",1); end clear(); FCatcheData := array(); inherited; end property OnStart read FOnStart write FOnStart; property OnError read FOnError write FOnError; property OnMessage read FOnMessage write FOnMessage; property componet read Fcomponet write Fcomponet; {** @param(OnMessage)(function[TThreadWorker,data]) 消息回调 %% @param(OnStart)(function[TThreadWorker]) 子线程启动 %% @param(OnError)(function[TThreadWorker,d]) 子线程启动 %% **} function terminate();override; begin if fworkerstate=1 then return inherited; end function ClientReady(flg); //任务句柄改变 begin if flg=1 then begin fworkerstate := 1; if iffuncptr(FOnStart) then begin try call(FOnStart,get_true_self()); except end; end if FCatcheData then begin for i,v in FCatcheData do begin post_to_("d",v); end FCatcheData := array(); end end else if flg=0 then begin fworkerstate := -2; end else if flg=-1 then begin fworkerstate := -1; end end function PostMessage(d); override;//发送数据 begin case fworkerstate of 1: begin if FCatcheData then begin for i,v in FCatcheData do begin post_to_("d",d); end FCatcheData := array(); end return inherited; end 0: begin FCatcheData[length(FCatcheData)] := d; return 1; end else begin return 0; end end; end function DoOnError(d);//处理错误 begin if iffuncptr(FOnError) then begin try return call(FOnError,get_true_self(),d); except end; end end function DoOnMessage(d); //处理数据 begin if iffuncptr(fOnMessage) then begin try return call(fOnMessage,get_true_self(),d); except end; end end private fworkerstate; FCatcheData; //构造数据 weakref FOnError; FOnStart; Fcomponet; FOnMessage; autoref private function get_true_self(); begin if Fcomponet then return Fcomponet; return self(true); end end implementation function RunThreadWorkerHost(); begin return class(TCustomThreadworker).dispatch(); end type t_worker_base = class() function Operator [](idx); begin return FData[idx]; end function Operator [1](idx,v); begin FData[idx] := v; end function create(id); begin FData := array(); fmy_thread := get_thread_id(); fto_thread:=id; fpostct := 0; end function close(); //关闭 begin return terminate(); end function terminate();virtual; //停止 begin return post_to_("q",1); end function postmessage(d);virtual; //发送数据 begin return post_to_("#d",d); end class function fetch_byid(id,cid,d); begin nw := now(); sid := "#"$id$"#"; ls := select ["name"] from listglobalcache() where(["endtm"]>nw and 1=pos(sid,["name"])) order by ["createtm"] end ; for i,vd in ls do begin v := vd["name"]; if getglobalcache(v,d) and not ifnil(d) then begin r := get_v_type(v); del_name(v); ss := str2array(v,"#"); cid := strtoint64def(ss[2],0); return r; end end end function fetch(d,flg,ls); //获取数据 begin nw := now(); if not ifarray(ls) then begin if flg then begin ls := select ["name"] from listglobalcache() where(["endtm"]>nw) order by ["createtm"] end ; end else begin ls := listglobalcache(); end end myid := "#"$fmy_thread$"#"$fto_thread$"#"; for i,vd in ls do begin v := vd["name"]; endm := vd["endtm"]; if endm>0 and endm3 then //主线程已经退出了 begin //this.post_to_("qq",1); this.clear(); return ;//systerminate(2,mypid); end end msg := this.fetch(d,true,nil); case msg of "d":begin //数据 if iffuncptr(fptr) then begin try call(fptr,this,d); except this.err(exceptobject.errinfo); end; end sleep(1); end "qq": begin this.clear(); return 1;//systerminate(1,mypid); end "q":begin //要求退出 this.clear(); this.post_to_("qq",1); return ;//systerminate(1,mypid); end else begin sleep(10); end end ; tslprocessmessages(false); //20230428添加tsl消息分发 end end function get_thread_id(); begin {$ifdef linux} return systhreadself(); {$endif} return systhreadid(); end function create_thread(f,p);//构造线程 begin {$ifdef linux} return pthread_create(f,p); //return g_thread_new(f,p); {$endif} return CreateThread(f,p); end ///////////////////////////////pthread//////////////////////////////// {type pthread_attr_t = class(tslcstructureobj) function create(ptr); begin inherited Create(getpstr(),ptr); end private static pstrc; function getpstr(); begin if not pstrc then begin d := array( ("detachstate","int",0),//int detachstate; // 线程的分离状态 ("schedpolicy","int",0),//int schedpolicy; // 线程调度策略 ("schedparam","int",0), //structsched_param schedparam; // 线程的调度参数 ("inheritsched","int",0), //int inheritsched; // 线程的继承性 ("scope","int",0), //int scope; // 线程的作用域 ("guardsize","intptr",0), // size_t guardsize; // 线程栈末尾的警戒缓冲区大小 ("stackaddr_set","int",0), //int stackaddr_set; // 线程的栈设置 ("stackaddr","intptr",0), //void* stackaddr; // 线程栈的位置 ("stacksize","intptr",0) //size_t stacksize; // 线程栈的大小 ); pstrc := MemoryAlignmentCalculate(d); end return pstrc; end end function pthread_attr_init(attr):integer; begin _f_ := static function(attr:pointer):integer;cdecl; external getdlsymaddress( "libc.so.6" ,functionname(1)); return ##_f_(attr._getptr_()); end } function pthread_create(f,p):integer; begin //attr := new pthread_attr_t();pthread_attr_init(attr);attr._setvalue_("stacksize",10240000); pth := 0; _f_ := static function(var thread:pointer;attr:pointer; f:pointer;arg:pointer):integer;cdecl; external getdlsymaddress( "libpthread.so.0" ,functionname(1)); if 0= ##_f_(pth,0,f,p) then return pth; end /////////////////////gthread/////////////////////////////// {function g_thread_new(f:pointer;p:pointer):pointer; begin if not g_thread_get_initialized() then g_thread_init(); _f_ := static function(n:string;f:pointer;p:pointer):pointer;cdecl; external getdlsymaddress( "libgtk-3.so.0" ,functionname(1)); return ##_f_("",f,p); end function g_thread_get_initialized():Integer; begin _f_ := static function():Integer;cdecl; external getdlsymaddress( "libgtk-3.so.0" ,functionname(1)); return ##_f_(); end function g_thread_init():Integer; begin _f_ := static function():Integer;cdecl; external getdlsymaddress( "libgtk-3.so.0" ,functionname(1)); return ##_f_(); end } /////////////////////////////////////////////////////// function CreateThread(f:pointer;p:pointer):pointer; begin _f_ := static function(attr:pointer;size:pointer;addr:pointer;p:pointer;flag:Integer;var threadid:Integer):pointer;stdcall; external getdlsymaddress( "kernel32.dll" , functionname(1)); id := 0; h := ##_f_ (0,10240000,f,p,0,id); return id; end function iffuncptr(fn); begin //return datatype(fn)=7; return ifobj(fn); end function find_find_function(idx,len,tks);//findfunction 处理 begin state := -1; fn := ""; cn := ""; while idx