Changeset 1058 for proto/parabix2


Ignore:
Timestamp:
Mar 30, 2011, 4:38:24 PM (8 years ago)
Author:
lindanl
Message:

multithreads

Location:
proto/parabix2
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • proto/parabix2/pablo_template_multithreads.cpp

    r1057 r1058  
    3030#ifdef BUFFER_PROFILING
    3131        BOM_Table * parser_timer;
     32#ifdef ANALYZE
     33        BOM_Table * T1_total_timer;
     34        BOM_Table * T2_total_timer;
     35        BOM_Table * T3_total_timer;
     36        BOM_Table * T4_total_timer;
     37        BOM_Table * T1_work_timer;
     38        BOM_Table * T2_work_timer;
     39        BOM_Table * T3_work_timer;
     40        BOM_Table * T4_work_timer;     
     41#endif
    3242#endif
    3343LineColTracker tracker;
     
    304314FILE *infile;
    305315FILE *outfile;
    306 // load with 'consume' (data-dependent) memory ordering
    307 template<typename T>
    308 T load_consume(T const* addr)
    309 {
    310   // hardware fence is implicit on x86
    311   T v = *const_cast<T const volatile*>(addr);
    312 #ifdef GCC
    313  __sync_synchronize(); // compiler fence
    314 #endif
    315 #ifdef ICC
    316  __memory_barrier(); // compiler fence
    317 #endif
    318   return v;
    319 }
    320 
    321 // store with 'release' memory ordering
    322 template<typename T>
    323 void store_release(T* addr, T v)
    324 {
    325   // hardware fence is implicit on x86
    326 #ifdef GCC
    327  __sync_synchronize(); // compiler fence
    328 #endif
    329 #ifdef ICC
    330  __memory_barrier(); // compiler fence
    331 #endif
    332   *const_cast<T volatile*>(addr) = v;
    333 }
    334 
    335 // cache line size on modern x86 processors (in bytes)
    336 size_t const cache_line_size = 64;
    337 
    338 // single-producer/single-consumer queue
     316
    339317template<typename T>
    340318class spsc_queue
     
    343321  spsc_queue()
    344322  {
    345       node* n = new node;
    346       n->next_ = 0;
    347       overlap_bytes = 0;
    348       tail_ = head1_ = head2_ = head3_ = first_= tail_copy_ = n;
    349       count=0;
     323          head1_ =  head2_ = head3_ = head4_ = 1;
     324      tail_ = 0;
    350325  }
    351326
    352327  ~spsc_queue()
    353328  {
    354       node* n = first_;
    355       do
    356       {
    357           node* next = n->next_;
    358           delete n;
    359           n = next;
    360       }
    361       while (n);
     329
    362330  }
    363331
    364   void pass1()
     332  bool pass1()
    365333  {
    366       node* n = alloc_node();
    367       n->next_ = 0;
    368       //do pass1
    369       char * srcptr = n->para_data.srcbuf + OVERLAP_BUFSIZE;
    370       memmove(n->para_data.srcbuf, overlap_buffer, 2*overlap_bytes);
    371       int chars_read = fread(&srcptr[overlap_bytes], 1, SEGMENT_SIZE + OVERLAP_BUFSIZE - overlap_bytes, infile);
    372       memmove(overlap_buffer, &srcptr[SEGMENT_SIZE-OVERLAP_BUFSIZE], 2*OVERLAP_BUFSIZE);
    373       overlap_bytes = OVERLAP_BUFSIZE;
    374       s2p_do_segment((BytePack *) srcptr, n->para_data.basis_bits);
    375       classify_bytes.do_segment(n->para_data.basis_bits, n->para_data.lex);
    376 
    377 
    378       store_release(&head1_->next_, n);
    379       head1_ = n;
     334          while(head1_==tail_)
     335          __sync_synchronize();
     336
     337#ifdef ANALYZE
     338PERF_SEC_START(T1_work_timer);
     339#endif
     340          char * srcptr = n[head1_].srcbuf + OVERLAP_BUFSIZE;
     341          memmove(n[head1_].srcbuf, overlap_buffer, 2*overlap_bytes);
     342          int chars_read = fread(&srcptr[overlap_bytes], 1, SEGMENT_SIZE + OVERLAP_BUFSIZE - overlap_bytes, infile);
     343          memmove(overlap_buffer, &srcptr[SEGMENT_SIZE-OVERLAP_BUFSIZE], 2*OVERLAP_BUFSIZE);
     344          overlap_bytes = OVERLAP_BUFSIZE;
     345          s2p_do_segment((BytePack *) srcptr, n[head1_].basis_bits);
     346          classify_bytes.do_segment(n[head1_].basis_bits, n[head1_].lex);
     347
     348#ifdef ANALYZE
     349PERF_SEC_END(T1_work_timer, SEGMENT_SIZE);
     350#endif
     351          head1_ = (head1_+1)%8;
     352
    380353  }
     354
    381355  bool pass2()
    382356  {
    383           if(load_consume(&head2_->next_))
    384           {
    385               //do pass2
    386 
    387                   validate_utf8.do_segment(head2_->next_->para_data.basis_bits, head2_->next_->para_data.u8);
    388                   add_scope_streams.do_segment(head2_->next_->para_data.lex, head2_->next_->para_data.scope1);
    389                   parse_CtCDPI.do_segment(head2_->next_->para_data.ctCDPI_Callouts, head2_->next_->para_data.lex, head2_->next_->para_data.scope1, head2_->next_->para_data.check_streams);
    390                   parse_refs.do_segment(head2_->next_->para_data.lex, head2_->next_->para_data.scope1, head2_->next_->para_data.ctCDPI_Callouts, head2_->next_->para_data.ref_Callouts);
    391 
    392                   store_release(&head2_, head2_->next_);
    393                   return true;
    394           }
    395       else
    396       {
    397           return false;
    398       }
     357          while(head2_==head1_)
     358          __sync_synchronize();
     359
     360#ifdef ANALYZE
     361PERF_SEC_START(T2_work_timer);
     362#endif
     363          validate_utf8.do_segment(n[head2_].basis_bits, n[head2_].u8);
     364          add_scope_streams.do_segment(n[head2_].lex, n[head2_].scope1);
     365          parse_CtCDPI.do_segment(n[head2_].ctCDPI_Callouts, n[head2_].lex, n[head2_].scope1, n[head2_].check_streams);
     366          parse_refs.do_segment(n[head2_].lex, n[head2_].scope1, n[head2_].ctCDPI_Callouts, n[head2_].ref_Callouts);
     367
     368#ifdef ANALYZE
     369PERF_SEC_END(T2_work_timer, SEGMENT_SIZE);
     370#endif
     371          head2_ = (head2_+1)%8;
     372
    399373  }
    400374
    401375  bool pass3()
    402376  {
    403       if ((head3_!= head2_) && load_consume(&head3_->next_))
    404       {
    405           //do pass3
    406           parse_tags.do_segment(head3_->next_->para_data.lex, head3_->next_->para_data.scope1, head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.tag_Callouts);
    407                   validate_xml_names.do_segment(head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.ref_Callouts, head3_->next_->para_data.tag_Callouts, head3_->next_->para_data.lex, head3_->next_->para_data.u8, head3_->next_->para_data.xml_names, head3_->next_->para_data.check_streams);
    408                   do_check_streams.do_segment(head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.tag_Callouts, head3_->next_->para_data.lex, head3_->next_->para_data.u8, head3_->next_->para_data.scope1, head3_->next_->para_data.ref_Callouts, head3_->next_->para_data.xml_names, head3_->next_->para_data.check_streams);
    409 
    410           store_release(&head3_, head3_->next_);
    411           return true;
    412       }
    413       else
    414       {
    415           return false;
    416       }
     377          while(head3_==head2_)
     378          __sync_synchronize();
     379
     380#ifdef ANALYZE
     381PERF_SEC_START(T3_work_timer);
     382#endif
     383         parse_tags.do_segment(n[head3_].lex, n[head3_].scope1, n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts);
     384         validate_xml_names.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].ref_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].xml_names, n[head3_].check_streams);
     385                  do_check_streams.do_segment(n[head3_].ctCDPI_Callouts, n[head3_].tag_Callouts, n[head3_].lex, n[head3_].u8, n[head3_].scope1, n[head3_].ref_Callouts, n[head3_].xml_names, n[head3_].check_streams);
     386          head3_ = (head3_+1)%8;
     387
     388#ifdef ANALYZE
     389PERF_SEC_END(T3_work_timer, SEGMENT_SIZE);
     390#endif
    417391  }
    418392
    419393  bool pass4()
    420394  {
    421       if ((tail_!= head3_)  && load_consume(&tail_->next_))
    422       {
    423           //do pass4
    424           source = tail_->next_->para_data.srcbuf+OVERLAP_BUFSIZE;
     395          while(head4_==head3_)
     396          __sync_synchronize();
     397
     398#ifdef ANALYZE
     399PERF_SEC_START(T4_work_timer);
     400#endif
     401          source = n[head4_].srcbuf+OVERLAP_BUFSIZE;
    425402          matcher.setSrc(source);
    426           postprocess_do_segment(tail_->next_->para_data.lex, tail_->next_->para_data.ctCDPI_Callouts, tail_->next_->para_data.ref_Callouts, tail_->next_->para_data.check_streams);
     403          postprocess_do_segment(n[head4_].lex, n[head4_].ctCDPI_Callouts, n[head4_].ref_Callouts, n[head4_].check_streams);
    427404          block_base = 0;
    428           store_release(&tail_, tail_->next_);
    429           return true;
    430       }
    431       else
    432       {
    433           return false;
    434       }
     405
     406#ifdef ANALYZE
     407PERF_SEC_END(T4_work_timer, SEGMENT_SIZE);
     408#endif
     409          head4_ = (head4_+1)%8;
     410          tail_ = (tail_+1)%8;
     411
    435412  }
    436413
    437   int count;
     414  int overlap_bytes;
     415
     416  char overlap_buffer[2*OVERLAP_BUFSIZE];
     417
     418
    438419private:
    439   // internal node structure
    440   struct node
    441   {
    442       node* next_;
    443       T para_data;
    444   };
    445 
    446   // consumer part
    447   // accessed mainly by consumer, infrequently be producer
    448   node* tail_; // tail of the queue
    449 
    450   // delimiter between consumer part and producer part,
    451   // so that they situated on different cache lines
    452   char cache_line_pad_ [cache_line_size];
    453 
    454   int overlap_bytes;
    455 
    456   char overlap_buffer[2*OVERLAP_BUFSIZE];
    457 
    458   // producer part
    459   // accessed only by producer
    460   node* head1_; // head of the queue
    461   node* head2_;
    462   node* head3_;
    463   node* first_; // last unused node (tail of node cache)
    464   node* tail_copy_; // helper (points somewhere between first_ and tail_)
    465 
    466 
    467   node* alloc_node()
    468   {
    469       // first tries to allocate node from internal node cache,
    470       // if attempt fails, allocates node via ::operator new()
    471 while(1){
    472       if (first_ != tail_copy_)
    473       {
    474           node* n = first_;
    475           first_ = first_->next_;
    476           return n;
    477       }
    478       tail_copy_ = load_consume(&tail_);
    479       if (first_ != tail_copy_)
    480       {
    481           node* n = first_;
    482           first_ = first_->next_;
    483           return n;
    484       }
    485       node* n = new node;
    486       if(count>8)
    487         continue;
    488       else
    489         count++;
    490       return n;
    491 }
    492   }
     420
     421  int tail_; // tail of the queue
     422  int head1_; // head of the queue
     423  int head2_;
     424  int head3_;
     425  int head4_;
     426  T  n[8];
     427
    493428
    494429  spsc_queue(spsc_queue const&);
    495430  spsc_queue& operator = (spsc_queue const&);
    496431};
    497 
    498432
    499433spsc_queue<struct Para_data> q;
     
    503437   long tid = (long)threadid;
    504438   for(int i=0;i<file_segs;i++){
     439#ifdef ANALYZE
     440PERF_SEC_START(T1_total_timer);
     441#endif
    505442           q.pass1();
    506443//         printf("thread %ld: %i\n",tid,i);
     444#ifdef ANALYZE
     445PERF_SEC_END(T1_total_timer, SEGMENT_SIZE);
     446#endif
    507447   }
    508448   pthread_exit(NULL);
     
    512452   long tid = (long)threadid;
    513453   for(int i=0;i<file_segs;i++){
    514            while(!q.pass2());
     454#ifdef ANALYZE
     455PERF_SEC_START(T2_total_timer);
     456#endif
     457           q.pass2();
    515458//         printf("thread %ld: %i\n",tid,i);
     459#ifdef ANALYZE
     460PERF_SEC_END(T2_total_timer, SEGMENT_SIZE);
     461#endif
    516462   }
    517463   pthread_exit(NULL);
     
    522468   long tid = (long)threadid;
    523469   for(int i=0;i<file_segs;i++){
    524            while(!q.pass3());
     470#ifdef ANALYZE
     471PERF_SEC_START(T3_total_timer);
     472#endif
     473           q.pass3();
    525474//         printf("thread %ld: %i\n",tid,i);
     475#ifdef ANALYZE
     476PERF_SEC_END(T3_total_timer, SEGMENT_SIZE);
     477#endif
    526478   }
    527479   pthread_exit(NULL);
     
    531483   long tid = (long)threadid;
    532484   for(int i=0;i<file_segs;i++){
    533            while(!q.pass4());
     485#ifdef ANALYZE
     486PERF_SEC_START(T4_total_timer);
     487#endif
     488           q.pass4();
    534489//         printf("thread %ld: %i\n",tid,i);
     490#ifdef ANALYZE
     491PERF_SEC_END(T4_total_timer, SEGMENT_SIZE);
     492#endif
    535493   }
    536494   pthread_exit(NULL);
     
    575533
    576534        PERF_SEC_INIT(parser_timer);
     535#ifdef ANALYZE
     536        PERF_SEC_INIT(T1_total_timer);
     537        PERF_SEC_INIT(T2_total_timer);
     538        PERF_SEC_INIT(T3_total_timer);
     539        PERF_SEC_INIT(T4_total_timer);
     540        PERF_SEC_INIT(T1_work_timer);
     541        PERF_SEC_INIT(T2_work_timer);
     542        PERF_SEC_INIT(T3_work_timer);
     543        PERF_SEC_INIT(T4_work_timer);
     544#endif
    577545
    578546        PERF_SEC_START(parser_timer);
     
    618586
    619587        PERF_SEC_DUMP(parser_timer);
     588#ifdef ANALYZE
     589printf("\nT1_total_timer\n");
     590        PERF_SEC_DUMP(T1_total_timer);
     591printf("\nT2_total_timer\n");
     592        PERF_SEC_DUMP(T2_total_timer);
     593printf("\nT3_total_timer\n");
     594        PERF_SEC_DUMP(T3_total_timer);
     595printf("\nT4_total_timer\n");
     596        PERF_SEC_DUMP(T4_total_timer);
     597printf("\nT1_work_timer\n");
     598        PERF_SEC_DUMP(T1_work_timer);
     599printf("\nT2_work_timer\n");
     600        PERF_SEC_DUMP(T2_work_timer);
     601printf("\nT3_work_timer\n");
     602        PERF_SEC_DUMP(T3_work_timer);
     603printf("\nT4_work_timer\n");
     604        PERF_SEC_DUMP(T4_work_timer);
     605#endif
    620606
    621607        PERF_SEC_DESTROY(parser_timer);
    622 
     608#ifdef ANALYZE
     609        PERF_SEC_DESTROY(T1_total_timer);
     610        PERF_SEC_DESTROY(T2_total_timer);
     611        PERF_SEC_DESTROY(T3_total_timer);
     612        PERF_SEC_DESTROY(T4_total_timer);
     613        PERF_SEC_DESTROY(T1_work_timer);
     614        PERF_SEC_DESTROY(T2_work_timer);
     615        PERF_SEC_DESTROY(T3_work_timer);
     616        PERF_SEC_DESTROY(T4_work_timer);
     617#endif
    623618//      printf("count = %i\n", q.count);
    624619
  • proto/parabix2/src/Makefile

    r1056 r1058  
    3232
    3333threads:        $(SRCFILE)
    34         g++ -O1 -msse2 -o $(OUTFILE) $(SRCFILE) $(AFLAGS) -lpthread -DBUFFER_PROFILING
     34        g++ -O3 -msse2 -o $(OUTFILE) $(SRCFILE) $(AFLAGS) -lpthread -DBUFFER_PROFILING
    3535       
    3636code_clocker:   $(SRCFILE)
Note: See TracChangeset for help on using the changeset viewer.