用云函数快速批量处理COS里面的日志

本来CLS日志服务可以直接触发云函数来快速处理的,不过这样触发次数就有点多了。

比如说被处理的日志本来就是云函数生成的,那么函数触发次数就直接要翻番,如果日志不是需要及时处理的话,可以让它在CLS里面累计上几分钟,然后用定时器触发一个云函数,通过日志服务查询接口 SearchLog 来实现批量处理。

​但是这样做有个坑,如果这几分钟的log条数超过100条的话,我们可以把Limit放到最大1000,如果超过1000条的话,按照文档的说明,应该通过传递Context的方式来实现翻页,最多可以翻出来10000条。然而,文档上说的操作根本是无法实现的,因为如果你翻页查询的时候传递了Query参数,那就会被认为你要进行一次新的查询,然后给你返回第一页,即使Query参数和上一次查询一模一样也没有用。如果你不传Query参数只传Context参数呢,那你只会收到一个缺少Query参数的错误,因为Query是必选参数。

就算你用SQL查询和SQL翻页的方式(通过给SQL传递offset和limit),如果这几分钟的log超过了一万条,你还是没辙。所以更好的方式可能是每分钟通过 日志下载接口 来下载指定时间段的日志处理,或者把日志自动投递到COS,用COS的创建文件事件做触发器来触发云函数执行,然后把日志文件下载过来批量处理。

当日志非常多的时候,通过日志下载接口需要自行处理分包的问题,用投递COS的方式处理的话分包也是自动处理的,代码逻辑会更简单一些。但是CLS投递到COS的最短周期是5分钟,但是实际上一个日志从生成到打包、上传,触发处理,可能要经过接近10分钟,如果需要更及时的处理数据的话,只要确保文件不会大到需要分包,用定时器来触发可能更合适。

这样一个日志文件可能会非常大,如果整个文件读到内存中处理的话需要给云函数申请足够多的内存。更好的方式是用流的方式来处理。因为COS的sdk可以把文件读成流,日志下载接口生成的日志文件也可以用request读成流。这样即使文件非常大,也可以通过流处理的方式进行实时解压(因为日志打包的时候会被强制自动压缩),并对流进行实时解析,实现高效的日志处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
'use strict';
const zlib = require("zlib"),
readLine = require('readline'),
COS = require('cos-nodejs-sdk-v5'),
cos = new COS({
SecretId: process.env.SECRET_ID,
SecretKey: process.env.SECRET_KEY,
KeepAlive: false
}),
tencentcloud = require("/opt/node_modules/tencentcloud-sdk-nodejs"), //node14之前的版本的内置SDK不支持cls,需要下载新的sdk用层的方式覆盖进去并用这个方式引用
ClsClient = tencentcloud.cls.v20201016.Client,
clientConfig = {
credential: {
secretId: process.env.SECRET_ID,
secretKey: process.env.SECRET_KEY,
},
region: process.env.TENCENTCLOUD_REGION,
profile: {
httpProfile: {
endpoint: "cls.tencentcloudapi.com",
},
},
},
downloadCycle = 60000, //下载周期
request = require('request');
exports.main_handler = (event, context, callback) => {
let gunzip = zlib.createGunzip();
let jsonCount = 0,
invalidLines = 0;
let rl = readLine.createInterface(gunzip);
rl.on('line', function(line) {
if (/^\s*\{.*\}\s*$/.test(line)) {
jsonCount++; //收到的行数据似乎是JSON数据
} else {
invalidLines++; //收到的一行似乎不是JSON数据
}
})
rl.on('close', () => {
//因为是demo,这里没有等待所有的可能的并发流都处理完再回调
//实际使用的时候应该Promise.all或者用异步方式逐个流处理完再回调。
callback(null, "有效行数: " + jsonCount + " ,无效行数:" + invalidLines)
})
if ("Type" in event && event.Type == "Timer") { //定时器触发
let client = new ClsClient(clientConfig);
let params = {
"TopicId": process.env.TopicId,
"Query": "SCF_Namespace:\"" + process.env.SCF_NAMESPACE + "\"",
"Count": 10000000, //最大一次下载1000万条。如果一个周期内有可能超过的话需要考虑分包或者该用cos投递
"From": (Math.floor(Date.now() / downloadCycle - 1) * downloadCycle),
"To": (Math.floor(Date.now() / downloadCycle) * downloadCycle) //触发点之前的一个完整的时间周期
};
//生成日志打包任务,定时器下次触发的时候下载处理
client.CreateExport(params).then(res => {
if ("ExportId" in res) {
console.log("导出任务发起成功,生成的文件等待定时器下次触发的时候处理");
client.DescribeExports({
"TopicId": process.env.TopicId
}).then(res => {
if ("Exports" in res && res.Exports instanceof Array) {
for (let i = 0; i < res.Exports.length; i++) {
let e = res.Exports[i];
if (e.Status == "Completed") {
request(e.CosPath).pipe(gunzip);
client.DeleteExport({"ExportId": e.ExportId});//如果要更保险一点,放到readline的close后面删除更好
}
}
}
})
} else {
console.log("导出任务发起失败,需要告警并走重试逻辑");
}
})
} else if ("Records" in event && event.Records instanceof Array) { //COS触发
let records = event.Records;
for (let i = 0; i < records.length; i++) {
let c = records[i].cos;
console.log("下载并分析日志文件 " + c.cosObject.key)
cos.getObject({
Bucket: c.cosBucket.name + "-" + c.cosBucket.appid,
Region: c.cosBucket.cosRegion,
Key: "/" + c.cosObject.key.split("/").slice(3).join("/"),
Output: gunzip
// Output: "/tmp/test.gz"
}, function(err, data) {
if (err) console.log("err:" + JSON.stringify(err))
});
}
} else {
return "未知触发器"
}
};