source: proto/parabix2/pablo_template_multithreads.cpp @ 1051

Last change on this file since 1051 was 1051, checked in by lindanl, 8 years ago

Add multithreads template

File size: 17.0 KB
Line 
1#include <stdio.h>
2#include <stdlib.h>
3#include <errno.h>
4#include <sys/types.h>
5#include <sys/stat.h>
6#include <pthread.h>
7#include "../lib/lib_simd.h"
8#define BLOCK_SIZE 128
9#define SEGMENT_BLOCKS 128
10#define SEGMENT_SIZE (BLOCK_SIZE * SEGMENT_BLOCKS)
11#define OVERLAP_BUFSIZE 16
12
13typedef uint64_t ScanBlock;
14typedef SIMD_type BytePack;
15typedef SIMD_type BitBlock;
16
17#include "../lib/carryQ.h"
18#include "xmldecl.h"
19#include "xml_error.c"
20#include "xmldecl.c"
21#include "namechars.h"
22
23#include "../lib/perflib/perfsec.h"
24#include "../lib/s2p.h"
25
26#include "TagMatcher.h"
27#include "LineColTracker.h"
28BitBlock EOF_mask = simd_const_1(1);
29
30#ifdef BUFFER_PROFILING
31        BOM_Table * parser_timer;
32#endif
33LineColTracker tracker;
34TagMatcher matcher;
35
36int block_base=0;
37int buffer_base=0;
38char * source;
39
40static inline int StreamScan(ScanBlock * stream, int blk_count, int ProcessPos(int)) {
41        int blk;
42        int block_pos = 0;
43
44        for (blk = 0; blk < blk_count; blk++) {
45                ScanBlock s = stream[blk];
46                while(s) {
47                        int code = (ProcessPos(cfzl(s) + block_pos));
48                        if (code) return code;
49                        s = s & (s-1);  // clear rightmost bit.
50                }
51                block_pos += 8 * sizeof(ScanBlock);
52        }
53        return 0;
54}
55
56
57static inline void ReportError(const char * error_msg, int error_pos_in_block) {
58  int error_line, error_column;
59  tracker.get_Line_and_Column(error_pos_in_block, error_line, error_column);
60  fprintf(stderr, "%s at line %i, column %i\n", error_msg, error_line, error_column);
61}
62
63
64static inline int NameStrt_check(int pos) {
65        int block_pos = block_base + pos;
66        if(XML_10_UTF8_NameStrt_bytes((unsigned char*)&source[block_pos]) == 0){
67              ReportError("name start error", pos);
68              exit(-1);
69        }
70        return 0;
71}
72
73static inline int Name_check(int pos) {
74        int block_pos = block_base + pos;
75        if(XML_10_UTF8_NameChar_bytes((unsigned char*)&source[block_pos]) == 0){
76              ReportError("name error", pos);
77              exit(-1);
78        }
79        return 0;
80}
81
82static inline int PIName_check(int pos) {
83        int block_pos = block_base + pos;
84        int file_pos = block_pos+buffer_base;
85        if (at_XxMmLll<ASCII>((unsigned char*)&source[block_pos]) && (source[block_pos+3]=='?' || source[block_pos+3]<= ' ')) {
86              // "<?xml" legal at start of file.
87              if ((file_pos == 2) && at_XmlDecl_start<ASCII>((unsigned char*)&source[0])) return 0;
88              ReportError("[Xx][Mm][Ll] illegal as PI name", pos);
89              exit(-1);
90        }
91        return 0;
92}
93
94static inline int CD_check(int pos) {
95        int block_pos = block_base + pos;
96        if (!at_CDATA1<ASCII>((unsigned char*)&source[block_pos])){
97              ReportError("CDATA error", pos);
98              exit(-1);
99        }
100        return 0;
101}
102
103static inline int GenRef_check(int pos) {
104        int block_pos = block_base + pos;
105        unsigned char* s = (unsigned char*)&source[block_pos];
106        if (!(at_Ref_gt<ASCII>(s)||at_Ref_lt<ASCII>(s)||at_Ref_amp<ASCII>(s)||at_Ref_quot<ASCII>(s)||at_Ref_apos<ASCII>(s))){
107              ReportError("Undefined reference", pos);
108              exit(-1);
109        }
110        return 0;
111}
112
113static inline int HexRef_check(int pos) {
114        int block_pos = block_base + pos;
115        unsigned char* s = (unsigned char*)&source[block_pos];
116        int ch_val = 0;
117        while(at_HexDigit<ASCII>(s)){
118          ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
119          if (ch_val> 0x10FFFF ){
120            ReportError("Illegal character reference", pos);
121            exit(-1);
122          }
123          s++;
124        }
125        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
126          ReportError("Illegal character reference", pos);
127          exit(-1);
128        }
129        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
130          ReportError("Illegal XML 1.0 character reference", pos);
131          exit(-1);
132        }
133        return 0;
134}
135
136static inline int DecRef_check(int pos) {
137        int block_pos = block_base + pos;
138        unsigned char* s = (unsigned char*)&source[block_pos];
139        int ch_val = 0;
140        while(at_HexDigit<ASCII>(s)){
141          ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
142          if (ch_val> 0x10FFFF ){
143            ReportError("Illegal character reference", pos);
144            exit(-1);
145          }
146          s++;
147        }
148        if ((ch_val == 0x0) || ((ch_val | 0x7FF) == 0xDFFF)|| ((ch_val | 0x1) == 0xFFFF)){
149          ReportError("Illegal character reference", pos);
150          exit(-1);
151        }
152        else if (((ch_val < 0x20) && (ch_val != 0x9) && (ch_val != 0xD) && (ch_val != 0xA))){
153          ReportError("Illegal XML 1.0 character reference", pos);
154          exit(-1);
155        }
156        return 0;
157}
158
159static inline int AttRef_check(int pos) {
160        int block_pos = block_base + pos;
161        unsigned char* s = (unsigned char*)&source[block_pos];
162        int ch_val = 0;
163        if(s[0]=='#'){
164          s++;
165          if(s[0]=='x' || s[0]=='X'){
166            s++;
167            while(at_HexDigit<ASCII>(s)){
168              ch_val = HexVal<ASCII>(s[0]) + (ch_val<<4);
169              s++;
170            }
171          }
172          else{
173            while(at_HexDigit<ASCII>(s)){
174              ch_val = DigitVal<ASCII>(s[0]) + ch_val*10;
175              s++;
176            }
177          }
178          if (ch_val==60){
179            ReportError("Attribute values contain '<' characters after reference expansion", pos);
180            exit(-1);
181          }
182        }
183        else if(at_Ref_lt<ASCII>(s)){
184          ReportError("Attribute values contain '<' characters after reference expansion", pos);
185          exit(-1);
186        }
187        return 0;
188}
189
190
191
192@global
193
194static inline void s2p_do_block(BytePack U8[], Basis_bits & basis_bits) {
195  s2p(U8[0], U8[1], U8[2], U8[3], U8[4], U8[5], U8[6], U8[7],
196        basis_bits.bit_0, basis_bits.bit_1, basis_bits.bit_2, basis_bits.bit_3, basis_bits.bit_4, basis_bits.bit_5, basis_bits.bit_6, basis_bits.bit_7);
197}
198
199static inline void s2p_do_final_block(BytePack U8[], Basis_bits & basis_bits, BitBlock EOF_mask) {
200  s2p_do_block(U8, basis_bits);
201  basis_bits.bit_0 = simd_and(basis_bits.bit_0, EOF_mask);
202  basis_bits.bit_1 = simd_and(basis_bits.bit_1, EOF_mask);
203  basis_bits.bit_2 = simd_and(basis_bits.bit_2, EOF_mask);
204  basis_bits.bit_3 = simd_and(basis_bits.bit_3, EOF_mask);
205  basis_bits.bit_4 = simd_and(basis_bits.bit_4, EOF_mask);
206  basis_bits.bit_5 = simd_and(basis_bits.bit_5, EOF_mask);
207  basis_bits.bit_6 = simd_and(basis_bits.bit_6, EOF_mask);
208  basis_bits.bit_7 = simd_and(basis_bits.bit_7, EOF_mask);
209}
210
211static inline void s2p_do_segment(BytePack U8[], Basis_bits basis_bits[]) {
212  for (int i = 0; i < SEGMENT_BLOCKS; i++)
213           s2p_do_block(&U8[8*i], basis_bits[i]);
214}
215
216static inline void postprocess_do_block(Lex & lex, CtCDPI_Callouts & ctCDPI_Callouts, Ref_Callouts & ref_Callouts, Check_streams & check_streams, int chars_avail){
217            tracker.StoreNewlines(lex.LF);
218
219                if (bitblock_has_bit(simd_or(check_streams.non_ascii_name_starts, check_streams.non_ascii_names))) {
220                  StreamScan((ScanBlock *) &check_streams.non_ascii_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), NameStrt_check);
221                  StreamScan((ScanBlock *) &check_streams.non_ascii_names, sizeof(BitBlock)/sizeof(ScanBlock), Name_check);
222                }
223
224                if (bitblock_has_bit(ctCDPI_Callouts.PI_name_starts)){
225                  StreamScan((ScanBlock *) &ctCDPI_Callouts.PI_name_starts, sizeof(BitBlock)/sizeof(ScanBlock), PIName_check);
226                }
227
228                if (bitblock_has_bit(ctCDPI_Callouts.CD_starts)){
229                  StreamScan((ScanBlock *) &ctCDPI_Callouts.CD_starts, sizeof(BitBlock)/sizeof(ScanBlock), CD_check);
230                }
231
232                if (bitblock_has_bit(ref_Callouts.GenRef_starts)){
233                  StreamScan((ScanBlock *) &ref_Callouts.GenRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), GenRef_check);
234                }
235
236                if (bitblock_has_bit(ref_Callouts.DecRef_starts)){
237                  StreamScan((ScanBlock *) &ref_Callouts.DecRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), DecRef_check);
238                }
239
240                if (bitblock_has_bit(ref_Callouts.HexRef_starts)){
241                  StreamScan((ScanBlock *) &ref_Callouts.HexRef_starts, sizeof(BitBlock)/sizeof(ScanBlock), HexRef_check);
242                }
243
244                if (bitblock_has_bit(check_streams.att_refs)){
245                  StreamScan((ScanBlock *) &check_streams.att_refs, sizeof(BitBlock)/sizeof(ScanBlock), AttRef_check);
246                }
247
248                if (bitblock_has_bit(check_streams.error_mask)) {
249                  int errpos = count_forward_zeroes(check_streams.error_mask);
250                  ReportError("error found", errpos);
251              exit(-1);
252                }
253
254                matcher.store_streams(check_streams.tag_marks, check_streams.name_follows, check_streams.misc_mask, chars_avail);
255                tracker.AdvanceBlock();
256}
257
258static inline void postprocess_do_segment(Lex lex[], CtCDPI_Callouts ctCDPI_Callouts[], Ref_Callouts ref_Callouts[], Check_streams check_streams[]){
259
260  for (int i = 0; i < SEGMENT_BLOCKS; i++){
261           postprocess_do_block(lex[i], ctCDPI_Callouts[i], ref_Callouts[i], check_streams[i], SEGMENT_SIZE);
262           block_base += BLOCK_SIZE;
263  }
264
265        matcher.StreamScan(SEGMENT_SIZE);
266        matcher.Advance_segment();
267        tracker.Advance_buffer();
268}
269
270struct Para_data{
271
272          char srcbuf[BLOCK_SIZE * (SEGMENT_BLOCKS+1) + OVERLAP_BUFSIZE*2];
273
274          struct U8 u8[SEGMENT_BLOCKS];
275
276          struct Lex lex[SEGMENT_BLOCKS];
277
278          struct Scope1 scope1[SEGMENT_BLOCKS];
279
280          struct CtCDPI_Callouts ctCDPI_Callouts[SEGMENT_BLOCKS];
281
282          struct Ref_Callouts ref_Callouts[SEGMENT_BLOCKS];
283
284          struct Tag_Callouts tag_Callouts[SEGMENT_BLOCKS];
285
286          struct Basis_bits basis_bits[SEGMENT_BLOCKS];
287
288          struct Check_streams check_streams[SEGMENT_BLOCKS];
289
290          struct Xml_names xml_names[SEGMENT_BLOCKS];
291};
292
293Parse_refs parse_refs;
294Parse_tags parse_tags;
295Classify_bytes_Validate_utf8 classify_bytes_Validate_utf8;
296Parse_CtCDPI parse_CtCDPI;
297Do_check_streams do_check_streams;
298Validate_xml_names validate_xml_names;
299Add_scope_streams add_scope_streams;
300
301int file_segs=0;
302FILE *infile;
303FILE *outfile;
304// load with 'consume' (data-dependent) memory ordering
305template<typename T>
306T load_consume(T const* addr)
307{
308  // hardware fence is implicit on x86
309  T v = *const_cast<T const volatile*>(addr);
310#ifdef GCC
311 __sync_synchronize(); // compiler fence
312#endif
313#ifdef ICC
314 __memory_barrier(); // compiler fence
315#endif
316  return v;
317}
318
319// store with 'release' memory ordering
320template<typename T>
321void store_release(T* addr, T v)
322{
323  // hardware fence is implicit on x86
324#ifdef GCC
325 __sync_synchronize(); // compiler fence
326#endif
327#ifdef ICC
328 __memory_barrier(); // compiler fence
329#endif
330  *const_cast<T volatile*>(addr) = v;
331}
332
333// cache line size on modern x86 processors (in bytes)
334size_t const cache_line_size = 64;
335
336// single-producer/single-consumer queue
337template<typename T>
338class spsc_queue
339{
340public:
341  spsc_queue()
342  {
343      node* n = new node;
344      n->next_ = 0;
345      overlap_bytes = 0;
346      tail_ = head1_ = head2_ = head3_ = first_= tail_copy_ = n;
347      count=0;
348  }
349
350  ~spsc_queue()
351  {
352      node* n = first_;
353      do
354      {
355          node* next = n->next_;
356          delete n;
357          n = next;
358      }
359      while (n);
360  }
361
362  void pass1()
363  {
364      node* n = alloc_node();
365      n->next_ = 0;
366      //do pass1
367      char * srcptr = n->para_data.srcbuf + OVERLAP_BUFSIZE;
368      memmove(n->para_data.srcbuf, overlap_buffer, 2*overlap_bytes);
369      int chars_read = fread(&srcptr[overlap_bytes], 1, SEGMENT_SIZE + OVERLAP_BUFSIZE - overlap_bytes, infile);
370      memmove(overlap_buffer, &srcptr[SEGMENT_SIZE-OVERLAP_BUFSIZE], 2*OVERLAP_BUFSIZE);
371      overlap_bytes = OVERLAP_BUFSIZE;
372      s2p_do_segment((BytePack *) srcptr, n->para_data.basis_bits);
373      classify_bytes_Validate_utf8.do_segment(n->para_data.basis_bits, n->para_data.lex, n->para_data.u8);
374
375      store_release(&head1_->next_, n);
376      head1_ = n;
377  }
378  bool pass2()
379  {
380          if(load_consume(&head2_->next_))
381          {
382              //do pass2
383
384                  add_scope_streams.do_segment(head2_->next_->para_data.lex, head2_->next_->para_data.scope1);
385                  parse_CtCDPI.do_segment(head2_->next_->para_data.ctCDPI_Callouts, head2_->next_->para_data.lex, head2_->next_->para_data.scope1, head2_->next_->para_data.check_streams);
386                  parse_refs.do_segment(head2_->next_->para_data.lex, head2_->next_->para_data.scope1, head2_->next_->para_data.ctCDPI_Callouts, head2_->next_->para_data.ref_Callouts);
387
388                  store_release(&head2_, head2_->next_);
389                  return true;
390          }
391      else
392      {
393          return false;
394      }
395  }
396
397  bool pass3()
398  {
399      if ((head3_!= head2_) && load_consume(&head3_->next_))
400      {
401          //do pass3
402          parse_tags.do_segment(head3_->next_->para_data.lex, head3_->next_->para_data.scope1, head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.tag_Callouts);
403                  validate_xml_names.do_segment(head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.ref_Callouts, head3_->next_->para_data.tag_Callouts, head3_->next_->para_data.lex, head3_->next_->para_data.u8, head3_->next_->para_data.xml_names, head3_->next_->para_data.check_streams);
404                  do_check_streams.do_segment(head3_->next_->para_data.ctCDPI_Callouts, head3_->next_->para_data.tag_Callouts, head3_->next_->para_data.lex, head3_->next_->para_data.u8, head3_->next_->para_data.scope1, head3_->next_->para_data.ref_Callouts, head3_->next_->para_data.xml_names, head3_->next_->para_data.check_streams);
405
406          store_release(&head3_, head3_->next_);
407          return true;
408      }
409      else
410      {
411          return false;
412      }
413  }
414
415  bool pass4()
416  {
417      if ((tail_!= head3_)  && load_consume(&tail_->next_))
418      {
419          //do pass4
420          source = tail_->next_->para_data.srcbuf+OVERLAP_BUFSIZE;
421          matcher.setSrc(source);
422          postprocess_do_segment(tail_->next_->para_data.lex, tail_->next_->para_data.ctCDPI_Callouts, tail_->next_->para_data.ref_Callouts, tail_->next_->para_data.check_streams);
423          block_base = 0;
424          store_release(&tail_, tail_->next_);
425          return true;
426      }
427      else
428      {
429          return false;
430      }
431  }
432
433  int count;
434private:
435  // internal node structure
436  struct node
437  {
438      node* next_;
439      T para_data;
440  };
441
442  // consumer part
443  // accessed mainly by consumer, infrequently be producer
444  node* tail_; // tail of the queue
445
446  // delimiter between consumer part and producer part,
447  // so that they situated on different cache lines
448  char cache_line_pad_ [cache_line_size];
449
450  int overlap_bytes;
451
452  char overlap_buffer[2*OVERLAP_BUFSIZE];
453
454  // producer part
455  // accessed only by producer
456  node* head1_; // head of the queue
457  node* head2_;
458  node* head3_;
459  node* first_; // last unused node (tail of node cache)
460  node* tail_copy_; // helper (points somewhere between first_ and tail_)
461
462
463  node* alloc_node()
464  {
465      // first tries to allocate node from internal node cache,
466      // if attempt fails, allocates node via ::operator new()
467while(1){
468      if (first_ != tail_copy_)
469      {
470          node* n = first_;
471          first_ = first_->next_;
472          return n;
473      }
474      tail_copy_ = load_consume(&tail_);
475      if (first_ != tail_copy_)
476      {
477          node* n = first_;
478          first_ = first_->next_;
479          return n;
480      }
481      node* n = new node;
482      if(count>8)
483        continue;
484      else
485        count++;
486      return n;
487}
488  }
489
490  spsc_queue(spsc_queue const&);
491  spsc_queue& operator = (spsc_queue const&);
492};
493
494
495spsc_queue<struct Para_data> q;
496
497void *stageOne(void *threadid)
498{
499   long tid = (long)threadid;
500   for(int i=0;i<file_segs;i++){
501           q.pass1();
502//         printf("thread %ld: %i\n",tid,i);
503   }
504   pthread_exit(NULL);
505}
506void *stageTwo(void *threadid)
507{
508   long tid = (long)threadid;
509   for(int i=0;i<file_segs;i++){
510           while(!q.pass2());
511//         printf("thread %ld: %i\n",tid,i);
512   }
513   pthread_exit(NULL);
514}
515
516void *stageThree(void *threadid)
517{
518   long tid = (long)threadid;
519   for(int i=0;i<file_segs;i++){
520           while(!q.pass3());
521//         printf("thread %ld: %i\n",tid,i);
522   }
523   pthread_exit(NULL);
524}
525void *stageFour(void *threadid)
526{
527   long tid = (long)threadid;
528   for(int i=0;i<file_segs;i++){
529           while(!q.pass4());
530//         printf("thread %ld: %i\n",tid,i);
531   }
532   pthread_exit(NULL);
533}
534
535#define NUM_THREADS 4
536
537int
538main(int argc, char * argv[]) {
539        char * infilename, * outfilename;
540        FILE *tmpinfile, *tmpoutfile;
541        struct stat fileinfo;
542
543    pthread_t threads[NUM_THREADS];
544    int rc;
545    long t=0;
546    void *status;
547
548        if (argc < 2) {
549                printf("Usage: %s <filename> [<outputfile>]\n", argv[0]);
550                exit(-1);
551        }
552
553        infilename = argv[1];
554        stat(infilename, &fileinfo);
555        file_segs = fileinfo.st_size/SEGMENT_SIZE;
556        infile = fopen(infilename, "rb");
557        if (!infile) {
558                fprintf(stderr, "Error: cannot open %s for input.\n", infilename);
559                exit(-1);
560        }
561
562        if (argc < 3) outfile = stdout;
563        else {
564                outfilename = argv[2];
565                outfile = fopen(outfilename, "wb");
566                if (!outfile) {
567                        fprintf(stderr, "Error: cannot open %s for writing.\n", outfilename);
568                        exit(-1);
569                }
570        }
571
572        PERF_SEC_INIT(parser_timer);
573
574        PERF_SEC_START(parser_timer);
575
576
577          rc = pthread_create(&threads[t], NULL, stageOne, (void *)t);
578          if (rc){
579                  printf("ERROR; return code from pthread_create() is %d\n", rc);
580                  exit(-1);
581          }
582          t++;
583          rc = pthread_create(&threads[t], NULL, stageTwo, (void *)t);
584          if (rc){
585                  printf("ERROR; return code from pthread_create() is %d\n", rc);
586                  exit(-1);
587          }
588
589          t++;
590          rc = pthread_create(&threads[t], NULL, stageThree, (void *)t);
591          if (rc){
592                  printf("ERROR; return code from pthread_create() is %d\n", rc);
593                  exit(-1);
594          }
595
596          t++;
597          rc = pthread_create(&threads[t], NULL, stageFour, (void *)t);
598          if (rc){
599                  printf("ERROR; return code from pthread_create() is %d\n", rc);
600                  exit(-1);
601          }
602
603          for(t=0; t<NUM_THREADS; t++) {
604             rc = pthread_join(threads[t], &status);
605             if (rc) {
606                printf("ERROR; return code from pthread_join() is %d\n", rc);
607                exit(-1);
608             }
609//           printf("Main: completed join with thread %ld having a status of %ld\n",t,(long)status);
610          }
611
612
613        PERF_SEC_END(parser_timer, fileinfo.st_size/SEGMENT_SIZE*SEGMENT_SIZE);
614
615        PERF_SEC_DUMP(parser_timer);
616
617        PERF_SEC_DESTROY(parser_timer);
618
619//      printf("count = %i\n", q.count);
620
621        pthread_exit(NULL);
622
623        fclose(infile);
624        fclose(outfile);
625        return(0);
626}
Note: See TracBrowser for help on using the repository browser.