CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

| Download

GAP 4.8.9 installation with standard packages -- copy to your CoCalc project to get it

Views: 418346
#############################################################################
##
#W process.gi               The SCSCP package             Alexander Konovalov
#W                                                               Steve Linton
##
#############################################################################

#############################################################################
# 
# The current stream is remembered to properly close the connection 
# after an exit from a break loop
#
SCSCP_CURRENT_SESSION_STREAM := fail;


#############################################################################
# 
# IsProcessRepresentation
#
# The 1st element is a stream
# The 2nd is a process id
# The 3rd is true if the process is a part of a multi-session
# (that is, the stream should not be closed after the process is
# completed in order to continue the session without another 
# handshaking stage
# 
DeclareRepresentation( "IsProcessRepresentation", 
                       IsPositionalObjectRep,
                       [ 1, 2, 3 ] );
                       
ProcessesFamily := NewFamily( "ProcessesFamily(...)", IsProcess );
                       
ProcessDefaultType := NewType( ProcessesFamily, 
                               IsProcessRepresentation and IsProcess);

if IsReadOnlyGlobal("OnQuit") then
    MakeReadWriteGlobal("OnQuit");
fi;

OnQuit:=function()
if SCSCP_CURRENT_SESSION_STREAM <> fail then
    if not IsClosedStream( SCSCP_CURRENT_SESSION_STREAM ) then
        Print( "SCSCP : ", SCSCP_CURRENT_SESSION_STREAM );
        CloseStream( SCSCP_CURRENT_SESSION_STREAM );
        Print( " is closed\n" );
    fi;    
    SCSCP_CURRENT_SESSION_STREAM := fail;
fi;    
if not IsEmpty( OptionsStack )  then
    repeat
        PopOptions(  );
    until IsEmpty( OptionsStack );
    Info( InfoWarning, 1, "Options stack has been reset" );
fi;
return;
end;

MakeReadOnlyGlobal("OnQuit");


#############################################################################
##
#M  ViewObj( <process> )
##
InstallMethod( ViewObj, "for process",
[ IsProcessRepresentation and IsProcess ],
function( proc )
    local stream, pid;
    stream := proc![1];
    pid := proc![2];
    Print("< ");
    if IsClosedStream(stream) then
        Print("closed ");
    fi;
    Print( "process " );
    if proc![3] then
        Print("in session ");
    fi;    
    Print("at ",stream![2],":",stream![3][1], " pid=", pid, " >");
end);


#############################################################################
##
#M  PrintObj( <process> )
##
InstallMethod( PrintObj, "for process",
[ IsProcessRepresentation and IsProcess ],
function( proc )
    local stream, pid;
    stream := proc![1];
    pid := proc![2];
    Print("< ");
    if IsClosedStream(stream) then
        Print("closed ");
    fi;
    Print( "process " );
    if proc![3] then
        Print("in session ");
    fi; 
    Print("at ",stream![2],":",stream![3][1], " pid=", pid, " >");
end);


