CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place.

| Download

GAP 4.8.9 installation with standard packages -- copy to your CoCalc project to get it

Views: 418346
#############################################################################
##
#W parlist.g                The SCSCP package             Alexander Konovalov
#W                                                               Steve Linton
##
#############################################################################

SCSCPprocesses:=[];

###########################################################################
##
#F  SCSCPreset
##
##  <#GAPDoc Label="SCSCPreset">
##  
##  <ManSection>
##  <Func Name="SCSCPreset" Arg=""/>
##  <Returns>
##    nothing
##  </Returns>          
##  <Description>
##  If an error occurs during a call of <Ref Func="ParQuickWithSCSCP" />
##  and <Ref Func="ParListWithSCSCP" />, some of parallel requests may
##  be still running at the remaining services, making them inaccessible
##  for further procedure calls. <Ref Func="SCSCPreset" /> resets them
##  by closing all open streams to &SCSCP; servers.
##  </Description>
##  </ManSection>
##  <#/GAPDoc>
##
SCSCPreset:=function()
local proc;
for proc in SCSCPprocesses do
	if not IsClosedStream( proc![1] ) then
		CloseStream( proc![1] );
	fi;	
od;
end;

#############################################################################
#
# ParQuickWithSCSCP( commands, listargs )
#
# The idea of ParQuickWithSCSCP is to apply various methods from the first 
# argument 'commands' containing the list of names of SCSCP procedures to 
# the list of arguments 'listargs', where i-th SCSCP procedure will be 
# executed at SCSCPservers[i]
#
# Example of usage (the time of computation by these two methods
# is approximately the same, so you should expect results from both
# methods in some random order from repeated calls):
#
# ParQuickWithSCSCP( [ "WS_FactorsECM", "WS_FactorsMPQS" ], [ 2^150+1 ] );
# ParQuickWithSCSCP( [ "WS_FactorsCFRAC", "WS_FactorsMPQS" ], [ 2^150+1 ] );
#
InstallGlobalFunction( ParQuickWithSCSCP, function( commands, listargs )
local nr, res;
if Length( commands ) < Length( SCSCPservers ) then
  Error("ParQuickWithSCSCP : the number of procedures smaller than the number of services!!!\n");
fi;
SCSCPprocesses := [];
for nr in [ 1 .. Length(commands) ] do
  SCSCPprocesses[nr] := NewProcess( commands[nr], listargs, SCSCPservers[nr][1], SCSCPservers[nr][2] );
od;  
res := FirstProcess( SCSCPprocesses );
SCSCPreset(); # we want this to be a tiny bit later to prevent broken pipes
return res;
end);


