Introducción
Dado
que Python se encuentra ampliamente difundido en la comunidad
científica, por su simplicidad en la redacción de código, fácil
depuración mediante IDEs amigables y gran cantidad de librerías, es
conveniente utilizarlo en muchas aplicaciones que requieren una alta
complejidad computacional.
Afortunadamente,
en el transcurso de los años, se ha ido incrementando la cantidad de
núcleos que cada CPU posee para realizar la ejecución de programas
en paralelo, tal como se muestra en la figura 1.
 |
Fig.1 Número mayor de núcleos por procesador de Intel y AMD por año.
|
Sin
embargo, mientras algunas aplicaciones permiten ejecutar bucles de
tipo for en paralelo con solo cambiar un comando, la flexibilidad de
Python hace que deba configurarse manualmente la ejecución en
paralelo de los programas. A cambio, se logra reducir el tiempo de
ejecución en una fracción de menos del 10% que si se ejecutara
secuencialmente; por tanto, la estructuración del código para la
ejecución paralela provee un alto beneficio para el programador.
Si
bien es cierto, es posible aprovechar los núcleos que posee un
procesador físico, python también puede hacer que varios
computadores trabajen en paralelo mediante el paso de mensajes,
haciendo uso de su librería Mpipy. En este texto únicamente se
abordará el caso de procesadores multinúcleo.
La
figura 2 muestra el aprovechamiento de todos los núcleos del
procesador en una computadora personal.
 |
Fig.2 Aprovechamiento de núcleos en un procesador, a) Solo un núcleo al 100%, b) Todos los núcleos al 100%.
|
Si
bien no resulta difícil concluir que la ejecución paralela de
varias tareas trae como consecuencia una reducción en el tiempo
global de ejecución respecto de una ejecución secuencial, al final
de este texto se ilustra con un ejemplo que en ciertas ocasiones la
ejecución secuencial es más rápida, gracias a la buena
administración que hace Python en la memoria RAM del sistema.
Es posible realizar
procesamiento en paralelo, haciendo uso de los múltiples núcleos al
interior de los procesadores actuales, con objeto de reducir
notablemente los tiempos de ejecución de los programas.
Una lista de valores y una o varias constantes.
En
el siguiente programa, se tiene una lista de valores contenidos en la
variable u_list, en que cada uno de ellos se procesará con tres
matrices constantes en la operación.
En
este caso, se ejecutarán paralelamente procesos de la forma mostrada
en la figura 3.
 |
