Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion include/iocore/eventsystem/EThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,29 @@ operator new(size_t, ink_dummy_for_new *p)
}
#define ETHREAD_GET_PTR(thread, offset) ((void *)((char *)(thread) + (offset)))

extern EThread *this_ethread();
inline EThread *
this_ethread()
{
return EThread::this_ethread_ptr;
}

inline EThread *
this_event_thread()
{
EThread *ethread = this_ethread();
if (ethread != nullptr && ethread->tt == REGULAR) {
return ethread;
} else {
return nullptr;
}
}

inline void
EThread::free_event(Event *e)
{
ink_assert(!e->in_the_priority_queue && !e->in_the_prot_queue);
e->mutex = nullptr;
EVENT_FREE(e, eventAllocator, this);
}

extern int thread_max_heartbeat_mseconds;
20 changes: 18 additions & 2 deletions include/iocore/eventsystem/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,18 @@ class Event : public Action

// Private

Event();
Event() : in_the_prot_queue(false), in_the_priority_queue(false), immediate(false), globally_allocated(true), in_heap(false) {}

Event *init(Continuation *c, ink_hrtime atimeout_at = 0, ink_hrtime aperiod = 0);
Event *
init(Continuation *c, ink_hrtime atimeout_at = 0, ink_hrtime aperiod = 0)
{
continuation = c;
timeout_at = atimeout_at;
period = aperiod;
immediate = !period && !atimeout_at;
cancelled = false;
return this;
}

#ifdef ENABLE_TIME_TRACE
ink_hrtime start_time;
Expand Down Expand Up @@ -282,6 +291,13 @@ class Event : public Action
//
extern ClassAllocator<Event> eventAllocator;

inline void
Event::free()
{
mutex = nullptr;
eventAllocator.free(this);
}

