Subprocesos Multiples Python

Alex Jimenez
Alex Jimenez
Apr 7, 2024


Subprocesos Multiples Python

Cuando trabajas con herramientas CLI de terceros que apenas utilizan un núcleo de tu procesador, ver cómo tu máquina de 8 o 16 núcleos apenas suda mientras procesa archivos uno por uno puede resultar frustrante. La realidad es que muchos desarrolladores enfrentan este problema: tienen el hardware, pero no aprovechan su capacidad de procesamiento paralelo. Los Subprocesos Múltiples Python: Guía para Paralelizar Código se convierte entonces en tu mejor aliado para transformar esos scripts lentos en máquinas de procesar datos eficientes.

Python ofrece varias formas de ejecutar múltiples subprocesos simultáneamente, y elegir la correcta puede marcar la diferencia entre esperar minutos u horas. ¿Necesitas procesar cientos de archivos con herramientas externas? ¿Quieres que tu script use todos los núcleos disponibles sin congelar tu sistema? Entonces necesitas entender cómo paralelizar código en Python de manera efectiva.

Por Qué Necesitas Paralelizar Tus Subprocesos

Imagina que tienes 100 archivos binarios que necesitas procesar con un comando como reader.exe archivo.bin - | writer.exe -. Si cada archivo toma 30 segundos, estás mirando 50 minutos de espera. Pero si tienes 8 núcleos y los usas todos, ese tiempo se reduce a poco más de 6 minutos.

El problema principal es que ejecutar subprocesos de forma secuencial no aprovecha el hardware moderno. Tu CPU tiene múltiples núcleos esperando ser utilizados, pero tu script los ignora completamente.

Python tiene una limitación conocida como el Global Interpreter Lock (GIL) que impide que múltiples hilos ejecuten código Python simultáneamente. Sin embargo, cuando trabajas con subprocesos externos, esta limitación desaparece porque el trabajo pesado ocurre fuera del intérprete de Python.

Módulos Esenciales para Subprocesos Paralelos

Python ofrece tres módulos principales para manejar subprocesos múltiples: subprocess, multiprocessing y concurrent.futures. Cada uno tiene sus ventajas y casos de uso específicos.

💡 Si te interesa profundizar en cómo los algoritmos pueden distinguir automáticamente entre correos legítimos y basura, te recomiendo explorar cómo implementar un sistema de detección de spam usando regresión logística, una técnica fundamental que combina elegancia matemática con resultados sorprendentemente efectivos en la clasificación de mensajes.

El Módulo subprocess

Este es tu punto de partida para ejecutar comandos externos. Te permite crear procesos hijo que ejecutan herramientas CLI mientras tu script Python mantiene el control.

import subprocess

# Ejecución básica de un subproceso
resultado = subprocess.run(['reader.exe', 'archivo.bin', '-'], 
                          capture_output=True)

El problema es que subprocess.run() bloquea la ejecución hasta que el proceso termina. Para paralelizar código, necesitas algo más sofisticado.

El Módulo concurrent.futures

Este módulo proporciona una interfaz de alto nivel para ejecutar llamadas asíncronas. Es perfecto para cuando necesitas ejecutar múltiples subprocesos y recolectar sus resultados.

from concurrent.futures import ProcessPoolExecutor
import subprocess

def procesar_archivo(archivo):
    cmd = f'reader.exe {archivo} - | writer.exe -'
    return subprocess.run(cmd, shell=True, capture_output=True)

archivos = ['archivo1.bin', 'archivo2.bin', 'archivo3.bin']

with ProcessPoolExecutor(max_workers=6) as executor:
    resultados = executor.map(procesar_archivo, archivos)

Esta es probablemente la forma más limpia de manejar múltiples subprocesos en Python moderno.

💡 Si te sorprende la versatilidad de este lenguaje, espera a descubrir qué videojuegos famosos fueron desarrollados con Python y cómo grandes estudios han confiado en su potencia para crear experiencias interactivas que millones de jugadores disfrutan a diario.

El Módulo multiprocessing

Para control más granular sobre los procesos, multiprocessing es tu mejor opción. Te permite crear procesos independientes que evitan completamente el GIL.

from multiprocessing import Pool
import subprocess

def ejecutar_comando(archivo):
    proceso = subprocess.Popen(
        ['reader.exe', archivo, '-'],
        stdout=subprocess.PIPE
    )
    subprocess.run(['writer.exe', '-'], stdin=proceso.stdout)
    proceso.wait()
    return archivo

if __name__ == '__main__':
    with Pool(processes=6) as pool:
        resultados = pool.map(ejecutar_comando, lista_archivos)

Implementación Práctica: Procesamiento Paralelo de Archivos

Vamos a construir una solución completa para el problema planteado al inicio. Queremos procesar múltiples archivos usando herramientas CLI externas, aprovechando todos nuestros núcleos menos dos.

Paso 1: Detectar Núcleos Disponibles

Primero necesitas saber cuántos núcleos tiene tu sistema:

import os
import multiprocessing

nucleos_totales = multiprocessing.cpu_count()
nucleos_usar = max(1, nucleos_totales - 2)

💡 Si necesitas ejecutar tareas en paralelo o gestionar procesos externos sin bloquear tu aplicación principal, te recomiendo explorar cómo [lanzar y controlar subprocesos en Python](/tutoriales-python/iniciando-un-subproceso-en-python/) para aprovechar al máximo la concurrencia y optimizar el rendimiento de tus scripts.

print(f"Sistema tiene {nucleos_totales} núcleos")
print(f"Usaremos {nucleos_usar} para procesamiento")

¿Por qué dejar dos núcleos libres? Para que tu sistema operativo y otras aplicaciones no se queden sin recursos.

Paso 2: Crear la Función de Procesamiento

Tu función debe ser simple y enfocada. Recibe un archivo, ejecuta el comando, y devuelve el resultado o un error.