Fig. 3: Ejecución paralela de una lista de valores con una o varias constantes
|
El
código que permite realizar la ejeución paralela como se ilustra en
la figura 3, se muestra en el programa 1.
Programa 1 - código
import
multiprocessing
(1)
from
functools
import
partial
import
numpy
as
np
def
sub_funcion(matrix1,
matrix2, matrix3, ith_u): (2)
sum_matrix = matrix1.sum() + matrix2.sum() + matrix3.sum()
result = sum_matrix + ith_u
return
result
inicial,
final = 1,
1000
(3)
matrix1
= np.arange(1,
10)
matrix2
= np.arange(10,
20)
matrix3
= np.arange(20,
30+1)
sum_mat
= matrix1.sum() + matrix2.sum() + matrix3.sum()
u_list
= np.arange(inicial, final + 1)
sum_ulist
= u_list.sum()
num_cores
= multiprocessing.cpu_count() - 1
(4)
pool
= multiprocessing.Pool(processes=num_cores)
(5)
func
= partial(sub_funcion, matrix1, matrix2, matrix3) (6)
result
= np.array(pool.map(func, u_list)) (7)
pool.close()
(8)
pool.join()
(9)
print
('Las
matrices son:\nmatrix1
= {}\nmatrix2
= {}\nmatrix3
= {}\nEl
resultado final es {}\n'.format(matrix1,
matrix2, matrix3,result.sum()))
print('Realizando
los calculos de matrices y de la variable u_list de forma
independiente...\n'
'Sumatoria de variables en matrices * cantidad de elementos en
u_list = {}\nSumatoria
de numeros en variables u_list = {}\nFinalmente,
la suma de ambos terminos = {}'.format(sum_mat
* u_list.size, sum_ulist, sum_mat * u_list.size +sum_ulist))
Antes
de hacer los cálculos se presentan los contenidos de las 3 matrices.
Nótese que el tamaño de todas ellas es diferente, lo cual no afecta
la ejecución porque se toma la variable completa para cada vez que
se ejecuta sub_funcion.
Programa 1 - resultado
Las
matrices son:
matrix1
= [1 2 3 4 5 6 7 8 9]
matrix2
= [10 11 12 13 14 15 16 17 18 19]
matrix3
= [20 21 22 23 24 25 26 27 28 29 30]
El
resultado final es 965500
Realizando
los calculos de matrices y de la variable u_list de forma
independiente:
Sumatoria
de variables en matrices * cantidad de elementos en u_list =
465000
Sumatoria
de numeros en variables u_list = 500500
Finalmente,
la suma de ambos terminos = 965500
Process
finished with exit code 0
Del
programa, pueden destacarse los siguientes elementos:
La
función partial es clave, permite reunir múltiples
argumentos para aplicarlos a una sola función.
Esta sub_funcion es la que se ejecutará en paralelo a
otras copias de la misma.
Los valores initial y final definen una lista de valores que serán
procesados dentro de cada sub_funcion.
Es posible establecer cuántos núcleos del procesador se utilizarán
durante el proceso. Mediante la función multiprocessing.cpu_count()
se conocen los que hay disponibles.
Se inicia el procesamiento paralelo.
La función partial tiene como primer agumento el
nombre de la función sub_funcion y, a continuación,
todas las variables que se mantendrán constantes para cada proceso
paralelo.
La función pool.map devuelve todos los resultados en
un puntero de dirección. Por ello, se convierten en un arreglo
mediante la función np.array.
La función pool.close() hace que los procesos que
están trabajando en paralelo ya no acepten más argumentos.
La función pool.join() espera a que todos los
procesos terminen para continuar la ejecución del programa.
Notas
especiales:
Nótese que la u_list es una lista de valores como
argumento de pool.map, mientras que ith_u
como argumento de sub_funcion es una variable que
contiene un iésimo de u_list automáticamente.
Aunque el principio es simple, el código es un poco largo porque se
quiere mostrar que se obtiene el mismo resultado mediante la
aplicación de una fórmula y mediante la ejecución de 1000
procesos en paralelo.
Esta estructura es conveniente
utilizarla cuando existen varios argumentos iguales, que permanecerán
constantes e iguales para todos los procesos simultáneos y solamente
una lista de valores se tiene que procesar con estos argumentos.
Varias variables de igual dimensión
En este caso, se
asume que todos los argumentos tienen la misma cantidad de elementos.
La figura 4 ilustra la ejecución de los procesos. Nótese que en el
caso de los arreglos, que pueden ser de una, dos o más dimensiones,
será necesario convertirlos a unidimensionales y luego restablecer
sus dimensiones originales para poder aplicar el algoritmo descrito
en el programa 2.
 |
