diff --git a/.env_template b/.env_template index 249cde5..c658d01 100644 --- a/.env_template +++ b/.env_template @@ -8,3 +8,4 @@ CRAWLER_FORK_DIGEST="0x4a26c58b" CRAWLER_GOSSIP_TOPIC="beacon_block" CRAWLER_SUBNET="all" CRAWLER_PERSIST_CONNEVENTS="false" +IP2LOCATION_TOKEN="" diff --git a/docker-compose.yaml b/docker-compose.yaml index 80ceefa..f05e510 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -55,6 +55,7 @@ services: build: context: . dockerfile: Dockerfile + env_file: .env command: | eth2 --log-level=${CRAWLER_LOG_LEVEL} @@ -72,3 +73,5 @@ services: ports: - "${CRAWLER_PORT}:9020" - "127.0.0.1:${CRAWLER_METRICS_PORT}:9080" + volumes: + - ./pkg/utils/apis/database:/app/database diff --git a/go.mod b/go.mod index 0e6d588..f164e83 100644 --- a/go.mod +++ b/go.mod @@ -56,6 +56,8 @@ require ( github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/holiman/uint256 v1.2.0 // indirect github.com/huin/goupnp v1.0.2 // indirect + github.com/ip2location/ip2location-go/v9 v9.6.1 // indirect + github.com/ip2location/ip2proxy-go/v4 v4.0.1 // indirect github.com/ipfs/go-cid v0.0.7 // indirect github.com/ipfs/go-datastore v0.5.0 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect @@ -159,6 +161,7 @@ require ( google.golang.org/protobuf v1.27.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + lukechampine.com/uint128 v1.2.0 // indirect ) replace github.com/libp2p/go-libp2p-pubsub v0.5.5 => ./go-libp2p-pubsub diff --git a/go.sum b/go.sum index 0d4bffe..31af7ee 100644 --- a/go.sum +++ b/go.sum @@ -427,6 +427,8 @@ github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19y github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= +github.com/ip2location/ip2proxy-go/v4 v4.0.1 h1:n63WK4EYsXqt5hXHvHABknRYZEnVFqF/KX3xx84Zw8I= +github.com/ip2location/ip2proxy-go/v4 v4.0.1/go.mod h1:knSLTGvow2tCTxGuZNACMiqRW7h9u/F7KFrPa8HBJ8U= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -1813,6 +1815,8 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= +lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index aed2fcf..67c180f 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -91,12 +91,12 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf 
config.EthereumCrawlerConfig) return nil, err } dbClient, err := psql.NewDBClient( - ctx, - ethNode.Network(), - conf.PsqlEndpoint, - backupInterval, - psql.InitializeTables(true), - psql.WithConnectionEventsPersist(conf.PersistConnEvents), + ctx, + ethNode.Network(), + conf.PsqlEndpoint, + backupInterval, + psql.InitializeTables(true), + psql.WithConnectionEventsPersist(conf.PersistConnEvents), ) if err != nil { cancel() diff --git a/pkg/db/models/ip.go b/pkg/db/models/ip.go index 8ac5a75..8374d04 100644 --- a/pkg/db/models/ip.go +++ b/pkg/db/models/ip.go @@ -1,6 +1,8 @@ package models -import "time" +import ( + "time" +) const ( IpInfoTTL = 30 * 24 * time.Hour // 30 days @@ -35,10 +37,9 @@ func (m *IpApiMsg) IsEmpty() bool { } type ApiResp struct { - IpInfo IpInfo - DelayTime time.Duration - AttemptsLeft int - Err error + IpInfo IpInfo + DelayTime time.Duration + Err error } type IpInfo struct { diff --git a/pkg/utils/apis/ip_api.go b/pkg/utils/apis/ip_api.go index 531f10a..307f69a 100644 --- a/pkg/utils/apis/ip_api.go +++ b/pkg/utils/apis/ip_api.go @@ -1,32 +1,41 @@ package apis import ( + "archive/zip" "context" - "encoding/json" + "errors" "fmt" + "io" "io/ioutil" + "math" + "mime" "net/http" + "os" + "path/filepath" + "regexp" "strconv" "strings" "sync" - "sync/atomic" "time" + "github.com/ip2location/ip2proxy-go/v4" "github.com/migalabs/armiarma/pkg/db/models" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) +var ( + ErrorQueueFull = errors.New("queue is full") + ErrorQueueEmpty = errors.New("queue is emtpy") +) + const ( defaultIpTTL = 30 * 24 * time.Hour // 30 days ipChanBuffSize = 45 // number of ips that can be buffered unto the channel ipBuffSize = 8192 // number of ip queries that can be queued in the ipQueue - ipApiEndpoint = "http://ip-api.com/json/{__ip__}?fields=status,continent,continentCode,country,countryCode,region,regionName,city,zip,lat,lon,isp,org,as,asname,mobile,proxy,hosting,query" + inApiEndpoint = "https://www.ip2location.com/download/?token=%s&file=%s" minIterTime = 100 * time.Millisecond ) -var TooManyRequestError error = fmt.Errorf("error HTTP 429") - // DB Interface for DBWriter type DBWriter interface { PersistToDB(interface{}) @@ -35,13 +44,18 @@ type DBWriter interface { GetExpiredIpInfo() ([]string, error) } +type ipQueue struct { + sync.RWMutex + queueSize int + ipList []string +} + // PEER LOCALIZER type IpLocator struct { ctx context.Context - // Request channels + // Request channel s locationRequest chan string - // dbClient dbClient DBWriter ipQueue *ipQueue @@ -61,236 +75,410 @@ func NewIpLocator(ctx context.Context, dbCli DBWriter) *IpLocator { } } -// Run the necessary routines to locate the IPs -func (c *IpLocator) Run() { - //l.SetLevel(Logrus.TraceLevel) - c.locatorRoutine() +func newIpQueue(queueSize int) *ipQueue { + return &ipQueue{ + queueSize: queueSize, + ipList: make([]string, 0, queueSize), + } } -// locatorRoutine is the main routine that will wait until an request to identify an IP arrives -// or if the routine gets canceled -func (c *IpLocator) locatorRoutine() { - log.Info("IP locator routine started") - // ip queue reading routine - go func() { - ticker := time.NewTicker(minIterTime) - for { - ip, err := c.ipQueue.readItem() - if err == nil { - // put the request in the Queue - c.locationRequest <- ip - } - select { - case <-ticker.C: - ticker.Reset(minIterTime) +// ----------------------------------------------------------- // +// ------------------ DB UPDATE UTILITIES -------------------- // +// 
----------------------------------------------------------- // - case <-c.ctx.Done(): - return - } +const ( + DatabaseDir = "./database/" + IP2LocationToken = "IP2LOCATION_TOKEN" + DBDownloadApiEndpoint = "https://www.ip2location.com/download/?token=%s&file=%s" + UpdateThreshold = 24 * time.Hour + IPDbName = "PX11LITEBIN" + UncompressedFileName = "IP2PROXY-LITE-PX11.BIN" + TimestampFormat = "20060102-150405" +) + +func unzip(zipFile, destDir string) error { + r, err := zip.OpenReader(zipFile) + if err != nil { + return err + } + defer r.Close() + + for _, f := range r.File { + fpath := filepath.Join(destDir, f.Name) + if f.FileInfo().IsDir() { + os.MkdirAll(fpath, os.ModePerm) + continue } - }() - // ip locating routien - go func() { - var nextDelayRequest time.Duration - for { - select { - // New request to identify an IP - case reqIp := <-c.locationRequest: - log.Trace("new request has been received for ip:", reqIp) - reqLoop: - for { - // since it didn't exist or did expire, request the ip - // new API call needs to be done - log.Tracef(" making API call for %s", reqIp) - atomic.AddInt32(c.apiCalls, 1) - respC := c.locateIp(reqIp) - select { - case apiResp := <-respC: - nextDelayRequest = apiResp.DelayTime - log.WithFields(log.Fields{ - "delay": nextDelayRequest, - "attempts left": apiResp.AttemptsLeft, - }).Debug("got response from IP-API request ") - // check if there is an error - switch apiResp.Err { - case TooManyRequestError: - // if the error reports that we tried too many calls on the API, sleep given time and try again - log.Debug("call ", reqIp, " -> error received: ", apiResp.Err.Error(), "\nwaiting ", nextDelayRequest+(5*time.Second)) - ticker := time.NewTicker(nextDelayRequest + (5 * time.Second)) - select { - case <-ticker.C: - continue - case <-c.ctx.Done(): - log.Info("context closure has been detecting, closing IpApi caller") - return - } - case nil: - // if the error is different from TooManyRequestError break loop and store the request - log.Debugf("call %s-> api req success", reqIp) - // Upsert the IP into the db - c.dbClient.PersistToDB(apiResp.IpInfo) - break reqLoop - - default: - log.Debug("call ", reqIp, " -> diff error received: ", apiResp.Err.Error()) - break reqLoop - - } - - case <-c.ctx.Done(): - log.Info("context closure has been detecting, closing IpApi caller") - return - } - } - // check if there is any waiting time that we have to respect before next connection - if nextDelayRequest != time.Duration(0) { - log.Debug("number of allowed requests has been exceed, waiting ", nextDelayRequest+(2*time.Second)) - // set req delay to true, noone can make requests - ticker := time.NewTicker(nextDelayRequest + (2 * time.Second)) - select { - case <-ticker.C: - continue - case <-c.ctx.Done(): - log.Info("context closure has been detecting, closing IpApi caller") - return - } - } + outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) + if err != nil { + return err + } - // the context has been deleted, end go routine - case <-c.ctx.Done(): - // close the channels - close(c.locationRequest) - return - } + rc, err := f.Open() + if err != nil { + outFile.Close() + return err } - }() + + _, err = io.Copy(outFile, rc) + outFile.Close() + rc.Close() + if err != nil { + return err + } + } + return nil } -// LocateIP is an externa request that any module could do to identify an IP -func (c *IpLocator) LocateIP(ip string) { - // check first if IP is already in queue (to queue same ip) - if c.ipQueue.ipExists(ip) { - return +func 
downloadAndSaveZippedDB() error { + dbToken := os.Getenv(IP2LocationToken) + if dbToken == "" { + return errors.New("IP2LOCATION_TOKEN environment variable not set") } - // Check if the IP is already in the cache - exists, expired, err := c.dbClient.CheckIpRecords(ip) + url := fmt.Sprintf(DBDownloadApiEndpoint, dbToken, IPDbName) + resp, err := http.Get(url) if err != nil { - log.Error("unable to check if IP already exists -", err.Error()) // Should it be a Panic? + return err } - // if exists and it didn't expired, don't do anything - if exists && !expired { - return + defer resp.Body.Close() + + contentDisposition := resp.Header.Get("Content-Disposition") + _, params, err := mime.ParseMediaType(contentDisposition) + if err != nil { + return err } - // since it didn't exist or it is expired, locate it again - ticker := time.NewTicker(1 * time.Second) - // wait 1 sec because is the normal time to wait untill we can start querying again - for { - err := c.ipQueue.addItem(ip) - if err == nil { - break + filename := params["filename"] + if filename == "" { + filename = "PX11LITEBIN.zip" + } + + file, err := os.Create(filename) + if err != nil { + return err + } + defer file.Close() + + fmt.Printf("Starting download of IP2Proxy DB to %s\n", filename) + _, err = io.Copy(file, resp.Body) + if err != nil { + fmt.Printf("Error while downloading IP2Proxy DB to %s\n", filename) + return err + } + fmt.Printf("Download completed for IP2Proxy DB to %s\n", filename) + + return nil +} + +// checks if the database needs to be updated +func needsUpdate() bool { + fmt.Println("Ip2Location DB: checking time since last update...") + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + log.Fatal(err) + } + + latest := time.Time{} + for _, f := range files { + if strings.HasPrefix(f.Name(), "IP2PROXY-LITE-PX11") && strings.HasSuffix(f.Name(), ".BIN") { + timestampPart := strings.TrimSuffix(f.Name(), ".BIN") + timestampPart = strings.TrimPrefix(timestampPart, "IP2PROXY-LITE-PX11") + fileTime, err := time.Parse(TimestampFormat, timestampPart) + if err == nil && fileTime.After(latest) { + latest = fileTime + } } - <-ticker.C - ticker.Reset(1 * time.Second) - log.Debug("waiting to alocate a new IP request") } - ticker.Stop() + fmt.Println("Finished checking directory...") + + return latest.IsZero() || time.Since(latest) > UpdateThreshold } -func (c *IpLocator) Close() { - log.Info("closing IP-API service") - // close the context for ending up the routine +// finds the latest database file in the directory to see if it's older than 24 hours +func findLatestDbFile() string { + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + log.Fatal(err) + } + + latestFile := "" + latest := time.Time{} + for _, f := range files { + if strings.HasPrefix(f.Name(), UncompressedFileName) && strings.HasSuffix(f.Name(), ".BIN") { + nameParts := strings.Split(f.Name(), "-") + if len(nameParts) >= 2 { + timestampPart := nameParts[len(nameParts)-1] + timestampPart = strings.TrimSuffix(timestampPart, ".BIN") + fileTime, err := time.Parse("20060102-150405", timestampPart) + if err == nil && fileTime.After(latest) { + latest = fileTime + latestFile = f.Name() + } + } + } + } + return latestFile } -func (c *IpLocator) locateIp(ip string) chan models.ApiResp { - respC := make(chan models.ApiResp) - go callIpApi(ip, respC) - return respC +func cleanupOldDatabases() error { + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + return err + } + + if len(files) <= 1 { + return nil + } + + timestampToFile := 
make(map[time.Time]string) + var latestTime time.Time + + for _, file := range files { + if strings.HasPrefix(file.Name(), "IP2PROXY-LITE-PX11-") && strings.HasSuffix(file.Name(), ".BIN") { + nameParts := strings.Split(file.Name(), "-") + if len(nameParts) >= 3 { + timestampPart := strings.TrimSuffix(nameParts[len(nameParts)-1], ".BIN") + fileTime, err := time.Parse("20060102150405", timestampPart) + if err != nil { + log.Printf("Failed to parse time from filename '%s': %s\n", file.Name(), err) + continue + } + + timestampToFile[fileTime] = file.Name() + if fileTime.After(latestTime) { + latestTime = fileTime + } + } + } + } + + for fileTime, fileName := range timestampToFile { + if fileTime.Before(latestTime) { + err := os.Remove(filepath.Join(DatabaseDir, fileName)) + if err != nil { + log.Printf("Failed to remove old database file: %s\n", fileName) + } else { + log.Printf("Removed old database file: %s\n", fileName) + } + } + } + + return nil } -// get location country and City from the multiaddress of the peer on the peerstore -func callIpApi(ip string, respC chan models.ApiResp) { - var apiResponse models.ApiResp - apiResponse.IpInfo, apiResponse.DelayTime, apiResponse.AttemptsLeft, apiResponse.Err = CallIpApi(ip) - respC <- apiResponse - // defer ^ +func cleanupFolder() error { + targetFiles := []string{"LICENSE", "README", ".zip"} + + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + return err + } + + for _, file := range files { + shouldRemove := false + for _, target := range targetFiles { + if strings.Contains(file.Name(), target) { + if file.Name() == "IP2PROXY-LITE-PX11.BIN" || strings.Contains(file.Name(), "IP2PROXY") { + shouldRemove = false + } else { + shouldRemove = true + } + break + } + } + + if shouldRemove { + err := os.Remove(filepath.Join(DatabaseDir, file.Name())) + if err != nil { + log.Printf("Failed to remove file: %s\n", file.Name()) + } else { + log.Printf("Removed file: %s\n", file.Name()) + } + } + } + + return nil } -func CallIpApi(ip string) (ipInfo models.IpInfo, delay time.Duration, attemptsLeft int, err error) { +func isNumeric(s string) bool { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return false + } + return !math.IsNaN(f) && !math.IsInf(f, 0) +} - url := strings.Replace(ipApiEndpoint, "{__ip__}", ip, 1) +func verifyDbFile() (bool, error) { + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + return false, err + } + if len(files) == 0 { + return false, errors.New("No database file found") + } - // Make the IP-APi request - resp, err := http.Get(url) + // Check if "PX11LITEBIN.zip" file exists + found := false + for _, file := range files { + if file.Name() == "PX11LITEBIN.zip" { + found = true + break + } + } + + if !found { + return false, errors.New("File PX11LITEBIN.zip not found") + } + + // unzip file and check validity of db + err = unzip("PX11LITEBIN.zip", DatabaseDir) if err != nil { - err = errors.Wrap(err, "unable to locate IP"+ip) - return + return false, err } - timeLeft, _ := strconv.Atoi(resp.Header["X-Ttl"][0]) - // check if the error that we are receiving means that we exeeded the request limit - if resp.StatusCode == 429 { - log.Debugf("limit of requests per minute has been exeeded, wait for next call %s secs", resp.Header["X-Ttl"][0]) - err = TooManyRequestError - delay = time.Duration(timeLeft) * time.Second - return + dbPath := filepath.Join(DatabaseDir, "IP2PROXY-LITE-PX11.BIN") // name of the file after unzipping + db, err := ip2proxy.OpenDB(dbPath) + version := db.DatabaseVersion() + if 
version == "" || !isNumeric(version) { + return false, errors.New("Invalid database version") } + return true, nil +} - // Check the attempts left that we have to call the api - attemptsLeft, _ = strconv.Atoi(resp.Header["X-Rl"][0]) - if attemptsLeft <= 0 { - // if there are no more attempts left against the api, check how much time do we have to wait - // until we can call it again - // set the delayTime that we return to the given seconds to wait - delay = time.Duration(timeLeft) * time.Second +func renameDbFile() error { + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + return err + } + if len(files) == 0 { + return errors.New("No database file found") + } + timeStamp := time.Now().Format(TimestampFormat) + for _, file := range files { + if file.Name() == "IP2PROXY-LITE-PX11.BIN" { + err := os.Rename(filepath.Join(DatabaseDir, file.Name()), filepath.Join(DatabaseDir, "IP2PROXY-LITE-PX11-"+timeStamp+".BIN")) + if err != nil { + return err + } + } } + return nil +} - // check if the response was success or not - defer resp.Body.Close() - bodyBytes, err := ioutil.ReadAll(resp.Body) +func updateDb() error { + dbToken := os.Getenv(IP2LocationToken) + if dbToken == "" { + return errors.New("IP2LOCATION_TOKEN environment variable not set") + } + if err := downloadAndSaveZippedDB(); err != nil { + log.Printf("Failed to download DB: %v\n", err) + return err + } + valid, err := verifyDbFile() if err != nil { - err = errors.Wrap(err, "could not read response body") - return + return err + } + if !valid { + return errors.New("Invalid database file") } - var apiMsg models.IpApiMsg - // Convert response body to struct - err = json.Unmarshal(bodyBytes, &apiMsg) + err = renameDbFile() if err != nil { - err = errors.Wrap(err, "could not unmarshall response") - return + return err } - // Check if the status of the request has been succesful - if apiMsg.Status != "success" { - err = errors.New(fmt.Sprintf("status from ip different than success, resp header:\n %#v \n %+v", resp, apiMsg)) - return + + cleanupOldDatabases() // removes all the old versions of the db file and leaves the one with the latest timestamp + cleanupFolder() // cleans the folder from all the other files that are not needed (zip, readme, license) + + return nil +} + +// NEW VERSION +func getDatabaseFile() string { + latestFile := findLatestDbFile() + + if latestFile == "" || needsUpdate() { + updateDb() } + return findLatestDbFile() - ipInfo.ExpirationTime = time.Now().UTC().Add(defaultIpTTL) - ipInfo.IpApiMsg = apiMsg - return } -func newIpQueue(queueSize int) *ipQueue { - return &ipQueue{ - queueSize: queueSize, - ipList: make([]string, 0, queueSize), +// ------------------------------------------------- // + +func isIPv4(ip string) bool { + ipv4Pattern := `^(\d{1,3}\.){3}\d{1,3}$` + match, _ := regexp.MatchString(ipv4Pattern, ip) + return match +} + +func isIPv6(ip string) bool { + ipv6Pattern := `^([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$` + match, _ := regexp.MatchString(ipv6Pattern, ip) + return match +} + +// some fields are commented because we don't have data for them +func mapTempIpInfoToApiMsg(data ip2proxy.IP2ProxyRecord, ip string) models.IpApiMsg { + return models.IpApiMsg{ + IP: ip, + Status: "success", + // Continent: "", + // ContinentCode: "", + Country: data.CountryLong, + CountryCode: data.CountryShort, + Region: data.Region, + // RegionName: "", + City: data.City, + // Zip: "", + // Lat: "", + // Lon: "", + Isp: data.Isp, + // Org: "", + // As: "", + // AsName: "", + Mobile: data.UsageType == "MOB", + 
Proxy: data.ProxyType != "", + // Hosting: false, } } -var ( - ErrorQueueFull = errors.New("queue is full") - ErrorQueueEmpty = errors.New("queue is emtpy") -) +// ------------------------------------------------- // -type ipQueue struct { - sync.RWMutex - queueSize int - ipList []string +func (c *IpLocator) LocateIP(ip string) { + // check first if IP is already in queue (to queue same ip) + if c.ipQueue.ipExists(ip) { + return + } + + // Check if the IP is already in the cache + exists, expired, err := c.dbClient.CheckIpRecords(ip) + if err != nil { + log.Error("unable to check if IP already exists -", err.Error()) // Should it be a Panic? + } + // if exists and it didn't expired, don't do anything + if exists && !expired { + return + } + + // since it didn't exist or it is expired, locate it again + ticker := time.NewTicker(1 * time.Second) + // wait 1 sec because is the normal time to wait untill we can start querying again + for { + err := c.ipQueue.addItem(ip) + if err == nil { + break + } + <-ticker.C + ticker.Reset(1 * time.Second) + log.Debug("waiting to alocate a new IP request") + } + ticker.Stop() } +// add an item to the IP queue func (q *ipQueue) addItem(newItem string) error { q.Lock() defer q.Unlock() @@ -311,6 +499,7 @@ func (q *ipQueue) addItem(newItem string) error { return nil } +// reads items from the IP queue func (q *ipQueue) readItem() (string, error) { q.Lock() defer q.Unlock() @@ -328,6 +517,111 @@ func (q *ipQueue) readItem() (string, error) { return item, nil } +// this function replaces the API call in the old version of the script +func locate(ip string) (ip2proxy.IP2ProxyRecord, error) { + if !isIPv4(ip) && !isIPv6(ip) { + return ip2proxy.IP2ProxyRecord{}, fmt.Errorf("invalid IP address") + } + + dbFile := getDatabaseFile() + db, err := ip2proxy.OpenDB(DatabaseDir + dbFile) + if err != nil { + return ip2proxy.IP2ProxyRecord{}, err + } + defer db.Close() + + results, err := db.GetAll(ip) + if err != nil { + return ip2proxy.IP2ProxyRecord{}, err + } + + return results, err +} + +func (c *IpLocator) locatorRoutine() { + go func() { + ticker := time.NewTicker(minIterTime) + for { + ip, err := c.ipQueue.readItem() + if err == nil { + c.locationRequest <- ip + } + select { + case <-ticker.C: + ticker.Reset(minIterTime) + + case <-c.ctx.Done(): + return + } + } + }() + + go func() { + for { + select { + case ip := <-c.locationRequest: + respC := c.locateIp(ip) + resp := <-respC + if resp.Err != nil { + log.Error("error while locating IP -", resp.Err.Error()) + continue + } + if resp.IpInfo.IsEmpty() { + log.Error("empty response from IP-API") + continue + } + c.dbClient.PersistToDB(resp.IpInfo) + case <-c.ctx.Done(): + return + } + } + }() +} + +func (c *IpLocator) locateIp(ip string) chan models.ApiResp { + respC := make(chan models.ApiResp) + go callIpApi(ip, respC) + return respC +} + +// get location country and City from the multiaddress of the peer on the peerstore +func callIpApi(ip string, respC chan models.ApiResp) { + var apiResponse models.ApiResp + apiResponse.IpInfo, apiResponse.DelayTime, apiResponse.Err = CallIpApi(ip) + respC <- apiResponse + // defer ^ +} + +func CallIpApi(ip string) (ipInfo models.IpInfo, delay time.Duration, err error) { + + var tempInfo ip2proxy.IP2ProxyRecord + tempInfo, err = locate(ip) + if err != nil { + return + } + + var apiMsg models.IpApiMsg + apiMsg = mapTempIpInfoToApiMsg(tempInfo, ip) + + ipInfo.ExpirationTime = time.Now().UTC().Add(defaultIpTTL) + ipInfo.IpApiMsg = apiMsg + return +} + +// 
------------------------------------------------- //
+
+func (c *IpLocator) Run() {
+	//l.SetLevel(Logrus.TraceLevel)
+	c.locatorRoutine()
+}
+
+func (c *IpLocator) Close() {
+	log.Info("closing IP locator service")
+	// Note: ctx.Done() only returns the context's done channel; the locator
+	// routines actually stop when the parent context passed to NewIpLocator
+	// is cancelled by the caller.
+	c.ctx.Done()
+}
+
 func (q *ipQueue) ipExists(target string) bool {
 	for _, ip := range q.ipList {
 		if ip == target {
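// A minimal usage sketch, not part of the diff above: how a caller is expected to
// wire the reworked locator. dbCli stands for any implementation of apis.DBWriter
// (the crawler's psql client is one); the IP literal is illustrative, and
// IP2LOCATION_TOKEN must be set in the environment (docker-compose now injects it
// from .env) so the IP2Proxy BIN can be downloaded or refreshed.
package main

import (
	"context"

	"github.com/migalabs/armiarma/pkg/utils/apis"
)

func runLocator(dbCli apis.DBWriter) {
	ctx, cancel := context.WithCancel(context.Background())

	locator := apis.NewIpLocator(ctx, dbCli)
	locator.Run()               // starts the queue-reader and locator goroutines
	locator.LocateIP("8.8.8.8") // queues the request; the result is persisted via dbCli.PersistToDB

	// The goroutines exit when the parent context is cancelled; Close itself only logs.
	cancel()
	locator.Close()
}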
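// A sketch, not part of the diff above: verifyDbFile calls db.DatabaseVersion()
// without checking the error returned by ip2proxy.OpenDB and never closes the
// handle, so a missing or corrupt BIN may panic instead of being reported. A safer
// shape for that check, using only calls already present in this diff and meant to
// sit alongside verifyDbFile in pkg/utils/apis/ip_api.go (checkDbVersion is a
// hypothetical name):
func checkDbVersion(dbPath string) (bool, error) {
	db, err := ip2proxy.OpenDB(dbPath)
	if err != nil {
		return false, err // unreadable or invalid BIN file
	}
	defer db.Close()

	version := db.DatabaseVersion()
	if version == "" || !isNumeric(version) {
		return false, errors.New("invalid database version")
	}
	return true, nil
}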
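// A sketch, not part of the diff above: renameDbFile writes names like
// "IP2PROXY-LITE-PX11-20060102-150405.BIN" (TimestampFormat itself contains a
// dash), while needsUpdate, findLatestDbFile and cleanupOldDatabases each parse
// that name differently (a leftover leading "-", a prefix check against
// UncompressedFileName plus a split on "-", and the dash-less layout
// "20060102150405"), so the freshest file can be missed. One way to keep the
// writer and the readers consistent is a shared pair of helpers; dbFilePrefix and
// the function names are hypothetical, meant to live next to renameDbFile in
// ip_api.go:
const dbFilePrefix = "IP2PROXY-LITE-PX11-"

func timestampedDbName(t time.Time) string {
	return dbFilePrefix + t.Format(TimestampFormat) + ".BIN"
}

func parseDbTimestamp(name string) (time.Time, bool) {
	if !strings.HasPrefix(name, dbFilePrefix) || !strings.HasSuffix(name, ".BIN") {
		return time.Time{}, false
	}
	ts := strings.TrimSuffix(strings.TrimPrefix(name, dbFilePrefix), ".BIN")
	t, err := time.Parse(TimestampFormat, ts)
	return t, err == nil
}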
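// A sketch, not part of the diff above: downloadAndSaveZippedDB creates the zip in
// the process working directory (using the server-supplied filename), while
// verifyDbFile looks for "PX11LITEBIN.zip" inside DatabaseDir before unzipping it
// from the working directory; the docker-compose change mounts
// ./pkg/utils/apis/database at /app/database, so this only lines up if the
// container's working directory is assumed to be /app. Anchoring both steps on
// DatabaseDir removes that dependency; downloadZipTo is a hypothetical helper
// meant for ip_api.go, which already imports io, os and path/filepath:
func downloadZipTo(body io.Reader, name string) (string, error) {
	if err := os.MkdirAll(DatabaseDir, os.ModePerm); err != nil {
		return "", err
	}
	dst := filepath.Join(DatabaseDir, name)
	f, err := os.Create(dst)
	if err != nil {
		return "", err
	}
	defer f.Close()

	if _, err := io.Copy(f, body); err != nil {
		return "", err
	}
	return dst, nil
}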