#############################################################################
#
# NewProcess( command, listargs, <connection | server, port> : 
#                               output:=object/tree/coookie/nothing/deferred, 
#                                             cd:="cdname", debuglevel:=N );
#
# The function sends the request to the SCSCP server, and
# returns the InputOutputTCPStream for waiting the result
#
InstallGlobalFunction( NewProcess, function( arg )

local tcpstream, session_id, omtext, localstream, output_option, debug_option, 
      cdname, attribs, ns, pos1, pos2, pid, token, multisession;

if ValueOption("output") <> fail then
    output_option := ValueOption("output");
else
    output_option := "object";  
fi;

if output_option = "tree" then
    output_option:="option_return_object"; 
else
    output_option:=Concatenation( "option_return_", output_option );
fi;

if ValueOption("cd") <> fail then
    cdname := ValueOption("cd");
else
    cdname := "";
fi;

if ValueOption("debuglevel") <> fail then
    debug_option := ValueOption("debuglevel");
else
    debug_option := 0;
fi;

if Length(arg)=3 then
    tcpstream  := arg[3]![1]; # connection's stream 
    session_id := arg[3]![2]; # connection's session_id
    multisession := true;
else
    tcpstream  := InputOutputTCPStream( arg[3], arg[4] );
    session_id := StartSCSCPsession( tcpstream );
    multisession := false;
fi;
    
SCSCP_CURRENT_SESSION_STREAM := tcpstream;

pos1 := PositionNthOccurrence(session_id,':',2);
if pos1 <> fail then
    pid := Int( session_id{[ pos1+1 .. Length(session_id) ]} );
else
    pid:=0;
fi;

attribs := [ [ "call_id", Concatenation( session_id, ":", RandomString(8) ) ] ];
Add( attribs, [ output_option, "" ] );
if debug_option > 0 then
    Add( attribs, [ "option_debuglevel", debug_option ] );
fi; 

if InfoLevel( InfoSCSCP ) > 2 then

    Print("#I  Composing procedure_call message: \n");
    omtext:="";
    localstream := OutputTextString( omtext, true );
    OMPutProcedureCall( localstream, 
                        arg[1], 
                        rec(     object := arg[2], 
                             attributes := attribs ) : cd:=cdname );
    if IN_SCSCP_BINARY_MODE then
        localstream:=InputTextString( omtext );
        token:=ReadByte( localstream );
        while token <> fail do
            Print( EnsureCompleteHexNum( HexStringInt( token ) ) );
            token:=ReadByte( localstream );
        od;
        Print("\n#I  Total length ", Length(omtext), " bytes \n");
    else
        Print(omtext, "#I  Total length ", Length(omtext), " characters \n");
    fi;
    WriteAll( tcpstream, omtext );
    if IsInputOutputTCPStream( tcpstream ) then
        IO_Flush( tcpstream![1] );
    fi;
  
else
  
    OMPutProcedureCall( tcpstream, 
                        arg[1], 
                        rec(     object := arg[2], 
                             attributes := attribs ) : cd:=cdname );

fi;
              
Info( InfoSCSCP, 1, "Request sent ...");
SCSCP_CURRENT_SESSION_STREAM := fail;             
return Objectify( ProcessDefaultType, [ tcpstream, pid, multisession ] );
end); 


#############################################################################
#
# CompleteProcess( <process> : output:=cookie/tree );
#
# The function waits for the process completion, 
# then collects the result and closes the stream
#
InstallGlobalFunction( CompleteProcess, function( process )
local tcpstream, result, output_option;

if ValueOption( "output" ) <> fail then
  output_option := ValueOption( "output");
else
  output_option := "object";  
fi;

tcpstream := process![1];
SCSCP_CURRENT_SESSION_STREAM := tcpstream;
if IN_SCSCP_TRACING_MODE then SCSCPTraceSuspendThread(); fi;
IO_Select( [ tcpstream![1] ], [ ], [ ], [ ], 60*60, 0 );
if IN_SCSCP_TRACING_MODE then SCSCPTraceRunThread(); fi;
if IN_SCSCP_TRACING_MODE then SCSCPTraceReceiveMessage( tcpstream![3][1] ); fi;
if output_option="tree" then
    result := OMGetObjectWithAttributes( tcpstream : return_tree );
else
    result := OMGetObjectWithAttributes( tcpstream );
fi;    

if result = fail then
    Info( InfoSCSCP, 2, "CompleteProcess failed to get result from ", 
                        tcpstream![2], ":", tcpstream![3][1], ", returning fail" );
else
    Info( InfoSCSCP, 2, "Got back: object ", result.object, 
                        " with attributes ", result.attributes );
fi; 
if not process![3] then # we are in single call session
  CloseStream(tcpstream); 
fi;  
SCSCP_CURRENT_SESSION_STREAM := fail;
return result;
end);


