Skip to content
Snippets Groups Projects
Commit 253522c0 authored by g0dil's avatar g0dil
Browse files

Scheduler: Implement new timer event API

Scheduler: Move task 'name' argument to constructor
Scheduler: Remove obsolete 'enabled_' members
Scheduler: Remove obsolete SchedulerTimer class
parent 49c49f17
No related branches found
No related tags found
No related merge requests found
Showing
with 48 additions and 99 deletions
......@@ -224,8 +224,10 @@ prefix_ senf::console::Client::Client(Server & server, ClientHandle handle)
: out_t(boost::ref(*this)), senf::log::IOStreamTarget(out_t::member), server_ (server),
handle_ (handle),
binding_ (handle, boost::bind(&Client::setNoninteractive,this), Scheduler::EV_READ, false),
timer_ (Scheduler::instance().eventTime() + ClockService::milliseconds(INTERACTIVE_TIMEOUT),
boost::bind(&Client::setInteractive, this), false),
timer_ ("senf::console::Client interactive timeout",
boost::bind(&Client::setInteractive, this),
Scheduler::instance().eventTime() + ClockService::milliseconds(INTERACTIVE_TIMEOUT),
false),
name_ (server.name()), reader_ (), mode_ (server.mode())
{
handle_.facet<senf::TCPSocketProtocol>().nodelay();
......
......@@ -36,7 +36,6 @@
#include "../Socket/ServerSocketHandle.hh"
#include "../Scheduler/Scheduler.hh"
#include "../Scheduler/Binding.hh"
#include "../Scheduler/Timer.hh"
#include "../Scheduler/ReadHelper.hh"
#include "Parse.hh"
#include "Executor.hh"
......@@ -197,7 +196,7 @@ namespace console {
Server & server_;
ClientHandle handle_;
SchedulerBinding binding_;
SchedulerTimer timer_;
scheduler::TimerEvent timer_;
CommandParser parser_;
Executor executor_;
std::string name_;
......
......@@ -42,13 +42,12 @@
prefix_ void senf::ppi::IdleEvent::v_enable()
{
id_ = Scheduler::instance().timeout(manager().now(), boost::bind(&IdleEvent::cb,this));
timer_.timeout(manager().now());
}
prefix_ void senf::ppi::IdleEvent::v_disable()
{
Scheduler::instance().cancelTimeout(id_);
id_ = 0;
timer_.disable();
}
prefix_ void senf::ppi::IdleEvent::cb()
......
......@@ -32,7 +32,7 @@
// senf::ppi::IdleEvent
prefix_ senf::ppi::IdleEvent::IdleEvent()
: id_(0)
: timer_ ("PPI idle event", boost::bind(&IdleEvent::cb,this))
{}
///////////////////////////////cci.e///////////////////////////////////////
......
......@@ -56,7 +56,7 @@ namespace ppi {
void cb();
unsigned id_;
scheduler::TimerEvent timer_;
};
}}
......
......@@ -49,14 +49,13 @@ prefix_ void senf::ppi::IntervalTimer::v_enable()
prefix_ void senf::ppi::IntervalTimer::v_disable()
{
Scheduler::instance().cancelTimeout(id_);
id_ = 0;
timer_.disable();
}
prefix_ void senf::ppi::IntervalTimer::schedule()
{
info_.expected = info_.intervalStart + ( interval_ * (info_.number+1) ) / eventsPerInterval_;
id_ = Scheduler::instance().timeout(info_.expected, boost::bind(&IntervalTimer::cb,this));
timer_.timeout(info_.expected);
}
prefix_ void senf::ppi::IntervalTimer::cb()
......
......@@ -33,7 +33,8 @@
prefix_ senf::ppi::IntervalTimer::IntervalTimer(ClockService::clock_type interval,
unsigned eventsPerInterval)
: interval_ (interval), eventsPerInterval_ (eventsPerInterval)
: interval_ (interval), eventsPerInterval_ (eventsPerInterval),
timer_("PPI interval timer", boost::bind(&IntervalTimer::cb,this))
{}
///////////////////////////////cci.e///////////////////////////////////////
......
......@@ -82,7 +82,7 @@ namespace ppi {
ClockService::clock_type interval_;
unsigned eventsPerInterval_;
IntervalTimerEventInfo info_;
unsigned id_;
scheduler::TimerEvent timer_;
};
}}
......
......@@ -84,8 +84,9 @@ BOOST_AUTO_UNIT_TEST(activeSocketSink)
senf::UDPv4ClientSocketHandle inputSocket;
inputSocket.bind(senf::INet4SocketAddress("localhost:44344"));
senf::Scheduler::instance().timeout(
senf::ClockService::now() + senf::ClockService::milliseconds(100), &timeout);
senf::scheduler::TimerEvent timer (
"activeSocketSink test timer", &timeout,
senf::ClockService::now() + senf::ClockService::milliseconds(100));
source.submit(p);
senf::ppi::run();
......
......@@ -64,8 +64,9 @@ BOOST_AUTO_UNIT_TEST(socketSource)
senf::UDPv4ClientSocketHandle outputSocket;
outputSocket.writeto(senf::INet4SocketAddress("localhost:44344"),data);
senf::Scheduler::instance().timeout(
senf::ClockService::now() + senf::ClockService::milliseconds(100), &timeout);
senf::scheduler::TimerEvent timer (
"socketSource test timer", &timeout,
senf::ClockService::now() + senf::ClockService::milliseconds(100));
senf::ppi::run();
BOOST_REQUIRE( ! sink.empty() );
......
......@@ -101,6 +101,7 @@ namespace {
struct NullTask
: public senf::scheduler::FIFORunner::TaskInfo
{
NullTask() : senf::scheduler::FIFORunner::TaskInfo ("<null>") {}
void run() {};
};
}
......
......@@ -34,8 +34,8 @@
#define prefix_ inline
///////////////////////////////cci.p///////////////////////////////////////
prefix_ senf::scheduler::FIFORunner::TaskInfo::TaskInfo()
: runnable (false)
prefix_ senf::scheduler::FIFORunner::TaskInfo::TaskInfo(std::string const & name_)
: runnable (false), name (name_)
{}
prefix_ senf::scheduler::FIFORunner::TaskInfo::~TaskInfo()
......
......@@ -73,7 +73,7 @@ namespace scheduler {
struct TaskInfo
: public TaskListBase
{
TaskInfo();
explicit TaskInfo(std::string const & name_);
virtual ~TaskInfo();
bool runnable; ///< Runnable flag
......
......@@ -54,25 +54,19 @@ prefix_ bool senf::scheduler::FdDispatcher::add(std::string const & name, int fd
FdMap::iterator i (fds_.find(fd));
if (i == fds_.end()) {
i = fds_.insert(std::make_pair(fd, FdEvent())).first;
i = fds_.insert(std::make_pair(fd, FdEvent(name))).first;
runner_.enqueue(static_cast<FdEvent::ReadTask*>(&i->second));
runner_.enqueue(static_cast<FdEvent::PrioTask*>(&i->second));
runner_.enqueue(static_cast<FdEvent::WriteTask*>(&i->second));
}
FdEvent & event (i->second);
if (events & EV_READ) {
if (events & EV_READ)
event.FdEvent::ReadTask::cb = cb;
event.FdEvent::ReadTask::name = name;
}
if (events & EV_PRIO) {
if (events & EV_PRIO)
event.FdEvent::PrioTask::cb = cb;
event.FdEvent::PrioTask::name = name;
}
if (events & EV_WRITE) {
if (events & EV_WRITE)
event.FdEvent::WriteTask::cb = cb;
event.FdEvent::WriteTask::name = name;
}
if (! manager_.set(fd, event.activeEvents(), &event)) {
runner_.dequeue(static_cast<FdEvent::ReadTask*>(&i->second));
......@@ -95,18 +89,12 @@ prefix_ void senf::scheduler::FdDispatcher::remove(int fd, int events)
return;
FdEvent & event (i->second);
if (events & EV_READ) {
if (events & EV_READ)
event.FdEvent::ReadTask::cb = 0;
event.FdEvent::ReadTask::name.clear();
}
if (events & EV_PRIO) {
if (events & EV_PRIO)
event.FdEvent::PrioTask::cb = 0;
event.FdEvent::PrioTask::name.clear();
}
if (events & EV_WRITE) {
if (events & EV_WRITE)
event.FdEvent::WriteTask::cb = 0;
event.FdEvent::WriteTask::name.clear();
}
int activeEvents (event.activeEvents());
if (! activeEvents) {
......
......@@ -97,6 +97,9 @@ namespace scheduler {
typedef detail::FdTask<1, FdEvent> PrioTask;
typedef detail::FdTask<2, FdEvent> WriteTask;
explicit FdEvent(std::string const & name)
: ReadTask (name), PrioTask (name), WriteTask (name) {}
virtual void signal(int events);
int activeEvents() const;
int events;
......
......@@ -43,6 +43,9 @@ namespace detail {
struct FdTask
: public FIFORunner::TaskInfo
{
explicit FdTask(std::string const & name)
: FIFORunner::TaskInfo (name) {}
typedef boost::function<void (int)> Callback;
virtual void run();
Self & self();
......
......@@ -55,20 +55,16 @@ prefix_ void senf::scheduler::FileDispatcher::add(std::string const & name, int
FileMap::iterator i (files_.find(fd));
if (i == files_.end()) {
i = files_.insert(std::make_pair(fd, FileEvent())).first;
i = files_.insert(std::make_pair(fd, FileEvent(name))).first;
runner_.enqueue(static_cast<FileEvent::ReadTask*>(&i->second));
runner_.enqueue(static_cast<FileEvent::WriteTask*>(&i->second));
}
FileEvent & event (i->second);
if (events & EV_READ) {
if (events & EV_READ)
event.FileEvent::ReadTask::cb = cb;
event.FileEvent::ReadTask::name = name;
}
if (events & EV_WRITE) {
if (events & EV_WRITE)
event.FileEvent::WriteTask::cb = cb;
event.FileEvent::WriteTask::name = name;
}
manager_.timeout(0);
}
......
......@@ -109,6 +109,9 @@ namespace scheduler {
typedef detail::FdTask<0, FileEvent> ReadTask;
typedef detail::FdTask<1, FileEvent> WriteTask;
explicit FileEvent(std::string const & name)
: ReadTask (name), WriteTask (name) {}
int activeEvents() const;
int events;
};
......
......@@ -44,12 +44,12 @@ prefix_ void senf::Scheduler::process()
{
terminate_ = false;
while(! terminate_ && ! (fdDispatcher_.empty() &&
timerDispatcher_.empty() &&
scheduler::detail::TimerDispatcher::instance().empty() &&
fileDispatcher_.empty())) {
scheduler::detail::SignalDispatcher::instance().unblockSignals();
timerDispatcher_.unblockSignals();
scheduler::detail::TimerDispatcher::instance().unblockSignals();
scheduler::FdManager::instance().processOnce();
timerDispatcher_.blockSignals();
scheduler::detail::TimerDispatcher::instance().blockSignals();
scheduler::detail::SignalDispatcher::instance().blockSignals();
fileDispatcher_.prepareRun();
scheduler::FIFORunner::instance().run();
......@@ -61,7 +61,7 @@ prefix_ void senf::Scheduler::restart()
scheduler::FdManager* fdm (&scheduler::FdManager::instance());
scheduler::FIFORunner* ffr (&scheduler::FIFORunner::instance());
scheduler::FdDispatcher* fdd (&fdDispatcher_);
scheduler::TimerDispatcher* td (&timerDispatcher_);
scheduler::detail::TimerDispatcher* td (&scheduler::detail::TimerDispatcher::instance());
scheduler::detail::SignalDispatcher* sd (&scheduler::detail::SignalDispatcher::instance());
scheduler::FileDispatcher* fld (&fileDispatcher_);
......@@ -75,7 +75,7 @@ prefix_ void senf::Scheduler::restart()
new (fdm) scheduler::FdManager();
new (ffr) scheduler::FIFORunner();
new (fdd) scheduler::FdDispatcher(*fdm, *ffr);
new (td) scheduler::TimerDispatcher(*fdm, *ffr);
new (td) scheduler::detail::TimerDispatcher();
new (sd) scheduler::detail::SignalDispatcher();
new (fld) scheduler::FileDispatcher(*fdm, *ffr);
}
......
......@@ -67,52 +67,6 @@ prefix_ int senf::retrieve_filehandle(int fd)
return fd;
}
prefix_ senf::Scheduler::timer_id senf::Scheduler::timeout(ClockService::clock_type timeout,
SimpleCallback const & cb)
{
return timerDispatcher_.add("<anon timer>", timeout, cb);
}
prefix_ senf::Scheduler::timer_id senf::Scheduler::timeout(std::string const & name,
ClockService::clock_type timeout,
SimpleCallback const & cb)
{
return timerDispatcher_.add(name, timeout, cb);
}
prefix_ void senf::Scheduler::cancelTimeout(timer_id id)
{
timerDispatcher_.remove(id);
}
prefix_ senf::ClockService::clock_type senf::Scheduler::timeoutEarly()
const
{
SENF_LOG( (senf::log::IMPORTANT)
("timeoutEarly() is deprecated and a no-op. It will be removed") );
return 0;
}
prefix_ void senf::Scheduler::timeoutEarly(ClockService::clock_type v)
{
SENF_LOG( (senf::log::IMPORTANT)
("timeoutEarly() is deprecated and a no-op. It will be removed") );
}
prefix_ senf::ClockService::clock_type senf::Scheduler::timeoutAdjust()
const
{
SENF_LOG( (senf::log::IMPORTANT)
("timeoutAdjust() is deprecated and a no-op. It will be removed") );
return 0;
}
prefix_ void senf::Scheduler::timeoutAdjust(ClockService::clock_type v)
{
SENF_LOG( (senf::log::IMPORTANT)
("timeoutAdjust() is deprecated and a no-op. It will be removed") );
}
prefix_ void senf::Scheduler::terminate()
{
terminate_ = true;
......@@ -144,7 +98,6 @@ prefix_ unsigned senf::Scheduler::hangCount()
prefix_ senf::Scheduler::Scheduler()
: terminate_ (false),
fdDispatcher_ (scheduler::FdManager::instance(), scheduler::FIFORunner::instance()),
timerDispatcher_ (scheduler::FdManager::instance(), scheduler::FIFORunner::instance()),
fileDispatcher_ (scheduler::FdManager::instance(), scheduler::FIFORunner::instance())
{}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment