diff --git a/lib/sender.js b/lib/sender.js
index 1668cb1..2909daf 100644
--- a/lib/sender.js
+++ b/lib/sender.js
@@ -37,7 +37,7 @@ class FluentSender {
     if (this._eventMode === 'Message') {
       this._sendQueue = []; // queue for items waiting for being sent.
       this._flushInterval = 0;
-      this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0;
+      this._messageQueueSizeLimit = options.messageQueueSizeLimit || 1000;
     } else {
       this._sendQueue = new Map();
       this._flushInterval = options.flushInterval || 100;
@@ -110,10 +110,11 @@ class FluentSender {
       return;
     }
 
-    this._push(tag, timestamp, data, callback);
+    this._push(tag, timestamp, data);
     this._connect(() => {
       this._flushSendQueue();
     });
+    callback()
   }
 
   end(label, data, callback) {
@@ -182,11 +183,10 @@ class FluentSender {
     return msgpack.encode([time, data], { codec: codec });
   }
 
-  _push(tag, time, data, callback) {
+  _push(tag, time, data) {
     if (this._eventMode === 'Message') {
       // Message mode
       const item = this._makePacketItem(tag, time, data);
-      item.callback = callback;
       if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) {
         this._sendQueue.shift();
       }
@@ -199,13 +199,10 @@ class FluentSender {
         const eventEntryData = this._sendQueue.get(tag);
         eventEntryData.eventEntries.push(eventEntry);
         eventEntryData.size += eventEntry.length;
-        if (callback) eventEntryData.callbacks.push(callback);
       } else {
-        const callbacks = callback ? [callback] : [];
         this._sendQueue.set(tag, {
           eventEntries: [eventEntry],
           size: eventEntry.length,
-          callbacks: callbacks
         });
       }
     }
@@ -292,7 +289,7 @@ class FluentSender {
     this._socket && this._socket.destroy();
     this._socket = null;
     this._status = null;
-    this._connecting = false;
+    setTimeout(()=>{this._connecting = false},this.reconnectInterval)
   }
 
   _handshake(callback) {
@@ -375,7 +372,7 @@ class FluentSender {
         // nothing written;
         return;
       }
-      this._doWrite(item.packet, item.options, timeoutId, [item.callback]);
+      this._doWrite(item.packet, item.options, timeoutId);
     } else {
       if (this._sendQueue.size === 0) {
         this._flushingSendQueue = false;
@@ -398,11 +395,11 @@ class FluentSender {
         eventEntryDataSize: eventEntryData.size
       };
       const packet = msgpack.encode([tag, entries, options], { codec: codec });
-      this._doWrite(packet, options, timeoutId, eventEntryData.callbacks);
+      this._doWrite(packet, options, timeoutId);
     }
   }
 
-  _doWrite(packet, options, timeoutId, callbacks) {
+  _doWrite(packet, options, timeoutId) {
     const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize;
     this._socket.write(packet, () => {
       if (this.requireAckResponse) {
@@ -414,13 +411,7 @@ class FluentSender {
               'ack in response and chunk id in sent data are different',
               { ack: response.ack, chunk: options.chunk }
             );
-            callbacks.forEach((callback) => {
-              this._handleEvent('error', error, callback);
-            });
-          } else { // no error on ack
-            callbacks.forEach((callback) => {
-              callback && callback();
-            });
+          this._handleEvent('error', error);
           }
           this._sendQueueSize -= sendPacketSize;
           process.nextTick(() => {
@@ -429,15 +420,10 @@ class FluentSender {
         });
         timeoutId = setTimeout(() => {
           const error = new FluentLoggerError.ResponseTimeout('ack response timeout');
-          callbacks.forEach((callback) => {
-            this._handleEvent('error', error, callback);
-          });
+          this._handleEvent('error', error);
         }, this.ackResponseTimeout);
       } else {
         this._sendQueueSize -= sendPacketSize;
-        callbacks.forEach((callback) => {
-          callback && callback();
-        });
         process.nextTick(() => {
           this._waitToWrite();
         });