#############################################################################
#
# TerminateProcess( <process> )
#
InstallGlobalFunction( TerminateProcess, function( process )
# THIS WORKS ONLY LOCALLY
if process![1]![2]="localhost" then
  IO_kill( process![2], IO.SIGINT );
fi;  
end);


#############################################################################
#
# SynchronizeProcessesN( <list of processes> )
#
SynchronizeProcessesN := function( processes )
local result, waitinglist, descriptors, s, nrdesc, i, nrprocess;
result := [];
waitinglist:=[ 1 .. Length(processes) ];
while Length(waitinglist) > 0 do
  descriptors := List( processes{waitinglist}, s -> IO_GetFD( s![1]![1] ) );  
  IO_select( descriptors, [ ], [ ], 60*60, 0 );
  nrdesc := First( [ 1 .. Length(descriptors) ], i -> descriptors[i]<>fail );
  nrprocess := waitinglist[ nrdesc ];
  Info( InfoSCSCP, 1, "Process number ", nrprocess, " is ready");
  result[nrprocess] := CompleteProcess( processes[nrprocess] );
  SubtractSet(waitinglist,[nrprocess]); 
od;
return result;
end;


#############################################################################
#
# SynchronizeProcesses2( <process1>, <process2> )
#
# We can faster synchronize two processes, avoiding list manipulations
#
SynchronizeProcesses2 := function( a, b )
local result, descriptors;
result:=[];
descriptors := [ IO_GetFD( a![1]![1] ), IO_GetFD( b![1]![1] ) ];
IO_select( descriptors, [ ], [ ], 60*60, 0 );
if descriptors[1]<>fail then # 1st process is ready
  Info( InfoSCSCP, 1, "Process number 1 is ready");
  result[1] := CompleteProcess( a );
  Info( InfoSCSCP, 1, "Closed 1st process, waiting for 2nd ...");  
  result[2] := CompleteProcess( b );
  return result;
elif descriptors[2]<>fail then # 2nd process is ready
  Info( InfoSCSCP, 1, "Process number 2 is ready");
  result[2] := CompleteProcess( b );
  Info( InfoSCSCP, 1, "Closed 2nd process, waiting for 1st ...");  
  result[1] := CompleteProcess( a );  
  return result;
else
  Error("Error in Synchronize2, both descriptors failed!!! \n");
fi;
end;


#############################################################################
#
# SynchronizeProcesses( <list of processes> )
# SynchronizeProcesses( <process1>, ..., <processN> )
InstallGlobalFunction( SynchronizeProcesses, function( arg )
if Length(arg)=2 then
  return SynchronizeProcesses2( arg[1], arg[2] );
else
  return SynchronizeProcessesN( arg[1] );
fi;
end);


#############################################################################
#
# FirstProcessN( <list of processes> )
#
FirstProcessN := function( processes )
local descriptors, nrdesc, i, result, nr;
descriptors := List( processes, s -> IO_GetFD( s![1]![1] ) );  
IO_select( descriptors, [ ], [ ], 60*60, 0 );
nrdesc := First( [ 1 .. Length(descriptors) ], i -> descriptors[i]<>fail );
Info( InfoSCSCP, 1, "Process number ", nrdesc, " is ready");
result := CompleteProcess( processes[ nrdesc ] );
for nr in [ 1 .. Length(descriptors) ] do
  if nr <> nrdesc then
    TerminateProcess( processes[nr] );
  fi;  
od; 
return result;
end;


#############################################################################
#
# FirstProcess2( <process1>, <process2> )
#
# We can faster handle the case of two processes, avoiding list manipulations
#
FirstProcess2 := function( a, b )
local result, descriptors;
result:=[];
descriptors := [ IO_GetFD( a![1]![1] ), IO_GetFD( b![1]![1] ) ];
IO_select( descriptors, [ ], [ ], 60*60, 0 );
if descriptors[1]<>fail then # 1st process is ready
  Info( InfoSCSCP, 1, "Process number 1 is ready");
  result := CompleteProcess( a );
  TerminateProcess( b );
  return result;
