Logs en streaming
Documentación y una guía rápida de cómo echar a andar tu primer consumer
Si tienes cualquier duda sobre el servicio de Streaming de logs en tiempo real, contacta con nosotros en [email protected]
Para activar el servicio de Streaming de logs en tiempo real, tan sólo debes acceder al panel en https://dashboard.transparentcdn.com/ y dentro del apartado Logs, en la pestaña Streaming, encontrarás todo lo necesario.
Una vez activado el servicio, podrás descargarte un archivo zip que contiene los certificados digitales necesarios para autenticar a tus consumers así como una serie de plantillas preconfiguradas con tus datos para consumir los logs mediante filebeat, logstash y python.
Además, puedes agregar fácilmente las direcciones IP en las cuales instalarás el/los consumer, para que automáticamente se ajusten las reglas del firewall necesarias en nuestros brokers.
Las plantillas, vienen preconfiguradas con todos los datos necesarios, pero a continuación, vamos a enumerar el contenido del zip y ciertos requisitos y parámetros importantes a considerar, uno muy importante es el Consumer Group.

Parámetros de conexión:

  • La dirección de nuestros brokers:
    • kafka1.edgetcdn.io
    • kafka2.edgetcdn.io
    • kafka3.edgetcdn.io
  • El puerto a utilizar, que será el 9093

Contenido del zip:

  • Certificado público de cliente c<ID>.crt.pem
  • Certificado privado c<ID>.key.pem
  • Keystore en formato PKCS12 c<ID>.keystore.p12
  • La contraseña utilizada para cifrar el keystore y el truststore: password.txt
  • El certificado público de nuestra CA transparentcdnCA.pem
  • Truststore con nuestra CA (necesario en algunos consumers): truststore.p12
  • Plantilla para Filebeat: filebeat.yml
  • Plantilla para Logstash: kafka-logstash.conf
  • Consumer sencillo en Python: consumer.py

Otros datos:

  • El Topic al que suscribirse que será c<ID>
  • El prefijo de Consumer Group al que unir tus consumers. Por ejemplo, si tu <ID> (identificador de cliente) es el 83, te suscribirás al topic c83, y podrás unir tus consumers a cualquier "Consumer Group" que comience por c83_ como por ejemplo c83_group1, c83_test, c83_pre ... Puedes consultar más información sobre los consumer groups aquí.

Nosotros necesitaremos:

  • La(s) dirección(es) IP(s) desde donde conectarán tus consumers. Que las puedes añadir desde el propio panel (al añadirlas, se necesita un margen de 5 minutos hasta que estén activas en nuestro firewall):

Consumiendo los logs

En la actualidad existen multitud de destinos para tus logs, te podría interesar el ingestarlos en un elasticsearch para hacer análitica de datos, o tal vez subirlos a un servicio de terceros como Datadog o Amazon S3, las opciones son casi infinitas y va a depender mucho de tus necesidades de negocio.
Es por esto que, siendo fieles a nuestra filosofia de hacer las cosas lo más simples posible, te vamos a proponer que uses dos herramientas ampliamente utilizadas en la comunidad como son Filebeat o/y Logstash para consumir tus logs de nuestro sistema de Logs en Streaming.

Filebeat vs Logstash

Es muy común, sobre todo para gente que no está familiarizada con este tipo de tecnologías el confundir cuando usar Logstash, cuando usar Filebeat o cuando usarlos juntos que también se puede. Aquí vamos a intentar explicarlo de una manera un poco somera pero lo suficientemente sencilla como para poder tomar una decisión al respecto.
Logstash es un programa escrito en java parte del stack ELK (ElasticSearch - Logstash - Kibana) desarrollado y mantenido por la compañía ElasticSearch.
Filebeat sin embargo está escrito en Go por la misma compañía y surgio como respuesta a una necesidad incipiente de la comunidad de tener una herramienta ligera para transportar logs ya que logstash consume bastante más que Filebeat al estar escrito en java.
Filebeat como digo es un software muy liviano que te permite transportar logs de un sitio a otro, lo mismo que Logstash (este segundo no tan liviano), sin embargo, Logstash es mucho más versatil y potente de Filebeat y te permite consumir logs (Inputs) de un número mayor de sitios y enviarlos también a un número mayor de salidas (Ouputs).
Aquí os dejo los enlaces a los Input y Output de Logstash y Filebeat
Por tanto usar Logstash o Filebeat para sacar los logs de nuestro sistema de Logs en Streaming va a depender de tus necesidades, principalmente de el destino final de los mismos, logs por segundo y si quieres hacer algún tipo de transformación con los mismos.
Resumiendo, nuestra recomendación es que uses Filebeat siempre que puedas ya que es más ligero y fácil de configurar. Si necesitas alguna salida que no está en Filebeat o hacer alguna transformación usa Logstash.
Recuerda que siempre tendrás una tercera opción que también es válida y contemplamos en esta documentación y es que escribas tu propio consumer usando tu lenguaje de programación favorito.

