multiprocess.js
5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
'use strict';
const debug = require('debug')('log4js:multiprocess');
const net = require('net');
const LoggingEvent = require('../LoggingEvent');
const END_MSG = '__LOG4JS__';
/**
 * Master side of the multiprocess appender.
 *
 * Creates a TCP server listening on config.loggerPort (default 5000) and
 * config.loggerHost (default 'localhost'). Each connected worker sends
 * serialised log events as UTF-8 text, every event terminated by END_MSG.
 * Complete events are deserialised and handed to `actualAppender`.
 *
 * @param {object} config - reads `loggerPort` and `loggerHost`.
 * @param {function} actualAppender - receives every log event (local and remote).
 * @param {object} levels - must provide `ERROR`; used when a worker socket errors.
 * @returns {function} appender for events logged in this process; it also
 *   carries `shutdown(cb)`, which closes the server.
 */
function logServer(config, actualAppender, levels) {
  /**
   * Takes a utf-8 string, returns a logging event tagged with the
   * address and port of the socket it arrived on.
   */
  function deserializeLoggingEvent(clientSocket, msg) {
    debug('deserialising log event');
    const loggingEvent = LoggingEvent.deserialise(msg);
    loggingEvent.remoteAddress = clientSocket.remoteAddress;
    loggingEvent.remotePort = clientSocket.remotePort;
    return loggingEvent;
  }

  /* eslint prefer-arrow-callback:0 */
  const server = net.createServer(function serverCreated(clientSocket) {
    clientSocket.setEncoding('utf8');
    // Text received so far that has not yet been terminated by END_MSG.
    let logMessage = '';

    function logTheMessage(msg) {
      // Check the frame itself, not the accumulating buffer: the buffer is
      // never empty here, so testing it would let empty frames through.
      if (msg.length > 0) {
        debug('deserialising log event and sending to actual appender');
        actualAppender(deserializeLoggingEvent(clientSocket, msg));
      }
    }

    function chunkReceived(chunk) {
      debug('chunk of data received');
      // The 'end' event calls this with no argument.
      logMessage += chunk || '';
      // One chunk may carry several complete events; drain them all and keep
      // any trailing partial event for the next chunk.
      let delimiterAt = logMessage.indexOf(END_MSG);
      while (delimiterAt > -1) {
        logTheMessage(logMessage.substring(0, delimiterAt));
        logMessage = logMessage.substring(delimiterAt + END_MSG.length);
        delimiterAt = logMessage.indexOf(END_MSG);
      }
    }

    function handleError(error) {
      // Report the socket failure through the normal logging path.
      const loggingEvent = {
        startTime: new Date(),
        categoryName: 'log4js',
        level: levels.ERROR,
        data: ['A worker log process hung up unexpectedly', error],
        remoteAddress: clientSocket.remoteAddress,
        remotePort: clientSocket.remotePort
      };
      actualAppender(loggingEvent);
    }

    clientSocket.on('data', chunkReceived);
    clientSocket.on('end', chunkReceived);
    clientSocket.on('error', handleError);
  });

  server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost', function () {
    debug('master server listening');
    // allow the process to exit, if this is the only socket active
    server.unref();
  });

  function app(event) {
    debug('log event sent directly to actual appender (local event)');
    return actualAppender(event);
  }

  app.shutdown = function (cb) {
    debug('master shutdown called, closing server');
    server.close(cb);
  };

  return app;
}
/**
 * Worker side of the multiprocess appender.
 *
 * Opens a TCP connection to config.loggerHost (default 'localhost') on
 * config.loggerPort (default 5000) and sends each log event as
 * `loggingEvent.serialise()` followed by END_MSG. Events logged while the
 * socket is not connected are held in memory and flushed on 'connect'.
 * When the socket closes a new connection is attempted.
 *
 * @param {object} config - reads `loggerPort` and `loggerHost`.
 * @returns {function} appender taking a logging event; it also carries
 *   `shutdown(cb)`, which waits briefly for buffered events and then ends
 *   the socket.
 */
