1
+ 'use strict' ;
2
+
3
+ var _ = require ( 'lodash' ) ,
4
+ request = require ( 'request' ) ,
5
+ stream = require ( 'stream' ) ,
6
+ querystring = require ( 'querystring' ) ;
7
+
8
+
9
+ var TypeList = [ 'UInt8' , 'UInt16' , 'UInt32' , 'UInt64' , 'Int8' , 'Int16' , 'Int32' , 'Int64' ] ;
10
+
11
+ class TypeCast {
12
+ constructor ( ) {
13
+ this . castMap = TypeList . reduce (
14
+ ( obj , type ) => {
15
+ obj [ type ] = value => { return parseInt ( value , 10 ) } ;
16
+ return obj ;
17
+ } ,
18
+ {
19
+ 'Date' : value => value ,
20
+ 'String' : value => value
21
+ }
22
+ ) ;
23
+ }
24
+
25
+
26
+ cast ( type , value ) {
27
+ return this . castMap [ type ] ? this . castMap [ type ] ( value ) : value ;
28
+ }
29
+ }
30
+
31
+
32
+ class ClickHouse {
33
+ constructor ( _opts ) {
34
+ this . opts = _ . extend (
35
+ {
36
+ url : 'http://localhost' ,
37
+ port : 8123 ,
38
+ debug : false
39
+ } ,
40
+ _opts
41
+ ) ;
42
+
43
+ this . typeCast = new TypeCast ( ) ;
44
+ }
45
+
46
+
47
+ /**
48
+ * Get url query
49
+ * @param {String } query
50
+ * @returns {String }
51
+ */
52
+ getUrl ( query ) {
53
+ var params = { } ;
54
+
55
+ if ( query ) params [ 'query' ] = query + ' FORMAT TabSeparatedWithNamesAndTypes' ;
56
+
57
+ if ( Object . keys ( params ) . length == 0 ) return new Error ( 'query is empty' ) ;
58
+
59
+ return this . opts . url + ':' + this . opts . port + '?' + querystring . stringify ( params ) ;
60
+ }
61
+
62
+
63
+ /**
64
+ * Parse data
65
+ * @param {Buffer } data
66
+ * @returns {Array }
67
+ */
68
+ _parseData ( data ) {
69
+ var me = this ,
70
+ rows = data . toString ( 'utf8' ) . split ( '\n' ) ,
71
+ columnList = rows [ 0 ] . split ( '\t' ) ,
72
+ typeList = rows [ 1 ] . split ( '\t' ) ;
73
+
74
+ // Удаляем строки с заголовками и типами столбцов И завершающую строку
75
+ rows = rows . slice ( 2 , rows . length - 1 ) ;
76
+
77
+ columnList = columnList . reduce (
78
+ function ( arr , column , i ) {
79
+ arr . push ( {
80
+ name : column ,
81
+ type : typeList [ i ]
82
+ } ) ;
83
+
84
+ return arr ;
85
+ } ,
86
+ [ ]
87
+ ) ;
88
+
89
+ return rows . map ( function ( row , i ) {
90
+ let columns = row . split ( '\t' ) ;
91
+
92
+ return columnList . reduce (
93
+ function ( obj , column , i ) {
94
+ obj [ column . name ] = me . typeCast . cast ( column . type , columns [ i ] ) ;
95
+ return obj ;
96
+ } ,
97
+ { }
98
+ ) ;
99
+ } ) ;
100
+ }
101
+
102
+
103
+ /**
104
+ * Exec query
105
+ * @param {String } query
106
+ * @param {Function } cb
107
+ * @returns {Stream|undefined }
108
+ */
109
+ query ( query , cb ) {
110
+ var me = this ,
111
+ url = me . getUrl ( query ) ;
112
+
113
+ if ( me . opts . debug ) console . log ( 'url' , url ) ;
114
+
115
+ if ( cb ) {
116
+ return request . get ( url , function ( error , response , body ) {
117
+ if ( ! error && response . statusCode == 200 ) {
118
+ cb ( null , me . _parseData ( body ) ) ;
119
+ } else {
120
+ cb ( error ) ;
121
+ }
122
+ } ) ;
123
+ } else {
124
+ let rs = new stream . Readable ( ) ;
125
+ rs . _read = function ( chunk ) {
126
+ if ( me . opts . debug ) console . log ( 'rs _read chunk' , chunk ) ;
127
+ } ;
128
+
129
+ let queryStream = request . get ( url ) ;
130
+ queryStream . columnsName = null ;
131
+
132
+ let responseStatus = 200 ;
133
+
134
+ queryStream
135
+ . on ( 'response' , function ( response ) {
136
+ responseStatus = response . statusCode ;
137
+ } )
138
+ . on ( 'error' , function ( err ) {
139
+ rs . emit ( 'error' , err ) ;
140
+ } )
141
+ . on ( 'data' , function ( data ) {
142
+
143
+ // Если ошибка на строне сервера (не правильный запрос, ещё что-то),
144
+ // то придёт один пакет данных с сообщением об ошибке
145
+ if ( responseStatus != 200 ) return rs . emit ( 'error' , data . toString ( 'utf8' ) ) ;
146
+
147
+ var rows = data . toString ( 'utf8' ) . split ( '\n' ) ;
148
+
149
+ if ( ! queryStream . columnList ) {
150
+ let columnList = rows [ 0 ] . split ( '\t' ) ;
151
+ let typeList = rows [ 1 ] . split ( '\t' ) ;
152
+
153
+ // Удаляем строки с заголовками и типами столбцов И завершающую строку
154
+ rows = rows . slice ( 2 , rows . length - 1 ) ;
155
+
156
+ queryStream . columnList = columnList . reduce (
157
+ function ( arr , column , i ) {
158
+ arr . push ( {
159
+ name : column ,
160
+ type : typeList [ i ]
161
+ } ) ;
162
+
163
+ return arr ;
164
+ } ,
165
+ [ ]
166
+ ) ;
167
+
168
+ if ( me . opts . debug ) console . log ( 'columns' , queryStream . columnList ) ;
169
+ }
170
+
171
+ if ( me . opts . debug ) console . log ( 'raw data' , data . toString ( 'utf8' ) ) ;
172
+
173
+ for ( let i = 0 ; i < rows . length ; i ++ ) {
174
+ let columns = rows [ i ] . split ( '\t' ) ;
175
+ rs . emit (
176
+ 'data' ,
177
+ queryStream . columnList . reduce (
178
+ ( o , c , i ) => {
179
+ o [ c . name ] = me . typeCast . cast ( c . type , columns [ i ] ) ;
180
+ return o ;
181
+ } ,
182
+ { }
183
+ )
184
+ ) ;
185
+ }
186
+ } )
187
+ . on ( 'end' , function ( ) {
188
+ rs . emit ( 'end' ) ;
189
+ } ) ;
190
+
191
+ return rs ;
192
+ }
193
+ }
194
+ }
195
+
196
+
197
+ module . exports = ClickHouse ;
198
+
199
+ //Example
200
+
201
+ /*
202
+ var async = require('async');
203
+
204
+ var query = 'SELECT FlightDate, DestCityName, AirlineID, DestStateFips FROM ontime LIMIT 10';
205
+ var ch = new ClickHouse();
206
+
207
+ async.series(
208
+ [
209
+ function (cb) {
210
+
211
+ // single query
212
+ ch.query(query, function (err, rows) {
213
+ console.log(err, rows);
214
+
215
+ cb();
216
+ });
217
+ },
218
+
219
+ function (cb) {
220
+ var count = 0;
221
+
222
+ // query with data streaming
223
+ ch.query(query)
224
+ .on('data', function (data) {
225
+ console.log('ch data', data);
226
+ if (++count % 1000 == 0) console.log('count', count);
227
+ })
228
+ .on('error', function (err) {
229
+ console.log('ch error', err);
230
+
231
+ process.exit();
232
+ })
233
+ .on('end', function () {
234
+ console.log('ch end', count);
235
+
236
+ cb();
237
+ });
238
+ }
239
+ ],
240
+ function () {
241
+ process.exit();
242
+ }
243
+ );
244
+ */
0 commit comments