var request = require('request'), mqtt = require('mqtt'), readline = require('readline'), ini = require('./ini.js'), file = require('./file.js'), fs = require('fs'); const ini_path=__dirname+'/setup.ini'; var product=ini.load(ini_path, 'windows','\r')['product'], internet=ini.load(ini_path, 'windows','\r')['mqtt'], stapi = ini.load(ini_path, 'windows','\r')['stapi']; modbus = ini.load(ini_path, 'windows','\r')['modbus']; var product_save_log_path= __dirname+'/'+product["SAVE_LOG_PATH"]; product_save_totle_flow_path=__dirname+'/'+ 'Total_Flow.txt'; var host_ip=internet["HOST_IP"]; var host_port= internet["HOST_PORT"]; var mqtt_max_line=internet["SEND_MAX_LINE"]; var GUID=internet["PUBLISH_TOPIC"].split(","); //const GUID='686c52cd-54f5-4b38-8c16-6304c7544ccf'; //const GUID='c5b920bf-d194-4398-b24e-1a44ba86515c'; const observation_id=['2663812','2663813','2663814']; const datastream_id='2663811'; var pubtopic=[]; var pubtopic_index; var eb43_value,pf5330_value,epr_value; var phenomenonTime=''; //===========modbus get device value time var result_count_ok; var observation; var csv_buffer_tmp=[0,0,0] var csv_string=''; var resultTime=''; var testValue=0; var send_data_buffer; var column_total; var column_now; var sent_data_len=0; var opt={ protocolId:'MQIsdp', protocolVersion:3, clientId:product["NAME"], username:'sgUm53TxhyPC86YM0efLexFZQ9521Jwwk++HR6Hn9io=', password:'rpY0B+pTMKsQUhez9I0nsQ==', reconnectPeriod:parseInt(internet["CONNECT_INTERVAL"]), }; var km_server_parameter={ A:0, B:0, C:0, D:0, E:0, R:0, } var km_ip='61.220.50.81'; var km_topic = product["NAME"]; var km_pf5330_topic='pf5330_'+product["NAME"]; km_epr_topic='epr_'+product["NAME"]; var opt_km={ protocolId:'MQIsdp', protocolVersion:3, clientId:product["NAME"]+'_km', username:'finetek', password:'80132123', reconnectPeriod:parseInt(internet["CONNECT_INTERVAL"]), }; var string_node={ "dollar":'', "time":'', "product":'', "device":'', m_data:[], } function ParaInit(){ csv_string=''; csv_buffer_tmp=[]; result_count_ok=0; send_data_buffer=['','','']; column_total = 0; column_now = 0; sent_data_len=0; } function FillZero(s){ var t=''; if(parseInt(s)<10&&s.length==1){ t='0'+s; } else { t=s; } return t; } function modbusT2isoT(data){ var offset,yy,mm,dd,HH,MM,ss; //data (before)==> 'yyyy/mm/dd HH:MM:ss' //time (after)==> 'yyyymmddTHHMMss+08'(ISO8601 Time) offset=data.indexOf('/'); yy=data.substring(0,offset); data=data.slice(offset+1); offset=data.indexOf('/'); mm=FillZero(data.substring(0,offset)); data=data.slice(offset+1); offset=data.indexOf(' '); dd=FillZero(data.substring(0,offset)); data=data.slice(offset+1); offset=data.indexOf(':'); HH=FillZero(data.substring(0,offset)); data=data.slice(offset+1); offset=data.indexOf(':'); MM=FillZero(data.substring(0,offset)); ss=FillZero(data.slice(offset+1)); time=yy+mm+dd+'T'+HH+MM+ss+"+08" return time; } function hexTofloat32(hex,offset){ var hex_buf=Buffer.from([hex[3+offset]%256, hex[3+offset]/256, hex[3+offset+1]%256, hex[3+offset+1]/256]); var float32=hex_buf.readFloatLE(0); return float32; } function hexTodouble64(hex,offset){ var hex_buf=Buffer.from([hex[3+offset]%256, hex[3+offset]/256, hex[3+offset+1]%256, hex[3+offset+1]/256, hex[3+offset+2]%256, hex[3+offset+2]/256, hex[3+offset+3]%256, hex[3+offset+3]/256]); var double64=hex_buf.readDoubleLE(0); return double64; } function hexToInt16(hex,offset){ return hex[3+offset]; } function hextToInt32(hex,offset){ var hiByte=hexToInt16(hex,offset); var lowByte=hexToInt16(hex,offset+1); return (hiByte<<16)|lowByte; } //=====eb43 parameter====== var eb43={ Level_M:0, Level2Flow_M3_S:0, Level2TotalFlow_M3:0, }; //=====pf5330 modbus world to value, total 33 value====== var pf5330={ time:'', name:product["NAME"], Va_V:0,Vab_V:0,Ia_A:0,Pa_KW:0,Qa_KVAR:0,Sa_KVA:0,PFa:0,PEt_KWH:0, Vb_V:0,Vbc_V:0,Ib_A:0,Pb_KW:0,Qb_KVAR:0,Sb_KVA:0,PFb:0,Freq_HZ:0,NA:0, Vc_V:0,Vca_V:0,Ic_A:0,Pc_KW:0,Qc_KVAR:0,Sc_KVA:0,PFc:0, PTCT_10:0,VpAvg_V:0,VlAvg_V:0,IpAvg_A:0,Pt_KW:0,Qt_KVAR:0,St_KVA:0,PF_avg:0,PTCT:0, }; //===== modbus to value (address base '1') function pf5330_modbus_transfer_2_value(string, offset){ pf5330.time=phenomenonTime; pf5330.Va_V=hexToInt16(string,1+offset);//=====1 pf5330.Vab_V=hexToInt16(string,2+offset);//=====1 pf5330.Ia_A=hexToInt16(string,3+offset);//=====1 pf5330.Pa_KW=hextToInt32(string,4+offset);//=====2 pf5330.Qa_KVAR=hextToInt32(string,6+offset);//=====2 pf5330.Sa_KVA=hextToInt32(string,8+offset);//=====2 pf5330.PFa=hexToInt16(string,10+offset);//=====1 pf5330.PEt_KWH=hextToInt32(string,11+offset);//=====2 pf5330.Vb_V=hexToInt16(string,13+offset);//===== 1 pf5330.Vbc_V=hexToInt16(string,14+offset);//===== 1 pf5330.Ib_A=hexToInt16(string,15+offset);//===== 1 pf5330.Pb_KW=hextToInt32(string,16+offset);//=====2 pf5330.Qb_KVAR=hextToInt32(string,18+offset);//=====2 pf5330.Sb_KVA=hextToInt32(string,20+offset);//=====2 pf5330.PFb=hexToInt16(string,22+offset);//=====1 pf5330.Freq_HZ=hexToInt16(string,23+offset);//=====1 pf5330.NA=hexToInt16(string,24+offset);//=====1 pf5330.Vc_V=hexToInt16(string,25+offset);//===== 1 pf5330.Vca_V=hexToInt16(string,26+offset);//===== 1 pf5330.Ic_A=hexToInt16(string,27+offset);//===== 1 pf5330.Pc_KW=hextToInt32(string,28+offset);//=====2 pf5330.Qc_KVAR=hextToInt32(string,30+offset);//=====2 pf5330.Sc_KVA=hextToInt32(string,32+offset);//=====2 pf5330.PFc=hexToInt16(string,34+offset); //=====1 pf5330.PTCT_10=hextToInt32(string,35+offset);//=====2 pf5330.VpAvg_V=hexToInt16(string,37+offset);//=====1 pf5330.VlAvg_V=hexToInt16(string,38+offset);//=====1 pf5330.IpAvg_A=hexToInt16(string,39+offset);//=====1 pf5330.Pt_KW=hextToInt32(string,40+offset);//=====2 pf5330.Qt_KVAR=hextToInt32(string,42+offset);//=====2 pf5330.St_KVA=hextToInt32(string,44+offset);//=====2 pf5330.PF_avg=hextToInt32(string,46+offset);//=====2 pf5330.PTCT=hexToInt16(string,48+offset);//=====1 } //=====pf5330 modbus world to value, total 4 value====== var epr={ time:'', name:product["NAME"], FlowSpeed_M_S:0,FlowRate_M3_HR:0,Frequency_HZ:0,TotalFlow_L:0, }; //===== modbus to value (address base '1') function epr_modbus_transfer_2_value(string, offset){ epr.time=phenomenonTime; epr.FlowSpeed_M_S=hexTofloat32(string,1+offset);//=====2 epr.FlowRate_M3_HR=hexTofloat32(string,3+offset);//=====2 epr.Frequency_HZ=hexTofloat32(string,5+offset);//=====2 epr.TotalFlow_L=hexTodouble64(string,20+offset);//=====4 } //=====eb43 水位(M)轉流量(M^3/s)====== function eb43_level_converter_flow(level,p){ p.A=1; p.B=1; p.C=1; p.D=1; p.E=1; return p.A * (level*level*level*level*level) + p.B * (level*level*level*level) + p.C * (level*level*level) + p.D * (level*level) + p.E; } function csv2jsonvalue(data){ var string=[]; var temperature=0,display_value=0,new_cap=0; var send_string=[]; //error 1 : ",2019/3/17 13:48:7,finetek0005,1,20879,17499,6599,0,47683,17192,12842," //error 2 : " $,2019/3/17 13:50:13,finetek0005,1,17317,17499,6599,0,47683,17192,12842,17434,160,3375,3078,3778," string=data.split(','); //=====找尋 '$' 的位置 for(i=0;i<5;i++){ if(string[i]=='$'){ string_node["dollar"]=string[i]; string_node["time"]=string[i+1]; string_node["product"]=string[i+2]; string_node["device"]=string[i+3]; break; } } var offset = i; //=====找尋 string_node["product"] 的位置 if(i>=5){ // i>=5表示找不到 '$' for(i=0;i<5;i++){ if(string[i]==product["NAME"]){ string_node["product"]=string[i]; string_node["device"]=string[i+1]; //=====string[i]==product["NAME"] 在第三位 if(i==2){ string_node["dollar"]=string[i-2]; string_node["time"]=string[i-1]; i-=2; } //=====string[i]==product["NAME"] 在第二位 else if(i==1){ string_node["time"]=string[i-1]; i-=1; } break; } } } offset=i; string_node.m_data=[]; if(string_node["dollar"]!='$' && string_node["product"]!=product["NAME"]) { console.log('save_log.csv data error'); return 0; } phenomenonTime=modbusT2isoT(string_node["time"]); switch(string_node["device"]){ //===========EB43MC, 使用 STA 到 安研平台 case '1': eb43.Level_M=hexTofloat32(string,1+offset);//eb43=====4128 + (1-1) (1 base) eb43.Level_M/=1000; // 'mm' -> 'm' if(repeat.idx){ eb43.Level2Flow_M3_S=eb43_level_converter_flow(eb43.Level_M,km_server_parameter); // 水位(m) 轉 流量(m^3/s) eb43.Level2TotalFlow_M3=file.readAnsc(product_save_totle_flow_path);// 讀取總流量在Total_Flow.txt(m^3) var total_flow =parseFloat( eb43.Level2TotalFlow_M3); total_flow+=(eb43.Level2Flow_M3_S * (parseFloat(modbus['POLLING_INTERVAL'])/1000));// 總流量(新) = 總流量(舊) + (流量(m3/s) * 量測時間(s)) eb43.Level2TotalFlow_M3=total_flow.toString(); fs.writeFileSync(product_save_totle_flow_path, eb43.Level2TotalFlow_M3); // 存入總流量 } string_node.m_data.push(eb43.Level_M); string_node.m_data.push(eb43.Level2Flow_M3_S); string_node.m_data.push(eb43.Level2TotalFlow_M3); console.log(eb43.Level_M); console.log(eb43.Level2Flow_M3_S); console.log(eb43.Level2TotalFlow_M3); pubtopic_index=0; break; //===========PF5330, 不使用 STA 到 安研平台 case '2': pf5330_modbus_transfer_2_value(string, offset); pf5330.VpAvg_V/=10; // (小數點一位) pf5330.VpAvg_V/=1000; // (安研平台單位為KV) string_node.m_data.push(pf5330.VpAvg_V); console.log(pf5330.VpAvg_V); pubtopic_index=0; break; //===========EPR, 不使用 STA 到 安研平台 case '3': epr_modbus_transfer_2_value(string, offset); //epr_value=hexTofloat32(string,26+offset);//===== //string_node.m_data.push(epr_value); //console.log(epr_value); publish_index=0; break; } return string_node["device"]; } function eb43_send_STA_payload(sent_len){ var connect_parameter ={ack_cnt:0,sent_cnt:0,}; while(connect_parameter.sent_cnt=column_total){ console.log('send column_total: ' + column_total); console.log('send column_now: ' + column_now); //len 絕對不能比 data.length 大 if(column_total< internet["SEND_MAX_LINE"]) sent_data_len-=2; //file.cancel_send_data(product_save_log_path,sent_data_len); console.log('last Ack'); client.end(); //kmClient.end(); } else{ //file.cancel_send_data(product_save_log_path,sent_data_len); console.log('column_total: ' + column_total); console.log('column_now: ' + column_now); sent_data_len=send_data_buffer[column_now].length+2; console.info('sent_data_len = '+sent_data_len); switch(csv2jsonvalue(send_data_buffer[column_now])){ case '1': eb43_send_STA_payload(sent_data_len); break; case '2': var observation={ "phenomenonTime" : phenomenonTime, "result" : string_node.m_data[0], } console.log('topic: ', pubtopic[pubtopic_index]); console.log('data: ', JSON.stringify(observation)); client.publish(pubtopic[pubtopic_index],JSON.stringify(observation),{qos:2},function(err){ if(!err){ //client.publish(pubtopic[pubtopic_index],JSON.stringify(observation),{qos:2},function(err){ if(repeat.idx){ file.cancel_send_data(product_save_log_path,sent_data_len); sent_data_analysis(); } //}); } }); /*kmClient.publish(km_pf5330_topic,JSON.stringify(pf5330),{qos:2},function(err){ if(!err){ file.cancel_send_data(product_save_log_path,sent_data_len); sent_data_analysis(); } }); */ break; case '3': kmClient.publish(km_epr_topic,JSON.stringify(epr),{qos:2},function(err){ if(!err){ file.cancel_send_data(product_save_log_path,sent_data_len); sent_data_analysis(); } }); break; } } } function readlines(path){ var read_file_buffer=[]; var column_tmp=0; var lineReader = readline.createInterface({input:fs.createReadStream(path)}); lineReader.on('line',(line)=>{ if(column_tmp < mqtt_max_line){ read_file_buffer.push(line); if(line.length>0) column_tmp++; } }); lineReader.on('close',()=>{ console.log('readline close...'); column_total = column_tmp; send_data_buffer=read_file_buffer; console.log('column_total ',column_total); if(column_total){ // readline file data ==> abcdefghijk > process > send data buffer ==> abcdefghijk\r\n // +2 為\r\n的字串長度 , 因為 上面 read line 會直接過濾掉 \r\n sent_data_len=send_data_buffer[column_now].length+2; console.log(sent_data_len); console.log(send_data_buffer); switch(csv2jsonvalue(send_data_buffer[column_now])){ case '1': eb43_send_STA_payload(sent_data_len); break; case '2': var observation={ "phenomenonTime" : phenomenonTime, "result" : string_node.m_data[0], } console.log('topic: ', pubtopic[pubtopic_index]); console.log('data: ', JSON.stringify(observation)); client.publish(pubtopic[pubtopic_index],JSON.stringify(observation),{qos:2},function(err){ if(!err){ //client.publish(pubtopic[pubtopic_index],JSON.stringify(observation),{qos:2},function(err){ if(repeat.idx){ file.cancel_send_data(product_save_log_path,sent_data_len); sent_data_analysis(); } //}); } }); /*kmClient.publish(km_pf5330_topic,JSON.stringify(pf5330),{qos:2},function(err){ if(!err){ file.cancel_send_data(product_save_log_path,sent_data_len); sent_data_analysis(); } }); */ break; case '3': kmClient.publish(km_epr_topic,JSON.stringify(epr),{qos:2},function(err){ if(!err){ file.cancel_send_data(product_save_log_path,sent_data_len); sent_data_analysis(); } }); break; default: client.publish('/finetek/1-wire','$,0000000000,finetek0001,NoData,'); break; } } lineReader.close(); }); } //================================mqtt publish test in km mqtt server //================================================================ //================================================================ var client = mqtt.connect('mqtt://'+host_ip,opt); var kmClient = mqtt.connect('mqtt://'+km_ip,opt_km); kmClient.on('connect', function () { console.log('km: ' + km_ip + ':1883'); console.log('km_topic: ' + km_topic); kmClient.subscribe(km_topic); }); kmClient.on('close',function(){ console.log('km mqtt closed'); }); kmClient.on('message', function (topic, message, packet) { console.log('message come in'); console.log(message); }); //================================================================ //================================================================ //================================================================ client.on('connect', function () { ParaInit(); console.log('CONNECTED TO: ' + host_ip + ':1883'); console.log(pubtopic); readlines(product_save_log_path); //var time = Date2SIO8601(new Date(Date.now()+(8 * 60 * 60 * 1000))); //testValue+=1.1; /*observation={ "phenomenonTime" : time, "result" : testValue, } */ //console.log(JSON.stringify(observation)); //client.publish(subtopic,JSON.stringify(observation)); //client.subscribe(pubtopic); }); client.on('close',function(){ console.log('mqtt closed'); }); client.on('message', function (topic, message, packet) { console.log('ack'); }); client.on('error', function(){ console.log('error'); }); var repeat={ idx:1, time:internet["CONNECT_INTERVAL"]/2, }; function mqtt_connect(){ repeat.idx=repeat.idx?0:1; client.reconnect(); //kmClient.reconnect(); } for(i=0;i