#define EVENT_ALLOC(_a, _t) THREAD_ALLOC(_a, _t)
#define EVENT_FREE(_p, _a, _t) \
_p->mutex = nullptr; \
Expand Down
8 changes: 4 additions & 4 deletions include/iocore/eventsystem/Freer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ template <class C> struct DeleterContinuation : public Continuation {
// 1. Make sure to schedule a delete on an ET_TASK thread
// 2. Delay the delete (this should be used sparingly)
template <class C>
TS_INLINE void
void
new_Deleter(C *ap, ink_hrtime t)
{
if (t > 0) {
Expand All @@ -78,7 +78,7 @@ template <class C> struct FreeCallContinuation : public Continuation {
};

template <class C>
TS_INLINE void
void
new_FreeCaller(C *ap, ink_hrtime t)
{
eventProcessor.schedule_in(new FreeCallContinuation<C>(ap), t, ET_TASK);
Expand All @@ -103,7 +103,7 @@ struct FreerContinuation : public Continuation {
explicit FreerContinuation(void *ap) : Continuation(nullptr), p(ap) { SET_HANDLER(&FreerContinuation::dieEvent); }
};

TS_INLINE void
inline void
new_Freer(void *ap, ink_hrtime t)
{
eventProcessor.schedule_in(new FreerContinuation(ap), t, ET_TASK);
Expand All @@ -128,7 +128,7 @@ template <class C> struct DereferContinuation : public Continuation {
};

template <class C>
TS_INLINE void
void
new_Derefer(C *ap, ink_hrtime t)
{
eventProcessor.schedule_in(new DereferContinuation<C>(ap), t, ET_TASK);
Expand Down
41 changes: 41 additions & 0 deletions include/iocore/eventsystem/IOBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -1506,3 +1506,44 @@ IOBufferChain::iterator::operator->() const
{
return _b;
}

//////////////////////////////////////////////////////////////
//
// returns 0 for DEFAULT_BUFFER_BASE_SIZE,
// +1 for each power of 2
//
//////////////////////////////////////////////////////////////
inline int64_t
buffer_size_to_index(int64_t size, int64_t max)
{
int64_t r = max;

while (r && BUFFER_SIZE_FOR_INDEX(r - 1) >= size) {
r--;
}
return r;
}

inline int64_t
iobuffer_size_to_index(int64_t size, int64_t max)
{
if (size > BUFFER_SIZE_FOR_INDEX(max)) {
return BUFFER_SIZE_INDEX_FOR_XMALLOC_SIZE(size);
}
return buffer_size_to_index(size, max);
}

inline int64_t
index_to_buffer_size(int64_t idx)
{
if (BUFFER_SIZE_INDEX_IS_FAST_ALLOCATED(idx)) {
return BUFFER_SIZE_FOR_INDEX(idx);
} else if (BUFFER_SIZE_INDEX_IS_XMALLOCED(idx)) {
return BUFFER_SIZE_FOR_XMALLOC(idx);
// coverity[dead_error_condition]
} else if (BUFFER_SIZE_INDEX_IS_CONSTANT(idx)) {
return BUFFER_SIZE_FOR_CONSTANT(idx);
}
// coverity[dead_error_line]
return 0;
}
50 changes: 50 additions & 0 deletions include/iocore/eventsystem/ProtectedQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,53 @@ struct ProtectedQueue {

ProtectedQueue();
};

inline ProtectedQueue::ProtectedQueue()
{
Event e;
ink_mutex_init(&lock);
ink_atomiclist_init(&al, "ProtectedQueue", (char *)&e.link.next - (char *)&e);
ink_cond_init(&might_have_data);
}

inline void
ProtectedQueue::signal()
{
// Need to get the lock before you can signal the thread
ink_mutex_acquire(&lock);
ink_cond_signal(&might_have_data);
ink_mutex_release(&lock);
}

inline int
ProtectedQueue::try_signal()
{
// Need to get the lock before you can signal the thread
if (ink_mutex_try_acquire(&lock)) {
ink_cond_signal(&might_have_data);
ink_mutex_release(&lock);
return 1;
} else {
return 0;
}
}

// Called from the same thread (don't need to signal)
inline void
ProtectedQueue::enqueue_local(Event *e)
{
ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
e->in_the_prot_queue = 1;
localQueue.enqueue(e);
}

inline Event *
ProtectedQueue::dequeue_local()
{
Event *e = localQueue.dequeue();
if (e) {
ink_assert(e->in_the_prot_queue);
e->in_the_prot_queue = 0;
}
return e;
}
12 changes: 10 additions & 2 deletions include/iocore/eventsystem/Thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ class Thread
*/
Ptr<ProxyMutex> mutex;

virtual void set_specific() = 0;
virtual void
set_specific()
{
this_thread_ptr = this;
}

static thread_local Thread *this_thread_ptr;

Expand Down Expand Up @@ -170,4 +174,8 @@ class Thread
Thread();
};

extern Thread *this_thread();
inline Thread *
this_thread()
{
return Thread::this_thread_ptr;
}
52 changes: 46 additions & 6 deletions include/iocore/eventsystem/VConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ typedef struct tsapi_vio *TSVIO;
class VConnection : public Continuation
{
public:
~VConnection() override;
~VConnection() override {}

/**
Read data from the VConnection.
Expand Down Expand Up @@ -306,17 +306,28 @@ class VConnection : public Continuation
*/
virtual void do_io_shutdown(ShutdownHowTo_t howto) = 0;

explicit VConnection(ProxyMutex *aMutex);
explicit VConnection(Ptr<ProxyMutex> &aMutex);
explicit VConnection(ProxyMutex *aMutex) : Continuation(aMutex), lerrno(0) { SET_HANDLER(nullptr); }
explicit VConnection(Ptr<ProxyMutex> &aMutex) : Continuation(aMutex), lerrno(0) { SET_HANDLER(nullptr); }

// Private
// Set continuation on a given vio. The public interface
// is through VIO::set_continuation()
virtual void set_continuation(VIO *vio, Continuation *cont);
virtual void
set_continuation(VIO *, Continuation *)
{
}

// Reenable a given vio. The public interface is through VIO::reenable
virtual void reenable(VIO *vio);
virtual void reenable_re(VIO *vio);
virtual void
reenable(VIO *)
{
}

virtual void
reenable_re(VIO *vio)
{
reenable(vio);
}

/**
Convenience function to retrieve information from VConnection.
Expand Down Expand Up @@ -411,3 +422,32 @@ struct DummyVConnection : public VConnection, public PluginUserArgs<TS_USER_ARGS

explicit DummyVConnection(ProxyMutex *m) : VConnection(m) {}
};

inline const char *
get_vc_event_name(int event)
{
switch (event) {
default:
return "unknown event";
case VC_EVENT_NONE:
return "VC_EVENT_NONE";
case VC_EVENT_IMMEDIATE:
return "VC_EVENT_IMMEDIATE";
case VC_EVENT_READ_READY:
return "VC_EVENT_READ_READY";
case VC_EVENT_WRITE_READY:
return "VC_EVENT_WRITE_READY";
case VC_EVENT_READ_COMPLETE:
return "VC_EVENT_READ_COMPLETE";
case VC_EVENT_WRITE_COMPLETE:
return "VC_EVENT_WRITE_COMPLETE";
case VC_EVENT_EOS:
return "VC_EVENT_EOS";
case VC_EVENT_ERROR:
return "VC_EVENT_ERROR";
case VC_EVENT_INACTIVITY_TIMEOUT:
return "VC_EVENT_INACTIVITY_TIMEOUT";
case VC_EVENT_ACTIVE_TIMEOUT:
return "VC_EVENT_ACTIVE_TIMEOUT";
}
}
65 changes: 53 additions & 12 deletions include/iocore/eventsystem/VIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,33 @@ class ProxyMutex;
class VIO
{
public:
explicit VIO(int aop);
VIO();
explicit VIO(int aop) : op(aop), buffer(), mutex(nullptr) {}
VIO() : buffer(), mutex(nullptr) {}
~VIO() {}

/** Interface for the VConnection that owns this handle. */
Continuation *get_continuation() const;
void set_continuation(Continuation *cont);
Continuation *
get_continuation() const
{
return cont;
}
void set_continuation(Continuation *cont);

/**
Set nbytes to be what is current available.

Interface to set nbytes to be ndone + buffer.reader()->read_avail()
if a reader is set.
*/
void done();
void
done()
{
if (buffer.reader()) {
nbytes = ndone + buffer.reader()->read_avail();
} else {
nbytes = ndone;
}
}

/**
Determine the number of bytes remaining.
Expand All @@ -81,15 +93,36 @@ class VIO
@return The number of bytes to be processed by the operation.

*/
int64_t ntodo() const;
int64_t
ntodo() const
{
return nbytes - ndone;
}

/////////////////////
// buffer settings //
/////////////////////
void set_writer(MIOBuffer *writer);
void set_reader(IOBufferReader *reader);
MIOBuffer *get_writer() const;
IOBufferReader *get_reader() const;
void
set_writer(MIOBuffer *writer)
{
buffer.writer_for(writer);
}
void
set_reader(IOBufferReader *reader)
{
buffer.reader_for(reader);
}
MIOBuffer *
get_writer() const
{
return buffer.writer();
}

IOBufferReader *
get_reader() const
{
return (buffer.reader());
}

/**
Reenable the IO operation.
Expand Down Expand Up @@ -125,8 +158,16 @@ class VIO
*/
void reenable_re();

void disable();
bool is_disabled() const;
void
disable()
{
this->_disabled = true;
}
bool
is_disabled() const
{
return this->_disabled;
}

enum {
NONE = 0,
Expand Down
Loading