I am trying to read a big file line by line, inspect and cleanse each row, and write it to one of two output files depending on the outcome. I am using channels for this. I am still quite new to the language, so I am building things by looking at examples.
I keep a global count variable and print it every 10,000 lines read to keep track of progress.
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/publicsuffix"
)

func init() {
	// Send log output to stdout instead of the default stderr.
	log.SetOutput(os.Stdout)
}
var count int64 // lines read so far; accessed atomically because several goroutines read it
// read the file line by line
// push each line (domain name) onto a channel input
// write another function which reads from the input channel
// calls the func EffectiveTLDPlusOne and saves the results
// onto channel result if successful or rejected if unsuccessful
// write another function which reads from results channel and writes to cleansed file
// write another function which reads from rejected channel and writes to rejected file
func readInputFile(fileName string, domains chan string) {
	file, err := os.Open(fileName)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		atomic.AddInt64(&count, 1)
		domains <- scanner.Text()
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	// Close the channel so the cleansing goroutines' range loops can finish.
	close(domains)
}
func cleansingRoutine(domains <-chan string, wg *sync.WaitGroup, results chan string, rejected chan string) {
	defer wg.Done()
	for domain := range domains {
		if n := atomic.LoadInt64(&count); n%10000 == 0 {
			fmt.Println("count", n)
		}
		// Extract the effective TLD plus one label (the registrable domain).
		// If valid, push the result to the results channel;
		// if invalid, push the original input to the rejected channel.
		domainName, err := publicsuffix.EffectiveTLDPlusOne(domain)
		if err != nil {
			rejected <- domain
		} else {
			results <- domainName
		}
	}
}
func main() {
	fmt.Println("start time", time.Now())
	concurrency := 500
	domains := make(chan string, 1000)
	results := make(chan string, 1000)
	rejected := make(chan string, 1000)
	fileName := os.Args[1]
	fmt.Println("fileName", fileName)
	go readInputFile(fileName, domains)
	wg := new(sync.WaitGroup)
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// parallel routines for cleansing
		go cleansingRoutine(domains, wg, results, rejected)
	}
	go writeToCleanFile(fileName, results)
	go writeToRejectedFile(fileName, rejected)
	wg.Wait()
	close(results)
	close(rejected)
	fmt.Println("end time", time.Now())
}
func writeToCleanFile(fileName string, results chan string) {
	// write all cleansed domains to the cleansed file
	resultFileName := fileName + ".cleansed"
	resultFile, err := os.OpenFile(resultFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer resultFile.Close()
	for r := range results {
		if _, err := resultFile.WriteString(r + "\n"); err != nil {
			log.Fatal(err)
		}
	}
}
func writeToRejectedFile(fileName string, rejected chan string) {
	// write all rejected inputs to the rejected file
	rejectedFileName := fileName + ".rejected"
	rejectedFile, err := os.OpenFile(rejectedFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer rejectedFile.Close()
	for r := range rejected {
		if _, err := rejectedFile.WriteString(r + "\n"); err != nil {
			log.Fatal(err)
		}
	}
}
writeToCleanFile and writeToRejectedFile are basically the same function; you could handle the different arriving requests in a select. That aside: you don't really need a wait group here, since you already know how many files are going to be written: the usual pattern is to just count the responses. – wvxvw Nov 1 '16 at 17:47
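A minimal sketch of what that comment suggests, assuming the same input file, worker count, and output file names as the question. The lines channel, the use of os.Create (which truncates rather than appends), and the counting loop are my additions for illustration, not part of the original code: a single writer loop selects on both output channels, and instead of a WaitGroup the main goroutine stops once it has counted one response per line read.

package main

import (
	"bufio"
	"fmt"
	"log"
	"os"

	"golang.org/x/net/publicsuffix"
)

func main() {
	fileName := os.Args[1]

	domains := make(chan string, 1000)
	results := make(chan string, 1000)
	rejected := make(chan string, 1000)
	lines := make(chan int, 1) // the reader reports how many lines it pushed

	// Reader: push every line, then report the total and close the input channel.
	go func() {
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()
		n := 0
		scanner := bufio.NewScanner(file)
		for scanner.Scan() {
			domains <- scanner.Text()
			n++
		}
		if err := scanner.Err(); err != nil {
			log.Fatal(err)
		}
		close(domains)
		lines <- n
	}()

	// Workers: every input produces exactly one message on results or rejected.
	for i := 0; i < 500; i++ {
		go func() {
			for d := range domains {
				if name, err := publicsuffix.EffectiveTLDPlusOne(d); err != nil {
					rejected <- d
				} else {
					results <- name
				}
			}
		}()
	}

	cleanFile, err := os.Create(fileName + ".cleansed")
	if err != nil {
		log.Fatal(err)
	}
	defer cleanFile.Close()
	rejectFile, err := os.Create(fileName + ".rejected")
	if err != nil {
		log.Fatal(err)
	}
	defer rejectFile.Close()

	// Single writer loop: select on both channels and stop once the number
	// of responses matches the number of lines the reader pushed.
	written, total := 0, -1
	for total < 0 || written < total {
		select {
		case r := <-results:
			fmt.Fprintln(cleanFile, r)
			written++
		case r := <-rejected:
			fmt.Fprintln(rejectFile, r)
			written++
		case n := <-lines:
			total = n
		}
	}
}

Because every input line produces exactly one message on either results or rejected, counting responses also guarantees that both files are fully written before the program exits, which the WaitGroup version does not.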