Consumir logs mediante Filebeat

Veamos ahora un despliegue sencillo de Filebeat en un servidor Debian como primera toma de contacto en que vamos a dejar el log en un fichero de texto.
La documentación oficial se puede encontrar en: https://www.elastic.co/es/beats/filebeat
Usaremos estos datos de ejemplo pero recuerda que el zip que descargaste tras activar el servicio ya contiene una plantilla con todos los datos necesarios llamada filebeat.yml
  • Certificados c83.crt.pem y c83.key.pem
  • Contraseña: password
  • Topic: c83
  • Consumer group: c83_filebeat
Primero descargamos e instalamos el paquete de Filebeat en nuestro servidor:
1
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.9.0-amd64.deb
2
sudo dpkg -i filebeat-7.9.0-amd64.deb
Copied!
Habilitamos el módulo de Kafka:
1
filebeat modules enable kafka
2
filebeat setup -e
Copied!
Editamos la configuración de Filebeat /etc/filebeat/filebeat.yml y pegamos los datos que tenemos en la plantilla filebeat.yml, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas o si modificas la ruta donde se volcarán los ficheros:
  • ssl.certificate: Ubicación de c<ID>.crt.pem
  • ssl.key: Ubicación de c<ID>.key.pem
  • ssl.certificate_authorities: Ubicación de transparentcdnCA.pem
  • path: Ruta final donde se depositarán los logs que consuma Filebeat.
1
filebeat.inputs:
2
- type: kafka
3
hosts:
4
- kafka1.edgetcdn.io:9093
5
- kafka2.edgetcdn.io:9093
6
- kafka2.edgetcdn.io:9093
7
topics: ["c83"]
8
group_id: "c83_filebeat"
9
initial_offset: "newest"
10
ssl.enabled: yes
11
ssl.certificate: "/etc/filebeat/secret/c83.crt.pem"
12
ssl.key: "/etc/filebeat/secret/c83.key.pem"
13
ssl.key_passphrase: "password"
14
ssl.certificate_authorities:
15
- /etc/filebeat/secret/transparentcdnCA.pem
16
17
output.file:
18
codec.format:
19
string: '%{[message]}'
20
path: "/tmp/filebeat"
21
filename: kafka-filebeat.out
22
rotate_every_kb: 50000
Copied!
En el servidor donde configuremos Filebeat, copiamos la clave pública y privada del certificado así como la CA de Transparent Edge Services a las rutas que hayamos definido finalmente en la configuración.
También deberás crear la carpeta definida en el path: en el caso de que no exista.
Una vez configurado todo, tan sólo tendrás que iniciar el servicio de Filebeat, normalmente mediante systemd con systemctl start filebeat, y si todo ha ido bien, verás como se empiezan a consumir los logs en la ruta que hayas definido en path:
1
[email protected]:/tmp/filebeat# ls -lrt
2
total 4
3
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.7
4
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.6
5
-rw------- 1 root root 49M ago 27 08:43 kafka-filebeat.out.5
6
-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.4
7
-rw------- 1 root root 49M ago 27 08:44 kafka-filebeat.out.3
8
-rw------- 1 root root 49M ago 27 08:45 kafka-filebeat.out.2
9
-rw------- 1 root root 49M ago 27 08:51 kafka-filebeat.out.1
10
-rw------- 1 root root 4,6M ago 27 08:52 kafka-filebeat.out
Copied!

Consumir logs mediante Logstash

