Skip to content

Commit 9a8f2b4

Browse files
committed
Add data loader endpoint
1 parent c08eff9 commit 9a8f2b4

File tree

7 files changed

+342
-16
lines changed

7 files changed

+342
-16
lines changed

app/controllers/csv.js

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
const fs = require('fs');
2+
const csv = require('fast-csv');
3+
const BatchUpdate = require('../lib/batchUpdate');
4+
const debug = require('debug')('tutorial:csv');
5+
const _ = require('lodash');
6+
const Status = require('http-status-codes');
7+
const path = require('path');
8+
const moment = require('moment-timezone');
9+
10+
/*
11+
* Delete the temporary file
12+
*/
13+
function removeCsvFile(path) {
14+
fs.unlink(path, (err) => {
15+
if (err) {
16+
throw err;
17+
}
18+
});
19+
}
20+
21+
/*
22+
* Read the CSV data from the temporary file.
23+
* This returns an in memory representation of the raw CSV file
24+
*/
25+
function readCsvFile(path) {
26+
return new Promise((resolve, reject) => {
27+
const rows = [];
28+
29+
fs.createReadStream(path)
30+
.pipe(csv.parse({ headers: true }))
31+
.on('error', (error) => {
32+
reject(error.message);
33+
})
34+
.on('data', (row) => {
35+
rows.push(row);
36+
})
37+
.on('end', () => {
38+
resolve(rows);
39+
});
40+
});
41+
}
42+
43+
/*
44+
* Strip the id and an key from the header row.
45+
*/
46+
function parseId(input) {
47+
const regexId = /^[^\s]+/;
48+
const regexKey = /[\w]+$/;
49+
const id = regexId.exec(input)[0];
50+
const key = regexKey.exec(input)[0];
51+
52+
return { id, key };
53+
}
54+
55+
function createEntitiesFromRows(rows) {
56+
const allEntities = [];
57+
const timestamp = new Date().toISOString();
58+
59+
rows.forEach((row) => {
60+
const timestamp = moment.tz(row.annee, 'Etc/UTC').toISOString();
61+
const entity = {
62+
id: row.id,
63+
type: row.type
64+
};
65+
66+
Object.keys(row).forEach((key, index) => {
67+
const value = row[key];
68+
if (value !== '') {
69+
switch (key) {
70+
case 'birthdate':
71+
case 'comment':
72+
case 'fedWith':
73+
case 'giveName':
74+
case 'legalId':
75+
case 'name':
76+
case 'species':
77+
entity[key] = { value: value, type: 'Property' };
78+
break;
79+
80+
case 'temperature':
81+
entity[key] = {
82+
value: Number(value),
83+
type: 'Property',
84+
unitCode: 'CEL',
85+
observedAt: timestamp
86+
};
87+
break;
88+
case 'heartRate':
89+
entity[key] = { value: Number(value), type: 'Property', unitCode: '5K', observedAt: timestamp };
90+
break;
91+
case 'weight':
92+
entity[key] = {
93+
value: Number(value),
94+
type: 'Property',
95+
unitCode: 'KGM',
96+
observedAt: timestamp
97+
};
98+
break;
99+
100+
case 'jobTitle_name':
101+
entity.jobTitle = {
102+
type: 'Property',
103+
value: {
104+
name: row.jobTitle_name,
105+
inDefinedTermSet: row.jobTitle_inDefinedTermSet,
106+
termCode: row.jobTitle_termCode,
107+
url: row.jobTitle_url
108+
}
109+
};
110+
break;
111+
case 'streetAddress':
112+
entity.address = {
113+
type: 'Property',
114+
value: {
115+
addressLocality: row.addressLocality,
116+
addressRegion: row.addressRegion,
117+
postalCode: row.postalCode,
118+
streetAddress: row.streetAddress
119+
}
120+
};
121+
break;
122+
case 'location_type':
123+
entity.location = {
124+
type: 'GeoProperty',
125+
observedAt: timestamp,
126+
value: {
127+
type: row.location_type,
128+
coordinates: [Number(row.lng), Number(row.lat)]
129+
}
130+
};
131+
break;
132+
133+
case 'category':
134+
case 'cropStatus':
135+
case 'gender':
136+
case 'healthCondition':
137+
case 'phenologicalCondition':
138+
case 'reproductiveCondition':
139+
case 'sex':
140+
entity[key] = { vocab: value, type: 'VocabProperty' };
141+
break;
142+
case 'calvedBy':
143+
case 'cropType':
144+
case 'owns':
145+
case 'owner':
146+
case 'ownedBy':
147+
case 'siredBy':
148+
entity[key] = { object: value, type: 'Relationship' };
149+
break;
150+
case 'locatedAt':
151+
case 'observation':
152+
case 'prediction':
153+
entity[key] = { object: value, type: 'Relationship', observedAt: timestamp };
154+
break;
155+
case 'id':
156+
case 'type':
157+
break;
158+
default:
159+
debug('unknown : ' + key);
160+
break;
161+
}
162+
}
163+
164+
});
165+
166+
allEntities.push(entity);
167+
});
168+
return allEntities;
169+
}
170+
171+
/*
172+
* Create an array of promises to send data to the context broker.
173+
* Each insert represents a series of readings at a given timestamp
174+
*/
175+
function createContextRequests(entities) {
176+
const promises = [];
177+
entities.forEach((entitiesAtTimeStamp) => {
178+
promises.push(BatchUpdate.sendAsHTTP(entitiesAtTimeStamp));
179+
});
180+
return promises;
181+
}
182+
183+
/**
184+
* Actions when uploading a CSV file. The CSV file holds an array of
185+
* measurements each at a given timestamp.
186+
*/
187+
const upload = (req, res) => {
188+
if (req.file === undefined) {
189+
return res.status(Status.UNSUPPORTED_MEDIA_TYPE).send('Please upload a CSV file!');
190+
}
191+
192+
const file = path.join(__dirname, '../resources/', req.file.filename);
193+
194+
return readCsvFile(file)
195+
.then((rows) => {
196+
removeCsvFile(file);
197+
//console.log(rows[0])
198+
return createEntitiesFromRows(rows);
199+
})
200+
.then((entities) => {
201+
//console.log(JSON.stringify(entities[0], null, 2))
202+
203+
batchEntities = [];
204+
const chunkSize = 10;
205+
206+
for (let i = 0; i < entities.length; i += chunkSize) {
207+
const chunk = entities.slice(i, i + chunkSize);
208+
batchEntities.push(chunk);
209+
}
210+
211+
return createContextRequests(batchEntities);
212+
})
213+
.then(async (promises) => {
214+
const results = [];
215+
for (const promise of promises) {
216+
const result = await promise;
217+
results.push(result);
218+
}
219+
return results;
220+
})
221+
.then((results) => {
222+
const errors = _.filter(results, function (o) {
223+
return o.status === 'rejected';
224+
});
225+
return errors.length ? res.status(Status.BAD_REQUEST).json(errors) : res.status(Status.NO_CONTENT).send();
226+
})
227+
.catch((err) => {
228+
debug(err.message);
229+
return res.status(Status.INTERNAL_SERVER_ERROR).send(err.message);
230+
});
231+
};
232+
233+
module.exports = {
234+
upload
235+
};

