
Simple MPI batch system

In Brief A simple MPI batch system for distributing a calculation among a number of MPI nodes.
import datetime
import time

from mpi4py import MPI


def mpi_run():

    if MPI.COMM_WORLD.Get_rank() == 0:
        mpi_controller()
    else:
        mpi_worker()


def mpi_controller():
    '''
    Controls the distribution of data-sets to the nodes
    '''

    iterations = 10000000
    burnin = 4000000

    orders = range(3, 6)
    plambdas = [0.1, 1.0, 10.0]  # example values only; substitute the real parameter list

    # Stores the original task list
    task_list = []

    # Stores a list of stats
    stats_list = []

    # Stores a list of processes for exit checking
    process_list = list(range(1, MPI.COMM_WORLD.Get_size()))

    # Generate the task_list
    for order in orders:
        for plambda in plambdas:
            task_list.append({'order': order, 'plambda': plambda,
                              'iterations': iterations, 'burnin': burnin})

    total_time_start = time.time()
    print('Running sampler with: %i tasks on %i processes' % (len(task_list), MPI.COMM_WORLD.Get_size()))

    while len(process_list) > 0:

        status = MPI.Status()

        # Block until any worker reports in
        data = MPI.COMM_WORLD.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)

        if status.tag > 9:  # Ready (10) or results (15)
            if status.tag == 15:
                # Record the data
                stats_list.append(data[0])

            # Hand out the next task, or an empty task to tell the worker to quit
            task = []
            if len(task_list) > 0:
                task = task_list.pop()

            MPI.COMM_WORLD.send(task, dest=status.source)

        elif status.tag == 5:  # Status
            update_gui()       # optional progress hook, not defined in this snippet
        elif status.tag == 2:  # Exit
            process_list.remove(status.source)
            print('Process %i exited' % status.source)
        else:
            print('Unknown tag %i with msg %s' % (status.tag, str(data)))

    print('Data Run Finished')
    print('Total Elapsed time: ' + str(datetime.timedelta(seconds=(time.time() - total_time_start))))


def mpi_worker():
    '''
    Worker process
    '''

    rank = MPI.COMM_WORLD.Get_rank()
    proc_name = MPI.Get_processor_name()

    # Send ready
    MPI.COMM_WORLD.send([{'rank': rank, 'name': proc_name}], dest=0, tag=10)

    # Start main data loop
    while True:
        # Get some data
        data = MPI.COMM_WORLD.recv(source=0)

        # An empty task means there is no more work
        if len(data) == 0:
            break

        # Crunch
        time_start = time.time()

        crunch(data)  # the user-supplied calculation, driven by the task parameters

        time_end = time.time()

        # Save results
        results = {'rank': rank, 'name': proc_name,
                   'start_time': time_start, 'end_time': time_end}
        # Return results
        MPI.COMM_WORLD.send([results], dest=0, tag=15)

    # Tell the controller this worker is exiting
    MPI.COMM_WORLD.send([], dest=0, tag=2)

This is a simple MPI batch system for distributing a calculation among a number of MPI nodes. It is useful wherever the calculation can be split up by passing different parameters to the 'calculation' method. Example uses include running a Markov chain Monte Carlo (MCMC) simulation with several parameter sets at the same time, or rendering animation frames. More sinister uses, such as password cracking, are also possible.
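The snippet leaves crunch() and update_gui() undefined; they are the hooks where the real work goes. Since the worker hands the task dictionary to crunch(), a stand-in implementation might look like the sketch below. The task keys match those packed by mpi_controller(), but the body here is only an illustration, not part of the original code.

def crunch(task):
    '''
    Stand-in for the real calculation. Replace the body with the actual
    sampler, renderer, etc.
    '''
    order = task['order']
    plambda = task['plambda']

    total = 0.0
    for i in range(1, task['iterations'] + 1):
        if i > task['burnin']:
            total += plambda ** order / i

    return total

If crunch() produces something you want back on the controller, add it to the results dictionary the worker sends with tag 15, since that is what gets appended to stats_list.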

This was originally written to bypass the Python Global Interpreter Lock (GIL) and run an MCMC simulation on both processors of my workstation at the same time.

It requires mpi4py.
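To use it, install mpi4py (for example with pip install mpi4py), call mpi_run() from the script's entry point, and launch the script under MPI. Assuming the code above is saved as batch.py (the file name is just an example):

if __name__ == '__main__':
    mpi_run()

Then start it with something like mpiexec -n 4 python batch.py, which gives one controller process (rank 0) and three workers.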