Fig.4 Ejecución de procesos en
paralelo cuyas variables tienen la misma dimensión |
El siguiente
programa ilustra la ejecución paralela para el caso de variables de
igual tamaño.
Programa 2 - código
import
multiprocessing
from
pathos.pools
import
ProcessPool
as
Pool
import
numpy
as
np
def
sub_funcion(matrix1,
matrix2, matrix3):
sum_matrix = matrix1 + matrix2 + matrix3
print('sub_funcion,
{} + {} + {} = {}'.format(matrix1,
matrix2, matrix3, sum_matrix))
return
sum_matrix
matrix1
= np.arange(0,
10)
matrix2
= np.arange(10,
20)
matrix3
= np.arange(20,
30)
num_cores
= multiprocessing.cpu_count() - 1
(1)
pool
= Pool(nodes=num_cores)
result
= list(pool.map(sub_funcion,
matrix1, matrix2, matrix3)) (2)
print('\nLos
valores devueltos de cada proceso se muestran en la lista
{}'.format(result))
En
la presentación del resultado, se muestra el valor iésimo de las
variables matrix1, matrix2 y matrix3, donde es requisito
indispensable que todas tengan el mismo tamaño. Finalmente, se
muestran en una lista los valores obtenidos por cada sub_funcion.
Programa 2 - resultado
sub_funcion,
0 + 10 + 20 = 30
sub_funcion,
1 + 11 + 21 = 33
sub_funcion,
2 + 12 + 22 = 36
sub_funcion,
3 + 13 + 23 = 39
sub_funcion,
4 + 14 + 24 = 42
sub_funcion,
5 + 15 + 25 = 45
sub_funcion,
6 + 16 + 26 = 48
sub_funcion,
7 + 17 + 27 = 51
sub_funcion,
8 + 18 + 28 = 54
sub_funcion,
9 + 19 + 29 = 57
Los
valores devueltos de cada proceso se muestran en la lista [30, 33,
36, 39, 42, 45, 48, 51, 54, 57]
Process
finished with exit code 0
Del
programa, pueden destacarse los siguientes elementos:
De
manera similar al programa anterior, puede establecerse el número
de núcleos participarán simultáneamente.
Propio de la función map en Python, el primer argumento es el
nombre de la función y los siguientes argumentos son las variables.
Este algoritmo es apropiado
utilizarlo cuando todos los argumentos tienen la misma dimensión.
Uso de clases, tareas y resultados
Además
de los dos algoritmos anteriores que se centran en el uso de los
núcleos del procesador, puede utilizarse el presentado como programa
3, donde se crea la clase paralell de la cual pueden crearse muchas y
perfectamente pueden ejecutarse de forma paralela. La entrega de
información a procesar se realiza mediante la asignación de tareas
y, luego de procesarla, se almacena en una especie de memoria para
resultados.
Programa 3 - código
import
multiprocessing
a
= 100
(1)
class
paralell(multiprocessing.Process):
(2)
def
__init__(self,
task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue
= task_queue
self.result_queue
= result_queue
def
run(self):
(3)
proc_name = self.name
while
True:
next_task = self.task_queue.get()
(4)
if
next_task
is
None:
print('{}:
Exiting'.format(proc_name))
self.task_queue.task_done()
break
global
a
(5)
result = a + next_task + 10
(6)
self.result_queue.put(result)
self.task_queue.task_done()
(7)
if
__name__
== '__main__':
tasks = multiprocessing.JoinableQueue() (8)
results = multiprocessing.Queue() (9)
num_workers = multiprocessing.cpu_count()
consumers =
[paralell(tasks, results) (10)
for
i
in
range(num_workers)
]
for
w
in
consumers:
w.start() (11)
num_jobs = 10000
for
k0
in
range
(num_jobs):
(12)
tasks.put(k0)
for
i
in
range(num_workers):
(13)
tasks.put(None)
tasks.join() (14)
acc = 0
while
num_jobs:
result = results.get() (15)
acc
+= result
num_jobs -= 1
print
('resultado
final = {}'.format(acc))
(16)
En
este programa, worker representa a cada núcleo del procesador ya que
realiza el trabajo de ejecutar una rutina. Se utiliza la lógica que
un trabajo (job) puede ser ejecutado por varios trabajadores
(workers).
Programa 3 - resultado
paralell-4:
Exiting
paralell-2:
Exiting
paralell-8:
Exiting
paralell-5:
Exiting
paralell-1:
Exiting
paralell-6:
Exiting
paralell-7:
Exiting
paralell-3:
Exiting
num_jobs
= 4
num_jobs
= 3
num_jobs
= 2
num_jobs
= 1
num_jobs
= 0
resultado
final = 51095000
Process
finished with exit code 0
De los comentarios
en el programa:
Para efectos de
probar si una variable puede utilizarse en los procesos en paralelo,
hacer cálculos y generar un consolidado final, se inicializa "a"
con 100.
La clase
paralell recibe como argumento la clase Multiprocessing.Process.
Esto permite inicializar sus propias variables self.task_queue y
self.result_queue, las cuales almacenarán las tareas y los
resultados respectivamente.
El método run
se ejecutará cuando se ejecute paralell.start()
Cada nueva
tarea (o bien, valor a procesar) será recibida con la ejecución
del método self.task_queue.get()
En Python, al
definir "a" como global, será buscada en el directorio
raíz de variables.
El resultado
tendrá valores comprendidos entre 100 + 0 + 10 y 100 + 9999 + 10.
Es necesario
ejecutar el método self.task_queue.task_done() al finalizar cada
tarea, no importa que la tarea recibida sea None.
La variable
tasks representa a las tareas que se ejecutarán en paralelo por
cada worker. Es una estructura tipo FIFO.
La variable
results guarda los resultados que serán guardados por las tareas
ejecutadas por cada worker.
La variable
consumers representa una lista de "workers" que recibe
cada uno como argumento un espacio para tareas (tasks) y un espacio
para resultados (results)
Cada uno de los
"workers" es inicializado y se pone a trabajar.
Internamente, cada uno ejecuta el método run.
A partir de la
variable num_jobs, se tiene una lista de 10,000 elementos, desde 0
hasta 9,999. Cada uno de ellos se le brinda a un "worker"
para que sea procesado
Con este bucle,
a un total de workers igual al número de núcleos en el procesador,
se le encarga de ejecutar la tarea "None". Esto es
totalmente indispensable y de no hacerlo, los workers seguirán
activos impidiendo la finalización del programa.
Se espera a que
todas las tareas sean ejecutadas para continuar el programa.
De cada uno de
los trabajos llevados a cabo (10,000 en este caso), se recibe cada
valor y se acumula en la variable "acc".
Se presenta el resultado de acumular los valores entregados en la
variable results. Pueden sumarse los valores constantes de cada
resultado, sum1 = 10000 * 110 = 1100000 y luego los numeros
naturales del 1 al 9999 mediante la fórmula sum2 = (n + n ** 2)/2,
con n=9999. sum2 = 49995000. Finalmente, sum1 + sum2 = 51095000.
Este programa representa una
alternativa de tantas para la ejecución paralela de funciones, con
una perspectiva en el uso de tareas y resultados.
Casos excepcionales
No siempre la ejecución paralela se ejecuta más
rápidamente, ya que para realizarla, debe organizarse la memoria de
la computadora y luego administrar las tareas. Existen casos en que
las tareas pueden ser muchas pero son simples y fáciles de acomodar
en la RAM del sistema.
A continuación, el
programa 4 muestra la ejecución secuencial de la suma de tres
matrices, elemento por elemento, obteniendo un tiempo de 0.00722
segundos.
Programa 4 - código
import
numpy
as
np
import
time
num_elementos
= 10000
matrix1
= np.arange(0,
num_elementos)
matrix2 = np.arange(2
*
num_elementos, 3
*
num_elementos)
matrix3 = np.arange(4
*
num_elementos, 5
*
num_elementos)
t1 = time.time()
result0 = []
for
k
in
range(num_elementos):
result0.append(matrix1[k] + matrix2[k] + matrix3[k])
result =
np.array(result0).sum()
tejecutado = time.time() -
t1
print('\nLa
suma de todos los valores generados por {} procesos es {} y se
ejecuto en {} segundos'.format(num_elementos,
result, tejecutado))
Programa 4 - resultado
La
suma de todos los valores generados por 10000 procesos es 749985000 y
se ejecuto en 0.00722002983093262 segundos
Process
finished with exit code 0
Este
otro código utiliza una ejecución paralela y tarda 0.88594
segundos; es decir, 122.7 veces más que la secuencial.
Programa 5 - código
import
multiprocessing
from
pathos.pools
import
ProcessPool
as
Pool
import
numpy
as
np
import
time
def
sub_funcion(matrix1,
matrix2, matrix3):
sum_matrix = matrix1 + matrix2 + matrix3
return
sum_matrix
num_elementos
= 10000
matrix1
= np.arange(0,
num_elementos)
matrix2 = np.arange(2
*
num_elementos, 3
*
num_elementos)
matrix3 = np.arange(4
*
num_elementos, 5
*
num_elementos)
t1 = time.time()
num_cores =
multiprocessing.cpu_count() - 1
pool
= Pool(nodes=num_cores)
result0
= list(pool.map(sub_funcion,
matrix1, matrix2, matrix3))
result =
np.array(result0).sum()
tejecutado = time.time() -
t1
print('\nLa
suma de todos los valores generados por {} procesos es {} y se
ejecuto en {} segundos'.format(num_elementos,
result, tejecutado))
Programa 5 - resultado
La
suma de todos los
valores generados por 10000 procesos es 749985000 y se ejecuto en
0.8859429359436035 segundos
Process
finished with exit code 0
La ejecución de código Python
en paralelo ahorrará un tiempo de ejecución notable respecto del
tiempo requerido en una ejecución secuencial, siempre y cuando cada
proceso sea complejo.
Referencias
https://www.reddit.com/r/Amd/comments/6cu5ss/highest_amount_of_cores_per_cpu_amd_vs_intel_year/
https://docs.python.org/3/library/multiprocessing.html
https://www.machinelearningplus.com/python/parallel-processing-python/