-rw-r--r-- 2637 libmceliece-20240726/scripts-build/fulltest-top raw
#!/usr/bin/env python3
import os
import sys
import subprocess
import multiprocessing
def thread(task):
  """Run one test-program invocation and classify what it reported.

  task is a (valgrind,o,p,i) tuple.  o, p and i are handed unchanged to
  'PROJECT-test' as its three command-line arguments (the program name is
  presumably a placeholder substituted when this script is built --
  confirm against the build scripts).  When valgrind is true the same
  command is run under valgrind with valgrind_multiplier=1 set in its
  environment.

  Returns (task,(failure,stdout,stderr)).  failure is None when every
  check passed; otherwise it is a short message naming the first check
  that failed.  stdout and stderr are the captured text of the child.
  """
  valgrind,o,p,i = task

  argv = ['PROJECT-test',o,p,i]
  if valgrind:
    wrapper = ['env','valgrind_multiplier=1','valgrind','-q','--max-stackframe=16777216','--error-exitcode=99']
    argv = wrapper+argv

  proc = subprocess.run(argv,stdout=subprocess.PIPE,stderr=subprocess.PIPE,universal_newlines=True)
  out,err = proc.stdout,proc.stderr

  def verdict(failure):
    # every exit path reports the same captured output alongside the verdict
    return task,(failure,out,err)

  if proc.returncode != 0:
    return verdict(f'nonzero return code {proc.returncode}')
  if err: # bug in test program
    return verdict('nonempty errors')

  # each stdout line as a list of whitespace-separated words
  words = [line.split() for line in out.splitlines()]
  saidsuccess = ['all','tests','succeeded'] in words
  # a line whose third word is 'implementation'
  saidimplementation = any(w[2:3] == ['implementation'] for w in words)
  saiddeclassify = ['valgrind','1','declassify','1'] in words

  if not saidsuccess: # bug in test program
    return verdict('did not say all tests succeeded')
  if not saidimplementation:
    return verdict('CPU does not support implementation')
  if valgrind and not saiddeclassify:
    return verdict('test does not support declassify')
  return verdict(None)
def checkresult(task,result):
  """Print the report for one finished task and say whether it passed.

  task is the (valgrind,o,p,i) tuple identifying the run and result is
  the matching (failure,out,err) tuple.  Every line of out and err is
  echoed to stdout behind an 'o/p/i mode' label, where mode is
  'dataflow' for valgrind runs and 'conventional' otherwise; a final
  'result:' line carries either 'success' or the failure message.
  stdout is flushed before returning.

  Returns True exactly when failure is None.
  """
  valgrind,o,p,i = task
  failure,out,err = result
  passed = failure is None

  mode = 'dataflow' if valgrind else 'conventional'
  label = f'{o}/{p}/{i} {mode}'

  report = []
  for desc,text in (('output',out),('error',err)):
    report.extend(f'{label} {desc}: {line}' for line in text.splitlines())
  outcome = 'success' if passed else failure
  report.append(f'{label} result: {outcome}')

  # the result line is always present, so report is never empty here
  print('\n'.join(report))
  sys.stdout.flush()
  return passed
def doit(todo):
  """Run every task in todo through a process pool and report in order.

  todo is an iterable of hashable task tuples as accepted by thread().
  The pool size starts from the number of CPUs this process may run on
  (falling back to the machine's CPU count), can be overridden by the
  THREADS environment variable, and is clamped to between 1 and the
  number of tasks.

  Results are collected as they complete but are passed to
  checkresult() strictly in todo order, so the printed report is
  deterministic even though execution order is not.

  Returns True if every task passed; an empty todo returns True
  without starting any worker process.
  Raises ValueError if THREADS is set to something int() rejects.
  """
  todo = list(todo)
  print(f'tests to run: {len(todo)}')

  try:
    threads = len(os.sched_getaffinity(0))
  except (AttributeError,OSError):
    # sched_getaffinity is missing on some platforms
    threads = multiprocessing.cpu_count()
  threads = int(os.getenv('THREADS',default=threads))
  # apply the lower bound last: with an empty todo the upper bound is 0,
  # and a zero-sized Pool raises ValueError
  threads = max(min(threads,len(todo)),1)
  print(f'maximum threads allowed: {threads}')

  if not todo:
    return True

  results = {}
  printpos = 0
  ok = True
  with multiprocessing.Pool(threads) as p:
    for task,result in p.imap_unordered(thread,todo,chunksize=1):
      results[task] = result
      # report the longest finished prefix of todo that is not yet printed
      while printpos < len(todo):
        task = todo[printpos]
        if task not in results: break
        if not checkresult(task,results[task]):
          ok = False
        printpos += 1
  assert printpos == len(todo)
  return ok