Ahora veamos cómo consumir nuestros logs mediante Logstash.
Nota: Usaremos el Keystore y el Truststore en lugar del par de claves privada y pública, tendrás que copiarlos al servidor donde ejecutes Logstash. Además, el servicio de Systemd por defecto utiliza el usuario Logstash, por lo que éste deberá tener permisos de lectura sobre ellos. Para el ejemplo los dejaremos en /etc/logstash/certs.
Instalaremos Logstash desde la paquetería oficial, ejecuta los siguientes comandos para agregar el repositorio e instalar Logstash. O bien sigue la guía oficial en https://www.elastic.co/guide/en/logstash/current/installing-logstash.html
1
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
2
apt install apt-transport-https
3
4
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-7.x.list
5
apt update
6
apt install logstash
Copied!
Ahora configuraremos un pipeline, que consumirá los logs desde nuestros servidores de Kafka y los volcará en un fichero en formato JSON, categorizando cada campo del log.
Recuerda que Logstash ofrece múltiples entradas/salidas a distintos sistemas y te permite personalizar y mutar los logs, más info en: (Input plugins, Output plugins).
Para ello crea un fichero nuevo en /etc/logstash/conf.d/kafka-logstash.conf con el contenido que has recibido en la plantilla kafka-logstash.conf en el zip que descargaste desde el panel, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas o si modificas la ruta donde se volcarán los ficheros:
  • ssl_keystore_location: Ubicación de c<ID>.keystore.p12
  • ssl_truststore_location: Ubicación de truststore.p12
  • path => : Ruta al fichero donde se volcarán los logs.
1
input {
2
kafka {
3
bootstrap_servers => "kafka1.edgetcdn.io:9093,kafka2.edgetcdn.io:9093,kafka2.edgetcdn.io:9093"
4
topics => "c83"
5
group_id => "c83_logstash"
6
auto_offset_reset => "latest"
7
security_protocol => "SSL"
8
ssl_keystore_location => "/etc/logstash/certs/c83.keystore.p12"
9
ssl_keystore_password => "password"
10
ssl_truststore_location => "/etc/logstash/certs/truststore.p12"
11
ssl_truststore_password => "password"
12
}
13
}
14
15
filter {
16
grok {
17
match => {
18
"message" => [
19
"%{DATA:clientip} - %{DATA:user} \[(.*)\] \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:useragent}\" %{DATA:hitmiss} \"%{DATA:content-type}\" \"%{DATA:layer}\" %{NUMBER:requesttime} \"%{DATA:clientid}\" \"%{DATA:referer}\" %{DATA:forwardedproto} %{DATA:country}(.*)",
20
"%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto} %{DATA:country}(.*)",
21
"%{DATA:clientip} - %{DATA:user} \[(.*)\] %{DATA:vod_host} \"%{WORD:verb} %{DATA:request} %{DATA:httpversion}\" %{NUMBER:statuscode} %{DATA:bytes} \"%{DATA:referer}\" \"%{DATA:useragent}\" \"%{DATA:content-type}\" \"%{DATA:hitmiss}\" \"%{DATA:layer}\" \"%{DATA:clientid}\" %{NUMBER:requesttime} %{DATA:forwardedproto}(.*)"
22
]
23
}
24
}
25
date {
26
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
27
}
28
mutate { remove_field => [ "message" ] }
29
}
30
31
output {
32
file {
33
path => "/var/log/tcdn_streaming/logstash.out"
34
codec => json_lines
35
}
36
}
Copied!
Ya que el servicio se ejecutará con el usuario logstash, nos aseguramos de tener los permisos correctos, tanto para los certificados, como la configuración, como el directorio final donde escribirá logstash:
1
chown logstash:logstash -R /etc/logstash/
2
mkdir /var/log/tcdn_streaming
3
chown logstash:logstash /var/log/tcdn_streaming
Copied!
Ahora sólo resta iniciar Logstash, podemos hacerlo por línea de comandos para verificar que todo funciona con:
1
sudo -u logstash /usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
Copied!
Recuerda verificar que todos los archivos necesarios son accesibles por el usuario "logstash", incluyendo los certificados, y el fichero + carpeta de destino.
Al cabo de unos segundos, comenzarás a recibir los logs en el fichero o salida que hayas indicado en formato JSON, te dejamos un ejemplo:
1
[email protected]:/var/log/tcdn_streaming# tail -1 logstash.out | jq
2
{
3
"requesttime": "0.000193",
4
"hitmiss": "hit",
5
"clientid": "83",
6
"verb": "GET",
7
"content-type": "application/javascript",
8
"httpversion": "HTTP/1.1",
9
"bytes": "621",
10
"request": "http://www.ejemplo.com/build/js/Core/Core.3160595ce5a674e1205b409c3e53616c4b44a9b3.js",
11
"referer": "https://referer.ejemplo.com/",
12
"user": "-",
13
"forwardedproto": "https",
14
"clientip": "11.11.222.111",
15
"@version": "1",
16
"statuscode": "200",
17
"layer": "L1",
18
"useragent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_7 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.1.2 Mobile/15E148 Safari/604.1",
19
"@timestamp": "2020-09-14T12:44:19.475Z"
20
}
Copied!
Si todo ha ido bien, puedes cancelar el comando anterior y dejar el servicio ejecutándose con:
1
systemctl start logstash.service
Copied!

