Security logging and monitoring
Logs are the black box of your infrastructure. When something goes wrong — a breach, an outage, unexplained behavior — logs are how you understand what happened. Without proper logging, you're investigating incidents blind.
Security monitoring takes logging further: instead of waiting for someone to notice a problem and check logs, you're actively watching for suspicious patterns and alerting in real-time. A brute-force attack generating thousands of failed logins. An admin account suddenly accessing data it never touched before. A server making connections to known malicious IPs.
This chapter covers how to build a logging and monitoring infrastructure that catches security incidents as they happen, not weeks later when the damage is done.
Why this matters for small companies
Small companies often skip proper logging. "We'll check the logs if something breaks." That approach fails for security:
You can't investigate what you didn't log. When a breach is discovered, the first question is "what happened?" Without logs, you can't answer that. You don't know what data was accessed, how attackers got in, or if they're still there.
Attackers delete their traces. If logs only exist on the compromised server, attackers delete them. Centralized logging to a separate system preserves evidence even when servers are compromised.
Detection requires visibility. Most breaches go undetected for months. IBM's 2024 Cost of a Data Breach Report found the average time to identify a breach is 194 days. Companies with security monitoring detect breaches faster and lose less money.
Compliance often requires it. SOC 2, ISO 27001, HIPAA, PCI-DSS all require security logging and monitoring. Even if you don't need compliance now, you might later.
Free tools are mature. ELK Stack, Grafana Loki, Graylog — these aren't toys. They handle enterprise-scale logging. You don't need a $100K SIEM to have proper logging.
What to log
Not all logs are equal for security. Here's what matters:
Authentication and access
| Event | Why it matters | Priority |
|---|---|---|
| Login success/failure | Brute force detection, unauthorized access | Critical |
| Password changes | Account takeover indicator | Critical |
| MFA events | Bypass attempts, enrollment changes | Critical |
| Session creation/destruction | Session hijacking, anomalous access patterns | High |
| Permission changes | Privilege escalation, insider threats | Critical |
| API key usage | Compromised credentials | High |
| SSH/RDP connections | Lateral movement, initial access | Critical |
Application security
| Event | Why it matters | Priority |
|---|---|---|
| Input validation failures | Attack attempts (SQLi, XSS) | High |
| Authorization denials | Broken access control attempts | High |
| File operations | Data exfiltration, ransomware | Medium |
| Payment/transaction events | Fraud detection | Critical |
| Error rates | May indicate attacks | Medium |
| API rate limit hits | Abuse, scraping, DDoS | Medium |
Infrastructure
| Event | Why it matters | Priority |
|---|---|---|
| Firewall allow/deny | Network reconnaissance, attacks | High |
| DNS queries | C2 communication, data exfiltration | Medium |
| Process execution | Malware, cryptomining | High |
| Service start/stop | Tampering, persistence | Medium |
| File integrity changes | Rootkits, backdoors | High |
| Cloud API calls | Resource abuse, misconfiguration | High |
What NOT to log
Some data should never appear in logs:
- Passwords (even failed ones — log "authentication failed" not "password 'abc123' was wrong")
- Full credit card numbers (log last 4 digits only)
- Social security numbers, government IDs
- Health information (PHI)
- Session tokens, API keys (log that they were used, not their values)
- Personal data beyond what's necessary
Logging sensitive data creates a new breach risk: your log storage becomes a valuable target.
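A redaction step in the logging pipeline enforces this automatically, rather than relying on every developer to remember. A minimal Python sketch; the field names and the card-number pattern are illustrative, so adapt them to your own schema:

```python
import re

# Illustrative deny-list; extend to match your own field names
SENSITIVE_KEYS = {"password", "ssn", "session_token", "api_key", "authorization"}
# 13-19 digit sequences that look like card numbers (PANs)
CARD_RE = re.compile(r"\b(\d{6})\d{3,9}(\d{4})\b")

def redact(event):
    """Return a copy of a structured log event with sensitive values masked."""
    clean = {}
    for key, value in event.items():
        if key.lower() in SENSITIVE_KEYS:
            clean[key] = "[REDACTED]"
        elif isinstance(value, str):
            # Keep only the last four digits of anything card-shaped
            clean[key] = CARD_RE.sub(lambda m: "****" + m.group(2), value)
        else:
            clean[key] = value
    return clean
```

Run every event through a filter like this before it leaves the process, so the sensitive value never touches disk or the network.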
Log formats and standards
Consistent log formats make parsing and alerting easier.
Structured logging
Use structured formats (JSON) instead of plain text:
# BAD: Unstructured log
logger.info(f"User {user_id} logged in from {ip_address}")
# Output: 2024-01-15 10:23:45 INFO User 12345 logged in from 192.168.1.1
# Hard to parse, inconsistent format
# GOOD: Structured log
import structlog
logger = structlog.get_logger()
logger.info(
"user_login",
user_id=user_id,
ip_address=ip_address,
user_agent=request.headers.get("User-Agent"),
success=True
)
# Output (JSON):
# {"event": "user_login", "user_id": 12345, "ip_address": "192.168.1.1",
# "user_agent": "Mozilla/5.0...", "success": true, "timestamp": "2024-01-15T10:23:45Z"}
Structured logs allow:
- Consistent parsing across all applications
- Easy filtering (e.g., "show all failed logins from this IP")
- Automatic field extraction in SIEM tools
- Better compression and storage efficiency
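The filtering claim is concrete once logs are JSON. A toy Python example (the sample log lines are made up) that pulls failed logins from one IP using only the standard library:

```python
import json

# Made-up sample lines in the structured format shown above
raw_lines = [
    '{"event": "user_login", "success": false, "ip_address": "203.0.113.7", "user_id": 1}',
    '{"event": "user_login", "success": true, "ip_address": "198.51.100.2", "user_id": 2}',
    '{"event": "user_login", "success": false, "ip_address": "203.0.113.7", "user_id": 3}',
]

def failed_logins_from(lines, ip):
    """Yield failed-login events from one IP: trivial once logs are JSON."""
    for line in lines:
        event = json.loads(line)
        if (event.get("event") == "user_login"
                and not event.get("success")
                and event.get("ip_address") == ip):
            yield event

hits = list(failed_logins_from(raw_lines, "203.0.113.7"))
```

The same query against unstructured text would need a fragile regex per log format.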
Common log fields
Every log entry should include:
{
"timestamp": "2024-01-15T10:23:45.123Z",
"level": "info",
"event": "user_login",
"service": "auth-api",
"environment": "production",
"host": "auth-api-1.internal",
"trace_id": "abc123def456",
"user_id": "12345",
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0...",
"success": true,
"duration_ms": 45
}
| Field | Purpose |
|---|---|
| timestamp | When it happened (ISO 8601, UTC) |
| level | Severity (debug, info, warn, error, critical) |
| event | What happened (use consistent event names) |
| service | Which application/service |
| environment | prod, staging, dev |
| host | Which server/container |
| trace_id | Correlate related events |
| user_id | Who triggered the event |
| ip_address | Where request came from |
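A small helper can inject these common fields so individual call sites never forget them. A stdlib-only sketch; the service and environment values are illustrative, and a real setup would read them from deploy config:

```python
import json
import socket
import uuid
from datetime import datetime, timezone

SERVICE = "auth-api"        # illustrative; read from deploy config in practice
ENVIRONMENT = "production"  # illustrative

def emit(event, **fields):
    """Render one JSON log line carrying the common fields every entry needs."""
    record = {
        "timestamp": datetime.now(timezone.utc)
            .isoformat(timespec="milliseconds")
            .replace("+00:00", "Z"),
        "level": fields.pop("level", "info"),
        "event": event,
        "service": SERVICE,
        "environment": ENVIRONMENT,
        "host": socket.gethostname(),
        "trace_id": fields.pop("trace_id", uuid.uuid4().hex[:12]),
        **fields,
    }
    return json.dumps(record)

line = emit("user_login", user_id="12345", ip_address="192.168.1.1", success=True)
```

Call sites then pass only the event-specific fields; everything else is filled in consistently.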
Log levels
Use log levels consistently:
| Level | When to use | Example |
|---|---|---|
| DEBUG | Detailed diagnostic info, dev only | Variable values, function calls |
| INFO | Normal operations | User logged in, request completed |
| WARN | Something unexpected but handled | Rate limit approaching, retry succeeded |
| ERROR | Something failed | Database connection failed, API error |
| CRITICAL | System-wide failure | Service down, data corruption |
Security events should typically be logged at INFO for routine activity (successful logins, permission grants) and at WARN or ERROR for suspicious or failed attempts.
Centralized logging architecture
Logs scattered across dozens of servers are useless during an incident. You need centralized collection.
Basic architecture
┌─────────────────────────────────────────────────────────────────────┐
│ Log Sources │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Web App │ │ API │ │ Database│ │ Firewall│ │ Cloud │ │
│ │ Logs │ │ Logs │ │ Logs │ │ Logs │ │ Logs │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
└───────┼────────────┼────────────┼────────────┼────────────┼─────────┘
│ │ │ │ │
└────────────┴────────────┴─────┬──────┴────────────┘
│
▼
┌─────────────────────────┐
│ Log Collector │
│ (Fluentd, Filebeat, │
│ Vector, Fluent Bit) │
└───────────┬─────────────┘
│
▼
┌─────────────────────────┐
│ Log Aggregator │
│ (Elasticsearch, Loki, │
│ CloudWatch, Graylog) │
└───────────┬─────────────┘
│
┌───────────┴───────────┐
│ │
▼ ▼
┌───────────────────┐ ┌───────────────────┐
│ Visualization │ │ Alerting │
│ (Kibana, Grafana)│ │ (AlertManager, │
│ │ │ PagerDuty) │
└───────────────────┘ └───────────────────┘
Components explained
Log Sources: Everything that generates logs — applications, servers, network devices, cloud services.
Log Collectors: Agents that run on each server, collect logs, and forward them. They handle:
- Tailing log files
- Parsing and enrichment
- Buffering (if network is down)
- Compression
Log Aggregator: Central storage that receives, indexes, and stores logs. Provides:
- Search capabilities
- Long-term retention
- Aggregation and analysis
Visualization: Dashboards for exploring logs, building queries, investigating incidents.
Alerting: Rules that trigger notifications when suspicious patterns appear.
Free logging stacks
Option 1: ELK Stack (Elasticsearch + Logstash + Kibana)
The classic open-source logging stack. Data flows in one direction:
Servers (emit logs) → Filebeat (collect & ship) → Logstash (parse & process) → Elasticsearch (store & index) → Kibana (visualize & search)
Docker Compose setup (note: with security enabled, Kibana will not accept the elastic superuser, so set the kibana_system password separately first, e.g. with bin/elasticsearch-reset-password -u kibana_system or the /_security/user/kibana_system/_password API):
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
environment:
- discovery.type=single-node
- xpack.security.enabled=true
- ELASTIC_PASSWORD=changeme
- "ES_JAVA_OPTS=-Xms2g -Xmx2g"
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
ports:
- "9200:9200"
healthcheck:
test: curl -s http://localhost:9200 >/dev/null || exit 1
interval: 30s
timeout: 10s
retries: 5
kibana:
image: docker.elastic.co/kibana/kibana:8.12.0
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
- ELASTICSEARCH_USERNAME=kibana_system
- ELASTICSEARCH_PASSWORD=changeme
ports:
- "5601:5601"
depends_on:
elasticsearch:
condition: service_healthy
logstash:
image: docker.elastic.co/logstash/logstash:8.12.0
volumes:
- ./logstash/pipeline:/usr/share/logstash/pipeline
ports:
- "5044:5044" # Beats input
- "5000:5000" # TCP input
depends_on:
elasticsearch:
condition: service_healthy
volumes:
elasticsearch-data:
Logstash pipeline configuration:
# logstash/pipeline/main.conf
input {
beats {
port => 5044
}
tcp {
port => 5000
codec => json_lines
}
}
filter {
# Parse JSON logs
if [message] =~ /^\{/ {
json {
source => "message"
}
}
# Parse common log formats
if [type] == "nginx" {
grok {
match => { "message" => '%{COMBINEDAPACHELOG}' }
}
}
# GeoIP enrichment for IP addresses
if [ip_address] {
geoip {
source => "ip_address"
target => "geoip"
}
}
# Add security-relevant tags
if [event] == "login_failed" {
mutate {
add_tag => ["security", "authentication"]
}
}
}
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
user => "elastic"
password => "changeme"
index => "logs-%{+YYYY.MM.dd}"
}
}
Filebeat configuration (on each server; Filebeat 8 deprecates the log input type in favor of filestream, but log still works and is shown here for simplicity):
# /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
- /var/log/nginx/error.log
fields:
type: nginx
- type: log
enabled: true
paths:
- /var/log/auth.log
fields:
type: auth
- type: log
enabled: true
paths:
- /app/logs/*.json
json.keys_under_root: true
json.add_error_key: true
fields:
type: application
output.logstash:
hosts: ["logstash.internal:5044"]
# Or direct to Elasticsearch
# output.elasticsearch:
# hosts: ["elasticsearch.internal:9200"]
# username: "elastic"
# password: "changeme"
Pros:
- Extremely powerful search and analytics
- Huge ecosystem and community
- Scales to petabytes
- Free and open source (basic license)
Cons:
- Resource-intensive (needs significant RAM for Elasticsearch)
- Complex to operate at scale
- Some features require paid license
Option 2: Grafana Loki + Promtail
A newer, lighter-weight alternative with the same linear flow and fewer moving parts:
Servers (emit logs) → Promtail (collect & ship) → Loki (store) → Grafana (visualize)
Docker Compose setup:
version: '3.8'
services:
loki:
image: grafana/loki:2.9.3
ports:
- "3100:3100"
volumes:
- ./loki-config.yaml:/etc/loki/local-config.yaml
- loki-data:/loki
command: -config.file=/etc/loki/local-config.yaml
grafana:
image: grafana/grafana:10.3.1
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=changeme
volumes:
- grafana-data:/var/lib/grafana
depends_on:
- loki
promtail:
image: grafana/promtail:2.9.3
volumes:
- ./promtail-config.yaml:/etc/promtail/config.yml
- /var/log:/var/log:ro
command: -config.file=/etc/promtail/config.yml
volumes:
loki-data:
grafana-data:
Loki configuration:
# loki-config.yaml
auth_enabled: false
server:
http_listen_port: 3100
common:
path_prefix: /loki
storage:
filesystem:
chunks_directory: /loki/chunks
rules_directory: /loki/rules
replication_factor: 1
ring:
kvstore:
store: inmemory
schema_config:
configs:
- from: 2020-10-24
store: boltdb-shipper
object_store: filesystem
schema: v11
index:
prefix: index_
period: 24h
ruler:
alertmanager_url: http://alertmanager:9093
Promtail configuration:
# promtail-config.yaml
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: system
static_configs:
- targets:
- localhost
labels:
job: varlogs
__path__: /var/log/*log
- job_name: nginx
static_configs:
- targets:
- localhost
labels:
job: nginx
__path__: /var/log/nginx/*.log
pipeline_stages:
- regex:
expression: '^(?P<remote_addr>[\d\.]+) - (?P<remote_user>\S+) \[(?P<time_local>.+)\] "(?P<method>\S+) (?P<request>\S+) (?P<protocol>\S+)" (?P<status>\d+)'
- labels:
method:
status:
- job_name: application
static_configs:
- targets:
- localhost
labels:
job: app
__path__: /app/logs/*.json
pipeline_stages:
- json:
expressions:
level: level
event: event
user_id: user_id
- labels:
level:
event:
Pros:
- Much lighter on resources than Elasticsearch
- Native Grafana integration
- Label-based indexing is very efficient
- Easy to set up and operate
Cons:
- Less powerful full-text search than Elasticsearch
- Smaller ecosystem
- Newer, less battle-tested
Option 3: Graylog
A complete log management solution with built-in SIEM features:
version: '3.8'
services:
mongodb:
image: mongo:6
volumes:
- mongo-data:/data/db
opensearch:
image: opensearchproject/opensearch:2.11.0
environment:
- discovery.type=single-node
- plugins.security.disabled=true
- "OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g"
volumes:
- opensearch-data:/usr/share/opensearch/data
graylog:
image: graylog/graylog:5.2
environment:
- GRAYLOG_PASSWORD_SECRET=somesecretpasswordpepper
- GRAYLOG_ROOT_PASSWORD_SHA2=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
- GRAYLOG_HTTP_EXTERNAL_URI=http://localhost:9000/
- GRAYLOG_MONGODB_URI=mongodb://mongodb:27017/graylog
- GRAYLOG_ELASTICSEARCH_HOSTS=http://opensearch:9200
ports:
- "9000:9000" # Web interface
- "1514:1514" # Syslog TCP
- "1514:1514/udp" # Syslog UDP
- "12201:12201" # GELF TCP
- "12201:12201/udp" # GELF UDP
depends_on:
- mongodb
- opensearch
volumes:
mongo-data:
opensearch-data:
Pros:
- Built-in alerting and SIEM-like features
- User-friendly interface
- Good for security use cases out of the box
- Supports GELF (structured logging format)
Cons:
- More complex architecture (requires MongoDB + OpenSearch)
- Less flexible than raw ELK
- Some advanced features require enterprise license
Cloud-native options
If you're running in the cloud, native logging services are often the easiest choice:
AWS CloudWatch Logs:
# Install CloudWatch agent
sudo yum install amazon-cloudwatch-agent
# Configure (wizard available)
sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard
Configuration file:
{
"logs": {
"logs_collected": {
"files": {
"collect_list": [
{
"file_path": "/var/log/nginx/access.log",
"log_group_name": "nginx-access",
"log_stream_name": "{instance_id}"
},
{
"file_path": "/app/logs/*.json",
"log_group_name": "application",
"log_stream_name": "{instance_id}",
"timestamp_format": "%Y-%m-%dT%H:%M:%S"
}
]
}
}
}
}
GCP Cloud Logging:
Automatic for GCE instances. For applications, use the library:
from google.cloud import logging
client = logging.Client()
logger = client.logger("my-application")
logger.log_struct({
"event": "user_login",
"user_id": "12345",
"success": True
}, severity="INFO")
Azure Monitor Logs:
Use Log Analytics workspace and Azure Monitor agent.
Comparison table
| Feature | ELK Stack | Grafana Loki | Graylog | CloudWatch | GCP Logging |
|---|---|---|---|---|---|
| Cost | Free (OSS) | Free (OSS) | Free (OSS) | Pay per GB | Pay per GB |
| Setup complexity | High | Medium | Medium | Low | Low |
| Resource usage | High | Low | Medium | N/A | N/A |
| Full-text search | Excellent | Good | Excellent | Good | Good |
| Alerting | Requires setup | Built-in | Built-in | Built-in | Built-in |
| Retention | Unlimited | Unlimited | Unlimited | Configurable | Configurable |
| Scalability | Excellent | Excellent | Good | Excellent | Excellent |
| Best for | Large scale, complex queries | Kubernetes, Grafana users | Security focus | AWS environments | GCP environments |
Setting up alerts
Logs are only useful if you're watching them. Alerting turns passive logs into active monitoring.
Alert categories
Critical — immediate response required:
- Multiple failed logins followed by success (credential stuffing succeeded)
- Admin user created by non-admin
- Firewall disabled
- Suspicious process execution (cryptominer signatures)
- Connection to known malicious IP
- Production database accessed from unknown IP
High — investigate within hours:
- Brute force attacks in progress
- Unusual API usage patterns
- After-hours admin access
- Large data exports
- Permission escalation events
Medium — review daily:
- Failed login threshold exceeded
- Error rate spikes
- Unusual geographic access
- Certificate expiration warnings
Low — weekly review:
- Access from new devices
- Policy violation attempts
- Minor configuration changes
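The first critical pattern (many failures followed by a success) reduces to a sliding-window check. An in-process Python sketch with illustrative thresholds; in production this usually runs as a declarative rule in your aggregator or SIEM, like the credential_stuffing_success rule later in this chapter:

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 15 * 60   # look-back window for earlier failures (illustrative)
FAILURE_THRESHOLD = 5      # failures that make a following success suspicious

class CredentialStuffingDetector:
    """Flag a successful login preceded by many failures from the same IP."""

    def __init__(self):
        # (user_id, ip_address) -> timestamps of recent failures
        self._failures = defaultdict(deque)

    def observe(self, event):
        """Feed one parsed log event; return an alert dict or None."""
        key = (event["user_id"], event["ip_address"])
        ts = event["timestamp"]  # epoch seconds
        window = self._failures[key]
        # Expire failures that fell out of the look-back window
        while window and ts - window[0] > WINDOW_SECONDS:
            window.popleft()
        if event["event"] == "login_failed":
            window.append(ts)
        elif event["event"] == "login_success" and len(window) >= FAILURE_THRESHOLD:
            return {
                "severity": "critical",
                "rule": "credential_stuffing_success",
                "user_id": event["user_id"],
                "ip_address": event["ip_address"],
                "failures": len(window),
            }
        return None
```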
Alert rules examples
ELK/Kibana alerting:
{
"trigger": {
"schedule": {
"interval": "1m"
}
},
"input": {
"search": {
"request": {
"indices": ["logs-*"],
"body": {
"query": {
"bool": {
"must": [
{ "match": { "event": "login_failed" } },
{ "range": { "@timestamp": { "gte": "now-5m" } } }
]
}
},
"aggs": {
"by_ip": {
"terms": { "field": "ip_address", "size": 10 }
}
}
}
}
}
},
"condition": {
"script": {
"source": "return ctx.payload.aggregations.by_ip.buckets.stream().anyMatch(b -> b.doc_count > 10)"
}
},
"actions": {
"notify_slack": {
"slack": {
"message": {
"to": ["#security-alerts"],
"text": "Brute force attack detected: {{ctx.payload.aggregations.by_ip.buckets}}"
}
}
}
}
}
Grafana Loki alerting (via Grafana):
# grafana/provisioning/alerting/rules.yaml
apiVersion: 1
groups:
- orgId: 1
name: security-alerts
folder: Security
interval: 1m
rules:
- uid: brute-force-detection
title: Brute Force Attack Detected
condition: C
data:
- refId: A
datasourceUid: loki
model:
expr: 'sum(rate({job="app"} |= "login_failed" [5m])) by (ip_address)'
- refId: B
datasourceUid: __expr__
model:
expression: A
type: reduce
reducer: last
- refId: C
datasourceUid: __expr__
model:
expression: B > 10
type: threshold
for: 5m
annotations:
summary: "Brute force attack from {{ $labels.ip_address }}"
labels:
severity: critical
AWS CloudWatch Alarms:
# Create metric filter for failed logins
aws logs put-metric-filter \
--log-group-name application \
--filter-name failed-logins \
--filter-pattern '{ $.event = "login_failed" }' \
--metric-transformations \
metricName=FailedLogins,metricNamespace=Security,metricValue=1
# Create alarm
aws cloudwatch put-metric-alarm \
--alarm-name BruteForceDetection \
--metric-name FailedLogins \
--namespace Security \
--statistic Sum \
--period 300 \
--threshold 50 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 1 \
--alarm-actions arn:aws:sns:us-east-1:123456789:security-alerts
Security detection rules
Common patterns to alert on:
Authentication attacks
# Failed login spike (brute force)
- name: brute_force_attack
query: |
event:login_failed | stats count() by ip_address | where count > 20
window: 5m
severity: high
# Successful login after many failures (compromise)
- name: credential_stuffing_success
query: |
(event:login_failed | stats count() as failures by user_id, ip_address)
| join (event:login_success by user_id, ip_address)
| where failures > 5
window: 15m
severity: critical
# Login from new country
- name: impossible_travel
query: |
event:login_success
| geoip(ip_address)
| stats distinct_count(country) as countries by user_id
| where countries > 1
window: 1h
severity: high
Privilege escalation
# Admin user created
- name: admin_user_created
query: |
event:user_created AND role:admin
severity: high
# Privilege escalation
- name: privilege_escalation
query: |
event:role_changed AND new_role:(admin OR superuser)
severity: high
# Service account used interactively
- name: service_account_interactive
query: |
event:login_success
AND user_type:service_account
AND session_type:interactive
severity: critical
Data exfiltration
# Large data export
- name: large_data_export
query: |
event:data_export
| stats sum(record_count) as total by user_id
| where total > 10000
window: 1h
severity: high
# Database query spike
- name: unusual_database_queries
query: |
source:database
| stats count() as queries by user_id
| where queries > baseline_queries * 3
window: 1h
severity: medium
# After hours access to sensitive data
- name: after_hours_sensitive_access
query: |
event:data_access
AND data_classification:confidential
AND NOT between(hour, 8, 18)
severity: high
Infrastructure attacks
# SSH from unusual source
- name: ssh_from_internet
query: |
event:ssh_session_start
AND NOT source_ip:("10.*" OR "172.16.*" OR "192.168.*")
severity: high
# note: RFC 1918 private space is 10.0.0.0/8, 172.16.0.0/12 (172.16.* through 172.31.*), and 192.168.0.0/16, so widen the glob accordingly
# Cryptominer signatures
- name: cryptominer_detection
query: |
(process_name:*miner* OR cmdline:*stratum* OR cmdline:*xmrig*)
severity: critical
# Firewall rule change
- name: firewall_modified
query: |
event:(security_group_modified OR firewall_rule_changed)
severity: high
# Root/admin shell spawned
- name: root_shell_spawned
query: |
event:process_start AND user:root AND process:(bash OR sh OR zsh)
severity: medium
Reducing alert fatigue
Too many alerts = ignored alerts. Here's how to keep alerts meaningful:
Set appropriate thresholds:
# BAD: Alerts on every failed login
- condition: event == "login_failed"
# GOOD: Alerts on suspicious patterns
- condition: count(event == "login_failed") > 10 in 5m grouped by ip_address
Add context:
# BAD: Just says "suspicious activity"
message: "Suspicious activity detected"
# GOOD: Actionable information
message: |
Brute force attack detected
Source IP: {{ ip_address }}
Target users: {{ affected_users }}
Failed attempts: {{ count }}
Timeframe: last 5 minutes
Action: Consider blocking IP in WAF
Use severity levels properly:
- CRITICAL: Someone needs to wake up
- HIGH: Investigate within hours
- MEDIUM: Check during business hours
- LOW: Weekly review
Tune based on feedback:
- Track false positive rate
- If > 50% false positives, adjust threshold
- If a rule never fires, verify it still matches live log fields; silence can mean you're missing things
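That feedback loop can be as simple as recording an analyst verdict per alert and checking the ratio. A toy sketch of the tuning decision (the boolean-verdict shape is a hypothetical stand-in for whatever your ticketing system records):

```python
def review_rule(outcomes):
    """Suggest a tuning action from analyst verdicts on one rule's alerts.

    outcomes: list of booleans, True for a real incident, False for a
    false positive.
    """
    if not outcomes:
        return "never fired: verify the rule still matches live log fields"
    fp_rate = outcomes.count(False) / len(outcomes)
    if fp_rate > 0.5:
        return f"{fp_rate:.0%} false positives: raise thresholds or add context filters"
    return f"{fp_rate:.0%} false positives: keep as-is"
```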
Alert routing
Send alerts to the right place:
# alertmanager.yml
route:
receiver: default
routes:
# Critical security alerts → PagerDuty
- match:
severity: critical
category: security
receiver: pagerduty-security
# High security alerts → Slack security channel
- match:
severity: high
category: security
receiver: slack-security
# Infrastructure alerts → Slack ops channel
- match:
category: infrastructure
receiver: slack-ops
receivers:
- name: pagerduty-security
pagerduty_configs:
- service_key: YOUR_PAGERDUTY_KEY
severity: critical
- name: slack-security
slack_configs:
- api_url: https://hooks.slack.com/services/XXX
channel: '#security-alerts'
- name: slack-ops
slack_configs:
- api_url: https://hooks.slack.com/services/XXX
channel: '#ops-alerts'
SIEM concepts
Security Information and Event Management (SIEM) combines log management with security analytics. While full SIEM platforms are expensive, you can build SIEM-like capabilities with open-source tools.
What SIEM adds
| Capability | Log Management | SIEM |
|---|---|---|
| Log collection | ✓ | ✓ |
| Search and visualization | ✓ | ✓ |
| Correlation across sources | Limited | ✓ |
| Threat intelligence integration | Manual | Automatic |
| Behavioral analysis | No | ✓ |
| Compliance reporting | Manual | Built-in |
| Case management | No | ✓ |
| Automated response | Limited | ✓ |
Building SIEM-like capabilities
1. Log correlation:
Combine logs from multiple sources to detect attacks:
# Detect: SSH brute force followed by successful login, then suspicious command
correlation_rule:
name: ssh_compromise_chain
events:
- event_type: ssh_failed_login
count: ">10"
group_by: source_ip, target_host
window: 5m
save_as: brute_force
- event_type: ssh_successful_login
source_ip: $brute_force.source_ip
target_host: $brute_force.target_host
within: 10m
after: brute_force
save_as: compromise
- event_type: command_execution
host: $compromise.target_host
command: ("wget*" OR "curl*" OR "nc*")
within: 30m
after: compromise
alert:
severity: critical
message: "Likely SSH compromise: brute force from {source_ip} → successful login → suspicious commands"
2. Threat intelligence integration:
Cross-reference IPs and domains with known bad actors:
# Example: Check IPs against threat intel feed
import requests
def check_ip_reputation(ip_address):
# Check against AbuseIPDB
response = requests.get(
"https://api.abuseipdb.com/api/v2/check",
headers={"Key": ABUSEIPDB_API_KEY},
params={"ipAddress": ip_address}
)
data = response.json()["data"]
if data["abuseConfidenceScore"] > 80:
return {
"malicious": True,
"score": data["abuseConfidenceScore"],
"reports": data["totalReports"],
"categories": data["usageType"]
}
return {"malicious": False}
3. User and Entity Behavior Analytics (UEBA):
Detect anomalies based on normal behavior:
# Baseline: User typically logs in 9am-6pm from US
# Alert: Same user logging in at 3am from Russia
def detect_anomalous_login(user_id, login_event):
# Get user's normal login patterns
baseline = get_user_baseline(user_id)
# Check for anomalies
anomalies = []
# Time anomaly
if not baseline.normal_hours.contains(login_event.hour):
anomalies.append({
"type": "unusual_time",
"expected": baseline.normal_hours,
"actual": login_event.hour
})
# Location anomaly
if login_event.country not in baseline.normal_countries:
anomalies.append({
"type": "new_location",
"expected": baseline.normal_countries,
"actual": login_event.country
})
# Impossible travel
if baseline.last_login:
distance = calculate_distance(
baseline.last_login.location,
login_event.location
)
        time_diff_hours = max(
            (login_event.time - baseline.last_login.time).total_seconds() / 3600,
            0.1,  # guard against division by zero for back-to-back logins
        )
        if distance / time_diff_hours > 900:  # faster than a commercial jet (~900 km/h)
            anomalies.append({
                "type": "impossible_travel",
                "distance_km": distance,
                "time_hours": time_diff_hours
            })
return anomalies
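The calculate_distance helper above is left undefined; one reasonable implementation is a plain haversine great-circle distance over (latitude, longitude) pairs:

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def calculate_distance(loc_a, loc_b):
    """Great-circle (haversine) distance in km between two (lat, lon) pairs."""
    lat1, lon1 = map(radians, loc_a)
    lat2, lon2 = map(radians, loc_b)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    h = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(h))
```

Haversine is accurate to well under 1% for this purpose, which is more than enough to separate "same city" from "different continent".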
Free SIEM alternatives
| Tool | Type | Best for | Link |
|---|---|---|---|
| Wazuh | Full SIEM | Comprehensive security monitoring | wazuh.com |
| Security Onion | Network security monitoring | Network-focused security | securityonion.net |
| OSSEC | Host-based IDS | File integrity, rootkit detection | ossec.net |
| Suricata | Network IDS | Network threat detection | suricata.io |
| TheHive | Incident response | Case management, playbooks | thehive-project.org |
Wazuh setup (Docker):
# docker-compose.yml for Wazuh
version: '3.8'
services:
wazuh.manager:
image: wazuh/wazuh-manager:4.7.2
hostname: wazuh.manager
ports:
- "1514:1514"
- "1515:1515"
- "514:514/udp"
- "55000:55000"
environment:
- INDEXER_URL=https://wazuh.indexer:9200
- FILEBEAT_SSL_VERIFICATION_MODE=none
volumes:
- wazuh_api_configuration:/var/ossec/api/configuration
- wazuh_etc:/var/ossec/etc
- wazuh_logs:/var/ossec/logs
wazuh.indexer:
image: wazuh/wazuh-indexer:4.7.2
hostname: wazuh.indexer
environment:
- "OPENSEARCH_JAVA_OPTS=-Xms1g -Xmx1g"
volumes:
- wazuh-indexer-data:/var/lib/wazuh-indexer
wazuh.dashboard:
image: wazuh/wazuh-dashboard:4.7.2
hostname: wazuh.dashboard
ports:
- "443:5601"
environment:
- INDEXER_USERNAME=admin
- INDEXER_PASSWORD=SecretPassword
- WAZUH_API_URL=https://wazuh.manager
depends_on:
- wazuh.indexer
- wazuh.manager
volumes:
wazuh_api_configuration:
wazuh_etc:
wazuh_logs:
wazuh-indexer-data:
Wazuh provides:
- File integrity monitoring
- Rootkit detection
- Log analysis
- Compliance dashboards (PCI-DSS, GDPR, HIPAA)
- Active response (automatic blocking)
- Vulnerability detection
Log retention and compliance
Retention requirements
| Standard | Minimum retention | Notes |
|---|---|---|
| PCI-DSS | 1 year | 3 months immediately available |
| HIPAA | 6 years | May vary by state |
| SOC 2 | 1 year | Depends on trust criteria |
| GDPR | Varies | "No longer than necessary" |
| ISO 27001 | 3 years recommended | As long as needed for security |
| Internal best practice | 90 days hot, 1 year cold | Balance cost and usefulness |
Storage tiers
| Tier | Retention | Query speed | Cost | Indexing | Example |
|---|---|---|---|---|---|
| Hot | 7–30 days | Fast | High | Full | Elasticsearch on SSD |
| Warm | 30–90 days | Slower | Moderate | Reduced | Elasticsearch on HDD, S3 + Athena |
| Cold | 90+ days | Archive only | Low | None | S3 Glacier, compressed files |
Elasticsearch Index Lifecycle Management:
PUT _ilm/policy/security-logs-policy
{
"policy": {
"phases": {
"hot": {
"min_age": "0ms",
"actions": {
"rollover": {
"max_primary_shard_size": "50gb",
"max_age": "7d"
}
}
},
"warm": {
"min_age": "7d",
"actions": {
"shrink": { "number_of_shards": 1 },
"forcemerge": { "max_num_segments": 1 },
"allocate": { "require": { "data": "warm" } }
}
},
"cold": {
"min_age": "30d",
"actions": {
"allocate": { "require": { "data": "cold" } }
}
},
"delete": {
"min_age": "365d",
"actions": { "delete": {} }
}
}
}
}
Log integrity
For compliance and forensics, logs must be tamper-proof:
Log signing:
import hashlib
import hmac
import json
def sign_log_entry(entry, secret_key):
"""Sign a log entry to detect tampering"""
entry_bytes = json.dumps(entry, sort_keys=True).encode()
signature = hmac.new(
secret_key.encode(),
entry_bytes,
hashlib.sha256
).hexdigest()
entry["_signature"] = signature
return entry
def verify_log_entry(entry, secret_key):
"""Verify a log entry hasn't been tampered with"""
    entry = dict(entry)  # work on a copy so the caller's entry keeps its signature
    signature = entry.pop("_signature", None)
if not signature:
return False
entry_bytes = json.dumps(entry, sort_keys=True).encode()
expected = hmac.new(
secret_key.encode(),
entry_bytes,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
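Per-entry signatures detect modification but not deletion: an attacker can simply drop a signed entry. Chaining entries, each one committing to the previous entry's hash, makes gaps detectable. A sketch using the same stdlib primitives:

```python
import hashlib
import json

GENESIS = "0" * 64  # fixed anchor for the first entry

def chain_entries(entries):
    """Link entries so removing or reordering any one breaks verification."""
    prev = GENESIS
    chained = []
    for entry in entries:
        entry = dict(entry, _prev_hash=prev)
        prev = hashlib.sha256(json.dumps(entry, sort_keys=True).encode()).hexdigest()
        entry["_hash"] = prev
        chained.append(entry)
    return chained

def verify_chain(chained):
    """Walk the chain, recomputing every hash from the genesis anchor."""
    prev = GENESIS
    for entry in chained:
        body = {k: v for k, v in entry.items() if k != "_hash"}
        if body.get("_prev_hash") != prev:
            return False
        prev = hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()
        if entry["_hash"] != prev:
            return False
    return True
```

Ship the latest head hash to separate storage on a schedule; an attacker who rewrites the whole chain on the compromised host still can't match the copy they don't control.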
AWS CloudWatch Log Integrity:
# Enable log file validation for CloudTrail
aws cloudtrail update-trail \
--name main-trail \
--enable-log-file-validation
# Validate log file integrity
aws cloudtrail validate-logs \
--trail-arn arn:aws:cloudtrail:us-east-1:123456789:trail/main-trail \
--start-time 2024-01-01T00:00:00Z
Incident investigation with logs
When an incident occurs, logs are your primary investigation tool.
Investigation workflow
┌──────────────────┐
│ Alert Received │
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Initial Triage │ What triggered? When? What systems?
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Scope Assessment│ How widespread? What data? What access?
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Timeline Build │ What happened first? What came after?
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Root Cause │ How did they get in? What was the vulnerability?
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Impact Analysis │ What was accessed? What was changed?
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Containment │ Stop the bleeding, preserve evidence
└────────┬─────────┘
│
▼
┌──────────────────┐
│ Documentation │ Timeline, findings, lessons learned
└──────────────────┘
Investigation queries
Find initial access:
# Kibana-style query (illustrative: plain KQL has no pipe operators; sort in the UI or use ES|QL)
# All authentication events for compromised user around incident time
user_id: "compromised_user" AND event: (login* OR auth*)
AND @timestamp >= "2024-01-15T00:00:00" AND @timestamp <= "2024-01-15T23:59:59"
| sort @timestamp
# Loki LogQL
{job="app"} | json | user_id="compromised_user" | line_format "{{.timestamp}} {{.event}} {{.ip_address}}"
Trace attacker movement:
# All events from attacker's IP
ip_address: "1.2.3.4" | sort @timestamp
# All events from compromised session
session_id: "abc123" | sort @timestamp
# Commands executed on compromised host
host: "compromised-server" AND event: "command_execution" | sort @timestamp
Identify data access:
# Data accessed by compromised account
user_id: "compromised_user" AND event: (data_access OR data_export OR file_download)
# Unusual queries against database
source: database AND query_type: SELECT AND tables: (users OR payments OR credentials)
Build timeline:
def build_incident_timeline(start_time, end_time, indicators):
    """
    Build a timeline from logs using indicators of compromise (IOCs).

    indicators = {
        "ip_addresses": ["1.2.3.4"],
        "user_ids": ["compromised_user"],
        "session_ids": ["abc123"],
        "hosts": ["compromised-server"]
    }
    """
    # Map each indicator type to the log field it is matched against
    field_map = {
        "ip_addresses": "ip_address",
        "user_ids": "user_id",
        "session_ids": "session_id",
        "hosts": "host",
    }

    timeline = []
    for indicator_type, field in field_map.items():
        for value in indicators.get(indicator_type, []):
            # query_logs is your log backend's search API
            events = query_logs(f"{field}:{value}", start_time, end_time)
            timeline.extend(events)

    # Deduplicate (one event can match several indicators) and sort;
    # assumes event objects are hashable and carry a .timestamp
    return sorted(set(timeline), key=lambda e: e.timestamp)
Investigation checklist
## Incident Investigation Checklist
### Initial Response
- [ ] Document alert details (time, type, affected systems)
- [ ] Assign incident owner
- [ ] Open incident ticket
- [ ] Notify relevant stakeholders
### Scoping
- [ ] Identify all affected user accounts
- [ ] Identify all affected systems/hosts
- [ ] Identify all IP addresses involved
- [ ] Determine time range of activity
### Evidence Collection
- [ ] Export relevant logs (preserve originals)
- [ ] Capture network traffic if ongoing
- [ ] Take memory dumps if needed
- [ ] Screenshot any relevant dashboards
### Analysis
- [ ] Build timeline of events
- [ ] Identify initial access vector
- [ ] Map lateral movement
- [ ] Identify data accessed/exfiltrated
- [ ] Identify persistence mechanisms
### Containment
- [ ] Block malicious IPs
- [ ] Disable compromised accounts
- [ ] Isolate compromised systems
- [ ] Revoke compromised credentials
### Documentation
- [ ] Write incident summary
- [ ] Create detailed timeline
- [ ] Document root cause
- [ ] List remediation actions
- [ ] Schedule post-mortem
Common mistakes to avoid
Logging only errors. Normal operations matter for security: you need successful logins to detect compromised accounts and successful API calls to detect data exfiltration. Log INFO-level events.
No timestamp or wrong timezone. Logs without timestamps are useless for investigation. Use ISO 8601 format with UTC timezone everywhere.
Logging sensitive data. Passwords, tokens, credit cards in logs create a new vulnerability. Mask or exclude sensitive fields.
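A minimal sketch of the masking approach, assuming dict-shaped log events; the key list and the `redact` helper are illustrative, not part of any particular logging library:

```python
# Keys whose values must never reach log storage -- extend for your application
SENSITIVE_KEYS = {"password", "token", "authorization", "credit_card", "ssn"}

def redact(event):
    """Return a copy of a log event dict with sensitive values masked."""
    clean = {}
    for key, value in event.items():
        if key.lower() in SENSITIVE_KEYS:
            clean[key] = "***REDACTED***"
        elif isinstance(value, dict):
            clean[key] = redact(value)  # recurse into nested payloads
        else:
            clean[key] = value
    return clean

print(redact({"event": "login", "user": "alice", "password": "hunter2"}))
# {'event': 'login', 'user': 'alice', 'password': '***REDACTED***'}
```

Run every event through a step like this before it leaves the process; structured-logging libraries usually let you register it as a processor so no call site can forget it.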
No log rotation. Disks fill up, applications crash. Set up rotation and retention from day one.
Alerts without runbooks. An alert fires at 3am. What should the on-call person do? Document response procedures for each alert type.
Too many alerts. Alert fatigue kills security. If you're ignoring alerts, you're not monitoring. Tune thresholds and suppress noise.
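One simple noise-reduction technique is a per-alert cooldown: the same alert key fires at most once per window, so a burst of identical events produces one notification instead of hundreds. A minimal sketch (the class and window length are illustrative; real alerting systems also group, escalate, and resolve):

```python
import time

class AlertSuppressor:
    """Fire each alert key at most once per cooldown window."""

    def __init__(self, cooldown_seconds=300):
        self.cooldown = cooldown_seconds
        self.last_fired = {}  # alert key -> timestamp of last notification

    def should_fire(self, key, now=None):
        now = time.time() if now is None else now
        last = self.last_fired.get(key)
        if last is not None and now - last < self.cooldown:
            return False  # suppressed: still inside the cooldown window
        self.last_fired[key] = now
        return True

s = AlertSuppressor(cooldown_seconds=300)
print(s.should_fire("failed_logins:1.2.3.4", now=1000))  # True
print(s.should_fire("failed_logins:1.2.3.4", now=1100))  # False (within 300s)
print(s.should_fire("failed_logins:1.2.3.4", now=1400))  # True (cooldown elapsed)
```

Keying on alert type plus source (here, an IP) keeps distinct incidents distinct while collapsing repeats.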
Logs only on application servers. Security events happen everywhere — firewalls, databases, cloud APIs, load balancers. Collect them all.
No testing. How do you know your alerts work? Generate test events, verify they trigger, check they route correctly.
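Test-event generation can be scripted so it runs the same way every time. A sketch that emits synthetic failed-login events matching the JSON schema used in this chapter's logging examples; the user name and the TEST-NET IP are deliberately synthetic so test traffic is easy to exclude later:

```python
import json
from datetime import datetime, timezone

def make_test_events(count=25, user="alert-test-user"):
    """Generate JSON log lines that should trip a failed-login alert."""
    lines = []
    for _ in range(count):
        lines.append(json.dumps({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": "user_login",
            "user_id": user,
            "ip_address": "203.0.113.99",  # TEST-NET-3 address, clearly synthetic
            "success": False,
        }))
    return lines

# Write the events wherever your log shipper picks them up, then verify
# the alert fires and routes to the right channel
for line in make_test_events(count=3):
    print(line)
```

After emitting the events, confirm three things: the alert triggered, it reached the on-call channel, and the runbook link in it works.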
Real-world incidents
Target breach (2013): Attackers were in the network for weeks. Security tools generated alerts about the malware, but they were ignored due to alert fatigue. 40 million credit cards stolen. Source: U.S. Senate Committee
Equifax breach (2017): Attack began in May, discovered in July. Poor logging meant investigators couldn't determine full extent of breach. 147 million records exposed. The investigation took months because log data was incomplete. Source: GAO Report
SolarWinds (2020): Attackers were in networks for 9+ months before discovery. Companies with better logging and anomaly detection could identify unusual behavior from the compromised software updates. Source: CISA
Colonial Pipeline (2021): Attackers accessed the network using a compromised VPN password. Proper monitoring of VPN authentication patterns could have detected the unauthorized access earlier. Source: Bloomberg
Workshop: setting up logging and monitoring
This workshop guides you through implementing a complete logging and monitoring solution.
Part 1: Centralized log collection
Option A: Grafana Loki (recommended for small teams)
- Create docker-compose.yml:
version: '3.8'

services:
  loki:
    image: grafana/loki:2.9.3
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    volumes:
      - loki-data:/loki

  grafana:
    image: grafana/grafana:10.3.1
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin  # change before exposing beyond localhost
      - GF_AUTH_ANONYMOUS_ENABLED=true    # disable outside local testing
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning

  promtail:
    image: grafana/promtail:2.9.3
    volumes:
      - ./promtail-config.yaml:/etc/promtail/config.yml
      - /var/log:/var/log:ro
      - /var/run/docker.sock:/var/run/docker.sock
    command: -config.file=/etc/promtail/config.yml

volumes:
  loki-data:
  grafana-data:
- Create promtail-config.yaml:
server:
  http_listen_port: 9080

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: system
    static_configs:
      - targets:
          - localhost
        labels:
          job: varlogs
          host: ${HOSTNAME}  # requires promtail flag -config.expand-env=true
          __path__: /var/log/*.log

  - job_name: docker
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
        refresh_interval: 5s
    relabel_configs:
      - source_labels: ['__meta_docker_container_name']
        target_label: container
- Start the stack:
docker compose up -d
- Access Grafana at http://localhost:3000
- Add Loki as data source:
- Go to Connections → Data Sources → Add data source
- Select Loki
- URL: http://loki:3100
- Save & Test
Option B: AWS CloudWatch (for AWS environments)
- Install CloudWatch agent:
wget https://s3.amazonaws.com/amazoncloudwatch-agent/ubuntu/amd64/latest/amazon-cloudwatch-agent.deb
sudo dpkg -i amazon-cloudwatch-agent.deb
- Create configuration:
{
  "logs": {
    "logs_collected": {
      "files": {
        "collect_list": [
          {
            "file_path": "/var/log/auth.log",
            "log_group_name": "security-logs",
            "log_stream_name": "{instance_id}/auth"
          },
          {
            "file_path": "/var/log/nginx/access.log",
            "log_group_name": "application-logs",
            "log_stream_name": "{instance_id}/nginx-access"
          },
          {
            "file_path": "/app/logs/*.json",
            "log_group_name": "application-logs",
            "log_stream_name": "{instance_id}/app"
          }
        ]
      }
    }
  }
}
- Start agent:
sudo /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl \
  -a fetch-config \
  -m ec2 \
  -s \
  -c file:/opt/aws/amazon-cloudwatch-agent/etc/config.json
Part 2: Application logging
- Add structured logging to your application:
Python example:
import structlog
import logging

# Route structlog output through the stdlib logging module
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Configure structlog for JSON output with ISO timestamps
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
    wrapper_class=structlog.stdlib.BoundLogger,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
)

logger = structlog.get_logger()

# Log security events
def login(user_id, ip_address, success):
    logger.info(
        "user_login",
        user_id=user_id,
        ip_address=ip_address,
        success=success,
        event_type="authentication",
    )
Node.js example:
const pino = require('pino');

const logger = pino({
  level: 'info',
  formatters: {
    level: (label) => ({ level: label }),
  },
  timestamp: pino.stdTimeFunctions.isoTime,
});

// Log security events
function login(userId, ipAddress, success) {
  logger.info({
    event: 'user_login',
    user_id: userId,
    ip_address: ipAddress,
    success: success,
    event_type: 'authentication',
  });
}
- Test logging:
# Generate some log entries
curl -X POST http://localhost:8000/api/login \
  -H "Content-Type: application/json" \
  -d '{"username": "test", "password": "wrong"}'
# Check logs appear in Grafana/CloudWatch
Part 3: Security alerts
- Create alert for failed logins (Grafana):
  - Go to Alerting → Alert rules → New alert rule
  - Query: count_over_time({job="app"} |= "login" |= "success\":false" [5m])
  - Condition: Is above 10
  - Folder: Security
  - Evaluation: Every 1m for 5m
  - Add notification channel (Slack, email, etc.)
- Create CloudWatch alarm:
# Create metric filter (JSON booleans are matched with IS TRUE / IS FALSE)
aws logs put-metric-filter \
  --log-group-name application-logs \
  --filter-name failed-logins \
  --filter-pattern '{ ($.event = "user_login") && ($.success IS FALSE) }' \
  --metric-transformations \
    metricName=FailedLogins,metricNamespace=Security,metricValue=1

# Create alarm
aws cloudwatch put-metric-alarm \
  --alarm-name HighFailedLogins \
  --metric-name FailedLogins \
  --namespace Security \
  --statistic Sum \
  --period 300 \
  --threshold 20 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 1 \
  --alarm-actions arn:aws:sns:us-east-1:123456789:security-alerts
- Test alerts:
# Generate failed logins
for i in {1..25}; do
  curl -X POST http://localhost:8000/api/login \
    -H "Content-Type: application/json" \
    -d '{"username": "test", "password": "wrong"}'
  sleep 1
done
# Verify alert fired
Part 4: Security dashboard
Create a Grafana dashboard with these panels:
- Authentication overview:
  - Total logins (success/failure)
  - Login failure rate over time
  - Top source IPs for failures
- Application security:
  - Error rates
  - API response times
  - Rate limit hits
- Infrastructure:
  - SSH connections
  - Sudo commands
  - Service status changes
Example dashboard JSON:
{
  "title": "Security Overview",
  "panels": [
    {
      "title": "Failed Logins (24h)",
      "type": "stat",
      "targets": [
        {
          "expr": "count_over_time({job=\"app\"} |= \"login\" |= \"success\\\":false\" [24h])"
        }
      ]
    },
    {
      "title": "Login Failures by IP",
      "type": "table",
      "targets": [
        {
          "expr": "sum by (ip_address) (count_over_time({job=\"app\"} | json | event=\"user_login\" | success=\"false\" [24h]))"
        }
      ]
    },
    {
      "title": "Authentication Events",
      "type": "timeseries",
      "targets": [
        {
          "expr": "sum(count_over_time({job=\"app\"} |= \"login\" [5m]))",
          "legendFormat": "Total"
        },
        {
          "expr": "sum(count_over_time({job=\"app\"} |= \"login\" |= \"success\\\":false\" [5m]))",
          "legendFormat": "Failed"
        }
      ]
    }
  ]
}
Artifacts to produce
After this workshop, you should have:
- Centralized logging stack — Docker Compose or CloudWatch configuration
- Application logging integration — Structured logging in your codebase
- Security alerts — At least 3 alerts configured:
- Failed login threshold
- After-hours access
- Error rate spike
- Security dashboard — Grafana/CloudWatch dashboard with key metrics
- Alert runbook — Documentation for responding to each alert type
- Log retention policy — Document how long logs are kept and why
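One way to keep the retention policy honest is to express it as data next to the code that enforces it. A sketch with illustrative categories and periods; only the PCI-DSS minimums (at least 12 months retained, at least 3 months readily available) come from the standard itself:

```python
# category: (hot_days, total_days, rationale) -- values are illustrative defaults
RETENTION_POLICY = {
    "auth":        (90, 365, "PCI-DSS: 3 months hot, 12 months total"),
    "application": (30, 180, "covers typical incident investigation window"),
    "debug":       (7,  30,  "troubleshooting only"),
}

def storage_tier(category, age_days):
    """Return 'hot', 'cold', or 'delete' for a log of the given age."""
    hot_days, total_days, _ = RETENTION_POLICY[category]
    if age_days <= hot_days:
        return "hot"
    if age_days <= total_days:
        return "cold"
    return "delete"

print(storage_tier("auth", 10))   # hot
print(storage_tier("auth", 200))  # cold
print(storage_tier("auth", 400))  # delete
```

A table like this doubles as the written policy the compliance checklist asks for: each row states how long a category is kept and why.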
Self-check questions
- Why is centralized logging important for security?
- What should you never log?
- What's the difference between a log management system and a SIEM?
- How do you prevent alert fatigue?
- What log retention period does PCI-DSS require?
- How would you investigate a suspected account compromise using logs?
- What's structured logging and why is it better?
- How do you ensure log integrity for forensic purposes?
- Name three security events that should trigger immediate alerts.
- What's the difference between hot, warm, and cold log storage?
How to explain this to leadership
Start with the risk: "Right now, if someone breaches our system, we won't know for months. The average company takes 194 days to detect a breach. With proper monitoring, we can detect issues in minutes."
Use a real example: "Last month, [competitor/news story] had a breach. They couldn't figure out what data was taken because their logs were incomplete. We're setting up logging so we can always answer that question."
Quantify the value: "Faster detection means less damage. According to IBM, breaches detected within 200 days cost $1 million less than those taking longer. Our monitoring will alert us immediately."
Address compliance: "For SOC 2 / ISO 27001 / [relevant standard], we need centralized logging with proper retention. This project gets us compliant."
Show the dashboard: "Here's what we're monitoring now. We can see every login, every access to sensitive data, every unusual pattern. Before, we were blind."
Compare costs: "We're using open-source tools that cost $0 in software licenses. The main cost is my time to set it up — about 2 weeks. A SIEM product would cost $50-100K/year."
Links and resources
Log management
- ELK Stack documentation
- Grafana Loki documentation
- Graylog documentation
- Fluentd documentation
- Vector documentation
Cloud logging
SIEM and security monitoring
- Wazuh documentation
- Security Onion documentation
- Sigma rules (detection rules format)
- MITRE ATT&CK framework
Best practices
- OWASP Logging Cheat Sheet
- NIST SP 800-92 Guide to Computer Security Log Management
- CIS Benchmarks for logging
Alerting and incident response
What's next
This completes the security in development section. You can now write more secure code, manage secrets properly, harden your CI/CD pipeline, lock down containers and cloud infrastructure, and detect suspicious activity when it happens.
Next section: security culture — the harder work of getting everyone else on the team to care about security too.