Si tienes cualquier duda sobre el servicio de Streaming de logs en tiempo real, contacta con nosotros en [email protected].
Para activar el servicio de Streaming de logs en tiempo real, tan sólo debes acceder al panel en https://dashboard.transparentcdn.com/ y dentro del apartado Logs, en la pestaña Streaming, encontrarás todo lo necesario.
Una vez activado el servicio, podrás descargarte un archivo zip que contiene los certificados digitales necesarios para autenticar a tus consumers así como una serie de plantillas preconfiguradas con tus datos para consumir los logs mediante filebeat, logstash y python.
Además, puedes agregar fácilmente las direcciones IP en las cuales instalarás el/los consumer, para que automáticamente se ajusten las reglas del firewall necesarias en nuestros brokers.
Las plantillas, vienen preconfiguradas con todos los datos necesarios, pero a continuación, vamos a enumerar el contenido del zip y ciertos requisitos y parámetros importantes a considerar, uno muy importante es el Consumer Group.
La dirección de nuestros brokers:
kafka1.edgetcdn.io
kafka2.edgetcdn.io
kafka3.edgetcdn.io
El puerto a utilizar, que será el 9093
Certificado público de cliente c<ID>.crt.pem
Certificado privado c<ID>.key.pem
Keystore en formato PKCS12 c<ID>.keystore.p12
La contraseña utilizada para cifrar el keystore y el truststore: password.txt
El certificado público de nuestra CA transparentcdnCA.pem
Truststore con nuestra CA (necesario en algunos consumers): truststore.p12
Plantilla para Filebeat: filebeat.yml
Plantilla para Logstash: kafka-logstash.conf
Consumer sencillo en Python: consumer.py
El Topic al que suscribirse que será c<ID>
El prefijo de Consumer Group al que unir tus consumers. Por ejemplo, si tu <ID>
(identificador de cliente) es el 83
, te suscribirás al topic c83
, y podrás unir tus consumers a cualquier "Consumer Group" que comience por c83_
como por ejemplo c83_group1
, c83_test
, c83_pre
... Puedes consultar más información sobre los consumer groups aquí.
La(s) dirección(es) IP(s) desde donde conectarán tus consumers. Que las puedes añadir desde el propio panel (al añadirlas, se necesita un margen de 5 minutos hasta que estén activas en nuestro firewall):
En la actualidad existen multitud de destinos para tus logs, te podría interesar el ingestarlos en un elasticsearch para hacer análitica de datos, o tal vez subirlos a un servicio de terceros como Datadog o Amazon S3, las opciones son casi infinitas y va a depender mucho de tus necesidades de negocio.
Es por esto que, siendo fieles a nuestra filosofia de hacer las cosas lo más simples posible, te vamos a proponer que uses dos herramientas ampliamente utilizadas en la comunidad como son Filebeat o/y Logstash para consumir tus logs de nuestro sistema de Logs en Streaming.
Es muy común, sobre todo para gente que no está familiarizada con este tipo de tecnologías el confundir cuando usar Logstash, cuando usar Filebeat o cuando usarlos juntos que también se puede. Aquí vamos a intentar explicarlo de una manera un poco somera pero lo suficientemente sencilla como para poder tomar una decisión al respecto.
Logstash es un programa escrito en java parte del stack ELK (ElasticSearch - Logstash - Kibana) desarrollado y mantenido por la compañía ElasticSearch.
Filebeat sin embargo está escrito en Go por la misma compañía y surgio como respuesta a una necesidad incipiente de la comunidad de tener una herramienta ligera para transportar logs ya que logstash consume bastante más que Filebeat al estar escrito en java.
Filebeat como digo es un software muy liviano que te permite transportar logs de un sitio a otro, lo mismo que Logstash (este segundo no tan liviano), sin embargo, Logstash es mucho más versatil y potente de Filebeat y te permite consumir logs (Inputs) de un número mayor de sitios y enviarlos también a un número mayor de salidas (Ouputs).
Aquí os dejo los enlaces a los Input y Output de Logstash y Filebeat
Por tanto usar Logstash o Filebeat para sacar los logs de nuestro sistema de Logs en Streaming va a depender de tus necesidades, principalmente de el destino final de los mismos, logs por segundo y si quieres hacer algún tipo de transformación con los mismos.
Resumiendo, nuestra recomendación es que uses Filebeat siempre que puedas ya que es más ligero y fácil de configurar. Si necesitas alguna salida que no está en Filebeat o hacer alguna transformación usa Logstash.
Recuerda que siempre tendrás una tercera opción que también es válida y contemplamos en esta documentación y es que escribas tu propio consumer usando tu lenguaje de programación favorito.
Veamos ahora un despliegue sencillo de Filebeat en un servidor Debian como primera toma de contacto en que vamos a dejar el log en un fichero de texto.
La documentación oficial se puede encontrar en: https://www.elastic.co/es/beats/filebeat
Usaremos estos datos de ejemplo pero recuerda que el zip que descargaste tras activar el servicio ya contiene una plantilla con todos los datos necesarios llamada filebeat.yml
Certificados c83.crt.pem
y c83.key.pem
Contraseña: password
Topic: c83
Consumer group: c83_filebeat
Primero descargamos e instalamos el paquete de Filebeat en nuestro servidor:
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.0-amd64.debsudo dpkg -i filebeat-7.9.0-amd64.deb
Habilitamos el módulo de Kafka:
filebeat modules enable kafkafilebeat setup -e
Editamos la configuración de Filebeat /etc/filebeat/filebeat.yml
y pegamos los datos que tenemos en la plantilla filebeat.yml
, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas o si modificas la ruta donde se volcarán los ficheros:
ssl.certificate:
Ubicación de c<ID>.crt.pem
ssl.key:
Ubicación de c<ID>.key.pem
ssl.certificate_authorities:
Ubicación de transparentcdnCA.pem
path:
Ruta final donde se depositarán los logs que consuma Filebeat.
filebeat.inputs:- type: kafkahosts:- kafka1.edgetcdn.io:9093- kafka2.edgetcdn.io:9093- kafka2.edgetcdn.io:9093topics: ["c83"]group_id: "c83_filebeat"initial_offset: "newest"ssl.enabled: yesssl.certificate: "/etc/filebeat/secret/c83.crt.pem"ssl.key: "/etc/filebeat/secret/c83.key.pem"ssl.key_passphrase: "password"ssl.certificate_authorities:- /etc/filebeat/secret/transparentcdnCA.pemoutput.file:codec.format:string: '%{[message]}'path: "/tmp/filebeat"filename: kafka-filebeat.outrotate_every_kb: 50000
En el servidor donde configuremos Filebeat, copiamos la clave pública y privada del certificado así como la CA de Transparent CDN a las rutas que hayamos definido finalmente en la configuración.
También deberás crear la carpeta definida en el path:
en el caso de que no exista.
Una vez configurado todo, tan sólo tendrás que iniciar el servicio de Filebeat, normalmente mediante systemd con systemctl start filebeat
, y si todo ha ido bien, verás como se empiezan a consumir los logs en la ruta que hayas definido en path:
[email protected]:/tmp/filebeat# ls -lrttotal 4-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.7-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.6-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.5-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.4-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.3-rw------- 1 root root 49M ago 27 08:45 kafka-filebeat.out.2-rw------- 1 root root 49M ago 27 08:51 kafka-filebeat.out.1-rw------- 1 root root 4,6M ago 27 08:52 kafka-filebeat.out
Ahora veamos cómo consumir nuestros logs mediante Logstash.
Nota: Usaremos el Keystore y el Truststore en lugar del par de claves privada y pública, tendrás que copiarlos al servidor donde ejecutes Logstash. Además, el servicio de Systemd por defecto utiliza el usuario Logstash, por lo que éste deberá tener permisos de lectura sobre ellos. Para el ejemplo los dejaremos en /etc/logstash/certs
.
Instalaremos Logstash desde la paquetería oficial, ejecuta los siguientes comandos para agregar el repositorio e instalar Logstash. O bien sigue la guía oficial en https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -apt install apt-transport-httpsecho "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.listapt updateapt install logstash
Ahora configuraremos un pipeline, que consumirá los logs desde nuestros servidores de Kafka y los volcará en un fichero en formato JSON, categorizando cada campo del log.
Recuerda que Logstash ofrece múltiples entradas/salidas a distintos sistemas y te permite personalizar y mutar los logs, más info en: (Input plugins, Output plugins).
Para ello crea un fichero nuevo en /etc/logstash/conf.d/kafka-logstash.conf
con el contenido que has recibido en la plantilla kafka-logstash.conf
en el zip que descargaste desde el panel, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas o si modificas la ruta donde se volcarán los ficheros:
ssl_keystore_location:
Ubicación de c<ID>.keystore.p12
ssl_truststore_location:
Ubicación de truststore.p12
path => :
Ruta al fichero donde se volcarán los logs.
input {kafka {bootstrap_servers => "kafka1.edgetcdn.io:9093,kafka2.edgetcdn.io:9093,kafka2.edgetcdn.io:9093"topics => "c83"group_id => "c83_logstash"auto_offset_reset => "latest"security_protocol => "SSL"ssl_keystore_location => "/etc/logstash/certs/c83.keystore.p12"ssl_keystore_password => "password"ssl_truststore_location => "/etc/logstash/certs/truststore.p12"ssl_truststore_password => "password"}}filter {grok {match => {"message" => ["%{DATA:clientip} - %{DATA:user} \[(.*)\] \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:useragent}\" %{DATA:hitmiss} \"%{DATA:content-type}\" \"%{DATA:layer}\" %{NUMBER:requesttime} \"%{DATA:clientid}\" \"%{DATA:referer}\" %{DATA:forwardedproto} %{DATA:country}(.*)","%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto} %{DATA:country}(.*)","%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto}(.*)"]}}date {match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]}mutate { remove_field => [ "message" ] }}output {file {path => "/var/log/tcdn_streaming/logstash.out"codec => json_lines}}
Ya que el servicio se ejecutará con el usuario logstash, nos aseguramos de tener los permisos correctos, tanto para los certificados, como la configuración, como el directorio final donde escribirá logstash:
chown logstash:logstash -R /etc/logstash/mkdir /var/log/tcdn_streamingchown logstash:logstash /var/log/tcdn_streaming
Ahora sólo resta iniciar Logstash, podemos hacerlo por línea de comandos para verificar que todo funciona con:
sudo -u logstash /usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
Recuerda verificar que todos los archivos necesarios son accesibles por el usuario "logstash", incluyendo los certificados, y el fichero + carpeta de destino.
Al cabo de unos segundos, comenzarás a recibir los logs en el fichero o salida que hayas indicado en formato JSON, te dejamos un ejemplo:
[email protected]:/var/log/tcdn_streaming# tail -1 logstash.out | jq{"requesttime": "0.000193","hitmiss": "hit","clientid": "83","verb": "GET","content-type": "application/javascript","httpversion": "HTTP/1.1","bytes": "621","request": "http://www.ejemplo.com/build/js/Core/Core.3160595ce5a674e1205b409c3e53616c4b44a9b3.js","referer": "https://referer.ejemplo.com/","user": "-","forwardedproto": "https","clientip": "11.11.222.111","@version": "1","statuscode": "200","layer": "L1","useragent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Mobile/15E148 Safari/604.1","@timestamp": "2020-09-14T12:44:19.475Z"}
Si todo ha ido bien, puedes cancelar el comando anterior y dejar el servicio ejecutándose con:
systemctl start logstash.service
Se puede programar un consumer en muchos lenguajes distintos, vamos a ver un ejemplo con Python, utilizando el cliente o librería de "confluent-kafka-python" https://github.com/confluentinc/confluent-kafka-python
Otra opción también muy popular es https://github.com/dpkp/kafka-python
Vamos con el ejemplo (nuevamente, usaremos Debian como sistema operativo):
Instalamos los paquetes necesarios:
sudo apt install python3-pippip3 install confluent-kafka --user
Creamos el fichero python con el contenido que recibiste en la plantilla consumer.py
, abajo se muestran unos datos de ejemplo, la parte importante estaría en el apartado de configuración, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas:
ssl.ca.location:
Ubicación de transparentcdnCA.pem
ssl.keystore.location:
Ubicación de c<ID>.keystore.p12
#!/usr/bin/python3import sysimport loggingimport socketfrom confluent_kafka import ConsumerLOG_FMT = '[%(asctime)s][%(levelname)s] %(message)s'logging.basicConfig(stream=sys.stdout,format=LOG_FMT,datefmt='%Y.%m.%d %H:%M:%S',level=logging.INFO)def assigned(consum, parti):logging.info("Assigned consumer: %s on partition %s", repr(consum), repr(parti))def revoked(consum, parti):logging.error("Failed to assign consumer: %s on partition %s", repr(consum), repr(parti))if __name__ == "__main__":# CONFIGURACIONSERVERS = 'kafka1.edgetcdn.io:9093,kafka2.edgetcdn.io:9093,kafka3.edgetcdn.io:9093'TOPIC = 'c83'CONF = {'bootstrap.servers': SERVERS,'client.id': socket.gethostname(),'security.protocol': 'SSL','ssl.ca.location': '/usr/local/share/ca-certificates/transparentcdnCA.pem','ssl.keystore.location': '/root/secret/c83.keystore.p12','ssl.keystore.password': 'password','group.id': TOPIC + '_python'}# FIN CONFIGURACIONconsumer = Consumer(CONF)consumer.subscribe([TOPIC], on_assign=assigned, on_revoke=revoked)count = 0try:while True:msg = consumer.poll(1.0)if msg is None:#logging.info("Waiting for message or event/error in poll()")continueif msg.error():logging.error(msg.error())else:value = msg.value()offset = msg.offset()topic = msg.topic()logging.info("Mensaje recibido numero {} - Value: \"{}\" - Topic: {} - Offset: {}".format(str(count),value.decode('utf-8'),str(topic),str(offset)))count += 1except KeyboardInterrupt:passfinally:consumer.close()
Si todo es correcto, al iniciar el consumer recibirás el siguiente mensaje y empezarás a consumir del topic:
[email protected]:~# ./consumer.py[2020.08.21 09:57:21][INFO] Assigned consumer: <cimpl.Consumer object at 0x7fada8213f28> on partition [TopicPartition{topic=c83,partition=0,offset=-1001,error=None}, TopicPartition{topic=c83,partition=1,offset=-1001,error=None}]
Tradicionalmente los Brókers de mensajería (Message Brokers), actuaban de dos formas:
Queue: El mensaje se publica una vez, se consume una vez.
Pub/Sub: El mensaje se publica una vez, se consume múltiples veces.
Kafka puede funcionar de las dos formas gracias a los Consumer Groups:
Si queremos actuar como una cola (Queue), ponemos todos los consumers en el mismo consumer group.
Si queremos actuar como un Pub/Sub, cada consumer va en un grupo distinto.
Actualmente, creamos los topics con 2 particiones por defecto (se pueden aumentar si se solicita), vamos a ver unos ejemplos con un topic de 2 particiones:
Iniciamos 3 consumers, los 3 en el mismo consumer group: Uno de ellos, consume de la partición 0, otro de la 1, y el último queda totalmente parado. Conseguimos procesamiento en paralelo y alta disponibilidad. (La alta disponibilidad también la logramos con 2 consumers, si uno falla, el otro consumirá de las dos particiones). Los mensajes estarán repartidos entre el consumer 1 y el consumer 2.
Iniciamos 2 consumers, cada uno en distinto consumer group: Los 2 van a recibir TODOS los mensajes del topic y estarán totalmente aislados. Es útil si queremos realizar un procesamiento distinto de los mensajes recibidos para cada uno de ellos. A este esquema podemos añadir más consumers, el resultado será el mismo, cada uno de ellos recibirá todos los mensajes de todas las particiones.
Dado que trabajamos con logs y a no ser que se requieran varios post-procesos distintos, lo más interesante es tener los consumers en el mismo consumer group, y es más que probable que solamente un consumer sea suficiente dado el rendimiento que ofrece Kafka. Se pueden iniciar mas consumers si uno de ellos no puede consumir en tiempo real, o si queremos procesamiento en paralelo + alta disponibilidad.