Consumir logs mediante un script de Python a medida

Se puede programar un consumer en muchos lenguajes distintos, vamos a ver un ejemplo con Python, utilizando el cliente o librería de "confluent-kafka-python" https://github.com/confluentinc/confluent-kafka-python
Otra opción también muy popular es https://github.com/dpkp/kafka-python
Vamos con el ejemplo (nuevamente, usaremos Debian como sistema operativo):
Instalamos los paquetes necesarios:
1
sudo apt install python3-pip
2
pip3 install confluent-kafka --user
Copied!
Creamos el fichero python con el contenido que recibiste en la plantilla consumer.py, abajo se muestran unos datos de ejemplo, la parte importante estaría en el apartado de configuración, necesitarás editar los siguientes parámetros si copias los certificados en ubicaciones distintas:
  • ssl.ca.location: Ubicación de transparentcdnCA.pem
  • ssl.keystore.location: Ubicación de c<ID>.keystore.p12
1
#!/usr/bin/python3
2
import sys
3
import logging
4
import socket
5
from confluent_kafka import Consumer
6
7
LOG_FMT = '[%(asctime)s][%(levelname)s] %(message)s'
8
logging.basicConfig(
9
stream=sys.stdout,
10
format=LOG_FMT,
11
datefmt='%Y.%m.%d %H:%M:%S',
12
level=logging.INFO)
13
14
def assigned(consum, parti):
15
logging.info("Assigned consumer: %s on partition %s", repr(consum), repr(parti))
16
17
def revoked(consum, parti):
18
logging.error("Failed to assign consumer: %s on partition %s", repr(consum), repr(parti))
19
20
if __name__ == "__main__":
21
# CONFIGURACION
22
SERVERS = 'kafka1.edgetcdn.io:9093,kafka2.edgetcdn.io:9093,kafka3.edgetcdn.io:9093'
23
TOPIC = 'c83'
24
CONF = {'bootstrap.servers': SERVERS,
25
'client.id': socket.gethostname(),
26
'security.protocol': 'SSL',
27
'ssl.ca.location': '/usr/local/share/ca-certificates/transparentcdnCA.pem',
28
'ssl.keystore.location': '/root/secret/c83.keystore.p12',
29
'ssl.keystore.password': 'password',
30
'group.id': TOPIC + '_python'}
31
# FIN CONFIGURACION
32
33
consumer = Consumer(CONF)
34
consumer.subscribe([TOPIC], on_assign=assigned, on_revoke=revoked)
35
count = 0
36
try:
37
while True:
38
msg = consumer.poll(1.0)
39
if msg is None:
40
#logging.info("Waiting for message or event/error in poll()")
41
continue
42
43
if msg.error():
44
logging.error(msg.error())
45
else:
46
value = msg.value()
47
offset = msg.offset()
48
topic = msg.topic()
49
logging.info("Mensaje recibido numero {} - Value: \"{}\" - Topic: {} - Offset: {}".format(
50
str(count),
51
value.decode('utf-8'),
52
str(topic),
53
str(offset)))
54
55
count += 1
56
57
except KeyboardInterrupt:
58
pass
59
finally:
60
consumer.close()
61
Copied!
Si todo es correcto, al iniciar el consumer recibirás el siguiente mensaje y empezarás a consumir del topic:
1
[email protected]:~# ./consumer.py
2
[2020.08.21 09:57:21][INFO] Assigned consumer: <cimpl.Consumer object at 0x7fada8213f28> on partition [TopicPartition{topic=c83,partition=0,offset=-1001,error=None}, TopicPartition{topic=c83,partition=1,offset=-1001,error=None}]
Copied!

¿Qué son los Consumer Groups?

Tradicionalmente los Brókers de mensajería (Message Brokers), actuaban de dos formas:
  • Queue: El mensaje se publica una vez, se consume una vez.
  • Pub/Sub: El mensaje se publica una vez, se consume múltiples veces.
Kafka puede funcionar de las dos formas gracias a los Consumer Groups:
  • Si queremos actuar como una cola (Queue), ponemos todos los consumers en el mismo consumer group.
  • Si queremos actuar como un Pub/Sub, cada consumer va en un grupo distinto.

Ejemplos

