Jonathon Klem

Skill issue or efficient? Go concurrency and network connections

Aug 27, 2024

I was helping a client on a project that involved syncing nzbs between usenet networks. There were open source solutions that did this on a smaller scale, but he wanted to have several terabytes of mirroring happening 24/7.

One issue was trying to daemonize these already available services. They all leveraged go's concurrency to great success but invariably there would be lots of hiccups after it was running for a while.

The NNTP protocol itself is not very complicated, so I figured I could take a crack at it. The challenging part was managing multiple channels to facilitate the network communication. With NNTP and most other protocols, there's a lot of overhead in negotiating the connection. So it makes sense to want to keep a connection open and use it to send multiple usenet articles since they're typically small in size.

The average communication looked like this (< = incoming, > = outgoing):

< 200 news.example.com NNTP Service Ready
> AUTHINFO USER [username]
< 381 More authentication required
> AUTHINFO PASS [password]
< 281 Authentication accepted
> POST
< 340 Send article to be posted. End with <CR-LF>.<CR-LF>
> [headers]\r\n\r\n[content]\r\n.\r\n
< 240 Article received OK

Pretty simple. To avoid multiple open connections and more authentication, you can follow up with another POST and repeat the process. Because I have been on a pretty big go kick and I really wanted to make the async communication work, I dove deep into configuring channels and trying to structure things in the proper way, however, it became difficult to debug and maintain because I was handling 2 different connections and acting as a middleman between the two. One from the 'supplier' of the actual articles and the other was the recipient.

There was complexity stacking with this because we had observed some inconsistencies with the network connections, the server responses, and even the contents of the nzb files. Add in the problem of trying to make sexy Go concurrency work properly and it was proving to be too much.

In the end, I capitulated and just used a more serial approach. I would have a set of functions that would open a connection with the supplier and the recipient and loop through a directory of nzb files. It would then check to see if the supplier had said data, if so, it would fetch from supplier and repost to the recipient and just loop through that.

I was able to boost the performance by running multiple instances of the program, so effectively still having concurrent uploads but just not managed in the same executable. This proved to be just as fast as when the concurrent application decided to run but it was sigificantly more maintainable and consistent. We no longer had to worry about chasing down obscure conditions that caused the program to hang.

package main

import (
    "encoding/xml"
    "fmt"
    "net"
    "bufio"
    "io"
    "log"
    "os"
    "path/filepath"
    "strings"
    "time"

    "golang.org/x/net/html/charset"
)

type NZB struct {
    Files []File `xml:"file"`
}

type File struct {
    Segments []Segment `xml:"segments>segment"`
    Subject   string    `xml:"subject,attr"`
    From      string    `xml:"poster,attr"`
    Newsgroups string   `xml:"groups>group"`
}

type Segment struct {
    Number    int    `xml:"number,attr"`
    Bytes     int    `xml:"bytes,attr"`
    ArticleID string `xml:",chardata"`
}

