OpenDNSSEC-signer  1.4.10
worker.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2009 NLNet Labs. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions
6  * are met:
7  * 1. Redistributions of source code must retain the above copyright
8  * notice, this list of conditions and the following disclaimer.
9  * 2. Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  *
13  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
14  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
17  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
19  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
20  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
21  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
22  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
23  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24  *
25  */
26 
32 #include "daemon/engine.h"
33 #include "daemon/worker.h"
34 #include "shared/allocator.h"
35 #include "shared/duration.h"
36 #include "shared/hsm.h"
37 #include "shared/locks.h"
38 #include "shared/log.h"
39 #include "shared/status.h"
40 #include "signer/tools.h"
41 #include "signer/zone.h"
42 
43 #include <time.h> /* time() */
44 
46  { WORKER_WORKER, "worker" },
47  { WORKER_DRUDGER, "drudger" },
48  { 0, NULL }
49 };
50 
51 
56 static const char*
57 worker2str(worker_id type)
58 {
59  ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
60  if (lt) {
61  return lt->name;
62  }
63  return NULL;
64 }
65 
66 
72 worker_create(allocator_type* allocator, int num, worker_id type)
73 {
74  worker_type* worker;
75  if (!allocator) {
76  return NULL;
77  }
78  worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
79  if (!worker) {
80  return NULL;
81  }
82  ods_log_debug("[%s[%i]] create", worker2str(type), num+1);
83  lock_basic_init(&worker->worker_lock);
84  lock_basic_set(&worker->worker_alarm);
85  lock_basic_lock(&worker->worker_lock);
86  worker->allocator = allocator;
87  worker->thread_num = num +1;
88  worker->engine = NULL;
89  worker->task = NULL;
90  worker->working_with = TASK_NONE;
91  worker->need_to_exit = 0;
92  worker->type = type;
93  worker->clock_in = 0;
94  worker->jobs_appointed = 0;
95  worker->jobs_completed = 0;
96  worker->jobs_failed = 0;
97  worker->sleeping = 0;
98  worker->waiting = 0;
100  return worker;
101 }
102 
103 
108 static void
109 worker_working_with(worker_type* worker, task_id with, task_id next,
110  const char* str, const char* name, task_id* what, time_t* when)
111 {
112  worker->working_with = with;
113  ods_log_verbose("[%s[%i]] %s zone %s", worker2str(worker->type),
114  worker->thread_num, str, name);
115  *what = next;
116  *when = time_now();
117  return;
118 }
119 
120 
125 static int
126 worker_fulfilled(worker_type* worker)
127 {
128  int ret = 0;
129  ret = (worker->jobs_completed + worker->jobs_failed) ==
130  worker->jobs_appointed;
131  return ret;
132 }
133 
134 
139 static void
140 worker_clear_jobs(worker_type* worker)
141 {
142  ods_log_assert(worker);
143  lock_basic_lock(&worker->worker_lock);
144  worker->jobs_appointed = 0;
145  worker->jobs_completed = 0;
146  worker->jobs_failed = 0;
147  lock_basic_unlock(&worker->worker_lock);
148  return;
149 }
150 
151 
156 static void
157 worker_queue_rrset(worker_type* worker, fifoq_type* q, rrset_type* rrset)
158 {
160  int tries = 0;
161  ods_log_assert(worker);
162  ods_log_assert(worker->task);
163  ods_log_assert(q);
164  ods_log_assert(rrset);
165 
166  lock_basic_lock(&q->q_lock);
167  status = fifoq_push(q, (void*) rrset, worker, &tries);
168  while (status == ODS_STATUS_UNCHANGED) {
169  tries++;
170  if (worker->need_to_exit) {
172  return;
173  }
180  lock_basic_sleep(&q->q_nonfull, &q->q_lock, 5);
181  status = fifoq_push(q, (void*) rrset, worker, &tries);
182  }
184 
185  ods_log_assert(status == ODS_STATUS_OK);
186  lock_basic_lock(&worker->worker_lock);
187  worker->jobs_appointed += 1;
188  lock_basic_unlock(&worker->worker_lock);
189  return;
190 }
191 
192 
197 static void
198 worker_queue_domain(worker_type* worker, fifoq_type* q, domain_type* domain)
199 {
200  rrset_type* rrset = NULL;
201  denial_type* denial = NULL;
202  ods_log_assert(worker);
203  ods_log_assert(q);
204  ods_log_assert(domain);
205  rrset = domain->rrsets;
206  while (rrset) {
207  worker_queue_rrset(worker, q, rrset);
208  rrset = rrset->next;
209  }
210  denial = (denial_type*) domain->denial;
211  if (denial && denial->rrset) {
212  worker_queue_rrset(worker, q, denial->rrset);
213  }
214  return;
215 }
216 
217 
222 static void
223 worker_queue_zone(worker_type* worker, fifoq_type* q, zone_type* zone)
224 {
225  ldns_rbnode_t* node = LDNS_RBTREE_NULL;
226  domain_type* domain = NULL;
227  ods_log_assert(worker);
228  ods_log_assert(q);
229  ods_log_assert(zone);
230  worker_clear_jobs(worker);
231  if (!zone->db || !zone->db->domains) {
232  return;
233  }
234  if (zone->db->domains->root != LDNS_RBTREE_NULL) {
235  node = ldns_rbtree_first(zone->db->domains);
236  }
237  while (node && node != LDNS_RBTREE_NULL) {
238  domain = (domain_type*) node->data;
239  worker_queue_domain(worker, q, domain);
240  node = ldns_rbtree_next(node);
241  }
242  return;
243 }
244 
245 
250 static ods_status
251 worker_check_jobs(worker_type* worker, task_type* task)
252 {
253  ods_log_assert(worker);
254  ods_log_assert(task);
255  lock_basic_lock(&worker->worker_lock);
256  if (worker->jobs_failed) {
257  ods_log_error("[%s[%i]] sign zone %s failed: %u RRsets failed",
258  worker2str(worker->type), worker->thread_num,
259  task_who2str(task), worker->jobs_failed);
260  lock_basic_unlock(&worker->worker_lock);
261  return ODS_STATUS_ERR;
262  } else if (worker->jobs_completed != worker->jobs_appointed) {
263  ods_log_error("[%s[%i]] sign zone %s failed: processed %u of %u "
264  "RRsets", worker2str(worker->type), worker->thread_num,
265  task_who2str(task), worker->jobs_completed,
266  worker->jobs_appointed);
267  lock_basic_unlock(&worker->worker_lock);
268  return ODS_STATUS_ERR;
269  } else if (worker->need_to_exit) {
270  ods_log_debug("[%s[%i]] sign zone %s failed: worker needs to exit",
271  worker2str(worker->type), worker->thread_num, task_who2str(task));
272  lock_basic_unlock(&worker->worker_lock);
273  return ODS_STATUS_ERR;
274  } else {
275  ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u RRsets "
276  "succeeded", worker2str(worker->type), worker->thread_num,
277  task_who2str(task), worker->jobs_completed,
278  worker->jobs_appointed);
279  ods_log_assert(worker->jobs_appointed == worker->jobs_completed);
280  }
281  lock_basic_unlock(&worker->worker_lock);
282  return ODS_STATUS_OK;
283 }
284 
285 
290 static void
291 worker_perform_task(worker_type* worker)
292 {
293  engine_type* engine = NULL;
294  zone_type* zone = NULL;
295  task_type* task = NULL;
296  task_id what = TASK_NONE;
297  time_t when = 0;
298  time_t never = (3600*24*365);
299  ods_status status = ODS_STATUS_OK;
300  int backup = 0;
301  time_t start = 0;
302  time_t end = 0;
303 
304  if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
305  return;
306  }
307  engine = (engine_type*) worker->engine;
308  task = (task_type*) worker->task;
309  zone = (zone_type*) worker->task->zone;
310  ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
311  worker2str(worker->type), worker->thread_num, task_what2str(task->what),
312  task_who2str(task), (uint32_t) worker->clock_in);
313  /* do what you have been told to do */
314  switch (task->what) {
315  case TASK_SIGNCONF:
316  /* perform 'load signconf' task */
317  worker_working_with(worker, TASK_SIGNCONF, TASK_READ,
318  "configure", task_who2str(task), &what, &when);
319  status = tools_signconf(zone);
320  if (status == ODS_STATUS_UNCHANGED) {
321  if (!zone->signconf->last_modified) {
322  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
323  worker2str(worker->type), worker->thread_num,
324  task_who2str(task));
325  status = ODS_STATUS_ERR;
326  }
327  }
328  if (status == ODS_STATUS_UNCHANGED) {
329  if (task->halted != TASK_NONE && task->halted != TASK_SIGNCONF) {
330  goto task_perform_continue;
331  }
332  status = ODS_STATUS_OK;
333  } else if (status == ODS_STATUS_OK) {
334  task->interrupt = TASK_NONE;
335  task->halted = TASK_NONE;
336  } else {
337  if (task->halted == TASK_NONE) {
338  goto task_perform_fail;
339  }
340  goto task_perform_continue;
341  }
342  /* break; */
343  case TASK_READ:
344  /* perform 'read input adapter' task */
345  worker_working_with(worker, TASK_READ, TASK_SIGN,
346  "read", task_who2str(task), &what, &when);
347  task->what = TASK_READ;
348  if (!zone->signconf->last_modified) {
349  ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
350  worker2str(worker->type), worker->thread_num,
351  task_who2str(task));
352  status = ODS_STATUS_ERR;
353  } else {
354  if (hsm_check_context()) {
355  ods_log_error("signer instructed to reload due to hsm reset in read task");
356  engine->need_to_reload = 1;
357  status = ODS_STATUS_ERR;
358  } else {
359  status = tools_input(zone);
360  }
361  }
362 
363  if (status == ODS_STATUS_UNCHANGED) {
364  ods_log_verbose("[%s[%i]] zone %s unsigned data not changed, "
365  "continue", worker2str(worker->type), worker->thread_num,
366  task_who2str(task));
367  status = ODS_STATUS_OK;
368  }
369  if (status == ODS_STATUS_OK) {
370  if (task->interrupt > TASK_SIGNCONF) {
371  task->interrupt = TASK_NONE;
372  task->halted = TASK_NONE;
373  }
374  } else {
375  if (task->halted == TASK_NONE) {
376  goto task_perform_fail;
377  }
378  goto task_perform_continue;
379  }
380  /* break; */
381  case TASK_SIGN:
382  /* perform 'sign' task */
383  worker_working_with(worker, TASK_SIGN, TASK_WRITE,
384  "sign", task_who2str(task), &what, &when);
385  task->what = TASK_SIGN;
386  status = zone_update_serial(zone);
387  if (status == ODS_STATUS_OK) {
388  if (task->interrupt > TASK_SIGNCONF) {
389  task->interrupt = TASK_NONE;
390  task->halted = TASK_NONE;
391  }
392  } else {
393  ods_log_error("[%s[%i]] unable to sign zone %s: "
394  "failed to increment serial",
395  worker2str(worker->type), worker->thread_num,
396  task_who2str(task));
397  if (task->halted == TASK_NONE) {
398  goto task_perform_fail;
399  }
400  goto task_perform_continue;
401  }
402 
403  /* start timer */
404  start = time(NULL);
405  if (zone->stats) {
407  if (!zone->stats->start_time) {
408  zone->stats->start_time = start;
409  }
410  zone->stats->sig_count = 0;
411  zone->stats->sig_soa_count = 0;
412  zone->stats->sig_reuse = 0;
413  zone->stats->sig_time = 0;
415  }
416  /* check the HSM connection before queuing sign operations */
417  if (hsm_check_context()) {
418  ods_log_error("signer instructed to reload due to hsm reset in sign task");
419  engine->need_to_reload = 1;
420  goto task_perform_fail;
421  }
422  /* prepare keys */
423  status = zone_prepare_keys(zone);
424  if (status == ODS_STATUS_OK) {
425  /* queue menial, hard signing work */
426  worker_queue_zone(worker, engine->signq, zone);
427  ods_log_deeebug("[%s[%i]] wait until drudgers are finished "
428  "signing zone %s", worker2str(worker->type),
429  worker->thread_num, task_who2str(task));
430  /* sleep until work is done */
431  worker_sleep_unless(worker, 0);
432  }
433  /* stop timer */
434  end = time(NULL);
435  /* check status and jobs */
436  if (status == ODS_STATUS_OK) {
437  status = worker_check_jobs(worker, task);
438  }
439  worker_clear_jobs(worker);
440  if (status == ODS_STATUS_OK && zone->stats) {
442  zone->stats->sig_time = (end-start);
444  }
445  if (status != ODS_STATUS_OK) {
446  if (task->halted == TASK_NONE) {
447  goto task_perform_fail;
448  }
449  goto task_perform_continue;
450  } else {
451  if (task->interrupt > TASK_SIGNCONF) {
452  task->interrupt = TASK_NONE;
453  task->halted = TASK_NONE;
454  }
455  }
456  /* break; */
457  case TASK_WRITE:
458  /* perform 'write to output adapter' task */
459  worker_working_with(worker, TASK_WRITE, TASK_SIGN,
460  "write", task_who2str(task), &what, &when);
461  task->what = TASK_WRITE;
462  status = tools_output(zone, engine);
463  if (status == ODS_STATUS_OK) {
464  if (task->interrupt > TASK_SIGNCONF) {
465  task->interrupt = TASK_NONE;
466  task->halted = TASK_NONE;
467  }
468  } else {
469  /* clear signatures? */
470  if (task->halted == TASK_NONE) {
471  goto task_perform_fail;
472  }
473  goto task_perform_continue;
474  }
475  zone->db->is_processed = 1;
476  if (zone->signconf &&
478  what = TASK_SIGN;
479  when = worker->clock_in +
481  } else {
482  ods_log_error("[%s[%i]] unable to retrieve resign interval "
483  "for zone %s: duration2time() failed",
484  worker2str(worker->type), worker->thread_num,
485  task_who2str(task));
486  ods_log_info("[%s[%i]] defaulting to 1H resign interval for "
487  "zone %s", worker2str(worker->type), worker->thread_num,
488  task_who2str(task));
489  what = TASK_SIGN;
490  when = worker->clock_in + 3600;
491  }
492  backup = 1;
493  break;
494  case TASK_NONE:
495  worker->working_with = TASK_NONE;
496  /* no task */
497  ods_log_warning("[%s[%i]] none task for zone %s",
498  worker2str(worker->type), worker->thread_num,
499  task_who2str(task));
500  when = time_now() + never;
501  break;
502  default:
503  worker->working_with = TASK_NONE;
504  /* unknown task */
505  ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
506  worker2str(worker->type), worker->thread_num,
507  task_who2str(task));
508  what = TASK_SIGNCONF;
509  when = time_now();
510  break;
511  }
512  /* no error */
513  task->backoff = 0;
514  if (task->interrupt != TASK_NONE && task->interrupt != what) {
515  ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
516  worker2str(worker->type), worker->thread_num,
517  task_what2str(what), task_who2str(task));
518  task->halted = what;
519  task->halted_when = when;
520  task->what = task->interrupt;
521  task->when = time_now();
522  } else {
523  ods_log_debug("[%s[%i]] next task %s for zone %s",
524  worker2str(worker->type), worker->thread_num,
525  task_what2str(what), task_who2str(task));
526  task->what = what;
527  task->when = when;
528  task->interrupt = TASK_NONE;
529  task->halted = TASK_NONE;
530  task->halted_when = 0;
531  }
532  /* backup the last successful run */
533  if (backup) {
534  status = zone_backup2(zone);
535  if (status != ODS_STATUS_OK) {
536  ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
537  worker2str(worker->type), worker->thread_num,
538  task_who2str(task), ods_status2str(status));
539  /* just a warning */
540  status = ODS_STATUS_OK;
541  }
542  backup = 0;
543  }
544  return;
545 
546 task_perform_fail:
547  if (status != ODS_STATUS_XFR_NOT_READY) {
548  /* other statuses is critical, and we know it is not ODS_STATUS_OK */
549  ods_log_crit("[%s[%i]] CRITICAL: failed to sign zone %s: %s",
550  worker2str(worker->type), worker->thread_num,
551  task_who2str(task), ods_status2str(status));
552  }
553  /* in case of failure, also mark zone processed (for single run usage) */
554  zone->db->is_processed = 1;
555  if (task->backoff) {
556  task->backoff *= 2;
557  } else {
558  task->backoff = 60;
559  }
560  if (task->backoff > ODS_SE_MAX_BACKOFF) {
561  task->backoff = ODS_SE_MAX_BACKOFF;
562  }
563  ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
564  worker2str(worker->type), worker->thread_num,
565  task_what2str(task->what), task_who2str(task), task->backoff);
566  task->when = time_now() + task->backoff;
567  return;
568 
569 task_perform_continue:
570  ods_log_info("[%s[%i]] continue task %s for zone %s",
571  worker2str(worker->type), worker->thread_num,
572  task_what2str(task->halted), task_who2str(task));
573  task->what = task->halted;
574  task->when = task->halted_when;
575  task->interrupt = TASK_NONE;
576  task->halted = TASK_NONE;
577  task->halted_when = 0;
578  return;
579 }
580 
581 
586 static void
587 worker_work(worker_type* worker)
588 {
589  time_t now = 0;
590  time_t timeout = 1;
591  engine_type* engine = NULL;
592  zone_type* zone = NULL;
593  ods_status status = ODS_STATUS_OK;
594 
595  ods_log_assert(worker);
596  ods_log_assert(worker->type == WORKER_WORKER);
597 
598  engine = (engine_type*) worker->engine;
599  while (worker->need_to_exit == 0) {
600  ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
601  worker->thread_num);
602  now = time_now();
604  worker->task = schedule_pop_task(engine->taskq);
605  if (worker->task) {
606  worker->working_with = worker->task->what;
608  zone = (zone_type*) worker->task->zone;
609 
610  lock_basic_lock(&zone->zone_lock);
611  ods_log_debug("[%s[%i]] start working on zone %s",
612  worker2str(worker->type), worker->thread_num, zone->name);
613  worker->clock_in = time(NULL);
614  worker_perform_task(worker);
615  zone->task = worker->task;
616  ods_log_debug("[%s[%i]] finished working on zone %s",
617  worker2str(worker->type), worker->thread_num, zone->name);
618 
620  worker->task = NULL;
621  worker->working_with = TASK_NONE;
622  status = schedule_task(engine->taskq, zone->task, 1);
623  if (status != ODS_STATUS_OK) {
624  ods_log_error("[%s[%i]] unable to schedule task for zone %s: "
625  "%s", worker2str(worker->type), worker->thread_num,
626  zone->name, ods_status2str(status));
627  }
630  timeout = 1;
632  lock_basic_lock(&engine->signal_lock);
633  if (engine->need_to_reload) {
634  lock_basic_alarm(&engine->signal_cond);
635  }
636  lock_basic_unlock(&engine->signal_lock);
637 
638  } else {
639  ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
640  worker->thread_num);
641  worker->task = schedule_get_first_task(engine->taskq);
643  if (worker->task && !engine->taskq->loading) {
644  timeout = (worker->task->when - now);
645  } else {
646  timeout *= 2;
647  }
648  if (timeout > ODS_SE_MAX_BACKOFF) {
649  timeout = ODS_SE_MAX_BACKOFF;
650  }
651  worker->task = NULL;
652  worker_sleep(worker, timeout);
653  }
654  }
655  return;
656 }
657 
658 
663 static void
664 worker_drudge(worker_type* worker)
665 {
666  engine_type* engine = NULL;
667  zone_type* zone = NULL;
668  task_type* task = NULL;
669  rrset_type* rrset = NULL;
670  ods_status status = ODS_STATUS_OK;
671  worker_type* superior = NULL;
672  hsm_ctx_t* ctx = NULL;
673 
674  ods_log_assert(worker);
675  ods_log_assert(worker->engine);
676  ods_log_assert(worker->type == WORKER_DRUDGER);
677 
678  engine = (engine_type*) worker->engine;
679  while (worker->need_to_exit == 0) {
680  ods_log_deeebug("[%s[%i]] report for duty", worker2str(worker->type),
681  worker->thread_num);
682  /* initialize */
683  superior = NULL;
684  zone = NULL;
685  task = NULL;
686  /* get item */
687  lock_basic_lock(&engine->signq->q_lock);
688  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
689  if (!rrset) {
690  ods_log_deeebug("[%s[%i]] nothing to do, wait",
691  worker2str(worker->type), worker->thread_num);
699  &engine->signq->q_lock, 0);
700  if(worker->need_to_exit == 0)
701  rrset = (rrset_type*) fifoq_pop(engine->signq, &superior);
702  }
703  lock_basic_unlock(&engine->signq->q_lock);
704  /* do some work */
705  if (rrset) {
706  ods_log_assert(superior);
707  if (!ctx) {
708  ods_log_debug("[%s[%i]] create hsm context",
709  worker2str(worker->type), worker->thread_num);
710  ctx = hsm_create_context();
711  }
712  if (!ctx) {
713  ods_log_crit("[%s[%i]] error creating libhsm context",
714  worker2str(worker->type), worker->thread_num);
715  engine->need_to_reload = 1;
716  ods_log_error("signer instructed to reload due to hsm reset while signing");
717  lock_basic_lock(&superior->worker_lock);
718  superior->jobs_failed++;
719  lock_basic_unlock(&superior->worker_lock);
720  } else {
721  ods_log_assert(ctx);
722  lock_basic_lock(&superior->worker_lock);
723  task = superior->task;
724  ods_log_assert(task);
725  zone = task->zone;
726  lock_basic_unlock(&superior->worker_lock);
727  ods_log_assert(zone);
728  ods_log_assert(zone->apex);
729  ods_log_assert(zone->signconf);
730  worker->clock_in = time(NULL);
731  status = rrset_sign(ctx, rrset, superior->clock_in);
732  lock_basic_lock(&superior->worker_lock);
733  if (status == ODS_STATUS_OK) {
734  superior->jobs_completed++;
735  } else {
736  superior->jobs_failed++;
737  }
738  lock_basic_unlock(&superior->worker_lock);
739  }
740  if (worker_fulfilled(superior) && superior->sleeping) {
741  ods_log_deeebug("[%s[%i]] wake up superior[%u], work is "
742  "done", worker2str(worker->type), worker->thread_num,
743  superior->thread_num);
744  worker_wakeup(superior);
745  }
746  superior = NULL;
747  rrset = NULL;
748  }
749  /* done work */
750  }
751  /* wake up superior */
752  if (superior && superior->sleeping) {
753  ods_log_deeebug("[%s[%i]] wake up superior[%u], i am exiting",
754  worker2str(worker->type), worker->thread_num, superior->thread_num);
755  worker_wakeup(superior);
756  }
757  /* cleanup open HSM sessions */
758  if (ctx) {
759  hsm_destroy_context(ctx);
760  }
761  return;
762 }
763 
764 
769 void
771 {
772  ods_log_assert(worker);
773  switch (worker->type) {
774  case WORKER_DRUDGER:
775  worker_drudge(worker);
776  break;
777  case WORKER_WORKER:
778  worker_work(worker);
779  break;
780  default:
781  ods_log_error("[worker] illegal worker (id=%i)", worker->type);
782  break;
783  }
784  return;
785 }
786 
787 
792 void
793 worker_sleep(worker_type* worker, time_t timeout)
794 {
795  ods_log_assert(worker);
796  if (!worker->need_to_exit) {
797  lock_basic_lock(&worker->worker_lock);
798  worker->sleeping = 1;
799  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
800  timeout);
801  lock_basic_unlock(&worker->worker_lock);
802  }
803  return;
804 }
805 
806 
811 void
812 worker_sleep_unless(worker_type* worker, time_t timeout)
813 {
814  ods_log_assert(worker);
815  lock_basic_lock(&worker->worker_lock);
816  while (!worker->need_to_exit && !worker_fulfilled(worker)) {
817  worker->sleeping = 1;
818  lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
819  timeout);
820  ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
821  "appointed, %u completed, %u failed", worker2str(worker->type),
822  worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
823  worker->jobs_failed);
824  }
825  lock_basic_unlock(&worker->worker_lock);
826  return;
827 }
828 
829 
834 void
836 {
837  ods_log_assert(worker);
838  if (worker && worker->sleeping && !worker->waiting) {
839  ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
840  worker->thread_num);
841  lock_basic_lock(&worker->worker_lock);
842  lock_basic_alarm(&worker->worker_alarm);
843  worker->sleeping = 0;
844  lock_basic_unlock(&worker->worker_lock);
845  }
846  return;
847 }
848 
849 
854 void
855 worker_wait_timeout(lock_basic_type* lock, cond_basic_type* condition,
856  time_t timeout)
857 {
858  lock_basic_lock(lock);
859  lock_basic_sleep(condition, lock, timeout);
860  lock_basic_unlock(lock);
861  return;
862 }
863 
864 
869 void
870 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
871 {
872  worker_wait_timeout(lock, condition, 0);
873  return;
874 }
875 
876 
881 void
882 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
883 {
884  lock_basic_lock(lock);
885  lock_basic_alarm(condition);
886  lock_basic_unlock(lock);
887  return;
888 }
889 
890 
895 void
896 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
897 {
898  lock_basic_lock(lock);
899  lock_basic_broadcast(condition);
900  lock_basic_unlock(lock);
901  return;
902 }
903 
904 
909 void
911 {
912  allocator_type* allocator;
913  cond_basic_type worker_cond;
914  lock_basic_type worker_lock;
915  if (!worker) {
916  return;
917  }
918  allocator = worker->allocator;
919  worker_cond = worker->worker_alarm;
920  worker_lock = worker->worker_lock;
921  allocator_deallocate(allocator, (void*) worker);
922  lock_basic_destroy(&worker_lock);
923  lock_basic_off(&worker_cond);
924  return;
925 }
#define lock_basic_off(cond)
Definition: locks.h:101
Definition: task.h:41
rrset_type * rrset
Definition: denial.h:54
size_t jobs_completed
Definition: worker.h:59
unsigned waiting
Definition: worker.h:64
unsigned need_to_exit
Definition: worker.h:65
task_type * schedule_get_first_task(schedule_type *schedule)
Definition: schedule.c:245
void ods_log_debug(const char *format,...)
Definition: log.c:270
time_t when
Definition: task.h:59
size_t jobs_appointed
Definition: worker.h:58
lock_basic_type worker_lock
Definition: worker.h:62
#define lock_basic_destroy(lock)
Definition: locks.h:93
cond_basic_type signal_cond
Definition: engine.h:78
cond_basic_type q_threshold
Definition: fifoq.h:66
task_id interrupt
Definition: task.h:57
void * allocator_alloc(allocator_type *allocator, size_t size)
Definition: allocator.c:66
ods_status tools_signconf(zone_type *zone)
Definition: tools.c:52
lock_basic_type q_lock
Definition: fifoq.h:65
time_t sig_time
Definition: stats.h:63
void * engine
Definition: worker.h:53
ods_status schedule_task(schedule_type *schedule, task_type *task, int log)
Definition: schedule.c:146
void ods_log_info(const char *format,...)
Definition: log.c:302
const char * task_who2str(task_type *task)
Definition: task.c:176
enum ods_enum_status ods_status
Definition: status.h:90
void worker_start(worker_type *worker)
Definition: worker.c:770
lock_basic_type zone_lock
Definition: zone.h:95
time_t backoff
Definition: task.h:61
void ods_log_error(const char *format,...)
Definition: log.c:334
lock_basic_type stats_lock
Definition: stats.h:67
const char * ods_status2str(ods_status status)
Definition: status.c:111
ldns_rbtree_t * domains
Definition: namedb.h:49
int32_t sig_reuse
Definition: stats.h:62
Definition: task.h:45
void * zone
Definition: task.h:63
ods_lookup_table * ods_lookup_by_id(ods_lookup_table *table, int id)
Definition: status.c:94
rrset_type * next
Definition: rrset.h:73
void worker_cleanup(worker_type *worker)
Definition: worker.c:910
#define lock_basic_set(cond)
Definition: locks.h:97
enum task_id_enum task_id
Definition: task.h:48
ods_status fifoq_push(fifoq_type *q, void *item, worker_type *worker, int *tries)
Definition: fifoq.c:119
void ods_log_crit(const char *format,...)
Definition: log.c:350
int32_t sig_count
Definition: stats.h:60
lock_basic_type signal_lock
Definition: engine.h:79
size_t jobs_failed
Definition: worker.h:60
void worker_wait(lock_basic_type *lock, cond_basic_type *condition)
Definition: worker.c:870
ods_status tools_input(zone_type *zone)
Definition: tools.c:93
task_type * task
Definition: worker.h:54
#define lock_basic_lock(lock)
Definition: locks.h:94
namedb_type * db
Definition: zone.h:86
Definition: task.h:43
void worker_sleep(worker_type *worker, time_t timeout)
Definition: worker.c:793
#define lock_basic_sleep(cond, lock, sleep)
Definition: locks.h:98
time_t halted_when
Definition: task.h:60
time_t clock_in
Definition: worker.h:57
int lock_basic_type
Definition: locks.h:91
task_type * schedule_pop_task(schedule_type *schedule)
Definition: schedule.c:285
unsigned is_processed
Definition: namedb.h:56
ods_status tools_output(zone_type *zone, engine_type *engine)
Definition: tools.c:181
void worker_notify_all(lock_basic_type *lock, cond_basic_type *condition)
Definition: worker.c:896
signconf_type * signconf
Definition: zone.h:84
ods_status zone_backup2(zone_type *zone)
Definition: zone.c:1087
time_t start_time
Definition: stats.h:65
ods_status zone_update_serial(zone_type *zone)
Definition: zone.c:472
task_id halted
Definition: task.h:58
void worker_wakeup(worker_type *worker)
Definition: worker.c:835
enum worker_enum worker_id
Definition: worker.h:46
void worker_sleep_unless(worker_type *worker, time_t timeout)
Definition: worker.c:812
time_t duration2time(duration_type *duration)
Definition: duration.c:371
void ods_log_verbose(const char *format,...)
Definition: log.c:286
time_t last_modified
Definition: signconf.h:78
const char * name
Definition: status.h:95
task_id what
Definition: task.h:56
#define lock_basic_init(lock)
Definition: locks.h:92
int thread_num
Definition: worker.h:51
const char * name
Definition: zone.h:76
schedule_type * taskq
Definition: engine.h:61
ods_status zone_prepare_keys(zone_type *zone)
Definition: zone.c:432
duration_type * sig_resign_interval
Definition: signconf.h:55
cond_basic_type worker_alarm
Definition: worker.h:61
void ods_log_deeebug(const char *format,...)
Definition: log.c:254
worker_type * worker_create(allocator_type *allocator, int num, worker_id type)
Definition: worker.c:72
ods_status rrset_sign(hsm_ctx_t *ctx, rrset_type *rrset, time_t signtime)
Definition: rrset.c:662
void allocator_deallocate(allocator_type *allocator, void *data)
Definition: allocator.c:135
unsigned sleeping
Definition: worker.h:63
task_id working_with
Definition: worker.h:55
lock_basic_type schedule_lock
Definition: schedule.h:63
worker_id type
Definition: worker.h:56
void worker_wait_timeout(lock_basic_type *lock, cond_basic_type *condition, time_t timeout)
Definition: worker.c:855
rrset_type * rrsets
Definition: domain.h:60
void * task
Definition: zone.h:92
fifoq_type * signq
Definition: engine.h:62
cond_basic_type q_nonfull
Definition: fifoq.h:67
#define ods_log_assert(x)
Definition: log.h:154
int need_to_reload
Definition: engine.h:75
int32_t sig_soa_count
Definition: stats.h:61
void * denial
Definition: domain.h:56
#define lock_basic_alarm(cond)
Definition: locks.h:99
#define lock_basic_unlock(lock)
Definition: locks.h:95
void ods_log_warning(const char *format,...)
Definition: log.c:318
allocator_type * allocator
Definition: worker.h:50
ldns_rdf * apex
Definition: zone.h:68
const char * task_what2str(task_id what)
Definition: task.c:146
void worker_notify(lock_basic_type *lock, cond_basic_type *condition)
Definition: worker.c:882
time_t time_now(void)
Definition: duration.c:513
ods_lookup_table worker_str[]
Definition: worker.c:45
stats_type * stats
Definition: zone.h:94
#define lock_basic_broadcast(cond)
Definition: locks.h:100
void * fifoq_pop(fifoq_type *q, worker_type **worker)
Definition: fifoq.c:89