forked from github.com/GenderDysphoria.fyi
Terraform updates
Attempting to do some log parsing into CloudWatch Logs
83
terraform/lambda/src/cloudfront.js
Normal file
@@ -0,0 +1,83 @@
const { gunzip } = require('zlib');
const { promisify } = require('util');
const { S3 } = require('aws-sdk');
const { unescape } = require('querystring');

const gunzipAsync = promisify(gunzip);


// Parse the line containing the version.
//
// Format:
//
// #Version: 1.0
//
const parseVersion = (line) => {
  if (!line.startsWith('#Version:')) {
    throw new Error(`Invalid version line '${line}'`);
  } else {
    return line.match(/[\d.]+$/);
  }
};

// Parse the line containing the fields format, converting each field name to kebab case.
// https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#LogFileFormat
//
// Format:
// eslint-disable-next-line max-len
// #Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields
//
const parseFields = (line) => {
  if (!line.startsWith('#Fields:')) {
    throw new Error(`Invalid fields line '${line}'`);
  } else {
    return line.match(/[\w()-]+(\s|$)/g).map(field => (
      // Strip parentheses and drop redundant prefixes (c-, cs-, sc-) from field names
      field.replace(/\(([^)]+)\)/, '-$1').replace(/^(c-|cs-|sc-)/, '').trim().toLowerCase()
    ));
  }
};

// Unescape the value twice, because CloudFront double-escapes some characters.
// https://forums.aws.amazon.com/thread.jspa?threadID=134017
//
const decode = value => unescape(unescape(value));

// Split up a line and assign each value to its corresponding field.
//
const parseLine = (line, fields) => {
  if (line.startsWith('#')) {
    throw new Error(`Invalid log line '${line}'`);
  } else {
    return line.split('\t').reduce((object, section, index) => {
      const result = object;
      if (section !== '-') result[fields[index]] = decode(section); // Skip missing fields
      return result;
    }, {});
  }
};

// Get the log file from S3 and unzip it.
//
const getLogFile = async ({ bucket, key, region }) => {
  const s3 = new S3({ region });

  const zippedObject = await s3.getObject({ Bucket: bucket, Key: key }).promise();
  const logFile = await gunzipAsync(zippedObject.Body);
  return logFile.toString().trim();
};

// Parse the log file and return a list of log events.
//
exports.parseLogFile = async ({ bucket, key, region }) => {
  const file = await getLogFile({ bucket, key, region });

  const lines = file.split('\n');

  // Shift the first line, which contains the version, and parse it for validation
  parseVersion(lines.shift());
  // Shift the next line, which contains the fields format, and parse it for validation
  const fields = parseFields(lines.shift());

  return lines.map(line => parseLine(line, fields));
};
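For a sense of what this parser produces, here is a sketch against a made-up two-line log file (sample data invented for illustration, not part of the commit):

// Hypothetical sample input:
const sample = [
  '#Version: 1.0',
  '#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host)',
  '2020-01-01\t00:00:01\tAMS1-C1\t2048\t192.0.2.1\tGET\texample.cloudfront.net',
].join('\n');

// parseFields normalizes the header line to:
//   ['date', 'time', 'x-edge-location', 'bytes', 'ip', 'method', 'host']
// and parseLine maps the tab-separated values onto those names:
//   { date: '2020-01-01', time: '00:00:01', 'x-edge-location': 'AMS1-C1',
//     bytes: '2048', ip: '192.0.2.1', method: 'GET', host: 'example.cloudfront.net' }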
88
terraform/lambda/src/cloudwatch-logs.js
Normal file
@@ -0,0 +1,88 @@
const { CloudWatchLogs } = require('aws-sdk');

// Split up an ARN like "arn:aws:logs:eu-west-1:123456789012:log-group:example-group:*"
const [,,, region,,, logGroupName] = process.env.CLOUDWATCH_LOGS_GROUP_ARN.split(':');

const cloudwatchlogs = new CloudWatchLogs({ region });


// Group an array of hashes by the given key.
//
const groupBy = (array, key) => (
  array.reduce((object, item) => {
    const result = object;

    if (result[item[key]]) {
      result[item[key]].push(item);
    } else if (item[key]) {
      result[item[key]] = [item];
    }
    return result;
  }, {})
);

// Find a log stream by prefix.
//
const findLogStream = async (logStreamNamePrefix) => {
  const params = { logGroupName, logStreamNamePrefix };

  const { logStreams } = await cloudwatchlogs.describeLogStreams(params).promise();

  if (logStreams.length > 1) {
    throw new Error(`Found '${logStreams.length}' matching CloudWatch Logs streams but expected only one.`);
  }

  return logStreams[0];
};

// Get the log stream, creating it if it does not exist yet.
//
// Name format:
// 2000-01-01
//
const describeLogStream = async (logStreamName) => {
  let logStream = await findLogStream(logStreamName);

  if (!logStream) {
    await cloudwatchlogs.createLogStream({ logGroupName, logStreamName }).promise();
    logStream = await findLogStream(logStreamName);
  }

  return logStream;
};

// Extend the original record with some additional fields
// and wrap each record in a CloudWatch Logs event.
//
const buildLogEvents = records => (
  records.map((record) => {
    const payload = record;
    payload.name = 'logs:cloudfront';

    return {
      message: JSON.stringify(payload),
      timestamp: new Date(`${payload.date} ${payload.time} UTC`).getTime(),
    };
  }).sort((a, b) => a.timestamp - b.timestamp) // Events in a request must be in chronological order
);

// Send the given records to the CloudWatch Logs group.
//
exports.putLogEvents = async (records) => {
  const groupedRecords = groupBy(records, 'date');

  const putLogEventsCalls = Object.keys(groupedRecords).map(async (key) => {
    const logStream = await describeLogStream(key);

    const params = {
      logEvents: buildLogEvents(groupedRecords[key]),
      logGroupName,
      logStreamName: logStream.logStreamName,
      sequenceToken: logStream.uploadSequenceToken,
    };

    return cloudwatchlogs.putLogEvents(params).promise();
  });

  return Promise.all(putLogEventsCalls);
};
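To illustrate the grouping and event building, a sketch with invented records (hypothetical data, not from the commit):

// Two records sharing a date, deliberately out of time order:
const records = [
  { date: '2020-01-01', time: '00:00:02', status: '200' },
  { date: '2020-01-01', time: '00:00:01', status: '404' },
];

// groupBy(records, 'date') yields { '2020-01-01': [...] }, so each day maps
// to one log stream named after the date. buildLogEvents then tags each
// record with name: 'logs:cloudfront', stringifies it, derives a millisecond
// timestamp from `${date} ${time} UTC`, and sorts ascending, so the
// 00:00:01 event is submitted before the 00:00:02 one.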
18
terraform/lambda/src/index.js
Normal file
@@ -0,0 +1,18 @@
const { parseLogFile } = require('./cloudfront');
const { putLogEvents } = require('./cloudwatch-logs');

// Lambda handler.
//
exports.handler = async (event) => {
  if (event.Records.length !== 1) {
    throw new Error(`Unexpected length of event.Records, expected: '1', got: '${event.Records.length}'`);
  } else {
    const params = {
      bucket: event.Records[0].s3.bucket.name,
      // S3 event keys are URL-encoded, with '+' standing in for spaces
      key: decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' ')),
      region: event.Records[0].awsRegion,
    };

    return putLogEvents(await parseLogFile(params));
  }
};
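For reference, a minimal, hypothetical S3 put event of the shape this handler expects (bucket name and key invented for illustration):

const event = {
  Records: [{
    awsRegion: 'eu-west-1',
    s3: {
      bucket: { name: 'example-log-bucket' },
      object: { key: 'EXAMPLE123.2020-01-01-00.abcd1234.gz' },
    },
  }],
};

// handler(event) downloads and gunzips the object, parses it into records,
// and forwards them to the configured CloudWatch Logs group.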