123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- # -*- coding: utf-8 -*-
- '''
- Gestor de nodos para LaOtraRed La Paz - El Alto
- Copyright (C) 2017 Rodrigo Garcia
- This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- '''
- # Este archivo contiene utilidades en general
- import hashlib
- import uuid
- from database.database import init_db, db_session
- from database.database import engine
- from .models import Nodo, Ubicacion
- from gestor.configs import RUTA_SCRIPT_GENERADOR_FIRMWARES, RUTA_IMAGENES_FIRMWARE_GENERADAS
- SeleccionValidaDireccionesPublicas = (14,30,62,126,254)
- SeleccionValidaDireccionesPrivadas = (30,62,126)
- def listas_routers():
- ''' Busca en la ruta del script para generar firmware los archivos
- `modelos_2.4Ghz.txt' y `modelos_dualBand.txt'
- Retorna dos listas una lista con los modelos de routers en el archivo de 2.4Ghz
- y otra con los modelos dualBand
- '''
- archivo_2_4Ghz = RUTA_SCRIPT_GENERADOR_FIRMWARES + "modelos_2.4Ghz.txt"
- archivo_dualBand = RUTA_SCRIPT_GENERADOR_FIRMWARES + "modelos_dualBand.txt"
- modelos_2_4Ghz = []
- try:
- fil = open(archivo_2_4Ghz, 'r')
- for modelo in fil.readlines():
- modelos_2_4Ghz.append(modelo[:-1]) # omitiendo posible \n
- except:
- return ("Error interno, con archivo de modelos <b>2.4Ghz</b>")
- modelos_dualBand = []
- try:
- fil = open(archivo_dualBand, 'r')
- for modelo in fil.readlines():
- modelos_dualBand.append(modelo[:-1]) # omitiendo posible \n
- except:
- return ("Error interno, con archivo de modelos <b>DUAL BAND</b>")
- return (modelos_2_4Ghz, modelos_dualBand)
- # funciones de comprobacion de formulario para el chef
- def comprobar_modelo(modelo_router):
- ''' Comprueba que `modelo_router' este en la lista de routers soportados por
- el script, devolviendo True en caso de exito y False si no.
- '''
- listasRouters = listas_routers()
- if modelo_router in listasRouters[0] or \
- modelo_router in listasRouters[1]:
- return True
- return False
- def comprobar_numeroDireccionesPublicas(num):
- return int(num) in SeleccionValidaDireccionesPublicas
- def comprobar_numeroDireccionesPrivadas(num):
- return int(num) in SeleccionValidaDireccionesPrivadas
- def comprobar_nombresResponsable(nombres=None):
- if not nombres or len(nombres) < 4 or len(nombres) > 200:
- return False
- return True
- def comprobar_apellidosResponsable(apellidos=None):
- if not apellidos or len(apellidos) < 4 or len(apellidos) > 200:
- return False
- return True
- def comprobar_emailResponsable(email=None):
- if not email or len(email) < 5 or len(email) > 254 or \
- email.count('@') == 0 or email.count('@') > 2:
- return False
- # TODO: Analizar agregar comprobacion de existencia de email
- # Aunque tambien se puede comprobar generando la imagen
- # de firmware y controlando que esta se descargue 1 vez en
- # maximo 12 horas. Cuando el proceso de generacion de
- # imagenes termine se le envia un email al responsable con
- # una url privada donde el(la) puede descargarla.
-
- # El envio de email deberia ser cada 2 horas si no ingresa
- # a la url al cabo de 12, se libera este bloque de ips.
-
- return True
- def comprobar_telefonoResponsable(telefono=None):
- if telefono is not None and len(telefono) > 20:
- return False
- return True
- def comprobar_nombreNodo(nombre=None):
- if not nombre or len(nombre) < 3 or len(nombre) > 200:
- return False
- return True
- def comprobar_descripcionNodo(desc=None):
- if desc is not None and len(desc) > 2599:
- return False
- return True
- def comprobar_claveEdicion(clave=None):
- if not clave or len(clave) < 4:
- return False
- return True
- def comprobar_ubicacionEsPublica(is_public=None):
- if is_public == u"SI" or is_public == "NO":
- return True
- return False
- def comprobar_postAgregarNodo(request, errores_detalle=False):
- ''' Comprueba que el POST contenga campos validos para introducirlos
- en la base de datos y comenzar el proceso de construccion de firmware
-
- Si errores_detalle=False
- Retorna True si los campos son correctos y False en otro caso
- Si errores_detalle=True:
- Retorna un diccionario con los errores detectados
- Retorna una lista vacia en caso de que los campos seran correctos
- '''
- # TODO: agregar log
- print ("--- Agregando Nuevo Nodo --- ")
- for k,v in request.form.iteritems():
- if k != "clave":
- print (k,":",v)
- else:
- h = hashlib.sha256()
- h.update(v.encode('utf-8'))
- print (k,":",h.hexdigest())
- print ("--------------------------- ")
- errores = {}
- try:
- if not comprobar_modelo(request.form["modelos"]):
- errores['modelo'] = u"Modelo de enrutador inválido"
- if not comprobar_numeroDireccionesPublicas(request.form["ips_publicas"]):
- errores['ips_publicas'] = u"Número de direcciones IP públicas no autorizadas"
- if not comprobar_numeroDireccionesPrivadas(request.form["ips_privadas"]):
- errores['ips_privadas'] = u"Número de direcciones IP privadas no autorizadas"
- if not comprobar_nombresResponsable(request.form["nombres"]):
- errores['nombres'] = u"Nombre(s) no permitido"
- if not comprobar_apellidosResponsable(request.form["apellidos"]):
- errores['apellidos'] = u"Apellido(s) no permitido"
- if not comprobar_emailResponsable(request.form["email"]):
- errores['email'] = u"email no permitido"
- if not comprobar_telefonoResponsable(request.form["telf"]):
- errores['telf'] = u"Teléfono no reconocido"
- if not comprobar_nombreNodo(request.form["nombre_nodo"]):
- errores['nombre_nodo'] = u"Nombre del nodo no valido"
- if not comprobar_descripcionNodo(request.form["descripcion"]):
- errores['descripcion'] = u"Descripción no valida"
- if not comprobar_claveEdicion(request.form["clave"]):
- errores['clave'] = u"Clave introducida no valida"
- if not comprobar_ubicacionEsPublica(request.form['compartir_ubicacion']):
- errores['compartir_ubicacion'] = u"Permiso de revelar ubicacion no valido"
-
- except:
- errores['excepcion'] = u"Excepcion ocurrida"
- print ("Excepcion ocurrida")
- if not errores_detalle:
- if len(errores) > 0:
- return False
- return True
- else:
- return errores
- # base de datos
- def comprobar_nodoSimilar(request):
- ''' Hace una consulta en la tabla Nodo y verifica que no exista un
- nodo similar, si existe retorna el id del nodo, si no retorna False
- '''
- nombres = request.form['nombres'].lower()
- apellidos = request.form['apellidos'].lower()
- email = request.form['email'].lower()
- n = db_session.query(Nodo).\
- filter(Nodo.email == email).\
- filter(Nodo.nombres_responsable == nombres).\
- filter(Nodo.apellidos_responsable == apellidos).first()
- print ("buscando nodo similar"+str(n))
- if n is not None:
- return n.id
- return False
- def comprobar_responsableNodo(request):
- ''' TODO: analizar la utilidad de esta funcion
- Hace un query en la tabla Nodo que alguien no trate de suplantar
- a otra persona colocando nombres o apellidos distintos a los que ya
- se tengan registrados para un email dado.
- Retorna el id del nodo similar o None si no se encuentra
- '''
- nombres = request.form['nombres'].lower()
- apellidos = request.form['apellidos'].lower()
- email = request.form['email'].lower()
- n = db_session.query(Nodo).\
- filter(Nodo.email == email).first()
- # existe registro de ese email
- print ("buscando email:"+str(n))
- if n is not None:
- if n.nombres_responsable != nombres or \
- n.apellidos_responsable != apellidos:
- print ("Ya existe registro del email <"+n.email+\
- "> dado con datos:"+\
- n.nombres_responsable+" "+n.apellidos_responsable)
- print ("Se introdujo: "+nombres+" "+apellidos)
- return n.id
- return None
- def obtener_ubicacionResumida(id_ubicacion):
- ''' Retorna tupla con algunos datos de la ubicacion resumida
- la tupla retornada es:
- (id, localidad, zona, direccion, is_public)'''
-
- u = db_session.query(Ubicacion).\
- filter(Ubicacion.id == id_ubicacion).first()
- if u is not None:
- return (id_ubicacion, u.localidad, u.zona, u.direccion, u.is_public)
- return ()
- def obtener_ubicacionDetallada(id_ubicacion):
- ''' Retorna tupla con detalles de ubicacion en orden:
- (localidad, zona, direccion, url_mapa, iframe,
- elevacion_aproximada, latitud, longitud, is_public)'''
-
- u = db_session.query(Ubicacion).\
- filter(Ubicacion.id == id_ubicacion).first()
- if u is not None:
- return (id_ubicacion, u.localidad, u.zona, u.direccion,
- u.url_mapa, u.iframe_mapa, u.elevacion_aproximada,\
- u.latitud, u.longitud, u.is_public)
- return ()
- def crear_ubicacionDummy(is_public=True):
- ''' Crea una ubicacion vacia y retorna su id
- '''
- u = Ubicacion(localidad='No especificada', direccion="No especificada",\
- is_public=is_public)
- try:
- db_session.add(u)
- db_session.commit()
- except:
- return None
- print ("Creando ubicacion DUMMY:"+str(u.id))
- return u.id
- def num_nodosActivos():
- ''' Retorna el numero de nodos activos '''
- return db_session.query(Nodo).filter(Nodo.is_active==True).count()
- def num_nodosProyectados():
- ''' Retorna el numero de nodos proyectados '''
- return db_session.query(Nodo).filter(Nodo.is_active==False).count()
- # return db_session.query(Nodo).filter(Nodo.is_confirmed==True).\
- # filter(Nodo.is_active==False).count()
-
- # otras utiliadades
- def get_randomToken():
- token = uuid.uuid4().hex
- return token
- def hash_claveEdicion(clave):
- h = hashlib.sha256()
- h.update(clave.encode('utf-8'))
- return h.hexdigest()
-
|