Actualmente, creamos los topics con 2 particiones por defecto (se pueden aumentar si se solicita), vamos a ver unos ejemplos con un topic de 2 particiones:
  • Iniciamos 3 consumers, los 3 en el mismo consumer group: Uno de ellos, consume de la partición 0, otro de la 1, y el último queda totalmente parado. Conseguimos procesamiento en paralelo y alta disponibilidad. (La alta disponibilidad también la logramos con 2 consumers, si uno falla, el otro consumirá de las dos particiones). Los mensajes estarán repartidos entre el consumer 1 y el consumer 2.
  • Iniciamos 2 consumers, cada uno en distinto consumer group: Los 2 van a recibir TODOS los mensajes del topic y estarán totalmente aislados. Es útil si queremos realizar un procesamiento distinto de los mensajes recibidos para cada uno de ellos. A este esquema podemos añadir más consumers, el resultado será el mismo, cada uno de ellos recibirá todos los mensajes de todas las particiones.
Dado que trabajamos con logs y a no ser que se requieran varios post-procesos distintos, lo más interesante es tener los consumers en el mismo consumer group, y es más que probable que solamente un consumer sea suficiente dado el rendimiento que ofrece Kafka. Se pueden iniciar mas consumers si uno de ellos no puede consumir en tiempo real, o si queremos procesamiento en paralelo + alta disponibilidad.

Consumir los logs del WAF

Si tienes activo el servicio WAF, también puedes consumir los logs del audit en tiempo real. Estos logs, a diferencia del servicio de delivery se encuentran en formato JSON.
Se puede utilizar, por ejemplo, el consumer en Python comentado anteriormente, la única diferencia será el topic al que nos suscribiremos, que en este caso tendrá este formato: c<ID>_waf.
Por ejemplo, si tu compañia tiene el <ID> (identificador de cliente) 83, deberás suscribirte al topic c83_waf.

Formato de los logs del Audit

El formato que contiene el objeto JSON viene definido por Modsecurity, que es uno de los componenetes que utilizamos en el servicio de WAF.
Dicho JSON contiene todos los datos relevantes de la request: el código http, las cabeceras de respuesta, las cabeceras de la petición, la URL, el método, la IP del cliente ... básicamente toda la información. Y además, contiene un campo con todos los detalles relacionados con el ataque detectado.
Se hace referencia a los campos separándolos por un punto, ya que es la notación que se utiliza en jq, un procesador JSON. Por ejemplo, si hacemos referencia al campo .transaction.messages[0].message en jq, lo mismo en Python sería:["transaction"]["messages"][0]
Una sóla request, puede hacer que salten una o varias reglas del WAF, es por ello que el campo .transaction.messages, es un array. Vamos a verlo con un ejemplo
Aquí accedemos al campo "messages" utilizando jq, pero perfectamente lo podríamos hacer en Python u otros lenguajes. Revisamos el campo "message" del primer ataque detectado para cada requests (messages[0]).
1
$ tail -3 audit.log | jq '.transaction.messages[0].message'
2
"Host header is a numeric IP address"
3
"Possible Remote File Inclusion (RFI) Attack: URL Payload Used w/Trailing Question Mark Character (?)"
4
"XSS Attack Detected via libinjection"
Copied!
Por supuesto, no todas las requests coinciden con más de una regla, si intentamos acceder al segundo componente de "messages" (messages[1]), vemos que en muchos aparece "null", es la forma que tiene jq de decirnos que no encontró nada en ese índice del array.
1
$ tail -3 audit.log | jq '.transaction.messages[1].message'
2
null
3
"NoScript XSS InjectionChecker: HTML Injection"
4
null
Copied!
El RuleID, que es bastante importante a la hora de añadir excepciones, también se encuentra dentro de "messages", en general, contiene éstos campos:
1
{
2
"message": "Mensaje descriptivo del ataque.",
3
"details": {
4
"match": "Mensaje técnico de porqué este ataque ha hecho saltar la regla.",
5
"ruleId": "931120",
6
"file": "ruta al fichero que contiene la regla 931120",
7
"lineNumber": "78",
8
"data": "Mensaje técnico de los datos concretos en la requests que hicieron saltar la regla.",
9
"severity": "2",
10
"ver": "OWASP_CRS/3.3.2",
11
"rev": "",
12
"tags": [
13
"application-multi",
14
"language-multi",
15
"platform-multi",
16
"attack-rfi",
17
"paranoia-level/1",
18
"OWASP_CRS",
19
"capec/1000/152/175/253"
20
],
21
"maturity": "0",
22
"accuracy": "0"
23
}
24
}
Copied!
Última actualización 1mo ago