function workerAppender(config) {
  let canWrite = false;
  const buffer = [];
  let socket;
  // shutdown() retries at most this many times while the buffer is non-empty.
  let shutdownAttempts = 3;

  function write(loggingEvent) {
    debug('Writing log event to socket');
    socket.write(loggingEvent.serialise(), 'utf8');
    socket.write(END_MSG, 'utf8');
  }

  function emptyBuffer() {
    let evt;
    debug('emptying worker buffer');
    /* eslint no-cond-assign:0 */
    while ((evt = buffer.shift())) {
      write(evt);
    }
  }

  function createSocket() {
    debug(`worker appender creating socket to ${config.loggerHost || 'localhost'}:${config.loggerPort || 5000}`);
    socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
    socket.on('connect', () => {
      debug('worker socket connected');
      emptyBuffer();
      canWrite = true;
    });
    socket.on('timeout', socket.end.bind(socket));
    // An 'error' event with no listener is thrown by the emitter and would
    // take the worker process down (e.g. master not reachable). Listen for
    // it and stop writing; 'close' follows and triggers the reconnect.
    socket.on('error', (err) => {
      debug('worker socket error', err);
      canWrite = false;
    });
    socket.on('close', () => {
      // Buffer events until the replacement socket reports 'connect',
      // otherwise they are written to an unconnected socket.
      canWrite = false;
      // NOTE(review): reconnects immediately, with no back-off.
      createSocket();
    });
  }

  createSocket();

  function log(loggingEvent) {
    if (canWrite) {
      write(loggingEvent);
    } else {
      debug('worker buffering log event because it cannot write at the moment');
      buffer.push(loggingEvent);
    }
  }

  log.shutdown = function (cb) {
    debug('worker shutdown called');
    if (buffer.length && shutdownAttempts) {
      debug('worker buffer has items, waiting 100ms to empty');
      shutdownAttempts -= 1;
      setTimeout(() => {
        log.shutdown(cb);
      }, 100);
    } else {
      // Drop the reconnect-on-close listener so ending the socket is final.
      socket.removeAllListeners('close');
      socket.end(cb);
    }
  };

  return log;
}
/**
 * Picks the appender implementation for this process.
 *
 * @param {object} config - `config.mode === 'master'` selects the log server;
 *   any other value selects the worker appender.
 * @param {function} appender - appender the master forwards events to
 *   (ignored in worker mode).
 * @param {object} levels - passed through to the master (ignored in worker mode).
 * @returns {function} result of logServer(...) or workerAppender(...).
 */
function createAppender(config, appender, levels) {
  const isMaster = config.mode === 'master';
  debug(isMaster ? 'Creating master appender' : 'Creating worker appender');
  return isMaster
    ? logServer(config, appender, levels)
    : workerAppender(config);
}
/**
 * Builds the multiprocess appender from its configuration.
 *
 * In master mode `config.appender` names the appender that receives the
 * events; it is resolved through `findAppender`. In any other mode no
 * appender is looked up.
 *
 * @param {object} config - reads `mode` and, for master mode, `appender`.
 * @param {*} layouts - not used by this appender.
 * @param {function} findAppender - maps an appender name to an appender.
 * @param {object} levels - passed on to createAppender.
 * @returns {function} whatever createAppender returns.
 * @throws {Error} in master mode when `config.appender` is missing or
 *   `findAppender` does not return an appender for it.
 */
function configure(config, layouts, findAppender, levels) {
  debug(`configure with mode = ${config.mode}`);

  // Workers need no target appender.
  if (config.mode !== 'master') {
    return createAppender(config, undefined, levels);
  }

  if (!config.appender) {
    debug(`no appender found in config ${config}`);
    throw new Error('multiprocess master must have an "appender" defined');
  }

  debug(`actual appender is ${config.appender}`);
  const target = findAppender(config.appender);
  if (!target) {
    debug(`actual appender "${config.appender}" not found`);
    throw new Error(`multiprocess master appender "${config.appender}" not defined`);
  }

  return createAppender(config, target, levels);
}
// Export `configure`, the factory that builds the master or worker appender
// from a config object.
module.exports.configure = configure;