diff --git a/Scheduler/Scheduler.cc b/Scheduler/Scheduler.cc index 5e8a860835719e214a45f0723cb5291b34f7f3e8..4f39c4ba7e2429cfdf843d0828651a08127ee3f8 100644 --- a/Scheduler/Scheduler.cc +++ b/Scheduler/Scheduler.cc @@ -71,6 +71,7 @@ #include <errno.h> #include <sys/epoll.h> #include "Utils/Exception.hh" +#include "Utils/MicroTime.hh" static const int EPollInitialSize = 16; @@ -83,6 +84,11 @@ prefix_ satcom::lib::Scheduler::Scheduler & satcom::lib::Scheduler::instance() return instance; } +prefix_ void satcom::lib::Scheduler::timeout(unsigned long timeout, TimerCallback const & cb) +{ + timerQueue_.push(TimerSpec(now()+1000*timeout,cb)); +} + prefix_ satcom::lib::Scheduler::Scheduler() : epollFd_(epoll_create(EPollInitialSize)) { @@ -90,7 +96,7 @@ prefix_ satcom::lib::Scheduler::Scheduler() throw SystemException(errno); } -prefix_ void satcom::lib::Scheduler::do_add(int fd, InternalCallback const & cb, EventId eventMask) +prefix_ void satcom::lib::Scheduler::do_add(int fd, SimpleCallback const & cb, EventId eventMask) { FdTable::iterator i (fdTable_.find(fd)); int action (EPOLL_CTL_MOD); @@ -141,6 +147,7 @@ prefix_ void satcom::lib::Scheduler::do_remove(int fd, EventId eventMask) throw SystemException(errno); } + prefix_ int satcom::lib::Scheduler::EventSpec::epollMask() const { @@ -158,11 +165,21 @@ prefix_ void satcom::lib::Scheduler::process() terminate_ = false; while (! terminate_) { struct epoll_event ev; - int events = epoll_wait(epollFd_, &ev, 1, 1000); + MicroTime timeNow = now(); + while ( ! timerQueue_.empty() && timerQueue_.top().timeout <= timeNow ) { + timerQueue_.top().cb(); + timerQueue_.pop(); + } + if (terminate_) + return; + int timeout = timerQueue_.empty() ? -1 : int((timerQueue_.top().timeout - timeNow)/1000); + + int events = epoll_wait(epollFd_, &ev, 1, timeout); if (events<0) // Hmm ... man epoll says, it will NOT return with EINTR ?? throw SystemException(errno); if (events==0) + // Timeout .. it will be run when reachiung the top of the loop continue; FdTable::iterator i = fdTable_.find(ev.data.fd); diff --git a/Scheduler/Scheduler.hh b/Scheduler/Scheduler.hh index 8916723181261819df9eb2e84d03feb1003216b3..a535fad11610748518a2ca4f71590c6a7da92a6c 100644 --- a/Scheduler/Scheduler.hh +++ b/Scheduler/Scheduler.hh @@ -25,10 +25,13 @@ // Custom includes #include <map> +#include <queue> #include <boost/function.hpp> #include <boost/utility.hpp> #include <boost/call_traits.hpp> +#include "Utils/MicroTime.hh" + //#include "scheduler.mpp" ///////////////////////////////hh.p//////////////////////////////////////// @@ -59,6 +62,8 @@ namespace lib { typedef boost::function<void (typename boost::call_traits<Handle>::param_type, EventId) > Callback; }; + typedef boost::function<void (EventId)> SimpleCallback; + typedef boost::function<void ()> TimerCallback; /////////////////////////////////////////////////////////////////////////// ///\name Structors and default members @@ -78,38 +83,52 @@ namespace lib { template <class Handle> void add(Handle const & handle, typename GenericCallback<Handle>::Callback const & cb, - EventId eventMask = EV_ALL); - template <class Handle> + EventId eventMask = EV_ALL); + template <class Handle> void remove(Handle const & handle, EventId eventMask = EV_ALL); - void process(); + void timeout(unsigned long timeout, TimerCallback const & cb); + void process(); void terminate(); protected: private: Scheduler(); - - typedef boost::function<void (EventId)> InternalCallback; - - void do_add(int fd, InternalCallback const & cb, EventId eventMask = EV_ALL); + + void do_add(int fd, SimpleCallback const & cb, EventId eventMask = EV_ALL); void do_remove(int fd, EventId eventMask = EV_ALL); - + struct EventSpec { - InternalCallback cb_read; - InternalCallback cb_prio; - InternalCallback cb_write; - InternalCallback cb_hup; - InternalCallback cb_err; + SimpleCallback cb_read; + SimpleCallback cb_prio; + SimpleCallback cb_write; + SimpleCallback cb_hup; + SimpleCallback cb_err; int epollMask() const; }; + + struct TimerSpec + { + TimerSpec() : timeout(), cb() {} + TimerSpec(unsigned long long timeout_, TimerCallback cb_) + : timeout(timeout_), cb(cb_) {} + + bool operator< (TimerSpec const & other) const + { return timeout > other.timeout; } + + unsigned long long timeout; + TimerCallback cb; + }; typedef std::map<int,EventSpec> FdTable; + typedef std::priority_queue<TimerSpec> TimerQueue; FdTable fdTable_; + TimerQueue timerQueue_; int epollFd_; bool terminate_; }; diff --git a/Scheduler/Scheduler.test.cc b/Scheduler/Scheduler.test.cc index fb5ae3590bfed5b5a81bae7ab2825137f7031ba2..26dd7fd50add55252ba6b7c3f1427036ca0aee57 100644 --- a/Scheduler/Scheduler.test.cc +++ b/Scheduler/Scheduler.test.cc @@ -163,6 +163,11 @@ namespace { } Scheduler::instance().terminate(); } + + void timeout() + { + Scheduler::instance().terminate(); + } struct HandleWrapper { @@ -182,6 +187,12 @@ namespace { return; callback(handle.fd_,event); } + + bool is_close(MicroTime a, MicroTime b) + { + return (a<b ? b-a : a-b) < 1100; + } + } BOOST_AUTO_UNIT_TEST(scheduler) @@ -216,6 +227,14 @@ BOOST_AUTO_UNIT_TEST(scheduler) buffer[size]=0; BOOST_CHECK_EQUAL( buffer, "READ" ); + BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(100,&timeout) ); + BOOST_CHECK_NO_THROW( Scheduler::instance().timeout(200,&timeout) ); + MicroTime t (now()); + BOOST_CHECK_NO_THROW( Scheduler::instance().process() ); + BOOST_CHECK_PREDICATE( is_close, (now()) (t+100*1000) ); + BOOST_CHECK_NO_THROW( Scheduler::instance().process() ); + BOOST_CHECK_PREDICATE( is_close, (now()) (t+200*1000) ); + HandleWrapper handle(sock,"TheTag"); BOOST_CHECK_NO_THROW( Scheduler::instance().add(handle,&handleCallback,Scheduler::EV_WRITE) ); strcpy(buffer,"WRITE"); diff --git a/Utils/MicroTime.cc b/Utils/MicroTime.cc new file mode 100644 index 0000000000000000000000000000000000000000..40e59c317f81da5b0473f301f0cc800d9298f7bc --- /dev/null +++ b/Utils/MicroTime.cc @@ -0,0 +1,37 @@ +// $Id$ +// +// Copyright (C) 2006 + +// Definition of non-inline non-template functions + +#include "MicroTime.hh" +//#include "MicroTime.ih" + +// Custom includes +#include <sys/time.h> +#include <time.h> +#include <errno.h> + +#include "Utils/Exception.hh" + +//#include "MicroTime.mpp" +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// + +prefix_ satcom::lib::MicroTime satcom::lib::now() +{ + struct timeval tv; + if (gettimeofday(&tv,0) < 0) + throw SystemException(errno); + return 1000000*MicroTime(tv.tv_sec) + tv.tv_usec; +} + + +///////////////////////////////cc.e//////////////////////////////////////// +#undef prefix_ +//#include "MicroTime.mpp" + + +// Local Variables: +// mode: c++ +// End: diff --git a/Utils/MicroTime.hh b/Utils/MicroTime.hh new file mode 100644 index 0000000000000000000000000000000000000000..f4fc8cafa8234e3b1677f510d37fa3eb36e42d69 --- /dev/null +++ b/Utils/MicroTime.hh @@ -0,0 +1,33 @@ +// $Id$ +// +// Copyright (C) 2006 + +#ifndef HH_MicroTime_ +#define HH_MicroTime_ 1 + +// Custom includes +#include <boost/cstdint.hpp> + +//#include "MicroTime.mpp" +///////////////////////////////hh.p//////////////////////////////////////// + +namespace satcom { +namespace lib { + + typedef boost::uint64_t MicroTime; + + MicroTime now(); + +}} + +///////////////////////////////hh.e//////////////////////////////////////// +//#include "MicroTime.cci" +//#include "MicroTime.ct" +//#include "MicroTime.cti" +//#include "MicroTime.mpp" +#endif + + +// Local Variables: +// mode: c++ +// End: diff --git a/Utils/MicroTime.test.cc b/Utils/MicroTime.test.cc new file mode 100644 index 0000000000000000000000000000000000000000..93dfd544d5b33689279a4f3c353f85b98027f4f8 --- /dev/null +++ b/Utils/MicroTime.test.cc @@ -0,0 +1,30 @@ +// $Id$ +// +// Copyright (C) 2006 + +// Unit tests + +//#include "MicroTime.test.hh" +//#include "MicroTime.test.ih" + +// Custom includes +#include "MicroTime.hh" + +#include <boost/test/auto_unit_test.hpp> +#include <boost/test/test_tools.hpp> + +#define prefix_ +///////////////////////////////cc.p//////////////////////////////////////// + +BOOST_AUTO_UNIT_TEST(microTime) +{ + BOOST_CHECK_NO_THROW( satcom::lib::now() ); +} + +///////////////////////////////cc.e//////////////////////////////////////// +#undef prefix_ + + +// Local Variables: +// mode: c++ +// End: