[pyar] leyendo archivo en paralelo

Angel Java Lopez ajlopez2000 en gmail.com
Sab Ago 9 12:31:43 ART 2014


Ah, interesante problema, maese @sbassi

Bien, ni idea de python, pero hay dos grandes caminos:

- Hacerlo con subprocesos
- Hacerlo con threads

Veamos el primero. El programa principal deberia

- Lanzar los n workers como subprocesos, y colgarse de los stdin y stdout
de esos subprocesos
- Cada subproceso lee cada linea de stdin, calcula lo que tiene que
calcular, y escribe en su stdout

El programa principal, luego, tiene DOS threads (o el principal y uno
accesorio).

En el primer thread, lee lineas del bigfile, y la envia al stdin de un
subproceso, haciendo round-robin, es decir, dandole una linea al subproceso
0, proxima linea al subproceso 1, .... proxima linea al subproceso n-1,
proxima linea al proceso 0, y asi

En el segundo thread, lee linea de subproceso 0, y escribe en algun lugar
de resultado, un archivo creo que pusiste, lee linea de subproceso 1, y
escribe, y asi, con round-robin de nuevo

Asi como esta, es fragil, hay que agregar: que pasa si el subproceso k se
cuelga, o termina mal, o se va de parranda ;-)

Supongo que el pipe de procesos soporta "backpressure", es decir, que al
grabar al stdin de un subproceso, alguna vez se bloquea, porque hay mucho
acumulado. Porque sino, el primer thread los atosiga de escrituras
(atosiga, que vocabulario ;-)

En la solucion de threads, hay un solo programa con:

- n threads lanzados, cada uno con la logica de un worker, y una cola qi de
entrada, y una cola qo de salida

Las colas soportan concurrencia, y para evitar "backpressure", las supongo
con limite. Es decir, q.put(...) lo acepta, pero si hay digamos 100 items
en la cola, el put es bloqueante

Luego, un thread que lea bigfile y con round robin, ponga elementos en la
cola de lectura, qi, de cada worker
Luego, un thread que lea con round robin, los qo de los workers, y escriba
en una salida

Creo entender que los resultados tienen que estar en el mismo orden que la
entrada. Es realmente necesario? Hay soluciones mas dinamicas si ese
requerimiento se relaja (por ej, poniendo en la salida "nro de mensaje +
salida", y luego en otro proceso ordenando el resultado). Pero no se el
contexto

Esta bien?

Nos leemos!

Angel "Java" Lopez
@ajlopez




2014-08-08 10:52 GMT-03:00 Sebastian Bassi <sebastian.bassi en globant.com>:

> Tengo un archivo que crece rápido (lo llamo "bigfile") y un programa
> que programa que lee bigfile y hace algo con cada linea que tarda mas
> tiempo que lo que tarda en generarse en ese archivo una linea nueva.
> Por ese motivo, quiero correr varias veces un programa para atacar
> bigfile desde varios lugares simultaneamente.
> Para esto hice 3 programas:
> 1- Un "generador" de bigfile que crece rápido, con fines de
> experimentación y para que lo puedan reproducir sin tener que tener el
> archivo verdadero que requiere varios programas para generar. codigo:
> http://pastebin.com/2j98y2iE
> 2- Un "procesador" o "worker" que es el que lee y procesa el archivo,
> pero salteandose X lineas desde la linea Y (X e Y son argumentos de
> linea de comandos). Por ejemplo lo puedo correr salteandose 10 lineas
> y empezando por la 1ra, y luego en paralelo corro otra instancia
> tambien cada 10, pero comenzando por la 2da, etc. Este programa, tira
> el resultado a stdout. codigo: http://pastebin.com/gdJV7N0A
> 3- Un programa para disparar los workers y recolectar todos los
> outputs de manera ordenada. codigo: http://pastebin.com/TWNvK9da
>
> Esto funciona, sobre todo si el archivo que crece termina de crecer
> antes que el ultimo worker llegue al final. Ahora, si los workers
> llegan al final del archivo, pero este luego vuelve a crecer, los
> workers no se dan cuenta y terminan, no "espera" a que haya mas
> lineas.
>
> Cualquier sugerencia de como mejorar esto será bienvenida, aca pego el
> código del programa 3 que es el mas relevante:
>
> import subprocess
> import os
> import pdb
>
>
> workers = 10
> ps = [subprocess.Popen(['python', 'processbf.py', str(workers), str(i)],
>                         stdout=subprocess.PIPE, close_fds=True) for i
> in range(workers)]
>
> try:
>     os.remove("out.txt")
> except:
>     pass
>
> mx = 0
> while workers>mx:
>     for p in ps[:]:
>         line = p.stdout.readline()
>         if line:
>             with open("out.txt", 'a') as fo:
>                 fo.write(line)
>         else:
>             mx+=1
>
>
> Saludos,
>
>
>
> --
> Sebastián Bassi. sebastian.bassi en globant.com
> Lic. en Biotecnología con orientación en genética molecular.
> Software Developer @ Globant.
> _______________________________________________
> pyar mailing list pyar en python.org.ar
> http://listas.python.org.ar/listinfo/pyar
>
> PyAr - Python Argentina - Sitio web: http://www.python.org.ar/
>
> La lista de PyAr esta Hosteada en USLA - Usuarios de Software Libre de
> Argentina - http://www.usla.org.ar
>
------------ próxima parte ------------
Se ha borrado un adjunto en formato HTML...
URL: <http://listas.python.org.ar/pipermail/pyar/attachments/20140809/ac59a126/attachment.html>


More information about the pyar mailing list