elif descriptors[2]<>fail then # 2nd process is ready
  Info( InfoSCSCP, 1, "Process number 2 is ready");
  result := CompleteProcess( b );
  TerminateProcess( a );  
  return result;
else
  Error("Error in FirstProcess2, both descriptors failed!!! \n");
fi;
end;


#############################################################################
#
# FirstProcess( <list of processes> )
# FirstProcess( <process1>, ..., <processN> )
#
InstallGlobalFunction( FirstProcess, function( arg )
if Length(arg)=2 then
  return FirstProcess2( arg[1], arg[2] );
else
  return FirstProcessN( arg[1] );
fi;
end);



#############################################################################
#
# FirstTrueProcessN( <list of processes> )
#
FirstTrueProcessN := function( processes )
local result, waitinglist, descriptors, s, nrdesc, i, nrprocess;
result := [];
waitinglist:=[ 1 .. Length(processes) ];
while Length(waitinglist) > 0 do
  descriptors := List( processes{waitinglist}, s -> IO_GetFD( s![1]![1] ) );  
  IO_select( descriptors, [ ], [ ], 60*60, 0 );
  nrdesc := First( [ 1 .. Length(descriptors) ], i -> descriptors[i]<>fail );
  nrprocess := waitinglist[ nrdesc ];
  Info( InfoSCSCP, 1, "Process number ", nrprocess, " is ready");
  result[nrprocess] := CompleteProcess( processes[nrprocess] );
  SubtractSet(waitinglist,[nrprocess]);
  if result[nrprocess].object = true then
    Info( InfoSCSCP, 1, "Process number ", nrprocess, " returned true, closing remaining processes");
    for i in waitinglist do
      # TerminateProcess( processes[i] );
      CloseStream( processes[i]![1]);
    od;
    return result;
  fi;    
od;
return result;
end;


#############################################################################
#
# FirstTrueProcess2( <process1>, <process2> )
#
# We can faster handle the case of two processes, avoiding list manipulations
#
FirstTrueProcess2 := function( a, b )
local result, descriptors;
result:=[];
descriptors := [ IO_GetFD( a![1]![1] ), IO_GetFD( b![1]![1] ) ];
IO_select( descriptors, [ ], [ ], 60*60, 0 );
if descriptors[1]<>fail then # 1st process is ready
  Info( InfoSCSCP, 1, "Process number 1 is ready");
  result[1] := CompleteProcess( a );
  if result[1].object = true then
    Info( InfoSCSCP, 1, "Process number 1 returned true, closing process number 2");
    CloseStream( b![1] );
    return result;
  fi;
  Info( InfoSCSCP, 1, "Closed 1st process, waiting for 2nd ...");  
  result[2] := CompleteProcess( b );
  return result;
elif descriptors[2]<>fail then # 2nd process is ready
  Info( InfoSCSCP, 1, "Process number 2 is ready");
  result[2] := CompleteProcess( b );
  if result[2].object = true then
    Info( InfoSCSCP, 1, "Process number 2 returned true, closing process number 1");
    CloseStream( a![1] );
    # TerminateProcess( a );
    return result;
  fi;  
  Info( InfoSCSCP, 1, "Closed 2nd process, waiting for 1st ...");  
  result[1] := CompleteProcess( a );  
  return result;
else
  Error("Error in Synchronize2, both descriptors failed!!! \n");
fi;
end;


#############################################################################
#
# FirstTrueProcess( <list of processes> )
# FirstTrueProcess( <process1>, ..., <processN> )
#
InstallGlobalFunction( FirstTrueProcess, function( arg )
if Length(arg)=2 then
  return FirstTrueProcess2( arg[1], arg[2] );
else
  return FirstTrueProcessN( arg[1] );
fi;
end);

###########################################################################
##
#E 
##