Security logging and monitoring

Logs are the black box of your infrastructure. When something goes wrong — a breach, an outage, unexplained behavior — logs are how you understand what happened. Without proper logging, you're investigating incidents blind.

Security monitoring takes logging further: instead of waiting for someone to notice a problem and check logs, you're actively watching for suspicious patterns and alerting in real-time. A brute-force attack generating thousands of failed logins. An admin account suddenly accessing data it never touched before. A server making connections to known malicious IPs.

This chapter covers how to build a logging and monitoring infrastructure that catches security incidents as they happen, not weeks later when the damage is done.

Why this matters for small companies

Small companies often skip proper logging. "We'll check the logs if something breaks." That approach fails for security:

You can't investigate what you didn't log. When a breach is discovered, the first question is "what happened?" Without logs, you can't answer that. You don't know what data was accessed, how attackers got in, or if they're still there.

Attackers delete their traces. If logs only exist on the compromised server, attackers delete them. Centralized logging to a separate system preserves evidence even when servers are compromised.

Detection requires visibility. Most breaches go undetected for months. IBM's 2024 Cost of a Data Breach Report found the average time to identify a breach is 194 days. Companies with security monitoring detect breaches faster and lose less money.

Compliance often requires it. SOC 2, ISO 27001, HIPAA, PCI-DSS all require security logging and monitoring. Even if you don't need compliance now, you might later.

Free tools are mature. ELK Stack, Grafana Loki, Graylog — these aren't toys. They handle enterprise-scale logging. You don't need a $100K SIEM to have proper logging.

What to log

Not all logs are equal for security. Here's what matters:

Authentication and access

| Event | Why it matters | Priority |
|---|---|---|
| Login success/failure | Brute force detection, unauthorized access | Critical |
| Password changes | Account takeover indicator | Critical |
| MFA events | Bypass attempts, enrollment changes | Critical |
| Session creation/destruction | Session hijacking, anomalous access patterns | High |
| Permission changes | Privilege escalation, insider threats | Critical |
| API key usage | Compromised credentials | High |
| SSH/RDP connections | Lateral movement, initial access | Critical |

Application security

| Event | Why it matters | Priority |
|---|---|---|
| Input validation failures | Attack attempts (SQLi, XSS) | High |
| Authorization denials | Broken access control attempts | High |
| File operations | Data exfiltration, ransomware | Medium |
| Payment/transaction events | Fraud detection | Critical |
| Error rates | May indicate attacks | Medium |
| API rate limit hits | Abuse, scraping, DDoS | Medium |

Infrastructure

| Event | Why it matters | Priority |
|---|---|---|
| Firewall allow/deny | Network reconnaissance, attacks | High |
| DNS queries | C2 communication, data exfiltration | Medium |
| Process execution | Malware, cryptomining | High |
| Service start/stop | Tampering, persistence | Medium |
| File integrity changes | Rootkits, backdoors | High |
| Cloud API calls | Resource abuse, misconfiguration | High |

What NOT to log

Some data should never appear in logs:

  • Passwords (even failed ones — log "authentication failed" not "password 'abc123' was wrong")
  • Full credit card numbers (log last 4 digits only)
  • Social security numbers, government IDs
  • Health information (PHI)
  • Session tokens, API keys (log that they were used, not their values)
  • Personal data beyond what's necessary

Logging sensitive data creates a new breach risk: your log storage becomes a valuable target.
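A cheap way to enforce this list is a redaction pass just before events are written. The sketch below is illustrative: the `SENSITIVE_KEYS` set, the `redact_event` helper, and the card-number regex are assumptions to adapt to your own field names, not part of any logging library.

```python
import re

# Field names that must never reach log storage (illustrative list)
SENSITIVE_KEYS = {"password", "ssn", "session_token", "api_key", "card_number"}
CARD_RE = re.compile(r"\b(\d{12})(\d{4})\b")  # 16-digit card numbers

def redact_event(event: dict) -> dict:
    """Return a copy of a log event with sensitive fields masked."""
    clean = {}
    for key, value in event.items():
        if key.lower() in SENSITIVE_KEYS:
            clean[key] = "[REDACTED]"
        elif isinstance(value, str):
            # Keep only the last 4 digits of anything shaped like a card number
            clean[key] = CARD_RE.sub(lambda m: "*" * 12 + m.group(2), value)
        else:
            clean[key] = value
    return clean

event = {"event": "payment", "user_id": 42, "card_number": "4242424242424242",
         "note": "card 4111111111111111 declined"}
print(redact_event(event))
# card_number becomes "[REDACTED]"; the digits in note keep only the last 4
```

Run redaction as close to the log call as possible, so raw values never hit disk or the network.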

Log formats and standards

Consistent log formats make parsing and alerting easier.

Structured logging

Use structured formats (JSON) instead of plain text:

# BAD: Unstructured log
logger.info(f"User {user_id} logged in from {ip_address}")

# Output: 2024-01-15 10:23:45 INFO User 12345 logged in from 192.168.1.1
# Hard to parse, inconsistent format

# GOOD: Structured log
import structlog

logger = structlog.get_logger()
logger.info(
    "user_login",
    user_id=user_id,
    ip_address=ip_address,
    user_agent=request.headers.get("User-Agent"),
    success=True,
)

# Output (JSON):
# {"event": "user_login", "user_id": 12345, "ip_address": "192.168.1.1",
#  "user_agent": "Mozilla/5.0...", "success": true, "timestamp": "2024-01-15T10:23:45Z"}

Structured logs allow:

  • Consistent parsing across all applications
  • Easy filtering (e.g., "show all failed logins from this IP")
  • Automatic field extraction in SIEM tools
  • Better compression and storage efficiency
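To make the filtering point concrete, the "failed logins from this IP" query becomes a few lines of Python over JSON log lines (the sample lines here are invented):

```python
import json

log_lines = [
    '{"event": "user_login", "success": false, "ip_address": "203.0.113.7"}',
    '{"event": "user_login", "success": true,  "ip_address": "198.51.100.2"}',
    '{"event": "user_login", "success": false, "ip_address": "203.0.113.7"}',
]

# Parse each JSON line and keep failed logins from one suspicious IP
events = [json.loads(line) for line in log_lines]
suspicious = [
    e for e in events
    if e["event"] == "user_login" and not e["success"]
    and e["ip_address"] == "203.0.113.7"
]
print(len(suspicious))  # 2
```

The same filter against unstructured text would need a fragile regex per log format.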

Common log fields

Every log entry should include:

{
  "timestamp": "2024-01-15T10:23:45.123Z",
  "level": "info",
  "event": "user_login",
  "service": "auth-api",
  "environment": "production",
  "host": "auth-api-1.internal",
  "trace_id": "abc123def456",
  "user_id": "12345",
  "ip_address": "192.168.1.1",
  "user_agent": "Mozilla/5.0...",
  "success": true,
  "duration_ms": 45
}
| Field | Purpose |
|---|---|
| timestamp | When it happened (ISO 8601, UTC) |
| level | Severity (debug, info, warn, error, critical) |
| event | What happened (use consistent event names) |
| service | Which application/service |
| environment | prod, staging, dev |
| host | Which server/container |
| trace_id | Correlate related events |
| user_id | Who triggered the event |
| ip_address | Where request came from |
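A small helper can stamp these common fields onto every event so call sites only supply the event-specific ones. This is a sketch; the function name and the choice to generate a `trace_id` when the caller has none are assumptions:

```python
import datetime
import socket
import uuid

def add_common_fields(event: dict, service: str, environment: str) -> dict:
    """Merge the standard envelope fields into an application event.

    service/environment are passed in; host and timestamp are derived here.
    A trace_id is generated when the event carries none (illustrative choice).
    """
    base = {
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "service": service,
        "environment": environment,
        "host": socket.gethostname(),
        "trace_id": event.get("trace_id") or uuid.uuid4().hex[:12],
    }
    base.update(event)  # event-specific fields win on conflict
    return base

entry = add_common_fields(
    {"event": "user_login", "level": "info", "user_id": "12345"},
    service="auth-api", environment="production",
)
```

Most structured-logging libraries (structlog processors, logging filters) offer a hook where this enrichment belongs, so it runs once rather than at every call site.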

Log levels

Use log levels consistently:

| Level | When to use | Example |
|---|---|---|
| DEBUG | Detailed diagnostic info, dev only | Variable values, function calls |
| INFO | Normal operations | User logged in, request completed |
| WARN | Something unexpected but handled | Rate limit approaching, retry succeeded |
| ERROR | Something failed | Database connection failed, API error |
| CRITICAL | System-wide failure | Service down, data corruption |

Security events should typically be INFO (normal security events) or WARN/ERROR (suspicious/failed attempts).

Centralized logging architecture

Logs scattered across dozens of servers are useless during an incident. You need centralized collection.

Basic architecture

┌───────────────────────────────────────────────────────────────┐
│                          Log Sources                          │
│  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐  │
│  │ Web App │ │   API   │ │Database │ │Firewall │ │  Cloud  │  │
│  │  Logs   │ │  Logs   │ │  Logs   │ │  Logs   │ │  Logs   │  │
│  └─────────┘ └─────────┘ └─────────┘ └─────────┘ └─────────┘  │
└───────────────────────────────┬───────────────────────────────┘
                                │
                                ▼
                   ┌─────────────────────────┐
                   │      Log Collector      │
                   │   (Fluentd, Filebeat,   │
                   │   Vector, Fluent Bit)   │
                   └────────────┬────────────┘
                                │
                                ▼
                   ┌─────────────────────────┐
                   │     Log Aggregator      │
                   │  (Elasticsearch, Loki,  │
                   │   CloudWatch, Graylog)  │
                   └────────────┬────────────┘
                                │
                    ┌───────────┴───────────┐
                    ▼                       ▼
         ┌───────────────────┐    ┌───────────────────┐
         │   Visualization   │    │     Alerting      │
         │ (Kibana, Grafana) │    │  (AlertManager,   │
         │                   │    │    PagerDuty)     │
         └───────────────────┘    └───────────────────┘

Components explained

Log Sources: Everything that generates logs — applications, servers, network devices, cloud services.

Log Collectors: Agents that run on each server, collect logs, and forward them. They handle:

  • Tailing log files
  • Parsing and enrichment
  • Buffering (if network is down)
  • Compression

Log Aggregator: Central storage that receives, indexes, and stores logs. Provides:

  • Search capabilities
  • Long-term retention
  • Aggregation and analysis

Visualization: Dashboards for exploring logs, building queries, investigating incidents.

Alerting: Rules that trigger notifications when suspicious patterns appear.

Free logging stacks

Option 1: ELK Stack (Elasticsearch + Logstash + Kibana)

The classic open-source logging stack. Data flows in one direction:

Servers (emit logs) → Filebeat (collect & ship) → Logstash (parse & process) → Elasticsearch (store & index) → Kibana (visualize & search)

Docker Compose setup:

version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=changeme
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
    healthcheck:
      test: curl -s http://localhost:9200 >/dev/null || exit 1
      interval: 30s
      timeout: 10s
      retries: 5

  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=changeme
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy

  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"   # Beats input
      - "5000:5000"   # TCP input
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  elasticsearch-data:

Logstash pipeline configuration:

# logstash/pipeline/main.conf
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
    codec => json_lines
  }
}

filter {
  # Parse JSON logs
  if [message] =~ /^\{/ {
    json {
      source => "message"
    }
  }

  # Parse common log formats
  if [type] == "nginx" {
    grok {
      match => { "message" => '%{COMBINEDAPACHELOG}' }
    }
  }

  # GeoIP enrichment for IP addresses
  if [ip_address] {
    geoip {
      source => "ip_address"
      target => "geoip"
    }
  }

  # Add security-relevant tags
  if [event] == "login_failed" {
    mutate {
      add_tag => ["security", "authentication"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "changeme"
    index => "logs-%{+YYYY.MM.dd}"
  }
}

Filebeat configuration (on each server):

# /etc/filebeat/filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
      - /var/log/nginx/error.log
    fields:
      type: nginx

  - type: log
    enabled: true
    paths:
      - /var/log/auth.log
    fields:
      type: auth

  - type: log
    enabled: true
    paths:
      - /app/logs/*.json
    json.keys_under_root: true
    json.add_error_key: true
    fields:
      type: application

output.logstash:
  hosts: ["logstash.internal:5044"]

# Or direct to Elasticsearch
# output.elasticsearch:
#   hosts: ["elasticsearch.internal:9200"]
#   username: "elastic"
#   password: "changeme"

Pros:

  • Extremely powerful search and analytics
  • Huge ecosystem and community
  • Scales to petabytes
  • Free and open source (basic license)

Cons:

  • Resource-intensive (needs significant RAM for Elasticsearch)
  • Complex to operate at scale
  • Some features require paid license

Option 2: Grafana Loki + Promtail

A newer, more lightweight alternative. Same linear flow, fewer moving parts:

Servers (emit logs) → Promtail (collect & ship) → Loki (store) → Grafana (visualize)

Docker Compose setup:

version: '3.8'
services:
  loki:
    image: grafana/loki:2.9.3
    ports:
      - "3100:3100"
    volumes:
      - ./loki-config.yaml:/etc/loki/local-config.yaml
      - loki-data:/loki
    command: -config.file=/etc/loki/local-config.yaml

  grafana:
    image: grafana/grafana:10.3.1
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=changeme
    volumes:
      - grafana-data:/var/lib/grafana
    depends_on:
      - loki

  promtail:
    image: grafana/promtail:2.9.3
    volumes:
      - ./promtail-config.yaml:/etc/promtail/config.yml
      - /var/log:/var/log:ro
    command: -config.file=/etc/promtail/config.yml

volumes:
  loki-data:
  grafana-data:

Loki configuration:

# loki-config.yaml
auth_enabled: false

server:
  http_listen_port: 3100

common:
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks
      rules_directory: /loki/rules
  replication_factor: 1
  ring:
    kvstore:
      store: inmemory

schema_config:
  configs:
    - from: 2020-10-24
      store: boltdb-shipper
      object_store: filesystem
      schema: v11
      index:
        prefix: index_
        period: 24h

ruler:
  alertmanager_url: http://alertmanager:9093

Promtail configuration:

# promtail-config.yaml
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets:
          - localhost
        labels:
          job: varlogs
          __path__: /var/log/*log

  - job_name: nginx
    static_configs:
      - targets:
          - localhost
        labels:
          job: nginx
          __path__: /var/log/nginx/*.log
    pipeline_stages:
      - regex:
          expression: '^(?P<remote_addr>[\d\.]+) - (?P<remote_user>\S+) \[(?P<time_local>.+)\] "(?P<method>\S+) (?P<request>\S+) (?P<protocol>\S+)" (?P<status>\d+)'
      - labels:
          method:
          status:

  - job_name: application
    static_configs:
      - targets:
          - localhost
        labels:
          job: app
          __path__: /app/logs/*.json
    pipeline_stages:
      - json:
          expressions:
            level: level
            event: event
            user_id: user_id
      - labels:
          level:
          event:

Pros:

  • Much lighter on resources than Elasticsearch
  • Native Grafana integration
  • Label-based indexing is very efficient
  • Easy to set up and operate

Cons:

  • Less powerful full-text search than Elasticsearch
  • Smaller ecosystem
  • Newer, less battle-tested

Option 3: Graylog

A complete log management solution with built-in SIEM features:

version: '3.8'
services:
  mongodb:
    image: mongo:6
    volumes:
      - mongo-data:/data/db

  opensearch:
    image: opensearchproject/opensearch:2.11.0
    environment:
      - discovery.type=single-node
      - plugins.security.disabled=true
      - "OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g"
    volumes:
      - opensearch-data:/usr/share/opensearch/data

  graylog:
    image: graylog/graylog:5.2
    environment:
      - GRAYLOG_PASSWORD_SECRET=somesecretpasswordpepper
      - GRAYLOG_ROOT_PASSWORD_SHA2=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
      - GRAYLOG_HTTP_EXTERNAL_URI=http://localhost:9000/
      - GRAYLOG_MONGODB_URI=mongodb://mongodb:27017/graylog
      - GRAYLOG_ELASTICSEARCH_HOSTS=http://opensearch:9200
    ports:
      - "9000:9000"       # Web interface
      - "1514:1514"       # Syslog TCP
      - "1514:1514/udp"   # Syslog UDP
      - "12201:12201"     # GELF TCP
      - "12201:12201/udp" # GELF UDP
    depends_on:
      - mongodb
      - opensearch

volumes:
  mongo-data:
  opensearch-data:

Pros:

  • Built-in alerting and SIEM-like features
  • User-friendly interface
  • Good for security use cases out of the box
  • Supports GELF (structured logging format)

Cons:

  • More complex architecture (requires MongoDB + OpenSearch)
  • Less flexible than raw ELK
  • Some advanced features require enterprise license

Cloud-native options

If you're running in the cloud, native logging services are often the easiest choice:

AWS CloudWatch Logs:

# Install CloudWatch agent
sudo yum install amazon-cloudwatch-agent

# Configure (wizard available)
sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard

Configuration file:

{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/nginx/access.log",
            "log_group_name": "nginx-access",
            "log_stream_name": "{instance_id}"
          },
          {
            "file_path": "/app/logs/*.json",
            "log_group_name": "application",
            "log_stream_name": "{instance_id}",
            "timestamp_format": "%Y-%m-%dT%H:%M:%S"
          }
        ]
      }
    }
  }
}

GCP Cloud Logging:

Automatic for GCE instances. For applications, use the library:

from google.cloud import logging

client = logging.Client()
logger = client.logger("my-application")

logger.log_struct(
    {
        "event": "user_login",
        "user_id": "12345",
        "success": True,
    },
    severity="INFO",
)

Azure Monitor Logs:

Use Log Analytics workspace and Azure Monitor agent.

Comparison table

| Feature | ELK Stack | Grafana Loki | Graylog | CloudWatch | GCP Logging |
|---|---|---|---|---|---|
| Cost | Free (OSS) | Free (OSS) | Free (OSS) | Pay per GB | Pay per GB |
| Setup complexity | High | Medium | Medium | Low | Low |
| Resource usage | High | Low | Medium | N/A | N/A |
| Full-text search | Excellent | Good | Excellent | Good | Good |
| Alerting | Requires setup | Built-in | Built-in | Built-in | Built-in |
| Retention | Unlimited | Unlimited | Unlimited | Configurable | Configurable |
| Scalability | Excellent | Excellent | Good | Excellent | Excellent |
| Best for | Large scale, complex queries | Kubernetes, Grafana users | Security focus | AWS environments | GCP environments |

Setting up alerts

Logs are only useful if you're watching them. Alerting turns passive logs into active monitoring.

Alert categories

Critical — immediate response required:

  • Multiple failed logins followed by success (credential stuffing succeeded)
  • Admin user created by non-admin
  • Firewall disabled
  • Suspicious process execution (cryptominer signatures)
  • Connection to known malicious IP
  • Production database accessed from unknown IP

High — investigate within hours:

  • Brute force attacks in progress
  • Unusual API usage patterns
  • After-hours admin access
  • Large data exports
  • Permission escalation events

Medium — review daily:

  • Failed login threshold exceeded
  • Error rate spikes
  • Unusual geographic access
  • Certificate expiration warnings

Low — weekly review:

  • Access from new devices
  • Policy violation attempts
  • Minor configuration changes

Alert rules examples

ELK/Kibana alerting:

{
  "trigger": {
    "schedule": {
      "interval": "1m"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["logs-*"],
        "body": {
          "query": {
            "bool": {
              "must": [
                { "match": { "event": "login_failed" } },
                { "range": { "@timestamp": { "gte": "now-5m" } } }
              ]
            }
          },
          "aggs": {
            "by_ip": {
              "terms": { "field": "ip_address", "size": 10 }
            }
          }
        }
      }
    }
  },
  "condition": {
    "script": {
      "source": "return ctx.payload.aggregations.by_ip.buckets.stream().anyMatch(b -> b.doc_count > 10)"
    }
  },
  "actions": {
    "notify_slack": {
      "slack": {
        "message": {
          "to": ["#security-alerts"],
          "text": "Brute force attack detected: {{ctx.payload.aggregations.by_ip.buckets}}"
        }
      }
    }
  }
}

Grafana Loki alerting (via Grafana):

# grafana/provisioning/alerting/rules.yaml
apiVersion: 1
groups:
  - orgId: 1
    name: security-alerts
    folder: Security
    interval: 1m
    rules:
      - uid: brute-force-detection
        title: Brute Force Attack Detected
        condition: C
        data:
          - refId: A
            datasourceUid: loki
            model:
              expr: 'sum(rate({job="app"} |= "login_failed" [5m])) by (ip_address)'
          - refId: B
            datasourceUid: __expr__
            model:
              expression: A
              type: reduce
              reducer: last
          - refId: C
            datasourceUid: __expr__
            model:
              expression: B > 10
              type: threshold
        for: 5m
        annotations:
          summary: "Brute force attack from {{ $labels.ip_address }}"
        labels:
          severity: critical

AWS CloudWatch Alarms:

# Create metric filter for failed logins
aws logs put-metric-filter \
  --log-group-name application \
  --filter-name failed-logins \
  --filter-pattern '{ $.event = "login_failed" }' \
  --metric-transformations \
      metricName=FailedLogins,metricNamespace=Security,metricValue=1

# Create alarm
aws cloudwatch put-metric-alarm \
  --alarm-name BruteForceDetection \
  --metric-name FailedLogins \
  --namespace Security \
  --statistic Sum \
  --period 300 \
  --threshold 50 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 1 \
  --alarm-actions arn:aws:sns:us-east-1:123456789:security-alerts

Security detection rules

Common patterns to alert on:

Authentication attacks

# Failed login spike (brute force)
- name: brute_force_attack
  query: |
    event:login_failed | stats count() by ip_address | where count > 20
  window: 5m
  severity: high

# Successful login after many failures (compromise)
- name: credential_stuffing_success
  query: |
    (event:login_failed | stats count() as failures by user_id, ip_address)
    | join (event:login_success by user_id, ip_address)
    | where failures > 5
  window: 15m
  severity: critical

# Login from new country
- name: impossible_travel
  query: |
    event:login_success
    | geoip(ip_address)
    | stats distinct_count(country) as countries by user_id
    | where countries > 1
  window: 1h
  severity: high
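If your query language lacks joins, the second rule (a success following many failures) can be approximated in plain Python over a time-ordered event stream. This is a sketch; the event dict shape is an assumption:

```python
from collections import defaultdict

def find_compromised_logins(events, failure_threshold=5):
    """Flag (user_id, ip) pairs where a success follows many failures.

    `events` is a time-ordered list of dicts with keys:
    event, user_id, ip_address.
    """
    failures = defaultdict(int)
    alerts = []
    for e in events:
        key = (e["user_id"], e["ip_address"])
        if e["event"] == "login_failed":
            failures[key] += 1
        elif e["event"] == "login_success" and failures[key] > failure_threshold:
            alerts.append({
                "user_id": e["user_id"],
                "ip_address": e["ip_address"],
                "failures_before_success": failures[key],
            })
    return alerts

events = [{"event": "login_failed", "user_id": "alice", "ip_address": "1.2.3.4"}] * 6
events.append({"event": "login_success", "user_id": "alice", "ip_address": "1.2.3.4"})
print(find_compromised_logins(events))  # one critical alert for alice from 1.2.3.4
```

A production version would also expire failure counts outside the 15-minute window.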

Privilege escalation

# Admin user created
- name: admin_user_created
  query: |
    event:user_created AND role:admin
  severity: high

# Privilege escalation
- name: privilege_escalation
  query: |
    event:role_changed AND new_role:(admin OR superuser)
  severity: high

# Service account used interactively
- name: service_account_interactive
  query: |
    event:login_success
    AND user_type:service_account
    AND session_type:interactive
  severity: critical

Data exfiltration

# Large data export
- name: large_data_export
  query: |
    event:data_export
    | stats sum(record_count) as total by user_id
    | where total > 10000
  window: 1h
  severity: high

# Database query spike
- name: unusual_database_queries
  query: |
    source:database
    | stats count() as queries by user_id
    | where queries > baseline_queries * 3
  window: 1h
  severity: medium

# After hours access to sensitive data
- name: after_hours_sensitive_access
  query: |
    event:data_access
    AND data_classification:confidential
    AND NOT between(hour, 8, 18)
  severity: high

Infrastructure attacks

# SSH from unusual source
- name: ssh_from_internet
  query: |
    event:ssh_session_start
    AND NOT source_ip:("10.*" OR "172.16.*" OR "192.168.*")
  severity: high

# Cryptominer signatures
- name: cryptominer_detection
  query: |
    (process_name:*miner* OR cmdline:*stratum* OR cmdline:*xmrig*)
  severity: critical

# Firewall rule change
- name: firewall_modified
  query: |
    event:(security_group_modified OR firewall_rule_changed)
  severity: high

# Root/admin shell spawned
- name: root_shell_spawned
  query: |
    event:process_start AND user:root AND process:(bash OR sh OR zsh)
  severity: medium

Reducing alert fatigue

Too many alerts = ignored alerts. Here's how to keep alerts meaningful:

Set appropriate thresholds:

# BAD: Alerts on every failed login
- condition: event == "login_failed"

# GOOD: Alerts on suspicious patterns
- condition: count(event == "login_failed") > 10 in 5m grouped by ip_address
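The grouped-threshold condition corresponds to a sliding-window counter per IP. A minimal sketch of what the alerting engine evaluates for you:

```python
import time
from collections import defaultdict, deque

class BruteForceDetector:
    """Alert when one IP exceeds `threshold` failed logins within `window` seconds."""

    def __init__(self, threshold=10, window=300):
        self.threshold = threshold
        self.window = window
        self.failures = defaultdict(deque)  # ip -> timestamps of recent failures

    def record_failure(self, ip, now=None):
        now = now if now is not None else time.time()
        q = self.failures[ip]
        q.append(now)
        # Drop failures that have aged out of the window
        while q and now - q[0] > self.window:
            q.popleft()
        return len(q) > self.threshold  # True means: raise an alert

detector = BruteForceDetector(threshold=10, window=300)
alerted = [detector.record_failure("203.0.113.7", now=t) for t in range(12)]
print(alerted[-1])  # the 11th failure within 5 minutes trips the alert
```

The same shape (counter keyed by group, pruned by window) underlies most threshold rules.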

Add context:

# BAD: Just says "suspicious activity"
message: "Suspicious activity detected"

# GOOD: Actionable information
message: |
  Brute force attack detected
  Source IP: {{ ip_address }}
  Target users: {{ affected_users }}
  Failed attempts: {{ count }}
  Timeframe: last 5 minutes
  Action: Consider blocking IP in WAF

Use severity levels properly:

  • CRITICAL: Someone needs to wake up
  • HIGH: Investigate within hours
  • MEDIUM: Check during business hours
  • LOW: Weekly review

Tune based on feedback:

  • Track false positive rate
  • If > 50% false positives, adjust threshold
  • If 0% alerts, you might be missing things

Alert routing

Send alerts to the right place:

# alertmanager.yml
route:
  receiver: default
  routes:
    # Critical security alerts → PagerDuty
    - match:
        severity: critical
        category: security
      receiver: pagerduty-security

    # High security alerts → Slack security channel
    - match:
        severity: high
        category: security
      receiver: slack-security

    # Infrastructure alerts → Slack ops channel
    - match:
        category: infrastructure
      receiver: slack-ops

receivers:
  - name: pagerduty-security
    pagerduty_configs:
      - service_key: YOUR_PAGERDUTY_KEY
        severity: critical

  - name: slack-security
    slack_configs:
      - api_url: https://hooks.slack.com/services/XXX
        channel: '#security-alerts'

  - name: slack-ops
    slack_configs:
      - api_url: https://hooks.slack.com/services/XXX
        channel: '#ops-alerts'

SIEM concepts

Security Information and Event Management (SIEM) combines log management with security analytics. While full SIEM platforms are expensive, you can build SIEM-like capabilities with open-source tools.

What SIEM adds

| Capability | Log Management | SIEM |
|---|---|---|
| Log collection | ✓ | ✓ |
| Search and visualization | ✓ | ✓ |
| Correlation across sources | Limited | ✓ |
| Threat intelligence integration | Manual | Automatic |
| Behavioral analysis | No | ✓ |
| Compliance reporting | Manual | Built-in |
| Case management | No | ✓ |
| Automated response | Limited | ✓ |

Building SIEM-like capabilities

1. Log correlation:

Combine logs from multiple sources to detect attacks:

# Detect: SSH brute force followed by successful login, then suspicious command
correlation_rule:
name: ssh_compromise_chain
events:
- event_type: ssh_failed_login
count: ">10"
group_by: source_ip, target_host
window: 5m
save_as: brute_force

- event_type: ssh_successful_login
source_ip: $brute_force.source_ip
target_host: $brute_force.target_host
within: 10m
after: brute_force
save_as: compromise

- event_type: command_execution
host: $compromise.target_host
command: ("wget*" OR "curl*" OR "nc*")
within: 30m
after: compromise

alert:
severity: critical
message: "Likely SSH compromise: brute force from {source_ip} → successful login → suspicious commands"

2. Threat intelligence integration:

Cross-reference IPs and domains with known bad actors:

# Example: Check IPs against threat intel feed
import requests

def check_ip_reputation(ip_address):
# Check against AbuseIPDB
response = requests.get(
"https://api.abuseipdb.com/api/v2/check",
headers={"Key": ABUSEIPDB_API_KEY},
params={"ipAddress": ip_address}
)
data = response.json()["data"]

if data["abuseConfidenceScore"] > 80:
return {
"malicious": True,
"score": data["abuseConfidenceScore"],
"reports": data["totalReports"],
"categories": data["usageType"]
}
return {"malicious": False}

3. User and Entity Behavior Analytics (UEBA):

Detect anomalies based on normal behavior:

# Baseline: User typically logs in 9am-6pm from US
# Alert: Same user logging in at 3am from Russia

def detect_anomalous_login(user_id, login_event):
# Get user's normal login patterns
baseline = get_user_baseline(user_id)

# Check for anomalies
anomalies = []

# Time anomaly
if not baseline.normal_hours.contains(login_event.hour):
anomalies.append({
"type": "unusual_time",
"expected": baseline.normal_hours,
"actual": login_event.hour
})

# Location anomaly
if login_event.country not in baseline.normal_countries:
anomalies.append({
"type": "new_location",
"expected": baseline.normal_countries,
"actual": login_event.country
})

# Impossible travel
if baseline.last_login:
distance = calculate_distance(
baseline.last_login.location,
login_event.location
)
time_diff = login_event.time - baseline.last_login.time
if distance / time_diff.hours > 500: # 500 km/h impossible
anomalies.append({
"type": "impossible_travel",
"distance_km": distance,
"time_hours": time_diff.hours
})

return anomalies
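The distance helper in the impossible-travel check can be a plain haversine great-circle calculation, stdlib only:

```python
import math

def calculate_distance(loc_a, loc_b):
    """Great-circle distance in km between (lat, lon) pairs, via the haversine formula."""
    lat1, lon1 = map(math.radians, loc_a)
    lat2, lon2 = map(math.radians, loc_b)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = (math.sin(dlat / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
    return 2 * 6371 * math.asin(math.sqrt(a))  # Earth radius ≈ 6371 km

# New York → Moscow is roughly 7,500 km: not coverable between a 9pm
# login in one city and a 3am login in the other
print(round(calculate_distance((40.71, -74.01), (55.76, 37.62))))
```

Geolocation from IP is approximate, so keep the speed threshold generous (the 500 km/h above) to avoid false positives from VPN exits and coarse GeoIP data.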

Free SIEM alternatives

| Tool | Type | Best for | Link |
|---|---|---|---|
| Wazuh | Full SIEM | Comprehensive security monitoring | wazuh.com |
| Security Onion | Network security monitoring | Network-focused security | securityonion.net |
| OSSEC | Host-based IDS | File integrity, rootkit detection | ossec.net |
| Suricata | Network IDS | Network threat detection | suricata.io |
| TheHive | Incident response | Case management, playbooks | thehive-project.org |

Wazuh setup (Docker):

# docker-compose.yml for Wazuh
version: '3.8'
services:
  wazuh.manager:
    image: wazuh/wazuh-manager:4.7.2
    hostname: wazuh.manager
    ports:
      - "1514:1514"
      - "1515:1515"
      - "514:514/udp"
      - "55000:55000"
    environment:
      - INDEXER_URL=https://wazuh.indexer:9200
      - FILEBEAT_SSL_VERIFICATION_MODE=none
    volumes:
      - wazuh_api_configuration:/var/ossec/api/configuration
      - wazuh_etc:/var/ossec/etc
      - wazuh_logs:/var/ossec/logs

  wazuh.indexer:
    image: wazuh/wazuh-indexer:4.7.2
    hostname: wazuh.indexer
    environment:
      - "OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g"
    volumes:
      - wazuh-indexer-data:/var/lib/wazuh-indexer

  wazuh.dashboard:
    image: wazuh/wazuh-dashboard:4.7.2
    hostname: wazuh.dashboard
    ports:
      - "443:5601"
    environment:
      - INDEXER_USERNAME=admin
      - INDEXER_PASSWORD=SecretPassword
      - WAZUH_API_URL=https://wazuh.manager
    depends_on:
      - wazuh.indexer
      - wazuh.manager

volumes:
  wazuh_api_configuration:
  wazuh_etc:
  wazuh_logs:
  wazuh-indexer-data:

Wazuh provides:

  • File integrity monitoring
  • Rootkit detection
  • Log analysis
  • Compliance dashboards (PCI-DSS, GDPR, HIPAA)
  • Active response (automatic blocking)
  • Vulnerability detection

Log retention and compliance

Retention requirements

| Standard | Minimum retention | Notes |
|---|---|---|
| PCI-DSS | 1 year | 3 months immediately available |
| HIPAA | 6 years | May vary by state |
| SOC 2 | 1 year | Depends on trust criteria |
| GDPR | Varies | "No longer than necessary" |
| ISO 27001 | 3 years recommended | As long as needed for security |
| Internal best practice | 90 days hot, 1 year cold | Balance cost and usefulness |

Storage tiers

| Tier | Retention | Query speed | Cost | Indexing | Example |
|---|---|---|---|---|---|
| Hot | 7–30 days | Fast | High | Full | Elasticsearch on SSD |
| Warm | 30–90 days | Slower | Moderate | Reduced | Elasticsearch on HDD, S3 + Athena |
| Cold | 90+ days | Archive only | Low | None | S3 Glacier, compressed files |
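If your warm and cold tiers live in S3, the transitions map directly onto a bucket lifecycle policy. A sketch that writes one out; the prefix, day counts, and bucket name below are placeholders to align with your own retention policy:

```python
import json

# S3 lifecycle rules implementing the hot/warm/cold tiers above.
# Prefix and day counts are placeholders; match them to your retention policy.
lifecycle = {
    "Rules": [
        {
            "ID": "log-tiering",
            "Status": "Enabled",
            "Filter": {"Prefix": "logs/"},
            "Transitions": [
                {"Days": 30, "StorageClass": "STANDARD_IA"},  # warm
                {"Days": 90, "StorageClass": "GLACIER"},      # cold
            ],
            "Expiration": {"Days": 365},  # delete after one year
        }
    ]
}

# Write the policy to a file, then apply it with:
#   aws s3api put-bucket-lifecycle-configuration \
#     --bucket my-log-archive-bucket --lifecycle-configuration file://lifecycle.json
with open("lifecycle.json", "w") as f:
    json.dump(lifecycle, f, indent=2)
```

Elasticsearch users get the same effect from ILM, shown next.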

Elasticsearch Index Lifecycle Management:

PUT _ilm/policy/security-logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_primary_shard_size": "50gb",
            "max_age": "7d"
          }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "allocate": { "require": { "data": "warm" } }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "allocate": { "require": { "data": "cold" } }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": { "delete": {} }
      }
    }
  }
}

Log integrity

For compliance and forensics, logs must be tamper-proof:

Log signing:

import hashlib
import hmac
import json

def sign_log_entry(entry, secret_key):
    """Sign a log entry to detect tampering"""
    entry_bytes = json.dumps(entry, sort_keys=True).encode()
    signature = hmac.new(
        secret_key.encode(),
        entry_bytes,
        hashlib.sha256
    ).hexdigest()
    entry["_signature"] = signature
    return entry

def verify_log_entry(entry, secret_key):
    """Verify a log entry hasn't been tampered with"""
    signature = entry.pop("_signature", None)
    if not signature:
        return False
    entry_bytes = json.dumps(entry, sort_keys=True).encode()
    expected = hmac.new(
        secret_key.encode(),
        entry_bytes,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected)

AWS CloudWatch Log Integrity:

# Enable log file validation for CloudTrail
aws cloudtrail update-trail \
--name main-trail \
--enable-log-file-validation

# Validate log file integrity
aws cloudtrail validate-logs \
--trail-arn arn:aws:cloudtrail:us-east-1:123456789:trail/main-trail \
--start-time 2024-01-01T00:00:00Z

Incident investigation with logs

When an incident occurs, logs are your primary investigation tool.

Investigation workflow

┌──────────────────┐
│  Alert Received  │
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Initial Triage  │  What triggered? When? What systems?
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ Scope Assessment │  How widespread? What data? What access?
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Timeline Build  │  What happened first? What came after?
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│    Root Cause    │  How did they get in? What was the vulnerability?
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│ Impact Analysis  │  What was accessed? What was changed?
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│   Containment    │  Stop the bleeding, preserve evidence
└────────┬─────────┘
         │
         ▼
┌──────────────────┐
│  Documentation   │  Timeline, findings, lessons learned
└──────────────────┘

Investigation queries

Find initial access:

# Kibana Query Language (KQL)
# All authentication events for compromised user around incident time
# (KQL has no sort operator; sort by @timestamp in the Discover UI)
user_id: "compromised_user" AND event: (login* OR auth*)
AND @timestamp >= "2024-01-15T00:00:00" AND @timestamp <= "2024-01-15T23:59:59"

# Loki LogQL
{job="app"} | json | user_id="compromised_user" | line_format "{{.timestamp}} {{.event}} {{.ip_address}}"

Trace attacker movement:

# All events from attacker's IP
ip_address: "1.2.3.4" | sort @timestamp

# All events from compromised session
session_id: "abc123" | sort @timestamp

# Commands executed on compromised host
host: "compromised-server" AND event: "command_execution" | sort @timestamp

Identify data access:

# Data accessed by compromised account
user_id: "compromised_user" AND event: (data_access OR data_export OR file_download)

# Unusual queries against database
source: database AND query_type: SELECT AND tables: (users OR payments OR credentials)

Build timeline:

def build_incident_timeline(start_time, end_time, indicators):
    """
    Build timeline from logs using IOCs
    indicators = {
        "ip_addresses": ["1.2.3.4"],
        "user_ids": ["compromised_user"],
        "session_ids": ["abc123"],
        "hosts": ["compromised-server"]
    }
    """
    timeline = []

    # Query each indicator type (query_logs is your log backend's search API)
    for ip in indicators.get("ip_addresses", []):
        events = query_logs(f"ip_address:{ip}", start_time, end_time)
        timeline.extend(events)

    for user_id in indicators.get("user_ids", []):
        events = query_logs(f"user_id:{user_id}", start_time, end_time)
        timeline.extend(events)

    # Deduplicate and sort (assumes query_logs returns hashable event records)
    timeline = sorted(set(timeline), key=lambda e: e.timestamp)

    return timeline

Investigation checklist

## Incident Investigation Checklist

### Initial Response
- [ ] Document alert details (time, type, affected systems)
- [ ] Assign incident owner
- [ ] Open incident ticket
- [ ] Notify relevant stakeholders

### Scoping
- [ ] Identify all affected user accounts
- [ ] Identify all affected systems/hosts
- [ ] Identify all IP addresses involved
- [ ] Determine time range of activity

### Evidence Collection
- [ ] Export relevant logs (preserve originals)
- [ ] Capture network traffic if ongoing
- [ ] Take memory dumps if needed
- [ ] Screenshot any relevant dashboards

### Analysis
- [ ] Build timeline of events
- [ ] Identify initial access vector
- [ ] Map lateral movement
- [ ] Identify data accessed/exfiltrated
- [ ] Identify persistence mechanisms

### Containment
- [ ] Block malicious IPs
- [ ] Disable compromised accounts
- [ ] Isolate compromised systems
- [ ] Revoke compromised credentials

### Documentation
- [ ] Write incident summary
- [ ] Create detailed timeline
- [ ] Document root cause
- [ ] List remediation actions
- [ ] Schedule post-mortem

Common mistakes to avoid

Logging only errors. Normal operations matter for security. You need successful logins to detect compromised accounts, successful API calls to detect data exfiltration. Log INFO-level events.

No timestamp or wrong timezone. Logs without timestamps are useless for investigation. Use ISO 8601 format with UTC timezone everywhere.
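
Producing such timestamps takes one line in most languages — a minimal Python illustration:

```python
from datetime import datetime, timezone

# ISO 8601 timestamp in UTC — unambiguous across servers and timezones
def utc_timestamp() -> str:
    return datetime.now(timezone.utc).isoformat()

print(utc_timestamp())  # e.g. 2024-01-15T12:34:56.789012+00:00
```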

Logging sensitive data. Passwords, tokens, credit cards in logs create a new vulnerability. Mask or exclude sensitive fields.
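
One lightweight approach is a masking step applied to every event before it reaches the log pipeline. A hypothetical sketch — the field list is illustrative and should be extended for your data:

```python
# Field names considered sensitive — an illustrative starting set, not exhaustive
SENSITIVE_KEYS = {"password", "token", "secret", "api_key", "credit_card", "authorization"}

def mask_sensitive(event: dict) -> dict:
    """Return a copy of a log event with sensitive values redacted."""
    masked = {}
    for key, value in event.items():
        if key.lower() in SENSITIVE_KEYS:
            masked[key] = "***REDACTED***"       # never log the real value
        elif isinstance(value, dict):
            masked[key] = mask_sensitive(value)  # recurse into nested fields
        else:
            masked[key] = value
    return masked

print(mask_sensitive({"user": "alice", "password": "hunter2",
                      "meta": {"token": "abc123", "ip": "1.2.3.4"}}))
# → {'user': 'alice', 'password': '***REDACTED***', 'meta': {'token': '***REDACTED***', 'ip': '1.2.3.4'}}
```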

No log rotation. Disks fill up, applications crash. Set up rotation and retention from day one.
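
Python's standard library handles rotation directly; a minimal sketch (the size and backup count are illustrative — match them to your retention policy):

```python
import logging
from logging.handlers import RotatingFileHandler

# Rotate at ~10 MB, keep 5 old files; older rotations are deleted automatically
handler = RotatingFileHandler("app.log", maxBytes=10 * 1024 * 1024, backupCount=5)
logger = logging.getLogger("app")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("service started")
```

For files your application doesn't own (nginx, syslog), logrotate on the host provides the same guarantee.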

Alerts without runbooks. An alert fires at 3am. What should the on-call person do? Document response procedures for each alert type.

Too many alerts. Alert fatigue kills security. If you're ignoring alerts, you're not monitoring. Tune thresholds and suppress noise.
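
Suppression can be as simple as remembering when each alert last fired. A hypothetical sketch — the one-hour window is an assumption; tune it per alert type:

```python
import time

_last_fired = {}  # alert key -> last fire time (epoch seconds)

def should_fire(alert_key, suppress_seconds=3600, now=None):
    """Fire an alert only if the same alert hasn't fired within the window."""
    now = time.time() if now is None else now
    last = _last_fired.get(alert_key)
    if last is not None and now - last < suppress_seconds:
        return False  # suppressed — identical alert fired recently
    _last_fired[alert_key] = now
    return True

print(should_fire("failed-logins:1.2.3.4", now=0))     # True — first occurrence
print(should_fire("failed-logins:1.2.3.4", now=60))    # False — within the window
print(should_fire("failed-logins:1.2.3.4", now=4000))  # True — window elapsed
```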

Logs only on application servers. Security events happen everywhere — firewalls, databases, cloud APIs, load balancers. Collect them all.

No testing. How do you know your alerts work? Generate test events, verify they trigger, check they route correctly.

Real-world incidents

Target breach (2013): Attackers were in the network for weeks. Security tools generated alerts about the malware, but they were ignored due to alert fatigue. 40 million credit cards stolen. Source: U.S. Senate Committee

Equifax breach (2017): Attack began in May, discovered in July. Poor logging meant investigators couldn't determine full extent of breach. 147 million records exposed. The investigation took months because log data was incomplete. Source: GAO Report

SolarWinds (2020): Attackers sat inside networks for 9+ months before discovery. Organizations with strong logging and anomaly detection were better positioned to spot the unusual behavior coming from the compromised software updates. Source: CISA

Colonial Pipeline (2021): Attackers accessed the network using a compromised VPN password. Proper monitoring of VPN authentication patterns could have detected the unauthorized access earlier. Source: Bloomberg

Workshop: setting up logging and monitoring

This workshop guides you through implementing a complete logging and monitoring solution.

Part 1: Centralized log collection

Option A: Grafana Loki (recommended for small teams)

  1. Create docker-compose.yml:

version: '3.8'
services:
  loki:
    image: grafana/loki:2.9.3
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - loki-data:/loki

  grafana:
    image: grafana/grafana:10.3.1
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin   # change before exposing beyond localhost
      - GF_AUTH_ANONYMOUS_ENABLED=true     # workshop convenience — disable in production
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning

  promtail:
    image: grafana/promtail:2.9.3
    volumes:
      - ./promtail-config.yaml:/etc/promtail/config.yml
      - /var/log:/var/log:ro
      - /var/run/docker.sock:/var/run/docker.sock
    command: -config.file=/etc/promtail/config.yml

volumes:
  loki-data:
  grafana-data:
  2. Create promtail-config.yaml:

server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets:
          - localhost
        labels:
          job: varlogs
          host: ${HOSTNAME}  # env expansion requires the -config.expand-env=true flag
          __path__: /var/log/*.log

  - job_name: docker
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
        refresh_interval: 5s
    relabel_configs:
      - source_labels: ['__meta_docker_container_name']
        target_label: container

  3. Start the stack:

docker compose up -d

  4. Access Grafana at http://localhost:3000

  5. Add Loki as a data source:

    • Go to Connections → Data Sources → Add data source
    • Select Loki
    • URL: http://loki:3100
    • Save & Test

Option B: AWS CloudWatch (for AWS environments)

  1. Install the CloudWatch agent:

wget https://s3.amazonaws.com/amazoncloudwatch-agent/ubuntu/amd64/latest/amazon-cloudwatch-agent.deb
sudo dpkg -i amazon-cloudwatch-agent.deb

  2. Create the configuration:

{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/auth.log",
            "log_group_name": "security-logs",
            "log_stream_name": "{instance_id}/auth"
          },
          {
            "file_path": "/var/log/nginx/access.log",
            "log_group_name": "application-logs",
            "log_stream_name": "{instance_id}/nginx-access"
          },
          {
            "file_path": "/app/logs/*.json",
            "log_group_name": "application-logs",
            "log_stream_name": "{instance_id}/app"
          }
        ]
      }
    }
  }
}

  3. Start the agent:

sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl \
  -a fetch-config \
  -m ec2 \
  -s \
  -c file:/opt/aws/amazon-cloudwatch-agent/etc/config.json

Part 2: Application logging

  1. Add structured logging to your application:

Python example:

import structlog
import logging

# Configure structlog for JSON output with ISO timestamps
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

# Log security events
def login(user_id, ip_address, success):
    logger.info(
        "user_login",
        user_id=user_id,
        ip_address=ip_address,
        success=success,
        event_type="authentication"
    )

Node.js example:

const pino = require('pino');

const logger = pino({
  level: 'info',
  formatters: {
    level: (label) => ({ level: label }),
  },
  timestamp: pino.stdTimeFunctions.isoTime,
});

// Log security events
function login(userId, ipAddress, success) {
  logger.info({
    event: 'user_login',
    user_id: userId,
    ip_address: ipAddress,
    success: success,
    event_type: 'authentication'
  });
}
  2. Test logging:

# Generate some log entries
curl -X POST http://localhost:8000/api/login \
  -H "Content-Type: application/json" \
  -d '{"username": "test", "password": "wrong"}'

# Check the entries appear in Grafana/CloudWatch

Part 3: Security alerts

  1. Create alert for failed logins (Grafana):

    • Go to Alerting → Alert rules → New alert rule
    • Query: count_over_time({job="app"} |= "login" |= "success\":false" [5m])
    • Condition: Is above 10
    • Folder: Security
    • Evaluation: Every 1m for 5m
    • Add notification channel (Slack, email, etc.)
  2. Create CloudWatch alarm:

# Create metric filter
aws logs put-metric-filter \
  --log-group-name application-logs \
  --filter-name failed-logins \
  --filter-pattern '{ $.event = "user_login" && $.success = false }' \
  --metric-transformations \
    metricName=FailedLogins,metricNamespace=Security,metricValue=1

# Create alarm
aws cloudwatch put-metric-alarm \
  --alarm-name HighFailedLogins \
  --metric-name FailedLogins \
  --namespace Security \
  --statistic Sum \
  --period 300 \
  --threshold 20 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 1 \
  --alarm-actions arn:aws:sns:us-east-1:123456789:security-alerts
  3. Test alerts:

# Generate failed logins
for i in {1..25}; do
  curl -X POST http://localhost:8000/api/login \
    -H "Content-Type: application/json" \
    -d '{"username": "test", "password": "wrong"}'
  sleep 1
done

# Verify alert fired
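
Both alert configurations reduce to the same sliding-window counter. A hypothetical Python sketch of that logic — the window and threshold mirror the 5-minute / 20-event CloudWatch alarm above, but the class itself is illustrative, not Grafana's or CloudWatch's implementation:

```python
from collections import deque

class FailedLoginMonitor:
    """Count failed logins in a sliding time window; fire above a threshold."""

    def __init__(self, window_seconds=300, threshold=20):
        self.window = window_seconds
        self.threshold = threshold
        self.events = deque()  # timestamps of recent failed logins

    def record_failure(self, timestamp):
        """Record one failed login; return True if the alert should fire."""
        self.events.append(timestamp)
        # Evict events that have aged out of the window
        while self.events and timestamp - self.events[0] > self.window:
            self.events.popleft()
        return len(self.events) > self.threshold

monitor = FailedLoginMonitor()
fired = [monitor.record_failure(t) for t in range(25)]  # 25 failures in 25 seconds
print(fired.index(True))  # → 20 (the 21st failure crosses the threshold)
```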

Part 4: Security dashboard

Create a Grafana dashboard with these panels:

  1. Authentication overview:

    • Total logins (success/failure)
    • Login failure rate over time
    • Top source IPs for failures
  2. Application security:

    • Error rates
    • API response times
    • Rate limit hits
  3. Infrastructure:

    • SSH connections
    • Sudo commands
    • Service status changes

Example dashboard JSON:

{
  "title": "Security Overview",
  "panels": [
    {
      "title": "Failed Logins (24h)",
      "type": "stat",
      "targets": [
        {
          "expr": "count_over_time({job=\"app\"} |= \"login\" |= \"success\\\":false\" [24h])"
        }
      ]
    },
    {
      "title": "Login Failures by IP",
      "type": "table",
      "targets": [
        {
          "expr": "sum by (ip_address) (count_over_time({job=\"app\"} | json | event=\"user_login\" | success=\"false\" [24h]))"
        }
      ]
    },
    {
      "title": "Authentication Events",
      "type": "timeseries",
      "targets": [
        {
          "expr": "sum(count_over_time({job=\"app\"} |= \"login\" [5m]))",
          "legendFormat": "Total"
        },
        {
          "expr": "sum(count_over_time({job=\"app\"} |= \"login\" |= \"success\\\":false\" [5m]))",
          "legendFormat": "Failed"
        }
      ]
    }
  ]
}

Artifacts to produce

After this workshop, you should have:

  1. Centralized logging stack — Docker Compose or CloudWatch configuration
  2. Application logging integration — Structured logging in your codebase
  3. Security alerts — At least 3 alerts configured:
    • Failed login threshold
    • After-hours access
    • Error rate spike
  4. Security dashboard — Grafana/CloudWatch dashboard with key metrics
  5. Alert runbook — Documentation for responding to each alert type
  6. Log retention policy — Document how long logs are kept and why

Self-check questions

  1. Why is centralized logging important for security?
  2. What should you never log?
  3. What's the difference between a log management system and a SIEM?
  4. How do you prevent alert fatigue?
  5. What log retention period does PCI-DSS require?
  6. How would you investigate a suspected account compromise using logs?
  7. What's structured logging and why is it better?
  8. How do you ensure log integrity for forensic purposes?
  9. Name three security events that should trigger immediate alerts.
  10. What's the difference between hot, warm, and cold log storage?

How to explain this to leadership

Start with the risk: "Right now, if someone breaches our system, we won't know for months. The average company takes 194 days to detect a breach. With proper monitoring, we can detect issues in minutes."

Use a real example: "Last month, [competitor/news story] had a breach. They couldn't figure out what data was taken because their logs were incomplete. We're setting up logging so we can always answer that question."

Quantify the value: "Faster detection means less damage. According to IBM, breaches detected within 200 days cost $1 million less than those taking longer. Our monitoring will alert us immediately."

Address compliance: "For SOC 2 / ISO 27001 / [relevant standard], we need centralized logging with proper retention. This project gets us compliant."

Show the dashboard: "Here's what we're monitoring now. We can see every login, every access to sensitive data, every unusual pattern. Before, we were blind."

Compare costs: "We're using open-source tools that cost $0 in software licenses. The main cost is my time to set it up — about 2 weeks. A SIEM product would cost $50-100K/year."


What's next

This completes the security in development section. You can now write more secure code, manage secrets properly, harden your CI/CD pipeline, lock down containers and cloud infrastructure, and detect suspicious activity when it happens.

Next section: security culture — the harder work of getting everyone else on the team to care about security too.