func main() {
    // Replace with your Usenet server addresses and credentials
    firstUsenetHost := "news.supplier.com:119"
    secondUsenetHost := "news.recipient.com:119"
    username := "-"
    password := "-"

    nzbDir := "./incoming/"
    completedDir := "completed"

    // Ensure completed directory exists
    if _, err := os.Stat(completedDir); os.IsNotExist(err) {
        os.Mkdir(completedDir, os.ModePerm)
    }

    firstConn, err := net.Dial("tcp", firstUsenetHost)
    if err != nil {
            log.Fatalf("Failed to connect to the first Usenet host: %v", err)
    }
    defer firstConn.Close()

    // Authenticate with the first Usenet host (if necessary)
    authenticate(firstConn, username, password)

    iterator := 0
    // Cycle through each NZB file in the directory
    err = filepath.Walk(nzbDir, func(path string, info os.FileInfo, err error) error {
        iterator++

        if err != nil {
            return nil
        }

        // Process only .nzb files
        if !info.IsDir() && strings.HasSuffix(info.Name(), ".nzb") {
            fmt.Printf("Processing file: %s\n", path)
            nzb, err := parseNZB(path)
            if err != nil {
                log.Printf("Failed to parse NZB file %s: %v\n", path, err)
                return nil
            }

            // Check if the NZB has only one message ID
            if len(nzb.Files) == 1 && len(nzb.Files[0].Segments) == 1 {
                // Connect to the first Usenet host

                // Download the first segment of the first file
                subject := nzb.Files[0].Subject
                from := nzb.Files[0].From
                newsgroups := nzb.Files[0].Newsgroups
                articleID := nzb.Files[0].Segments[0].ArticleID

                message, err := downloadMessage(firstConn, articleID)
                if err != nil {
                    log.Fatalf("Failed to download message: %v", err)
                }

                // Connect to the second Usenet host
                secondConn, err := net.Dial("tcp", secondUsenetHost)
                if err != nil {
                    log.Fatalf("Failed to connect to the second Usenet host: %v", err)
                }
                defer secondConn.Close()

                // Authenticate with the second Usenet host (if necessary)
                authenticate(secondConn, "-", "-")

                // Post the message to the second Usenet host
                err = postMessage(secondConn, subject, articleID, newsgroups, from, message)
                if err != nil {
                    log.Printf("Failed to post message: %v", err)

                    completedPath := filepath.Join("failed", info.Name())
                    err = os.Rename(path, completedPath)
                    if err != nil {
                            log.Printf("Failed to move file %s to completed directory: %v\n", path, err)
                    } else {
                            fmt.Printf("Moved file %s to %s\n", path, completedPath)
                    }
                    return nil
                }

                completedPath := filepath.Join(completedDir, info.Name())
                err = os.Rename(path, completedPath)
                if err != nil {
                        log.Printf("Failed to move file %s to completed directory: %v\n", path, err)
                } else {
                        fmt.Printf("Moved file %s to %s\n", path, completedPath)
                }
             } else {
                completedPath := filepath.Join("multimessage", info.Name())
                err = os.Rename(path, completedPath)
                if err != nil {
                            log.Printf("Failed to move file %s to completed directory: %v\n", path, err)
                } else {
                            fmt.Printf("Moved file %s to %s\n", path, completedPath)
                }
                log.Printf("File %s does not have exactly one message ID, skipping.\n", path)
             }
        }
        return nil
    })

    if err != nil {
        log.Fatalf("Error processing files: %v\n", err)
    }
    fmt.Println("All files processed.")
}

// authenticate sends the authentication commands to the Usenet server
func authenticate(conn net.Conn, username, password string) {
    reader := bufio.NewReader(conn)

    fmt.Fprintf(conn, "AUTHINFO USER %s\r\n", username)
    _, _ = reader.ReadString('\n')

    fmt.Fprintf(conn, "AUTHINFO PASS %s\r\n", password)
    _, _ = reader.ReadString('\n')
}

// parseNZB parses the NZB file and returns the structure
func parseNZB(filePath string) (*NZB, error) {
    file, err := os.Open(filePath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var nzb NZB
    decoder := xml.NewDecoder(file)
    decoder.CharsetReader = charset.NewReaderLabel

    if err := decoder.Decode(&nzb); err != nil {
        return nil, err
    }

    return &nzb, nil
}

// downloadMessage downloads a message from the Usenet server using the ARTICLE command
func downloadMessage(conn net.Conn, articleID string) (string, error) {
    reader := bufio.NewReader(conn)
    fmt.Fprintf(conn, "ARTICLE <%s>\r\n", articleID)

    fmt.Println("Sent ARTICLE")

    var message strings.Builder
    for {
        line, err := reader.ReadString('\n')
        if err != nil && err != io.EOF {
            return "", err
        }
        if strings.HasPrefix(line, "430") {
                return "", err
        }
        if line == ".\r\n" {
            break
        }
        message.WriteString(line)
    }

    return message.String(), nil
}

// postMessage posts the downloaded message to the second Usenet server using the POST command
func postMessage(conn net.Conn, subject string, messageID, newsgroups, from, message string) error {
    reader := bufio.NewReader(conn)
    _, _ = reader.ReadString('\n')      // try to read first

    fmt.Fprintf(conn, "POST\r\n")
    response, _ := reader.ReadString('\n')
    if !strings.HasPrefix(response, "340") {
        return fmt.Errorf("failed to initiate POST command: %s", response)
    }

    headers := fmt.Sprintf(
        "From: %s\r\nSubject: %s\r\nNewsgroups: %s\r\nMessage-ID: <%s>\r\nDate: %s\r\nX-Newsreader: Go Usenet Client\r\n\r\n",
        from, subject, newsgroups, messageID, time.Now().Format(time.RFC1123Z),
    )

    // Combine headers and body
    fullMessage := headers + message

    fmt.Fprintf(conn, "%s\r\n.\r\n", fullMessage)
    response, _ = reader.ReadString('\n')
    if !strings.HasPrefix(response, "240") {
        return fmt.Errorf("failed to post message: %s", response)
    }

    fmt.Println("Received response: ", response)

    return nil
}
Go Back