app/controllers/iot/command-listener.js

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const port = process.env.WEB_APP_PORT || '3000';
1818
const devicesPort = process.env.DUMMY_DEVICES_PORT || 3001;
1919
const devices = process.env.DUMMY_DEVICES || `http://localhost:${devicesPort}`;
2020
const autoMoveTractors = process.env.MOVE_TRACTOR || 10000;
21+
const devicesOff = process.env.DUMMY_OFF || false;
2122

2223
const dataModelContext =
2324
process.env.IOTA_JSON_LD_CONTEXT || 'http://localhost:' + port + '/data-models/ngsi-context.jsonld';
@@ -203,20 +204,25 @@ function updateTractorStatus() {
203204
});
204205
}
205206

206-
setInterval(() => {
207-
fireDevices('tractor');
208-
}, 3361);
209-
setInterval(fireAnimalCollars, 5099);
210-
setInterval(() => {
211-
fireDevices('temperature');
212-
}, 7001);
213-
setInterval(fireOverallFarmStatus, 10000);
214-
setInterval(() => {
215-
fireDevices('humidity');
216-
}, 8009);
217-
218-
if (autoMoveTractors > 0) {
219-
setInterval(updateTractorStatus, autoMoveTractors);
207+
if (!devicesOff) {
208+
debug(`Enabling dummy device updates on ${devices}`);
209+
setInterval(() => {
210+
fireDevices('tractor');
211+
}, 3361);
212+
setInterval(fireAnimalCollars, 5099);
213+
setInterval(() => {
214+
fireDevices('temperature');
215+
}, 7001);
216+
setInterval(fireOverallFarmStatus, 10000);
217+
setInterval(() => {
218+
fireDevices('humidity');
219+
}, 8009);
220+
221+
if (autoMoveTractors > 0) {
222+
setInterval(updateTractorStatus, autoMoveTractors);
223+
}
224+
} else {
225+
debug('Dummy device updates are disabled.');
220226
}
221227

222228
// The temperature Gauge does not accept commands,

app/lib/batchUpdate.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const CONTEXT_BROKER_URL = process.env.CONTEXT_BROKER || 'http://localhost:1026/ngsi-ld/v1';
2+
const LINKED_DATA = process.env.IOTA_JSON_LD_CONTEXT || config.contextBroker.jsonLdContext;
3+
4+
async function parse(response) {
5+
let text = '';
6+
try {
7+
text = await response.text();
8+
const data = JSON.parse(text);
9+
return data;
10+
} catch (err) {
11+
return text;
12+
}
13+
}
14+
15+
function is2xxSuccessful(status) {
16+
return Math.floor(status / 100) === 2;
17+
}
18+
19+
// measures sent over HTTP are POST requests with params
20+
async function sendAsHTTP(state) {
21+
const url = CONTEXT_BROKER_URL + '/entityOperations/upsert';
22+
const headers = {
23+
'Content-Type': 'application/json',
24+
Link: '<' + LINKED_DATA + '>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"'
25+
};
26+
const body = Array.isArray(state) ? state : [state];
27+
try {
28+
return fetch(`${url}?${new URLSearchParams({ options: 'update' })}`, {
29+
method: 'POST',
30+
headers,
31+
body: JSON.stringify(body)
32+
})
33+
.then((r) => parse(r).then((data) => ({ status: r.status, body: data })))
34+
.then((data) => {
35+
if (!is2xxSuccessful(data.status)) {
36+
throw new Error(data.body);
37+
}
38+
return data.body;
39+
});
40+
} catch (e) {
41+
console.log(e);
42+
return null;
43+
}
44+
}
45+
46+
module.exports = {
47+
sendAsHTTP
48+
};

app/lib/upload.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
const multer = require('multer');
2+
const path = require('path');
3+
4+
const csvFilter = (req, file, callback) => {
5+
if (file.mimetype.includes('csv') || file.mimetype.includes('excel') || file.mimetype.includes('spreadsheetml')) {
6+
callback(null, true);
7+
} else {
8+
callback('Please upload only csv and excel file.', false);
9+
}
10+
};
11+
12+
const storage = multer.diskStorage({
13+
destination: (req, file, callback) => {
14+
callback(null, path.join(__dirname, '../resources/'));
15+
},
16+
filename: (req, file, callback) => {
17+
callback(null, `${Date.now()}-${file.originalname}`);
18+
}
19+
});
20+
21+
const uploadFile = multer({ storage, fileFilter: csvFilter });
22+
module.exports = uploadFile;

app/package.json

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,17 @@
3535
"express": "~4.17.3",
3636
"express-healthcheck": "^0.1.0",
3737
"express-session": "^1.17.2",
38+
"fast-csv": "^4.3.2",
3839
"http-errors": "^2.0.0",
40+
"http-status-codes": "^2.1.4",
3941
"jsonld": "^8.3.3",
4042
"lodash": "^4.17.21",
41-
"moment": "^2.29.1",
43+
"moment": "~2.29.1",
44+
"moment-timezone": "~0.5.25",
4245
"mongoose": "^6.2.2",
4346
"morgan": "^1.10.0",
4447
"mqtt": "^4.3.6",
45-
"node-cache": "^5.1.2",
48+
"multer": "^2.0.2",
4649
"parse-links": "^0.1.0",
4750
"pug": "3.0.2",
4851
"request": "^2.88.2",

app/resources/.gitkeep

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.gitkeep

0 commit comments

Comments
 (0)