tslediter/funcext/tvclib/uvclthreadworker.tsf

814 lines
22 KiB
Plaintext

unit uvclthreadworker;
interface
uses cstructurelib;
function RunThreadWorkerHost();
type TCustomThreadworker = class(t_worker_host)
{**
@explan(说明) 工作线程宿主 %%
**}
function create(sc);
begin
init_workerhost();
tarraycont++;
FHandle := tarraycont ;
FThreaders[FHandle] := self;
if not(sc and ifstring(sc)) then raise "工作线程构造失败!参数错误!";
fn := get_true_script(sc,ns);
if fn=0 and ns then //给定函数
begin
eg := unit(utssvr_api_c).get_func_finder_register();
eg.add_hook(thisfunction(finder));
fn := add_thread_call(ns);
end
fp := findfunction(fn);
if fp then
begin
fps := sysgetfuncdepends(fp,2);
end
if not fp then raise ("工作线程构造失败!回调函数错误!" $fn);
set_heartbeat();
p := format_thread_param(get_thread_id(),fn);
b := static makeinstance(thisfunction(t_thread_worker),"stdcall",1);
cid :=create_thread(b,p);
if not cid then raise ("工作线程构造失败!可能是堆栈不够");
inherited create(cid);
//////////////////
end
class function finder(L,iUser,iName);
begin
r := f_thread_call_backs[iName];
return r;
end
function add_thread_call(body);
begin
for i,v in f_thread_call_backs do
begin
if v=body then return i;
end
fn := "__t__thread__call__back__"$(length(f_thread_call_backs)+10);
f_thread_call_backs[fn] := "function "+fn+body;
return fn;
end
function destroy();override;
begin
remove_self();
inherited;
end
class function set_heartbeat();
begin
setglobalcache("~~main~~threader~~",1,(now()+0.015));
end
class function dispatch();
begin
if not FThreaders then return ;
set_heartbeat();
lsdata := nil;
///////////////打乱排序避免某些线程消息总是滞后///////////////////
idxs := mrows(FThreaders,1);
if idxs then
begin
for idx,i in idxs do
begin
idxs[idx] := array(i,random());
end
idxs := sselect[0] from idxs order by [1] end ;
end
//////////////////////////////////////////////
for idx,i in idxs do
begin
o := FThreaders[i];
msg := o.fetch(data,true,lsdata);
if msg then
begin
case msg of
"s": //成功
begin
o.ClientReady(1);
end
"se": //构造失败
begin
o.remove_self();
o.ClientReady(-1);
raise "工作线程构造失败!"$data;
end
"qq": //关闭成功
begin
o.remove_self();
o.ClientReady(0);
o.clear();//
end
"q": //请求关闭
begin
o.close();
end
"e": //错误
begin
o.DoOnError(data);
end
"d": //数据
begin
o.DoOnMessage(data);
end
end;
return true;
//break;
end
end
end
property handle read FHandle;
private
class function init_workerhost();
begin
if not ifarray(FThreaders) then
begin
f_thread_call_backs := array();
tarraycont := 1;
FThreaders := array();
end
end
function remove_self();
begin
ClientReady(0);
reindex(FThreaders,array(FHandle:nil));
end
static tarraycont;
static f_thread_call_backs;
[weakref] static FThreaders;
FHandle;
end
type t_worker_client = class(t_worker_base)
function create(id);
begin
inherited;
end
function err(info); //错误
begin
post_to_("e",info);
end
end
type t_worker_host = class(t_worker_base)
function create(id);
begin
fworkerstate :=0;
FCatcheData := array();
inherited;
end
function destroy();override;
begin
if fworkerstate=1 then
begin
post_to_("qq",1);
end
clear();
FCatcheData := array();
inherited;
end
property OnStart read FOnStart write FOnStart;
property OnError read FOnError write FOnError;
property OnMessage read FOnMessage write FOnMessage;
property componet read Fcomponet write Fcomponet;
{**
@param(OnMessage)(function[TThreadWorker,data]) 消息回调 %%
@param(OnStart)(function[TThreadWorker]) 子线程启动 %%
@param(OnError)(function[TThreadWorker,d]) 子线程启动 %%
**}
function terminate();override;
begin
if fworkerstate=1 then return inherited;
end
function ClientReady(flg); //任务句柄改变
begin
if flg=1 then
begin
fworkerstate := 1;
if iffuncptr(FOnStart) then
begin
try
call(FOnStart,get_true_self());
except
end;
end
if FCatcheData then
begin
for i,v in FCatcheData do
begin
post_to_("d",v);
end
FCatcheData := array();
end
end else
if flg=0 then
begin
fworkerstate := -2;
end else
if flg=-1 then
begin
fworkerstate := -1;
end
end
function PostMessage(d); override;//发送数据
begin
case fworkerstate of
1:
begin
if FCatcheData then
begin
for i,v in FCatcheData do
begin
post_to_("d",d);
end
FCatcheData := array();
end
return inherited;
end
0:
begin
FCatcheData[length(FCatcheData)] := d;
return 1;
end else
begin
return 0;
end
end;
end
function DoOnError(d);//处理错误
begin
if iffuncptr(FOnError) then
begin
try
return call(FOnError,get_true_self(),d);
except
end;
end
end
function DoOnMessage(d); //处理数据
begin
if iffuncptr(fOnMessage) then
begin
try
return call(fOnMessage,get_true_self(),d);
except
end;
end
end
private
fworkerstate;
FCatcheData; //构造数据
weakref
FOnError;
FOnStart;
Fcomponet;
FOnMessage;
autoref
private
function get_true_self();
begin
if Fcomponet then return Fcomponet;
return self(true);
end
end
implementation
function RunThreadWorkerHost();
begin
return class(TCustomThreadworker).dispatch();
end
type t_worker_base = class()
function Operator [](idx);
begin
return FData[idx];
end
function Operator [1](idx,v);
begin
FData[idx] := v;
end
function create(id);
begin
FData := array();
fmy_thread := get_thread_id();
fto_thread:=id;
fpostct := 0;
end
function close(); //关闭
begin
return terminate();
end
function terminate();virtual; //停止
begin
return post_to_("q",1);
end
function postmessage(d);virtual; //发送数据
begin
return post_to_("#d",d);
end
class function fetch_byid(id,cid,d);
begin
nw := now();
sid := "#"$id$"#";
ls := select ["name"] from listglobalcache() where(["endtm"]>nw and 1=pos(sid,["name"])) order by ["createtm"] end ;
for i,vd in ls do
begin
v := vd["name"];
if getglobalcache(v,d) and not ifnil(d) then
begin
r := get_v_type(v);
del_name(v);
ss := str2array(v,"#");
cid := strtoint64def(ss[2],0);
return r;
end
end
end
function fetch(d,flg,ls); //获取数据
begin
nw := now();
if not ifarray(ls) then
begin
if flg then
begin
ls := select ["name"] from listglobalcache() where(["endtm"]>nw) order by ["createtm"] end ;
end else
begin
ls := listglobalcache();
end
end
myid := "#"$fmy_thread$"#"$fto_thread$"#";
for i,vd in ls do
begin
v := vd["name"];
endm := vd["endtm"];
if endm>0 and endm<nw then continue;
if pos(myid,v)=1 then
begin
if getglobalcache(v,d) and not ifnil(d) then
begin
r := get_v_type(v);
del_name(v);
end
return r;
end
end
end
function clear(); //清理
begin
ls := listglobalcache();
myid := "#"$fmy_thread$"#"$fto_thread$"#";
for i,v in ls do
begin
vn := v["name"];
if pos(myid,vn)=1 then
begin
del_name(vn);
end
end
end
function destroy();virtual;
begin
FData := array();
end
function post_to_(t,d); //发送
begin
if ifnil(d) then return 0;
r := setglobalcache(format_idx(t),d,now()+1) ;
return 1;
end
{**
@param(OnMessage)(function[TThreadWorker,data]) 消息回调 %%
**}
class function del_name(vn);
begin
setglobalcache(vn,nil,(now()-1));
end
private
class function get_v_type(v);
begin
msn := static array(
"#d#",//数据
"#e#",//错误信息
"#s#", //启动成功
"#q#",//退出
"#qq#", //退出完成
"#se#" //启动错误
);
//r := "d";
for ii,vv in msn do
begin
if pos(vv,v) then
begin
r := replacetext(vv,"#","");
break;
end
end
return r;
end
function format_idx(t); //格式化
begin
return "#"$fto_thread$"#"$fmy_thread$"#"$t$"#&"$(fpostct++);
end
private
FData; //数据缓存
fmy_thread; //threadid
fto_thread; //链接id
fpostct; //post次数
end
function format_thread_param(id,script);//信息编码
begin
t := get_mem_mgr();
//s := tostn(array(id,script));
s := id$";"$ script;
len := length(s)+2;
p := t.tmalloc(len);
t.writestr(p,s);
return p;
end
function unformat_thread_param(p,id,script);//信息解码
begin
t := get_mem_mgr();
s := t.readstr(p);
t.tfree(p);
d := str2array(s,";");//stn(s);
id := strtoint64(d[0]);
script := d[1];
end
function t_thread_worker(ptr:pointer):{$ifdef linux}pointer{$else}integer{$endif}; //回调
begin
unformat_thread_param(ptr,id,fn);
this := new t_worker_client(id);
mypid := get_thread_id();
fptr := findfunction(fn);
if not ifobj(fptr) then //非函数指针
begin
this.post_to_("se","create thread err: "$fn$" not find!"$ei);
return ;
end else
begin
this.post_to_("s",mypid);
end
//
heartbeatstoped := 0;
while true do
begin
if not getglobalcache("~~main~~threader~~",d) then
begin
heartbeatstoped++;
if heartbeatstoped>3 then //主线程已经退出了
begin
//this.post_to_("qq",1);
this.clear();
return ;//systerminate(2,mypid);
end
end
msg := this.fetch(d,true,nil);
case msg of
"d":begin //数据
if iffuncptr(fptr) then
begin
try
call(fptr,this,d);
except
this.err(exceptobject.errinfo);
end;
end
sleep(1);
end
"qq":
begin
this.clear();
return 1;//systerminate(1,mypid);
end
"q":begin //要求退出
this.clear();
this.post_to_("qq",1);
return ;//systerminate(1,mypid);
end
else
begin
sleep(10);
end
end ;
tslprocessmessages(false); //20230428添加tsl消息分发
end
end
function get_thread_id();
begin
{$ifdef linux}
return systhreadself();
{$endif}
return systhreadid();
end
function create_thread(f,p);//构造线程
begin
{$ifdef linux}
return pthread_create(f,p);
//return g_thread_new(f,p);
{$endif}
return CreateThread(f,p);
end
///////////////////////////////pthread////////////////////////////////
{type pthread_attr_t = class(tslcstructureobj)
function create(ptr);
begin
inherited Create(getpstr(),ptr);
end
private
static pstrc;
function getpstr();
begin
if not pstrc then
begin
d := array(
("detachstate","int",0),//int detachstate; // 线程的分离状态
("schedpolicy","int",0),//int schedpolicy; // 线程调度策略
("schedparam","int",0), //structsched_param schedparam; // 线程的调度参数
("inheritsched","int",0), //int inheritsched; // 线程的继承性
("scope","int",0), //int scope; // 线程的作用域
("guardsize","intptr",0), // size_t guardsize; // 线程栈末尾的警戒缓冲区大小
("stackaddr_set","int",0), //int stackaddr_set; // 线程的栈设置
("stackaddr","intptr",0), //void* stackaddr; // 线程栈的位置
("stacksize","intptr",0) //size_t stacksize; // 线程栈的大小
);
pstrc := MemoryAlignmentCalculate(d);
end
return pstrc;
end
end
function pthread_attr_init(attr):integer;
begin
_f_ := static function(attr:pointer):integer;cdecl; external getdlsymaddress( "libc.so.6" ,functionname(1));
return ##_f_(attr._getptr_());
end }
function pthread_create(f,p):integer;
begin
//attr := new pthread_attr_t();pthread_attr_init(attr);attr._setvalue_("stacksize",10240000);
pth := 0;
_f_ := static function(var thread:pointer;attr:pointer; f:pointer;arg:pointer):integer;cdecl; external getdlsymaddress( "libpthread.so.0" ,functionname(1));
if 0= ##_f_(pth,0,f,p) then return pth;
end
/////////////////////gthread///////////////////////////////
{function g_thread_new(f:pointer;p:pointer):pointer;
begin
if not g_thread_get_initialized() then g_thread_init();
_f_ := static function(n:string;f:pointer;p:pointer):pointer;cdecl; external getdlsymaddress( "libgtk-3.so.0" ,functionname(1));
return ##_f_("",f,p);
end
function g_thread_get_initialized():Integer;
begin
_f_ := static function():Integer;cdecl; external getdlsymaddress( "libgtk-3.so.0" ,functionname(1));
return ##_f_();
end
function g_thread_init():Integer;
begin
_f_ := static function():Integer;cdecl; external getdlsymaddress( "libgtk-3.so.0" ,functionname(1));
return ##_f_();
end }
///////////////////////////////////////////////////////
function CreateThread(f:pointer;p:pointer):pointer;
begin
_f_ := static function(attr:pointer;size:pointer;addr:pointer;p:pointer;flag:Integer;var threadid:Integer):pointer;stdcall; external getdlsymaddress( "kernel32.dll" , functionname(1));
id := 0;
h := ##_f_ (0,10240000,f,p,0,id);
return id;
end
function iffuncptr(fn);
begin
//return datatype(fn)=7;
return ifobj(fn);
end
function find_find_function(idx,len,tks);//findfunction 处理
begin
state := -1;
fn := "";
cn := "";
while idx<len do
begin
idx++;
tki := tks[idx];
if state=-1 then
begin
if tki="(" then state:=0;
end else
if state=2 then
begin
if ("class"=lowercase(tki)) or ("unit"=lowercase(tki)) then
begin
idx+=1;
state:=3;
end
else
begin
state := 6;
end
end else
if state=0 then
begin
fn := tki;
state:=1;
end else
if state=1 then
begin
if tki=")" then return fn;
else if tki="," then state:=2;
end else
if state = 3 then
begin
cn := tki;
break;
end else
if state=4 then
begin
return 0;
end
end
if cn and fn then return cn+"."+fn;
return fn;
end
function find_this_function(idx,len,tks);//thisfunction处理
begin
state := -1;
fn := "";
cn := "";
while idx<len do
begin
idx++;
tki := tks[idx];
if state=-1 then
begin
if tki="(" then state:=0;
end else
if state=0 then
begin
if ("class"=lowercase(tki)) or ("unit"=lowercase(tki)) then
begin
idx+=1;
state:=1;
end
else
begin
fn := tki;
state := 3;
end
end else
if state=1 then
begin
cn := tki;
state:=2;
end else
if state=2 then
begin
idx++;
state:=4;
end else
if state = 3 then
begin
//if tki=")" then return fn;
return fn;
end else
if state=4 then
begin
fn := tki;
break;
end
end
if cn and fn then return cn+"."+fn;
return fn;
end
function get_true_script(s,ns);//解析文本
begin
tks := parser_script(s,ps);
idx := -1;
len := length(tks)-1;
state:=0;
while idx<len do
begin
idx++;
tk := tks[idx];
if state=0 then
begin
if lowercase(tk)="this.onmessage" and tks[idx+1]=":=" then // message
begin
state := 1;
idx+=1;
end else //其他
begin
if lowercase(tk)="function" then
begin
ns := s[ps[idx]:];
return 0;
end else
return tk;
end
end else
if state=1 then
begin
case lowercase(tk) of
"findfunction":
begin
return find_find_function(idx,len,tks);
end
"thisfunction":
begin
return find_this_function(idx,len,tks);
end
"function":
begin
ns := s[ps[idx]:];
return 0;
end else return 0;
end ;
end
end
end
function save_tk(tk,tki,ps,idx);
begin
if tki then
begin
len := length(tk);
tk[len] := tki;
ps[len]:= idx;
end
tki := "";
end
function parser_script(s,ps);
begin
len := length(s);
tk := array();
tki :="";
idx := 0;
ps := array();
while idx<len do
begin
idx++;
si := s[idx];
case si of
" ","\r","\n","\t": //分隔符
begin
save_tk(tk,tki,ps,idx);
end
":":
begin
save_tk(tk,tki,ps,idx);
if idx<len then
begin
idx++;
si := s[idx];
if si="=" then
begin
save_tk(tk,":=",ps,idx);
end else
begin
save_tk(tk,":",ps,idx);
idx--;
end
end
end
"." :
begin
if tki then tki+=".";
else
begin
save_tk(tk,si,ps,idx+1);
end
end
"(",")","=","?",";",",":
begin
save_tk(tk,tki,ps,idx);
save_tk(tk,si,ps,idx+1);
end
'"',"'":
begin
save_tk(tk,tki,ps,idx);
ss := find_str(s,idx,si,len);
save_tk(tk,ss,ps,idx);
end else
begin
tki+=si;
end
end ;
end
save_tk(tk,tki,ps,idx);
return tk;
end
function find_str(s,idx,v,len);
begin
tki := "";// v;
while idx<len do
begin
idx++;
si := s[idx];
if si=v then
begin
//tk[length(tk)] := tki;
break;
end
tki+=si;
end
return tki;
end
initialization
end.