#############################################################################
##
## ParListWithSCSCP( inputlist, remoteprocname : noretry, timeout=int, recallfrequency=int )
##
InstallGlobalFunction( ParListWithSCSCP, function( inputlist, remoteprocname )
local noretry, status, i, itercount, recallfreq, output, callargspositions, 
      currentposition, inputposition, timeout, nr, waitinglist, descriptors, 
      s, nrdesc, retrystack, result, nrservices_alive, nrservices_needed, 
      len, infomw, connections;
       
if ValueOption("timeout")=fail then
  timeout:=60*60; # default timeout - one hour, given in seconds;
else
  timeout:=ValueOption("timeout");
fi;

if ValueOption("noretry")=fail then
  noretry:=false; # no retrying calls which exceeded the timeout
else
  noretry:=ValueOption("noretry");
fi;

if ValueOption("recallfrequency")=fail then
  recallfreq:=0; # no need in initial and perodic pinging services
else
  recallfreq:=ValueOption("recallfrequency");
fi;

infomw:=InfoLevel(InfoMasterWorker);

connections := [];
status := [ ];
nrservices_alive:=0;
nrservices_needed:=Length( inputlist );
len:=Length( inputlist );

for i in [ 1 .. Length(SCSCPservers) ] do
    if PingSCSCPservice( SCSCPservers[i][1], SCSCPservers[i][2] )=fail then
    	status[i]:=0; # the server is not alive  
    	Info( InfoSCSCP, 1, SCSCPservers[i], " is not responding and will not be used!" );
    else 
        connections[i] := NewSCSCPconnection( SCSCPservers[i][1], SCSCPservers[i][2] );
    	status[i]:=1; # the server is alive and ready to accept
        Info( InfoSCSCP, 1, SCSCPservers[i], " responded and attached to the computation!" );
        nrservices_alive := nrservices_alive + 1;
        if nrservices_alive >= nrservices_needed then
        	break;
        fi;
    fi;   
od;

if nrservices_alive = 0 then
	Error( "Can not start computation - no SCSCP service available!\n" );
fi;

output := [ ];
callargspositions := [ ];
retrystack:= [ ];
currentposition := 0;
SCSCPprocesses := [ ];
itercount:=0;

while true do
  itercount:=itercount+1;
  if recallfreq <> 0 then
    if IsInt(itercount/recallfreq) then
      nrservices_needed := Length( inputlist ) - currentposition + Length(retrystack);
      for i in [ 1 .. Length(SCSCPservers) ] do
        if status[i]=0 then
  	      if PingSCSCPservice( SCSCPservers[i][1], SCSCPservers[i][2] )=fail then
            Info( InfoSCSCP, 1, SCSCPservers[i], "is still not responding and can not be used!" );
          else  
            connections[i]:=NewSCSCPconnection( SCSCPservers[i][1], SCSCPservers[i][2] );
            status[i]:=1; # alive and ready to accept
            Info( InfoSCSCP, 1, SCSCPservers[i], " responded and attached to the computation!" );
            nrservices_alive := nrservices_alive + 1;
    	    if nrservices_alive >= nrservices_needed then
      		  break;
    	    fi;
          fi;
        fi;
      od;    
      itercount:=0;
    fi;
  fi;
  #
  # is next task available (from the initial list or retry stack)?
  #
  while currentposition < Length( inputlist ) or Length( retrystack ) > 0 do
    #
    # search for next available service
    #
    nr := Position( status, 1 );
    if nr<>fail then
      #
      # there is a service number 'nr' that is ready to accept procedure call
      #
      if Length( retrystack ) > 0 then
        inputposition := retrystack[ Length( retrystack ) ];
        Unbind( retrystack[ Length( retrystack ) ] );
      else
      	currentposition := currentposition + 1;
      	inputposition := currentposition;
      fi;	
      # remember which argument was sent to this service
      callargspositions[nr] := inputposition;
      SCSCPprocesses[nr] := NewProcess( remoteprocname, 
                                        [ inputlist[inputposition] ], 
                                        connections[nr] );
      if infomw <> 0 then                       
        if infomw = 1 then
      	  Print( inputposition, "/", len, "\r");
        elif infomw = 2 or infomw = 3 then
      	  Print( "#I  ", inputposition, "/", len, ":master --> ", 
                SCSCPservers[nr][1], ":", SCSCPservers[nr][2], "\n" );
        else
      	  Print( "#I  ", inputposition, "/", len, ":master --> ", 
                SCSCPservers[nr][1], ":", SCSCPservers[nr][2], " : ", inputlist[inputposition], "\n" );
        fi;        
	  fi;
      status[nr] := 2; # status 2 means that we are waiting to hear from this service
    else
      break; # if we are here all services are busy
    fi;
  od;  
  #
  # see are there any waiting tasks
  #
  waitinglist:= Filtered( [ 1 .. Length(status) ], i -> status[i]=2 );
  if Length( waitinglist ) = 0 then
  	if Length( callargspositions ) = 0 then
  	  # no next tasks, no waiting tasks and no arguments sent off - computation completed!
  	  if not noretry and ( Length(output) <> Length(inputlist) or not IsDenseList(output) ) then
  	    Error( "The output list does not match the input list!\n" );
  	  else
  	    for i in [1..Length(connections)] do
  	      if not IsClosedStream ( connections[i]![1] ) then
  	        CloseSCSCPconnection( connections[i] );
  	      fi;  
  	    od;
        return output;
      fi;
    else
      Error( "Tasks for arguments ", 
      			inputlist{ Filtered( [ 1 .. Length(callargspositions) ], 
      				i -> IsBound( callargspositions[i] ) ) }, " are lost!\n");
    fi;  
  fi;
  #
  # waiting until any of the running tasks will be completed
  #
  descriptors := List( SCSCPprocesses{waitinglist}, s -> IO_GetFD( s![1]![1] ) );  
  if IN_SCSCP_TRACING_MODE then SCSCPTraceSuspendThread(); fi;
  IO_select( descriptors, [ ], [ ], timeout, 0 );
  if IN_SCSCP_TRACING_MODE then SCSCPTraceRunThread(); fi;
  nrdesc := First( [ 1 .. Length(descriptors) ], i -> descriptors[i] <> fail );
  # if nothing came and timeout has passed then nrdesc=fail
  # This may happen when server was terminated by ^C and is in a break loop,
  # so no procedure_terminated message will appear on the client's side
  if nrdesc=fail then
    if noretry then
      nr := Random( waitinglist );
  	  TerminateProcess( SCSCPprocesses[nr] );
  	  if not IsClosedStream( SCSCPprocesses[nr]![1] ) then
		CloseStream( SCSCPprocesses[nr]![1] );
	  fi;	
	  CloseSCSCPconnection( connections[nr] );
      result:=fail;
    else  
   	  Error( "ParSCSCP: waited for ", timeout, " seconds with no response from ", SCSCPservers{waitinglist}, "\n" );  
   	fi;
  else	
  	nr := waitinglist[ nrdesc ];
  	result := CompleteProcess( SCSCPprocesses[nr] );
  fi;
  
  if result=fail then
    if noretry then
      Info( InfoSCSCP, 2, SCSCPservers[nr], " : timeout exceeded, procedure call terminated" );
      status[nr]:=1;
    else
 	  # the service SCSCPservers[nr] seems to crash, mark it as unavailable
 	  if PingSCSCPservice( SCSCPservers[nr][1], SCSCPservers[nr][2] ) = fail then
 		Print( SCSCPservers[nr], " is no longer available \n" );
 	 	status[nr]:=0;
 	 	nrservices_alive := nrservices_alive - 1;
		if nrservices_alive = 0 then
		  Error( "Can not continue computation - no SCSCP service left available!\n" );
		fi;
 	  else
 		Error("ParSCSCP: failed to get result from ", SCSCPservers[nr] );
 	  fi;
      # we need to retry the call with argument inputlist[callargspositions[nr] ]
      Add( retrystack, callargspositions[nr] );
    fi;
  else
    #
    # processing the result
    #
    if infomw <> 0 then                       
      if infomw > 2 then
        Print( "#I  ", SCSCPservers[nr][1], ":", SCSCPservers[nr][2], 
               " --> ", callargspositions[nr], "/", len, ":master" );
        if infomw > 4 then
          Print( " : ", result.object, "\n" );
        else
          Print("\n"); 
        fi;
      fi;        
	fi;
    status[nr]:=1;
    output[ callargspositions[nr] ] := result.object;
  fi;
  Unbind(callargspositions[nr]);
od; # end of the outer loop
end);