[pyar] Actualizando estado de carga de datos con Django y Celery

Carlos Matías cmdelatorre en gmail.com
Lun Jul 25 09:39:05 ART 2016


Hola, un par de comentarios que capaz alguno te sirva:
1) procesar_csv_task.delay(instance)

Ojo con pasar directamente una instancia de modelo Django a las tasks de
Celery. Típicamente se pasa un pk y dentro de la tarea tendrías que obtener
la instancia correspondiente:
http://docs.celeryproject.org/en/latest/userguide/tasks.html#state


2) cantidad_registros = len(list(clientes)) # si realizo esto la tarea
termina

DictReader es un iterator. En esa línea lo estás recorriendo entero. Más
adelante, en un for estás queriendo re-recorrerlo pero el iterador está
exhausto: https://docs.python.org/3.4/glossary.html#term-iterator
Ya que estás convirtiendo el iterador clientes en una lista, podrías
guardarlo para re-utilizar esa lista.

3) Hay muchas cosas que podés hacer para resolver el tema de mostrar cada
100 registros. Por ejemplo, en el post-save solamente encola
procesar_csv_task si existe_archivo and instance.porcentaje_carga == 0 y
dentro de la task te encargás de actualizar ese campo cuando haga falta (al
inicio lo ponés en 1 y después, cada 100 registros, lo volvés a actualizar).


Suerte con eso


> ---------- Forwarded message ----------
> From: Marcelo Leiva Sandoval <chelitoleiva en gmail.com>
> To: Python Argentina <pyar en python.org.ar>
> Cc:
> Date: Sat, 23 Jul 2016 14:42:25 -0300
> Subject: [pyar] Actualizando estado de carga de datos con Django y Celery
> Buenas tardes, tengo una vista donde de sube al servidor un archivo csv
> (más de 50.000 registros) para realizar una carga masiva de datos.
> Actualmente este archivo se guarda en /media/ y en con el signal
> post_save() se ejecuta una tarea en Celery para realizar la carga de
> información.
> Esto funciona bastante bien, pero ahora necesito mostrar el estado de
> avance de la carga de datos en el mismo modelo donde se encuentra el
> archivo csv.
>
> # models.py
>
> class ArchivoCSV(models.Model):
>     nombre = models.CharField(verbose_name='Nombre del archivo', max_length=200, editable=False)
>     archivo = models.FileField(upload_to=PATH_CSV_PREDIOS, max_length=800)
>     porcentaje_carga = models.PositiveIntegerField(verbose_name='Porcentaje de carga', default=0,
>                                                    validators=[MaxValueValidator(100), MinValueValidator(0)],
>                                                    editable=False)
>     estado = models.CharField(verbose_name='Estado de carga', choices=ESTADOS_CARGA, default='S', max_length=1,
>                               editable=False)
>
>     creado_el = models.DateTimeField(auto_now_add=True)
>     modificado_el = models.DateTimeField(auto_now=True)
>     history = HistoricalRecords()
>
>     class Meta:
>         verbose_name_plural = "Cargar archivo CSV."
>         ordering = ['-creado_el']
>
>     def __str__(self):
>         return '{0}'.format(self.nombre)
>
>     def save(self, *args, **kwargs):
>         if not self.pk:
>             self.nombre = self.archivo
>         super(ArchivoCSV, self).save(*args, **kwargs)
>
>     def delete(self, using=None):
>         archivo = self.archivo.name
>         super(ArchivoCSV, self).delete()
>         existe_archivo = os.path.exists(archivo)
>         if existe_archivo:
>             try:
>                 os.remove(archivo)
>             except Exception as e:
>                 print(e)
>
>
> @receiver(post_save, sender=PredioCSV)
> def procesar_archivo_post_save(sender, **kwargs):
>     instance = kwargs['instance']
>     existe_archivo = os.path.exists(instance.archivo.name)
>     if existe_archivo:
>         from .tasks import procesar_csv_task
>         procesar_csv_task.delay(instance)
>
>
> En el task si agrego cantidad_registros = len(list(clientes)) la tarea
> termina. Y si actualizo la instancia la tarea se multiplica infinitamente.
>
> # tasks.py
>
> @app.task(ignore_result=True, name="cargando-csv")
> def procesar_csv_task(instance):
>     nuevos_clientes = []
>     listado_clientes = Cliente.objects.all()
>
>     encoding = determinar_encoding(open(instance.archivo.name, "rb").read())
>     if encoding:
>         clientes = DictReader(open(instance.archivo.name, "rb"), delimiter=';', encoding=encoding,
>                               skipinitialspace=True)
>
>         cantidad_registros = len(list(clientes))  # si realizo esto la tarea termina
>
>         try:
>             for index, row in enumerate(clientes, start=1):
>                 try:
>                     codigo = row['COD_CLIENTE']
>                     ...
>
>                     if listado_clientes.filter(codigo=codigo).first():
>                     # se editan datos del cliente
>                     else:
>                         nuevo_cliente = Cliente()
>                         ...
>
>                         nuevos_clientes.append(nuevo_cliente)
>                 except Exception as e:
>                     print(e)
>                 if index % 100 == 0:
>                     instance.porcentaje_carga = index * 100 / cantidad_registros  # con esto la tarea se multiplica
>                     instance.save()
>
>             Cliente.objects.bulk_create(nuevos_clientes)
>
>         except Exception as e:
>             print(e)
>     else:
>         print('No se puede definir el encodig del archivo csv.')
>
>
> La idea es cada 100 registros trabajados actualizar el modelo con el
> porcentaje de avance y luego de ejecutar el bulk_create cambiar el estado a
> finalizado. Quedo atento a sus preguntas y sugerencias.
>
> Saludos
>
> --
> Marcelo Leiva Sandoval
> Django & Symfony2 Developer
> Linux User #491264
>

-- 
Carlos Matías
------------ próxima parte ------------
Se ha borrado un adjunto en formato HTML...
URL: <http://listas.python.org.ar/pipermail/pyar/attachments/20160725/0e662ce0/attachment.html>


Más información sobre la lista de distribución pyar