def procesar_archivo_binario(ruta_archivo):
    try:
        # Crear el proceso de lectura
        lector = subprocess.Popen(
            ['reader.exe', ruta_archivo, '-'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        
        # Crear el proceso de escritura
        escritor = subprocess.Popen(
            ['writer.exe', '-'],
            stdin=lector.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        
        # Permitir que lector reciba SIGPIPE si escritor termina
        lector.stdout.close()
        
        # Esperar a que termine
        salida, error = escritor.communicate()
        
        if escritor.returncode == 0:
            return {'archivo': ruta_archivo, 'estado': 'éxito'}
        else:
            return {'archivo': ruta_archivo, 'estado': 'error', 'mensaje': error.decode()}
            
    except Exception as e:
        return {'archivo': ruta_archivo, 'estado': 'excepción', 'mensaje': str(e)}

Paso 3: Implementar el Pool de Procesos

Ahora conectamos todo usando ProcessPoolExecutor, que proporciona la interfaz más moderna:

from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

def procesar_lote_archivos(directorio, patron='*.bin'):
    # Obtener lista de archivos
    archivos = list(Path(directorio).glob(patron))
    print(f"Encontrados {len(archivos)} archivos para procesar")
    
    # Calcular workers
    num_workers = max(1, multiprocessing.cpu_count() - 2)
    
    # Procesar en paralelo
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # Enviar todos los trabajos
        futures = {executor.submit(procesar_archivo_binario, str(archivo)): archivo 
                  for archivo in archivos}
        
        # Recolectar resultados conforme terminan
        completados = 0
        for future in as_completed(futures):
            resultado = future.result()
            completados += 1
            
            print(f"[{completados}/{len(archivos)}] {resultado['archivo']}: {resultado['estado']}")
            
            if resultado['estado'] != 'éxito':
                print(f"  Error: {resultado.get('mensaje', 'desconocido')}")

if __name__ == '__main__':
    procesar_lote_archivos('./datos', '*.bin')

💡 Si estás comenzando en el mundo del desarrollo web y te preguntas cuál es la mejor ruta de aprendizaje, te recomiendo explorar cómo combinar Python y JavaScript en tus proyectos para aprovechar lo mejor de ambos lenguajes y construir aplicaciones más robustas y versátiles desde el backend hasta el frontend.

Manejo de Errores y Timeouts

Un aspecto crucial al trabajar con subprocesos múltiples es manejar procesos que se cuelgan o fallan. No quieres que un archivo problemático bloquee todo tu pipeline.

Implementar Timeouts

def procesar_con_timeout(archivo, timeout=300):
    try:
        lector = subprocess.Popen(
            ['reader.exe', archivo, '-'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        
        escritor = subprocess.Popen(
            ['writer.exe', '-'],
            stdin=lector.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        
        lector.stdout.close()
        
        # Esperar con timeout (en segundos)
        try:
            salida, error = escritor.communicate(timeout=timeout)
            return {'estado': 'éxito', 'archivo': archivo}
        except subprocess.TimeoutExpired:
            escritor.kill()
            lector.kill()
            return {'estado': 'timeout', 'archivo': archivo}
            
    except Exception as e:
        return {'estado': 'error', 'archivo': archivo, 'mensaje': str(e)}

¿Qué pasa si un proceso se queda colgado indefinidamente? Sin timeout, tu script podría esperar eternamente.

Logging para Depuración

Cuando ejecutas múltiples procesos simultáneamente, el debugging se complica. Implementar logging adecuado es esencial:

import logging
from datetime import datetime

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(processName)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'procesamiento_{datetime.now():%Y%m%d_%H%M%S}.log'),
        logging.StreamHandler()
    ]
)

def procesar_con_logging(archivo):
    logger = logging.getLogger(__name__)
    logger.info(f"Iniciando procesamiento de {archivo}")
    
    inicio = datetime.now()
    resultado = procesar_archivo_binario(archivo)
    duracion = (datetime.now() - inicio).total_seconds()
    
    logger.info(f"Completado {archivo} en {duracion:.2f}s - Estado: {resultado['estado']}")
    return resultado

Optimización de Rendimiento

Una vez que tienes tu código paralelizado, el siguiente paso es optimizarlo. No siempre más workers significa mejor rendimiento.

💡 Si estás dando tus primeros pasos en inteligencia artificial y quieres ir más allá de la teoría, te recomiendo explorar estos proyectos prácticos de machine learning con Python que te permitirán construir aplicaciones reales mientras dominas los fundamentos del aprendizaje automático de forma progresiva y sin frustraciones.

Encontrar el Número Óptimo de Workers

import time

def benchmark_workers(archivos_prueba, workers_lista=[2, 4, 6, 8]):
    resultados = {}
    
    for num_workers in workers_lista:
        inicio = time.time()
        
        with ProcessPoolExecutor(max_workers=num_workers) as executor:
            list(executor.map(procesar_archivo_binario, archivos_prueba))
        
        duracion = time.time() - inicio
        resultados[num_workers] = duracion
        
        print(f"{num_workers} workers: {duracion:.2f}s")
    
    return resultados

La cantidad óptima depende de varios factores: velocidad de disco, uso de CPU por proceso, y memoria disponible.

Considerar Limitaciones de I/O

Si tus procesos son limitados por I/O (lectura/escritura de disco), agregar más workers puede empeorar el rendimiento. Los discos mecánicos especialmente sufren con acceso aleatorio excesivo.

# Para procesos I/O bound, considera menos workers
nucleos_io = min(4, multiprocessing.cpu_count())

# Para procesos CPU bound, usa más workers
nucleos_cpu = max(1, multiprocessing.cpu_count() - 2)

Alternativas Avanzadas: asyncio y Subprocesos

Para casos donde necesitas máximo control sobre la ejecución asíncrona, asyncio combinado con subprocess ofrece posibilidades interesantes:

import asyncio

async def ejecutar_comando_async(archivo):
    proceso = await asyncio.create_subprocess_shell(
        f'reader.exe {archivo} - | writer.exe -',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )
    
    stdout, stderr = await proceso.communicate()
    
    return {
        'archivo': archivo,
        'codigo_salida': proceso.returncode,
        'salida': stdout,
        'error': stderr
    }

💡 Si todavía tienes dudas sobre qué lenguaje elegir para tus proyectos backend, te recomiendo explorar esta [comparativa entre PHP y Python para desarrollo web](/tutoriales-python/php-o-python-para-web/), donde descubrirás cuál se adapta mejor a tus necesidades según rendimiento, curva de aprendizaje y ecosistema disponible.

async def procesar_lote_async(archivos, max_concurrentes=6):
    semaforo = asyncio.Semaphore(max_concurrentes)
    
    async def procesar_con_semaforo(archivo):
        async with semaforo:
            return await ejecutar_comando_async(archivo)
    
    tareas = [procesar_con_semaforo(archivo) for archivo in archivos]
    return await asyncio.gather(*tareas)

# Ejecutar
archivos = ['archivo1.bin', 'archivo2.bin', 'archivo3.bin']
resultados = asyncio.run(procesar_lote_async(archivos))

Esta aproximación es especialmente útil cuando combinas subprocesos con operaciones de red o bases de datos.

Comparación de Enfoques

Aquí tienes una tabla comparativa para ayudarte a elegir:

EnfoqueComplejidadRendimientoMejor Para
subprocess.run() en loopBajaBajoScripts simples, pocos archivos
concurrent.futures.ProcessPoolExecutorMediaAltoMayoría de casos, recomendado
multiprocessing.PoolMedia-AltaAltoControl granular de procesos
asyncio + subprocessAltaMuy AltoOperaciones mixtas I/O + subprocesos

Errores Comunes y Cómo Evitarlos

Error 1: No Usar if __name__ == '__main__'

En Windows, esto es absolutamente necesario o tu script entrará en un loop infinito de creación de procesos:

def mi_funcion(archivo):
    return procesar_archivo(archivo)

# CORRECTO
if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        executor.map(mi_funcion, archivos)

💡 Si trabajas con APIs o bases de datos NoSQL, dominar el manejo de estructuras de datos complejas es fundamental; por eso te recomiendo explorar [cómo trabajar con JSON en Python](/tutoriales-python/operaciones-json-python/) para serializar, deserializar y manipular información de forma eficiente en tus proyectos.

# INCORRECTO (causará problemas en Windows)
with ProcessPoolExecutor() as executor:
    executor.map(mi_funcion, archivos)

Error 2: Olvidar Cerrar Recursos

Los subprocesos pueden dejar archivos abiertos o conexiones colgadas:

# MAL
proceso = subprocess.Popen(['comando'], stdout=subprocess.PIPE)
# ... olvidas cerrar proceso.stdout

# BIEN
try:
    proceso = subprocess.Popen(['comando'], stdout=subprocess.PIPE)
    # usar proceso
finally:
    proceso.stdout.close()
    proceso.wait()

Error 3: No Manejar Excepciones en Workers

Si un worker lanza una excepción no capturada, puede fallar silenciosamente:

def procesar_seguro(archivo):
    try:
        return procesar_archivo_binario(archivo)
    except Exception as e:
        logging.error(f"Error procesando {archivo}: {e}")
        return {'archivo': archivo, 'estado': 'error', 'excepcion': str(e)}

Monitoreo y Progreso en Tiempo Real

Cuando procesas cientos de archivos, quieres ver el progreso en tiempo real. Aquí una implementación con tqdm:

from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed

def procesar_con_progreso(archivos):
    num_workers = max(1, multiprocessing.cpu_count() - 2)
    
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        futures = {executor.submit(procesar_archivo_binario, archivo): archivo 
                  for archivo in archivos}
        
        # Barra de progreso
        with tqdm(total=len(archivos), desc="Procesando") as pbar:
            for future in as_completed(futures):
                resultado = future.result()
                pbar.update(1)
                
                if resultado['estado'] != 'éxito':
                    tqdm.write(f"Error en {resultado['archivo']}")

La paralelización de código en Python no tiene por qué ser complicada. Con las herramientas correctas y un entendimiento sólido de los conceptos, puedes transformar scripts lentos en procesadores eficientes que aprovechan todo el poder de tu hardware. Ya sea que uses ProcessPoolExecutor para simplicidad o asyncio para control máximo, ahora tienes las herramientas para implementar subprocesos múltiples de manera efectiva.